Skip to content

Disruptor:高性能队列的秘密

如果我告诉你,有一个队列的性能比 ArrayBlockingQueue 快 10 倍以上,你信吗?

这就是 Disruptor,由 LMAX 交易所开发的一个高性能消息队列框架。LMAX 是一家英国的金融科技公司,他们用 Java 做到了每秒钟处理 600 万订单,靠的就是 Disruptor。

先看个对比

java
// ArrayBlockingQueue:传统阻塞队列
ArrayBlockingQueue<Order> queue = new ArrayBlockingQueue<>(1024);

// Disruptor:高并发无锁队列
Disruptor<Order> disruptor = new Disruptor<>(Order::new, 1024, DaemonThreadFactory.INSTANCE);

差距有多大?

ArrayBlockingQueue 吞吐量: ~300,000 ops/s
Disruptor 吞吐量: ~6,000,000 ops/s

性能提升: 20 倍!

为什么这么快?让我慢慢道来。

核心原理

1. Ring Buffer - 预分配的循环数组

Disruptor 底层用的是 Ring Buffer(环形缓冲区):

Ring Buffer 结构:

┌─────────────────────────────────────────────────────┐
│                    Ring Buffer                      │
│  index:  0     1     2     3     4     5     6    │
│  value: [msg1] [msg2] [msg3] [msg4] [msg5] [msg6]   │
│                                                     │
│  producer 写入 ──────────────────────────▶         │
│                      ◀────────────────── consumer  │
└─────────────────────────────────────────────────────┘

特点:
- 预分配内存,运行时不需要 GC(对象复用)
- 数组在内存上是连续的,CPU 缓存命中率高
- 读写不需要锁,利用 CAS 保证线程安全
- 长度固定,head/tail 指针循环,不会有内存分配

为什么用 Ring Buffer?

传统队列(如 LinkedBlockingQueue)的问题:

  • 每次入队都 new 对象 → 频繁 GC
  • 每次出队对象可能被回收 → 内存碎片
  • 链表节点不连续 → CPU 缓存不友好

Ring Buffer 的优势:

  • 预分配固定数量的槽位
  • 生产者和消费者各自维护 Sequence(序号)
  • 通过 CAS 操作序号,无锁并发

2. Sequence - 序号追踪

每个消费者和生产者都有一个 Sequence,记录自己处理到哪个位置:

生产者 Sequence = 5  (已写入到位置 5)
消费者 Sequence = 3  (已消费到位置 3)

可用消息 = 生产者 Sequence - 消费者 Sequence = 2 条

Sequence 本质上是一个 Long 类型的原子变量,通过 CAS 更新:

java
// 伪代码
public long incrementAndGet() {
    long current;
    long next;
    do {
        current = sequence.get();
        next = current + 1;
    } while (!sequence.compareAndSet(current, next));
    return next;
}

3. 无锁设计

Disruptor 只需要 一把锁(生产者的 claim),而 ArrayBlockingQueue 需要 两把锁(入队 + 出队)。

ArrayBlockingQueue 的问题:
- 入队需要获取 putLock
- 出队需要获取 takeLock
- 两个操作不能同时进行

Disruptor 的设计:
- 生产者通过 CAS claim 下一个可用槽位
- 消费者各自读取,互相不阻塞
- 只有在队列满/空时才会阻塞

核心概念

消费者模式

Disruptor 支持多种消费者模式:

1. 单生产者单消费者(最简单的场景)
   Producer ──▶ [Ring Buffer] ──▶ Consumer

2. 多消费者(每个消息被所有消费者处理)
   Producer ──▶ [Ring Buffer] ──▶ Consumer1
                                 ──▶ Consumer2
                                 ──▶ Consumer3

3. 消费者组(每个消息只被一个消费者处理)
   Producer ──▶ [Ring Buffer] ──▶ ConsumerGroup
                                     ├── Consumer A
                                     └── Consumer B

4. 菱形消费(先并行再串行)
   Producer ──▶ [Ring Buffer] ──▶ Handler1 ──▶ Handler3
                                     └─▶ Handler2 ──┘

5. 链式依赖(A → B → C)
   Producer ──▶ [Ring Buffer] ──▶ A ──▶ B ──▶ C

WaitStrategy - 等待策略

