← 返回 MQ 列表

消息问题与解决方案篇

消息问题与解决方案篇

消息队列在生产环境中常见的四大问题:重复消费、消息乱序、消息堆积、消息过期。其中重复消费和消息乱序是面试最高频的问题。


一、消息重复消费

1.1 为什么会重复消费?

场景 1:网络抖动导致 ACK 丢失
  消费者处理完成,发送 ACK 时网络超时
  Broker 未收到 ACK,认为消费失败
  重新投递消息 → 消费者收到同一条消息两次

场景 2:生产者重试
  生产者发送消息,Broker 写入成功
  但返回 ACK 时网络超时
  生产者重试,Broker 又写入一条 → 两条相同消息

场景 3:消费者宕机重启
  消费者拉取一批消息,处理了一部分后宕机
  重启后偏移量未提交 → 重新拉取同一批消息

1.2 解决方案:接口幂等性

核心原则:无论消息重复多少次,业务结果只生效一次。

方案一:唯一 ID 去重(最常用)

java
/**
 * 基于业务唯一 ID 的去重方案
 * 适用于:订单号、支付流水号等天然具有唯一标识的场景
 */
public class IdempotentConsumer {

    // 使用 Redis 存储已处理的消息 ID(带过期时间)
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String IDEMPOTENT_KEY_PREFIX = "msg_processed:";

    public void consume(Message message) {
        String businessId = message.getBusinessKey(); // 业务唯一 ID

        // 1. 检查是否已处理(幂等判断)
        Boolean existed = redisTemplate.opsForValue()
            .setIfAbsent(IDEMPOTENT_KEY_PREFIX + businessId, "1", Duration.ofHours(24));

        if (Boolean.FALSE.equals(existed)) {
            log.info("消息已处理,跳过: {}", businessId);
            return; // 已处理过,直接跳过
        }

        // 2. 处理业务逻辑
        try {
            processBusiness(message);
        } catch (Exception e) {
            // 处理失败时删除幂等标识,允许重试
            redisTemplate.delete(IDEMPOTENT_KEY_PREFIX + businessId);
            throw e;
        }
    }
}

方案二:数据库唯一键约束

java
/**
 * 利用数据库唯一键约束保证幂等
 * 适用于:插入类操作(如创建订单、记录流水)
 */
@Service
public class OrderService {

    @Autowired
    private OrderDao orderDao;

