使用php代码实现发布订阅。
演示服务环境:Ubuntu18,php7.4,Mosquitto(mosquitto is an MQTT v3.1.1/v3.1 broker)

需要安装mosquitto 的php扩展

不像rabbitmq,MQTT无论是发布还是订阅都不会有任何触发事件,这里的解决方案是使用遗嘱消息LWT(Last Will & Testament)。当 broker 检测到网络故障、客户端异常等问题,需要关闭某个客户端的连接时,向该客户端发布一条消息(必须在connect之前调用)。

当客户端连接到Broker时,可以指定LWT,Broker会定期检测客户端是否有异常。
当客户端异常掉线时,Broker就往连接时指定的topic里推送当时指定的LWT消息。

客户端使用php,当设备的上、下线消息通过broker发送到到php的客户时,php客户端把设备信息存入数据库中,并维护上下线的状态。
附2. 实战模拟设备上线/下线(php-mosquitto) - 图1

1. 设备上线、下线及状态维护

1.1 Client 设备端上、下线(发布者)

使用PHP模拟两台设备来实现模拟上线、下线;

  1. 设备序列号作为topic;
  2. 设备上线将设备信息发布给订阅者;
  3. 设备下线,将下线状态发布给订阅者;

php模拟设备端发布者代码如下:
**publisher1.php**

  1. <?php
  2. //模拟设备上下线
  3. $client = new Mosquitto\Client();
  4. //设备上下线消息
  5. $payload = [
  6. 'status' => 'online', //上线标识
  7. 'device_sn' => 'AB9CDDBDB62D', //设备序列号
  8. ];
  9. $off_line = [
  10. 'status' => 'offline', //下线标识
  11. 'device_sn' => 'AB9CDDBDB62D',
  12. ];
  13. /**
  14. * setWill($topic, $payload[, $qos = 0, $retain = false])
  15. * 设置“遗嘱消息”,当 broker 检测到网络故障、客户端异常等问题,需要关闭某个客户端的连接时,向该客户端发布一条消息。必须在connect之前调用
  16. * $topic (string) – 发表遗嘱消息的主题。
  17. * $payload (string) – 要发送的数据。
  18. * $qos (int) –可选。服务质量。默认值为0.整数0,1或2。
  19. * $retain (boolean) – 可选。默认为false。设置为true,则该消息将被保留。
  20. */
  21. //设备下线消息,当网中断时,broker会推送给订阅者
  22. $client->setWill('device/AB9CDDBDB62D', json_encode($off_line), 2, false);
  23. //连接broker
  24. $client->connect('10.10.20.200', 1883, 5);
  25. //设备上线消息
  26. $client->publish('device/AB9CDDBDB62D', json_encode($payload), 2, false);
  27. $client->loopForever();

这里以设备序列号来区分不同的设备,再创建一个 publisher2.php 来模拟第二个设备,同 publisher1.php 里的代码一样,只需要更改下设备序列号即可以(如:改成 D49CDDBDB62D )。

注意:这里创建topic有一个规律,由device 和 序列号组成层级,不同的序列号只需要更改topic主题的第二层级。如 device/ab1、device/dc2。

这里设置了一个遗愿消息,当设备只要中断broker连接时,broker就会给订阅这个主题的订阅者推送一条消息。

1.2 Client PHP端(订阅者)

设备上线后,php端通过订阅所有设备topic后(topic多层通配符 # 匹配),就可以拿到设备信息;当设备下线后,php也可以拿到设备下线消息维护下线状态(通过遗愿消息LWT)。

**subscriber.php**

  1. <?php
  2. //Mosquitto客户端类
  3. $client = new Mosquitto\Client();
  4. //连接broker
  5. $client->connect('10.10.20.200', 1883, 5);
  6. //订阅设备上线、下线topic,服务质量为2
  7. $client->subscribe("device/#", 2);
  8. //$client->subscribe('device/AB9CDDBDB62D', 2);
  9. //逻辑编写,订阅的设备上线、下线消息
  10. $client->onMessage(function($message) {
  11. var_dump($message);
  12. //var_dump(json_decode($message->payload));
  13. });
  14. $client->loopForever();

打开三个客户端

  1. 执行 **php subscriber.php** ,该窗口没有内容输出;
  2. 分别执行 **php publisher1.php****php publisher2.php**,此时订阅者(subscriber.php)的窗口有两条内容打印出来:

    1. object(Mosquitto\Message)#3 (5) {
    2. ["mid"]=>
    3. int(1)
    4. ["topic"]=>
    5. string(19) "device/AB9CDDBDB62D"
    6. ["payload"]=>
    7. string(46) "{"status":"online","device_sn":"AB9CDDBDB62D"}"
    8. ["qos"]=>
    9. int(2)
    10. ["retain"]=>
    11. bool(false)
    12. }
    13. object(Mosquitto\Message)#3 (5) {
    14. ["mid"]=>
    15. int(2)
    16. ["topic"]=>
    17. string(19) "device/D49CDDBDB62D"
    18. ["payload"]=>
    19. string(46) "{"status":"online","device_sn":"D49CDDBDB62D"}"
    20. ["qos"]=>
    21. int(2)
    22. ["retain"]=>
    23. bool(false)
    24. }

    说明己经获取到两个设备上线的信息。

  3. 结束 **publisher1.php****publisher2.php** 这两个模拟设备的进程,订阅者(subscriber.php)的窗口有两条设备下线的消息(遗愿消息):

    1. object(Mosquitto\Message)#3 (5) {
    2. ["mid"]=>
    3. int(3)
    4. ["topic"]=>
    5. string(19) "device/AB9CDDBDB62D"
    6. ["payload"]=>
    7. string(47) "{"status":"offline","device_sn":"AB9CDDBDB62D"}"
    8. ["qos"]=>
    9. int(2)
    10. ["retain"]=>
    11. bool(false)
    12. }
    13. object(Mosquitto\Message)#3 (5) {
    14. ["mid"]=>
    15. int(4)
    16. ["topic"]=>
    17. string(19) "device/D49CDDBDB62D"
    18. ["payload"]=>
    19. string(47) "{"status":"offline","device_sn":"D49CDDBDB62D"}"
    20. ["qos"]=>
    21. int(2)
    22. ["retain"]=>
    23. bool(false)
    24. }

    2. 给设备发送消息

    如何给指定设备推送消息?