消费者需要等待可用消息,Disruptor 提供了多种等待策略:

1. BlockingWaitStrategy(默认)
   - 使用 ReentrantLock + Condition
   - 吞吐量低但 CPU 占用最低
   - 适合对延迟不敏感的场景

2. SleepingWaitStrategy
   - 先自旋,失败后 sleep
   - 延迟不稳定但 CPU 消耗低
   - 适合异步日志等场景

3. YieldingWaitStrategy
   - 自旋 + Thread.yield()
   - 延迟低,吞吐量高
   - 适合高并发低延迟场景

4. BusySpinWaitStrategy
   - 纯自旋,延迟最低
   - CPU 占用最高
   - 适合超低延迟场景(但要求核心数够多)

Event 与 EventFactory

java
// 1. 定义 Event(要传递的消息)
@Data
public class OrderEvent {
    private Long orderId;
    private BigDecimal amount;
    private String status;
}

// 2. 定义 EventFactory(用于预分配)
public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

// 3. 定义 Handler(处理逻辑)
public class OrderHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("处理订单: orderId={}, amount={}",
                 event.getOrderId(), event.getAmount());
    }
}

基础用法

引入依赖

xml
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

最简单的例子

java
public class SimpleDisruptorDemo {

    public static void main(String[] args) {
        // 1. 创建 Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(
            OrderEvent::new,    // Event 工厂
            1024,               // Ring Buffer 大小(必须是 2 的幂)
            DaemonThreadFactory.INSTANCE  // 线程工厂
        );

        // 2. 注册消费者
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
            log.info("收到订单: orderId={}, amount={}",
                     event.getOrderId(), event.getAmount());
        });

        // 3. 启动
        disruptor.start();

        // 4. 获取 Ring Buffer 发消息
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; i < 10; i++) {
            long seq = ringBuffer.next();
            try {
                OrderEvent event = ringBuffer.get(seq);
                event.setOrderId((long) i);
                event.setAmount(BigDecimal.valueOf(100 * i));
            } finally {
                ringBuffer.publish(seq);  // 必须 publish,否则消息不可见
            }
        }

        // 5. 关闭
        disruptor.shutdown();
    }
}

使用 Lambda 简化

java
// 不用单独定义 Handler,直接用 Lambda
Disruptor<OrderEvent> disruptor = new Disruptor<>(
    OrderEvent::new, 1024, DaemonThreadFactory.INSTANCE
);

disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
    log.info("处理订单: {}", event.getOrderId());
});

// Lambda 方式二
disruptor.handleEventsWith(event -> {
    log.info("处理订单: {}", event.getOrderId());
});

实战:订单处理系统

1. 定义订单 Event

java
@Data
public class OrderEvent {
    private Long orderId;
    private Long userId;
    private BigDecimal amount;
    private LocalDateTime createTime;

    public static OrderEvent of(Long orderId, Long userId, BigDecimal amount) {
        OrderEvent event = new OrderEvent();
        event.setOrderId(orderId);
        event.setUserId(userId);
        event.setAmount(amount);
        event.setCreateTime(LocalDateTime.now());
        return event;
    }
}

2. 定义多个 Handler(链式处理)

java
// 订单验证
public class OrderValidator implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        if (event.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("订单金额必须大于0");
        }
        log.info("[验证] 订单校验通过: orderId={}", event.getOrderId());
    }
}

// 扣减库存
public class StockDeductHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("[库存] 扣减库存: orderId={}", event.getOrderId());
        // stockService.deduct(event.getOrderId());
    }
}

// 发送通知
public class NotificationHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.info("[通知] 发送订单通知: orderId={}", event.getOrderId());
        // notificationService.send(event.getUserId(), "订单已创建");
    }
}

// 记录日志
public class LoggingHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
        log.debug("[日志] 订单处理完成: orderId={}, sequence={}",
                  event.getOrderId(), sequence);
    }
}

3. 配置 Disruptor

java
@Configuration
public class OrderDisruptorConfig {

