面试题:RocketMQ 中如何实现消息的同步与异步发送?

RocketMQ 消息发送模式:同步与异步实现详解

一、消息发送模式对比

发送方式特点性能可靠性适用场景
同步发送阻塞等待Broker响应较低最高金融交易、重要状态变更
异步发送回调通知发送结果日志处理、通知类消息
单向发送不关心发送结果(不可靠)最高吞吐量优先的非关键业务

二、同步发送实现

1. 基础同步发送

DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
producer.start();

Message msg = new Message("TopicTest", "TagA", "同步消息".getBytes());
SendResult sendResult = producer.send(msg); // 阻塞等待结果

System.out.println("发送结果:" + sendResult);
producer.shutdown();

2. 同步发送关键配置

// 设置发送超时时间(默认3000ms)
producer.setSendMsgTimeout(5000);

// 设置失败重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(3);

// 开启VIP通道(减少网络跳数)
producer.setVipChannelEnabled(true);

3. 同步发送执行流程

  1. 客户端发送请求到Broker
  2. Broker处理并返回响应
  3. 客户端收到响应后继续执行
  4. 如果超时或失败,按配置重试

三、异步发送实现

1. 基础异步发送

DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.start();

Message msg = new Message("TopicTest", "TagA", "异步消息".getBytes());

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("发送成功:" + sendResult);
    }

    @Override
    public void onException(Throwable e) {
        e.printStackTrace();
        // 添加重试或补偿逻辑
    }
});

// 注意:不要立即shutdown,等待回调完成

2. 异步发送关键配置

// 设置异步发送失败重试次数(默认2次)
producer.setRetryTimesWhenSendAsyncFailed(3);

// 设置回调线程池(默认64个线程)
producer.setCallbackExecutor(Executors.newFixedThreadPool(32));

// 设置异步发送超时(默认3000ms)
producer.setSendMsgTimeout(5000);

3. 异步发送执行流程

  1. 客户端发送请求到内存队列
  2. 立即返回不阻塞业务线程
  3. 后台线程异步处理发送
  4. 通过回调接口通知结果
  5. 失败时按配置自动重试

四、高级特性与最佳实践

1. 批量消息发送

List<Message> messages = new ArrayList<>();
messages.add(new Message(...));
messages.add(new Message(...));

// 同步批量发送
SendResult result = producer.send(messages);

// 异步批量发送
producer.send(messages, new SendCallback() {...});

2. 发送性能优化

// 合并小消息(默认4K)
producer.setCompressMsgBodyOverHowmuch(1024 * 4);

// 开启消息轨迹
producer.setEnableMsgTrace(true);

3. 生产环境建议配置

# 同步发送配置建议
sendMsgTimeout=5000
retryTimesWhenSendFailed=3
compressMsgBodyOverHowmuch=4096

# 异步发送配置建议
retryTimesWhenSendAsyncFailed=2
callbackExecutorThreads=32
maxMessageSize=4194304  # 4MB

五、异常处理方案

1. 同步发送异常处理

try {
    SendResult result = producer.send(msg);
} catch (MQClientException e) {
    // 客户端异常(参数错误等)
    log.error("客户端异常:", e);
} catch (RemotingException e) {
    // 网络异常
    log.error("网络异常:", e);
} catch (MQBrokerException e) {
    // Broker异常
    log.error("Broker异常[{}]:", e.getResponseCode(), e);
} catch (InterruptedException e) {
    // 中断异常
    log.error("发送中断:", e);
}

2. 异步发送异常处理

producer.send(msg, new SendCallback() {
    @Override
    public void onException(Throwable e) {
        if (e instanceof MQBrokerException) {
            // Broker返回的错误
        } else if (e instanceof RemotingException) {
            // 网络异常
        } else {
            // 其他异常
        }

        // 重要消息可加入重试队列
        retryQueue.add(msg);
    }
});

六、模式选型决策树

是否必须确认发送成功?
├── 是 → 是否需要低延迟?
    ├── 是 → 同步发送
    └── 否 → 异步发送
└── 否 → 单向发送

七、性能对比数据

模式单线程TPSCPU占用网络IO可靠性保证
同步发送2,000-5,000
异步发送10,000-50,000
单向发送50,000+

:测试环境为8C16G虚拟机,消息大小1KB,Broker与Producer同机房

根据业务场景合理选择发送模式,关键业务推荐同步发送,高吞吐场景使用异步发送,监控类数据可考虑单向发送。建议配合消息轨迹功能监控发送状态。

THE END
点赞11 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容