面试题:RocketMQ 的消息消费有几种方式?各自适用于什么场景?

一、消息消费方式总览

RocketMQ 提供三种核心消费模式,满足不同业务场景需求:

消费方式实现类并发性顺序保证重试机制适用场景
并发消费(默认)MessageListenerConcurrently支持普通业务处理
顺序消费MessageListenerOrderly队列级有限支持订单状态流转
推/拉模式PullConsumer/PushConsumer可配置可配置可配置特殊定制需求

二、并发消费(MessageListenerConcurrently)

1. 核心特性

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        // 并行处理消息
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
  • 线程模型:线程池并发处理
  • 消息分配:同一队列的消息可能被不同线程处理
  • 性能:吞吐量最高(单机万级TPS)

2. 适用场景

  • 日志处理
  • 通知类消息
  • 数据ETL作业
  • 对消息顺序无严格要求的大流量场景

3. 配置优化

// 调整并发线程数(默认20)
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(32);

// 设置批量消费大小(默认1)
consumer.setConsumeMessageBatchMaxSize(10);

三、顺序消费(MessageListenerOrderly)

1. 核心特性

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeOrderlyContext context) {
        // 单线程顺序处理
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
  • 线程模型:每个队列独占线程
  • 锁机制:队列级别分布式锁
  • 性能:吞吐量较低(约并发模式的1/10)

2. 适用场景

  • 订单状态变更(创建→支付→发货)
  • 库存扣减操作
  • 数据库binlog同步
  • 需要严格保证处理顺序的业务

3. 关键配置

// 设置顺序消费超时时间(默认60s)
consumer.setConsumeTimeout(120);

// 暂停队列时间(失败时)
context.setSuspendCurrentQueueTimeMillis(1000);

四、推拉模式选择

1. PushConsumer(推荐默认)

// 推模式实现(底层仍是长轮询)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
  • 特点:Broker主动推送(实际为长轮询)
  • 优势:实时性高,开发简单
  • 劣势:流控机制较复杂

2. PullConsumer

// 拉模式示例
PullResult pullResult = consumer.pull(MessageQueue mq, String subExpression, 
    long offset, int maxNums);
  • 特点:消费者主动拉取
  • 优势:完全自主控制节奏
  • 劣势:需要自行管理offset

3. 模式对比

特性PushConsumerPullConsumer
实时性高(毫秒级)依赖拉取间隔
复杂度高(需管理offset)
流量控制由Consumer端控制完全自主控制
适用场景绝大多数场景特殊调度需求

五、特殊消费场景

1. 广播模式消费

consumer.setMessageModel(MessageModel.BROADCASTING);
  • 特点:全量消息推送所有实例
  • 场景:配置刷新、缓存同步

2. 定时/延迟消息消费

msg.setDelayTimeLevel(3);  // 设置延迟级别
  • 场景:订单超时关闭、预约提醒

3. 事务消息消费

TransactionMQProducer producer = new TransactionMQProducer("group");
  • 场景:分布式事务最终一致性

六、消费模式选型决策树

是否需要顺序保证?
├── 是 → 选择顺序消费(MessageListenerOrderly)
└── 否 → 是否需要高吞吐?
    ├── 是 → 选择并发消费(MessageListenerConcurrently)
    └── 否 → 是否有特殊控制需求?
        ├── 是 → 选择PullConsumer
        └── 否 → 使用默认PushConsumer

七、最佳实践建议

  1. 常规业务首选PushConsumer + MessageListenerConcurrently
  2. 顺序敏感业务PushConsumer + MessageListenerOrderly + 合理设计消息Key
  3. 性能优化要点
    // 并发消费优化组合
    consumer.setConsumeThreadMax(50);
    consumer.setConsumeMessageBatchMaxSize(32);
    consumer.setPullBatchSize(32);
  4. 异常处理
    • 并发消费:返回RECONSUME_LATER
    • 顺序消费:返回SUSPEND_CURRENT_QUEUE_A_MOMENT
  5. 监控指标
    # 查看消费进度
    sh mqadmin consumerProgress -n namesrv:9876
    
    # 监控线程堆积
    jstack <pid> | grep ConsumeMessageThread_

根据业务特点合理选择消费模式,可以显著提升系统可靠性和性能。建议新业务从并发消费开始,确有顺序需求再改用顺序消费,特殊场景才考虑PullConsumer。

THE END
点赞11 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容