Menu

C之并发工具类,相关整理



此篇博客所有源码均来自JDK 1.8

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。不用于商业目的,转载请注明出处。

前面三篇博客分别介绍了CyclicBarrier、CountDownLatch、Semaphore,现在介绍并发工具类中的最后一个Exchange。Exchange是最简单的也是最复杂的,简单在于API非常简单,就一个构造方法和两个exchange()方法,最复杂在于它的实现是最复杂的。在API是这么介绍的:可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给
exchange
方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger
可能被视为 SynchronousQueue 的双向形式。Exchanger
可能在应用程序(比如遗传算法和管道设计)中很有用。

  • Exchanger是自 JDK 1.5 起开始提供的工具套件,源于
    java.util.concurrent 包。

    • 是一个用于线程间协作的工具类。
    • Exchanger
      用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
  • 此类提供对外的操作是同步的。
  • 用于 成对 出现的线程之间交换数据。
  • 可以视作双向的同步队列。
  • 可应用于基因算法、流水线设计等场景。

Exchanger,它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中。

Exchange实现较为复杂,我们先看其怎么使用,然后再来分析其源码。现在我们用Exchange来模拟生产-消费者问题:

  • Exchanger 用于进行线程间的数据交换。
    • 它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。
    • 两个线程通过 exchange() 方法交换数据, 如果第一个线程先执行
      exchange() 方法,会一直等待第二个线程也执行
      exchange(),当两个线程都到达同步点时,两个线程交换数据,将本线程生产出来的数据传递给对方。
    • 使用 Exchanger 的重点是成对的线程使用 exchange() 方法。
  • 这个类提供一个无参构造函数,两个重载的范型 exchange() 方法。
public class ExchangerTest { static class Producer implements Runnable{ //生产者、消费者交换的数据结构 private List<String> buffer; //步生产者和消费者的交换对象 private Exchanger<List<String>> exchanger; Producer(List<String> buffer,Exchanger<List<String>> exchanger){ this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { for(int i = 1 ; i < 5 ; i++){ System.out.println("生产者第" + i + "次提供"); for(int j = 1 ; j <= 3 ; j++){ System.out.println("生产者装入" + i + "--" + j); buffer.add("buffer:" + i + "--" + j); } System.out.println("生产者装满,等待与消费者交换..."); try { exchanger.exchange; } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer implements Runnable { private List<String> buffer; private final Exchanger<List<String>> exchanger; public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) { this.buffer = buffer; this.exchanger = exchanger; } @Override public void run() { for (int i = 1; i < 5; i++) { //调用exchange()与消费者进行数据交换 try { buffer = exchanger.exchange; } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者第" + i + "次提取"); for (int j = 1; j <= 3 ; j++) { System.out.println("消费者 : " + buffer.get; buffer.remove; } } } } public static void main(String[] args){ List<String> buffer1 = new ArrayList<String>(); List<String> buffer2 = new ArrayList<String>(); Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); Thread producerThread = new Thread(new Producer(buffer1,exchanger)); Thread consumerThread = new Thread(new Consumer(buffer2,exchanger)); producerThread.start(); consumerThread.start(); }}

运行结果:

public V exchange throws InterruptedExceptionpublic V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

图片 1Exchanger

  • 在 Exchanger 中,如果一个线程已经到达了 exchanger()
    时,对于其伙伴结点的情况分为三种。

