← 返回 MQ 列表

消息队列运维与调优篇

消息队列运维与调优篇

消息队列在生产环境中的稳定性至关重要。本篇从监控指标、性能调优、高可用保障、安全四大维度出发,结合实际场景介绍 MQ 的运维实践。


一、监控指标体系

1.1 核心监控指标

分类 指标 说明 告警阈值
生产端 生产 TPS 每秒消息发送量 超过阈值告警
发送耗时 消息发送到确认的延迟 > 500ms 告警
发送失败率 发送失败的比例 > 1% 告警
消费端 消费 TPS 每秒消息消费量 过低告警
消费延迟 消息从生产到消费的时间差 > 5分钟 告警
消费失败率 消费确认失败的比例 > 1% 告警
Broker 堆积数量 待消费消息总数 > 1万 预警,> 10万 告警
磁盘使用率 存储空间使用 > 80% 预警,> 90% 告警
内存使用率 JVM 堆内存 > 80% 告警
CPU 使用率 Broker 进程 CPU > 80% 告警
网络 网络吞吐 入站/出站流量 带宽 > 80% 告警
连接数 生产者+消费者连接 接近上限告警

1.2 RocketMQ 监控实战

java
/**
 * 使用 RocketMQ Admin API 获取监控数据
 */
@Component
public class RocketMQMonitor {

    private DefaultMQAdminExt admin;

    @PostConstruct
    public void init() {
        admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("localhost:9876");
        admin.start();
    }

    /**
     * 获取消费者堆积情况
     */
    public Map<String, Long> getConsumerOffset(String consumerGroup, String topic)
            throws Exception {
        Map<String, Long> result = new HashMap<>();

        // 获取 Broker 上的消费进度
        ConsumeStats consumeStats = admin.examineConsumeStats(consumerGroup, topic);

        for (Map.Entry<MessageQueue, OffsetWrapper> entry :
                consumeStats.getOffsetTable().entrySet()) {
            MessageQueue mq = entry.getKey();
            OffsetWrapper offsetWrapper = entry.getValue();

            long consumedOffset = offsetWrapper.getConsumerOffset(); // 已消费
            long brokerOffset = offsetWrapper.getBrokerOffset();     // 最新消息
            long diff = brokerOffset - consumedOffset;               // 堆积量

            String key = mq.getBrokerName() + "-" + mq.getQueueId();
            result.put(key, diff);
        }
        return result;
    }

    /**
     * 监控 Topic 的生产速率
     */
    public TopicStatsTable getTopicStats(String topic) throws Exception {
        return admin.examineTopicStats(topic);
    }

    /**
     * 定时收集监控数据
     */
    @Scheduled(fixedDelay = 30000) // 每 30 秒
    public void collectMetrics() {
        try {
            // 1. 检查堆积
            Map<String, Long> offsets = getConsumerOffset("order_group", "order_topic");
            long totalDiff = offsets.values().stream().mapToLong(Long::longValue).sum();

            // 2. 发送到监控系统(Prometheus / Grafana)
            if (totalDiff > 10000) {
                alertService.sendUrgent("消息堆积告警: " + totalDiff);
            }
            metricsCollector.gauge("mq.accumulation", totalDiff, "topic=order_topic");

            // 3. 检查集群状态
            ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
            for (String brokerName : clusterInfo.getBrokerAddrTable().keySet()) {
                BrokerData brokerData = clusterInfo.getBrokerAddrTable().get(brokerName);
                metricsCollector.gauge("mq.broker.count", 1,
                    "broker=" + brokerName);
            }

        } catch (Exception e) {
            log.error("监控数据采集失败", e);
        }
    }
}

1.3 Kafka 监控实战

java
/**
 * Kafka 监控 — 使用 AdminClient
 */
@Component
public class KafkaMonitor {

    private final KafkaAdminClient adminClient;

    public KafkaMonitor() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);
    }

    /**
     * 获取消费者组堆积
     */
    public Map<TopicPartition, Long> getGroupLag(String groupId) throws Exception {
        Map<TopicPartition, Long> result = new HashMap<>();

        // 获取消费者组信息
        ConsumerGroupDescription groupDesc = adminClient
            .describeConsumerGroups(Collections.singletonList(groupId))
            .describedGroups()
            .get(groupId, 10, TimeUnit.SECONDS);

        // 获取各分区最新偏移量
        List<TopicPartition> partitions = groupDesc.members().stream()
            .flatMap(m -> m.assignment().topicPartitions().stream())
            .collect(Collectors.toList());

        Map<TopicPartition, OffsetSpec> requestLatest = partitions.stream()
            .collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));

        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
            adminClient.listOffsets(requestLatest).all()
                .get(10, TimeUnit.SECONDS);

        // 获取消费组已提交的偏移量
        Map<TopicPartition, OffsetAndMetadata> committed =
            adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
                .get(10, TimeUnit.SECONDS);

        // 计算 Lag
        for (TopicPartition tp : partitions) {
            long latest = latestOffsets.get(tp).offset();
            long committedOffset = committed.containsKey(tp)
                ? committed.get(tp).offset() : 0;
            result.put(tp, latest - committedOffset);
        }

        return result;
    }
}

