前言

概念引用自MQTT中文网 http://mqtt.p2hp.com/mqtt-5-0

先了解几个概念

主题通配符

主题层级(topic level)分隔符用于将结构化引入主题名。如果存在分隔符,它将主题名分割为多个主题层级 topic level
订阅的主题过滤器可以包含特殊的通配符,允许客户端一次订阅多个主题。
主题过滤器中可以使用通配符,但是主题名不能使用通配符 [MQTT-4.7.0-1]。

主题层级分隔符

斜杠(“/” U+002F)用于分割主题的每个层级,为主题名提供一个分层结构。当客户端订阅指定的主题过滤器包含两种通配符时,主题层级分隔符就很有用了。主题层级分隔符可以出现在主题过滤器或主题名字的任何位置。相邻的主题层次分隔符表示一个零长度的主题层级。

多层通配符

数字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。多层通配符必须单独指定,或者跟在主题层级分隔符后面。不管哪种情况,它都必须**是主题过滤器的最后一个字符** [MQTT-4.7.1-1]。

例如,如果客户端订阅主题 “sport/tennis/player1/#”,它会收到使用下列主题名发布的消息:

  • “sport/tennis/player1”
  • “sport/tennis/player1/ranking”
  • “sport/tennis/player1/score/wimbledon”

非规范评注

  • “sport/#”也匹配单独的 “sport” 主题名,因为#包括它的父级。
  • “#”是有效的,会收到所有的应用消息。
  • “sport/tennis/#”也是有效的。
  • “sport/tennis#”是无效的。
  • “sport/tennis/#/ranking”是无效的。4.7.1-1]。

    单层通配符

    加号(“+” U+002B) 是只能用于单个主题层级匹配的通配符。
    在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。在使用它时,它必须占据过滤器的整个层级 [MQTT-4.7.1-2]。可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用。

例如,“sport/tennis/+”匹配“sport/tennis/player1”和“sport/tennis/player2”,但是不匹配“sport/tennis/player1/ranking”。同时,由于单层通配符只能匹配一个层级,“sport/+”不匹配“sport”但是却匹配 “sport/”。

  • “+” 是有效的。
  • “+/tennis/#” 是有效的。
  • “sport+” 是无效的。
  • “sport/+/player1” 也是有效的。
  • “/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。

    正文

    在mqtt协议使用中,如何验证一个主题是否符合订阅主题格式呢
    假设发送消息到主题 v1/123456/telemetry/location
    其中第二层123456是设备编号,第四层location是遥测消息类型
    客户端订阅 v1/+/telemetry/+即可收到所有设备的所有遥测类型消息
    虽然很简单,但是还是值得看一下的,来看thingsboard是怎么做的
  1. public static void main(String[] args) {
  2. System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/123456/telemetry/location"));
  3. System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/712361/telemetry/speed"));
  4. System.out.println(MqttTopicFilterFactory.toFilter("v1/#").filter("v1/712361/telemetry/speed"));
  5. System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/712361/attribute/speed"));
  6. }

输出

  1. 12:59:06.518 [main] DEBUG xxx - Converting [v1/+/telemetry/+] to [v1/[^/]+/telemetry/[^/]+]
  2. true
  3. true
  4. 12:59:06.525 [main] DEBUG xxx - Converting [v1/#] to [v1($|/.*)]
  5. true
  6. false

接口MqttTopicFilter的定义,有三个实现
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilter.java

  1. public interface MqttTopicFilter {
  2. boolean filter(String topic);
  3. }

common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/AlwaysTrueTopicFilter.java

  1. @Data
  2. public class AlwaysTrueTopicFilter implements MqttTopicFilter {
  3. @Override
  4. public boolean filter(String topic) {
  5. return true;
  6. }
  7. }

common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/EqualsTopicFilter.java

  1. @Data
  2. public class EqualsTopicFilter implements MqttTopicFilter {
  3. private final String filter;
  4. @Override
  5. public boolean filter(String topic) {
  6. return filter.equals(topic);
  7. }
  8. }

common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/RegexTopicFilter.java

  1. @Data
  2. public class RegexTopicFilter implements MqttTopicFilter {
  3. private final Pattern regex;
  4. public RegexTopicFilter(String regex) {
  5. this.regex = Pattern.compile(regex);
  6. }
  7. @Override
  8. public boolean filter(String topic) {
  9. return regex.matcher(topic).matches();
  10. }
  11. }

common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilterFactory.java

  1. @Slf4j
  2. public class MqttTopicFilterFactory {
  3. private static final ConcurrentMap<String, MqttTopicFilter> filters = new ConcurrentHashMap<>();
  4. private static final MqttTopicFilter DEFAULT_TELEMETRY_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_TELEMETRY_TOPIC);
  5. private static final MqttTopicFilter DEFAULT_ATTRIBUTES_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
  6. public static MqttTopicFilter toFilter(String topicFilter) {
  7. if (topicFilter == null || topicFilter.isEmpty()) {
  8. throw new IllegalArgumentException("Topic filter can't be empty!");
  9. }
  10. return filters.computeIfAbsent(topicFilter, filter -> {
  11. if (filter.equals("#")) {
  12. return new AlwaysTrueTopicFilter();
  13. } else if (filter.contains("+") || filter.contains("#")) {
  14. String regex = filter
  15. .replace("\\", "\\\\")
  16. .replace("+", "[^/]+")
  17. .replace("/#", "($|/.*)");
  18. log.debug("Converting [{}] to [{}]", filter, regex);
  19. return new RegexTopicFilter(regex);
  20. } else {
  21. return new EqualsTopicFilter(filter);
  22. }
  23. });
  24. }
  25. public static MqttTopicFilter getDefaultTelemetryFilter() {
  26. return DEFAULT_TELEMETRY_TOPIC_FILTER;
  27. }
  28. public static MqttTopicFilter getDefaultAttributesFilter() {
  29. return DEFAULT_ATTRIBUTES_TOPIC_FILTER;
  30. }
  31. }