← 返回 MQ 列表

高级功能篇

高级功能篇

消息队列的高级功能扩展了其应用边界,从简单的异步通信演进为支撑复杂业务场景的基础设施。本篇重点介绍延时消息、消息过滤、事务消息、广播/集群消费等核心高级特性。


一、延时/定时消息

1.1 什么是延时消息

生产者发送消息后,消息不会立即投递给消费者,而是在指定的延迟时间后才投递。

发送时间           延迟时间         投递时间
   │                  │               │
   ▼                  ▼               ▼
   ──────────────────●───────────────●─────▶
   消息到达 Broker   等待延迟        到达消费者

1.2 RocketMQ 延时消息

java
/**
 * RocketMQ 延时消息示例
 * 等级:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m,
 *       7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
 * 对应等级:1~18
 */
public class DelayMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 订单超时关闭 — 延迟 30 分钟
        Message orderMsg = new Message("order_topic", "订单超时关闭".getBytes());
        orderMsg.setDelayTimeLevel(14);  // 等级 14 = 10 分钟
        // 如果消息需要 30 分钟延迟 = 等级 16
        orderMsg.setDelayTimeLevel(16);  // 等级 16 = 30 分钟
        producer.send(orderMsg);

        log.info("延时消息发送成功,将在 30 分钟后投递");
        producer.shutdown();
    }
}

// 消费者 — 与普通消息一样的写法,只是到达时间晚了
@MessageListener(topic = "order_topic")
public void handleDelayMessage(MessageExt msg) {
    // 计算实际延迟时间
    long bornTime = msg.getBornTimestamp();
    long storeTime = msg.getStoreTimestamp();
    long delayMs = storeTime - bornTime;
    log.info("消息延迟了 {}ms 才到达", delayMs);

    // 业务处理:关闭超时未支付的订单
    String orderId = new String(msg.getBody());
    orderService.closeExpiredOrder(orderId);
}

1.3 RocketMQ 自定义延迟(开源版 >= 4.8.0)

java
/**
 * RocketMQ 4.8.0+ 支持自定义延迟时间
 * 不再局限于 18 个等级
 */
// Broker 配置开启自定义延迟
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

// 自定义延迟消息(开源版 >= 5.x)
Message msg = new Message("topic", "body".getBytes());
msg.setDeliverTimeMs(System.currentTimeMillis() + 300000); // 延迟 5 分钟
producer.send(msg);

1.4 Kafka 延时消息(需要自行实现)

java
/**
 * Kafka 原生不支持延时消息,通常通过以下方式实现
 */
// 方案一:时间轮 + 本地延迟队列
// 方案二:存储延迟时间在消息中,消费者判断是否可处理
// 方案三:Kafka 的基于时间戳的延时消费(借助 log 时间戳)

// 方案二:自定义延迟消息
public class KafkaDelayMessage {
    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 消息中携带期望处理的时间戳
        JsonObject message = new JsonObject();
        message.addProperty("body", "订单超时关闭");
        message.addProperty("expectedProcessTime",
            System.currentTimeMillis() + 30 * 60 * 1000); // 30 分钟后处理

        producer.send(new ProducerRecord<>("delay_topic", message.toString()));
        producer.close();
    }
}

// 消费者:检查时间戳,没到时间就暂停消费
public class KafkaDelayConsumer {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("delay_topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                JsonObject msg = JsonParser.parseString(record.value()).getAsJsonObject();
                long expectedTime = msg.get("expectedProcessTime").getAsLong();
                long delay = expectedTime - System.currentTimeMillis();
                if (delay > 0) {
                    // 还没到处理时间,暂停消费并等待(注意不要提交偏移量)
                    Thread.sleep(Math.min(delay, 30000));
                    break; // 重新 poll,该消息还在
                }
                process(msg.get("body").getAsString());
                consumer.commitSync();
            }
        }
    }
}

1.5 实战:订单超时关闭

java
/**
 * 电商订单超时关闭完整方案
 */
@Service
public class OrderTimeoutService {

    @Autowired
    private DefaultMQProducer producer;

    /**
     * 用户下单成功后,发送延时消息
     */
    public void sendOrderTimeoutCheck(Order order) {
        TimeoutMessage timeoutMsg = new TimeoutMessage(order.getOrderNo(), order.getUserId());

        Message message = new Message(
            "order_timeout_topic",
            JSON.toJSONBytes(timeoutMsg)
        );
        // RocketMQ: level 16 = 30 分钟
        message.setDelayTimeLevel(16);

        try {
            SendResult result = producer.send(message);
            log.info("超时检查消息已发送: orderNo={}, msgId={}",
                order.getOrderNo(), result.getMsgId());
        } catch (Exception e) {
            log.error("超时消息发送失败", e);
            // 降级:直接查库判断(兜底方案)
        }
    }
}

