面试题:在 RocketMQ 中,如何确保消息不会重复消费?

RocketMQ 本身不提供消息去重功能,但可以通过以下机制实现消息的幂等消费,避免重复消费问题:

一、消息重复的根本原因

  1. 生产者重复发送
    • 网络抖动导致生产者重试
    • 事务消息回查机制可能造成重复
  2. Broker端重复
    • 主从切换时可能产生重复
    • 消息重投机制(RETRY topic)
  3. 消费者重复处理
    • 消费成功后崩溃,未提交offset
    • 消费者重启后从之前位点重新消费

二、解决方案(从简单到复杂)

1. 业务层幂等设计(推荐)

// 基于唯一业务ID的幂等处理示例
public class OrderService {
    // 使用ConcurrentHashMap模拟去重存储(生产环境用Redis/DB)
    private ConcurrentHashMap<String, Boolean> processedOrders = new ConcurrentHashMap<>();
    
    public void processOrder(OrderMessage msg) {
        // 1. 提取唯一业务ID
        String orderId = msg.getOrderId();
        
        // 2. 检查是否已处理
        if (processedOrders.containsKey(orderId)) {
            log.warn("重复订单: {}", orderId);
            return; // 已处理则直接返回
        }
        
        // 3. 处理核心业务逻辑
        try {
            createOrder(msg);
            
            // 4. 记录处理状态(必须先完成业务操作再记录)
            processedOrders.put(orderId, true);
        } catch (Exception e) {
            // 失败时需清除标记(根据业务需求决定)
            processedOrders.remove(orderId);
            throw e;
        }
    }
}

实现要点

  • 使用业务唯一标识(订单ID、支付流水号等)
  • 先执行业务操作再更新状态(避免状态更新成功但业务失败)
  • 状态存储需要持久化(推荐Redis或数据库)

2. 消息唯一ID去重

// 基于MessageId的去重(适用于短时间窗口)
public class DedupConsumer {
    private Set<String> processedMsgIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
    
    public void consume(MessageExt msg) {
        String msgId = msg.getMsgId();
        if (processedMsgIds.contains(msgId)) {
            return;
        }
        
        // 处理业务...
        processedMsgIds.add(msgId);
        
        // 定时清理旧消息ID(避免内存溢出)
    }
}

限制

  • 只适用于短时间内去重
  • 集群环境下需要共享存储

3. 事务表+本地事务(最可靠)

CREATE TABLE message_dedup (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    msg_id VARCHAR(64) UNIQUE,
    business_id VARCHAR(64),
    status TINYINT,
    create_time TIMESTAMP
);
@Transactional
public void processWithTransaction(MessageExt msg) {
    // 1. 检查去重表
    if (dedupMapper.exists(msg.getMsgId())) {
        return;
    }
    
    // 2. 执行业务操作
    orderService.createOrder(extractOrder(msg));
    
    // 3. 记录消费状态(与业务同一个事务)
    dedupMapper.insert(new DedupRecord(msg.getMsgId(), "SUCCESS"));
}

4. Redis原子操作

// 使用Redis的SETNX命令实现分布式锁
public boolean tryProcess(String messageKey) {
    String lockKey = "dedup:" + messageKey;
    // 设置过期时间防止死锁
    Boolean success = redisTemplate.opsForValue()
        .setIfAbsent(lockKey, "1", 24, TimeUnit.HOURS);
    return Boolean.TRUE.equals(success);
}

三、RocketMQ特定方案

1. 顺序消息+单线程消费

// 顺序消费天然减少重复概率
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
            ConsumeOrderlyContext context) {
        // 单线程顺序处理
    }
});

2. 调整重试策略

// 减少重试次数(降低重复概率)
consumer.setMaxReconsumeTimes(3);

四、方案选型建议

方案适用场景优点缺点
业务幂等所有场景最可靠需要业务配合
消息ID去重短期去重实现简单集群环境复杂
事务表金融级场景强一致性能影响大
Redis方案高并发场景性能好依赖Redis

五、最佳实践

  1. 多级防御
    • 生产者避免重复发送(消息ID去重)
    • Broker端合理配置重试策略
    • 消费者必须实现幂等
  2. 监控措施:bash复制# 监控重复消息比例 sh mqadmin consumerProgress -n namesrv:9876 | grep DUPLICATE
  3. 压测验证
    • 模拟网络异常和消费者重启
    • 验证消息是否被重复处理

终极建议:在业务层实现幂等处理是最可靠的方式,其他方案可作为辅助手段。对于金融交易等关键业务,建议采用”业务幂等+事务表”的双重保障机制。

THE END
点赞5 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容