    • 如果伙伴结点在该线程到达之前已经调用了 exchanger()
      方法,则唤醒该伙伴结点然后进行数据交换,得到各自数据返回。
    • 如果伙伴结点还没有到达交换点,则该线程被挂起,等待伙伴结点到达后被唤醒,完成数据交换。
    • 如果当前线程被中断了则抛出异常,或者等待超时,则抛出超时异常。
  • Exchanger
    有单槽位和多槽位之分,单个槽位在同一时刻只能用于两个线程交换数据,这样在竞争比较激烈的时候,会影响到性能,多个槽位就是多个线程可以同时进行两个的数据交换,彼此之间不受影响,这样可以很好的提高吞吐量。

首先生产者Producer、消费者Consumer首先都创建一个缓冲列表,通过Exchanger来同步交换数据。消费中通过调用Exchanger与生产者进行同步来获取数据,而生产者则通过for循环向缓存队列存储数据并使用exchanger对象消费者同步。到消费者从exchanger哪里得到数据后,他的缓冲列表中有3个数据,而生产者得到的则是一个空的列表。上面的例子充分展示了消费者-生产者是如何利用Exchanger来完成数据交换的。

数据结构

在Exchanger中,如果一个线程已经到达了exchanger节点时,对于它的伙伴节点的情况有三种:

@sun.misc.Contended static final class Node { int index; // arena的下标,多个槽位的时候利用 int bound; // 上一次记录的Exchanger.bound; int collides; // 在当前bound下CAS失败的次数; int hash; // 用于自旋; Object item; // 这个线程的当前项,也就是需要交换的数据; volatile Object match; // 交换的数据 volatile Thread parked; // 线程}/** * Value representing null arguments/returns from public * methods. Needed because the API originally didn't disallow null * arguments, which it should have. * 如果交换的数据为 null,则用NULL_ITEM 代替 */private static final Object NULL_ITEM = new Object();
  1. 如果它的伙伴节点在该线程到达之前已经调用了exchanger方法,则它会唤醒它的伙伴然后进行数据交换,得到各自数据返回。
  2. 如果它的伙伴节点还没有到达交换点,则该线程将会被挂起,等待它的伙伴节点到达被唤醒,完成数据交换。
  3. 如果当前线程被中断了则抛出异常,或者等待超时了,则抛出超时异常。
  • Node 定义中,index,bound,collides 用于多槽位。
  • item 是当前线程需要交换的数据。
  • match 是和其它线程交换后的数据,初始为 null。
  • parked 是记录线程,用于阻塞和唤醒线程。

Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者。源码中的描述如下:

 for  { if (slot is empty) { // offer place item in a Node; if (can CAS slot from empty to node) { wait for release; return matching item in node; } } else if (can CAS slot from node to empty) { // release get the item in node; set matching item in node; release waiting thread; } // else retry on CAS failure }
  • Node 是每个线程自身用于数据交换的结点,每个 Node
    就代表了每个线程,为了保证线程安全,把线程的 Node 结点放在
    ThreadLocal。
  • slot 为单槽。

Exchanger中定义了如下几个重要的成员变量:

private final Participant participant;private volatile Node[] arena;private volatile Node slot;
/** The number of CPUs, for sizing and spin control */private static final int NCPU = Runtime.getRuntime().availableProcessors();/** * The bound for spins while waiting for a match. The actual * number of iterations will on average be about twice this value * due to randomization. Note: Spinning is disabled when NCPU==1. */private static final int SPINS = 1 << 10; // 自旋次数/** * Slot used until contention detected. */private volatile Node slot; // 用于交换数据的槽位/** * Per-thread state 每个线程的数据,ThreadLocal 子类 */private final Participant participant;/** The corresponding thread local class */ static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } }

participant的作用是为每个线程保留唯一的一个Node节点。

exchange 方法

slot为单个槽,arena为数组槽。他们都是Node类型。在这里可能会感觉到疑惑,slot作为Exchanger交换数据的场景,应该只需要一个就可以了啊?为何还多了一个Participant
和数组类型的arena呢?一个slot交换场所原则上来说应该是可以的,但实际情况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组。那么怎么将Node与当前线程绑定呢?Participant
,Participant
的作用就是为每个线程保留唯一的一个Node节点,它继承ThreadLocal,同时在Node节点中记录在arena中的下标index。

  • 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

Node定义如下:

没有设定超时时间的 exchange 方法

 @sun.misc.Contended static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null }
public V exchange throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // 转换成空对象 // arena == null, 路由到slotExchange, 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange,返回null,则抛出中断异常 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null :  v;}
  • index:arena的下标;
  • bound:上一次记录的Exchanger.bound;
  • collides:在当前bound下CAS失败的次数;
  • hash:伪随机数,用于自旋;
  • item:这个线程的当前项,也就是需要交换的数据;
  • match:做releasing操作的线程传递的项;
  • parked:挂起时设置线程值,其他情况下为null;
  • arena 为多槽位,如果为 null,则执行 slotExchange()
    单槽方法,否则判断线程是否中断,如果中断值抛出 InterruptedException
    异常,没有中断则执行 arenaExchange() 多槽方法,如果该方法返回
    null,抛出中断异常,最后返回结果。

在Node定义中有两个变量值得思考:bound以及collides。前面提到了数组area是为了避免竞争而产生的,如果系统不存在竞争问题,那么完全没有必要开辟一个高效的arena来徒增系统的复杂性。首先通过单个slot的exchanger来交换数据,当探测到竞争时将安排不同的位置的slot来保存线程Node,并且可以确保没有slot会在同一个缓存行上。如何来判断会有竞争呢?CAS替换slot失败,如果失败,则通过记录冲突次数来扩展arena的尺寸,我们在记录冲突的过程中会跟踪“bound”的值,以及会重新计算冲突次数在bound的值被改变时。这里阐述可能有点儿模糊,不着急,我们先有这个概念,后面在arenaExchange中再次做详细阐述。

具有超时功能的 exchange 方法

我们直接看exchange()方法

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x;// 转换成空对象 long ns = unit.toNanos; // arena == null, 路由到slotExchange, 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange,返回null,则抛出中断异常 if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT)// 超时 throw new TimeoutException(); return (v == NULL_ITEM) ? null :  v;}

exchange:等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。方法定义如下:

  • 增加超时的判断。
 public V exchange throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : v; }

slotExchange 方法

这个方法比较好理解:arena为数组槽,如果为null,则执行slotExchange()方法,否则判断线程是否中断,如果中断值抛出InterruptedException异常,没有中断则执行arenaExchange()方法。整套逻辑就是:如果slotExchange(Object
item, boolean timed, long ns)方法执行失败了就执行arenaExchange(Object
item, boolean timed, long ns)方法,最后返回结果V。

private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); // 获取当前线程携带的Node Thread t = Thread.currentThread(); // 当前线程 if (t.isInterrupted // 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记 return null; for  { if ( != null) { // slot不为null, 说明已经有线程在这里等待了 if (U.compareAndSwapObject(this, SLOT, q, null)) { // 将slot重新设置为null, CAS操作 Object v = q.item; // 取出等待线程携带的数据 q.match = item; // 将当前线程的携带的数据交给等待线程 Thread w = q.parked; // 可能存在的等待线程 if (w != null) U.unpark; // 唤醒等待线程 return v; // 返回结果,交易成功 } // CPU的个数多于1个,并且bound为0时创建 arena,并将bound设置为SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[ << ASHIFT]; // 根据CPU的个数估计Node的数量 } else if (arena != null) return null; // 如果slot为null, 但arena不为null, 则转而路由到arenaExchange方法 else { // 最后一种情况,说明当前线程先到,则占用此slot p.item = item; // 将携带的数据卸下,等待别的线程来交易 if (U.compareAndSwapObject(this, SLOT, null, p)) // 将slot的设为当前线程携带的Node break; // 成功则跳出循环 p.item = null; // 失败,将数据清除,继续循环 } } // 当前线程等待被释放, spin -> yield -> block/cancel int h = p.hash; // 伪随机,用于自旋 long end = timed ? System.nanoTime() + ns : 0L; // 如果timed为true,等待超时的时间点; 0表示没有设置超时 int spins = (NCPU > 1) ? SPINS : 1; // 自旋次数 Object v; while ((v = p.match) == null) { // 一直循环,直到有线程来交易 if (spins > 0) { // 自旋,直至spins不大于0 h ^= h << 1; // 伪随机算法, 目的是等h小于0 h ^= h >>> 3; h ^= h << 10; if  // 初始值 h = SPINS |  t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 等到h < 0, 而spins的低9位也为0(防止spins过大,CPU空转过久),让出CPU时间片,每一次等待有两次让出CPU的时机(SPINS >>> 1) } else if (slot != p) // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好 spins = SPINS; // 如果线程没被中断,且arena还没被创建,并且没有超时 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime > 0L)) { U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上 p.parked = t; // 挂在此结点上的阻塞着的线程 if (slot == p) U.park(false, ns); // 阻塞, 等着被唤醒或中断 p.parked = null; // 醒来后,解除与结点的联系 U.putObject(t, BLOCKER, null); // 解除阻塞对象 } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超时或其它,给其它线程腾出slot v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 归位 U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v;}

NULL_ITEM
为一个空节点,其实就是一个Object对象而已,slotExchange()为单个slot交换。