/**
 * 消费者 — 处理超时订单
 */
@Component
public class OrderTimeoutConsumer {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private StockService stockService;

    @MessageListener(topic = "order_timeout_topic")
    public ConsumeResult handleTimeout(MessageExt msg) {
        TimeoutMessage timeoutMsg = JSON.parseObject(msg.getBody(), TimeoutMessage.class);

        // 1. 查询订单状态
        Order order = orderDao.selectByOrderNo(timeoutMsg.getOrderNo());
        if (order == null) {
            return ConsumeResult.SUCCESS; // 订单不存在,跳过
        }

        // 2. 只有待支付状态的订单才需要关闭
        if (order.getStatus() != OrderStatus.WAIT_PAY) {
            log.info("订单已支付,不需要关闭: {}", order.getOrderNo());
            return ConsumeResult.SUCCESS;
        }

        // 3. 关闭订单(幂等)
        try {
            orderDao.closeOrder(order.getOrderNo());
            // 4. 释放库存
            stockService.releaseStock(order.getOrderNo());
            log.info("超时订单已关闭: {}", order.getOrderNo());
            return ConsumeResult.SUCCESS;
        } catch (Exception e) {
            log.error("关闭超时订单失败", e);
            return ConsumeResult.RETRY; // 重试
        }
    }
}

延时消息场景总结

场景 延迟时间 说明
订单超时关闭 30 分钟 最常见场景
秒杀提醒 5 分钟 开抢前提醒用户
定时任务 指定时间 报表生成、数据同步
重试间隔 递增延迟 失败重试策略

二、消息过滤

2.1 Tag 过滤(RocketMQ)

java
/**
 * RocketMQ Tag 过滤 — Broker 端过滤,减少网络传输
 */
// 生产者:发送时指定 Tag
public class TagProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("tag_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 不同 Tag 的消息发送到同一个 Topic
        Message orderCreated = new Message("order_topic", "created", "订单创建消息".getBytes());
        Message orderPaid = new Message("order_topic", "paid", "订单支付消息".getBytes());
        Message orderRefund = new Message("order_topic", "refund", "订单退款消息".getBytes());

        producer.send(orderCreated);
        producer.send(orderPaid);
        producer.send(orderRefund);

        producer.shutdown();
    }
}

// 消费者A:只订阅 "created" tag
DefaultMQPushConsumer consumerA = new DefaultMQPushConsumer("order_group_a");
consumerA.subscribe("order_topic", "created");
// 效果:只收到订单创建消息

// 消费者B:订阅 "paid || refund" tag 
DefaultMQPushConsumer consumerB = new DefaultMQPushConsumer("order_group_b");
consumerB.subscribe("order_topic", "paid || refund");
// 效果:只收到支付和退款消息

// 消费者C:订阅所有 tag
DefaultMQPushConsumer consumerC = new DefaultMQPushConsumer("order_group_c");
consumerC.subscribe("order_topic", "*");
// 效果:收到全部消息

2.2 SQL 表达式过滤(RocketMQ)

java
/**
 * RocketMQ SQL 过滤 — 基于消息属性的灵活过滤
 * 需要 Broker 开启配置:enablePropertyFilter=true
 */

// 生产者:发送消息时设置属性
public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("sql_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("order_topic", "订单数据".getBytes());
        msg.putUserProperty("orderType", "vip");       // 订单类型
        msg.putUserProperty("amount", "299.00");        // 金额
        msg.putUserProperty("region", "china");         // 区域
        msg.putUserProperty("isNewUser", "true");       // 是否新用户

        producer.send(msg);
        producer.shutdown();
    }
}

// 消费者:使用 SQL 表达式过滤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sql_group");
consumer.setNamesrvAddr("localhost:9876");

// 只订阅 VIP 订单且金额大于 100 的消息
// 支持语法:AND, OR, NOT, >, <, =, IS NULL, IS NOT NULL, IN, BETWEEN, LIKE
consumer.subscribe("order_topic",
    MessageSelector.bySql("(orderType = 'vip' AND amount > 100) OR region IN ('china', 'us')"));

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        log.info("收到符合条件的消息: {}",
            JSON.toJSONString(msg.getProperties()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

2.3 Kafka 消息过滤(消费端过滤)

java
/**
 * Kafka 不支持 Broker 端过滤,只能消费端自行过滤
 */
public class KafkaFilterConsumer {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("order_topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                JsonObject msg = JsonParser.parseString(record.value()).getAsJsonObject();
                String orderType = msg.get("orderType").getAsString();
                double amount = msg.get("amount").getAsDouble();

                // 消费端过滤(不满足条件的消息也需要提交偏移量)
                if (!"vip".equals(orderType) || amount <= 100) {
                    continue; // 跳过不关心的消息
                }

                // 处理 VIP 且金额 > 100 的订单
                processVipOrder(msg);
            }
            consumer.commitSync();
        }
    }
}

