← 返回 MQ 列表

消息队列基础概念篇

消息队列基础概念篇

一、什么是消息队列

消息队列(Message Queue, MQ) 是一种分布式异步通信中间件,本质是一个先进先出(FIFO) 的队列数据结构,用于暂存和传递消息。它让服务之间通过"队列"这个中间层进行通信,而不是直接调用。

核心思想

传统调用:服务A → 服务B(同步阻塞,耦合紧密)
MQ模式:服务A → 队列(立即返回) → 服务B(异步消费)

二、核心角色

角色 说明 类比
生产者(Producer) 发送消息的服务/客户端 寄信人
消息(Message) 传输的数据载体 信件内容
队列/主题(Queue/Topic) 消息存储与分组载体 邮局/信箱
消费者(Consumer) 接收并处理消息的服务/客户端 收信人
Broker MQ 服务器本身 邮局系统

消息的组成结构

一条消息通常包含以下元数据:

json
{
  "msgId": "AC1E8A9B3F4D2C7E",       // 全局唯一ID
  "topic": "order_paid",               // 主题
  "tag": "vip_order",                  // 标签(用于过滤)
  "key": "order_20240527_10086",       // 业务Key
  "body": {                            // 消息体(业务数据)
    "orderId": 10086,
    "userId": 9527,
    "amount": 299.00
  },
  "bornTime": 1716787200000,           // 产生时间戳
  "retryTimes": 0                      // 已重试次数
}

三、核心特性

1. 先进先出(FIFO)

同一个队列内,消息按发送顺序存储和投递,保证基本的有序性。

2. 异步

生产者发送消息后立即返回,无需等待消费者处理,显著降低链路延迟。

3. 解耦

生产者和消费者不直接依赖,各自独立演进。消费者宕机不影响生产者。

4. 削峰填谷

突发流量先进入队列缓冲,消费者按自身能力平稳消费,防止下游被冲垮。

5. 重试与容错

消费失败可自动重试,多次失败转入死信队列,不阻塞主流程。

6. 持久化

消息可落盘存储,即使 MQ 重启也不丢失数据。


四、使用场景(含实战案例)

场景一:应用解耦

业务背景:用户下单后,需要通知库存系统扣减库存、通知积分系统发放积分、通知物流系统创建配送单。

传统耦合方式的问题

java
// 伪代码 — 强耦合调用
public void createOrder(Order order) {
    orderDao.insert(order);                    // 1. 保存订单
    inventoryService.deduct(order);            // 2. 扣库存(强依赖)
    pointsService.add(order.getUserId(), 100); // 3. 加积分(强依赖)
    logisticsService.create(order);            // 4. 创建物流(强依赖)
}

问题:任何一个下游系统宕机,下单就会失败。

MQ 解耦方案

java
// 生产者:下单后只发送一条消息
public void createOrder(Order order) {
    orderDao.insert(order);
    // 发送订单消息到 MQ,立即返回
    producer.send("order_paid", order.toMessage());
    // 下单成功,无需等待其他系统
}

// 消费者1:库存服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handleStock(OrderMessage msg) {
    inventoryService.deduct(msg.toOrder());
}

// 消费者2:积分服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handlePoints(OrderMessage msg) {
    pointsService.add(msg.getUserId(), 100);
}

// 消费者3:物流服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handleLogistics(OrderMessage msg) {
    logisticsService.create(msg.toOrder());
}

效果:库存服务临时宕机,下单照常进行,消息积压在 MQ,库存服务恢复后继续消费。


场景二:异步处理

业务背景:用户注册后需要发送欢迎短信、邮件、推送,这些操作耗时且不需要即时返回。

java
// 同步方案:总计约 3 秒
public void register(User user) {
    userDao.insert(user);                 // 50ms
    smsService.sendWelcome(user.getPhone());   // 800ms
    emailService.sendWelcome(user.getEmail()); // 1500ms
    pushService.sendWelcome(user.getDeviceId()); // 700ms
    return "注册成功";  // 用户等待 3 秒才看到结果
}

// MQ 异步方案:总计约 100ms
public void register(User user) {
    userDao.insert(user);                     // 50ms
    producer.send("user_registered", user);   // 30ms
    return "注册成功";  // 用户几乎立即看到结果
    // 短信/邮件/推送异步执行,不影响用户体验
}

