大家好,我是程序员牛肉。
对于 Java 初学者而言,JUC是突破高并发编程的关键跳板。其中精妙的设计思想 —— 从锁优化到无锁并发,从线程协同到资源调度 —— 堪称并发编程的 "设计模式宝典"。
今天我们来和大家一起读一下Exchanger类。
Exchanger类被用来实现交换两或多个线程的数据。它的优点在于能够简洁高效地实现两或多个线程间的数据交换。通过Exchanger,开发者可以避免复杂的锁和同步机制,降低并发编程的难度,同时它还提供了线程安全的数据交换保障,使得多线程协作更为可靠、易用性和高效性。
这个类提供的方法很简单,就三个:
// 构造方法
public Exchanger()
// 交换方法(无超时)
public V exchange(V x) throws InterruptedException
// 交换方法(有超时)
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException
其中,exchange方法是Exchanger的核心,它会阻塞当前线程直到以下情况之一发生:
当我们当线程A调用exchange方法,如果此时没有其他线程在等待交换,线程A将进入等待状态。此时当线程B也调用exchange方法,系统检测到有线程A正在等待交换,则立即进行数据交换。交换完成后,两个线程同时被唤醒,各自获得对方的数据对象
了解Exchanger的工作流程之后,我们再来看一看exchange方法是怎么实现的:
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
Object v;
Node[] a;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if (((a = arena) != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
我们会检查当前竞技场(arena)是否存在,如果不存在的话我们就使用slotExchange方法进行交换。
如果竞技场(arena)存在的话,并且线程没有被中断,我们就使用arenaExchange方法进行交换。
这样做的思路其实很好理解。并不一定只有两个线程在调用exchange方法进行数据交换。如果是多个线程呢?
在两个线程的情况下,你基于单个共享变量的方法进行数据交换的性能还可以。但是如果在多个线程的情况下,就会出现大量的CAS和重试。
那为了缓解这种情况,我们就设计了竞技场的模式。其实就是一个共享变量数组。这样的话线程A和B在数组[0]上进行互换数据,线程C和D在数组[1]上进行互换数据。
这样的话我们就在一定程度上减低了CAS和重试的次数。降低了竞争的激烈度。我们从源码的角度来解读一下,先看slotExchange吧。
我们会先获取到当前线程的node节点,其实就是当前线程想要共享的数据。之后判断一下当前线程是否被中断。
之后我们会判断一下当前的共享变量槽位(slot)是否有值,如果有值说明此时已经有线程在等待交换,你只需要和那个等待的线程进行数据互换就好了。
之前我们说过当一个线程先调用exchange方法之后会进入阻塞状态,而此时由于你已经成功的进行了数据交换,因此在最后使用了unpark方法来唤醒对应的线程。
当你CAS失败之后,CAS失败说明当前竞争激烈。竞争激烈的情况下我们就要开启竞技场了。也就是这一部分逻辑:
我们会检查当前的CPU核数是否大于1,因为作者认为如果你CPU核数都不大于1,那你就不会有多少线程来竞争。所以没必要开竞技场。
之后我们会修改标记量bound表示当前的线程已经开启竞技场,创建出一个大小为[(full+2)<<Ashift]的竞技场。这个值可不是乱设计的,从并发安全以及性能的角度考虑,我们的竞技场大小应该满足两个条件:
Node 上,竞争无法有效分散。
当多个线程同时访问位于同一缓存行中但逻辑上独立的变量时,即使这些变量没有真正的依赖关系,缓存系统也会将它们视为一个整体。此时,一个线程对其中一个变量的修改会导致整个缓存行失效,迫使其他线程重新从主内存加载数据,产生不必要的性能开销。
所以我们在这里尽量把一个node填充为一个缓存行,这样的话就可以避免伪共享问题。
当创建好竞技场之后,如果竞技场不为空的话,就调用areanExchange方法进行竞争。如果CPU核数小于1导致创建竞技场失败的话,就更新SLOT的值继续进行单槽更新。
那此时还没有return的就只有两个情况,要么你是第一个进入exchange的线程,需要等待第二个线程进入来交换数据。要么你是CPU小于1且slot为空,没办法进入竞技场,只能重新进行slotExchange。
所以下面的代码就只有一个作用,让当前线程自旋或者阻塞:
这里的设计思路就是先自旋,当达到自旋的上线次数之后,有选择性的调用park或者parkNanos方法来进行阻塞线程,等待下一个线程来和自己交换变量。
读完了slotExchange之后,我们再来看一看areanExchange方法
当单槽模式(slot)竞争激烈时,Exchanger 会初始化一个 Node[] 数组(arena),让线程在不同的 Node 上操作,避免集中竞争。arenaExchange 的核心流程:
Node 或空闲节点。
Node 数组中并进入自旋 / 阻塞等待。
arena,优化性能。
当前线程会被随机路由到一个槽位中,如果当前槽位有值的话,就进行数值交换。
而如果当前槽位中没有可以配对的线程之后,我们就把当前线程的值加入到当前的槽位中。
当前线程的值被加入到node数组中后,我们还是要对当前线程先自旋,再阻塞。之后当一个node节点被多个线程竞争的时候,我们会选择进行是重新路由位置或者将当前线程的值放到目标位置的-1中。
所以其实Exchanger会存在一个问题:如果当前多个线程都被路由到了不同的空槽位中等待线程的配对,那么就会出现阻塞情况。
而且由于阻塞并没有设置默认的超时时间,所以就有可能出现死锁的情况。因此我们在使用Exchanger最好使用带超时时间的Exchange方法。
并且其实我们通过源码也可以发现:Exchanger更加关注于数据之间交换,而不是数据之间谁和谁进行交换。
当多个线程使用一个Exchanger的时候,他们会被随机路由到node槽中来进行配对,而无法指定具体的交换线程。其实你从node的设计中也可以证明:
在node中,其实并没有绑定当前线程要和哪个线程进行交换。因此当多个线程调用一个Exchanger的时候,他们的数据交换其实是随机的。
并且当你设置交换线程数量的时候,也最好是偶数个。因为Exchanger强制要求一对一配对。当你设置为奇数个线程的时候,就会导致剩下一个线程一直被阻塞等待配对线程。
并且比较好玩的是,Exchanger为性能做出了大量的优化,例如对于高频记录的slot,bound和记录绑定结果的match直接使用了Java 的VarHandle以此实现低延迟的原子操作
Exchanger通过精心的设计和多种性能优化技术,在特定场景下展现出优异的性能和易用性。尽管Exchanger有其局限性,如仅支持两个线程、可能的线程饥饿和死锁风险等,但在适合的场景中,它仍然是一个强大而有价值的并发工具。
那今天关于Exchanger的源码讲解就到这里了。相信通过我的介绍,你已经了解了Exchanger的设计思想。当你下次需要进行两个线程的数据交换,可以尝试一下使用Exchanger哦