1.4 Grafana 告警规则示例

yaml
# Prometheus 告警规则
groups:
  - name: mq_alerts
    rules:
      - alert: MQAccumulationHigh
        expr: mq_accumulation > 10000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "消息堆积超过 1 万"
          description: "Topic: {{ $labels.topic }}, 堆积: {{ $value }}"

      - alert: MQConsumeLagHigh
        expr: mq_consume_lag_seconds > 300
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消费延迟超过 5 分钟"

      - alert: MQDiskUsageHigh
        expr: mq_disk_usage_percent > 85
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "MQ 磁盘使用率超过 85%"

二、性能调优

2.1 RocketMQ 调优

Broker 配置调优

properties
# broker.conf — 核心性能参数

# === 存储调优 ===
# 异步刷盘间隔(默认 500ms)
flushInterval=500
# 同步刷盘提交间隔
syncFlushTimeout=5000

# === 内存调优 ===
# 映射文件总内存(建议 4-8G)
mapedFileSizeCommitLog=1073741824  # 1GB
# 传输队列内存
mapedFileSizeConsumeQueue=300000   # 30万的索引
# 最大传输队列偏移量
maxTransferBytesOnMessageInMemory=131072  # 每个请求最大 128KB
# 内存中最大消息数
maxTransferCountOnMessageInMemory=32

# === 线程池调优 ===
# 发送消息线程数
sendMessageThreadPoolNums=16
# 拉消息线程数
pullMessageThreadPoolNums=16
# 处理请求线程数
serverWorkerThreads=8
# 处理回调线程数
serverCallbackExecutorThreads=8

# === 网络调优 ===
# 单次最大拉取消息数
pullBatchSize=32
# 消费者最大批量拉取大小
consumerFallbackThreshold=32
# Broker 处理能力上限
osPageCacheBusyTimeOutMills=3000

JVM 调优

bash
# bin/runbroker.sh — JVM 参数
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
JAVA_OPT="${JAVA_OPT} -Xloggc:/var/log/rocketmq/gc.log"

# 内存分配建议:
# 总内存 8G 时:
#   -Xms8g          堆内存 8G
#   -Xmn4g          年轻代 4G(一半给年轻代)
#   -XX:MaxDirectMemorySize=2g  堆外内存 2G

生产者调优

java
/**
 * RocketMQ 生产者性能调优
 */
DefaultMQProducer producer = new DefaultMQProducer("optimized_group");
producer.setNamesrvAddr("localhost:9876");

// === 批量发送 ===
// 开启批量发送(默认已开启)
producer.setSendLatencyFaultEnable(true);
// 批量消息大小上限(默认 4MB,需与 Broker 配置一致)
producer.setMaxMessageSize(4194304); // 4MB

// === 异步发送(推荐高吞吐场景)===
// 异步发送的回执线程池
producer.setCallbackExecutor(new ThreadPoolExecutor(
    4, 8, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)));

// === 超时与重试 ===
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);

消费者调优

java
/**
 * RocketMQ 消费者性能调优
 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("optimized_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");

// === 消费线程数 ===
consumer.setConsumeThreadMin(20);   // 最小消费线程
consumer.setConsumeThreadMax(64);   // 最大消费线程

// === 批量消费 ===
consumer.setConsumeMessageBatchMaxSize(32); // 一次拉取最多 32 条

// === 消费模式 ===
consumer.setPullBatchSize(32);              // 每次拉取 32 条
consumer.setPullInterval(0);                // 拉取间隔(0 不等待)

// === 消费重试 ===
consumer.setMaxReconsumeTimes(16);          // 最大重试次数

2.2 Kafka 调优

properties
# server.properties — Kafka Broker 配置

# === 日志段配置 ===
log.segment.bytes=1073741824          # 1GB,增大减少文件数
log.retention.hours=72                # 保留 72 小时
log.retention.check.interval.ms=300000 # 5 分钟检查一次

# === 网络调优 ===
# 单 socket 最大请求大小
socket.request.max.bytes=104857600    # 100MB
# 网络线程数(CPU 核心数 * 2)
network.threads=8
# IO 线程数
io.threads=8

# === 刷盘调优 ===
# 数据刷盘前最大累积消息数
log.flush.interval.messages=10000
# 刷盘间隔
log.flush.interval.ms=1000

# === 复制调优 ===
replica.fetch.max.bytes=10485760      # 10MB
num.replica.fetchers=4
java
/**
 * Kafka 生产者调优
 */