  • 执行流程。
    1. 检查 slot
      是否为空,不为空,说明已经有线程在此等待,尝试占领该槽位,如果占领成功,与等待线程交换数据,并唤醒等待线程,交易结束,返回。
    2. 如果占领槽位失败,创建 arena,继续步骤 1 尝试抢占 slot,直至
      slot 为空,或者抢占成功,交易结束返回。
    3. 如果 slot 为空,则判断 arena 是否为空,如果 arena 不为空,返回
      null,重新路由到 arenaExchange 方法。
    4. 如果 arena 为空,说明当前线程是先到达的,尝试占有
      slot,如果成功,将 slot 标记为自己占用,跳出循环,继续步骤
      5,如果失败,则继续步骤 1。
    5. 当前线程等待被释放,等待的顺序是先自旋,不成功则让出 CPU
      时间片,最后还不行就阻塞,spin -> yield -> block。
    6. 如果超时或被中断,则退出循环。
    7. 最后,重置数据,下次重用,返回结果,结束。

slotExchange(Object item, boolean timed, long ns)

图片 2slotExchange
流程图

 private final Object slotExchange(Object item, boolean timed, long ns) { // 获取当前线程的节点 p Node p = participant.get(); // 当前线程 Thread t = Thread.currentThread(); // 线程中断,直接返回 if (t.isInterrupted return null; // 自旋 for  { //slot != null if ( != null) { //尝试CAS替换 if (U.compareAndSwapObject(this, SLOT, q, null)) { Object v = q.item; // 当前线程的项,也就是交换的数据 q.match = item; // 做releasing操作的线程传递的项 Thread w = q.parked; // 挂起时设置线程值 // 挂起线程不为null,线程挂起 if (w != null) U.unpark; return v; } //如果失败了,则创建arena //bound 则是上次Exchanger.bound if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[ << ASHIFT]; } //如果arena != null,直接返回,进入arenaExchange逻辑处理 else if (arena != null) return null; else { p.item = item; if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } } /* * 等待 release * 进入spin+block模式 */ int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; while ((v = p.match) == null) { if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if  h = SPINS | t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } else if (slot != p) spins = SPINS; else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime > 0L)) { U.putObject(t, BLOCKER, this); p.parked = t; if (slot == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
  • 一个 Node 数组 arena,代表了很多的槽位。

程序首先通过participant获取当前线程节点Node。检测是否中断,如果中断return
null,等待后续抛出InterruptedException异常。

如果slot不为null,则进行slot消除,成功直接返回数据V,否则失败,则创建arena消除数组。

private static final int ASHIFT = 7; // 两个有效槽(slot -> Node)之间的字节地址长度(内存地址,以字节为单位),1 << 7至少为缓存行的大小,防止伪共享 private static final int MMASK = 0xff; // 场地(一排槽,arena -> Node[])的可支持的最大索引,可分配的大小为 MMASK + 1private static final int SEQ = MMASK + 1; // bound的递增单元,确立其唯一性private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的个数,用于场地大小和自旋控制static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引private static final int SPINS = 1 << 10; // 自旋次数,NCPU = 1时,禁用private static final Object NULL_ITEM = new Object();// 空对象,对应nullprivate static final Object TIMED_OUT = new Object();// 超时对象,对应timeout// 多个线程交换/多槽位private volatile Node[] arena;

如果slot为null,但arena不为null,则返回null,进入arenaExchange逻辑。

arenaExchange 方法

如果slot为null,且arena也为null,则尝试占领该slot,失败重试,成功则跳出循环进入spin+block模式。

private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 交换场地,一排slot Node p = participant.get(); // 获取当前线程携带的Node p.index 初始值为 0 for (int i = p.index;;) { // arena的索引,数组下标 int b, m, c; long j; // 原数组偏移量,包括填充值 // 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值,也即真正可用的Node //如果i为0,j相当于是 "第一个"槽位 Node q =  U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不为null, 说明已经有线程在这里等了,重新将其设置为null, CAS操作 Object v = q.item; // 取出等待线程携带的数据 q.match = item; // 将当前线程携带的数据交给等待线程 Thread w = q.parked; // 可能存在的等待线程 if (w != null) U.unpark; // 唤醒等待线程 return v; // 返回结果, 交易成功 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交换位置,且槽位为空 p.item = item; // 将携带的数据卸下,等待别的线程来交易 if (U.compareAndSwapObject(a, j, null, p)) { // 槽位占领成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 计算出超时结束时间点 Thread t = Thread.currentThread(); // 当前线程 for (int h = p.hash, spins = SPINS;;) { // 一直循环,直到有别的线程来交易,或超时,或中断 Object v = p.match; // 检查是否有别的线程来交换数据 if (v != null) { // 有则返回 U.putOrderedObject(p, MATCH, null); // match重置,等着下次使用 p.item = null; // 清空,下次接着使用 p.hash = h; return v; // 返回结果,交易结束 } else if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // 移位加异或,伪随机 if  // 初始值 h = SPINS |  t.getId(); else if (h < 0 && // SPINS >>> 1, 一半的概率 (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 每一次等待有两次让出CPU的时机 } else if (U.getObjectVolatile != p) spins = SPINS; // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime > 0L)) { U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上 p.parked = t; // 挂在此结点上的阻塞着的线程 if (U.getObjectVolatile == p) U.park(false, ns); // 阻塞, 等着被唤醒或中断 p.parked = null; // 醒来后,解除与结点的联系 U.putObject(t, BLOCKER, null); // 解除阻塞对象 } else if (U.getObjectVolatile == p && U.compareAndSwapObject(a, j, p, null)) { if  // 尝试缩减 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位递增,低位 -1 p.item = null; // 重置 p.hash = h; i = p.index >>>= 1; // 索引减半,为的是快速找到汇合点 if (Thread.interrupted// 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记 return null; if (timed && m == 0 && ns <= 0L) // 超时 return TIMED_OUT; break; // 重新开始 } } } else p.item = null; // 重置 } else { if (p.bound != b) { // 别的线程更改了bound,重置collides为0, i的情况如下:当i != m, 或者m = 0时,i = m; 否则,i = m-1; 从右往左遍历 p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; // index 左移 } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位递增,低位 +1 p.collides = c + 1; i =  ? m : i - 1; // 左移,遍历槽位,m == FULL时,i == 0,重置i = m, 重新从右往左循环遍历 } else i = m + 1; // 槽位增长 p.index = i; } }}

在自旋+阻塞模式中,首先取得结束时间和自旋次数。如果match(做releasing操作的线程传递的项)为null,其首先尝试spins+随机次自旋(改自旋使用当前节点中的hash,并改变之)和退让。当自旋数为0后,假如slot发生了改变(slot
!=
p)则重置自旋数并重试。否则假如:当前未中断&arena为null&(当前不是限时版本或者限时版本+当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本+时间已经结束:不限时版本:置v为null;限时版本:如果时间结束以及未中断则TIMED_OUT;否则给出null(原因是探测到arena非空或者当前线程中断)。

  • 执行流程。
    1. 从场地中选出偏移地址为(i << ASHIFT)+ ABASE
      的内存值,也即第 i 个真正可用的
      Node,判断其槽位是否为空,为空,进入步骤 2。

      • 不为空,说明有线程在此等待,尝试抢占该槽位,抢占成功,交换数据,并唤醒等待线程,返回,结束。
      • 没有抢占成功,进入步骤 9。
    2. 检查索引是否越界,越界,进入步骤 9。没有越界,进入步骤 3。
    3. 尝试占有该槽位,抢占失败,进入步骤 1。抢占成功,进入步骤 4。
    4. 检查
      match,是否有线程来交换数据,如果有,交换数据,结束。如果没有,进入步骤
      5。
    5. 检查 spin 是否大于 0,如果不大于 0,进入步骤 6。
      • 如果大于 0,检查 hash 是否小于 0,并且 spin 减半或为
        0,如果不是,进入步骤 4。
      • 如果是,让出 CPU 时间,过一会儿,进入步骤 4。
    6. 检查是否中断,m
      达到最小值,是否超时,如果没有中断,没有超时,并且 m
      达到最小值,阻塞,过一会儿进入步骤 4。否则,进入步骤 7。
    7. 没有线程交换数据,尝试丢弃原有的槽位重新开始,丢弃失败,进入步骤
      4。否则,进入步骤 8。
    8. bound 减 1,索引减半。
      • 检查是否中断或超时,如果没有,进入步骤 1。
      • 否则,返回,结束。
    9. 检查 bound 是否发生变化,如果变化,重置 collides,索引重置为 m
      或左移,转向步骤 1。否则,进入步骤 10。
    10. 检查 collides
      是否达到最大值,如果没有,进入步骤13。否则,进入步骤 11。
    11. m 是否达到 FULL,是,进入步骤13。否则,进入步骤 12。
    12. CAS bound 加 1 是否成功,如果成功,i 置为
      m+1,槽位增长,进入步骤 1。否则,进入步骤 13。
    13. collides 加 1,索引左移,进入步骤 1。

match不为空时跳出循环。

整个slotExchange清晰明了。

static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); }}

arenaExchange(Object item, boolean timed, long ns)

