消息可靠性篇
消息可靠性是消息队列在生产环境中最重要的考量之一。消息一旦丢失,可能导致业务数据不一致、订单遗漏、资金差错等严重事故。本篇详细分析消息丢失的三大环节及对应防范方案。
一、消息丢失的三大环节
[生产者] ──发送──▶ [MQ Broker] ──投递──▶ [消费者]
① ② ③
| 环节 | 丢失原因 | 风险等级 |
|---|---|---|
| ① 生产端 | 网络超时、Broker 宕机、发送异常未重试 | ⭐⭐⭐⭐⭐ |
| ② Broker 端 | 消息未持久化、宕机丢内存数据、副本未同步 | ⭐⭐⭐⭐⭐ |
| ③ 消费端 | 自动 ACK 后消费异常、消费逻辑抛出异常 | ⭐⭐⭐⭐ |
二、生产端 — 生产者可靠发送
2.1 确认机制(ACK)
生产者发送消息后,Broker 返回确认信号,告知发送结果。
RocketMQ 三种发送方式:
java
public class ReliableProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("reliable_group");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3); // 失败重试 3 次
producer.setSendMsgTimeout(3000); // 发送超时 3 秒
producer.start();
Message msg = new Message("reliable_topic",
"消息内容".getBytes(StandardCharsets.UTF_8));
// 方式一:同步发送(推荐)— 等待 Broker 确认
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("发送成功, msgId=" + result.getMsgId());
}
// 方式二:异步发送 — 通过回调接收结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
System.out.println("异步发送成功: " + result.getMsgId());
}
@Override
public void onException(Throwable e) {
System.err.println("异步发送失败: " + e.getMessage());
// 记录日志,后续补偿
}
});
// 方式三:单向发送 — 不关心结果(性能最高但有丢失风险)
producer.sendOneway(msg);
producer.shutdown();
}
}
Kafka ACK 级别配置:
java
Properties props = new Properties();
// acks=0: 发出去就不管了(丢最多)
// acks=1: Leader 写入成功即返回(默认,一般场景)
// acks=all: Leader + 所有 ISR 副本都写入才返回(最可靠)
props.put("acks", "all");
// 最小同步副本数(配合 acks=all 使用)
// topic 级别配置:min.insync.replicas=2
| ACK 级别 | 可靠性 | 性能 | 适用场景 |
|---|---|---|---|
| acks=0 | ❌ 最低 | 最高 | 日志等可丢场景 |
| acks=1 | ⚠️ 中等 | 高 | 一般业务(默认) |
| acks=all | ✅ 最高 | 较低 | 订单、支付等核心链路 |
2.2 生产者重试机制
java
// RocketMQ 重试配置
producer.setRetryTimesWhenSendFailed(3); // 同步重试次数
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步重试次数
// Kafka 重试配置
props.put("retries", 3);
props.put("retry.backoff.ms", 500); // 重试间隔 500ms
props.put("delivery.timeout.ms", 120000); // 总超时 2 分钟
2.3 本地消息表(终极兜底方案)
当 MQ 完全不可用时,通过本地消息表保证最终一致性。
核心流程:
1. 业务操作 + 消息记录 在同一个本地事务中
2. 定时任务扫描未发送的消息,尝试发送
3. 发送成功后标记为已发送
4. 超过次数仍未成功则告警人工介入
sql
-- 本地消息表结构
CREATE TABLE `local_message` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`biz_key` varchar(128) NOT NULL COMMENT '业务唯一键',
`message_body` text NOT NULL COMMENT '消息体(JSON)',
`topic` varchar(64) NOT NULL COMMENT '目标主题',
`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0-待发送 1-已发送 2-发送失败',
`retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '已重试次数',
`max_retry` int(11) NOT NULL DEFAULT '5' COMMENT '最大重试次数',
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`),
KEY `idx_status` (`status`)
);
java
// 1. 业务操作 + 写本地消息表(同一事务)
@Transactional
public void createOrder(Order order) {
// 业务操作
orderDao.insert(order);
// 写本地消息表
LocalMessage msg = new LocalMessage();
msg.setBizKey("order_" + order.getId());
msg.setTopic("order_topic");
msg.setMessageBody(JSON.toJSONString(order));
msg.setStatus(0);
localMessageDao.insert(msg);
}
// 2. 定时任务扫描发送
@Scheduled(fixedDelay = 5000) // 每 5 秒扫描一次
public void scanAndSend() {
List<LocalMessage> pending = localMessageDao.selectByStatus(0, 100);
for (LocalMessage msg : pending) {
try {
producer.send(msg.getTopic(), msg.getMessageBody());
localMessageDao.updateStatus(msg.getId(), 1); // 标记已发送
} catch (Exception e) {
log.error("消息发送失败", e);
localMessageDao.incrementRetry(msg.getId());
if (msg.getRetryCount() >= msg.getMaxRetry()) {
localMessageDao.updateStatus(msg.getId(), 2); // 标记失败
alertService.sendAlert("消息发送超过最大重试次数: " + msg.getId());
}
}
}
}
三、Broker 端 — 消息持久化与副本
3.1 消息持久化
将消息从内存写入磁盘,防止 Broker 重启后丢失。
RocketMQ 持久化配置:
properties
# broker.conf
# 刷盘方式
flushDiskType=ASYNC_FLUSH # 异步刷盘(性能好,丢失概率极低)
# flushDiskType=SYNC_FLUSH # 同步刷盘(最安全,性能降低 10 倍)
# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue
两种刷盘策略对比:
同步刷盘:
Producer → Broker(写入内存) → Broker(写入磁盘) → 返回 ACK
↓
✅ 必须落盘才确认
异步刷盘:
Producer → Broker(写入内存) → 返回 ACK(快速)
↓
⏳ 后台线程批量刷盘(默认 500ms 间隔)
| 刷盘策略 | 可靠性 | 吞吐量 | 场景 |
|---|---|---|---|
| 同步刷盘 | ⭐⭐⭐⭐⭐ | ~3万 TPS | 金融、对账核心链路 |
| 异步刷盘 | ⭐⭐⭐⭐ | ~10万+ TPS | 大部分业务场景 |
3.2 副本同步
properties
# RocketMQ 主从同步配置
# Master 配置
brokerRole=SYNC_MASTER # 同步复制模式
# brokerRole=ASYNC_MASTER # 异步复制模式
# Slave 配置
brokerRole=SLAVE
java
// Kafka 副本配置(Topic 级别)
// 创建 topic 时指定副本因子
// --replication-factor 3 # 每个分区 3 个副本
// --min-insync-replicas 2 # 最少同步副本数
三种复制模式数据安全对比:
| 场景 | SYNC_MASTER | ASYNC_MASTER |
|---|---|---|
| Master 宕机但 Slave 已同步 | ✅ 无丢失 | ✅ 无丢失 |
| Master 宕机且 Slave 未同步 | ❌ 不丢(写 Slave 才返回) | ⚠️ 可能丢消息 |
| Master 磁盘损坏 | 依赖 Slave | 依赖 Slave |
| 性能 | 中等 | 高 |
四、消费端 — 消费者可靠消费
4.1 手动 ACK(最关键的防线)
绝对禁止使用自动 ACK,这是消费端丢消息最常见的原因。
java
// ❌ 错误做法:自动 ACK(消费未完成就确认)
props.put("enable.auto.commit", "true"); // Kafka
props.put("auto.commit.interval.ms", "5000");
// ✅ 正确做法:手动 ACK,业务处理完成再确认
// RocketMQ 手动确认
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 1. 处理业务逻辑
processMessage(msg);
// 2. 业务成功 → 返回消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("消费失败,稍后重试", e);
// 3. 返回重试状态,消息不会丢失
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
java
// Kafka 手动提交偏移量
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record); // 处理业务
// 处理成功 → 提交偏移量
consumer.commitSync(); // 同步提交
// 或 consumer.commitAsync(); // 异步提交
} catch (Exception e) {
// 处理失败 → 不提交偏移量,下次重新拉取该消息
log.error("消费失败,偏移量不回退: {}", record.offset());
break;
}
}
}
4.2 消费重试机制
消费失败时,MQ 会自动重试。
RocketMQ 重试配置:
java
// 消费者设置重试次数
consumer.setMaxReconsumeTimes(16); // 最大重试 16 次(默认)
// 重试间隔(RocketMQ 默认递增加载)
// 第1次:10 秒 第2次:30 秒 第3次:1 分钟
// 第4次:2 分钟 第5次:3 分钟 第6次:4 分钟
// 第7次:5 分钟 第8次:6 分钟 第9次:7 分钟
// 第10次:8 分钟 第11次:9 分钟 第12次:10 分钟
// 第13次:20 分钟 第14次:30 分钟 第15次:1 小时
// 第16次:2 小时
Kafka 重试策略(需在消费者中自行实现):
java
// Kafka 没有内置重试队列,通常结合重试 Topic 实现
public class RetryConsumer {
// 主消费者
public void consumeMain() {
KafkaConsumer<String, String> consumer = createConsumer("main_topic");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
consumer.commitSync();
} catch (RetryableException e) {
// 发送到重试 Topic
sendToRetryTopic(record, 1); // 第 1 次重试
consumer.commitSync(); // 提交偏移量(防止重复消费)
}
}
}
}
// 重试消费者(延迟消费)
public void consumeRetry() {
KafkaConsumer<String, String> consumer = createConsumer("retry_topic");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
int retryCount = extractRetryCount(record);
if (retryCount >= 3) {
sendToDlq(record); // 超过 3 次 → 死信队列
consumer.commitSync();
continue;
}
try {
processMessage(record);
consumer.commitSync();
} catch (RetryableException e) {
Thread.sleep(retryCount * 1000L); // 递增延迟
sendToRetryTopic(record, retryCount + 1);
consumer.commitSync();
}
}
}
}
}
五、死信队列(DLQ, Dead Letter Queue)
5.1 什么是死信队列
消息在以下情况会进入死信队列:
- 消费重试次数超过最大限制
- 消息过期未被消费
- 消息在 Broker 端校验失败
重试次数超限
正常消费 ───▶ 重试 16 次 ─────────▶ 死信队列
│
┌─────▼──────┐
│ 人工/自动 │
│ 排查处理 │
└─────────────┘
5.2 RocketMQ 死信队列实战
java
// RocketMQ 自动创建死信队列
// 原消费组: consumer_group
// 原 Topic: order_topic
// 死信队列 Topic: %DLQ%consumer_group
// 死信队列中的消息格式与原消息一致,额外增加重试次数 header
// 配置消费者监听死信队列
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("dlq_consumer_group");
dlqConsumer.setNamesrvAddr("localhost:9876");
dlqConsumer.subscribe("%DLQ%consumer_group", "*");
dlqConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
int retryTimes = msg.getReconsumeTimes();
log.error("死信消息: msgId={}, retryTimes={}, body={}", msgId, retryTimes, body);
// 发送告警
alertService.sendAlert("消息进入死信队列: " + msgId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
dlqConsumer.start();
5.3 Kafka 死信队列实现
java
// Kafka 需要手动实现死信队列
public class KafkaDlqExample {
private static final String MAIN_TOPIC = "order_topic";
private static final String DLQ_TOPIC = "order_topic_dlq";
// 死信队列生产者
public void sendToDlq(ConsumerRecord<String, String> record, String errorMsg) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
// 原消息体 + 异常信息
JsonObject dlqMessage = new JsonObject();
dlqMessage.addProperty("original_value", record.value());
dlqMessage.addProperty("error", errorMsg);
dlqMessage.addProperty("original_topic", record.topic());
dlqMessage.addProperty("original_partition", record.partition());
dlqMessage.addProperty("original_offset", record.offset());
dlqMessage.addProperty("timestamp", LocalDateTime.now().toString());
ProducerRecord<String, String> dlqRecord =
new ProducerRecord<>(DLQ_TOPIC, record.key(), dlqMessage.toString());
producer.send(dlqRecord);
}
}
// 死信队列消费者
public void consumeDlq() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "dlq_consumer_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(DLQ_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.warn("死信消息: {}", record.value());
// 人工介入处理
consumer.commitSync();
}
}
}
}
六、全链路可靠性实战方案
6.1 核心链路配置清单
生产端:
✅ 同步发送 + ACK 确认
✅ 发送重试 3 次
✅ 本地消息表兜底
Broker 端:
✅ 消息持久化(同步刷盘或异步刷盘)
✅ 主从同步(SYNC_MASTER 模式)
✅ 多副本(至少 2 副本)
消费端:
✅ 手动 ACK(绝对禁止自动 ACK)
✅ 业务成功才确认
✅ 合理配置重试次数
✅ 死信队列监控与告警
6.2 可靠性验证 Checklist
markdown
- [ ] 生产端:发送失败时业务有补偿机制?
- [ ] 生产端:是否配置了重试次数和超时时间?
- [ ] Broker:刷盘策略是否合理?
- [ ] Broker:副本数是否 ≥ 2?
- [ ] Broker:是否有监控(磁盘、内存、堆积)?
- [ ] 消费端:是否使用手动 ACK?
- [ ] 消费端:重试次数是否合理?
- [ ] 消费端:死信队列是否有专人处理?
- [ ] 全链路:是否有消息对账机制?
6.3 消息轨迹与链路追踪
java
// RocketMQ 内置消息轨迹(Trace)
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.setNamesrvAddr("localhost:9876");
producer.setEnableMsgTrace(true); // 开启轨迹追踪
producer.setCustomizedTraceTopic("rmq_sys_trace"); // 指定轨迹存储 Topic
// 查询消息轨迹(RocketMQ 控制台或 API)
// 可以看到:
// 1. 消息什么时候产生的
// 2. 什么时候到达 Broker
// 3. 哪个消费者在什么时候消费的
// 4. 消费是否成功
七、面试高频问题
Q1:如何保证消息不丢失?
从三个环节分别回答:
- 生产端:ACK + 重试 + 本地消息表
- Broker:持久化 + 副本同步
- 消费端:手动 ACK + 重试 + DLQ
Q2:同步刷盘和异步刷盘的区别?
| 维度 | 同步刷盘 | 异步刷盘 |
|---|---|---|
| 机制 | 写入磁盘才返回 ACK | 写入内存即返回 ACK |
| 可靠性 | 极高 | 高(可能丢毫秒级数据) |
| 性能 | 约 3万 TPS | 约 10万+ TPS |
| 场景 | 金融、支付 | 大部分业务 |
Q3:什么是死信队列?有什么作用?
- 死信是消费失败次数超限或过期的消息
- Rrole:兜底处理,防止消息无限重试,便于问题排查
- 应配置监控告警,死信出现说明存在消费异常
总结:消息可靠性需要全链路保障。三个环节中最容易被忽视的是消费端自动 ACK 问题,这也是生产环境消息丢失最常见的元凶。建议核心链路采用 同步发送 + SYNC_MASTER + 手动 ACK 的配置组合,同时配合死信队列监控,确保消息万无一失。