1.EMQX集群搭建

首先准备两台服务器,我这里用公司的服务器(10.xx.xx.111,10.xx.xx.138)做演示,首先在两台服务器安装emqx,具体过程参考MQTT协议的认识与应用
在111服务器上修改/etc/emqx.conf文件:

  1. node.name = emqx@10.xx.xx.111

在138服务器上修改/etc/emqx.conf文件:

  1. node.name = emqx@10.xx.xx.138

然后启动两个节点,启动完成后,在138服务器上执行以下命令

  1. ./bin/emqx_ctl cluster join emqx@10.xx.xx.111

然后查看监控页面:
image.png
这样,一个两个节点的集群就搭建好了

2.关闭匿名访问

在生产环境中,我们需要关闭设备的匿名链接,需要修改emqx的配置
修改/etc/emqx.conf

  1. allow_anonymous = false(默认为true)

这样我们就关闭匿名访问了

3.WebHook的使用

emqx使用webhook首先需要开启emqx_web_hook插件,然后修改/etc/plugins/emqx_web_hook.conf里面的配置:

  1. web.hook.api.url = http://10.xx.xx.222:8081/webHook (改为你的web服务接口地址)
  2. web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"} (取消注释)

接口简单测试:

  1. @PostMapping("/webHook")
  2. @ResponseBody
  3. public void test(@RequestBody JSONObject body){
  4. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
  5. System.out.println(body.getString("action"));
  6. System.out.println(body.getString("reason"));
  7. System.out.println(body.getString("node"));
  8. System.out.println(body.getString("clientid"));
  9. System.out.println(simpleDateFormat.format(new Date(body.getLong("disconnected_at"))));
  10. System.out.println(body.getString("username"));
  11. }

我们使用MQTT.fx链接,然后再断开,查看控制台:
image.png
当然,如果是集群部署的话,需要每一台都进行配置修改
这就是webhook的简单使用

注意:接触到规则引擎后,我不太推荐使用配置的webhook,原因之一是,如果emqx是集群部署,触发webhook则需要每台机器都配置相同的配置文件。但是使用规则引擎后,只需要创建规则即可在所有节点使用,方便高效便于管理。配置的webhook实现的功能均可用规则引擎实现。 第二,配置的webhook只能设置一个接口,倘若要检测设备的在线离线,只能用一个接口,传入的参数不同而且还需要在接口内部做处理,显的代码臃肿。使用规则引擎配置两条规则即可,方便维护,代码简洁

4.代理订阅

EMQ X 的代理订阅功能使得客户端在连接建立时,不需要发送额外的 SUBSCRIBE 报文,便能自动建立用户预设的订阅关系。
首先我们需要开启emqx_mod_subscription模块,然后修改/etc/emqx.conf文件,在末尾(为了方便查找)加入规则:

  1. module.subscription.1.topic = client/%c
  2. module.subscription.2.topic = user/%u
  3. module.subscription.2.qos = 2
  4. module.subscription.2.nl = 1
  5. module.subscription.2.rap = 1
  6. module.subscription.2.rh = 1

在配置代理订阅的主题时,EMQ X 提供了 %c 和 %u 两个占位符供用户使用,EMQ X 会在执行代理订阅时将配置中的 %c 和 %u 分别替换为客户端的 Client ID 和 Username,需要注意的是,%c 和 %u 必须占用一整个主题层级。
关于nl( No Local)、rap(Retain As Published)、rh(Retain Handling)的具体含义参考这篇文章:https://blog.csdn.net/emqx_broker/article/details/107655500

然后我们使用MQTT.fx模拟设备接入,查看该设备订阅的主题:
image.png

5.设备接入认证

我们这里先用最简单的认证试一下,使用传统的username与password,首先我们开启emqx_auth_username模块,然后我们在模块管理页面加入一个用户。

我们来用MQTT.fx模拟链接,当不配置用户名密码时:
image.png
然后我们配置上用户名密码:
image.png
再次链接后就可以了

6.MySql认证设备

传统的username与password感觉安全性还是低,我们使用mysql认证:
首先需要禁用emqx_auth_username模块,然后开启emqx_auth_mysql模块,开启mysql模块之前,需要先配置一下:

  1. ## 服务器地址
  2. auth.mysql.server = 127.0.0.1:3306
  3. ## 连接池大小
  4. auth.mysql.pool = 8
  5. auth.mysql.username = emqx
  6. auth.mysql.password = public
  7. auth.mysql.database = mqtt
  8. auth.mysql.query_timeout = 5s