过滤方式对比

方式 过滤位置 性能 灵活性 支持中间件
Tag 过滤 Broker 端 ⭐⭐⭐⭐⭐ 低(精确匹配) RocketMQ
SQL 过滤 Broker 端 ⭐⭐⭐⭐ 高(条件表达式) RocketMQ
消费端过滤 消费端 ⭐⭐⭐ 最高 所有 MQ
分区路由 生产端 ⭐⭐⭐⭐ Kafka

三、事务消息

3.1 什么是事务消息

背景:在分布式系统中,保证"本地数据库操作"和"发送消息"两个操作的原子性。

RocketMQ 事务消息流程

1. 生产者发送半消息(Half Message)→ Broker
2. Broker 存储半消息(对消费者不可见)
3. 生产者执行本地事务
4. 生产者向 Broker 提交 Commit/Rollback
5A. Commit → Broker 投递消息给消费者
5B. Rollback → Broker 删除半消息
6. 如果第 4 步因网络超时未响应 → Broker 发起回查
7. 生产者回查本地事务状态 → 再次 Commit/Rollback

3.2 RocketMQ 事务消息实战

java
/**
 * 场景:用户下单 → 扣减积分
 * 保证"创建订单"和"发送消息"原子性
 */

// ========== 生产者:事务消息发送方 ==========
@Component
public class OrderTransactionProducer {

    @Autowired
    private OrderService orderService;

    private DefaultMQProducer producer;

    @PostConstruct
    public void init() {
        producer = new DefaultMQProducer("order_tx_group");
        producer.setNamesrvAddr("localhost:9876");
        // 设置回查次数(默认 15 次)
        producer.setCheckRequestHoldMax(15);
        producer.start();
    }

    /**
     * 创建订单(使用事务消息)
     */
    public void createOrderWithTx(Order order) {
        Message message = new Message(
            "order_tx_topic",
            JSON.toJSONBytes(order)
        );

        // 发送事务消息
        TransactionSendResult result = producer.sendMessageInTransaction(
            message,
            order  // 传递给监听器的参数
        );

        log.info("事务消息发送结果: {}, msgId={}",
            result.getLocalTransactionState(), result.getMsgId());
    }

    @Bean
    public TransactionListener transactionListener() {
        return new OrderTransactionListener(orderService);
    }
}

// ========== 事务监听器:执行本地事务 + 回查 ==========
public class OrderTransactionListener implements TransactionListener {

    private final OrderService orderService;

    public OrderTransactionListener(OrderService orderService) {
        this.orderService = orderService;
    }

    /**
     * 执行本地事务
     * 返回 COMMIT → MQ 投递消息
     * 返回 ROLLBACK → MQ 丢弃消息
     * 返回 UNKNOWN → MQ 等待回查
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Order order = (Order) arg;
        try {
            // 本地事务:创建订单
            orderService.createOrder(order);
            log.info("本地事务执行成功: {}", order.getOrderNo());
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.error("本地事务执行失败: {}", order.getOrderNo(), e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * 事务回查(当 executeLocalTransaction 返回 UNKNOWN
     * 或网络超时导致 Broker 未收到 Commit/Rollback 时触发)
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Order order = JSON.parseObject(msg.getBody(), Order.class);
        boolean exists = orderService.checkOrderExists(order.getOrderNo());

        if (exists) {
            log.info("回查确认订单存在: {}, 提交事务", order.getOrderNo());
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            log.warn("回查确认订单不存在: {}, 回滚事务", order.getOrderNo());
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

// ========== 消费者:收到消息说明本地事务已提交 ==========
@Component
public class OrderTxConsumer {

    @Autowired
    private PointsService pointsService;

    @MessageListener(topic = "order_tx_topic")
    public ConsumeResult handleOrderCreated(MessageExt msg) {
        Order order = JSON.parseObject(msg.getBody(), Order.class);

        // 消息到达此处,说明订单已成功创建
        // 可以安全地执行积分扣减等后续操作
        pointsService.deductPoints(order.getUserId(), order.getAmount());

        log.info("积分扣减完成: userId={}, amount={}",
            order.getUserId(), order.getAmount());
        return ConsumeResult.SUCCESS;
    }
}

// ========== 订单服务(本地事务) ==========
@Service
public class OrderService {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private AccountDao accountDao;

    @Transactional
    public void createOrder(Order order) {
        // 插入订单
        orderDao.insert(order);
        // 扣减账户余额
        accountDao.deductBalance(order.getUserId(), order.getAmount());
        // 如果这里抛出异常,整个事务回滚
        // MQ 收到 ROLLBACK,消息不会投递
    }

    public boolean checkOrderExists(String orderNo) {
        return orderDao.selectByOrderNo(orderNo) != null;
    }
}

3.3 事务消息 vs 本地消息表

对比维度 事务消息(RocketMQ) 本地消息表
实现复杂度 低(中间件原生支持) 高(需建表+定时任务)
侵入性 高(业务库建表)
一致性保障 强(半消息+回查) 强(事务+定时补偿)
性能损耗 中等(增加回查) 低(本地事务)
适用范围 可使用 RocketMQ 的项目 所有 MQ 中间件
两阶段 自动 手动实现

3.4 实现细节与注意事项

半消息生命周期

                    ┌──────────────────┐
                    │  半消息(PRE)    │
                    │ 对消费者不可见    │
                    └────────┬─────────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
         COMMIT          ROLLBACK        UNKNOWN
              │              │              │
              ▼              ▼              ▼
        投递消息          删除消息       发起回查
        消费者可见                        │
                                          ▼
                                   COMMIT / ROLLBACK

注意事项

java
/**
 * 事务消息使用注意事项
 */
public class TransactionMsgNotes {

