面试题:RocketMQ 的消息过滤是如何实现的?

RocketMQ 提供了多种消息过滤方式,可以根据业务需求高效地筛选消息。以下是完整的消息过滤实现方案:

一、过滤类型对比

过滤方式实现位置优点缺点适用场景
Tag过滤Broker端高效只能精确匹配简单分类
SQL92过滤Broker端灵活性能损耗复杂条件
类过滤消费者端完全自定义全量传输特殊逻辑

二、Tag过滤(最常用)

1. 生产者设置Tag

Message msg = new Message("TopicTest", "TagA", "Hello World".getBytes());
producer.send(msg);

2. 消费者订阅指定Tag

// 单个Tag
consumer.subscribe("TopicTest", "TagA");

// 多个Tag(或关系)
consumer.subscribe("TopicTest", "TagA || TagB");

// 全部Tag
consumer.subscribe("TopicTest", "*");

实现原理

  • Broker在存储时维护Tag的Hash索引
  • 投递时快速匹配Hash值

三、SQL92过滤(功能强大)

1. 启用配置(Broker端)

enablePropertyFilter=true

2. 生产者设置属性

Message msg = new Message("TopicTest", "TagA", "Hello World".getBytes());
msg.putUserProperty("a", String.valueOf(10));
msg.putUserProperty("b", "hello");

3. 消费者SQL订阅

consumer.subscribe("TopicTest", 
    MessageSelector.bySql("a between 0 and 10 AND b = 'hello'"));

支持语法

  • 数值比较:>>=<<==
  • 逻辑运算:ANDORNOT
  • 空判断:IS NULLIS NOT NULL
  • 范围查询:INBETWEEN

四、类过滤(完全自定义)

1. 实现过滤接口

public class MyMessageFilter implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String body = new String(msg.getBody());
        return body.contains("important");
    }
}

2. 消费者配置

consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        
        // 应用过滤
        List<MessageExt> filtered = msgs.stream()
            .filter(new MyMessageFilter())
            .collect(Collectors.toList());
            
        // 处理过滤后的消息...
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

五、高级过滤方案

1. 多维度联合过滤

// SQL表达式组合多个属性
consumer.subscribe("TopicTest", 
    MessageSelector.bySql("region = 'East' AND price > 100 AND category IN ('Electronics', 'Furniture')"));

2. 过滤性能优化

# Broker端配置(增加过滤线程数)
filterServerNums=4

3. 服务端过滤+客户端过滤

// 先通过Tag粗筛,再在消费者精确过滤
consumer.subscribe("TopicTest", "Important*");
consumer.registerMessageListener(msgs -> {
    msgs.stream()
        .filter(msg -> checkComplexCondition(msg))
        .forEach(this::process);
});

六、实现原理剖析

  1. Tag过滤
    • 使用Bloom Filter加速匹配
    • 每个ConsumeQueue条目存储Tag HashCode
  2. SQL过滤
    • 基于RocketMQ的Filter Server组件
    • 编译SQL表达式为过滤树
    • 对消息属性进行运行时计算
  3. 性能对比:复制Tag过滤 → SQL过滤 → 类过滤 (最快) (中等) (最慢)

七、最佳实践

  1. 设计建议
    • 优先使用Tag进行一级分类
    • 属性命名采用蛇形命名法(如user_id
    • 避免过于复杂的SQL表达式
  2. 错误用法
    // 反例:在SQL中解析消息体(极其低效)
    consumer.subscribe("TopicTest", "JSON_EXTRACT(body, '$.price') > 100");
  3. 监控指标
    # 查看过滤统计
    sh mqadmin brokerStatus -n namesrv:9876 -b brokerAddress | grep filter

RocketMQ的过滤机制可以显著减少网络传输和消费者处理压力,根据业务特点选择合适的过滤方式能极大提升系统效率。对于日均亿级消息的系统,合理的Tag设计可以降低50%以上的资源消耗。

THE END
点赞5 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容