RocketMQ 本身不提供消息去重功能,但可以通过以下机制实现消息的幂等消费,避免重复消费问题:
一、消息重复的根本原因
- 生产者重复发送
- 网络抖动导致生产者重试
- 事务消息回查机制可能造成重复
- Broker端重复
- 主从切换时可能产生重复
- 消息重投机制(RETRY topic)
- 消费者重复处理
- 消费成功后崩溃,未提交offset
- 消费者重启后从之前位点重新消费
二、解决方案(从简单到复杂)
1. 业务层幂等设计(推荐)
// 基于唯一业务ID的幂等处理示例
public class OrderService {
// 使用ConcurrentHashMap模拟去重存储(生产环境用Redis/DB)
private ConcurrentHashMap<String, Boolean> processedOrders = new ConcurrentHashMap<>();
public void processOrder(OrderMessage msg) {
// 1. 提取唯一业务ID
String orderId = msg.getOrderId();
// 2. 检查是否已处理
if (processedOrders.containsKey(orderId)) {
log.warn("重复订单: {}", orderId);
return; // 已处理则直接返回
}
// 3. 处理核心业务逻辑
try {
createOrder(msg);
// 4. 记录处理状态(必须先完成业务操作再记录)
processedOrders.put(orderId, true);
} catch (Exception e) {
// 失败时需清除标记(根据业务需求决定)
processedOrders.remove(orderId);
throw e;
}
}
}
实现要点:
- 使用业务唯一标识(订单ID、支付流水号等)
- 先执行业务操作再更新状态(避免状态更新成功但业务失败)
- 状态存储需要持久化(推荐Redis或数据库)
2. 消息唯一ID去重
// 基于MessageId的去重(适用于短时间窗口)
public class DedupConsumer {
private Set<String> processedMsgIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
public void consume(MessageExt msg) {
String msgId = msg.getMsgId();
if (processedMsgIds.contains(msgId)) {
return;
}
// 处理业务...
processedMsgIds.add(msgId);
// 定时清理旧消息ID(避免内存溢出)
}
}
限制:
- 只适用于短时间内去重
- 集群环境下需要共享存储
3. 事务表+本地事务(最可靠)
CREATE TABLE message_dedup (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
msg_id VARCHAR(64) UNIQUE,
business_id VARCHAR(64),
status TINYINT,
create_time TIMESTAMP
);
@Transactional
public void processWithTransaction(MessageExt msg) {
// 1. 检查去重表
if (dedupMapper.exists(msg.getMsgId())) {
return;
}
// 2. 执行业务操作
orderService.createOrder(extractOrder(msg));
// 3. 记录消费状态(与业务同一个事务)
dedupMapper.insert(new DedupRecord(msg.getMsgId(), "SUCCESS"));
}
4. Redis原子操作
// 使用Redis的SETNX命令实现分布式锁
public boolean tryProcess(String messageKey) {
String lockKey = "dedup:" + messageKey;
// 设置过期时间防止死锁
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS);
return Boolean.TRUE.equals(success);
}
三、RocketMQ特定方案
1. 顺序消息+单线程消费
// 顺序消费天然减少重复概率
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
// 单线程顺序处理
}
});
2. 调整重试策略
// 减少重试次数(降低重复概率)
consumer.setMaxReconsumeTimes(3);
四、方案选型建议
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
业务幂等 | 所有场景 | 最可靠 | 需要业务配合 |
消息ID去重 | 短期去重 | 实现简单 | 集群环境复杂 |
事务表 | 金融级场景 | 强一致 | 性能影响大 |
Redis方案 | 高并发场景 | 性能好 | 依赖Redis |
五、最佳实践
- 多级防御:
- 生产者避免重复发送(消息ID去重)
- Broker端合理配置重试策略
- 消费者必须实现幂等
- 监控措施:bash复制# 监控重复消息比例 sh mqadmin consumerProgress -n namesrv:9876 | grep DUPLICATE
- 压测验证:
- 模拟网络异常和消费者重启
- 验证消息是否被重复处理
终极建议:在业务层实现幂等处理是最可靠的方式,其他方案可作为辅助手段。对于金融交易等关键业务,建议采用”业务幂等+事务表”的双重保障机制。
THE END
暂无评论内容