默认表结构:

  1. CREATE TABLE `mqtt_user` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `username` varchar(100) DEFAULT NULL,
  4. `password` varchar(100) DEFAULT NULL,
  5. `salt` varchar(35) DEFAULT NULL,
  6. `is_superuser` tinyint(1) DEFAULT 0,
  7. `created` datetime DEFAULT NULL,
  8. PRIMARY KEY (`id`),
  9. UNIQUE KEY `mqtt_username` (`username`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

默认用户:

  1. INSERT INTO `mqtt_user` ( `username`, `password`, `salt`)
  2. VALUES
  3. ('emqx', 'efa1f375d76194fa51a3556a97e641e61685f914d446979da50a551a4333ffd7', NULL);

然后我们使用emqx、public就可以进行设备接入了

加盐认证:
当然我们也可以使用加盐认证,首先需要更改一下配置:

  1. auth.mysql.password_hash = md5,salt (salt在前代表 salt + 密码进行sha256加密,salt在后代表 密码 + salt进行sha256加密)
  2. auth.mysql.auth_query = select password,salt from mqtt_user where username = '%u' limit 1

这样以后,我们就需要在mysql的用户表中加入salt的内容,也就是盐值。
image.png
password根据你选取是前缀还是后缀进行加密后,插入到数据库中,这样就完成了mysql加盐认证

7.MySQL ACL

MySQL ACL 使用外部 MySQL 数据库存储 ACL 规则,可以存储大量数据、动态管理 ACL,方便与外部设备管理系统集成
配置与mysql认证一样,但需要新增acl表

  1. CREATE TABLE `mqtt_acl` (
  2. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  3. `allow` int(1) DEFAULT 1 COMMENT '0: deny, 1: allow',
  4. `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  5. `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  6. `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  7. `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  8. `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  9. PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • allow:禁止(0),允许(1)
  • ipaddr:设置 IP 地址
  • username:连接客户端的用户名,此处的值如果设置为 $all 表示该规则适用于所有的用户
  • clientid:连接客户端的 Client ID
  • access:允许的操作:订阅(1),发布(2),订阅发布都可以(3)
  • topic:控制的主题,可以使用通配符,并且可以在主题中加入占位符来匹配客户端信息,例如t/%c则在匹配时主题将会替换为当前客户端的 Client ID
    • %u:用户名
    • %c:Client ID

acl规则测试语句

  1. -- 所有用户不可以订阅系统主题
  2. INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '$SYS/#');
  3. -- 允许 10.59.1.100 上的客户端订阅系统主题
  4. INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, '10.59.1.100', NULL, NULL, 1, '$SYS/#');
  5. -- 禁止客户端订阅 /smarthome/+/temperature 主题
  6. INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, NULL, NULL, 1, '/smarthome/+/temperature');
  7. -- 允许客户端订阅包含自身 Client ID /smarthome/${clientid}/temperature 主题
  8. INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, NULL, NULL, 1, '/smarthome/%c/temperature');
  9. -- 禁止客户端订阅 /test/topic 主题
  10. INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, NULL, NULL, 1, '/test/topic');

然后我们用MQTT.fx订阅/test/topic主题,向主题发送消息:
image.png
然后我们查看log
image.png
说明我们的Acl已经生效了

这里踩坑:默认表结构的字段类型不可更改差异太大,我这里因为allow只有两种情况,很适合用bit,就修改为bit类型了,然后会导致acl失效,但是id的int类型改为bigint类型没有问题,所以这里要注意一下

8.规则引擎简单实用

规则引擎是emqx很强大的功能,企业版可直接将数据写入mysql,减少项目中业务复杂性等。
首先我们打开dashboard,点击规则引擎,点击规则,然后新建规则:
image.png
支持多种事件规则,如设备的上线下线等,这里用下线举例子,规则sql如下:

  1. SELECT
  2. *
  3. FROM
  4. "$events/client_disconnected"

然后我们测试一下sql,点击下面的sql测试,然后查看查出来的数据:
image.png
这是设备下线时,emqx内部查出来的数据,我们如何写入到数据库中呢?企业版的emqx中提供通过sql语句将数据直接写入mysql,我们这里使用开源版,通过webhook将数据发送到web服务中,我们内部处理。
下面添加响应动作:
image.png
在添加响应动作需要关联资源,我们这里新增一个资源:
image.png
添加资源时,只需要填入接口地址即可,回到添加响应动作页面,勾选这个资源即可,path与消息内容模板不需要填写,即emqx查出来的数据是什么样,接受的数据就是什么样。
然后我们点击新建,这样一个规则就建立好了,接下来我们测试一下,用MQTT.fx链接emqx,然后emqx后触发规则并将数据传到web服务中:
image.png
我们在服务打个断点查看数据:
image.png
web服务已经接受到了信息,接下来我们可以再web服务中处理、存库等操作