前言
概念引用自MQTT中文网 http://mqtt.p2hp.com/mqtt-5-0
主题通配符
主题层级(topic level)分隔符用于将结构化引入主题名。如果存在分隔符,它将主题名分割为多个主题层级 topic level 。
订阅的主题过滤器可以包含特殊的通配符,允许客户端一次订阅多个主题。
主题过滤器中可以使用通配符,但是主题名不能使用通配符 [MQTT-4.7.0-1]。
主题层级分隔符
斜杠(“/” U+002F)用于分割主题的每个层级,为主题名提供一个分层结构。当客户端订阅指定的主题过滤器包含两种通配符时,主题层级分隔符就很有用了。主题层级分隔符可以出现在主题过滤器或主题名字的任何位置。相邻的主题层次分隔符表示一个零长度的主题层级。
多层通配符
数字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。多层通配符必须单独指定,或者跟在主题层级分隔符后面。不管哪种情况,它都必须**是主题过滤器的最后一个字符** [MQTT-4.7.1-1]。
例如,如果客户端订阅主题 “sport/tennis/player1/#”,它会收到使用下列主题名发布的消息:
- “sport/tennis/player1”
- “sport/tennis/player1/ranking”
- “sport/tennis/player1/score/wimbledon”
非规范评注
- “sport/#”也匹配单独的 “sport” 主题名,因为#包括它的父级。
- “#”是有效的,会收到所有的应用消息。
- “sport/tennis/#”也是有效的。
- “sport/tennis#”是无效的。
- “sport/tennis/#/ranking”是无效的。4.7.1-1]。
单层通配符
加号(“+” U+002B) 是只能用于单个主题层级匹配的通配符。
在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。在使用它时,它必须占据过滤器的整个层级 [MQTT-4.7.1-2]。可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用。
例如,“sport/tennis/+”匹配“sport/tennis/player1”和“sport/tennis/player2”,但是不匹配“sport/tennis/player1/ranking”。同时,由于单层通配符只能匹配一个层级,“sport/+”不匹配“sport”但是却匹配 “sport/”。
- “+” 是有效的。
- “+/tennis/#” 是有效的。
- “sport+” 是无效的。
- “sport/+/player1” 也是有效的。
- “/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。
正文
在mqtt协议使用中,如何验证一个主题是否符合订阅主题格式呢
假设发送消息到主题v1/123456/telemetry/location
其中第二层123456是设备编号,第四层location是遥测消息类型
客户端订阅v1/+/telemetry/+
即可收到所有设备的所有遥测类型消息
虽然很简单,但是还是值得看一下的,来看thingsboard是怎么做的
public static void main(String[] args) {
System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/123456/telemetry/location"));
System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/712361/telemetry/speed"));
System.out.println(MqttTopicFilterFactory.toFilter("v1/#").filter("v1/712361/telemetry/speed"));
System.out.println(MqttTopicFilterFactory.toFilter("v1/+/telemetry/+").filter("v1/712361/attribute/speed"));
}
输出
12:59:06.518 [main] DEBUG xxx - Converting [v1/+/telemetry/+] to [v1/[^/]+/telemetry/[^/]+]
true
true
12:59:06.525 [main] DEBUG xxx - Converting [v1/#] to [v1($|/.*)]
true
false
接口MqttTopicFilter的定义,有三个实现common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilter.java
public interface MqttTopicFilter {
boolean filter(String topic);
}
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/AlwaysTrueTopicFilter.java
@Data
public class AlwaysTrueTopicFilter implements MqttTopicFilter {
@Override
public boolean filter(String topic) {
return true;
}
}
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/EqualsTopicFilter.java
@Data
public class EqualsTopicFilter implements MqttTopicFilter {
private final String filter;
@Override
public boolean filter(String topic) {
return filter.equals(topic);
}
}
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/RegexTopicFilter.java
@Data
public class RegexTopicFilter implements MqttTopicFilter {
private final Pattern regex;
public RegexTopicFilter(String regex) {
this.regex = Pattern.compile(regex);
}
@Override
public boolean filter(String topic) {
return regex.matcher(topic).matches();
}
}
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/MqttTopicFilterFactory.java
@Slf4j
public class MqttTopicFilterFactory {
private static final ConcurrentMap<String, MqttTopicFilter> filters = new ConcurrentHashMap<>();
private static final MqttTopicFilter DEFAULT_TELEMETRY_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_TELEMETRY_TOPIC);
private static final MqttTopicFilter DEFAULT_ATTRIBUTES_TOPIC_FILTER = toFilter(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
public static MqttTopicFilter toFilter(String topicFilter) {
if (topicFilter == null || topicFilter.isEmpty()) {
throw new IllegalArgumentException("Topic filter can't be empty!");
}
return filters.computeIfAbsent(topicFilter, filter -> {
if (filter.equals("#")) {
return new AlwaysTrueTopicFilter();
} else if (filter.contains("+") || filter.contains("#")) {
String regex = filter
.replace("\\", "\\\\")
.replace("+", "[^/]+")
.replace("/#", "($|/.*)");
log.debug("Converting [{}] to [{}]", filter, regex);
return new RegexTopicFilter(regex);
} else {
return new EqualsTopicFilter(filter);
}
});
}
public static MqttTopicFilter getDefaultTelemetryFilter() {
return DEFAULT_TELEMETRY_TOPIC_FILTER;
}
public static MqttTopicFilter getDefaultAttributesFilter() {
return DEFAULT_ATTRIBUTES_TOPIC_FILTER;
}
}