RabbitMQ 消息队列实战
基于 mju-ai-server 项目,深入理解消息队列的应用场景与实现
一、为什么需要消息队列
在项目开发中,我们经常遇到这样的场景:用户提交一个请求,后需要调用多个外部服务或者执行耗时操作。如果直接同步处理,用户需要等待很长时间,严重影响体验。
同步处理流程:
用户请求 → 服务A → 服务B → 服务C → 返回结果
(3秒) (2秒) (5秒)
总耗时:10秒
异步处理流程:
用户请求 → 存入消息队列 → 立即返回成功
↓
消费者异步处理
(耗时操作)消息队列的核心价值:
1. 异步解耦:发送方和接收方不需要同时在线
2. 削峰填谷:高峰期请求存入队列,平缓处理
3. 可靠传输:消息持久化,确保不丢失
4. 负载均衡:多个消费者分担压力二、项目应用场景
本项目使用 RabbitMQ 处理 AI 任务异步调用,主要涉及以下场景:
| 任务类型 | 业务说明 | 处理逻辑 |
|---|---|---|
| 简历分析 | 分析用户上传的简历 | 调用 AI 分析,生成建议 |
| 性格推荐 | 根据用户信息推荐性格 | 学生服务处理 |
| 出题任务 | 根据技能点生成题目 | 树节点服务生成 |
| 判题任务 | 判断用户答题是否正确 | 用户技能点服务处理 |
| 技能建议 | 生成学习建议 | 资源服务处理 |
三、核心代码实现
3.1 消息实体定义
java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MjuDifyTask {
@TableId(type = IdType.AUTO)
private Long difyTaskId;
private String difyTaskType; // 任务类型
private Long sourceId; // 来源ID
private String inputs; // 输入参数
private Long num; // 执行次数
private Long userId; // 用户ID
private String state; // 任务状态:0-待执行,2-失败,3-成功
}消息对象用于在队列中传输:
java
@Data
@AllArgsConstructor
public class DifyTaskMessage {
private Long difyTaskId; // 数据库任务ID
private Long sourceId; // 来源ID
private String difyTaskType; // 任务类型
private Map<String, Object> inputs; // 输入参数
private Long num; // 执行次数
private Long userId; // 用户ID
}3.2 消息生产者
生产者负责将任务发送到消息队列:
java
@Component
public class DifyTaskProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IMjuDifyTaskService difyTaskService;
@Value("${ai.queue}")
private String QUEUE; // 队列名称
@Value("${ai.exchange}")
public String EXCHANGE; // 交换机名称
@Value("${ai.routing}")
public String ROUTING_KEY; // 路由键
@Bean
public Queue taskQueue() {
// 创建持久化队列,消息不会因服务器重启而丢失
return new Queue(QUEUE, true);
}
@Bean
public DirectExchange taskExchange() {
// 创建持久化直接交换机
return new DirectExchange(EXCHANGE, true, false);
}
@Bean
public Binding binding(Queue taskQueue, DirectExchange taskExchange) {
// 绑定队列到交换机,指定路由键
return BindingBuilder.bind(taskQueue).to(taskExchange).with(ROUTING_KEY);
}
/**
* 发送技能树和技能点关联信息到消息队列
*/
public void sendMessage(Long sourceId, String type, Map<String, Object> inputs, Long userId) {
// 创建任务记录
MjuDifyTask difyTask = new MjuDifyTask();
difyTask.setSourceId(sourceId);
difyTask.setDifyTaskType(type);
difyTask.setUserId(userId);
difyTask.setNum(0L);
difyTask.setInputs(JSON.toJSONString(inputs));
difyTask.setState("0"); // 待执行状态
// 先保存到数据库
difyTaskService.save(difyTask);
// 创建消息对象
DifyTaskMessage message = new DifyTaskMessage(
difyTask.getDifyTaskId(),
difyTask.getSourceId(),
difyTask.getDifyTaskType(),
inputs,
difyTask.getNum(),
userId
);
// 发送到消息队列
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);
}
}3.3 消息消费者
消费者负责从队列中获取消息并处理:
java
@Component
@RabbitListener(queues = "${ai.queue}", containerFactory = "singleTaskListenerFactory")
@Slf4j
public class DifyTaskConsumer {
@Autowired
private IMjuTreePointService treePointService;
@Autowired
private IMjuDifyTaskService difyTaskService;
@Resource
private IMjuStudentService studentService;
@Resource
private IMjuUserSkillPointService userSkillPointService;
@Resource
private IMjuResourceService resourceService;
@Autowired
private MjuResumeService mjuResumeService;
@RabbitHandler
public void onMessage(DifyTaskMessage message, Message amqpMessage, Channel channel) throws IOException {
try {
log.info("[onMessage][消息内容({})],开始任务", message);
// 调用业务处理方法
processDifyTask(message);
log.info("[onMessage][消息内容({})],任务结束", message);
// 手动确认消息处理成功
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("消息消费失败:{}", e.getMessage(), e);
// 查询数据库中的任务记录
MjuDifyTask difyTask = difyTaskService.getById(message.getDifyTaskId());
// 如果任务不存在或已重试3次,放弃处理
if (difyTask == null || difyTask.getNum() >= 3) {
difyTaskService.updateById(new MjuDifyTask(message.getDifyTaskId(), "2"));
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);
return;
}
// 未超过3次,重新放入队列
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
}
}
/**
* 根据任务类型分发处理
*/
private void processDifyTask(DifyTaskMessage message) {
log.info("处理Dify任务,任务ID: {}, 任务类型: {}, 输入参数: {}",
message.getDifyTaskId(), message.getDifyTaskType(), message.getInputs());
// 增加执行次数
message.setNum(message.getNum() + 1);
MjuDifyTask difyTask = difyTaskService.getById(message.getDifyTaskId());
if (difyTask == null) {
throw new ServiceException("任务不存在");
}
difyTask.setNum(difyTask.getNum() + 1);
difyTaskService.updateById(difyTask);
// 根据类型分发处理
switch (message.getDifyTaskType()) {
case "RESUME_ANALYSIS":
// 简历分析任务
log.info("开始执行简历分析任务");
break;
case "CHARACTER_RECOMMEND":
// 性格推荐任务,调用学生服务
studentService.spawns(message);
break;
case "TOPIC":
// 出题任务,生成技能点测试
treePointService.generateSkillPointTest(message);
break;
case "DECIDE_TOPIC":
// 判题任务,判断答题正确性
userSkillPointService.decide(message);
break;
case "RECOMMENDATIONS":
// 技能点建议任务
userSkillPointService.recommend(message);
break;
case "LEARNING":
// 学习建议任务
resourceService.improves(message);
break;
default:
throw new IllegalArgumentException("未知的Dify任务类型: " + message.getDifyTaskType());
}
// 更新任务状态为成功
difyTaskService.updateById(new MjuDifyTask(message.getDifyTaskId(), "3"));
}
}四、核心知识点
4.1 手动确认机制
消息确认是 RabbitMQ 确保消息可靠传输的关键:
自动确认:消息投递给消费者后立即确认,可能丢失
手动确认:消费者处理完成后手动确认,确保可靠性手动确认的实现:
java
// 处理成功,手动确认
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
// 处理失败,参数说明:
// - deliveryTag:消息的唯一标识
// - multiple:是否批量确认
// - requeue:是否重新入队
channel.basicNack(deliveryTag, false, false); // 丢弃消息
channel.basicNack(deliveryTag, false, true); // 重新入队4.2 失败重试策略
本项目采用数据库记录加消息重试的策略:
1. 每次消费失败,增加执行次数 num
2. 如果 num >= 3,标记任务为失败状态,不再重试
3. 如果 num < 3,消息重新入队,稍后重试4.3 任务持久化
将任务先存入数据库,再发送到队列:
好处:
1. 任务状态可追踪
2. 服务重启后任务不丢失
3. 方便排查问题和统计分析五、配置说明
yaml
# application.yml 配置示例
ai:
queue: dify_task_queue # 队列名称
exchange: dify_task_exchange # 交换机名称
routing: dify_task_routing # 路由键
spring:
rabbitmq:
host: localhost # RabbitMQ 地址
port: 5672 # RabbitMQ 端口
username: admin # 用户名
password: admin123 # 密码
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次只取一条消息六、总结
| 知识点 | 说明 |
|---|---|
| Direct Exchange | 直接交换机,根据路由键精确匹配 |
| Queue 持久化 | 队列 durable=true,消息不丢失 |
| Message 持久化 | 消息 persistent=true |
| 手动 ACK | 确保消息可靠处理 |
| 失败重试 | 最多重试3次,超时标记失败 |
| 任务状态追踪 | 数据库记录任务状态 |
消息队列是分布式系统中不可或缺的组件,通过本项目的实践,我们可以学习到:
1. 如何设计消息格式
2. 如何实现可靠的消息传输
3. 如何处理消费失败和重试
4. 如何追踪任务状态