    @Bean
    public Disruptor<OrderEvent> orderDisruptor() {
        // Ring Buffer 大小 4096(2的12次幂)
        int bufferSize = 4096;

        Disruptor<OrderEvent> disruptor = new Disruptor<>(
            OrderEvent::new,
            bufferSize,
            DaemonThreadFactory.INSTANCE,
            ProducerType.SINGLE,    // 单生产者
            new BlockingWaitStrategy()  // 默认等待策略
        );

        // 链式处理:验证 → 库存 → 通知 → 日志
        disruptor.handleEventsWith(new OrderValidator())
                 .then(new StockDeductHandler())
                 .then(new NotificationHandler())
                 .then(new LoggingHandler());

        // 设置异常处理
        disruptor.setDefaultExceptionHandler(new LoggingExceptionHandler());

        disruptor.start();
        return disruptor;
    }
}

4. 生产者服务

java
@Service
@RequiredArgsConstructor
public class OrderProducer {

    private final Disruptor<OrderEvent> orderDisruptor;

    public void sendOrder(Long orderId, Long userId, BigDecimal amount) {
        RingBuffer<OrderEvent> ringBuffer = orderDisruptor.getRingBuffer();

        // 两种发布方式

        // 方式一:手动发布
        long sequence = ringBuffer.next();
        try {
            OrderEvent event = ringBuffer.get(sequence);
            event.setOrderId(orderId);
            event.setUserId(userId);
            event.setAmount(amount);
            event.setCreateTime(LocalDateTime.now());
        } finally {
            ringBuffer.publish(sequence);
        }

        // 方式二:使用 Lambda(更简洁,推荐)
        // ringBuffer.publishEvent((event, seq) -> {
        //     event.setOrderId(orderId);
        //     event.setUserId(userId);
        //     event.setAmount(amount);
        // }, orderId, userId, amount);
    }
}

5. 订单服务整合

java
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderProducer orderProducer;
    private final OrderMapper orderMapper;

    @Transactional
    public void createOrder(Long userId, BigDecimal amount) {
        // 1. 创建订单
        Order order = new Order();
        order.setUserId(userId);
        order.setAmount(amount);
        order.setStatus("pending");
        orderMapper.insert(order);

        // 2. 发送消息到 Disruptor(异步处理后续流程)
        orderProducer.sendOrder(order.getId(), userId, amount);
    }
}

高级用法

多生产者模式

java
// 多生产者需要用 ProducerType.MULTI
Disruptor<OrderEvent> disruptor = new Disruptor<>(
    OrderEvent::new,
    1024,
    DaemonThreadFactory.INSTANCE,
    ProducerType.MULTI,  // 多生产者
    new BusySpinWaitStrategy()  // 多生产者需要更激进的等待策略
);

// 多生产者的消费者需要处理消息的顺序问题
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
    // 消息可能来自不同生产者,顺序不保证
    // 需要业务方自己保证幂等
});

消费者组(多消费者分工)

java
// 同一个消息只会被一个消费者处理
disruptor.handleEventsWithWorkerPool(
    new OrderHandler("consumer-1"),
    new OrderHandler("consumer-2"),
    new OrderHandler("consumer-3")
);

// 每个消费者处理不同类型的消息(基于 Event 的某个字段分发)
disruptor.handleEventsWith(new OrderValidator())
         .thenHandleEventsWithWorkerPool(
             handlerForTypeA,
             handlerForTypeB
         );

依赖处理(先并行再串行)

java
// 场景:订单验证和库存扣减可以并行,但都要完成后才能发送通知
//    验证 ──┐
//          ├──▶ 通知 ──▶ 记录
//    库存 ──┘

disruptor.handleEventsWith(
    new OrderValidator(),
    new StockDeductHandler()
).then(new NotificationHandler());

// 或者更复杂的 Diamond 依赖
EventHandler<OrderEvent> handler1 = ...;
EventHandler<OrderEvent> handler2 = ...;
EventHandler<OrderEvent> handler3 = ...;

disruptor.handleEventsWith(handler1, handler2)
         .then(handler3);

动态添加消费者

java
// 获取当前消费位置
long consumedSequence = ringBuffer.getCursor();

// 添加新的消费者,从某个位置开始消费
disruptor.handleEventsWith(
    new EventHandler<OrderEvent>() {
        @Override
        public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
            // 处理逻辑
        }
    }
);

延迟处理

java
// 使用 withDelay() 让 Handler 延迟执行
disruptor.handleEventsWith(new OrderValidator())
         .then(new StockDeductHandler().withDelay(100, TimeUnit.MILLISECONDS))
         .then(new NotificationHandler());