Properties props = new Properties();

// === 批次优化 ===
props.put("batch.size", 32768);           // 32KB,批次越大吞吐越高
props.put("linger.ms", 10);               // 最多等待 10ms
props.put("buffer.memory", 33554432);     // 32MB 缓冲区

// === 压缩 ===
props.put("compression.type", "snappy");  // snappy/lz4/zstd

// === ACK 级别 ===
props.put("acks", "1");                   // 吞吐与可靠性的平衡

// === 重试 ===
props.put("retries", 3);
props.put("retry.backoff.ms", 500);
java
/**
 * Kafka 消费者调优
 */
Properties props = new Properties();

// === 拉取调优 ===
props.put("fetch.min.bytes", 1024);           // 最少拉取 1KB
props.put("fetch.max.wait.ms", 500);           // 最多等待 500ms
props.put("max.partition.fetch.bytes", 1048576); // 每个分区最大 1MB

// === 消费速率 ===
props.put("max.poll.records", 500);            // 单次最多 500 条

// === Session 超时 ===
props.put("session.timeout.ms", 30000);        // 30 秒检测消费者存活
props.put("heartbeat.interval.ms", 3000);      // 3 秒心跳

2.3 性能调优 Checklist

markdown
## 通用
- [ ] 消息体是否过大?建议 < 1MB,避免大消息
- [ ] 是否开启批量发送/消费?
- [ ] 是否使用异步发送(非必要等待结果时)?
- [ ] 消费者线程数是否合理?

## RocketMQ
- [ ] Broker JVM 参数是否优化(GC 策略、堆大小)?
- [ ] 刷盘策略是否合理(异步刷盘性能远高于同步)?
- [ ] 队列数是否合理(建议 8-32,消费者数 ≤ 队列数)?
- [ ] 是否开启了消息轨迹(影响性能,非必须时关闭)?

## Kafka
- [ ] Topic 分区数是否合理(建议 3-10 个/节点)?
- [ ] 是否开启了消息压缩?
- [ ] 批次大小(batch.size)是否调优?
- [ ] Kafka 版本是否较新(新版本性能持续提升)?

三、高可用保障

3.1 架构高可用

markdown
## RocketMQ 高可用架构

          NameServer1          NameServer2
              │                    │
     ┌────────┼────────────────────┼────────┐
     │        │                    │        │
  Broker-A    │              Broker-B      │
  (Master)────┤──SYNC──────(Master)        │
     │        │                    │        │
  Broker-A    │              Broker-B      │
  (Slave)     │              (Slave)        │
     └────────┴────────────────────┴────────┘

要求:
  ✅ 至少 2 个 NameServer(无状态,不互相通信)
  ✅ Broker 主从部署(至少 2 台物理机)
  ✅ 消费者/生产者连接多个 NameServer
yaml
# 高可用部署建议(以 RocketMQ 为例)
# 机器配置:4 台物理机(2 主 2 从)
主机1: NameServer1 + Broker-A Master
主机2: NameServer2 + Broker-A Slave
主机3: NameServer1 + Broker-B Master  (与主机1 共用 NameServer)
主机4: NameServer2 + Broker-B Slave  (与主机2 共用 NameServer)

3.2 集群故障演练

java
/**
 * 故障场景与应对方案
 */
public class FailoverTest {

    /**
     * 场景 1:NameServer 挂了一个
     * 影响:无影响(客户端连接另一个)
     * 应对:无需干预,自动切换
     */

    /**
     * 场景 2:Master 宕机
     * 影响:
     *   - 同步复制:Slave 自动接管
     *   - 异步复制:Slave 接管,可能丢少量数据
     * 应对:
     *   1. 检查 Slave 是否已接管
     *   2. 重启 Master 或切新 Master
     *   RocketMQ 4.5+ 支持自动主从切换(DLedger)
     */
    // broker.conf — 开启 DLedger
    enableDLegerCommitLog=true
    dLegerGroup=broker-a
    dLegerPeers=n1-127.0.0.1:40911;n2-127.0.0.1:40912;n3-127.0.0.1:40913
    dLegerSelfId=n1

    /**
     * 场景 3:磁盘写满
     * 影响:消息无法写入,Broker 拒绝服务
     * 应对:
     *   1. 清理过期消息(fileReservedTime)
     *   2. 扩容磁盘
     *   3. 配置磁盘告警(提前发现)
     */

    /**
     * 场景 4:网络分区
     * 影响:部分客户端断开连接
     * 应对:
     *   1. 多机房部署
     *   2. 跨机房同步(RocketMQ 支持多机房复制)
     */
}

3.3 容灾备份方案

bash
# 方案一:数据备份(物理文件拷贝)
# 定期备份 CommitLog 文件
rsync -avz /data/rocketmq/store/commitlog/ backup@backup-server:/backup/mq/

