RocketMQ 提供了多种消息过滤方式,可以根据业务需求高效地筛选消息。以下是完整的消息过滤实现方案:
一、过滤类型对比
过滤方式 | 实现位置 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Tag过滤 | Broker端 | 高效 | 只能精确匹配 | 简单分类 |
SQL92过滤 | Broker端 | 灵活 | 性能损耗 | 复杂条件 |
类过滤 | 消费者端 | 完全自定义 | 全量传输 | 特殊逻辑 |
二、Tag过滤(最常用)
1. 生产者设置Tag
Message msg = new Message("TopicTest", "TagA", "Hello World".getBytes());
producer.send(msg);
2. 消费者订阅指定Tag
// 单个Tag
consumer.subscribe("TopicTest", "TagA");
// 多个Tag(或关系)
consumer.subscribe("TopicTest", "TagA || TagB");
// 全部Tag
consumer.subscribe("TopicTest", "*");
实现原理:
- Broker在存储时维护Tag的Hash索引
- 投递时快速匹配Hash值
三、SQL92过滤(功能强大)
1. 启用配置(Broker端)
enablePropertyFilter=true
2. 生产者设置属性
Message msg = new Message("TopicTest", "TagA", "Hello World".getBytes());
msg.putUserProperty("a", String.valueOf(10));
msg.putUserProperty("b", "hello");
3. 消费者SQL订阅
consumer.subscribe("TopicTest",
MessageSelector.bySql("a between 0 and 10 AND b = 'hello'"));
支持语法:
- 数值比较:
>
,>=
,<
,<=
,=
- 逻辑运算:
AND
,OR
,NOT
- 空判断:
IS NULL
,IS NOT NULL
- 范围查询:
IN
,BETWEEN
四、类过滤(完全自定义)
1. 实现过滤接口
public class MyMessageFilter implements MessageFilter {
@Override
public boolean match(MessageExt msg) {
String body = new String(msg.getBody());
return body.contains("important");
}
}
2. 消费者配置
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 应用过滤
List<MessageExt> filtered = msgs.stream()
.filter(new MyMessageFilter())
.collect(Collectors.toList());
// 处理过滤后的消息...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
五、高级过滤方案
1. 多维度联合过滤
// SQL表达式组合多个属性
consumer.subscribe("TopicTest",
MessageSelector.bySql("region = 'East' AND price > 100 AND category IN ('Electronics', 'Furniture')"));
2. 过滤性能优化
# Broker端配置(增加过滤线程数)
filterServerNums=4
3. 服务端过滤+客户端过滤
// 先通过Tag粗筛,再在消费者精确过滤
consumer.subscribe("TopicTest", "Important*");
consumer.registerMessageListener(msgs -> {
msgs.stream()
.filter(msg -> checkComplexCondition(msg))
.forEach(this::process);
});
六、实现原理剖析
- Tag过滤:
- 使用Bloom Filter加速匹配
- 每个ConsumeQueue条目存储Tag HashCode
- SQL过滤:
- 基于RocketMQ的Filter Server组件
- 编译SQL表达式为过滤树
- 对消息属性进行运行时计算
- 性能对比:复制Tag过滤 → SQL过滤 → 类过滤 (最快) (中等) (最慢)
七、最佳实践
- 设计建议:
- 优先使用Tag进行一级分类
- 属性命名采用蛇形命名法(如
user_id
) - 避免过于复杂的SQL表达式
- 错误用法:
// 反例:在SQL中解析消息体(极其低效) consumer.subscribe("TopicTest", "JSON_EXTRACT(body, '$.price') > 100");
- 监控指标:
# 查看过滤统计 sh mqadmin brokerStatus -n namesrv:9876 -b brokerAddress | grep filter
RocketMQ的过滤机制可以显著减少网络传输和消费者处理压力,根据业务特点选择合适的过滤方式能极大提升系统效率。对于日均亿级消息的系统,合理的Tag设计可以降低50%以上的资源消耗。
THE END
暂无评论内容