← 返回 MQ 列表

消息可靠性篇

消息可靠性篇

消息可靠性是消息队列在生产环境中最重要的考量之一。消息一旦丢失,可能导致业务数据不一致、订单遗漏、资金差错等严重事故。本篇详细分析消息丢失的三大环节及对应防范方案。


一、消息丢失的三大环节

  [生产者] ──发送──▶ [MQ Broker] ──投递──▶ [消费者]
      ①                 ②                   ③
环节 丢失原因 风险等级
① 生产端 网络超时、Broker 宕机、发送异常未重试 ⭐⭐⭐⭐⭐
② Broker 端 消息未持久化、宕机丢内存数据、副本未同步 ⭐⭐⭐⭐⭐
③ 消费端 自动 ACK 后消费异常、消费逻辑抛出异常 ⭐⭐⭐⭐

二、生产端 — 生产者可靠发送

2.1 确认机制(ACK)

生产者发送消息后,Broker 返回确认信号,告知发送结果。

RocketMQ 三种发送方式

java
public class ReliableProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("reliable_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setRetryTimesWhenSendFailed(3);   // 失败重试 3 次
        producer.setSendMsgTimeout(3000);           // 发送超时 3 秒
        producer.start();

        Message msg = new Message("reliable_topic",
            "消息内容".getBytes(StandardCharsets.UTF_8));

        // 方式一:同步发送(推荐)— 等待 Broker 确认
        SendResult result = producer.send(msg);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("发送成功, msgId=" + result.getMsgId());
        }

        // 方式二:异步发送 — 通过回调接收结果
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult result) {
                System.out.println("异步发送成功: " + result.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                System.err.println("异步发送失败: " + e.getMessage());
                // 记录日志,后续补偿
            }
        });

        // 方式三:单向发送 — 不关心结果(性能最高但有丢失风险)
        producer.sendOneway(msg);

        producer.shutdown();
    }
}

Kafka ACK 级别配置

java
Properties props = new Properties();

// acks=0: 发出去就不管了(丢最多)
// acks=1: Leader 写入成功即返回(默认,一般场景)
// acks=all: Leader + 所有 ISR 副本都写入才返回(最可靠)
props.put("acks", "all");

// 最小同步副本数(配合 acks=all 使用)
// topic 级别配置:min.insync.replicas=2
ACK 级别 可靠性 性能 适用场景
acks=0 ❌ 最低 最高 日志等可丢场景
acks=1 ⚠️ 中等 一般业务(默认)
acks=all ✅ 最高 较低 订单、支付等核心链路

2.2 生产者重试机制

java
// RocketMQ 重试配置
producer.setRetryTimesWhenSendFailed(3);     // 同步重试次数
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步重试次数

// Kafka 重试配置
props.put("retries", 3);
props.put("retry.backoff.ms", 500);          // 重试间隔 500ms
props.put("delivery.timeout.ms", 120000);    // 总超时 2 分钟

2.3 本地消息表(终极兜底方案)

当 MQ 完全不可用时,通过本地消息表保证最终一致性。

核心流程

1. 业务操作 + 消息记录 在同一个本地事务中
2. 定时任务扫描未发送的消息,尝试发送
3. 发送成功后标记为已发送
4. 超过次数仍未成功则告警人工介入
sql
-- 本地消息表结构
CREATE TABLE `local_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `biz_key` varchar(128) NOT NULL COMMENT '业务唯一键',
  `message_body` text NOT NULL COMMENT '消息体(JSON)',
  `topic` varchar(64) NOT NULL COMMENT '目标主题',
  `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '0-待发送 1-已发送 2-发送失败',
  `retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '已重试次数',
  `max_retry` int(11) NOT NULL DEFAULT '5' COMMENT '最大重试次数',
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_status` (`status`)
);
java
// 1. 业务操作 + 写本地消息表(同一事务)
@Transactional
public void createOrder(Order order) {
    // 业务操作
    orderDao.insert(order);

    // 写本地消息表
    LocalMessage msg = new LocalMessage();
    msg.setBizKey("order_" + order.getId());
    msg.setTopic("order_topic");
    msg.setMessageBody(JSON.toJSONString(order));
    msg.setStatus(0);
    localMessageDao.insert(msg);
}

