Disruptor:高性能队列的秘密
如果我告诉你,有一个队列的性能比 ArrayBlockingQueue 快 10 倍以上,你信吗?
这就是 Disruptor,由 LMAX 交易所开发的一个高性能消息队列框架。LMAX 是一家英国的金融科技公司,他们用 Java 做到了每秒钟处理 600 万订单,靠的就是 Disruptor。
先看个对比
java
// ArrayBlockingQueue:传统阻塞队列
ArrayBlockingQueue<Order> queue = new ArrayBlockingQueue<>(1024);
// Disruptor:高并发无锁队列
Disruptor<Order> disruptor = new Disruptor<>(Order::new, 1024, DaemonThreadFactory.INSTANCE);差距有多大?
ArrayBlockingQueue 吞吐量: ~300,000 ops/s
Disruptor 吞吐量: ~6,000,000 ops/s
性能提升: 20 倍!为什么这么快?让我慢慢道来。
核心原理
1. Ring Buffer - 预分配的循环数组
Disruptor 底层用的是 Ring Buffer(环形缓冲区):
Ring Buffer 结构:
┌─────────────────────────────────────────────────────┐
│ Ring Buffer │
│ index: 0 1 2 3 4 5 6 │
│ value: [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] │
│ │
│ producer 写入 ──────────────────────────▶ │
│ ◀────────────────── consumer │
└─────────────────────────────────────────────────────┘
特点:
- 预分配内存,运行时不需要 GC(对象复用)
- 数组在内存上是连续的,CPU 缓存命中率高
- 读写不需要锁,利用 CAS 保证线程安全
- 长度固定,head/tail 指针循环,不会有内存分配为什么用 Ring Buffer?
传统队列(如 LinkedBlockingQueue)的问题:
- 每次入队都 new 对象 → 频繁 GC
- 每次出队对象可能被回收 → 内存碎片
- 链表节点不连续 → CPU 缓存不友好
Ring Buffer 的优势:
- 预分配固定数量的槽位
- 生产者和消费者各自维护 Sequence(序号)
- 通过 CAS 操作序号,无锁并发
2. Sequence - 序号追踪
每个消费者和生产者都有一个 Sequence,记录自己处理到哪个位置:
生产者 Sequence = 5 (已写入到位置 5)
消费者 Sequence = 3 (已消费到位置 3)
可用消息 = 生产者 Sequence - 消费者 Sequence = 2 条Sequence 本质上是一个 Long 类型的原子变量,通过 CAS 更新:
java
// 伪代码
public long incrementAndGet() {
long current;
long next;
do {
current = sequence.get();
next = current + 1;
} while (!sequence.compareAndSet(current, next));
return next;
}3. 无锁设计
Disruptor 只需要 一把锁(生产者的 claim),而 ArrayBlockingQueue 需要 两把锁(入队 + 出队)。
ArrayBlockingQueue 的问题:
- 入队需要获取 putLock
- 出队需要获取 takeLock
- 两个操作不能同时进行
Disruptor 的设计:
- 生产者通过 CAS claim 下一个可用槽位
- 消费者各自读取,互相不阻塞
- 只有在队列满/空时才会阻塞核心概念
消费者模式
Disruptor 支持多种消费者模式:
1. 单生产者单消费者(最简单的场景)
Producer ──▶ [Ring Buffer] ──▶ Consumer
2. 多消费者(每个消息被所有消费者处理)
Producer ──▶ [Ring Buffer] ──▶ Consumer1
──▶ Consumer2
──▶ Consumer3
3. 消费者组(每个消息只被一个消费者处理)
Producer ──▶ [Ring Buffer] ──▶ ConsumerGroup
├── Consumer A
└── Consumer B
4. 菱形消费(先并行再串行)
Producer ──▶ [Ring Buffer] ──▶ Handler1 ──▶ Handler3
└─▶ Handler2 ──┘
5. 链式依赖(A → B → C)
Producer ──▶ [Ring Buffer] ──▶ A ──▶ B ──▶ CWaitStrategy - 等待策略
消费者需要等待可用消息,Disruptor 提供了多种等待策略:
1. BlockingWaitStrategy(默认)
- 使用 ReentrantLock + Condition
- 吞吐量低但 CPU 占用最低
- 适合对延迟不敏感的场景
2. SleepingWaitStrategy
- 先自旋,失败后 sleep
- 延迟不稳定但 CPU 消耗低
- 适合异步日志等场景
3. YieldingWaitStrategy
- 自旋 + Thread.yield()
- 延迟低,吞吐量高
- 适合高并发低延迟场景
4. BusySpinWaitStrategy
- 纯自旋,延迟最低
- CPU 占用最高
- 适合超低延迟场景(但要求核心数够多)Event 与 EventFactory
java
// 1. 定义 Event(要传递的消息)
@Data
public class OrderEvent {
private Long orderId;
private BigDecimal amount;
private String status;
}
// 2. 定义 EventFactory(用于预分配)
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
// 3. 定义 Handler(处理逻辑)
public class OrderHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.info("处理订单: orderId={}, amount={}",
event.getOrderId(), event.getAmount());
}
}基础用法
引入依赖
xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>最简单的例子
java
public class SimpleDisruptorDemo {
public static void main(String[] args) {
// 1. 创建 Disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new, // Event 工厂
1024, // Ring Buffer 大小(必须是 2 的幂)
DaemonThreadFactory.INSTANCE // 线程工厂
);
// 2. 注册消费者
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
log.info("收到订单: orderId={}, amount={}",
event.getOrderId(), event.getAmount());
});
// 3. 启动
disruptor.start();
// 4. 获取 Ring Buffer 发消息
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i < 10; i++) {
long seq = ringBuffer.next();
try {
OrderEvent event = ringBuffer.get(seq);
event.setOrderId((long) i);
event.setAmount(BigDecimal.valueOf(100 * i));
} finally {
ringBuffer.publish(seq); // 必须 publish,否则消息不可见
}
}
// 5. 关闭
disruptor.shutdown();
}
}使用 Lambda 简化
java
// 不用单独定义 Handler,直接用 Lambda
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new, 1024, DaemonThreadFactory.INSTANCE
);
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
log.info("处理订单: {}", event.getOrderId());
});
// Lambda 方式二
disruptor.handleEventsWith(event -> {
log.info("处理订单: {}", event.getOrderId());
});实战:订单处理系统
1. 定义订单 Event
java
@Data
public class OrderEvent {
private Long orderId;
private Long userId;
private BigDecimal amount;
private LocalDateTime createTime;
public static OrderEvent of(Long orderId, Long userId, BigDecimal amount) {
OrderEvent event = new OrderEvent();
event.setOrderId(orderId);
event.setUserId(userId);
event.setAmount(amount);
event.setCreateTime(LocalDateTime.now());
return event;
}
}2. 定义多个 Handler(链式处理)
java
// 订单验证
public class OrderValidator implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
if (event.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("订单金额必须大于0");
}
log.info("[验证] 订单校验通过: orderId={}", event.getOrderId());
}
}
// 扣减库存
public class StockDeductHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.info("[库存] 扣减库存: orderId={}", event.getOrderId());
// stockService.deduct(event.getOrderId());
}
}
// 发送通知
public class NotificationHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.info("[通知] 发送订单通知: orderId={}", event.getOrderId());
// notificationService.send(event.getUserId(), "订单已创建");
}
}
// 记录日志
public class LoggingHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
log.debug("[日志] 订单处理完成: orderId={}, sequence={}",
event.getOrderId(), sequence);
}
}3. 配置 Disruptor
java
@Configuration
public class OrderDisruptorConfig {
@Bean
public Disruptor<OrderEvent> orderDisruptor() {
// Ring Buffer 大小 4096(2的12次幂)
int bufferSize = 4096;
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
bufferSize,
DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE, // 单生产者
new BlockingWaitStrategy() // 默认等待策略
);
// 链式处理:验证 → 库存 → 通知 → 日志
disruptor.handleEventsWith(new OrderValidator())
.then(new StockDeductHandler())
.then(new NotificationHandler())
.then(new LoggingHandler());
// 设置异常处理
disruptor.setDefaultExceptionHandler(new LoggingExceptionHandler());
disruptor.start();
return disruptor;
}
}4. 生产者服务
java
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final Disruptor<OrderEvent> orderDisruptor;
public void sendOrder(Long orderId, Long userId, BigDecimal amount) {
RingBuffer<OrderEvent> ringBuffer = orderDisruptor.getRingBuffer();
// 两种发布方式
// 方式一:手动发布
long sequence = ringBuffer.next();
try {
OrderEvent event = ringBuffer.get(sequence);
event.setOrderId(orderId);
event.setUserId(userId);
event.setAmount(amount);
event.setCreateTime(LocalDateTime.now());
} finally {
ringBuffer.publish(sequence);
}
// 方式二:使用 Lambda(更简洁,推荐)
// ringBuffer.publishEvent((event, seq) -> {
// event.setOrderId(orderId);
// event.setUserId(userId);
// event.setAmount(amount);
// }, orderId, userId, amount);
}
}5. 订单服务整合
java
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderProducer orderProducer;
private final OrderMapper orderMapper;
@Transactional
public void createOrder(Long userId, BigDecimal amount) {
// 1. 创建订单
Order order = new Order();
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("pending");
orderMapper.insert(order);
// 2. 发送消息到 Disruptor(异步处理后续流程)
orderProducer.sendOrder(order.getId(), userId, amount);
}
}高级用法
多生产者模式
java
// 多生产者需要用 ProducerType.MULTI
Disruptor<OrderEvent> disruptor = new Disruptor<>(
OrderEvent::new,
1024,
DaemonThreadFactory.INSTANCE,
ProducerType.MULTI, // 多生产者
new BusySpinWaitStrategy() // 多生产者需要更激进的等待策略
);
// 多生产者的消费者需要处理消息的顺序问题
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
// 消息可能来自不同生产者,顺序不保证
// 需要业务方自己保证幂等
});消费者组(多消费者分工)
java
// 同一个消息只会被一个消费者处理
disruptor.handleEventsWithWorkerPool(
new OrderHandler("consumer-1"),
new OrderHandler("consumer-2"),
new OrderHandler("consumer-3")
);
// 每个消费者处理不同类型的消息(基于 Event 的某个字段分发)
disruptor.handleEventsWith(new OrderValidator())
.thenHandleEventsWithWorkerPool(
handlerForTypeA,
handlerForTypeB
);依赖处理(先并行再串行)
java
// 场景:订单验证和库存扣减可以并行,但都要完成后才能发送通知
// 验证 ──┐
// ├──▶ 通知 ──▶ 记录
// 库存 ──┘
disruptor.handleEventsWith(
new OrderValidator(),
new StockDeductHandler()
).then(new NotificationHandler());
// 或者更复杂的 Diamond 依赖
EventHandler<OrderEvent> handler1 = ...;
EventHandler<OrderEvent> handler2 = ...;
EventHandler<OrderEvent> handler3 = ...;
disruptor.handleEventsWith(handler1, handler2)
.then(handler3);动态添加消费者
java
// 获取当前消费位置
long consumedSequence = ringBuffer.getCursor();
// 添加新的消费者,从某个位置开始消费
disruptor.handleEventsWith(
new EventHandler<OrderEvent>() {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
// 处理逻辑
}
}
);延迟处理
java
// 使用 withDelay() 让 Handler 延迟执行
disruptor.handleEventsWith(new OrderValidator())
.then(new StockDeductHandler().withDelay(100, TimeUnit.MILLISECONDS))
.then(new NotificationHandler());性能调优
1. Ring Buffer 大小
java
// 必须是 2 的幂,否则报错
int bufferSize = 4096; // 2^12 = 4096
// 太大浪费内存,太小容易阻塞
// 建议:预估最大吞吐量 * 期望最大延迟
// 例如:每秒 10000 消息,期望延迟 1 秒,则 bufferSize >= 100002. 等待策略选择
java
// 低延迟场景(推荐 YieldingWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
...,
new YieldingWaitStrategy()
);
// 高吞吐场景(BlockingWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
...,
new BlockingWaitStrategy()
);
// 超低延迟场景(BusySpinWaitStrategy)
Disruptor<OrderEvent> disruptor = new Disruptor<>(
...,
new BusySpinWaitStrategy()
);3. 生产者数量
java
// 单生产者性能最好
// 如果确实需要多生产者,减少锁竞争
ProducerType.SINGLE
// 多生产者场景,使用 MPSC 队列优化4. 批处理优化
java
// Handler 中的 endOfBatch 参数可以帮助做批处理
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) {
batch.add(event);
if (endOfBatch) {
// 批量处理
processBatch(batch);
batch.clear();
}
}5. CPU 缓存优化
java
// Event 字段顺序会影响 CPU 缓存命中
// 热字段(频繁访问)放前面
@Data
public class OrderEvent {
private long orderId; // 最热,放第一位
private int status; // 较热
private long userId;
private BigDecimal amount; // 冷数据放后面
private LocalDateTime createTime;
}
// 或者使用注解指定字段顺序
@JCStress
public class OrderEvent {
@HotSpots(1) private long orderId;
@HotSpots(2) private int status;
}与 ArrayBlockingQueue 对比
| 特性 | ArrayBlockingQueue | Disruptor |
|---|---|---|
| 底层结构 | 数组 + 两把锁 | Ring Buffer + CAS |
| 吞吐量 | ~300,000 ops/s | ~6,000,000 ops/s |
| 延迟 | 抖动大 | 稳定,低延迟 |
| 内存 | 每次入队 new 对象 | 预分配,对象复用 |
| 消费模式 | 单消费者 | 多消费者、链式处理 |
| 等待策略 | 单一 | 多种可选 |
| API 复杂度 | 简单 | 较复杂 |
常见问题
1. Ring Buffer 大小必须是 2 的幂
java
// 错误:会报错
new Disruptor<>(..., 1000, ...);
// 正确:自动找最近的 2 的幂
// 1000 -> 1024
// 2048 -> 2048
// 3000 -> 40962. publish 必须放在 finally 中
java
long sequence = ringBuffer.next();
try {
// 处理逻辑
} finally {
ringBuffer.publish(sequence); // 必须调用,否则消息不可见
}3. 消费者要处理异常
java
// 设置全局异常处理
disruptor.setDefaultExceptionHandler(new ExceptionHandler() {
@Override
public void handleEventException(Throwable ex, long sequence, Event event) {
log.error("处理消息异常: sequence={}", sequence, ex);
}
@Override
public void handleOnStartException(Throwable ex) {
log.error("Disruptor 启动异常", ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
log.error("Disruptor 关闭异常", ex);
}
});
// 或者针对单个 Handler
disruptor.handleEventsWith(new OrderHandler())
.handleExceptionsWith(new CustomExceptionHandler());4. 优雅关闭
java
// 方式一:带超时的关闭
disruptor.shutdown(10, TimeUnit.SECONDS);
// 方式二:异步关闭
CompletableFuture<Void> future = disruptor.shutdown();
// 做其他事情
future.get(10, TimeUnit.SECONDS);
// 方式三:注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(disruptor::shutdown));总结
Disruptor 快的秘密:
- Ring Buffer - 预分配、无 GC、缓存友好
- 无锁设计 - 只需要 CAS,不需要锁
- CPU 缓存优化 - 连续内存访问
- 多种等待策略 - 根据场景选择最适合的
适合场景:
- 低延迟交易系统
- 日志收集
- 消息推送
- 高并发订单处理
- 任何对性能有极端要求的并发场景
不适合场景:
- 消息需要持久化(Disruptor 是纯内存的)
- 消息量巨大需要堆积(内存受限)
- API 简单就好(ArrayBlockingQueue 更易用)
如果你在做一个高频交易系统、实时日志分析,或者只是对 Java 性能优化感兴趣,Disruptor 绝对值得学习!