Skip to content

RabbitMQ 消息队列实战

基于 mju-ai-server 项目,深入理解消息队列的应用场景与实现

一、为什么需要消息队列

在项目开发中,我们经常遇到这样的场景:用户提交一个请求,后需要调用多个外部服务或者执行耗时操作。如果直接同步处理,用户需要等待很长时间,严重影响体验。

同步处理流程:
用户请求 → 服务A → 服务B → 服务C → 返回结果
            (3秒)   (2秒)   (5秒)
            总耗时:10秒

异步处理流程:
用户请求 → 存入消息队列 → 立即返回成功

              消费者异步处理
              (耗时操作)

消息队列的核心价值:

1. 异步解耦:发送方和接收方不需要同时在线
2. 削峰填谷:高峰期请求存入队列,平缓处理
3. 可靠传输:消息持久化,确保不丢失
4. 负载均衡:多个消费者分担压力

二、项目应用场景

本项目使用 RabbitMQ 处理 AI 任务异步调用,主要涉及以下场景:

任务类型业务说明处理逻辑
简历分析分析用户上传的简历调用 AI 分析,生成建议
性格推荐根据用户信息推荐性格学生服务处理
出题任务根据技能点生成题目树节点服务生成
判题任务判断用户答题是否正确用户技能点服务处理
技能建议生成学习建议资源服务处理

三、核心代码实现

3.1 消息实体定义

java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MjuDifyTask {

    @TableId(type = IdType.AUTO)
    private Long difyTaskId;
    private String difyTaskType;     // 任务类型
    private Long sourceId;           // 来源ID
    private String inputs;          // 输入参数
    private Long num;               // 执行次数
    private Long userId;            // 用户ID
    private String state;           // 任务状态:0-待执行,2-失败,3-成功
}

消息对象用于在队列中传输:

java
@Data
@AllArgsConstructor
public class DifyTaskMessage {
    private Long difyTaskId;              // 数据库任务ID
    private Long sourceId;                // 来源ID
    private String difyTaskType;          // 任务类型
    private Map<String, Object> inputs;   // 输入参数
    private Long num;                     // 执行次数
    private Long userId;                  // 用户ID
}

3.2 消息生产者

生产者负责将任务发送到消息队列:

java
@Component
public class DifyTaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private IMjuDifyTaskService difyTaskService;

    @Value("${ai.queue}")
    private String QUEUE;                  // 队列名称

    @Value("${ai.exchange}")
    public String EXCHANGE;                // 交换机名称

    @Value("${ai.routing}")
    public String ROUTING_KEY;             // 路由键

    @Bean
    public Queue taskQueue() {
        // 创建持久化队列,消息不会因服务器重启而丢失
        return new Queue(QUEUE, true);
    }

    @Bean
    public DirectExchange taskExchange() {
        // 创建持久化直接交换机
        return new DirectExchange(EXCHANGE, true, false);
    }

    @Bean
    public Binding binding(Queue taskQueue, DirectExchange taskExchange) {
        // 绑定队列到交换机,指定路由键
        return BindingBuilder.bind(taskQueue).to(taskExchange).with(ROUTING_KEY);
    }

    /**
     * 发送技能树和技能点关联信息到消息队列
     */
    public void sendMessage(Long sourceId, String type, Map<String, Object> inputs, Long userId) {
        // 创建任务记录
        MjuDifyTask difyTask = new MjuDifyTask();
        difyTask.setSourceId(sourceId);
        difyTask.setDifyTaskType(type);
        difyTask.setUserId(userId);
        difyTask.setNum(0L);
        difyTask.setInputs(JSON.toJSONString(inputs));
        difyTask.setState("0");  // 待执行状态

        // 先保存到数据库
        difyTaskService.save(difyTask);

        // 创建消息对象
        DifyTaskMessage message = new DifyTaskMessage(
                difyTask.getDifyTaskId(),
                difyTask.getSourceId(),
                difyTask.getDifyTaskType(),
                inputs,
                difyTask.getNum(),
                userId
        );

        // 发送到消息队列
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, message);
    }
}

3.3 消息消费者

消费者负责从队列中获取消息并处理:

java
@Component
@RabbitListener(queues = "${ai.queue}", containerFactory = "singleTaskListenerFactory")
@Slf4j
public class DifyTaskConsumer {

    @Autowired
    private IMjuTreePointService treePointService;
    @Autowired
    private IMjuDifyTaskService difyTaskService;
    @Resource
    private IMjuStudentService studentService;
    @Resource
    private IMjuUserSkillPointService userSkillPointService;
    @Resource
    private IMjuResourceService resourceService;
    @Autowired
    private MjuResumeService mjuResumeService;

