
在多线程并发处理数据的场景中,队列是不可少的,通常使用的都是线程安全的并发队列,例如ConcurrentLinkedQueue,其无锁队列(lock-free)+ CAS 的设计,生产者/消费者全部走 CAS 原子操作,不阻塞队列,写入吞吐可以做到百万级 ops,在高并发的场景简直如鱼得水。
但是 ConcurrentLinkedQueue是链表结构,指针跳来跳去,对CPU缓存极度不友好。同时无界队列的设计,在消费者的处理速度小于生产速度的时候,极容易产生OOM。
这段代码虽然简短,但展示了Disruptor的基本使用方式。你可能会觉得,这跟普通的队列好像也没差多少?别急,真正厉害的地方在于它的内部机制。
Disruptor的核心是Ring Buffer,它是一个固定大小的数组,用来存储事件。生产者和消费者都通过索引访问这个数组,不需要频繁地申请和释放内存。这样就避免了GC的压力。避免了频繁的锁操作。它通过CAS(Compare and Swap)这种无锁算法来实现数据的读写,性能提升明显。而且它还支持多消费者模式,可以并行处理任务,不像传统队列那样只能串行消费。
另外,Disruptor采用了一种叫做“Sequence”的机制来管理事件的生产和消费进度。每个消费者都有自己的Sequence,用来记录当前已经处理到哪个位置。生产者则通过CAS操作更新自己的Sequence,确保不会覆盖未处理的数据。
这听起来有点像数据库里的事务日志,只不过Disruptor是纯内存操作,效率更高。
我之前遇到一个问题,就是订单处理系统在高峰期经常出现延迟,甚至超时。后来换成Disruptor后,不仅延迟降低了,系统的吞吐量也提升了3倍以上。当然,这也取决于你的业务逻辑是否适合用Disruptor。
比如,如果你的事件处理逻辑很复杂,里面有IO操作或者网络请求,那Disruptor可能不会带来太大提升。因为它本身是单线程处理事件的,除非你用了多消费者。
所以,Disruptor更适合那种计算密集型、低延迟、高吞吐量的场景。比如实时交易、消息处理、日志收集等。
其实用起来也不复杂。首先你需要定义一个事件类,然后创建一个EventFactory来生成这些事件对象。接着,初始化一个Disruptor实例,指定事件的大小,然后设置生产者和消费者的逻辑。
下面是Disruptor组件和用法的示例代码:
public class ByteArrayEvent {
private byte[] bytes;
public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
}
2. EventFactory :创建event的工厂类
public class ByteArrayEventFactory implements EventFactory<ByteArrayEvent> {
@Override
public ByteArrayEvent newInstance() {
return new ByteArrayEvent();
}
}
3. EventHandler:消费者的消费逻辑
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(ByteArrayEvent byteArrayEvent, long sequence, boolean endOfBatch) throws Exception {
// 处理事件的逻辑
}
}默认情况下,Disruptor会将生产者指定为多线程模式,ProducerType.SINGLE来设置生产者为单线程模式。
// 必须是2的幂
int bufferSize = 1024;
/**
DaemonThreadFactory:线程池,create threads for processors.
ProducerType#SINGLE:一个ringbuffer支持多个publisher; ProducerType#MULTI:支持多个publisher
BlockingWaitStrategy:消费者等待策略。SleepingWaitStrategy:对生产者影响小。BlockingWaitStrategy使用了lock,效率不太行。
YieldingWaitStrategy性能最好无锁策略,使用了 Thread.yield() 多线程交替执行
**/
disruptor = new Disruptor<>(new ByteArrayEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// handleEventsWith:消费数据,每一次绑定一个消费者,可以使用then进行链式处理
// 一个handler就是一个消费者
disruptor.handleEventsWith(new ByteArrayEventHandler());
// 启动
disruptor.start();其中RingBuffer是存放数据的容器
1. 创建生产者
public class ByteArrayProducer {
private final RingBuffer<ByteArrayEvent> ringBuffer;
public ByteArrayProducer(RingBuffer<ByteArrayEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
// 转换器,将原始数据转换成Event
private static final EventTranslatorOneArg<ByteArrayEvent, byte[]> TRANSLATOR =
new EventTranslatorOneArg<ByteArrayEvent, byte[]>() {
@Override
public void translateTo(ByteArrayEvent event, long sequence, byte[] data) {
event.setBytes(data);
}
};
// publishEvent生产event
public void onData(byte[] data) {
ringBuffer.publishEvent(TRANSLATOR, data);
}
}RingBuffer ringBuffer = disruptor.getRingBuffer();
// 可以不用定义生产者类,直接 ringBuffer.publishEvent((event, sequence, buffer) -> event.set(data)), data) 的lambda
ByteArrayProducer producer = new ByteArrayProducer(ringBuffer);
producer.onData(data);ByteArrayEventHandler里的onEvent()就是消费者,实现了处理逻辑。每一个Handler就是一个消费者,在构建Disruptor时,几个消费者就在handleEventsWith绑定几个。
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {
@Override
public void onEvent(ByteArrayEvent event, long sequence, boolean endOfBatch) throws Exception {
byte[] data = event.getBytes();
// 模拟业务处理
// 你随便换成 json 解析、交易校验、写入内存队列都行
if (data != null) {
System.out.println("消费到事件 seq = " + sequence + ", size=" + data.length);
}
}
}Disruptor不是万能的,但它确实是一个非常强大的工具。尤其在高并发、低延迟的场景下,它能带来显著的性能提升。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。