  • 通过 participant 取得当前结点 Node,然后根据当前结点 Node 的 index
    去取 arena 中相对应的结点。
 private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { // access slot at i int b, m, c; long j; // j is raw array offset Node q = U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark; return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if  // initialize hash h = SPINS | t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile != p) spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile == p && U.compareAndSwapObject(a, j, p, null)) { if  // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else p.item = null; // clear offer } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i =  ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } }

伪随机

首先通过participant取得当前节点Node,然后根据当前节点Node的index去取arena中相对应的节点node。前面提到过arena可以确保不同的slot在arena中是不会相冲突的,那么是怎么保证的呢?我们先看arena的创建:

h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
arena = new Node[ << ASHIFT];
  • xorshift 算法。
    • T = 。
      • L 代表左移。
      • R 代表右移。
      • a,b,c 分别为代码中的 1,3,10。
      • I 代表矩阵 {0, 1} 共 32 位,即是 int 类型的二进制。
      • T 代表的是随机算法。
  • 伪随机通过 xorshift
    算法模拟随机,为了达到更好的随机效果,周期自然是越大越好。

    • 周期
      指的是,当给定一个输入,得到的输出再作为下一次的输入,如此反复,直到某次输出恰巧等于最初的输入,这便是随机算法的一个周期。
    • int 类型的最大周期应该是遍历该类型所有的值(0 除外,如果是 0
      ,输出便一直是 0,不能随机),即
      max - min = 2^32 - 1 = 4294967295

这个arena到底有多大呢?我们先看FULL 和ASHIFT的定义:

为什么选用 1,3,10

static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;private static final int ASHIFT = 7;private static final int NCPU = Runtime.getRuntime().availableProcessors();private static final int MMASK = 0xff; // 255
  • 当 a,b,c 分别为 1,3,10 时,周期刚好是 2^32 - 1 = 4294967295
  • 以下几种组合也是可以的。

假如我的机器NCPU = 8
,则得到的是768大小的arena数组。然后通过以下代码取得在arena中的节点:

 Node q = U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
[4294967295]       

他仍然是通过右移ASHIFT位来取得Node的,ABASE定义如下:

为什么要有两次左移和一次右移

Class<?> ak = Node[].class;ABASE = U.arrayBaseOffset + (1 << ASHIFT);
  • 虽然只一次左移+异或就能达到随机的效果。
  • 但是第一次左移可以让高位多 1,右移可以让低位多
    1,高位低位都参与计算,可以增加随机性,第二次左移,再进行真正的随机计算。

U.arrayBaseOffset获取对象头长度,数组元素的大小可以通过unsafe.arrayIndexScale(T[].class)
方法获取到。这也就是说要访问类型为T的第N个元素的话,你的偏移量offset应该是arrayOffset+N*arrayScale。也就是说BASE
= arrayOffset+ 128 。其次我们再看Node节点的定义

自旋等待

 @sun.misc.Contended static final class Node{ .... }
private static final int SPINS = 1 << 10;else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // h < 0,一半的概率 Thread.yield(); // 每一次等待有两次让出CPU的时机

在Java 8 中我们是可以利用sun.misc.Contended来规避伪共享的。所以说通过
<<
ASHIFT方式加上sun.misc.Contended,所以使得任意两个可用Node不会再同一个缓存行中。

  • 等待其它线程交换数据时,会进行自旋等待,自旋的过程中,当前线程会有
    2 次让出 CPU 的时机。

    • SPINS 为 1024, ((1024 >>>1) -1) = 511 = 0111111111,spins
      默认为 1024 循环递减。
    • 当 spins 的最高位为 0 或 1 并且其它位为 0 时进行
      计算的结果为 0。

关于伪共享请参考如下博文:伪共享(False
Sharing)]())
Java8中用sun.misc.Contended避免伪共享(false
sharing)]())

