RocketMQ 的消息持久化主要通过 CommitLog 和 ConsumeQueue 实现,以下是配置方法和相关机制:
1. 核心持久化组件
- CommitLog:存储消息内容的物理文件,顺序写入
- ConsumeQueue:逻辑队列,存储消息在CommitLog中的位置索引
- IndexFile:消息索引文件,支持按Key或时间区间查询
2. 主要配置参数
2.1 Broker 端配置(broker.conf)
# 存储路径配置
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog
# CommitLog 文件大小 (默认1GB)
mapedFileSizeCommitLog=1073741824
# 刷盘策略
## 同步刷盘 (最安全但性能低)
flushDiskType=SYNC_FLUSH
## 异步刷盘 (默认,性能高)
flushDiskType=ASYNC_FLUSH
# 刷盘频率 (异步刷盘时生效,单位ms)
flushIntervalCommitLog=500
# 文件保留时间 (小时)
fileReservedTime=72
# 磁盘空间警戒线 (当剩余空间低于该值时,会拒绝写入)
diskMaxUsedSpaceRatio=75
2.2 存储层优化配置
# 启用 transientStorePool (堆外内存缓冲,提高写入性能)
transientStorePoolEnable=true
# 堆外内存池大小 (默认5个缓冲区)
transientStorePoolSize=5
# 每个缓冲区大小 (默认1M=CommitLog文件大小/映射文件大小)
mappedFileSizeCommitLog=1073741824
3. 持久化级别选择
3.1 同步刷盘 (最可靠)
flushDiskType=SYNC_FLUSH
- 特点:消息写入物理磁盘后才返回ACK
- 性能影响:吞吐量降低约10倍
- 适用场景:金融交易等对可靠性要求极高的场景
3.2 异步刷盘 (默认)
flushDiskType=ASYNC_FLUSH
- 特点:消息写入Page Cache后即返回ACK,异步刷盘
- 性能:吞吐量高
- 风险:机器宕机可能丢失部分数据
4. 生产端持久化保证
// 发送同步消息 (等待存储完成响应)
SendResult result = producer.send(msg);
// 发送异步消息 (通过回调确认存储状态)
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息已持久化
}
@Override
public void onException(Throwable e) {
// 持久化失败
}
});
5. 消费者端确认机制
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 处理消息...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 确认消费成功
}
});
6. 注意事项
- 多磁盘配置:建议将CommitLog和ConsumeQueue放在不同磁盘
- 文件清理策略:
- 默认保留3天(
fileReservedTime=72
) - 可通过手动执行
cleanlog.sh
脚本清理
- 默认保留3天(
- 性能监控指标:
putMessageDistributeTime
:消息存储耗时flushDiskTimes
:刷盘次数dispatchBehindBytes
:未分发字节数
- 灾难恢复:
- 启用
dleger
模式实现多副本 - 定期备份存储目录
- 启用
RocketMQ的持久化配置需要在可靠性和性能之间取得平衡,生产环境建议根据业务需求进行压测后确定最佳配置。
THE END
暂无评论内容