RabbitMQ 深入实战
死信队列、延迟队列、消息可靠、幂等、堆积解决方案
一、死信队列(Dead Letter Queue)
1.1 什么是死信?
死信 = 无法被正常消费的消息
触发条件:
1. 消息被拒绝(reject/nack),且不重新入队
2. 消息过期(TTL 到期)
3. 队列达到最大长度1.2 死信队列配置
java
// 1. 创建死信交换机和死信队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx.key"); // 死信路由key
// 2. 正常队列绑定死信配置
Queue normalQueue = new Queue("normal.queue", true, false, false, args);java
// 3. 声明死信交换机和队列
DirectExchange dlxExchange = new DirectExchange("dlx.exchange");
Queue dlq = new Queue("dlq", true);
// 绑定
BindingBuilder.bind(dlq).to(dlxExchange).with("dlx.key");1.3 死信消息特性
java
// 死信消息会携带额外信息
MessageProperties properties = message.getMessageProperties();
properties.getDeadLetterExchange(); // 死信交换机
properties.getDeadLetterRoutingKey(); // 死信路由key
properties.getExpiration(); // 消息TTL(如果是过期导致)
properties.getRedelivered(); // 是否被重试过1.4 死信队列应用场景
| 场景 | 说明 |
|---|---|
| 订单超时未支付 | 消息过期后进入死信队列,关闭订单 |
| 消息消费失败 | 多次重试失败后进入死信队列,人工处理 |
| 削峰填谷 | 超过队列长度,进入死信队列 |
二、延迟队列(Delay Queue)
2.1 延迟队列原理
延迟队列 = 消息不立即消费,延迟一段时间后再处理
实现方式:
1. 消息 TTL + 死信队列
2. 插件 rabbitmq_delayed_message_exchange2.2 方式一:TTL + 死信队列
java
// 1. 创建延迟队列(设置 TTL)
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 5秒延迟
args.put("x-dead-letter-exchange", "order.exchange");
args.put("x-dead-letter-routing-key", "order.created");
Queue delayQueue = new Queue("delay.queue", false, false, false, args);
// 2. 绑定交换机
channel.exchangeDeclare("delay.exchange", "direct", true);
channel.queueDeclare("delay.queue", false, false, false, args);
channel.queueBind("delay.queue", "delay.exchange", "delay.key");
// 3. 死信交换机和队列(真正处理)
channel.exchangeDeclare("order.exchange", "direct", true);
channel.queueDeclare("order.queue", true, false, false, null);
channel.queueBind("order.queue", "order.exchange", "order.created");2.3 方式二:延迟插件(推荐)
bash
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchangejava
// 声明延迟交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare(
"delay.exchange",
"x-delayed-message", // 交换机类型
true,
false,
args
);
// 发送延迟消息(添加 header)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.headers(Map.of("x-delay", 5000)) // 延迟5秒
.build();
channel.basicPublish("delay.exchange", "delay.key", properties, body);2.4 延迟队列应用场景
| 场景 | 延迟时间 |
|---|---|
| 订单超时取消 | 15/30分钟 |
| 短信验证码 | 60秒 |
| 订单发货提醒 | 24小时 |
| 批量处理 | 10分钟 |
三、消息可靠性保证
3.1 消息丢失场景
3.2 生产端可靠性
java
// 1. 开启生产者确认
channel.confirmSelect();
channel.addConfirmListener((ack, deliveryTag) -> {
// 消息成功到达 Broker
}, (ack, deliveryTag) -> {
// 消息发送失败,重试
});
// 2. 发送消息时指定 correlationId
String correlationId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.deliveryMode(2) // 持久化
.build();
channel.basicPublish(exchange, routingKey, properties, message);3.3 Broker 可靠性
java
// 1. 交换机持久化
channel.exchangeDeclare(exchange, "direct", true); // durable = true
// 2. 队列持久化
channel.queueDeclare(queue, true, false, false, null); // durable = true
// 3. 消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 = 持久化
.build();3.4 消费端可靠性
java
// 1. 手动确认
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
// 处理业务
process(body);
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息(可选择是否重新入队)
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});3.5 可靠消息方案总结
| 环节 | 方案 |
|---|---|
| 生产端 | 生产者确认 + 消息持久化 |
| Broker | 交换机/队列/消息 3重持久化 |
| 消费端 | 手动 ACK + 幂等处理 |
四、消息幂等性
4.1 重复消息原因
4.2 幂等方案
方案1:消息ID去重
java
// 生产者:生成唯一消息ID
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.build();
// 消费者:去重表
private final Set<String> processedIds = new HashSet<>();
public void handleDelivery(Message message) {
String messageId = message.getMessageId();
if (processedIds.contains(messageId)) {
// 重复消息,跳过
return;
}
// 处理业务
process(message);
// 标记已处理
processedIds.add(messageId);
}方案2:Redis 去重
java
public void handleDelivery(Message message) {
String messageId = message.getMessageId();
String key = "mq:dedup:" + messageId;
// SETNX 原子操作
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (!result) {
// 已存在,重复消息
return;
}
process(message);
}方案3:数据库唯一索引
sql
-- 消息处理记录表
CREATE TABLE mq_message_log (
message_id VARCHAR(64) PRIMARY KEY,
status VARCHAR(20), -- 处理状态
create_time DATETIME
);
-- 消费时插入
INSERT INTO mq_message_log (message_id, status) VALUES ('msg_123', 'PROCESSED');
-- 成功 → 处理消息
-- 失败(主键冲突)→ 重复,跳过4.3 幂等方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 内存 Set | 性能高 | 重启丢失,内存受限 | 单机低并发 |
| Redis | 性能高,持久化 | Redis 挂了? | 高并发 |
| 数据库 | 可靠 | 性能一般 | 高可靠场景 |
五、消息堆积处理
5.1 堆积原因
消息堆积 = 生产速度 > 消费速度
常见原因:
1. 消费者故障/重启
2. 消费者性能不足
3. 消费逻辑复杂耗时
4. 突发流量5.2 解决方案
5.3 具体方案
方案1:增加消费者
java
// 启动多个消费者实例
for (int i = 0; i < 10; i++) {
new Thread(() -> {
channel.basicConsume(queue, false, consumer);
}).start();
}方案2:临时扩容消费能力
java
// 提高 prefetch 数量(一次获取更多消息)
channel.basicQos(100); // 一次最多获取100条方案3:消息过期处理
java
// 设置队列消息过期时间(ms)
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 3600000); // 1小时
args.put("x-dead-letter-exchange", "dlx.exchange");
// 或设置单条消息过期
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("3600000")
.build();方案4:消息分流
java
// 重要消息:正常队列
// 非重要消息:死信队列或直接丢弃
if (isImportant) {
channel.basicPublish("important.exchange", "key", properties, body);
} else {
// 直接丢弃或记录日志
log.warn("消息丢弃: {}", messageId);
}5.4 监控告警
java
// 监控队列消息数量
Map<String, Object> queueInfo = channel.queueDeclarePassive(queue).getQueueArgs();
Long messageCount = (Long) queueInfo.get("messageCount");
if (messageCount > 10000) {
// 发送告警
alert("消息堆积告警: " + queue + " 有 " + messageCount + " 条消息");
}5.5 预防措施
| 措施 | 说明 |
|---|---|
| 限流 | 避免突发流量压垮消费者 |
| 监控 | 队列大小、消息数量告警 |
| 隔离 | 重要/非重要消息分开处理 |
| 死信 | 设置死信队列,处理失败消息 |
六、完整实战示例
6.1 订单超时取消场景
java
// 1. 配置类
@Configuration
public class OrderDelayConfig {
// 订单超时交换机
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange("order.delay.exchange", true, false);
}
// 延迟队列(设置 TTL,过期后进死信)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 1800000); // 30分钟
args.put("x-dead-letter-exchange", "order.dlx.exchange");
args.put("x-dead-letter-routing-key", "order.cancel");
return new Queue("order.delay.queue", false, false, false, args);
}
// 死信交换机(真正处理)
@Bean
public DirectExchange orderDlxExchange() {
return new DirectExchange("order.dlx.exchange", true, false);
}
// 死信队列(取消订单)
@Bean
public Queue orderCancelQueue() {
return new Queue("order.cancel.queue", true);
}
@Bean
public Binding orderDelayBinding() {
return BindingBuilder.bind(orderDelayQueue())
.to(orderDelayExchange())
.with("order.delay");
}
@Bean
public Binding orderCancelBinding() {
return BindingBuilder.bind(orderCancelQueue())
.to(orderDlxExchange())
.with("order.cancel");
}
}java
// 2. 生产者:下单时发送延迟消息
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 创建订单...
// 发送延迟消息(30分钟后取消)
rabbitTemplate.convertAndSend(
"order.delay.exchange",
"order.delay",
order.getOrderId(),
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
);
}
}java
// 3. 消费者:处理超时订单
@Service
public class OrderCancelConsumer {
@RabbitListener(queues = "order.cancel.queue")
public void handleCancelOrder(String orderId) {
// 查询订单状态
Order order = orderMapper.selectById(orderId);
if (order != null && "待支付".equals(order.getStatus())) {
// 取消订单
order.setStatus("已取消");
orderMapper.updateById(order);
// 恢复库存
stockService.releaseStock(order.getProductId(), order.getQuantity());
log.info("订单超时取消: {}", orderId);
}
}
}七、总结
| 特性 | 核心方案 |
|---|---|
| 死信队列 | 拒绝/过期/超长 → 死信交换机 → 死信队列 |
| 延迟队列 | TTL + 死信 或 延迟插件 |
| 消息可靠 | 生产确认 + 3重持久化 + 手动ACK |
| 消息幂等 | 消息ID + Redis/数据库去重 |
| 消息堆积 | 增加消费者 + 临时扩容 + 过期清理 |