arena 的创建

我们再次回到arenaExchange()。取得arena中的node节点后,如果定位的节点q
不为空,且CAS操作成功,则交换数据,返回交换的数据,唤醒等待的线程。

static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;private static final int ASHIFT = 7;private static final int NCPU = Runtime.getRuntime().availableProcessors();private static final int MMASK = 0xff; // 255......if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[ << ASHIFT];

如果q等于null且下标在bound &
MMASK范围之内,则尝试占领该位置,如果成功,则采用自旋 +
阻塞的方式进行等待交换数据。

  • slotExchange() 方法中存在竞争时,会构建 arena。
    • 初始化 arena 时会设置 bound 为 SEQ(SEQ=MMASK + 1),255 + 1 =
      256。
    • NCPU 为到 Java
      虚拟机可用的处理器数量。Runtime.getRuntime().availableProcessors()
    • 假设 NCPU 为 2,则 arena 数组大小为 384(2 >>> 1 然后
      << 7)。

如果下标不在bound &
MMASK范围之内获取由于q不为null但是竞争失败的时候:消除p。加入bound
不等于当前节点的bond(b != p.bound),则更新p.bound = b,collides = 0
,i = m或者m – 1。如果冲突的次数不到m 获取m
已经为最大值或者修改当前bound的值失败,则通过增加一次collides以及循环递减下标i的值;否则更新当前bound的值成功:我们令i为m+1即为此时最大的下标。最后更新当前index的值。