    // 1. 事务消息不支持延时消息和批量消息
    // ❌ 不支持:message.setDelayTimeLevel(x);
    // ❌ 不支持:producer.send(Collection<Message> messages)

    // 2. 回查次数有限制(默认 15 次)
    // 超过次数仍未确定 → 消息丢失
    // 可配置:
    producer.setCheckRequestHoldMax(15);
    // 回查间隔默认 60 秒

    // 3. 半消息不存储消费队列(无法直接消费)
    // 4. 事务消息的生产者 ID 不能与其他类型消息共用
    // 5. Broker 需要开启事务支持(默认开启)

    // 6. 建议回查逻辑是幂等的(可能被多次调用)
}

四、广播消费与集群消费

4.1 集群消费(Clustering)

默认模式,同一消费组内的消费者负载均衡消费,一条消息只被一个消费者处理。

          ┌──────────────────┐
          │     Topic        │
          │   message 1~100  │
          └────────┬─────────┘
                   │
     ┌─────────────┼─────────────┐
     ▼             ▼             ▼
 ┌────────┐  ┌────────┐  ┌────────┐
 │消费组内 │  │消费组内 │  │消费组内 │
 │实例 A   │  │实例 B   │  │实例 C   │
 │msg 1~33 │  │msg 34~66│  │msg 67~100│
 └────────┘  └────────┘  └────────┘
java
// RocketMQ 集群消费(默认)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING); // 默认就是集群模式

// 效果:100 条消息,3 个实例均分,每条消息只被消费一次

4.2 广播消费(Broadcasting)

同一消费组内的所有消费者,各自收到全部消息。

java
/**
 * 广播消费 — 每个消费者独立收到全量消息
 */
// RocketMQ 广播消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.BROADCASTING); // 设为广播模式

// 效果:100 条消息,所有实例都收到 100 条

// ⚠️ 广播消费注意事项:
// 1. 广播模式不维护消费进度(消费者重启后会重新消费)
// 2. 广播模式下消息量 = 消息数 × 消费者数
// 3. 不适合消息量大的场景

4.3 场景对比

java
/**
 * 场景一:配置同步(广播消费)
 */
// 所有服务节点都需要收到配置变更通知
@MessageListener(topic = "config_change")
public void handleConfigChange(ConfigMessage msg) {
    // 每个节点独立更新本地配置
    localConfig.refresh(msg.getConfigKey(), msg.getConfigValue());
}

/**
 * 场景二:订单处理(集群消费)
 */
// 订单只需要被一个 worker 处理
@MessageListener(topic = "order_topic")
public void handleOrder(OrderMessage msg) {
    // 集群模式下,每条订单只被一个实例处理
    orderProcessor.process(msg);
}

五、消息优先级

5.1 RocketMQ 优先级方案

RocketMQ 原生不直接支持优先级,常通过以下方式实现:

java
/**
 * 方案一:不同优先级用不同 Topic
 */
// 高优先级 Topic:urgent_topic
// 普通优先级 Topic:normal_topic
// 低优先级 Topic:low_topic

// 高优先级消费者优先消费 urgent_topic
// 消费完后再消费 normal_topic

/**
 * 方案二:同一 Topic,消费者自行排序
 */
public class PriorityConsumer {
    // 缓存消息,按优先级排序处理
    private final PriorityQueue<MessageExt> priorityQueue =
        new PriorityQueue<>((a, b) -> {
            int pa = Integer.parseInt(a.getProperty("priority"));
            int pb = Integer.parseInt(b.getProperty("priority"));
            return Integer.compare(pb, pa); // 高优先级优先
        });

