使用php代码实现发布订阅。
演示服务环境:Ubuntu18,php7.4,Mosquitto(mosquitto is an MQTT v3.1.1/v3.1 broker)
需要安装mosquitto 的php扩展
不像rabbitmq,MQTT无论是发布还是订阅都不会有任何触发事件,这里的解决方案是使用遗嘱消息LWT(Last Will & Testament)。当 broker 检测到网络故障、客户端异常等问题,需要关闭某个客户端的连接时,向该客户端发布一条消息(必须在connect之前调用)。
当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。
客户端使用php,当设备的上、下线消息通过broker发送到到php的客户时,php客户端把设备信息存入数据库中,并维护上下线的状态。
1. 设备上线、下线及状态维护
1.1 Client 设备端上、下线(发布者)
使用PHP模拟两台设备来实现模拟上线、下线;
- 设备序列号作为topic;
- 设备上线将设备信息发布给订阅者;
- 设备下线,将下线状态发布给订阅者;
php模拟设备端发布者代码如下:**publisher1.php**
<?php
//模拟设备上下线
$client = new Mosquitto\Client();
//设备上下线消息
$payload = [
'status' => 'online', //上线标识
'device_sn' => 'AB9CDDBDB62D', //设备序列号
];
$off_line = [
'status' => 'offline', //下线标识
'device_sn' => 'AB9CDDBDB62D',
];
/**
* setWill($topic, $payload[, $qos = 0, $retain = false])
* 设置“遗嘱消息”,当 broker 检测到网络故障、客户端异常等问题,需要关闭某个客户端的连接时,向该客户端发布一条消息。必须在connect之前调用
* $topic (string) – 发表遗嘱消息的主题。
* $payload (string) – 要发送的数据。
* $qos (int) –可选。服务质量。默认值为0.整数0,1或2。
* $retain (boolean) – 可选。默认为false。设置为true,则该消息将被保留。
*/
//设备下线消息,当网中断时,broker会推送给订阅者
$client->setWill('device/AB9CDDBDB62D', json_encode($off_line), 2, false);
//连接broker
$client->connect('10.10.20.200', 1883, 5);
//设备上线消息
$client->publish('device/AB9CDDBDB62D', json_encode($payload), 2, false);
$client->loopForever();
这里以设备序列号来区分不同的设备,再创建一个 publisher2.php
来模拟第二个设备,同 publisher1.php
里的代码一样,只需要更改下设备序列号即可以(如:改成 D49CDDBDB62D )。
注意:这里创建topic有一个规律,由device 和 序列号组成层级,不同的序列号只需要更改topic主题的第二层级。如 device/ab1、device/dc2。
这里设置了一个遗愿消息,当设备只要中断broker连接时,broker就会给订阅这个主题的订阅者推送一条消息。
1.2 Client PHP端(订阅者)
设备上线后,php端通过订阅所有设备topic后(topic多层通配符 # 匹配),就可以拿到设备信息;当设备下线后,php也可以拿到设备下线消息维护下线状态(通过遗愿消息LWT)。
**subscriber.php**
<?php
//Mosquitto客户端类
$client = new Mosquitto\Client();
//连接broker
$client->connect('10.10.20.200', 1883, 5);
//订阅设备上线、下线topic,服务质量为2
$client->subscribe("device/#", 2);
//$client->subscribe('device/AB9CDDBDB62D', 2);
//逻辑编写,订阅的设备上线、下线消息
$client->onMessage(function($message) {
var_dump($message);
//var_dump(json_decode($message->payload));
});
$client->loopForever();
打开三个客户端
- 执行
**php subscriber.php**
,该窗口没有内容输出; 分别执行
**php publisher1.php**
、**php publisher2.php**
,此时订阅者(subscriber.php)的窗口有两条内容打印出来:object(Mosquitto\Message)#3 (5) {
["mid"]=>
int(1)
["topic"]=>
string(19) "device/AB9CDDBDB62D"
["payload"]=>
string(46) "{"status":"online","device_sn":"AB9CDDBDB62D"}"
["qos"]=>
int(2)
["retain"]=>
bool(false)
}
object(Mosquitto\Message)#3 (5) {
["mid"]=>
int(2)
["topic"]=>
string(19) "device/D49CDDBDB62D"
["payload"]=>
string(46) "{"status":"online","device_sn":"D49CDDBDB62D"}"
["qos"]=>
int(2)
["retain"]=>
bool(false)
}
说明己经获取到两个设备上线的信息。
结束
**publisher1.php**
、**publisher2.php**
这两个模拟设备的进程,订阅者(subscriber.php)的窗口有两条设备下线的消息(遗愿消息):object(Mosquitto\Message)#3 (5) {
["mid"]=>
int(3)
["topic"]=>
string(19) "device/AB9CDDBDB62D"
["payload"]=>
string(47) "{"status":"offline","device_sn":"AB9CDDBDB62D"}"
["qos"]=>
int(2)
["retain"]=>
bool(false)
}
object(Mosquitto\Message)#3 (5) {
["mid"]=>
int(4)
["topic"]=>
string(19) "device/D49CDDBDB62D"
["payload"]=>
string(47) "{"status":"offline","device_sn":"D49CDDBDB62D"}"
["qos"]=>
int(2)
["retain"]=>
bool(false)
}
2. 给设备发送消息
如何给指定设备推送消息?
上面的代码,我们创建topic时,有固定规则 device 后面再跟上 设备序列号,为每个设备创建了一个topic,即 device/设备序列号,设备上、下线时,我们通过多层通配符 # 来匹配所有设备。
当我们需要给某一个设备或者某几个设备发送消息指令时,就可以通过准确匹配topic来实现。
2.1. Client PHP端(发布者)
给设备发送消息指令,如开关机指令等。**pulisher.php**
<?php
/**
* mqtt 发布者类
* @author maclechan@qq.com
* @date 2021-08-13
* 参考文档 https://www.kancloud.cn/liao-song/mosquitto-php/500403
*/
class MQTTPublisher
{
/**
* @var \Mosquitto\client
*/
private $mqtt;
/**
* mqtt服务地址
* @var string
*/
private $host;
/**
* mqtt服务端口
* @var int
*/
private $port;
/**
* 静态私有的变量保存该类对象
* @var $instance
*/
private static $instance;
/**
* 初始化
*/
private function __construct()
{
$this->host = '10.10.20.200';
$this->port = 1883;
//Mosquitto客户端类
$this->mqtt = new \Mosquitto\client();
/**
* 设置连接到服务器的用户名和密码。必须在connect之前调用
* setCredentials($username, $password)
* $username (string) -连接到代理的用户名。
* $password (string) -连接到代理的密码。
*/
//$this->mqtt->setCredentials('root', '123456');
/**
* 连接地址
* connect($host[, $port = 1883, $keepalive = 60, $interface = null])
* $host (string) – 连接的主机名
* $port (int) – 可选。 要连接的端口号。默认为1883。
* $keepalive (int) – 可选。在没有收到消息的情况下,服务器应该ping客户端的部分数量。
* $interface (string) – 可选。要为此连接绑定的本地接口的地址或主机名。
*/
$this->mqtt->connect($this->host, $this->port, 5);
}
//防止实例被克隆(这会创建实例的副本)
private function __clone(){}
/**
* 实例化类
*/
public static function getInstance()
{
if (!self::$instance instanceof self) {
self::$instance = (new self());
}
return self::$instance;
}
/**
* 发布消息到Broker
* @param string $topic 主题
* @param string $payload 消息体
* @param int $qos 服务质量,值[0,1,2]
* @param bool $retain 是否保留此消息,默认为false
*/
public function publisher($topic, $payload, $qos,$retain = false)
{
//Returns: 服务器返回该消息ID(警告:消息ID并不是唯一的)
$mgid = $this->mqtt->publish($topic, $payload, $qos, $retain);
if ($mgid) echo '推送成功'.$mgid."\r\n";
$this->mqtt->loopForever();
}
}
//发布测试代码
$data = [
'code' => 0,
'message' => '成功',
'data' => [
'id' => 1,
'name' => '设备号名称',
'linktel' => 13251079793
]
];
//通过更换topic,来实现指定某些设备
$client = MQTTPublisher::getInstance()->publisher('device/AB9CDDBDB62D',json_encode($data),2);
2.1 Client 设备端(订阅者)
模拟AB9CDDBDB62D这个设备。subscriber1.php
<?php
$client = new Mosquitto\Client();
$client->connect('10.10.20.200', 1883, 5);
$client->subscribe('device/AB9CDDBDB62D', 2);//订阅topic为chan的消息
$client->onMessage(function($m) {
var_dump($m);
});
$client->loopForever();
执行 php subscriber1.php
,窗口处于等待状态;
重新打开一新窗口,然后执行 php publisher.php
订阅者窗口开始有数据出来,发布成功,打印消息如下:
object(Mosquitto\Message)#3 (5) {
["mid"]=>
int(1)
["topic"]=>
string(19) "device/AB9CDDBDB62D"
["payload"]=>
string(113) "{"code":0,"message":"\u6210\u529f","data":{"id":1,"name":"\u8bbe\u5907\u53f7\u540d\u79f0","linktel":13151179793}}"
["qos"]=>
int(2)
["retain"]=>
bool(false)
}