性能调优

1. Ring Buffer 大小

java
// 必须是 2 的幂,否则报错
int bufferSize = 4096;  // 2^12 = 4096

// 太大浪费内存,太小容易阻塞
// 建议:预估最大吞吐量 * 期望最大延迟
// 例如:每秒 10000 消息,期望延迟 1 秒,则 bufferSize >= 10000

2. 等待策略选择

java
// 低延迟场景(推荐 YieldingWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
    ...,
    new YieldingWaitStrategy()
);

// 高吞吐场景(BlockingWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
    ...,
    new BlockingWaitStrategy()
);

// 超低延迟场景(BusySpinWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
    ...,
    new BusySpinWaitStrategy()
);

3. 生产者数量

java
// 单生产者性能最好
// 如果确实需要多生产者,减少锁竞争
ProducerType.SINGLE

// 多生产者场景,使用 MPSC 队列优化

4. 批处理优化

java
// Handler 中的 endOfBatch 参数可以帮助做批处理
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
    batch.add(event);

    if (endOfBatch) {
        // 批量处理
        processBatch(batch);
        batch.clear();
    }
}

5. CPU 缓存优化

java
// Event 字段顺序会影响 CPU 缓存命中
// 热字段(频繁访问)放前面
@Data
public class OrderEvent {
    private long orderId;        // 最热,放第一位
    private int status;          // 较热
    private long userId;
    private BigDecimal amount;   // 冷数据放后面
    private LocalDateTime createTime;
}

// 或者使用注解指定字段顺序
@JCStress
public class OrderEvent {
    @HotSpots(1)  private long orderId;
    @HotSpots(2)  private int status;
}

与 ArrayBlockingQueue 对比

特性ArrayBlockingQueueDisruptor
底层结构数组 + 两把锁Ring Buffer + CAS
吞吐量~300,000 ops/s~6,000,000 ops/s
延迟抖动大稳定,低延迟
内存每次入队 new 对象预分配,对象复用
消费模式单消费者多消费者、链式处理
等待策略单一多种可选
API 复杂度简单较复杂

常见问题

1. Ring Buffer 大小必须是 2 的幂

java
// 错误:会报错
new Disruptor<>(..., 1000, ...);

// 正确:自动找最近的 2 的幂
// 1000 -> 1024
// 2048 -> 2048
// 3000 -> 4096

2. publish 必须放在 finally 中

java
long sequence = ringBuffer.next();
try {
    // 处理逻辑
} finally {
    ringBuffer.publish(sequence);  // 必须调用,否则消息不可见
}

3. 消费者要处理异常

java
// 设置全局异常处理
disruptor.setDefaultExceptionHandler(new ExceptionHandler() {
    @Override
    public void handleEventException(Throwable ex, long sequence, Event event) {
        log.error("处理消息异常: sequence={}", sequence, ex);
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        log.error("Disruptor 启动异常", ex);
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        log.error("Disruptor 关闭异常", ex);
    }
});

// 或者针对单个 Handler
disruptor.handleEventsWith(new OrderHandler())
         .handleExceptionsWith(new CustomExceptionHandler());

4. 优雅关闭

java
// 方式一:带超时的关闭
disruptor.shutdown(10, TimeUnit.SECONDS);

// 方式二:异步关闭
CompletableFuture<Void> future = disruptor.shutdown();
// 做其他事情
future.get(10, TimeUnit.SECONDS);

// 方式三:注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(disruptor::shutdown));

总结

Disruptor 快的秘密:

  1. Ring Buffer - 预分配、无 GC、缓存友好
  2. 无锁设计 - 只需要 CAS,不需要锁
  3. CPU 缓存优化 - 连续内存访问
  4. 多种等待策略 - 根据场景选择最适合的

适合场景:

  • 低延迟交易系统
  • 日志收集
  • 消息推送
  • 高并发订单处理
  • 任何对性能有极端要求的并发场景

不适合场景:

  • 消息需要持久化(Disruptor 是纯内存的)
  • 消息量巨大需要堆积(内存受限)
  • API 简单就好(ArrayBlockingQueue 更易用)

如果你在做一个高频交易系统、实时日志分析,或者只是对 Java 性能优化感兴趣,Disruptor 绝对值得学习!