// 2. 定时任务扫描发送
@Scheduled(fixedDelay = 5000)  // 每 5 秒扫描一次
public void scanAndSend() {
    List<LocalMessage> pending = localMessageDao.selectByStatus(0, 100);
    for (LocalMessage msg : pending) {
        try {
            producer.send(msg.getTopic(), msg.getMessageBody());
            localMessageDao.updateStatus(msg.getId(), 1); // 标记已发送
        } catch (Exception e) {
            log.error("消息发送失败", e);
            localMessageDao.incrementRetry(msg.getId());
            if (msg.getRetryCount() >= msg.getMaxRetry()) {
                localMessageDao.updateStatus(msg.getId(), 2); // 标记失败
                alertService.sendAlert("消息发送超过最大重试次数: " + msg.getId());
            }
        }
    }
}

三、Broker 端 — 消息持久化与副本

3.1 消息持久化

将消息从内存写入磁盘,防止 Broker 重启后丢失。

RocketMQ 持久化配置

properties
# broker.conf
# 刷盘方式
flushDiskType=ASYNC_FLUSH   # 异步刷盘(性能好,丢失概率极低)
# flushDiskType=SYNC_FLUSH  # 同步刷盘(最安全,性能降低 10 倍)

# 存储路径
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue=/data/rocketmq/store/consumequeue

两种刷盘策略对比

同步刷盘:
  Producer → Broker(写入内存) → Broker(写入磁盘) → 返回 ACK
                              ↓
                        ✅ 必须落盘才确认

异步刷盘:
  Producer → Broker(写入内存) → 返回 ACK(快速)
                              ↓
                        ⏳ 后台线程批量刷盘(默认 500ms 间隔)
刷盘策略 可靠性 吞吐量 场景
同步刷盘 ⭐⭐⭐⭐⭐ ~3万 TPS 金融、对账核心链路
异步刷盘 ⭐⭐⭐⭐ ~10万+ TPS 大部分业务场景

3.2 副本同步

properties
# RocketMQ 主从同步配置

# Master 配置
brokerRole=SYNC_MASTER    # 同步复制模式
# brokerRole=ASYNC_MASTER # 异步复制模式

# Slave 配置
brokerRole=SLAVE
java
// Kafka 副本配置(Topic 级别)
// 创建 topic 时指定副本因子
// --replication-factor 3     # 每个分区 3 个副本
// --min-insync-replicas 2    # 最少同步副本数

三种复制模式数据安全对比

场景 SYNC_MASTER ASYNC_MASTER
Master 宕机但 Slave 已同步 ✅ 无丢失 ✅ 无丢失
Master 宕机且 Slave 未同步 ❌ 不丢(写 Slave 才返回) ⚠️ 可能丢消息
Master 磁盘损坏 依赖 Slave 依赖 Slave
性能 中等

四、消费端 — 消费者可靠消费

4.1 手动 ACK(最关键的防线)

绝对禁止使用自动 ACK,这是消费端丢消息最常见的原因。

java
// ❌ 错误做法:自动 ACK(消费未完成就确认)
props.put("enable.auto.commit", "true");       // Kafka
props.put("auto.commit.interval.ms", "5000");

// ✅ 正确做法:手动 ACK,业务处理完成再确认
// RocketMQ 手动确认
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        try {
            // 1. 处理业务逻辑
            processMessage(msg);
            // 2. 业务成功 → 返回消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("消费失败,稍后重试", e);
            // 3. 返回重试状态,消息不会丢失
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
java
// Kafka 手动提交偏移量
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        try {
            processMessage(record);       // 处理业务
            // 处理成功 → 提交偏移量
            consumer.commitSync();        // 同步提交
            // 或 consumer.commitAsync();  // 异步提交
        } catch (Exception e) {
            // 处理失败 → 不提交偏移量,下次重新拉取该消息
            log.error("消费失败,偏移量不回退: {}", record.offset());
            break;
        }
    }
}

