一、Producer 工作原理
1. 核心工作流程
graph TD
A[启动Producer] --> B[获取Topic路由信息]
B --> C[选择MessageQueue]
C --> D[网络传输]
D --> E[Broker处理]
E --> F[返回发送结果]
2. 关键组件与机制
(1) 路由管理
- 定时获取:每30秒从NameServer获取最新路由
- 本地缓存:
TopicPublishInfo
维护路由表 - 自动更新:检测到Broker变化时立即更新
(2) 队列选择策略
// 内置选择器示例
public interface MessageQueueSelector {
MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg);
}
- 默认策略:轮询选择队列
- 顺序消息:哈希取模选择固定队列
(3) 故障转移
- 自动重试其他Broker(当
retryAnotherBrokerWhenNotStoreOK=true
) - 规避故障Broker(基于响应时间)
(4) 发送模式
- 同步发送:阻塞等待Broker响应
- 异步发送:回调通知结果
- 单向发送:不等待响应
3. 消息处理流程
- 消息压缩:超过阈值(默认4K)自动压缩
- 消息校验:检查大小(默认最大4MB)和必填字段
- Hook处理:执行发送前拦截器
- 网络序列化:使用Remoting协议编码
二、Consumer 工作原理
1. 核心工作流程
graph TD
A[启动Consumer] --> B[获取Topic路由]
B --> C[队列分配]
C --> D[拉取消息]
D --> E[消费处理]
E --> F[提交Offset]
2. 关键组件与机制
(1) 队列分配策略
- 平均分配:
AllocateMessageQueueAveragely
- 环形分配:
AllocateMessageQueueAveragelyByCircle
- 一致性哈希:
AllocateMessageQueueConsistentHash
- 自定义策略:实现
AllocateMessageQueueStrategy
(2) 消息拉取
- 长轮询机制:默认拉取间隔20秒(Broker端挂起请求)
- 流控参数:
consumer.setPullBatchSize(32); // 每次拉取最大消息数 consumer.setPullInterval(0); // 拉取间隔(ms)
(3) 消费模式
- 并发消费:
MessageListenerConcurrently
- 顺序消费:
MessageListenerOrderly
- 推/拉模式:PushConsumer基于Pull实现
(4) Offset管理
- 本地模式:
LocalFileOffsetStore
- 远程模式:
RemoteBrokerOffsetStore
- 提交策略:定时提交(默认5秒)或批量提交
3. 负载均衡机制
(1) RebalanceService
- 每20秒执行一次重新分配
- 触发条件:
- 消费者数量变化
- Topic队列数量变化
- 网络分区恢复
(2) 分配流程
- 获取所有消费者ID
- 获取Topic所有队列
- 按策略重新分配
- 释放/申请队列锁(顺序消费)
三、核心交互流程
1. 生产者与Broker交互
sequenceDiagram
Producer->>Broker: 发送消息(SEND_MESSAGE)
Broker-->>Producer: 返回SendResult
Note right of Broker: 同步刷盘/异步刷盘
2. 消费者与Broker交互
sequenceDiagram
Consumer->>Broker: 拉取消息(PULL_MESSAGE)
Broker-->>Consumer: 返回消息或挂起
Consumer->>Broker: 提交Offset(UPDATE_CONSUMER_OFFSET)
四、关键参数配置
1. Producer核心参数
参数 | 默认值 | 说明 |
---|---|---|
sendMsgTimeout | 3000ms | 发送超时时间 |
compressMsgBodyOverHowmuch | 4096字节 | 压缩阈值 |
retryTimesWhenSendFailed | 2 | 同步发送重试次数 |
maxMessageSize | 4MB | 消息最大大小 |
2. Consumer核心参数
参数 | 默认值 | 说明 |
---|---|---|
consumeThreadMin | 20 | 最小消费线程数 |
consumeThreadMax | 64 | 最大消费线程数 |
pullBatchSize | 32 | 每次拉取消息数 |
persistConsumerOffsetInterval | 5000ms | Offset提交间隔 |
五、异常处理机制
1. Producer异常
- 网络异常:自动重试其他Broker
- Broker响应异常:根据错误码处理
- 客户端异常:校验失败直接抛出
2. Consumer异常
- 消费失败:返回
RECONSUME_LATER
- 重试机制:进入
%RETRY%
队列 - 死信队列:超过最大重试次数(默认16次)
六、性能优化建议
1. Producer优化
// 批量发送提升吞吐
producer.send(msgs);
// 启用压缩减少网络IO
producer.setCompressMsgBodyOverHowmuch(1024);
// 关闭VIP通道(跨机房时)
producer.setVipChannelEnabled(false);
2. Consumer优化
// 增大拉取批次
consumer.setPullBatchSize(64);
// 调整消费线程池
consumer.setConsumeThreadMax(128);
// 关闭自动提交(特殊场景)
consumer.setAutoCommit(false);
七、设计思想总结
- 生产者设计:
- 轻量级路由管理
- 智能故障转移
- 多模式发送适应不同场景
- 消费者设计:
- 动态负载均衡
- 灵活消费模式
- 可靠Offset管理
- 整体架构:
- 去中心化设计(依赖NameServer)
- 读写分离(主写从读)
- 最终一致性保证
理解这些核心原理有助于在实际开发中合理配置参数、优化性能,并快速排查生产环境问题。建议结合RocketMQ控制台实时监控各组件的运行状态。
THE END
暂无评论内容