目的:
- 匹配高中信息技术物联网部分的教学目标。将MQTT作为主要讲授内容。
- 在Python编辑器中,可以通过mqtt与硬件之间进行交互。
- 将账号云广播背后的MQTT的概念暴露出来,讲MQTT与物联网的关系而不是账号云广播与物联网的关系。通过mqtt的应用来讲解物联网。
- 通过mqtt帮助实现数据可视化。
物联网知识的可能学习路径:
阶段1: 账号云广播(体验)
阶段2: mqtt 入门(概念认知与简单应用) mkcloud + cyberpi.mqtt (基于慧编程的服务器和mkcloud)
阶段3: mqtt 进阶 (进阶的应用) mqtt.MQTTClient(硬件) + paho-mqtt(python 第三方库)
使用场景:
教学当中mqtt使用的服务器为broker.emqx.io(研发表示不建议使用mq.makeblock.com)。
1.童芯派和Python编辑器通过mqtt实现通信。比如童芯派传感器收集到的数值能够发送至MQTT服务器上,Python编辑器能够通过mqtt服务器获取到童芯派发送的数值。
2.童芯派与童芯派之间实现mqtt通信。 比如通过MQTT,童芯派A能够控制童芯派B的灯光亮灭。
3.童芯派自发自收。
当前状态:
mkcloud部分的mqtt功能已经开发完成。
mkcloud库中关于mqtt功能的定义:(童芯派的MQTT API 参照下方,保持一致。前缀最好是cyberpi, 如连接MQTT,cyberpi.mqtt.connect() )具体实现上请研发同事进行评估。
需要注意的点:
mkcloud的发送的topic会默认添加一个”MB_PY/“的前缀。设定接收话题的时候也会。
童芯派上要能够实现中文信息的收发。(mkcloud可以实现中文的收发),目前好像还不行。
mkcloud下的mqtt功能实现
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
class MqTT:
# default
broker = 'broker.emqx.io'
port = 1883
# generate client ID with pub prefix randomly
client_id = f'py-mqtt-mb-{random.randint(0, 1000)}'
mq_client = mqtt_client.Client(client_id)
is_connect = False
topic_prefix = "MB_PY/"
callback = None
is_on_message = False
def __init__(self):
pass
# version
def version(self):
return "1.0.1"
# set broker
def set_broker(self,client_id,server,port):
if client_id != None:
self.client_id = client_id
self.broker = server
self.port = port
# connect
def connect(self):
self.connect_mq()
self.loop_start()
time.sleep(1.8)
return self.is_connect
# connect server
def connect_mq(self):
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
self.is_connect = True
else:
self.is_connect = False
print("Failed to connect, return code %d\n", rc)
self.mq_client = mqtt_client.Client(self.client_id)
self.mq_client.on_connect = on_connect
self.mq_client.connect(self.broker, self.port)
return self.mq_client
# disconnect
def off(self):
self.is_on_message == False
self.is_connect = False
self.mq_client.disconnect()
# publish
def publish_message(self,topic="mb", data="msg"):
topic = self.topic_prefix + topic
result = self.mq_client.publish(topic, data)
# result: [0, 1]
status = result[0] # 0 is ok
return status
def on_message(self,client, userdata, msg):
#print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
self.callback(msg.topic,msg.payload.decode())
# subscribe
def subscribe_message(self,topic):
topic = self.topic_prefix + topic
self.mq_client.subscribe(topic)
if self.is_on_message == False :
self.mq_client.on_message = self.on_message
self.is_on_message = True
# callback
def set_callback(self,callback):
self.callback = callback
self.mq_client.on_message = self.on_message
# loop_start
def loop_start(self):
self.mq_client.loop_start()
# loop_forever
def loop_forever(self):
self.mq_client.loop_forever()
# client
def get_client(self):
return self.mq_client
def get_broker(self):
return self.broker
def get_port(self):
return self.port
def get_topic_prefix(self):
return self.topic_prefix
mqtt = MqTT()
主要用到的API:
API | 功能介绍 |
---|---|
cyberpi.mqtt.set_broker(client_id, server, port) | 设定mqtt服务器。 参数: client_id:可以不填写。 server:填入服务器地址。str port:填入端口号;int |
cyberpi.mqtt.connect() | 连接至服务器,连接成功返回True,否则False |
cyberpi.mqtt.off() | 断开当前服务器的连接。 |
cyberpi.mqtt.publish_message(topic, data) | 推送消息至指定消息队列下。 参数: topic:消息队列名称(函数中会给用户的目标话题自动增加一个”MB_PY/“的前缀)。str data:信息内容(需要能支持中文信息的推送)。str |
cyberpi.mqtt.subscribe_message(topic) | 订阅指定的话题下的消息。 参数: topic:消息队列名称(函数中会给用户的目标话题自动增加一个”MB_PY/“的前缀)。str |
cyberpi.mqtt.set_callback(callback) | 设置回调函数,用以处理接收到的消息。(具体实现方式参考mkcloud的mqtt功能) 参数: callback为用户自定义的函数名称。 |