# 方案二:跨集群复制(RocketMQ)
# 使用 RocketMQ-Exporter 或 MirrorMaker
bin/mqadmin brokerConsumerOffset --group group_name --topic topic_name --broker broker_name

# 方案三:消息对账
# 生产者记录发送日志
# 消费者记录消费日志
# 定时对账,发现差异人工修复

四、安全管理

4.1 访问控制(ACL)

properties
# RocketMQ ACL 配置
# plain_acl.yml
accounts:
  - accessKey: "admin"
    secretKey: "admin123"
    admin: true
    defaultTopicPerm: "PUB|SUB"
    defaultGroupPerm: "PUB|SUB"
    topicPerms:
      - order_topic=PUB|SUB
      - pay_topic=PUB|SUB
    groupPerms:
      - order_group=SUB

  - accessKey: "order_service"
    secretKey: "order2024"
    admin: false
    topicPerms:
      - order_topic=PUB
    groupPerms:
      - order_group=SUB

  - accessKey: "pay_service"
    secretKey: "pay2024"
    admin: false
    topicPerms:
      - pay_topic=PUB
    groupPerms:
      - pay_group=SUB
java
/**
 * RocketMQ ACL 客户端配置
 */
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("order_group");
producer.setNamesrvAddr("localhost:9876");

// 配置 ACL
producer.setAccessChannel(AccessChannel.LOCAL);
RPCHook aclHook = new AclClientRPCHook(
    new SessionCredentials("order_service", "order2024"));
producer.setRPCHook(aclHook);

4.2 Kafka 安全配置

properties
# Kafka SSL 配置
# server.properties
listeners=SSL://localhost:9093
ssl.keystore.location=/etc/kafka/secrets/kafka.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=changeit
ssl.client.auth=required

# SASL 认证
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin

4.3 传输加密

bash
# RocketMQ TLS 配置
# broker.conf
tls.enable=true
tls.truststore.path=/etc/rocketmq/tls/truststore.jks
tls.truststore.password=changeit
tls.keystore.path=/etc/rocketmq/tls/keystore.jks
tls.keystore.password=changeit

# 客户端配置
properties.setProperty(TlsSystemConfig.TLS_ENABLE, "true");
properties.setProperty(TlsSystemConfig.TLS_TRUSTSTORE_PATH, "/etc/rocketmq/tls/truststore.jks");

4.4 安全最佳实践

markdown
## 生产环境安全 Checklist

- [ ] 生产环境开启 ACL(禁止无认证访问)
- [ ] 最小权限原则,每个服务只授权需要的 Topic/Group
- [ ] 使用 SSL/TLS 加密网络传输
- [ ] 管理后台(Dashboard)设置登录认证
- [ ] 定期轮换密钥和密码
- [ ] 审计日志记录(谁在何时做了什么操作)
- [ ] 端口限制(只开放必要端口,限制 IP 白名单)
- [ ] 不将 MQ 暴露在公网

五、运维实战:常见问题处理

5.1 消息堆积紧急处理

bash
# 1. 确认堆积情况
bin/mqadmin consumerProgress -n localhost:9876 -g order_group

# 2. 临时扩容消费者(情况紧急时)
# RocketMQ 扩容:增加消费者实例(确保队列数足够)

# 3. 如果无法扩容,重设消费偏移量跳过部分消息
bin/mqadmin resetOffsetByTime -n localhost:9876 \
  -g order_group -t order_topic -s now

# 4. 临时关闭不重要的消费者
# 将资源让给核心业务的消费者

# 5. 告警:如果以上步骤不奏效,人工介入排查慢消费原因

5.2 消费者宕机恢复

bash
# 1. 查看消费组状态
bin/mqadmin consumerStatus -n localhost:9876 -g order_group

# 2. 确认消费者是否在线
bin/mqadmin consumerConnection -n localhost:9876 -g order_group

# 3. 重新启动消费者后,检查偏移量是否正常
# RocketMQ 会自动从上次提交的偏移量开始消费(集群模式)

# 4. 如果消费者长期未上线,偏移量已过期
# 需要决定是从最新开始消费还是从头开始

5.3 版本升级与迁移

bash
# RocketMQ 灰度升级步骤
# 1. 先升级 Slave 节点
bin/mqshutdown broker
# 更新部署包
bin/mqbroker -c conf/broker-slave.conf

# 2. 等待 Slave 同步完成
# 3. 进行主从切换
# 4. 升级原 Master 节点
# 5. 重新建立主从关系

总结:MQ 运维的核心在于监控告警性能调优高可用架构三者的结合。提前建立完善的监控体系(堆积、延迟、系统资源),并通过压测确定合理的性能参数,比发生故障后再排查要高效得多。安全方面,生产环境至少应开启 ACL 认证和网络隔离。