数据下行也是一样的,只要mqtt客户端(设备)订阅某个topic,业务端往这个topic中发布一个消息,设备端就会收到这个消息。
image.png
对于设备下发操作,阿里云提供了在线调试的功能:
image.png但是在线调试只支持“属性调试”,“服务调用”,“远程登录”功能,这个功能模块都是在阿里云自己封装的业务基础上调试的:
image.png但是还是那句话原生的就是mqqt的发布订阅,这些只不过是阿里云帮你封装的功能,我们还是用刚才的那个topic自己实现发布和订阅,也就是发布和订阅都是一个设备(当然这有个前提就是我们创建的这个topic是同时支持发布和订阅的)。

1-mqqt.fx订阅自定义topic

image.png

订阅成功同一个topic

2-mqqt.fx模拟业务端发送开机指令

2.1-topic发布消息

image.png因为我们发布订阅都是同一个设备,从日志中可以看到我们先是发布成功了一条消息,马上同一个设备收到了一条消息

2.2-查看阿里云日志

image.png从日志的时间顺序上可以看出,显示设备发送了一条信息,然后云帮我们把这个消息发送到了设备,也就是这个topic的订阅者。

2.2.1-设备到云(publish)

image.png

2.2.2-云到设备(subscribe)

image.png

2.2.3-设备本地日志

image.png
image.png
到这里数据下行就算结束了。也就是说在实际的业务场景中你可以在你的管理后台搞一个mqqt客户端去pulish这些消息远程管理这个设备。同时阿里云也提供了http api的方式封装了对topic消息的发布,阿里云内部的broker自己帮你转发消息,那么你的业务系统就不需要维护mqqt协议的组建了,和传统的http方式开发试一样的。

3-http api模拟业务端发送开机指令

https://help.aliyun.com/document_detail/69893.html#title-hf1-90r-55o
image.png
https://help.aliyun.com/document_detail/69793.htm?spm=a2c4g.11186623.0.0.50761c80eJ32FT#reference-k3b-yrz-wdb

3.1-根据文档编写代码

先找到“三元组”:
image.png

  1. import requests
  2. import uuid
  3. import datetime
  4. import hmac
  5. import hashlib
  6. import base64
  7. import pprint
  8. # http api实现自定义topic发送消息 从云到设备
  9. def percent_encoding(string):
  10. """
  11. 满足RFC3986编码的规则函数
  12. """
  13. result = ''
  14. accepted = [c for c in 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~'.encode('utf-8')]
  15. for char in string.encode('utf-8'):
  16. result += chr(char) if char in accepted else '%{}'.format(hex(char)[2:]).upper()
  17. return result
  18. def Pub(content=""):
  19. base_url = "https://iot.cn-shenzhen.aliyuncs.com/?" # 基础url
  20. AccessKey_Secret = "ZEZHyWv8hh06qq6uVQbky1ejXR0iBA" # 阿里云AccessKey_Secret
  21. params = {
  22. # 公共参数
  23. "Format": "json", # 返回内容格式为json
  24. "Version": "2018-01-20", # 固定值
  25. "AccessKeyId": "LTAI5tArgc71UVg8JRTkUar7", # 阿里云AccessKeyId
  26. "SignatureMethod": "HMAC-SHA1", # 签名方式 固定值
  27. "SignatureVersion": "1.0", # 签名算法版本 固定值
  28. "SignatureNonce": str(uuid.uuid4()).replace("-", ""), # 随机字符串
  29. "RegionId": "cn-shenzhen", # 设备所在地域
  30. "Timestamp": str(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")), # 当前请求的时间戳 根据请求动态生成
  31. # 业务接口的自定义参数
  32. "Action": "Pub",
  33. "MessageContent": str(base64.b64encode(content.encode('utf-8')), "utf-8"), # 消息内容
  34. "ProductKey": "guonWDsfHNn", # 产品的ProductKey。
  35. "TopicFullName": "/guonWDsfHNn/device1/user/boot", # 设备订阅的topic
  36. "IotInstanceId": "iot-040a0aee", # iot实例ID
  37. }
  38. pprint.pprint(params)
  39. # 1-构造规范化的请求字符串
  40. # 参数排序后 ket value需要编码 且用$符号连接
  41. src = '&'.join(['%s=%s' % (percent_encoding(k), percent_encoding(v)) for k, v in sorted(params.items())])
  42. # 2-准备构造签名字符串
  43. string_to_sign = "GET" + "&" + percent_encoding("/") + "&" + percent_encoding(src)
  44. # 3-HMAC签名串
  45. hmac_string = hmac.new((AccessKey_Secret + "&").encode(),
  46. string_to_sign.encode(),
  47. hashlib.sha1)
  48. # 4-转换base64格式
  49. signature = base64.b64encode(hmac_string.digest()).strip()
  50. url = base_url + src + "&Signature=" + percent_encoding(signature.decode())
  51. res = requests.get(url)
  52. pprint.pprint(res.json())
  53. return 1
  54. if __name__ == "__main__":
  55. res = Pub(content="请你再次开机")

返回体:

  1. {
  2. 'MessageId': 1499044502701082112,
  3. 'RequestId': 'C08C4A2F-FB22-50F0-9F13-C6E041E195AB',
  4. 'Success': True
  5. }

3.2-查看阿里云日志

image.png由两条操作行为日志,从时间上看应该是api调用在前面,然后阿里云帮我们发布消息实现从云到设备,也就是业务系统中的远程控制功能。
image.png
image.png
而且在阿里云看来这属于同一条业务意义上的消息所以你看他的traceId是一样的
image.png
但是两次请求的消息内容是不一样的
image.png