4.2 消费重试机制

消费失败时,MQ 会自动重试。

RocketMQ 重试配置

java
// 消费者设置重试次数
consumer.setMaxReconsumeTimes(16);   // 最大重试 16 次(默认)

// 重试间隔(RocketMQ 默认递增加载)
// 第1次:10 秒    第2次:30 秒    第3次:1 分钟
// 第4次:2 分钟   第5次:3 分钟   第6次:4 分钟
// 第7次:5 分钟   第8次:6 分钟   第9次:7 分钟
// 第10次:8 分钟  第11次:9 分钟  第12次:10 分钟
// 第13次:20 分钟 第14次:30 分钟 第15次:1 小时
// 第16次:2 小时

Kafka 重试策略(需在消费者中自行实现):

java
// Kafka 没有内置重试队列,通常结合重试 Topic 实现
public class RetryConsumer {
    // 主消费者
    public void consumeMain() {
        KafkaConsumer<String, String> consumer = createConsumer("main_topic");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    processMessage(record);
                    consumer.commitSync();
                } catch (RetryableException e) {
                    // 发送到重试 Topic
                    sendToRetryTopic(record, 1);  // 第 1 次重试
                    consumer.commitSync();         // 提交偏移量(防止重复消费)
                }
            }
        }
    }

    // 重试消费者(延迟消费)
    public void consumeRetry() {
        KafkaConsumer<String, String> consumer = createConsumer("retry_topic");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                int retryCount = extractRetryCount(record);
                if (retryCount >= 3) {
                    sendToDlq(record);  // 超过 3 次 → 死信队列
                    consumer.commitSync();
                    continue;
                }
                try {
                    processMessage(record);
                    consumer.commitSync();
                } catch (RetryableException e) {
                    Thread.sleep(retryCount * 1000L);  // 递增延迟
                    sendToRetryTopic(record, retryCount + 1);
                    consumer.commitSync();
                }
            }
        }
    }
}

五、死信队列(DLQ, Dead Letter Queue)

5.1 什么是死信队列

消息在以下情况会进入死信队列:

  1. 消费重试次数超过最大限制
  2. 消息过期未被消费
  3. 消息在 Broker 端校验失败
                         重试次数超限
  正常消费 ───▶ 重试 16 次 ─────────▶ 死信队列
                                          │
                                    ┌─────▼──────┐
                                    │   人工/自动   │
                                    │   排查处理    │
                                    └─────────────┘

5.2 RocketMQ 死信队列实战

java
// RocketMQ 自动创建死信队列
// 原消费组: consumer_group
// 原 Topic: order_topic
// 死信队列 Topic: %DLQ%consumer_group
// 死信队列中的消息格式与原消息一致,额外增加重试次数 header

// 配置消费者监听死信队列
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("dlq_consumer_group");
dlqConsumer.setNamesrvAddr("localhost:9876");
dlqConsumer.subscribe("%DLQ%consumer_group", "*");

dlqConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        String msgId = msg.getMsgId();
        String body = new String(msg.getBody(), StandardCharsets.UTF_8);
        int retryTimes = msg.getReconsumeTimes();

        log.error("死信消息: msgId={}, retryTimes={}, body={}", msgId, retryTimes, body);

        // 发送告警
        alertService.sendAlert("消息进入死信队列: " + msgId);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
dlqConsumer.start();

5.3 Kafka 死信队列实现

java
// Kafka 需要手动实现死信队列
public class KafkaDlqExample {

    private static final String MAIN_TOPIC = "order_topic";
    private static final String DLQ_TOPIC = "order_topic_dlq";

