消息队列基础概念篇
一、什么是消息队列
消息队列(Message Queue, MQ) 是一种分布式异步通信中间件,本质是一个先进先出(FIFO) 的队列数据结构,用于暂存和传递消息。它让服务之间通过"队列"这个中间层进行通信,而不是直接调用。
核心思想
传统调用:服务A → 服务B(同步阻塞,耦合紧密)
MQ模式:服务A → 队列(立即返回) → 服务B(异步消费)
二、核心角色
| 角色 | 说明 | 类比 |
|---|---|---|
| 生产者(Producer) | 发送消息的服务/客户端 | 寄信人 |
| 消息(Message) | 传输的数据载体 | 信件内容 |
| 队列/主题(Queue/Topic) | 消息存储与分组载体 | 邮局/信箱 |
| 消费者(Consumer) | 接收并处理消息的服务/客户端 | 收信人 |
| Broker | MQ 服务器本身 | 邮局系统 |
消息的组成结构
一条消息通常包含以下元数据:
json
{
"msgId": "AC1E8A9B3F4D2C7E", // 全局唯一ID
"topic": "order_paid", // 主题
"tag": "vip_order", // 标签(用于过滤)
"key": "order_20240527_10086", // 业务Key
"body": { // 消息体(业务数据)
"orderId": 10086,
"userId": 9527,
"amount": 299.00
},
"bornTime": 1716787200000, // 产生时间戳
"retryTimes": 0 // 已重试次数
}
三、核心特性
1. 先进先出(FIFO)
同一个队列内,消息按发送顺序存储和投递,保证基本的有序性。
2. 异步
生产者发送消息后立即返回,无需等待消费者处理,显著降低链路延迟。
3. 解耦
生产者和消费者不直接依赖,各自独立演进。消费者宕机不影响生产者。
4. 削峰填谷
突发流量先进入队列缓冲,消费者按自身能力平稳消费,防止下游被冲垮。
5. 重试与容错
消费失败可自动重试,多次失败转入死信队列,不阻塞主流程。
6. 持久化
消息可落盘存储,即使 MQ 重启也不丢失数据。
四、使用场景(含实战案例)
场景一:应用解耦
业务背景:用户下单后,需要通知库存系统扣减库存、通知积分系统发放积分、通知物流系统创建配送单。
传统耦合方式的问题:
java
// 伪代码 — 强耦合调用
public void createOrder(Order order) {
orderDao.insert(order); // 1. 保存订单
inventoryService.deduct(order); // 2. 扣库存(强依赖)
pointsService.add(order.getUserId(), 100); // 3. 加积分(强依赖)
logisticsService.create(order); // 4. 创建物流(强依赖)
}
问题:任何一个下游系统宕机,下单就会失败。
MQ 解耦方案:
java
// 生产者:下单后只发送一条消息
public void createOrder(Order order) {
orderDao.insert(order);
// 发送订单消息到 MQ,立即返回
producer.send("order_paid", order.toMessage());
// 下单成功,无需等待其他系统
}
// 消费者1:库存服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handleStock(OrderMessage msg) {
inventoryService.deduct(msg.toOrder());
}
// 消费者2:积分服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handlePoints(OrderMessage msg) {
pointsService.add(msg.getUserId(), 100);
}
// 消费者3:物流服务 — 独立消费
@MessageListener(topic = "order_paid")
public void handleLogistics(OrderMessage msg) {
logisticsService.create(msg.toOrder());
}
效果:库存服务临时宕机,下单照常进行,消息积压在 MQ,库存服务恢复后继续消费。
场景二:异步处理
业务背景:用户注册后需要发送欢迎短信、邮件、推送,这些操作耗时且不需要即时返回。
java
// 同步方案:总计约 3 秒
public void register(User user) {
userDao.insert(user); // 50ms
smsService.sendWelcome(user.getPhone()); // 800ms
emailService.sendWelcome(user.getEmail()); // 1500ms
pushService.sendWelcome(user.getDeviceId()); // 700ms
return "注册成功"; // 用户等待 3 秒才看到结果
}
// MQ 异步方案:总计约 100ms
public void register(User user) {
userDao.insert(user); // 50ms
producer.send("user_registered", user); // 30ms
return "注册成功"; // 用户几乎立即看到结果
// 短信/邮件/推送异步执行,不影响用户体验
}
场景三:流量削峰
业务背景:秒杀活动开始瞬间,每秒 10 万请求,数据库只能承受每秒 1 万。
java
// 秒杀接口 — 流量先进 MQ 缓冲
@PostMapping("/seckill")
public Result seckill(Long userId, Long productId) {
// 1. 前置校验(轻量)
if (stockService.hasStock(productId)) {
// 2. 秒杀请求写入 MQ,立即返回"排队中"
producer.send("seckill_request", new SeckillMessage(userId, productId));
return Result.pending("排队中,请稍后...");
}
return Result.fail("已售罄");
}
// 消费者 — 按数据库承受能力平滑消费
@MessageListener(topic = "seckill_request")
public void processSeckill(SeckillMessage msg) {
// 每次只消费 5000 条/秒(数据库的承受能力)
orderService.createSeckillOrder(msg.getUserId(), msg.getProductId());
}
效果对比:
| 指标 | 无 MQ | 有 MQ |
|---|---|---|
| 瞬间请求量 | 10万/秒 | 10万/秒写入MQ |
| 数据库压力 | 10万/秒 → 直接崩溃 | 5000/秒 → 平稳运行 |
| 用户体验 | 大量超时/500 | 统一提示"排队中",异步告知结果 |
场景四:分布式事务最终一致性
业务背景:下单扣库存,订单系统和库存系统分属不同数据库。
java
// 使用 RocketMQ 事务消息实现最终一致性
public void createOrderWithTransaction(Order order) {
// 1. 发送半消息(prepare 阶段)
TransactionSendResult result = producer.sendMessageInTransaction(
"order_tx", order, null
);
}
// 2. RocketMQ 回调执行本地事务
@TransactionListener
public class OrderTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Order order = msg.toOrder();
try {
orderDao.insert(order); // 本地事务
return LocalTransactionState.COMMIT; // 提交 → MQ 投递消息
} catch (Exception e) {
return LocalTransactionState.ROLLBACK; // 回滚 → MQ 删除消息
}
}
@Override
public LocalTransactionState checkLocalTransaction(Message msg) {
// 事务回查(网络超时等异常情况)
return orderDao.exists(msg.getOrderId())
? LocalTransactionState.COMMIT
: LocalTransactionState.ROLLBACK;
}
}
// 3. 库存服务消费消息(最终一致)
@MessageListener(topic = "order_tx")
public void deductStock(Order order) {
stockService.deduct(order.getProductId(), order.getQuantity());
}
场景五:数据同步
业务背景:MySQL 数据同步到 Elasticsearch 做全文搜索。
java
// 业务服务 — 数据变更时发送消息
public void updateProduct(Product product) {
productDao.update(product);
producer.send("product_change", new SyncMessage("UPDATE", product));
}
// 同步服务 — 消费消息写入 ES
@MessageListener(topic = "product_change")
public void syncToEs(Product product) {
if ("UPDATE".equals(product.getAction())) {
esClient.update("product_index", product.toJson());
} else if ("DELETE".equals(product.getAction())) {
esClient.delete("product_index", product.getId());
}
}
五、使用 MQ 的注意事项
1. 引入 MQ 带来的新问题
| 问题 | 说明 |
|---|---|
| 系统复杂度上升 | 需要维护 MQ 集群,排查链路更长 |
| 消息丢失风险 | 需配置持久化、ACK 等机制保证 |
| 消息重复风险 | 生产者重试导致重复消息,消费端需幂等 |
| 顺序问题 | 多分区/多消费者场景需要额外设计 |
| 运维成本 | 需要监控堆积、延迟、节点健康等 |
2. 什么时候不适合用 MQ
- 调用方需要同步实时结果(如查询操作)
- 业务链路需要强事务一致性(如转账扣款)
- 系统规模小、服务数量少(引入 MQ 反而增加复杂度)
3. 选型初步考虑
| 需求场景 | 推荐方案 |
|---|---|
| 复杂路由、灵活交换 | RabbitMQ |
| 高可用、事务消息、国内业务 | RocketMQ |
| 大数据、日志、流处理 | Kafka |
| 轻量级、小项目 | Redis List / 内存队列 |
总结:消息队列的核心价值在于解耦、异步、削峰,但引入的同时也带来了一致性、顺序、幂等等新挑战。生产环境中通常结合具体业务场景选择合适的一款 MQ,并做好监控和容错。