首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从 CLQ 到 Disruptor:无锁队列的下一阶段进化

从 CLQ 到 Disruptor:无锁队列的下一阶段进化

原创
作者头像
叫我阿柒啊
发布2025-11-28 17:56:53
发布2025-11-28 17:56:53
1730
举报

前言

在多线程并发处理数据的场景中,队列是不可少的,通常使用的都是线程安全的并发队列,例如ConcurrentLinkedQueue,其无锁队列(lock-free)+ CAS 的设计,生产者/消费者全部走 CAS 原子操作,不阻塞队列,写入吞吐可以做到百万级 ops,在高并发的场景简直如鱼得水。

但是 ConcurrentLinkedQueue是链表结构,指针跳来跳去,对CPU缓存极度不友好。同时无界队列的设计,在消费者的处理速度小于生产速度的时候,极容易产生OOM。

这段代码虽然简短,但展示了Disruptor的基本使用方式。你可能会觉得,这跟普通的队列好像也没差多少?别急,真正厉害的地方在于它的内部机制。

核心机制

Disruptor的核心是Ring Buffer,它是一个固定大小的数组,用来存储事件。生产者和消费者都通过索引访问这个数组,不需要频繁地申请和释放内存。这样就避免了GC的压力。避免了频繁的锁操作。它通过CAS(Compare and Swap)这种无锁算法来实现数据的读写,性能提升明显。而且它还支持多消费者模式,可以并行处理任务,不像传统队列那样只能串行消费。

另外,Disruptor采用了一种叫做“Sequence”的机制来管理事件的生产和消费进度。每个消费者都有自己的Sequence,用来记录当前已经处理到哪个位置。生产者则通过CAS操作更新自己的Sequence,确保不会覆盖未处理的数据。

这听起来有点像数据库里的事务日志,只不过Disruptor是纯内存操作,效率更高。

为什么适合高并发

我之前遇到一个问题,就是订单处理系统在高峰期经常出现延迟,甚至超时。后来换成Disruptor后,不仅延迟降低了,系统的吞吐量也提升了3倍以上。当然,这也取决于你的业务逻辑是否适合用Disruptor。

比如,如果你的事件处理逻辑很复杂,里面有IO操作或者网络请求,那Disruptor可能不会带来太大提升。因为它本身是单线程处理事件的,除非你用了多消费者。

所以,Disruptor更适合那种计算密集型低延迟高吞吐量的场景。比如实时交易、消息处理、日志收集等。

用法

其实用起来也不复杂。首先你需要定义一个事件类,然后创建一个EventFactory来生成这些事件对象。接着,初始化一个Disruptor实例,指定事件的大小,然后设置生产者和消费者的逻辑。

下面是Disruptor组件和用法的示例代码:

一. 基础组件

  1. Event:生产者和消费者之间传递的对象
代码语言:java
复制
public class ByteArrayEvent {
    private byte[] bytes;

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }
}
2. EventFactory :创建event的工厂类
public class ByteArrayEventFactory implements EventFactory<ByteArrayEvent> {
    @Override
    public ByteArrayEvent newInstance() {
        return new ByteArrayEvent();
    }
}
3.  EventHandler:消费者的消费逻辑
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {
    @Override
    public void onEvent(ByteArrayEvent byteArrayEvent, long sequence, boolean endOfBatch) throws Exception {
        // 处理事件的逻辑
    }
}

二. 构建Disruptor

默认情况下,Disruptor会将生产者指定为多线程模式,ProducerType.SINGLE来设置生产者为单线程模式。

代码语言:java
复制
// 必须是2的幂
int bufferSize = 1024;
/**
DaemonThreadFactory:线程池,create threads for processors.
ProducerType#SINGLE:一个ringbuffer支持多个publisher; ProducerType#MULTI:支持多个publisher
BlockingWaitStrategy:消费者等待策略。SleepingWaitStrategy:对生产者影响小。BlockingWaitStrategy使用了lock,效率不太行。
YieldingWaitStrategy性能最好无锁策略,使用了 Thread.yield() 多线程交替执行
**/
disruptor = new Disruptor<>(new ByteArrayEventFactory(), bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

// handleEventsWith:消费数据,每一次绑定一个消费者,可以使用then进行链式处理
// 一个handler就是一个消费者
disruptor.handleEventsWith(new ByteArrayEventHandler());
// 启动
disruptor.start();

其中RingBuffer是存放数据的容器

  • EventFactory:创建事件(任务)的工厂类。(这里任务会创建好,保存在内存中,可以看做是一个空任务)。
  • ringBufferSize:容器的长度。( Disruptor 的核心容器是 ringBuffer,环转数组,有限长度)。
  • Executor:消费者线程池,执行任务的线程。(每一个消费者都需要从线程池里获得线程去消费任务)。
  • ProductType:生产者类型:单生产者、多生产者。
  • WaitStrategy:等待策略。(当队列里的数据都被消费完之后,消费者和生产者之间的等待策略)。
  • EventHandler:事件处理器。

三. 生产者

代码语言:java
复制
1. 创建生产者
public class ByteArrayProducer {
    private final RingBuffer<ByteArrayEvent> ringBuffer;

    public ByteArrayProducer(RingBuffer<ByteArrayEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    
    // 转换器,将原始数据转换成Event
    private static final EventTranslatorOneArg<ByteArrayEvent, byte[]> TRANSLATOR =
            new EventTranslatorOneArg<ByteArrayEvent, byte[]>() {
                @Override
                public void translateTo(ByteArrayEvent event, long sequence, byte[] data) {
                    event.setBytes(data);
                }
            };
    // publishEvent生产event
    public void onData(byte[] data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}
  1. 生产数据
代码语言:java
复制
RingBuffer ringBuffer = disruptor.getRingBuffer();
// 可以不用定义生产者类,直接 ringBuffer.publishEvent((event, sequence, buffer) -> event.set(data)), data) 的lambda
ByteArrayProducer producer =  new ByteArrayProducer(ringBuffer);
producer.onData(data);

四. 消费

ByteArrayEventHandler里的onEvent()就是消费者,实现了处理逻辑。每一个Handler就是一个消费者,在构建Disruptor时,几个消费者就在handleEventsWith绑定几个。

代码语言:java
复制
public class ByteArrayEventHandler implements EventHandler<ByteArrayEvent> {

    @Override
    public void onEvent(ByteArrayEvent event, long sequence, boolean endOfBatch) throws Exception {
        byte[] data = event.getBytes();

        // 模拟业务处理
        // 你随便换成 json 解析、交易校验、写入内存队列都行
        if (data != null) {
            System.out.println("消费到事件 seq = " + sequence + ", size=" + data.length);
        }
    }
}

总结

Disruptor不是万能的,但它确实是一个非常强大的工具。尤其在高并发、低延迟的场景下,它能带来显著的性能提升。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 核心机制
  • 为什么适合高并发
  • 用法
    • 一. 基础组件
    • 二. 构建Disruptor
    • 三. 生产者
  • 四. 消费
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档