高级功能篇
消息队列的高级功能扩展了其应用边界,从简单的异步通信演进为支撑复杂业务场景的基础设施。本篇重点介绍延时消息、消息过滤、事务消息、广播/集群消费等核心高级特性。
一、延时/定时消息
1.1 什么是延时消息
生产者发送消息后,消息不会立即投递给消费者,而是在指定的延迟时间后才投递。
发送时间 延迟时间 投递时间
│ │ │
▼ ▼ ▼
──────────────────●───────────────●─────▶
消息到达 Broker 等待延迟 到达消费者
1.2 RocketMQ 延时消息
java
/**
* RocketMQ 延时消息示例
* 等级:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m,
* 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
* 对应等级:1~18
*/
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 订单超时关闭 — 延迟 30 分钟
Message orderMsg = new Message("order_topic", "订单超时关闭".getBytes());
orderMsg.setDelayTimeLevel(14); // 等级 14 = 10 分钟
// 如果消息需要 30 分钟延迟 = 等级 16
orderMsg.setDelayTimeLevel(16); // 等级 16 = 30 分钟
producer.send(orderMsg);
log.info("延时消息发送成功,将在 30 分钟后投递");
producer.shutdown();
}
}
// 消费者 — 与普通消息一样的写法,只是到达时间晚了
@MessageListener(topic = "order_topic")
public void handleDelayMessage(MessageExt msg) {
// 计算实际延迟时间
long bornTime = msg.getBornTimestamp();
long storeTime = msg.getStoreTimestamp();
long delayMs = storeTime - bornTime;
log.info("消息延迟了 {}ms 才到达", delayMs);
// 业务处理:关闭超时未支付的订单
String orderId = new String(msg.getBody());
orderService.closeExpiredOrder(orderId);
}
1.3 RocketMQ 自定义延迟(开源版 >= 4.8.0)
java
/**
* RocketMQ 4.8.0+ 支持自定义延迟时间
* 不再局限于 18 个等级
*/
// Broker 配置开启自定义延迟
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 自定义延迟消息(开源版 >= 5.x)
Message msg = new Message("topic", "body".getBytes());
msg.setDeliverTimeMs(System.currentTimeMillis() + 300000); // 延迟 5 分钟
producer.send(msg);
1.4 Kafka 延时消息(需要自行实现)
java
/**
* Kafka 原生不支持延时消息,通常通过以下方式实现
*/
// 方案一:时间轮 + 本地延迟队列
// 方案二:存储延迟时间在消息中,消费者判断是否可处理
// 方案三:Kafka 的基于时间戳的延时消费(借助 log 时间戳)
// 方案二:自定义延迟消息
public class KafkaDelayMessage {
public static void main(String[] args) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 消息中携带期望处理的时间戳
JsonObject message = new JsonObject();
message.addProperty("body", "订单超时关闭");
message.addProperty("expectedProcessTime",
System.currentTimeMillis() + 30 * 60 * 1000); // 30 分钟后处理
producer.send(new ProducerRecord<>("delay_topic", message.toString()));
producer.close();
}
}
// 消费者:检查时间戳,没到时间就暂停消费
public class KafkaDelayConsumer {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("delay_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
JsonObject msg = JsonParser.parseString(record.value()).getAsJsonObject();
long expectedTime = msg.get("expectedProcessTime").getAsLong();
long delay = expectedTime - System.currentTimeMillis();
if (delay > 0) {
// 还没到处理时间,暂停消费并等待(注意不要提交偏移量)
Thread.sleep(Math.min(delay, 30000));
break; // 重新 poll,该消息还在
}
process(msg.get("body").getAsString());
consumer.commitSync();
}
}
}
}
1.5 实战:订单超时关闭
java
/**
* 电商订单超时关闭完整方案
*/
@Service
public class OrderTimeoutService {
@Autowired
private DefaultMQProducer producer;
/**
* 用户下单成功后,发送延时消息
*/
public void sendOrderTimeoutCheck(Order order) {
TimeoutMessage timeoutMsg = new TimeoutMessage(order.getOrderNo(), order.getUserId());
Message message = new Message(
"order_timeout_topic",
JSON.toJSONBytes(timeoutMsg)
);
// RocketMQ: level 16 = 30 分钟
message.setDelayTimeLevel(16);
try {
SendResult result = producer.send(message);
log.info("超时检查消息已发送: orderNo={}, msgId={}",
order.getOrderNo(), result.getMsgId());
} catch (Exception e) {
log.error("超时消息发送失败", e);
// 降级:直接查库判断(兜底方案)
}
}
}
/**
* 消费者 — 处理超时订单
*/
@Component
public class OrderTimeoutConsumer {
@Autowired
private OrderDao orderDao;
@Autowired
private StockService stockService;
@MessageListener(topic = "order_timeout_topic")
public ConsumeResult handleTimeout(MessageExt msg) {
TimeoutMessage timeoutMsg = JSON.parseObject(msg.getBody(), TimeoutMessage.class);
// 1. 查询订单状态
Order order = orderDao.selectByOrderNo(timeoutMsg.getOrderNo());
if (order == null) {
return ConsumeResult.SUCCESS; // 订单不存在,跳过
}
// 2. 只有待支付状态的订单才需要关闭
if (order.getStatus() != OrderStatus.WAIT_PAY) {
log.info("订单已支付,不需要关闭: {}", order.getOrderNo());
return ConsumeResult.SUCCESS;
}
// 3. 关闭订单(幂等)
try {
orderDao.closeOrder(order.getOrderNo());
// 4. 释放库存
stockService.releaseStock(order.getOrderNo());
log.info("超时订单已关闭: {}", order.getOrderNo());
return ConsumeResult.SUCCESS;
} catch (Exception e) {
log.error("关闭超时订单失败", e);
return ConsumeResult.RETRY; // 重试
}
}
}
延时消息场景总结:
| 场景 | 延迟时间 | 说明 |
|---|---|---|
| 订单超时关闭 | 30 分钟 | 最常见场景 |
| 秒杀提醒 | 5 分钟 | 开抢前提醒用户 |
| 定时任务 | 指定时间 | 报表生成、数据同步 |
| 重试间隔 | 递增延迟 | 失败重试策略 |
二、消息过滤
2.1 Tag 过滤(RocketMQ)
java
/**
* RocketMQ Tag 过滤 — Broker 端过滤,减少网络传输
*/
// 生产者:发送时指定 Tag
public class TagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tag_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 不同 Tag 的消息发送到同一个 Topic
Message orderCreated = new Message("order_topic", "created", "订单创建消息".getBytes());
Message orderPaid = new Message("order_topic", "paid", "订单支付消息".getBytes());
Message orderRefund = new Message("order_topic", "refund", "订单退款消息".getBytes());
producer.send(orderCreated);
producer.send(orderPaid);
producer.send(orderRefund);
producer.shutdown();
}
}
// 消费者A:只订阅 "created" tag
DefaultMQPushConsumer consumerA = new DefaultMQPushConsumer("order_group_a");
consumerA.subscribe("order_topic", "created");
// 效果:只收到订单创建消息
// 消费者B:订阅 "paid || refund" tag
DefaultMQPushConsumer consumerB = new DefaultMQPushConsumer("order_group_b");
consumerB.subscribe("order_topic", "paid || refund");
// 效果:只收到支付和退款消息
// 消费者C:订阅所有 tag
DefaultMQPushConsumer consumerC = new DefaultMQPushConsumer("order_group_c");
consumerC.subscribe("order_topic", "*");
// 效果:收到全部消息
2.2 SQL 表达式过滤(RocketMQ)
java
/**
* RocketMQ SQL 过滤 — 基于消息属性的灵活过滤
* 需要 Broker 开启配置:enablePropertyFilter=true
*/
// 生产者:发送消息时设置属性
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("sql_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("order_topic", "订单数据".getBytes());
msg.putUserProperty("orderType", "vip"); // 订单类型
msg.putUserProperty("amount", "299.00"); // 金额
msg.putUserProperty("region", "china"); // 区域
msg.putUserProperty("isNewUser", "true"); // 是否新用户
producer.send(msg);
producer.shutdown();
}
}
// 消费者:使用 SQL 表达式过滤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_group");
consumer.setNamesrvAddr("localhost:9876");
// 只订阅 VIP 订单且金额大于 100 的消息
// 支持语法:AND, OR, NOT, >, <, =, IS NULL, IS NOT NULL, IN, BETWEEN, LIKE
consumer.subscribe("order_topic",
MessageSelector.bySql("(orderType = 'vip' AND amount > 100) OR region IN ('china', 'us')"));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("收到符合条件的消息: {}",
JSON.toJSONString(msg.getProperties()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
2.3 Kafka 消息过滤(消费端过滤)
java
/**
* Kafka 不支持 Broker 端过滤,只能消费端自行过滤
*/
public class KafkaFilterConsumer {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
JsonObject msg = JsonParser.parseString(record.value()).getAsJsonObject();
String orderType = msg.get("orderType").getAsString();
double amount = msg.get("amount").getAsDouble();
// 消费端过滤(不满足条件的消息也需要提交偏移量)
if (!"vip".equals(orderType) || amount <= 100) {
continue; // 跳过不关心的消息
}
// 处理 VIP 且金额 > 100 的订单
processVipOrder(msg);
}
consumer.commitSync();
}
}
}
过滤方式对比:
| 方式 | 过滤位置 | 性能 | 灵活性 | 支持中间件 |
|---|---|---|---|---|
| Tag 过滤 | Broker 端 | ⭐⭐⭐⭐⭐ | 低(精确匹配) | RocketMQ |
| SQL 过滤 | Broker 端 | ⭐⭐⭐⭐ | 高(条件表达式) | RocketMQ |
| 消费端过滤 | 消费端 | ⭐⭐⭐ | 最高 | 所有 MQ |
| 分区路由 | 生产端 | ⭐⭐⭐⭐ | 中 | Kafka |
三、事务消息
3.1 什么是事务消息
背景:在分布式系统中,保证"本地数据库操作"和"发送消息"两个操作的原子性。
RocketMQ 事务消息流程:
1. 生产者发送半消息(Half Message)→ Broker
2. Broker 存储半消息(对消费者不可见)
3. 生产者执行本地事务
4. 生产者向 Broker 提交 Commit/Rollback
5A. Commit → Broker 投递消息给消费者
5B. Rollback → Broker 删除半消息
6. 如果第 4 步因网络超时未响应 → Broker 发起回查
7. 生产者回查本地事务状态 → 再次 Commit/Rollback
3.2 RocketMQ 事务消息实战
java
/**
* 场景:用户下单 → 扣减积分
* 保证"创建订单"和"发送消息"原子性
*/
// ========== 生产者:事务消息发送方 ==========
@Component
public class OrderTransactionProducer {
@Autowired
private OrderService orderService;
private DefaultMQProducer producer;
@PostConstruct
public void init() {
producer = new DefaultMQProducer("order_tx_group");
producer.setNamesrvAddr("localhost:9876");
// 设置回查次数(默认 15 次)
producer.setCheckRequestHoldMax(15);
producer.start();
}
/**
* 创建订单(使用事务消息)
*/
public void createOrderWithTx(Order order) {
Message message = new Message(
"order_tx_topic",
JSON.toJSONBytes(order)
);
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(
message,
order // 传递给监听器的参数
);
log.info("事务消息发送结果: {}, msgId={}",
result.getLocalTransactionState(), result.getMsgId());
}
@Bean
public TransactionListener transactionListener() {
return new OrderTransactionListener(orderService);
}
}
// ========== 事务监听器:执行本地事务 + 回查 ==========
public class OrderTransactionListener implements TransactionListener {
private final OrderService orderService;
public OrderTransactionListener(OrderService orderService) {
this.orderService = orderService;
}
/**
* 执行本地事务
* 返回 COMMIT → MQ 投递消息
* 返回 ROLLBACK → MQ 丢弃消息
* 返回 UNKNOWN → MQ 等待回查
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Order order = (Order) arg;
try {
// 本地事务:创建订单
orderService.createOrder(order);
log.info("本地事务执行成功: {}", order.getOrderNo());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("本地事务执行失败: {}", order.getOrderNo(), e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 事务回查(当 executeLocalTransaction 返回 UNKNOWN
* 或网络超时导致 Broker 未收到 Commit/Rollback 时触发)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Order order = JSON.parseObject(msg.getBody(), Order.class);
boolean exists = orderService.checkOrderExists(order.getOrderNo());
if (exists) {
log.info("回查确认订单存在: {}, 提交事务", order.getOrderNo());
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.warn("回查确认订单不存在: {}, 回滚事务", order.getOrderNo());
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
// ========== 消费者:收到消息说明本地事务已提交 ==========
@Component
public class OrderTxConsumer {
@Autowired
private PointsService pointsService;
@MessageListener(topic = "order_tx_topic")
public ConsumeResult handleOrderCreated(MessageExt msg) {
Order order = JSON.parseObject(msg.getBody(), Order.class);
// 消息到达此处,说明订单已成功创建
// 可以安全地执行积分扣减等后续操作
pointsService.deductPoints(order.getUserId(), order.getAmount());
log.info("积分扣减完成: userId={}, amount={}",
order.getUserId(), order.getAmount());
return ConsumeResult.SUCCESS;
}
}
// ========== 订单服务(本地事务) ==========
@Service
public class OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private AccountDao accountDao;
@Transactional
public void createOrder(Order order) {
// 插入订单
orderDao.insert(order);
// 扣减账户余额
accountDao.deductBalance(order.getUserId(), order.getAmount());
// 如果这里抛出异常,整个事务回滚
// MQ 收到 ROLLBACK,消息不会投递
}
public boolean checkOrderExists(String orderNo) {
return orderDao.selectByOrderNo(orderNo) != null;
}
}
3.3 事务消息 vs 本地消息表
| 对比维度 | 事务消息(RocketMQ) | 本地消息表 |
|---|---|---|
| 实现复杂度 | 低(中间件原生支持) | 高(需建表+定时任务) |
| 侵入性 | 低 | 高(业务库建表) |
| 一致性保障 | 强(半消息+回查) | 强(事务+定时补偿) |
| 性能损耗 | 中等(增加回查) | 低(本地事务) |
| 适用范围 | 可使用 RocketMQ 的项目 | 所有 MQ 中间件 |
| 两阶段 | 自动 | 手动实现 |
3.4 实现细节与注意事项
半消息生命周期
┌──────────────────┐
│ 半消息(PRE) │
│ 对消费者不可见 │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
COMMIT ROLLBACK UNKNOWN
│ │ │
▼ ▼ ▼
投递消息 删除消息 发起回查
消费者可见 │
▼
COMMIT / ROLLBACK
注意事项
java
/**
* 事务消息使用注意事项
*/
public class TransactionMsgNotes {
// 1. 事务消息不支持延时消息和批量消息
// ❌ 不支持:message.setDelayTimeLevel(x);
// ❌ 不支持:producer.send(Collection<Message> messages)
// 2. 回查次数有限制(默认 15 次)
// 超过次数仍未确定 → 消息丢失
// 可配置:
producer.setCheckRequestHoldMax(15);
// 回查间隔默认 60 秒
// 3. 半消息不存储消费队列(无法直接消费)
// 4. 事务消息的生产者 ID 不能与其他类型消息共用
// 5. Broker 需要开启事务支持(默认开启)
// 6. 建议回查逻辑是幂等的(可能被多次调用)
}
四、广播消费与集群消费
4.1 集群消费(Clustering)
默认模式,同一消费组内的消费者负载均衡消费,一条消息只被一个消费者处理。
┌──────────────────┐
│ Topic │
│ message 1~100 │
└────────┬─────────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│消费组内 │ │消费组内 │ │消费组内 │
│实例 A │ │实例 B │ │实例 C │
│msg 1~33 │ │msg 34~66│ │msg 67~100│
└────────┘ └────────┘ └────────┘
java
// RocketMQ 集群消费(默认)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING); // 默认就是集群模式
// 效果:100 条消息,3 个实例均分,每条消息只被消费一次
4.2 广播消费(Broadcasting)
同一消费组内的所有消费者,各自收到全部消息。
java
/**
* 广播消费 — 每个消费者独立收到全量消息
*/
// RocketMQ 广播消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.BROADCASTING); // 设为广播模式
// 效果:100 条消息,所有实例都收到 100 条
// ⚠️ 广播消费注意事项:
// 1. 广播模式不维护消费进度(消费者重启后会重新消费)
// 2. 广播模式下消息量 = 消息数 × 消费者数
// 3. 不适合消息量大的场景
4.3 场景对比
java
/**
* 场景一:配置同步(广播消费)
*/
// 所有服务节点都需要收到配置变更通知
@MessageListener(topic = "config_change")
public void handleConfigChange(ConfigMessage msg) {
// 每个节点独立更新本地配置
localConfig.refresh(msg.getConfigKey(), msg.getConfigValue());
}
/**
* 场景二:订单处理(集群消费)
*/
// 订单只需要被一个 worker 处理
@MessageListener(topic = "order_topic")
public void handleOrder(OrderMessage msg) {
// 集群模式下,每条订单只被一个实例处理
orderProcessor.process(msg);
}
五、消息优先级
5.1 RocketMQ 优先级方案
RocketMQ 原生不直接支持优先级,常通过以下方式实现:
java
/**
* 方案一:不同优先级用不同 Topic
*/
// 高优先级 Topic:urgent_topic
// 普通优先级 Topic:normal_topic
// 低优先级 Topic:low_topic
// 高优先级消费者优先消费 urgent_topic
// 消费完后再消费 normal_topic
/**
* 方案二:同一 Topic,消费者自行排序
*/
public class PriorityConsumer {
// 缓存消息,按优先级排序处理
private final PriorityQueue<MessageExt> priorityQueue =
new PriorityQueue<>((a, b) -> {
int pa = Integer.parseInt(a.getProperty("priority"));
int pb = Integer.parseInt(b.getProperty("priority"));
return Integer.compare(pb, pa); // 高优先级优先
});
@MessageListener(topic = "mixed_topic")
public void consume(MessageExt msg) {
priorityQueue.offer(msg);
processQueue();
}
private void processQueue() {
while (!priorityQueue.isEmpty()) {
MessageExt msg = priorityQueue.poll();
// 处理消息
}
}
}
5.2 RabbitMQ 优先级队列
java
/**
* RabbitMQ 原生支持优先级队列
*/
// 声明优先级队列
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 优先级范围 0-10
channel.queueDeclare("priority_queue", true, false, false, args);
// 发送高优先级消息
AMQP.BasicProperties highPriorityProps = new AMQP.BasicProperties.Builder()
.priority(10) // 最高优先级
.build();
channel.basicPublish("", "priority_queue", highPriorityProps, "紧急消息".getBytes());
// 发送普通消息
AMQP.BasicProperties normalProps = new AMQP.BasicProperties.Builder()
.priority(1) // 普通优先级
.build();
channel.basicPublish("", "priority_queue", normalProps, "普通消息".getBytes());
六、实战:综合高级功能应用
业务场景:电商订单全生命周期
java
/**
* 电商订单全流程 — 综合运用多种高级功能
*/
@Component
public class OrderFullLifecycleService {
@Autowired
private DefaultMQProducer producer;
/**
* 1. 用户下单 → 事务消息
*/
public void placeOrder(Order order) {
// 使用事务消息保证订单创建和消息发送原子性
Message msg = new Message("order_tx", JSON.toJSONBytes(order));
producer.sendMessageInTransaction(msg, order);
}
/**
* 2. 发送延时消息 — 30 分钟未支付自动关闭
*/
public void scheduleTimeoutCheck(Order order) {
Message msg = new Message("order_delay", JSON.toJSONBytes(order));
msg.setDelayTimeLevel(16); // 30 分钟
producer.send(msg);
}
/**
* 3. 支付成功通知 — 不同 tag 供不同消费者订阅
*/
public void notifyPayment(Order order) {
Message msg = new Message("order_event", "paid", JSON.toJSONBytes(order));
producer.send(msg);
}
/**
* 4. 退款处理 — 高优先级
*/
public void refund(Order order) {
Message msg = new Message("order_refund", JSON.toJSONBytes(order));
// 退款消息使用独立 Topic,消费者较少,保证高优处理
producer.send(msg);
}
/**
* 5. 订单完成后的异步任务 — 广播消费
*/
public void notifyAfterSale(Order order) {
Message msg = new Message("order_finished_tag", JSON.toJSONBytes(order));
msg.putUserProperty("orderType", order.getOrderType());
producer.send(msg);
}
}
消费者配置总览
java
@Configuration
public class OrderConsumerConfig {
// 订单创建 — 集群消费(默认)
@Bean
public DefaultMQPushConsumer orderTxConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_tx_group");
consumer.subscribe("order_tx", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
return consumer;
}
// 超时检查 — 集群模式
@Bean
public DefaultMQPushConsumer orderDelayConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_delay_group");
consumer.subscribe("order_delay", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
return consumer;
}
// 订单事件 — 按 Tag 过滤
@Bean
public DefaultMQPushConsumer orderEventConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_event_group");
consumer.subscribe("order_event", "paid || refund");
return consumer;
}
// 售后通知 — 广播模式(所有节点都收到)
@Bean
public DefaultMQPushConsumer orderBroadcastConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_broadcast_group");
consumer.subscribe("order_finished_tag", "*");
consumer.setMessageModel(MessageModel.BROADCASTING);
return consumer;
}
}
总结:高级功能在实际项目中非常实用:
- 延时消息:处理超时、定时任务
- 消息过滤:减少不必要的数据传输
- 事务消息:保证分布式事务最终一致性,是 RocketMQ 的核心亮点
- 集群/广播消费:灵活选择消息投递模式
面试中重点关注事务消息的实现原理(半消息+回查机制),以及延时消息的应用场景。