上面的代码,我们创建topic时,有固定规则 device 后面再跟上 设备序列号,为每个设备创建了一个topic,即 device/设备序列号,设备上、下线时,我们通过多层通配符 # 来匹配所有设备。

当我们需要给某一个设备或者某几个设备发送消息指令时,就可以通过准确匹配topic来实现。

2.1. Client PHP端(发布者)

给设备发送消息指令,如开关机指令等。
**pulisher.php**

  1. <?php
  2. /**
  3. * mqtt 发布者类
  4. * @author maclechan@qq.com
  5. * @date 2021-08-13
  6. * 参考文档 https://www.kancloud.cn/liao-song/mosquitto-php/500403
  7. */
  8. class MQTTPublisher
  9. {
  10. /**
  11. * @var \Mosquitto\client
  12. */
  13. private $mqtt;
  14. /**
  15. * mqtt服务地址
  16. * @var string
  17. */
  18. private $host;
  19. /**
  20. * mqtt服务端口
  21. * @var int
  22. */
  23. private $port;
  24. /**
  25. * 静态私有的变量保存该类对象
  26. * @var $instance
  27. */
  28. private static $instance;
  29. /**
  30. * 初始化
  31. */
  32. private function __construct()
  33. {
  34. $this->host = '10.10.20.200';
  35. $this->port = 1883;
  36. //Mosquitto客户端类
  37. $this->mqtt = new \Mosquitto\client();
  38. /**
  39. * 设置连接到服务器的用户名和密码。必须在connect之前调用
  40. * setCredentials($username, $password)
  41. * $username (string) -连接到代理的用户名。
  42. * $password (string) -连接到代理的密码。
  43. */
  44. //$this->mqtt->setCredentials('root', '123456');
  45. /**
  46. * 连接地址
  47. * connect($host[, $port = 1883, $keepalive = 60, $interface = null])
  48. * $host (string) – 连接的主机名
  49. * $port (int) – 可选。 要连接的端口号。默认为1883。
  50. * $keepalive (int) – 可选。在没有收到消息的情况下,服务器应该ping客户端的部分数量。
  51. * $interface (string) – 可选。要为此连接绑定的本地接口的地址或主机名。
  52. */
  53. $this->mqtt->connect($this->host, $this->port, 5);
  54. }
  55. //防止实例被克隆(这会创建实例的副本)
  56. private function __clone(){}
  57. /**
  58. * 实例化类
  59. */
  60. public static function getInstance()
  61. {
  62. if (!self::$instance instanceof self) {
  63. self::$instance = (new self());
  64. }
  65. return self::$instance;
  66. }
  67. /**
  68. * 发布消息到Broker
  69. * @param string $topic 主题
  70. * @param string $payload 消息体
  71. * @param int $qos 服务质量,值[0,1,2]
  72. * @param bool $retain 是否保留此消息,默认为false
  73. */
  74. public function publisher($topic, $payload, $qos,$retain = false)
  75. {
  76. //Returns: 服务器返回该消息ID(警告:消息ID并不是唯一的)
  77. $mgid = $this->mqtt->publish($topic, $payload, $qos, $retain);
  78. if ($mgid) echo '推送成功'.$mgid."\r\n";
  79. $this->mqtt->loopForever();
  80. }
  81. }
  82. //发布测试代码
  83. $data = [
  84. 'code' => 0,
  85. 'message' => '成功',
  86. 'data' => [
  87. 'id' => 1,
  88. 'name' => '设备号名称',
  89. 'linktel' => 13251079793
  90. ]
  91. ];
  92. //通过更换topic,来实现指定某些设备
  93. $client = MQTTPublisher::getInstance()->publisher('device/AB9CDDBDB62D',json_encode($data),2);

如果想批量给某些设备,这里自行修改代码逻辑实现。

2.1 Client 设备端(订阅者)

模拟AB9CDDBDB62D这个设备。
subscriber1.php

  1. <?php
  2. $client = new Mosquitto\Client();
  3. $client->connect('10.10.20.200', 1883, 5);
  4. $client->subscribe('device/AB9CDDBDB62D', 2);//订阅topic为chan的消息
  5. $client->onMessage(function($m) {
  6. var_dump($m);
  7. });
  8. $client->loopForever();

执行 php subscriber1.php ,窗口处于等待状态;
重新打开一新窗口,然后执行 php publisher.php

订阅者窗口开始有数据出来,发布成功,打印消息如下:

  1. object(Mosquitto\Message)#3 (5) {
  2. ["mid"]=>
  3. int(1)
  4. ["topic"]=>
  5. string(19) "device/AB9CDDBDB62D"
  6. ["payload"]=>
  7. string(113) "{"code":0,"message":"\u6210\u529f","data":{"id":1,"name":"\u8bbe\u5907\u53f7\u540d\u79f0","linktel":13151179793}}"
  8. ["qos"]=>
  9. int(2)
  10. ["retain"]=>
  11. bool(false)
  12. }