消息问题与解决方案篇
消息队列在生产环境中常见的四大问题:重复消费、消息乱序、消息堆积、消息过期。其中重复消费和消息乱序是面试最高频的问题。
一、消息重复消费
1.1 为什么会重复消费?
场景 1:网络抖动导致 ACK 丢失
消费者处理完成,发送 ACK 时网络超时
Broker 未收到 ACK,认为消费失败
重新投递消息 → 消费者收到同一条消息两次
场景 2:生产者重试
生产者发送消息,Broker 写入成功
但返回 ACK 时网络超时
生产者重试,Broker 又写入一条 → 两条相同消息
场景 3:消费者宕机重启
消费者拉取一批消息,处理了一部分后宕机
重启后偏移量未提交 → 重新拉取同一批消息
1.2 解决方案:接口幂等性
核心原则:无论消息重复多少次,业务结果只生效一次。
方案一:唯一 ID 去重(最常用)
java
/**
* 基于业务唯一 ID 的去重方案
* 适用于:订单号、支付流水号等天然具有唯一标识的场景
*/
public class IdempotentConsumer {
// 使用 Redis 存储已处理的消息 ID(带过期时间)
@Autowired
private StringRedisTemplate redisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "msg_processed:";
public void consume(Message message) {
String businessId = message.getBusinessKey(); // 业务唯一 ID
// 1. 检查是否已处理(幂等判断)
Boolean existed = redisTemplate.opsForValue()
.setIfAbsent(IDEMPOTENT_KEY_PREFIX + businessId, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(existed)) {
log.info("消息已处理,跳过: {}", businessId);
return; // 已处理过,直接跳过
}
// 2. 处理业务逻辑
try {
processBusiness(message);
} catch (Exception e) {
// 处理失败时删除幂等标识,允许重试
redisTemplate.delete(IDEMPOTENT_KEY_PREFIX + businessId);
throw e;
}
}
}
方案二:数据库唯一键约束
java
/**
* 利用数据库唯一键约束保证幂等
* 适用于:插入类操作(如创建订单、记录流水)
*/
@Service
public class OrderService {
@Autowired
private OrderDao orderDao;
@Transactional
public void createOrder(OrderMessage msg) {
Order order = msg.toOrder();
try {
// 数据库 order 表的业务流水号有 UNIQUE 约束
// 第二次插入会抛出 DuplicateKeyException
orderDao.insert(order);
} catch (DuplicateKeyException e) {
// 已存在,幂等跳过
log.info("订单已存在,幂等跳过: {}", order.getOrderNo());
return;
}
// 后续处理(减库存等)
inventoryService.deduct(order);
}
}
sql
-- 订单表唯一键设计
CREATE TABLE `t_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`order_no` varchar(32) NOT NULL COMMENT '业务订单号(唯一)',
`status` tinyint(4) NOT NULL DEFAULT '0',
`amount` decimal(10,2) NOT NULL,
`create_time` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`) -- 唯一约束保证幂等
);
方案三:状态机幂等
java
/**
* 通过状态流转检查,防止重复处理
* 适用于:有明确状态流转的业务(订单状态机)
*/
public class StateMachineConsumer {
@Transactional
public void processPaidOrder(OrderMessage msg) {
Order order = orderDao.selectByOrderNo(msg.getOrderNo());
// 状态机检查:只有"待付款"状态才能变成"已付款"
if (order.getStatus() != OrderStatus.PENDING_PAYMENT) {
log.info("订单状态已变更,跳过: {} -> {}", order.getOrderNo(), order.getStatus());
return;
}
// 更新状态(乐观锁 + 状态校验)
int updated = orderDao.updateStatus(
order.getId(),
OrderStatus.PENDING_PAYMENT, // 期望当前状态
OrderStatus.PAID // 目标状态
);
if (updated == 0) {
// 状态已被其他线程修改,幂等跳过
log.warn("订单状态已被修改,幂等跳过: {}", order.getOrderNo());
return;
}
// 后续处理
pointsService.add(msg.getUserId(), 100);
}
}
二、消息顺序性
2.1 乱序原因
正常顺序:订单创建 → 订单支付 → 订单退款
乱序情况:
分区 1:订单创建
分区 2:订单退款 ← 退款比支付先到达消费者
分区 3:订单支付
结果:先退款后支付,业务异常!
产生乱序的根因:
- 多分区/多队列:同一个业务 ID 的消息被路由到不同分区
- 多消费者并发:不同消费者同时消费同一业务的不同消息
- 消费重试:消息 A 消费失败重试,消息 B 先被处理
2.2 解决方案
方案一:单分区/单队列(强制顺序)
java
/**
* RocketMQ — 使用 MessageQueueSelector 保证顺序
* 相同业务 ID 的消息进入同一个队列
*/
public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String orderId = "ORDER_10086";
// 第一个消息:订单创建
Message createMsg = new Message("order_topic", "创建".getBytes());
producer.send(createMsg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单 ID 选择固定队列(同一订单 ID → 同一队列)
String orderId = (String) arg;
int index = orderId.hashCode() % mqs.size();
return mqs.get(Math.abs(index));
}
}, orderId);
// 第二个消息:订单支付(确保进入同一队列)
Message payMsg = new Message("order_topic", "支付".getBytes());
producer.send(payMsg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = orderId.hashCode() % mqs.size();
return mqs.get(Math.abs(index));
}
}, orderId);
producer.shutdown();
}
}
java
// 顺序消费者 — 单线程消费,保证顺序
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order_topic", "*");
// 使用 MessageListenerOrderly(顺序消费监听器)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 注意:顺序消息中 RECONSUME_LATER 会导致消息阻塞
// 不要返回 RECONSUME_LATER,应自行捕获异常并处理
for (MessageExt msg : msgs) {
try {
processMessage(msg);
} catch (Exception e) {
log.error("顺序消费异常", e);
// 可以记录日志并继续,让后续消息也能被消费
// 或抛异常让框架暂停队列消费
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
}
}
方案二:Kafka 分区路由
java
/**
* Kafka — 通过 key 保证同一 key 进入同一分区
*/
public class KafkaOrderedProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 分区器默认策略:相同 key 发到相同分区
// 也可自定义 partitioner.class
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 核心:用业务 ID 作为 key
String orderId = "ORDER_10086";
producer.send(new ProducerRecord<>("order_topic", orderId, "订单创建"));
producer.send(new ProducerRecord<>("order_topic", orderId, "订单支付"));
producer.send(new ProducerRecord<>("order_topic", orderId, "订单退款"));
// 同一个 key → 同一个分区 → 同一分区内有序
// ⚠️ 注意:Kafka 只能保证分区内有序,不能保证全局有序
producer.close();
}
}
2.3 全局严格有序
java
/**
* 全局顺序消息(RocketMQ)
* 将 Topic 的队列数设置为 1
* 注意:全局有序牺牲了并发性能,谨慎使用
*/
// 创建 Topic 时指定队列数为 1
// bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t global_order_topic -r 1 -w 1
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("global_order_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 所有消息发送到唯一队列
Message msg = new Message("global_order_topic", "data".getBytes());
producer.send(msg, (mqs, message, arg) -> mqs.get(0), null);
三、消息堆积
3.1 堆积原因分析
正常状态:生产速率 ≈ 消费速率
堆积状态:生产速率 >> 消费速率
→ 消息在 Broker 积压
→ 延迟持续上升
→ 最终可能内存溢出或磁盘写满
常见原因:
| 原因 | 场景 | 排查方向 |
|---|---|---|
| 消费者宕机 | 进程挂了、OOM | 查看消费者进程状态 |
| 消费阻塞 | 调外部接口超时、死循环 | 查看消费者日志 |
| 消费能力不足 | 并发数太低、处理太慢 | 检查线程数、处理逻辑 |
| 生产突发 | 秒杀、大促活动 | 查看生产速率监控 |
3.2 解决方案矩阵
方案一:水平扩容消费者
java
/**
* 扩容是最直接有效的解决方案
*/
// RocketMQ 注意事项:
// 1. 同一消费组内,队列数 >= 消费者数
// 2. 增加消费者时需要 队列数 >= 消费者数,否则多余的消费者空闲
// 3. Topic 创建时合理设置队列数(预留扩容空间)
// 创建 Topic 时预留足够的队列
// bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t order_topic -r 32 -w 32
// r=32 表示 32 个读队列,w=32 表示 32 个写队列
// 后续可以扩容到最多 32 个消费者并行消费
方案二:排查慢消费
java
/**
* 消费性能排查示例
*/
public class SlowConsumerDetector {
// 统计消费耗时
private static final MetricsCollector metrics = new MetricsCollector();
@MessageListener(topic = "order_topic")
public ConsumeResult consume(Message msg) {
long start = System.currentTimeMillis();
try {
// 业务处理
process(msg);
return ConsumeResult.SUCCESS;
} finally {
long cost = System.currentTimeMillis() - start;
metrics.record("consume_cost", cost);
if (cost > 1000) {
log.warn("消费耗时过长: {}ms, msgId={}", cost, msg.getMsgId());
}
}
}
}
方案三:临时队列 + 分流
java
/**
* 严重堆积时的紧急方案
* 1. 创建临时 Topic
* 2. 从原 Topic 批量转发消息到临时 Topic
* 3. 启动额外消费者处理临时 Topic
*/
public class EmergencyDrainService {
@Autowired
private DefaultMQProducer producer;
// 紧急分流:从堆积的 Topic 转发到新 Topic
public void drainMessages(String sourceTopic, String targetTopic, int count) {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("drain_group");
pullConsumer.setNamesrvAddr("localhost:9876");
pullConsumer.start();
Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(sourceTopic);
int drained = 0;
for (MessageQueue queue : queues) {
long offset = pullConsumer.fetchConsumeOffset(queue, false);
long maxOffset = pullConsumer.maxOffset(queue);
while (offset < maxOffset && drained < count) {
try {
List<MessageExt> messages = pullConsumer.pull(
queue, "*", offset, 32).getMsgFoundList();
if (messages != null) {
for (MessageExt msg : messages) {
// 转发到新 Topic
Message newMsg = new Message(targetTopic, msg.getBody());
producer.send(newMsg);
drained++;
}
offset += messages.size();
}
} catch (Exception e) {
log.error("分流失败", e);
break;
}
}
}
log.info("紧急分流完成: {} 条消息从 {} 转发到 {}", drained, sourceTopic, targetTopic);
pullConsumer.shutdown();
}
}
3.3 监控与告警
java
/**
* 堆积监控(使用 RocketMQ 命令行工具或 Prometheus)
*/
// RocketMQ 命令行查看堆积
// $ bin/mqadmin consumerProgress -n localhost:9876 -g consumer_group
// 输出示例:
// #Topic | Group | 堆积数量
// order_topic | order_group | 152000 ← 严重告警
// sms_topic | sms_group | 320 ← 正常
// 关键指标:
// 1. 堆积数量(Diff Total):> 1000 告警,> 10000 严重告警
// 2. 消费延迟(Last Timestamp):距离当前时间 > 5 分钟告警
// 3. 消费速率(Consume TPS):接近 0 说明消费阻塞
四、消息过期
4.1 消息过期机制
java
/**
* RocketMQ 消息过期设置
* RocketMQ 本身不直接支持单个消息的 TTL,通过文件保留时间控制
*/
// broker.conf 配置
fileReservedTime=72 // 消息文件保留 72 小时
deleteWhen=04 // 凌晨 4 点清理过期文件
java
/**
* Kafka 消息过期设置(基于 Topic)
*/
// 设置 Topic 级别的日志保留时间
// 1. 创建时设置
// kafka-topics.sh --create --topic order_topic \
// --config retention.ms=86400000 \ # 保留 24 小时
// --config cleanup.policy=delete \ # 过期删除策略
// --bootstrap-server localhost:9092
// 2. 修改已有 Topic
// kafka-configs.sh --bootstrap-server localhost:9092 \
// --entity-type topics --entity-name order_topic \
// --alter --add-config retention.ms=86400000
// Kafka 清理时机:segment 关闭后检查,不是实时清理
// 因此消息过期后不会立即删除,会有最多 segment.ms 的延迟
java
/**
* RabbitMQ 消息过期(支持单个消息 TTL)
*/
// 方式一:设置队列的 x-message-ttl
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 队列中消息 60 秒过期
channel.queueDeclare("queue_name", true, false, false, args);
// 方式二:设置单个消息的 TTL(RabbitMQ 独有特性)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000") // 该消息 60 秒后过期
.build();
channel.basicPublish("exchange", "routing_key", properties, messageBody);
4.2 过期消息处理
java
/**
* 过期消息转入死信队列(RabbitMQ)
*/
// 声明带死信功能的队列
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息 TTL:60 秒
args.put("x-dead-letter-exchange", "dlx.exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx.key"); // 死信路由键
channel.queueDeclare("order_queue", true, false, false, args);
channel.queueBind("order_queue", "order.exchange", "order.key");
// 处理死信队列中的过期消息
String dlqQueue = channel.queueDeclare("dlq.queue", true, false, false, null).getQueue();
channel.queueBind(dlqQueue, "dlx.exchange", "dlx.key");
channel.basicConsume(dlqQueue, false, (consumerTag, delivery) -> {
String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
log.warn("过期消息(死信): {}", body);
// 分析过期原因、重新处理或告警
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
五、综合实战:电商订单系统消息治理
5.1 业务场景
订单创建 → [MQ] → 扣库存、加积分、发短信
订单支付 → [MQ] → 更新订单状态、通知物流
订单退款 → [MQ] → 退款处理
5.2 问题治理方案
java
/**
* 订单消息综合治理
*/
@Component
public class OrderMessageHandler {
// ========== 幂等方案 ==========
@Autowired
private RedisTemplate<String, String> redis;
@Transactional
public void handleOrderPaid(OrderMessage msg) {
// 1. 幂等检查(Redis 去重)
String idempotentKey = "order:paid:" + msg.getOrderNo();
Boolean success = redis.opsForValue()
.setIfAbsent(idempotentKey, "1", Duration.ofHours(1));
if (Boolean.FALSE.equals(success)) {
log.info("重复消息跳过: {}", msg.getOrderNo());
return;
}
// 2. 顺序检查(状态机)
Order order = orderDao.selectByOrderNo(msg.getOrderNo());
if (order.getStatus() != OrderStatus.WAIT_PAY) {
log.warn("订单状态异常: {}, 当前: {}", msg.getOrderNo(), order.getStatus());
return;
}
// 3. 业务处理
orderDao.updateStatus(order.getId(), OrderStatus.PAID);
inventoryService.deduct(order); // 扣库存
pointsService.add(order.getUserId()); // 加积分
smsService.sendPaidNotify(order); // 发短信
}
// ========== 堆积告警 ==========
@Scheduled(fixedDelay = 60000)
public void monitorAccumulation() {
// 调用 MQ 管理 API 获取堆积数据
Long diffTotal = mqAdmin.clusterDiffTotal("order_group");
if (diffTotal > 10000) {
alertService.sendUrgent("订单消息堆积严重: " + diffTotal);
} else if (diffTotal > 1000) {
log.warn("订单消息堆积: {}", diffTotal);
}
}
// ========== 死信处理 ==========
@MessageListener(topic = "%DLQ%order_group")
public void handleDeadLetter(MessageExt msg) {
log.error("订单死信: msgId={}, body={}", msg.getMsgId(), new String(msg.getBody()));
alertService.sendAlert("订单死信,需人工介入: " + msg.getMsgId());
// 记录到死信数据库,定期对账
deadLetterDao.insert(DeadLetter.from(msg));
}
}
5.3 配置建议总结
| 问题 | 推荐方案 | 紧急预案 |
|---|---|---|
| 重复消费 | 唯一 ID 去重(Redis/Database) | 人工对账修复 |
| 消息乱序 | 同一业务 ID 路由到同一分区 | 状态机校验 |
| 消息堆积 | 扩容消费者、排查慢消费 | 临时队列分流 |
| 消息过期 | 合理配置 retention | 死信队列兜底 |
总结:消息队列的四大问题中,重复消费和消息顺序是面试必考题,核心方案分别是幂等设计和分区路由。实际生产环境中,这些问题往往是组合出现的,需要建立完善的监控告警体系,做到及时发现、快速响应、自动恢复。