image.png
你看阿里云把“服务端订阅”和“云产品流转”都划分在”规则引擎“这个业务下面,证明两者属于同意类型的业务场景。
比如你只有阿里云物联网产品没有其他云产品无法做云产品流转,但是于此同时你又需要获取这些上报的数据你会怎么办?那这就是“服务端订阅”。https://help.aliyun.com/document_detail/89225.html
image.png

1-创建消费组

image.png

2-创建订阅

image.png

3-服务端开发

https://help.aliyun.com/document_detail/175270.html

  1. # encoding=utf-8
  2. import time
  3. import sys
  4. import hashlib
  5. import hmac
  6. import base64
  7. import stomp
  8. import ssl
  9. import schedule
  10. import threading
  11. def connect_and_subscribe(conn):
  12. accessKey = " "
  13. accessSecret = " "
  14. consumerGroupId = " "
  15. # iotInstanceId:实例ID。
  16. iotInstanceId = " "
  17. clientId = "12345678"#这个客户端id自定义
  18. # 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
  19. signMethod = "hmacsha1"
  20. timestamp = current_time_millis()
  21. # userName组装方法,请参见AMQP客户端接入说明文档。
  22. # 若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节“二进制消息体说明”。
  23. username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
  24. + ",timestamp=" + timestamp + ",authId=" + accessKey \
  25. + ",iotInstanceId=" + iotInstanceId \
  26. + ",consumerGroupId=" + consumerGroupId + "|"
  27. signContent = "authId=" + accessKey + "&timestamp=" + timestamp
  28. # 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
  29. password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
  30. conn.set_listener('', MyListener(conn))
  31. conn.connect(username, password, wait=True)
  32. # 清除历史连接检查任务,新建连接检查任务
  33. schedule.clear('conn-check')
  34. schedule.every(1).seconds.do(do_check, conn).tag('conn-check')
  35. class MyListener(stomp.ConnectionListener):
  36. def __init__(self, conn):
  37. self.conn = conn
  38. def on_error(self, frame):
  39. print('received an error "%s"' % frame.body)
  40. def on_message(self, frame):
  41. print('received a message "%s"' % frame.body)
  42. def on_heartbeat_timeout(self):
  43. print('on_heartbeat_timeout')
  44. def on_connected(self, headers):
  45. print("successfully connected")
  46. conn.subscribe(destination='/topic/#', id=1, ack='auto')
  47. print("successfully subscribe")
  48. def on_disconnected(self):
  49. print('disconnected')
  50. connect_and_subscribe(self.conn)
  51. def current_time_millis():
  52. return str(int(round(time.time() * 1000)))
  53. def do_sign(secret, sign_content):
  54. m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
  55. return base64.b64encode(m.digest()).decode("utf-8")
  56. # 检查连接,如果未连接则重新建连
  57. def do_check(conn):
  58. print('check connection, is_connected: %s', conn.is_connected())
  59. if (not conn.is_connected()):
  60. try:
  61. connect_and_subscribe(conn)
  62. except Exception as e:
  63. print('disconnected, ', e)
  64. # 定时任务方法,检查连接状态
  65. def connection_check_timer():
  66. while 1:
  67. schedule.run_pending()
  68. time.sleep(10)
  69. # 接入域名,请参见AMQP客户端接入说明文档。这里直接填入域名,不需要带amqps://前缀
  70. conn = stomp.Connection([('xxxx.amqp.iothub.aliyuncs.com', 61614)])
  71. conn.set_ssl(for_hosts=[('xxxx.amqp.iothub.aliyuncs.com', 61614)], ssl_version=ssl.PROTOCOL_TLS)
  72. try:
  73. connect_and_subscribe(conn)
  74. except Exception as e:
  75. print('connecting failed')
  76. raise e
  77. # 异步线程运行定时任务,检查连接状态
  78. thread = threading.Thread(target=connection_check_timer)
  79. thread.start()

4-mqtt.fx模拟设备上报

image.png

5-日志查看

image.png
image.png
image.png

6-云产品流转实现amqp

在云产品流转的时候也可以选择流转到服务端订阅,感觉这种方式可以可无,非要说的话可以用云产品流转的数据脚本解析的功能等。

image.png