一、事务消息基本概念
事务消息是 RocketMQ 提供的一种特殊消息类型,用于解决分布式系统中的事务一致性问题。它通过”半消息”机制实现分布式事务的最终一致性。
核心特点:
- 两阶段提交:准备阶段和确认阶段
- 状态回查:Broker主动检查事务状态
- 最终一致性:不保证强一致性,但保证最终一致
二、事务消息实现流程
1. 事务消息生命周期
- 发送半消息:消息对消费者不可见
- 执行本地事务:业务系统执行本地数据库操作
- 提交/回滚事务:根据本地事务结果确认消息
- 状态回查(可选):解决超时未确认的情况
2. 核心代码实现
// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("namesrv:9876");
// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地数据库事务
boolean success = doBusinessTransaction();
return success ? LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
// 事务状态回查
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return checkTransactionStatus(msg.getTransactionId()) ?
LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 3. 发送事务消息
Message msg = new Message("transaction_topic", "tag", "KEY", "body".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
三、分布式事务实现方案
1. 基于事务消息的最终一致性方案
适用场景:跨系统数据一致性,如订单创建后通知库存系统
实现步骤:
- 订单系统发送半消息
- 创建订单(本地事务)
- 根据订单创建结果提交/回滚消息
- 库存系统消费消息执行库存扣减
2. 事务消息+本地事务表方案
增强可靠性方案:
- 在业务数据库中创建事务记录表
- 发送消息和记录事务状态在同一个本地事务中
- 定时任务补偿未完成的事务
CREATE TABLE transaction_log (
id VARCHAR(64) PRIMARY KEY,
business_id VARCHAR(64),
status TINYINT,
create_time DATETIME
);
3. 最大努力通知型事务
实现要点:
- 消息队列持久化消息
- 消费者实现幂等处理
- 设置合理的重试策略
四、关键配置参数
# 事务消息检查间隔(默认60秒)
transactionCheckInterval=60000
# 事务超时时间(默认6分钟)
transactionTimeout=360000
# 最大检查次数(默认15次)
transactionCheckMax=15
五、注意事项
- 幂等设计:消费者必须实现幂等处理
- 状态回查:必须可靠实现checkLocalTransaction方法
- 性能影响:事务消息吞吐量约为普通消息的1/10
- 异常处理:
- 网络异常时可能产生”悬而未决”的消息
- 需要监控和人工干预机制
六、与其他方案的对比
方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
---|---|---|---|---|
事务消息 | 最终一致 | 中 | 中 | 跨系统事务 |
TCC | 强一致 | 低 | 高 | 金融交易 |
本地消息表 | 最终一致 | 中 | 中 | 数据库+MQ组合 |
SAGA | 最终一致 | 高 | 高 | 长事务 |
RocketMQ事务消息为分布式系统提供了一种平衡性能与一致性的解决方案,适合大多数需要最终一致性的业务场景。
THE END
暂无评论内容