Exchanger使用、原理都比较好理解,但是这个源码看起来真心有点儿复杂,是真心难看懂,但是这种交换的思路Doug
Lea在后续博文中还会提到,例如SynchronousQueue、LinkedTransferQueue。

private static final sun.misc.Unsafe U;private static final int ABASE;U = sun.misc.Unsafe.getUnsafe();Class<?> ak = Node[].class;s = U.arrayIndexScale;ABASE = U.arrayBaseOffset + (1 << ASHIFT);

最后用一个在网上看到的段子结束此篇博客(

FULL 和 ASHIFT 的定义

其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

  • arena
    数组很大,但里面并不是每个位置都被使用了,还有一些是没有使用的。

    • 通过 Unsafe 的 arrayBaseOffset 方法可以返回 arena
      数组中第一个元素的偏移地址。
    • 通过 arrayIndexScale 方法可以返回 arena
      数组中每一个元素占用的大小,也就是元素与元素之间的间隔,即
      1 << ASHIFT 为 128。

      • ABASE = arrayBaseOffset + (1 << ASHIFT) 是 arena
        的起始位置加上 128 位这个偏移量。
      • arena 实际使用了 ABASE 做为起始位置,那么其前 128
        位的位置都是没有使用的。
      • 那么要访问 arena 的第 N 个元素,偏移量 offset 为
        arrayBaseOffset + N * arrayIndexScale
    • @sun.misc.Contended 注解 和 1 << ASHIFT 主要是用于避免
      伪共享。1 << ASHIFT 可以避免两个 Node 在同一个共享区。

      • 主流缓存行大小一般为 32 字节到 256 字节,128
        个地址位基本覆盖到了常见的处理器平台。
      • arena 数组中元素的分布间隔为 128
        个整数倍地址位,也就是说最小相差 128 个地址位。
  1. 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
  2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢或者接个电话,TM的不卖了,走了,那我只能再找别人买货了。
  3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间,那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
  4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间,太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货;或者我老大给我打了个电话,不让我买货了。
  5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。6.如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续;如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间,加一个凳子,这么多人就这么几个破凳子够谁用!

图片 3arena
数组结构

Node q = U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // 获取槽位中结点 q 的数据 q.match = item; // 把当前线程的数据交换给它 Thread w = q.parked; // 获得槽位中结点 q 对应的线程对象 if (w != null) U.unpark; //唤醒该线程 return v;}

bound 和 collides

  • bound 是上一次记录的 Exchanger.bound。
    • bound 会记录 最大有效 的 arena
      索引,是动态变化的,竞争激烈时增加, 槽位空旷时减小。
    • bound + SEQ 确立其唯一性,低 8 位记录 有效索引
  • collides 是在当前 bound 下 CAS 失败的次数。
    • 最大为 m,m(bound & MMASK)为当前 bound 下最大有效索引。
    • 槽位最大值为 MMASK,bound 最大值也就是 255,m 和 i 的范围为
      [0,255]。
    • 从右往左遍历,等到 collides == m
      时,有效索引的槽位已经遍历完,这时需要增长槽位。
    • 增长的方式是重置 bound(依赖 SEQ 更新其版本,低位 + 1),同时
      collides 重置。

private static final int MMASK = 0xff;private static final int SEQ = MMASK + 1;......// MASK: 00000000000000000000000011111111// SEQ: 00000000000000000000000100000000// 1: 00000000000000000000000000000001if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))// 当 bound 为 0 时,bound 被更新为 SEQ//第一次更新//b0: 00000000000000000000000100000000U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)//SEQ+1: 00000000000000000000000100000001//b0+SEQ+1=b1: 00000000000000000000000200000001//第二次更新//b1+SEQ: 00000000000000000000000300000001//第二次是 -1 的情况U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)//b1+SEQ-1=b2: 00000000000000000000000300000000
  • bound + SEQ 是版本递增的过程,b + SEQ + 1 后再
    b + SEQ - 1,实际经历了两个版本,并且会将 collides 重置。
  • 下图中去除了实际存在的未使用位置,只保留了数组中被使用的位置。
    • 其中被使用的位置数量最大值为 MMASK,FULL <= MMASK。
    • 当前线程进入 ” 第一个 ” 槽位,发现有其它线程在交换数据,则增加 1
      个槽位并且 bound 递增,此时最大有效索引为 1。

      • m 等于 1,i 范围为 [0,1],p.index 等于 1。
    • 当前线程进入后续槽位(包含之前增加的槽位),如果发现同样有其它线程在交换数据,则继续增加槽位,bound
      递增。
    • 当前线程进入后续槽位(包含之前增加的槽位),没有元素,则尝试占据该槽位,占据成功则等待其它线程。
      • 当等待超时则删除该槽位,再次从头开始遍历有效索引,寻找其它线程交换数据。

图片 4bound
操作

  • bound
    版本唯一性的作用主要用于更新索引,将有效索引更新到最右侧位置,使得可以再次从右向左遍历。

    • 如果没有 bound
      的版本唯一性,便没有索引更新,就会一直往左遍历竞争激烈的槽位。
    • 如果没有 bound 的版本唯一性,还会使得 bound 只增不减,影响效率。

  • 当前线程 A 和其它线程 B在槽中交换数据。
    1. 单槽方法(slotExchange)执行,A 发现 B
      已经在槽中,则尝试交换数据,如果成功,则进入第 2
      步骤。如果失败则说明有其它线程已经在和 B 进行数据交换,则进入第
      5 步骤。
    2. 交换数据成功,则交换结束。也可能超时或者中断,造成交换失败,只能从头开始。
    3. 到达槽位,未发现其它线程,则尝试占位,抢占成功,则自旋等待其它线程交换数据,进入第
      4 步骤。抢占失败,则说明被其它线程抢占了槽位,则进入第 5 步骤。
    4. 其它线程来交换数据,成功则交换结束。如果等待超时则寻找其它线程进行交换,先删除一个槽位,再从头开始寻找其它线程交换数据。也有可能会被中断。
    5. 转为多槽方法(arenaExchange)执行,挨个寻找槽中是否有可交换数据的对象,如果发现交换对象且尝试交换数据成功,则进入第
      2 步骤。如果为空槽,则占据并等待其它线程来交换数据,进入第 4
      步骤。
    6. 尝试多次交换都未成功,则增加槽位,然后再从头开始。
标签:,

发表评论

电子邮件地址不会被公开。 必填项已用*标注

相关文章

网站地图xml地图