一、消息消费方式总览
RocketMQ 提供三种核心消费模式,满足不同业务场景需求:
消费方式 | 实现类 | 并发性 | 顺序保证 | 重试机制 | 适用场景 |
---|---|---|---|---|---|
并发消费(默认) | MessageListenerConcurrently | 高 | 无 | 支持 | 普通业务处理 |
顺序消费 | MessageListenerOrderly | 中 | 队列级 | 有限支持 | 订单状态流转 |
推/拉模式 | PullConsumer/PushConsumer | 可配置 | 可配置 | 可配置 | 特殊定制需求 |
二、并发消费(MessageListenerConcurrently)
1. 核心特性
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 并行处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
- 线程模型:线程池并发处理
- 消息分配:同一队列的消息可能被不同线程处理
- 性能:吞吐量最高(单机万级TPS)
2. 适用场景
- 日志处理
- 通知类消息
- 数据ETL作业
- 对消息顺序无严格要求的大流量场景
3. 配置优化
// 调整并发线程数(默认20)
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(32);
// 设置批量消费大小(默认1)
consumer.setConsumeMessageBatchMaxSize(10);
三、顺序消费(MessageListenerOrderly)
1. 核心特性
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// 单线程顺序处理
return ConsumeOrderlyStatus.SUCCESS;
}
});
- 线程模型:每个队列独占线程
- 锁机制:队列级别分布式锁
- 性能:吞吐量较低(约并发模式的1/10)
2. 适用场景
- 订单状态变更(创建→支付→发货)
- 库存扣减操作
- 数据库binlog同步
- 需要严格保证处理顺序的业务
3. 关键配置
// 设置顺序消费超时时间(默认60s)
consumer.setConsumeTimeout(120);
// 暂停队列时间(失败时)
context.setSuspendCurrentQueueTimeMillis(1000);
四、推拉模式选择
1. PushConsumer(推荐默认)
// 推模式实现(底层仍是长轮询)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
- 特点:Broker主动推送(实际为长轮询)
- 优势:实时性高,开发简单
- 劣势:流控机制较复杂
2. PullConsumer
// 拉模式示例
PullResult pullResult = consumer.pull(MessageQueue mq, String subExpression,
long offset, int maxNums);
- 特点:消费者主动拉取
- 优势:完全自主控制节奏
- 劣势:需要自行管理offset
3. 模式对比
特性 | PushConsumer | PullConsumer |
---|---|---|
实时性 | 高(毫秒级) | 依赖拉取间隔 |
复杂度 | 低 | 高(需管理offset) |
流量控制 | 由Consumer端控制 | 完全自主控制 |
适用场景 | 绝大多数场景 | 特殊调度需求 |
五、特殊消费场景
1. 广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
- 特点:全量消息推送所有实例
- 场景:配置刷新、缓存同步
2. 定时/延迟消息消费
msg.setDelayTimeLevel(3); // 设置延迟级别
- 场景:订单超时关闭、预约提醒
3. 事务消息消费
TransactionMQProducer producer = new TransactionMQProducer("group");
- 场景:分布式事务最终一致性
六、消费模式选型决策树
是否需要顺序保证?
├── 是 → 选择顺序消费(MessageListenerOrderly)
└── 否 → 是否需要高吞吐?
├── 是 → 选择并发消费(MessageListenerConcurrently)
└── 否 → 是否有特殊控制需求?
├── 是 → 选择PullConsumer
└── 否 → 使用默认PushConsumer
七、最佳实践建议
- 常规业务首选:
PushConsumer + MessageListenerConcurrently
- 顺序敏感业务:
PushConsumer + MessageListenerOrderly
+ 合理设计消息Key - 性能优化要点:
// 并发消费优化组合 consumer.setConsumeThreadMax(50); consumer.setConsumeMessageBatchMaxSize(32); consumer.setPullBatchSize(32);
- 异常处理:
- 并发消费:返回
RECONSUME_LATER
- 顺序消费:返回
SUSPEND_CURRENT_QUEUE_A_MOMENT
- 并发消费:返回
- 监控指标:
# 查看消费进度 sh mqadmin consumerProgress -n namesrv:9876 # 监控线程堆积 jstack <pid> | grep ConsumeMessageThread_
根据业务特点合理选择消费模式,可以显著提升系统可靠性和性能。建议新业务从并发消费开始,确有顺序需求再改用顺序消费,特殊场景才考虑PullConsumer。
THE END
暂无评论内容