前言
MQTT协议通过预定义的MQTT控制报文来通信,这一节按照MQTT控制报文类型,通过跟踪Thingsboard处理这些协议报文来解读程序如何处理协议的。
MQTT控制报文由三部分组成,按照 表格1 –MQTT控制报文的结构` 描述的顺序:
表格1 –MQTT控制报文的结构
Fixed header | 固定报头,所有控制报文都包含 |
---|---|
Variable header | 可变报文,部分控制报文包含 |
Payload | 有效载荷,部分控制报文包含 |
固定报头 Fixed header: 1. MQTT控制报文类型,2. 标志Flags(DUP1=控制报文的重复分发标志,PUBLISH报文的服务质量等级,RETAIN3=PUBLISH报文的保留标志),3.剩余长度 Remaining Length
可变报文: 报文标识符Packet Identifier
有效载荷: 某些MQTT控制报文载报文的最后部分包含一个有效载荷,对于PUBLISH来说有效载荷就是应用消息。
更多MQTT信息参见:
- MQTT3.1 英文文档 http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
- MQTT3.1中文文档 https://mcxiaoke.gitbook.io/mqtt/
处理入口
代码块1-1
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.trace("[{}] Processing msg: {}", sessionId, msg);
try {
if (msg instanceof MqttMessage) {
processMqttMsg(ctx, (MqttMessage) msg);
} else {
ctx.close();
}
} finally {
ReferenceCountUtil.safeRelease(msg);
}
}Copy to clipboardErrorCopied
第5-9行代码,通过判断读取到的数据是否为MqttMessage,如果是则进入processMqttMsg(cox, (MqttMessage) msg)
类; 如果不是则主动关闭连接。
第11行代码,自从Netty 4开始,对象的生命周期由它们的引用计数(reference counts)管理,而不是由垃圾收集器(garbage collector)管理了。Bytebuf是最值得注意的,它使用了引用计数来改进分配内存和释放内存的性能。ReferenceCountUtil.safeRelease(msg)
: 将引用计数减少。
代码块1-2
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
address = (InetSocketAddress) ctx.channel().remoteAddress();
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
processDisconnect(ctx);
return;
}
deviceSessionCtx.setChannel(ctx);
switch (msg.fixedHeader().messageType()) {
case CONNECT:
processConnect(ctx, (MqttConnectMessage) msg);
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
break;
case PINGREQ:
//校验设备连接状态
if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.reportActivity();
}
}
break;
case DISCONNECT:
if (checkConnected(ctx, msg)) {
processDisconnect(ctx);
}
break;
default:
break;
}
}Copy to clipboardErrorCopied
第3-7行代码,如果消息固定报头为空,则打印客户端的ip和端口。并进入processDisconnect(ctx)
类。通过判断MQTT协议的固定报头类型,进行相应的逻辑操作。
表格2-控制报文的类型和相应的逻辑类
名字 | 报文流动方向 | 描述 | 逻辑处理类 |
---|---|---|---|
CONNECT | 客户端到服务端 | 客户端请求连接服务端 | processConnect |
PUBLISH | 两个方向都允许 | 发布消息 | processPublish |
SUBSCRIBE | 客户端到服务端 | 客户端订阅请求 | processSubscribe |
UNSUBSCRIBE | 客户端到服务端 | 客户端取消订阅请求 | processUnsubscribe |
PINGREQ | 客户端到服务端 | 心跳请求 | 直接报文回复 |
DISCONNECT | 客户端到服务端 | 客户端断开连接 | processDisconnect(ctx) |
PINGREQ – 心跳请求
客户端发送PINGREQ报文给服务端的,用于:
- 在没有任何其他控制报文从客户端发给服务端时,告知服务端客户端还活着。
- 请求服务端发送响应确认它还活着。
- 使用网络以确认网络连接没有断开。
服务端发送PINGRESP报文响应客户端的PINGREQ报文。表示服务端还活着。
PINGRESP报文没有可变报头和有效载荷。
在代码块1-2中,第22-30行代码,如果设备处于连接状态的话。
则回复PINGRESP报文给客户端并记录该设备/网关最新一次活动。
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));Copy to clipboardErrorCopied
CONNECT-连接服务端
客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文 [MQTT-3.1.0-1]。
在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 [MQTT-3.1.0-2]。
有效载荷包含一个或多个编码的字段。包括客户端的唯一标识符,Will主题,Will消息,用户名和密码。除了客户端标识之外,其它的字段都是可选的,基于标志位来决定可变报头中是否需要包含这些字段。
服务端发送CONNACK报文响应从客户端收到的CONNECT报文。服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1]。
如果客户端在合理的时间内没有收到服务端的CONNACK报文,客户端应该关闭网络连接。合理 的时间取决于应用的类型和通信基础设施。
代码块3-1
private void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.info("[{}] Processing connect msg for client: {}!", sessionId, msg.payload().clientIdentifier());
X509Certificate cert;
if (sslHandler != null && (cert = getX509Certificate()) != null) {
processX509CertConnect(ctx, cert);
} else {
processAuthTokenConnect(ctx, msg);
}
}Copy to clipboardErrorCopied
第2行代码,打印有效载荷Payload的客户端标志符Client Identifier, 服务端使用客户端标识符 (ClientId) 识别客户端。连接服务端的每个客户端都有唯一的客户端标识符(ClientId)。客户端和服务端都必须使用ClientId识别两者之间的MQTT会话相关的状态
第3-7行代码,通过判断是否有X509加密认证和sslhandler,对客户端的连接处理进行相应的认证处理。
代码块3-2
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
} else {
transportService.process({
...
});
}
}Copy to clipboardErrorCopied
在AuthToken连接认证中,如果客户端的用户名为空,响应错误用户名或密码连接拒绝错误码((byte) 0x04)。具体逻辑处理类:
代码块3-2-1
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
//mqtt报文数据:CONNACK,false(DUP),AT_MOST_ONCE(QoS),0(isRetain),0x04(连接已拒绝,无效的用户名或密码)
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, true);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}Copy to clipboardErrorCopied
代码块3-3
private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
try {
String strCert = SslUtil.getX509CertificateString(cert);
String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
transportService.process({
...
});
} catch (Exception e) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
}
}Copy to clipboardErrorCopied
第3-7行代码,对认证报文进行逻辑处理,当出现异常时,响应客户端未被授权连接到此服务器错误码((byte) 0x05),具体逻辑处理类:
代码块3-3-1
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) {
//mqtt报文数据:CONNACK, false(DUP),AT_MOST_ONCE,0(isRetain), 0x05(连接已拒绝,未授权)
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, true);
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}Copy to clipboardErrorCopied
DISCONNECT –断开连接
DISCONNECT报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。
代码块4-1
private void processDisconnect(ChannelHandlerContext ctx) {
//关闭连接
ctx.close();
log.info("[{}] Client disconnected!", sessionId);
//如果设备会话上下文是连接的情况,需要通知程序,该设备/网关已关闭
if (deviceSessionCtx.isConnected()) {
transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null);
transportService.deregisterSession(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.onGatewayDisconnect();
}
}
}Copy to clipboardErrorCopied
第3行代码,关闭这个协议报文不合理或不被认证成功的设备连接。
第6-12行代码,如果设备会话上下文是连接的情况,需要通知设备,该设备/网关已关闭。
PUBLISH – 发布消息
ThingsBoard仅支持QoS 1级别的发布消息
PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
PUBACK报文是对QoS 1等级的PUBLISH报文的响应。
代码块5-1
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
//校验设备是否为连接状态
if (!checkConnected(ctx, mqttMsg)) {
return;
}
//获取主题名
String topicName = mqttMsg.variableHeader().topicName();
//获取报文标识符Packet Identifier
int msgId = mqttMsg.variableHeader().packetId();
log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
//判断主题名是否以v1/gateway开头
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
handleGatewayPublishMsg(topicName, msgId, mqttMsg);
}
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
}
}Copy to clipboardErrorCopied
第3行代码,校验设备是否为连接状态,如果false
,则返回。
第6行代码,获取PUBLISH类型报文的主题名,可变报头按顺序包含主题名和报文标识符。主题名(Topic Name)用于识别有效载荷数据应该被发布到哪一个信息通道。
第9行代码,获取报文标识符Packet Identifier, 只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文中。
第12-18行代码中,判断主题名是否以v1/gateway
开头。如果是的话,判断该设备为网关,进入处理网关PUBLISH报文类型的逻辑类中,反之进入设备PUBLISH报文类型的逻辑类中。
代码块5-1-1
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) {
...
} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) {
...
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
...
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
...
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
...
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
...
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
}
}Copy to clipboardErrorCopied
第3行代码, 如果设备的PUBLISH消息主题是v1/devices/me/telemetry
,则表明该消息为设备遥测数据。
第5行代码,如果设备的PUBLISH消息主题是v1/devices/me/attributes
,则表明该消息为客户端设备属性数据。
第7行代码,为了向ThingsBoard服务器节点请求客户端或共享设备属性,请将PUBLISH消息发送到以下主题:v1/devices/me/attributes/request/$request_id
,其中$ request_id是您的整数请求标识符。
第9行代码,来自服务器的响应将发布到以下主题: v1/devices/me/rpc/response/$request_id
第11行代码, 为了将RPC命令发送到服务器,请将PUBLISH消息发送到以下主题: v1/devices/me/rpc/request/$request_id
。
第13行代码,为了启动声明设备,请向以下主题发送PUBLISH消息:v1/devices/me/claim
代码块5-1-2
private void handleGatewayPublishMsg(String topicName, int msgId, MqttPublishMessage mqttMsg) {
try {
switch (topicName) {
case MqttTopics.GATEWAY_TELEMETRY_TOPIC:
...
break;
case MqttTopics.GATEWAY_CLAIM_TOPIC:
...
break;
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
...
break;
case MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC:
...
break;
case MqttTopics.GATEWAY_RPC_TOPIC:
...
break;
case MqttTopics.GATEWAY_CONNECT_TOPIC:
...
break;
case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
...
break;
}
} catch (RuntimeException | AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
}
}Copy to clipboardErrorCopied
第4行代码中,为了将网关遥测数据发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:
v1/gateway/telemetryCopy to clipboardErrorCopied
第7行代码中,为了启动声明设备,请向以下主题发送PUBLISH消息:
v1/gateway/claimCopy to clipboardErrorCopied
第10行代码中,为了将网关设备属性发布到ThingsBoard服务器节点,请将PUBLISH消息发送到以下主题:
v1/gateway/attributesCopy to clipboardErrorCopied
第13行代码中,为了向ThingsBoard服务器节点请求客户端或共享设备属性,请将PUBLISH消息发送到以下主题:
v1/gateway/attributes/requestCopy to clipboardErrorCopied
第16行代码中,设备发布给服务器RPC命令,请将PUBLISH消息发送到以下主题:
v1/gateway/rpcCopy to clipboardErrorCopied
第19行代码中,为了通知ThingsBoard设备已连接到网关,需要发布以下消息:
v1/gateway/connectCopy to clipboardErrorCopied
第22行代码中,为了通知ThingsBoard设备已与网关断开连接,需要发布以下消息:
v1/gateway/disconnectCopy to clipboardErrorCopied
SUBSCRIBE - 订阅主题
客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
//检查设备是否处于连接状态
if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
try {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
...
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
...
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
registerSubQoS(topic, grantedQoSList, reqQoS);
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
}
}
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}Copy to clipboardErrorCopied
第8-10行代码,SUBSCRIBE报文的有效载荷包含了一个主题过滤器列表,它们表示客户端想要订阅的主题。每一个过滤器后面跟着一个字节,这个字节被叫做 服务质量要求(Requested QoS)。它给出了服务端向客户端发送应用消息所允许的最大QoS等级。SUBSCRIBE报文的有效载荷必须包含至少一对主题过滤器 和 QoS等级字段组合。
第13-17行代码,订阅共享设备属性更改,发送SUBSCRIBE消息到以下主题:
v1/devices/me/attributesCopy to clipboardErrorCopied
第18-22行代码,为了从服务器订阅RPC命令,请将SUBSCRIBE消息发送到以下主题:
v1/devices/me/rpc/request/+Copy to clipboardErrorCopied
第23-29行代码,当主题为**v1/devices/me/rpc/response/+****
、v1/gateway/attributes
、v1/gateway/rpc
、v1/gateway/attributes/response
和v1/devices/me/attributes/response/+**
时,注册主题和QoS。
第30-33行代码,当主题不符合上述主题时,累加设备发给服务端的QoS。
第35-38行代码,当出现错误时,累加设备发给服务端的QoS
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
grantedQoSList.add(getMinSupportedQos(reqQoS));
mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
}Copy to clipboardErrorCopied
服务端可以授予比订阅者要求的低一些的QoS等级。为响应订阅而发出的消息的有效载荷的QoS必须是原始发布消息的QoS和服务端授予的QoS两者中的最小值。
如果原始消息的QoS是1而被授予的最大QoS是0,允许服务端重复发送一个消息的副本给订阅者。
private static MqttSubAckMessage createSubAckMessage(Integer msgId, List<Integer> grantedQoSList) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(SUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
}Copy to clipboardErrorCopied
服务端发送给客户端的SUBACK报文对每一对主题过滤器 和QoS等级都必须包含一个返回码。这个返回码必须表示那个订阅被授予的最大QoS等级,或者表示这个订阅失败 。
UNSUBSCRIBE –取消订阅
客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题。
服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文。
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
//检查设备是否处于连接状态
if (!checkConnected(ctx, mqttMsg)) {
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
//遍历UNSUBSCRIBE报文提供的主题过滤器
for (String topicName : mqttMsg.payload().topics()) {
mqttQoSMap.remove(new MqttTopicMatcher(topicName));
try {
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
...
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
...
break;
}
}
} catch (Exception e) {
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
}
}
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
}Copy to clipboardErrorCopied
第8-9行代码中,遍历UNSUBSCRIBE报文提供的主题过滤器,如果有主题匹配,那么它(服务端)自己的订阅将被删除。
第12-19行代码中,当主题为/v1/devices/me/attributes
和v1/devices/me/rpc/request/+
时,必须停止分发任何新消息给这个客户端等逻辑操作。
private MqttMessage createUnSubAckMessage(int msgId) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
}Copy to clipboardErrorCopied
服务端必须发送UNSUBACK报文响应客户端的UNSUBSCRIBE请求。UNSUBACK报文必须包含和UNSUBSCRIBE报文相同的报文标识符 [MQTT-3.10.4-4]。即使没有删除任何主题订阅,服务端也必须发送一个UNSUBACK响应 [MQTT-3.10.4-5]。