RocketMQ 消息发送模式:同步与异步实现详解
一、消息发送模式对比
发送方式 | 特点 | 性能 | 可靠性 | 适用场景 |
---|---|---|---|---|
同步发送 | 阻塞等待Broker响应 | 较低 | 最高 | 金融交易、重要状态变更 |
异步发送 | 回调通知发送结果 | 高 | 高 | 日志处理、通知类消息 |
单向发送 | 不关心发送结果(不可靠) | 最高 | 低 | 吞吐量优先的非关键业务 |
二、同步发送实现
1. 基础同步发送
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.start();
Message msg = new Message("TopicTest", "TagA", "同步消息".getBytes());
SendResult sendResult = producer.send(msg); // 阻塞等待结果
System.out.println("发送结果:" + sendResult);
producer.shutdown();
2. 同步发送关键配置
// 设置发送超时时间(默认3000ms)
producer.setSendMsgTimeout(5000);
// 设置失败重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(3);
// 开启VIP通道(减少网络跳数)
producer.setVipChannelEnabled(true);
3. 同步发送执行流程
- 客户端发送请求到Broker
- Broker处理并返回响应
- 客户端收到响应后继续执行
- 如果超时或失败,按配置重试
三、异步发送实现
1. 基础异步发送
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.start();
Message msg = new Message("TopicTest", "TagA", "异步消息".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:" + sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
// 添加重试或补偿逻辑
}
});
// 注意:不要立即shutdown,等待回调完成
2. 异步发送关键配置
// 设置异步发送失败重试次数(默认2次)
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置回调线程池(默认64个线程)
producer.setCallbackExecutor(Executors.newFixedThreadPool(32));
// 设置异步发送超时(默认3000ms)
producer.setSendMsgTimeout(5000);
3. 异步发送执行流程
- 客户端发送请求到内存队列
- 立即返回不阻塞业务线程
- 后台线程异步处理发送
- 通过回调接口通知结果
- 失败时按配置自动重试
四、高级特性与最佳实践
1. 批量消息发送
List<Message> messages = new ArrayList<>();
messages.add(new Message(...));
messages.add(new Message(...));
// 同步批量发送
SendResult result = producer.send(messages);
// 异步批量发送
producer.send(messages, new SendCallback() {...});
2. 发送性能优化
// 合并小消息(默认4K)
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
// 开启消息轨迹
producer.setEnableMsgTrace(true);
3. 生产环境建议配置
# 同步发送配置建议
sendMsgTimeout=5000
retryTimesWhenSendFailed=3
compressMsgBodyOverHowmuch=4096
# 异步发送配置建议
retryTimesWhenSendAsyncFailed=2
callbackExecutorThreads=32
maxMessageSize=4194304 # 4MB
五、异常处理方案
1. 同步发送异常处理
try {
SendResult result = producer.send(msg);
} catch (MQClientException e) {
// 客户端异常(参数错误等)
log.error("客户端异常:", e);
} catch (RemotingException e) {
// 网络异常
log.error("网络异常:", e);
} catch (MQBrokerException e) {
// Broker异常
log.error("Broker异常[{}]:", e.getResponseCode(), e);
} catch (InterruptedException e) {
// 中断异常
log.error("发送中断:", e);
}
2. 异步发送异常处理
producer.send(msg, new SendCallback() {
@Override
public void onException(Throwable e) {
if (e instanceof MQBrokerException) {
// Broker返回的错误
} else if (e instanceof RemotingException) {
// 网络异常
} else {
// 其他异常
}
// 重要消息可加入重试队列
retryQueue.add(msg);
}
});
六、模式选型决策树
是否必须确认发送成功?
├── 是 → 是否需要低延迟?
├── 是 → 同步发送
└── 否 → 异步发送
└── 否 → 单向发送
七、性能对比数据
模式 | 单线程TPS | CPU占用 | 网络IO | 可靠性保证 |
---|---|---|---|---|
同步发送 | 2,000-5,000 | 高 | 高 | 强 |
异步发送 | 10,000-50,000 | 中 | 中 | 中 |
单向发送 | 50,000+ | 低 | 低 | 无 |
注:测试环境为8C16G虚拟机,消息大小1KB,Broker与Producer同机房
根据业务场景合理选择发送模式,关键业务推荐同步发送,高吞吐场景使用异步发送,监控类数据可考虑单向发送。建议配合消息轨迹功能监控发送状态。
THE END
暂无评论内容