    @Transactional
    public void createOrder(OrderMessage msg) {
        Order order = msg.toOrder();

        try {
            // 数据库 order 表的业务流水号有 UNIQUE 约束
            // 第二次插入会抛出 DuplicateKeyException
            orderDao.insert(order);
        } catch (DuplicateKeyException e) {
            // 已存在,幂等跳过
            log.info("订单已存在,幂等跳过: {}", order.getOrderNo());
            return;
        }

        // 后续处理(减库存等)
        inventoryService.deduct(order);
    }
}
sql
-- 订单表唯一键设计
CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `order_no` varchar(32) NOT NULL COMMENT '业务订单号(唯一)',
  `status` tinyint(4) NOT NULL DEFAULT '0',
  `amount` decimal(10,2) NOT NULL,
  `create_time` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`)  -- 唯一约束保证幂等
);

方案三:状态机幂等

java
/**
 * 通过状态流转检查,防止重复处理
 * 适用于:有明确状态流转的业务(订单状态机)
 */
public class StateMachineConsumer {

    @Transactional
    public void processPaidOrder(OrderMessage msg) {
        Order order = orderDao.selectByOrderNo(msg.getOrderNo());

        // 状态机检查:只有"待付款"状态才能变成"已付款"
        if (order.getStatus() != OrderStatus.PENDING_PAYMENT) {
            log.info("订单状态已变更,跳过: {} -> {}", order.getOrderNo(), order.getStatus());
            return;
        }

        // 更新状态(乐观锁 + 状态校验)
        int updated = orderDao.updateStatus(
            order.getId(),
            OrderStatus.PENDING_PAYMENT,  // 期望当前状态
            OrderStatus.PAID               // 目标状态
        );

        if (updated == 0) {
            // 状态已被其他线程修改,幂等跳过
            log.warn("订单状态已被修改,幂等跳过: {}", order.getOrderNo());
            return;
        }

        // 后续处理
        pointsService.add(msg.getUserId(), 100);
    }
}

二、消息顺序性

2.1 乱序原因

正常顺序:订单创建 → 订单支付 → 订单退款
乱序情况:
  分区 1:订单创建
  分区 2:订单退款     ← 退款比支付先到达消费者
  分区 3:订单支付

结果:先退款后支付,业务异常!

产生乱序的根因

  1. 多分区/多队列:同一个业务 ID 的消息被路由到不同分区
  2. 多消费者并发:不同消费者同时消费同一业务的不同消息
  3. 消费重试:消息 A 消费失败重试,消息 B 先被处理

2.2 解决方案

方案一:单分区/单队列(强制顺序)

java
/**
 * RocketMQ — 使用 MessageQueueSelector 保证顺序
 * 相同业务 ID 的消息进入同一个队列
 */
public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String orderId = "ORDER_10086";

        // 第一个消息:订单创建
        Message createMsg = new Message("order_topic", "创建".getBytes());
        producer.send(createMsg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // 根据订单 ID 选择固定队列(同一订单 ID → 同一队列)
                String orderId = (String) arg;
                int index = orderId.hashCode() % mqs.size();
                return mqs.get(Math.abs(index));
            }
        }, orderId);

        // 第二个消息:订单支付(确保进入同一队列)
        Message payMsg = new Message("order_topic", "支付".getBytes());
        producer.send(payMsg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                String orderId = (String) arg;
                int index = orderId.hashCode() % mqs.size();
                return mqs.get(Math.abs(index));
            }
        }, orderId);

        producer.shutdown();
    }
}
java
// 顺序消费者 — 单线程消费,保证顺序
public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order_topic", "*");

        // 使用 MessageListenerOrderly(顺序消费监听器)
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            // 注意:顺序消息中 RECONSUME_LATER 会导致消息阻塞
            // 不要返回 RECONSUME_LATER,应自行捕获异常并处理
            for (MessageExt msg : msgs) {
                try {
                    processMessage(msg);
                } catch (Exception e) {
                    log.error("顺序消费异常", e);
                    // 可以记录日志并继续,让后续消息也能被消费
                    // 或抛异常让框架暂停队列消费
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });

        consumer.start();
    }
}

方案二:Kafka 分区路由

java
/**
 * Kafka — 通过 key 保证同一 key 进入同一分区
 */
public class KafkaOrderedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 分区器默认策略:相同 key 发到相同分区
        // 也可自定义 partitioner.class

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 核心:用业务 ID 作为 key
        String orderId = "ORDER_10086";
        producer.send(new ProducerRecord<>("order_topic", orderId, "订单创建"));
        producer.send(new ProducerRecord<>("order_topic", orderId, "订单支付"));
        producer.send(new ProducerRecord<>("order_topic", orderId, "订单退款"));

        // 同一个 key → 同一个分区 → 同一分区内有序
        // ⚠️ 注意:Kafka 只能保证分区内有序,不能保证全局有序

        producer.close();
    }
}

2.3 全局严格有序

java
/**
 * 全局顺序消息(RocketMQ)
 * 将 Topic 的队列数设置为 1
 * 注意:全局有序牺牲了并发性能,谨慎使用
 */
// 创建 Topic 时指定队列数为 1
// bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t global_order_topic -r 1 -w 1

// 生产者
DefaultMQProducer producer = new DefaultMQProducer("global_order_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

// 所有消息发送到唯一队列
Message msg = new Message("global_order_topic", "data".getBytes());
producer.send(msg, (mqs, message, arg) -> mqs.get(0), null);

三、消息堆积

3.1 堆积原因分析

正常状态:生产速率 ≈ 消费速率
堆积状态:生产速率 >> 消费速率
          → 消息在 Broker 积压
          → 延迟持续上升
          → 最终可能内存溢出或磁盘写满

常见原因

原因 场景 排查方向
消费者宕机 进程挂了、OOM 查看消费者进程状态
消费阻塞 调外部接口超时、死循环 查看消费者日志
消费能力不足 并发数太低、处理太慢 检查线程数、处理逻辑
生产突发 秒杀、大促活动 查看生产速率监控

3.2 解决方案矩阵

方案一:水平扩容消费者

java
/**
 * 扩容是最直接有效的解决方案
 */
// RocketMQ 注意事项:
// 1. 同一消费组内,队列数 >= 消费者数
// 2. 增加消费者时需要 队列数 >= 消费者数,否则多余的消费者空闲
// 3. Topic 创建时合理设置队列数(预留扩容空间)

// 创建 Topic 时预留足够的队列
// bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t order_topic -r 32 -w 32
// r=32 表示 32 个读队列,w=32 表示 32 个写队列
// 后续可以扩容到最多 32 个消费者并行消费

方案二:排查慢消费

java
/**
 * 消费性能排查示例
 */
public class SlowConsumerDetector {
    // 统计消费耗时
    private static final MetricsCollector metrics = new MetricsCollector();

    @MessageListener(topic = "order_topic")
    public ConsumeResult consume(Message msg) {
        long start = System.currentTimeMillis();
        try {
            // 业务处理
            process(msg);
            return ConsumeResult.SUCCESS;
        } finally {
            long cost = System.currentTimeMillis() - start;
            metrics.record("consume_cost", cost);
            if (cost > 1000) {
                log.warn("消费耗时过长: {}ms, msgId={}", cost, msg.getMsgId());
            }
        }
    }
}

方案三:临时队列 + 分流

java
/**
 * 严重堆积时的紧急方案
 * 1. 创建临时 Topic
 * 2. 从原 Topic 批量转发消息到临时 Topic
 * 3. 启动额外消费者处理临时 Topic
 */
public class EmergencyDrainService {

    @Autowired
    private DefaultMQProducer producer;

    // 紧急分流:从堆积的 Topic 转发到新 Topic
    public void drainMessages(String sourceTopic, String targetTopic, int count) {
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("drain_group");
        pullConsumer.setNamesrvAddr("localhost:9876");
        pullConsumer.start();

        Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(sourceTopic);
        int drained = 0;

        for (MessageQueue queue : queues) {
            long offset = pullConsumer.fetchConsumeOffset(queue, false);
            long maxOffset = pullConsumer.maxOffset(queue);

            while (offset < maxOffset && drained < count) {
                try {
                    List<MessageExt> messages = pullConsumer.pull(
                        queue, "*", offset, 32).getMsgFoundList();

                    if (messages != null) {
                        for (MessageExt msg : messages) {
                            // 转发到新 Topic
                            Message newMsg = new Message(targetTopic, msg.getBody());
                            producer.send(newMsg);
                            drained++;
                        }
                        offset += messages.size();
                    }
                } catch (Exception e) {
                    log.error("分流失败", e);
                    break;
                }
            }
        }

        log.info("紧急分流完成: {} 条消息从 {} 转发到 {}", drained, sourceTopic, targetTopic);
        pullConsumer.shutdown();
    }
}

3.3 监控与告警

java
/**
 * 堆积监控(使用 RocketMQ 命令行工具或 Prometheus)
 */
// RocketMQ 命令行查看堆积
// $ bin/mqadmin consumerProgress -n localhost:9876 -g consumer_group

// 输出示例:
// #Topic         | Group           | 堆积数量
// order_topic    | order_group     | 152000    ← 严重告警
// sms_topic      | sms_group       | 320       ← 正常

// 关键指标:
// 1. 堆积数量(Diff Total):> 1000 告警,> 10000 严重告警
// 2. 消费延迟(Last Timestamp):距离当前时间 > 5 分钟告警
// 3. 消费速率(Consume TPS):接近 0 说明消费阻塞

四、消息过期

4.1 消息过期机制

java
/**
 * RocketMQ 消息过期设置
 * RocketMQ 本身不直接支持单个消息的 TTL,通过文件保留时间控制
 */
// broker.conf 配置
fileReservedTime=72    // 消息文件保留 72 小时
deleteWhen=04          // 凌晨 4 点清理过期文件
java
/**
 * Kafka 消息过期设置(基于 Topic)
 */
// 设置 Topic 级别的日志保留时间
// 1. 创建时设置
// kafka-topics.sh --create --topic order_topic \
//   --config retention.ms=86400000 \         # 保留 24 小时
//   --config cleanup.policy=delete \         # 过期删除策略
//   --bootstrap-server localhost:9092

// 2. 修改已有 Topic
// kafka-configs.sh --bootstrap-server localhost:9092 \
//   --entity-type topics --entity-name order_topic \
//   --alter --add-config retention.ms=86400000

// Kafka 清理时机:segment 关闭后检查,不是实时清理
// 因此消息过期后不会立即删除,会有最多 segment.ms 的延迟
java
/**
 * RabbitMQ 消息过期(支持单个消息 TTL)
 */
// 方式一:设置队列的 x-message-ttl
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 队列中消息 60 秒过期
channel.queueDeclare("queue_name", true, false, false, args);

// 方式二:设置单个消息的 TTL(RabbitMQ 独有特性)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .expiration("60000")  // 该消息 60 秒后过期
    .build();
channel.basicPublish("exchange", "routing_key", properties, messageBody);

4.2 过期消息处理

java
/**
 * 过期消息转入死信队列(RabbitMQ)
 */
// 声明带死信功能的队列
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);              // 消息 TTL:60 秒
args.put("x-dead-letter-exchange", "dlx.exchange");  // 死信交换机
args.put("x-dead-letter-routing-key", "dlx.key");    // 死信路由键
channel.queueDeclare("order_queue", true, false, false, args);
channel.queueBind("order_queue", "order.exchange", "order.key");

// 处理死信队列中的过期消息
String dlqQueue = channel.queueDeclare("dlq.queue", true, false, false, null).getQueue();
channel.queueBind(dlqQueue, "dlx.exchange", "dlx.key");
channel.basicConsume(dlqQueue, false, (consumerTag, delivery) -> {
    String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
    log.warn("过期消息(死信): {}", body);
    // 分析过期原因、重新处理或告警
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

五、综合实战:电商订单系统消息治理

5.1 业务场景

订单创建 → [MQ] → 扣库存、加积分、发短信
订单支付 → [MQ] → 更新订单状态、通知物流
订单退款 → [MQ] → 退款处理

5.2 问题治理方案

java
/**
 * 订单消息综合治理
 */
@Component
public class OrderMessageHandler {

    // ========== 幂等方案 ==========
    @Autowired
    private RedisTemplate<String, String> redis;

    @Transactional
    public void handleOrderPaid(OrderMessage msg) {
        // 1. 幂等检查(Redis 去重)
        String idempotentKey = "order:paid:" + msg.getOrderNo();
        Boolean success = redis.opsForValue()
            .setIfAbsent(idempotentKey, "1", Duration.ofHours(1));
        if (Boolean.FALSE.equals(success)) {
            log.info("重复消息跳过: {}", msg.getOrderNo());
            return;
        }

        // 2. 顺序检查(状态机)
        Order order = orderDao.selectByOrderNo(msg.getOrderNo());
        if (order.getStatus() != OrderStatus.WAIT_PAY) {
            log.warn("订单状态异常: {}, 当前: {}", msg.getOrderNo(), order.getStatus());
            return;
        }

        // 3. 业务处理
        orderDao.updateStatus(order.getId(), OrderStatus.PAID);
        inventoryService.deduct(order);      // 扣库存
        pointsService.add(order.getUserId()); // 加积分
        smsService.sendPaidNotify(order);     // 发短信
    }

    // ========== 堆积告警 ==========
    @Scheduled(fixedDelay = 60000)
    public void monitorAccumulation() {
        // 调用 MQ 管理 API 获取堆积数据
        Long diffTotal = mqAdmin.clusterDiffTotal("order_group");
        if (diffTotal > 10000) {
            alertService.sendUrgent("订单消息堆积严重: " + diffTotal);
        } else if (diffTotal > 1000) {
            log.warn("订单消息堆积: {}", diffTotal);
        }
    }

    // ========== 死信处理 ==========
    @MessageListener(topic = "%DLQ%order_group")
    public void handleDeadLetter(MessageExt msg) {
        log.error("订单死信: msgId={}, body={}", msg.getMsgId(), new String(msg.getBody()));
        alertService.sendAlert("订单死信,需人工介入: " + msg.getMsgId());
        // 记录到死信数据库,定期对账
        deadLetterDao.insert(DeadLetter.from(msg));
    }
}

5.3 配置建议总结

问题 推荐方案 紧急预案
重复消费 唯一 ID 去重(Redis/Database) 人工对账修复
消息乱序 同一业务 ID 路由到同一分区 状态机校验
消息堆积 扩容消费者、排查慢消费 临时队列分流
消息过期 合理配置 retention 死信队列兜底

总结:消息队列的四大问题中,重复消费消息顺序是面试必考题,核心方案分别是幂等设计分区路由。实际生产环境中,这些问题往往是组合出现的,需要建立完善的监控告警体系,做到及时发现、快速响应、自动恢复。