    @RabbitHandler
    public void onMessage(DifyTaskMessage message, Message amqpMessage, Channel channel) throws IOException {
        try {
            log.info("[onMessage][消息内容({})],开始任务", message);

            // 调用业务处理方法
            processDifyTask(message);

            log.info("[onMessage][消息内容({})],任务结束", message);

            // 手动确认消息处理成功
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("消息消费失败:{}", e.getMessage(), e);

            // 查询数据库中的任务记录
            MjuDifyTask difyTask = difyTaskService.getById(message.getDifyTaskId());

            // 如果任务不存在或已重试3次,放弃处理
            if (difyTask == null || difyTask.getNum() >= 3) {
                difyTaskService.updateById(new MjuDifyTask(message.getDifyTaskId(), "2"));
                channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);
                return;
            }

            // 未超过3次,重新放入队列
            channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

    /**
     * 根据任务类型分发处理
     */
    private void processDifyTask(DifyTaskMessage message) {
        log.info("处理Dify任务,任务ID: {}, 任务类型: {}, 输入参数: {}",
                message.getDifyTaskId(), message.getDifyTaskType(), message.getInputs());

        // 增加执行次数
        message.setNum(message.getNum() + 1);
        MjuDifyTask difyTask = difyTaskService.getById(message.getDifyTaskId());
        if (difyTask == null) {
            throw new ServiceException("任务不存在");
        }
        difyTask.setNum(difyTask.getNum() + 1);
        difyTaskService.updateById(difyTask);

        // 根据类型分发处理
        switch (message.getDifyTaskType()) {
            case "RESUME_ANALYSIS":
                // 简历分析任务
                log.info("开始执行简历分析任务");
                break;
            case "CHARACTER_RECOMMEND":
                // 性格推荐任务,调用学生服务
                studentService.spawns(message);
                break;
            case "TOPIC":
                // 出题任务,生成技能点测试
                treePointService.generateSkillPointTest(message);
                break;
            case "DECIDE_TOPIC":
                // 判题任务,判断答题正确性
                userSkillPointService.decide(message);
                break;
            case "RECOMMENDATIONS":
                // 技能点建议任务
                userSkillPointService.recommend(message);
                break;
            case "LEARNING":
                // 学习建议任务
                resourceService.improves(message);
                break;
            default:
                throw new IllegalArgumentException("未知的Dify任务类型: " + message.getDifyTaskType());
        }

        // 更新任务状态为成功
        difyTaskService.updateById(new MjuDifyTask(message.getDifyTaskId(), "3"));
    }
}

四、核心知识点

4.1 手动确认机制

消息确认是 RabbitMQ 确保消息可靠传输的关键:

自动确认:消息投递给消费者后立即确认,可能丢失
手动确认:消费者处理完成后手动确认,确保可靠性

手动确认的实现:

java
// 处理成功,手动确认
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);

// 处理失败,参数说明:
// - deliveryTag:消息的唯一标识
// - multiple:是否批量确认
// - requeue:是否重新入队
channel.basicNack(deliveryTag, false, false);  // 丢弃消息
channel.basicNack(deliveryTag, false, true);   // 重新入队

4.2 失败重试策略

本项目采用数据库记录加消息重试的策略:

1. 每次消费失败,增加执行次数 num
2. 如果 num >= 3,标记任务为失败状态,不再重试
3. 如果 num < 3,消息重新入队,稍后重试

4.3 任务持久化

将任务先存入数据库,再发送到队列:

好处:
1. 任务状态可追踪
2. 服务重启后任务不丢失
3. 方便排查问题和统计分析

五、配置说明

yaml
# application.yml 配置示例
ai:
  queue: dify_task_queue                    # 队列名称
  exchange: dify_task_exchange              # 交换机名称
  routing: dify_task_routing                # 路由键

spring:
  rabbitmq:
    host: localhost                         # RabbitMQ 地址
    port: 5672                              # RabbitMQ 端口
    username: admin                         # 用户名
    password: admin123                      # 密码
    listener:
      simple:
        acknowledge-mode: manual             # 手动确认
        prefetch: 1                          # 每次只取一条消息

六、总结

知识点说明
Direct Exchange直接交换机,根据路由键精确匹配
Queue 持久化队列 durable=true,消息不丢失
Message 持久化消息 persistent=true
手动 ACK确保消息可靠处理
失败重试最多重试3次,超时标记失败
任务状态追踪数据库记录任务状态

消息队列是分布式系统中不可或缺的组件,通过本项目的实践,我们可以学习到:

1. 如何设计消息格式
2. 如何实现可靠的消息传输
3. 如何处理消费失败和重试
4. 如何追踪任务状态