    // 死信队列生产者
    public void sendToDlq(ConsumerRecord<String, String> record, String errorMsg) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 原消息体 + 异常信息
            JsonObject dlqMessage = new JsonObject();
            dlqMessage.addProperty("original_value", record.value());
            dlqMessage.addProperty("error", errorMsg);
            dlqMessage.addProperty("original_topic", record.topic());
            dlqMessage.addProperty("original_partition", record.partition());
            dlqMessage.addProperty("original_offset", record.offset());
            dlqMessage.addProperty("timestamp", LocalDateTime.now().toString());

            ProducerRecord<String, String> dlqRecord =
                new ProducerRecord<>(DLQ_TOPIC, record.key(), dlqMessage.toString());
            producer.send(dlqRecord);
        }
    }

    // 死信队列消费者
    public void consumeDlq() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq_consumer_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(DLQ_TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                log.warn("死信消息: {}", record.value());
                // 人工介入处理
                consumer.commitSync();
            }
        }
    }
}

六、全链路可靠性实战方案

6.1 核心链路配置清单

生产端:
  ✅ 同步发送 + ACK 确认
  ✅ 发送重试 3 次
  ✅ 本地消息表兜底

Broker 端:
  ✅ 消息持久化(同步刷盘或异步刷盘)
  ✅ 主从同步(SYNC_MASTER 模式)
  ✅ 多副本(至少 2 副本)

消费端:
  ✅ 手动 ACK(绝对禁止自动 ACK)
  ✅ 业务成功才确认
  ✅ 合理配置重试次数
  ✅ 死信队列监控与告警

6.2 可靠性验证 Checklist

markdown
- [ ] 生产端:发送失败时业务有补偿机制?
- [ ] 生产端:是否配置了重试次数和超时时间?
- [ ] Broker:刷盘策略是否合理?
- [ ] Broker:副本数是否 ≥ 2?
- [ ] Broker:是否有监控(磁盘、内存、堆积)?
- [ ] 消费端:是否使用手动 ACK?
- [ ] 消费端:重试次数是否合理?
- [ ] 消费端:死信队列是否有专人处理?
- [ ] 全链路:是否有消息对账机制?

6.3 消息轨迹与链路追踪

java
// RocketMQ 内置消息轨迹(Trace)
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.setNamesrvAddr("localhost:9876");
producer.setEnableMsgTrace(true);                  // 开启轨迹追踪
producer.setCustomizedTraceTopic("rmq_sys_trace"); // 指定轨迹存储 Topic

// 查询消息轨迹(RocketMQ 控制台或 API)
// 可以看到:
// 1. 消息什么时候产生的
// 2. 什么时候到达 Broker
// 3. 哪个消费者在什么时候消费的
// 4. 消费是否成功

七、面试高频问题

Q1:如何保证消息不丢失?

从三个环节分别回答:

  1. 生产端:ACK + 重试 + 本地消息表
  2. Broker:持久化 + 副本同步
  3. 消费端:手动 ACK + 重试 + DLQ

Q2:同步刷盘和异步刷盘的区别?

维度 同步刷盘 异步刷盘
机制 写入磁盘才返回 ACK 写入内存即返回 ACK
可靠性 极高 高(可能丢毫秒级数据)
性能 约 3万 TPS 约 10万+ TPS
场景 金融、支付 大部分业务

Q3:什么是死信队列?有什么作用?

  • 死信是消费失败次数超限或过期的消息
  • Rrole:兜底处理,防止消息无限重试,便于问题排查
  • 应配置监控告警,死信出现说明存在消费异常

总结:消息可靠性需要全链路保障。三个环节中最容易被忽视的是消费端自动 ACK 问题,这也是生产环境消息丢失最常见的元凶。建议核心链路采用 同步发送 + SYNC_MASTER + 手动 ACK 的配置组合,同时配合死信队列监控,确保消息万无一失。