场景三:流量削峰

业务背景:秒杀活动开始瞬间,每秒 10 万请求,数据库只能承受每秒 1 万。

java
// 秒杀接口 — 流量先进 MQ 缓冲
@PostMapping("/seckill")
public Result seckill(Long userId, Long productId) {
    // 1. 前置校验(轻量)
    if (stockService.hasStock(productId)) {
        // 2. 秒杀请求写入 MQ,立即返回"排队中"
        producer.send("seckill_request", new SeckillMessage(userId, productId));
        return Result.pending("排队中,请稍后...");
    }
    return Result.fail("已售罄");
}

// 消费者 — 按数据库承受能力平滑消费
@MessageListener(topic = "seckill_request")
public void processSeckill(SeckillMessage msg) {
    // 每次只消费 5000 条/秒(数据库的承受能力)
    orderService.createSeckillOrder(msg.getUserId(), msg.getProductId());
}

效果对比

指标 无 MQ 有 MQ
瞬间请求量 10万/秒 10万/秒写入MQ
数据库压力 10万/秒 → 直接崩溃 5000/秒 → 平稳运行
用户体验 大量超时/500 统一提示"排队中",异步告知结果

场景四:分布式事务最终一致性

业务背景:下单扣库存,订单系统和库存系统分属不同数据库。

java
// 使用 RocketMQ 事务消息实现最终一致性
public void createOrderWithTransaction(Order order) {
    // 1. 发送半消息(prepare 阶段)
    TransactionSendResult result = producer.sendMessageInTransaction(
        "order_tx", order, null
    );
}

// 2. RocketMQ 回调执行本地事务
@TransactionListener
public class OrderTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Order order = msg.toOrder();
        try {
            orderDao.insert(order);                    // 本地事务
            return LocalTransactionState.COMMIT;       // 提交 → MQ 投递消息
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK;     // 回滚 → MQ 删除消息
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查(网络超时等异常情况)
        return orderDao.exists(msg.getOrderId())
            ? LocalTransactionState.COMMIT
            : LocalTransactionState.ROLLBACK;
    }
}

// 3. 库存服务消费消息(最终一致)
@MessageListener(topic = "order_tx")
public void deductStock(Order order) {
    stockService.deduct(order.getProductId(), order.getQuantity());
}

场景五:数据同步

业务背景:MySQL 数据同步到 Elasticsearch 做全文搜索。

java
// 业务服务 — 数据变更时发送消息
public void updateProduct(Product product) {
    productDao.update(product);
    producer.send("product_change", new SyncMessage("UPDATE", product));
}

// 同步服务 — 消费消息写入 ES
@MessageListener(topic = "product_change")
public void syncToEs(Product product) {
    if ("UPDATE".equals(product.getAction())) {
        esClient.update("product_index", product.toJson());
    } else if ("DELETE".equals(product.getAction())) {
        esClient.delete("product_index", product.getId());
    }
}

五、使用 MQ 的注意事项

1. 引入 MQ 带来的新问题

问题 说明
系统复杂度上升 需要维护 MQ 集群,排查链路更长
消息丢失风险 需配置持久化、ACK 等机制保证
消息重复风险 生产者重试导致重复消息,消费端需幂等
顺序问题 多分区/多消费者场景需要额外设计
运维成本 需要监控堆积、延迟、节点健康等

2. 什么时候不适合用 MQ

  • 调用方需要同步实时结果(如查询操作)
  • 业务链路需要强事务一致性(如转账扣款)
  • 系统规模小、服务数量少(引入 MQ 反而增加复杂度)

3. 选型初步考虑

需求场景 推荐方案
复杂路由、灵活交换 RabbitMQ
高可用、事务消息、国内业务 RocketMQ
大数据、日志、流处理 Kafka
轻量级、小项目 Redis List / 内存队列

总结:消息队列的核心价值在于解耦、异步、削峰,但引入的同时也带来了一致性、顺序、幂等等新挑战。生产环境中通常结合具体业务场景选择合适的一款 MQ,并做好监控和容错。