    @MessageListener(topic = "mixed_topic")
    public void consume(MessageExt msg) {
        priorityQueue.offer(msg);
        processQueue();
    }

    private void processQueue() {
        while (!priorityQueue.isEmpty()) {
            MessageExt msg = priorityQueue.poll();
            // 处理消息
        }
    }
}

5.2 RabbitMQ 优先级队列

java
/**
 * RabbitMQ 原生支持优先级队列
 */
// 声明优先级队列
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 优先级范围 0-10
channel.queueDeclare("priority_queue", true, false, false, args);

// 发送高优先级消息
AMQP.BasicProperties highPriorityProps = new AMQP.BasicProperties.Builder()
    .priority(10)  // 最高优先级
    .build();
channel.basicPublish("", "priority_queue", highPriorityProps, "紧急消息".getBytes());

// 发送普通消息
AMQP.BasicProperties normalProps = new AMQP.BasicProperties.Builder()
    .priority(1)   // 普通优先级
    .build();
channel.basicPublish("", "priority_queue", normalProps, "普通消息".getBytes());

六、实战:综合高级功能应用

业务场景:电商订单全生命周期

java
/**
 * 电商订单全流程 — 综合运用多种高级功能
 */
@Component
public class OrderFullLifecycleService {

    @Autowired
    private DefaultMQProducer producer;

    /**
     * 1. 用户下单 → 事务消息
     */
    public void placeOrder(Order order) {
        // 使用事务消息保证订单创建和消息发送原子性
        Message msg = new Message("order_tx", JSON.toJSONBytes(order));
        producer.sendMessageInTransaction(msg, order);
    }

    /**
     * 2. 发送延时消息 — 30 分钟未支付自动关闭
     */
    public void scheduleTimeoutCheck(Order order) {
        Message msg = new Message("order_delay", JSON.toJSONBytes(order));
        msg.setDelayTimeLevel(16); // 30 分钟
        producer.send(msg);
    }

    /**
     * 3. 支付成功通知 — 不同 tag 供不同消费者订阅
     */
    public void notifyPayment(Order order) {
        Message msg = new Message("order_event", "paid", JSON.toJSONBytes(order));
        producer.send(msg);
    }

    /**
     * 4. 退款处理 — 高优先级
     */
    public void refund(Order order) {
        Message msg = new Message("order_refund", JSON.toJSONBytes(order));
        // 退款消息使用独立 Topic,消费者较少,保证高优处理
        producer.send(msg);
    }

    /**
     * 5. 订单完成后的异步任务 — 广播消费
     */
    public void notifyAfterSale(Order order) {
        Message msg = new Message("order_finished_tag", JSON.toJSONBytes(order));
        msg.putUserProperty("orderType", order.getOrderType());
        producer.send(msg);
    }
}

消费者配置总览

java
@Configuration
public class OrderConsumerConfig {

    // 订单创建 — 集群消费(默认)
    @Bean
    public DefaultMQPushConsumer orderTxConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_tx_group");
        consumer.subscribe("order_tx", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        return consumer;
    }

    // 超时检查 — 集群模式
    @Bean
    public DefaultMQPushConsumer orderDelayConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_delay_group");
        consumer.subscribe("order_delay", "*");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        return consumer;
    }

    // 订单事件 — 按 Tag 过滤
    @Bean
    public DefaultMQPushConsumer orderEventConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_event_group");
        consumer.subscribe("order_event", "paid || refund");
        return consumer;
    }

    // 售后通知 — 广播模式(所有节点都收到)
    @Bean
    public DefaultMQPushConsumer orderBroadcastConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_broadcast_group");
        consumer.subscribe("order_finished_tag", "*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        return consumer;
    }
}

总结:高级功能在实际项目中非常实用:

  • 延时消息:处理超时、定时任务
  • 消息过滤:减少不必要的数据传输
  • 事务消息:保证分布式事务最终一致性,是 RocketMQ 的核心亮点
  • 集群/广播消费:灵活选择消息投递模式

面试中重点关注事务消息的实现原理(半消息+回查机制),以及延时消息的应用场景