1.EMQX集群搭建
首先准备两台服务器,我这里用公司的服务器(10.xx.xx.111,10.xx.xx.138)做演示,首先在两台服务器安装emqx,具体过程参考MQTT协议的认识与应用
在111服务器上修改/etc/emqx.conf文件:
node.name = emqx@10.xx.xx.111
在138服务器上修改/etc/emqx.conf文件:
node.name = emqx@10.xx.xx.138
然后启动两个节点,启动完成后,在138服务器上执行以下命令
./bin/emqx_ctl cluster join emqx@10.xx.xx.111
然后查看监控页面:
这样,一个两个节点的集群就搭建好了
2.关闭匿名访问
在生产环境中,我们需要关闭设备的匿名链接,需要修改emqx的配置
修改/etc/emqx.conf
allow_anonymous = false(默认为true)
这样我们就关闭匿名访问了
3.WebHook的使用
emqx使用webhook首先需要开启emqx_web_hook插件,然后修改/etc/plugins/emqx_web_hook.conf里面的配置:
web.hook.api.url = http://10.xx.xx.222:8081/webHook (改为你的web服务接口地址)
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"} (取消注释)
接口简单测试:
@PostMapping("/webHook")
@ResponseBody
public void test(@RequestBody JSONObject body){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
System.out.println(body.getString("action"));
System.out.println(body.getString("reason"));
System.out.println(body.getString("node"));
System.out.println(body.getString("clientid"));
System.out.println(simpleDateFormat.format(new Date(body.getLong("disconnected_at"))));
System.out.println(body.getString("username"));
}
我们使用MQTT.fx链接,然后再断开,查看控制台:
当然,如果是集群部署的话,需要每一台都进行配置修改
这就是webhook的简单使用
注意:接触到规则引擎后,我不太推荐使用配置的webhook,原因之一是,如果emqx是集群部署,触发webhook则需要每台机器都配置相同的配置文件。但是使用规则引擎后,只需要创建规则即可在所有节点使用,方便高效便于管理。配置的webhook实现的功能均可用规则引擎实现。 第二,配置的webhook只能设置一个接口,倘若要检测设备的在线离线,只能用一个接口,传入的参数不同而且还需要在接口内部做处理,显的代码臃肿。使用规则引擎配置两条规则即可,方便维护,代码简洁
4.代理订阅
EMQ X 的代理订阅功能使得客户端在连接建立时,不需要发送额外的 SUBSCRIBE 报文,便能自动建立用户预设的订阅关系。
首先我们需要开启emqx_mod_subscription模块,然后修改/etc/emqx.conf文件,在末尾(为了方便查找)加入规则:
module.subscription.1.topic = client/%c
module.subscription.2.topic = user/%u
module.subscription.2.qos = 2
module.subscription.2.nl = 1
module.subscription.2.rap = 1
module.subscription.2.rh = 1
在配置代理订阅的主题时,EMQ X 提供了 %c 和 %u 两个占位符供用户使用,EMQ X 会在执行代理订阅时将配置中的 %c 和 %u 分别替换为客户端的 Client ID 和 Username,需要注意的是,%c 和 %u 必须占用一整个主题层级。
关于nl( No Local)、rap(Retain As Published)、rh(Retain Handling)的具体含义参考这篇文章:https://blog.csdn.net/emqx_broker/article/details/107655500
然后我们使用MQTT.fx模拟设备接入,查看该设备订阅的主题:
5.设备接入认证
我们这里先用最简单的认证试一下,使用传统的username与password,首先我们开启emqx_auth_username模块,然后我们在模块管理页面加入一个用户。
我们来用MQTT.fx模拟链接,当不配置用户名密码时:
然后我们配置上用户名密码:
再次链接后就可以了
6.MySql认证设备
传统的username与password感觉安全性还是低,我们使用mysql认证:
首先需要禁用emqx_auth_username模块,然后开启emqx_auth_mysql模块,开启mysql模块之前,需要先配置一下:
## 服务器地址
auth.mysql.server = 127.0.0.1:3306
## 连接池大小
auth.mysql.pool = 8
auth.mysql.username = emqx
auth.mysql.password = public
auth.mysql.database = mqtt
auth.mysql.query_timeout = 5s
默认表结构:
CREATE TABLE `mqtt_user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(100) DEFAULT NULL,
`password` varchar(100) DEFAULT NULL,
`salt` varchar(35) DEFAULT NULL,
`is_superuser` tinyint(1) DEFAULT 0,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
默认用户:
INSERT INTO `mqtt_user` ( `username`, `password`, `salt`)
VALUES
('emqx', 'efa1f375d76194fa51a3556a97e641e61685f914d446979da50a551a4333ffd7', NULL);
然后我们使用emqx、public就可以进行设备接入了
加盐认证:
当然我们也可以使用加盐认证,首先需要更改一下配置:
auth.mysql.password_hash = md5,salt (salt在前代表 salt + 密码进行sha256加密,salt在后代表 密码 + salt进行sha256加密)
auth.mysql.auth_query = select password,salt from mqtt_user where username = '%u' limit 1
这样以后,我们就需要在mysql的用户表中加入salt的内容,也就是盐值。
password根据你选取是前缀还是后缀进行加密后,插入到数据库中,这样就完成了mysql加盐认证
7.MySQL ACL
MySQL ACL 使用外部 MySQL 数据库存储 ACL 规则,可以存储大量数据、动态管理 ACL,方便与外部设备管理系统集成
配置与mysql认证一样,但需要新增acl表
CREATE TABLE `mqtt_acl` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`allow` int(1) DEFAULT 1 COMMENT '0: deny, 1: allow',
`ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
`username` varchar(100) DEFAULT NULL COMMENT 'Username',
`clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
`access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
`topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- allow:禁止(0),允许(1)
- ipaddr:设置 IP 地址
- username:连接客户端的用户名,此处的值如果设置为
$all
表示该规则适用于所有的用户 - clientid:连接客户端的 Client ID
- access:允许的操作:订阅(1),发布(2),订阅发布都可以(3)
- topic:控制的主题,可以使用通配符,并且可以在主题中加入占位符来匹配客户端信息,例如
t/%c
则在匹配时主题将会替换为当前客户端的 Client ID- %u:用户名
- %c:Client ID
acl规则测试语句
-- 所有用户不可以订阅系统主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '$SYS/#');
-- 允许 10.59.1.100 上的客户端订阅系统主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, '10.59.1.100', NULL, NULL, 1, '$SYS/#');
-- 禁止客户端订阅 /smarthome/+/temperature 主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, NULL, NULL, 1, '/smarthome/+/temperature');
-- 允许客户端订阅包含自身 Client ID 的 /smarthome/${clientid}/temperature 主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, NULL, NULL, NULL, 1, '/smarthome/%c/temperature');
-- 禁止客户端订阅 /test/topic 主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, NULL, NULL, 1, '/test/topic');
然后我们用MQTT.fx订阅/test/topic主题,向主题发送消息:
然后我们查看log
说明我们的Acl已经生效了
这里踩坑:默认表结构的字段类型不可更改差异太大,我这里因为allow只有两种情况,很适合用bit,就修改为bit类型了,然后会导致acl失效,但是id的int类型改为bigint类型没有问题,所以这里要注意一下
8.规则引擎简单实用
规则引擎是emqx很强大的功能,企业版可直接将数据写入mysql,减少项目中业务复杂性等。
首先我们打开dashboard,点击规则引擎,点击规则,然后新建规则:
支持多种事件规则,如设备的上线下线等,这里用下线举例子,规则sql如下:
SELECT
*
FROM
"$events/client_disconnected"
然后我们测试一下sql,点击下面的sql测试,然后查看查出来的数据:
这是设备下线时,emqx内部查出来的数据,我们如何写入到数据库中呢?企业版的emqx中提供通过sql语句将数据直接写入mysql,我们这里使用开源版,通过webhook将数据发送到web服务中,我们内部处理。
下面添加响应动作:
在添加响应动作需要关联资源,我们这里新增一个资源:
添加资源时,只需要填入接口地址即可,回到添加响应动作页面,勾选这个资源即可,path与消息内容模板不需要填写,即emqx查出来的数据是什么样,接受的数据就是什么样。
然后我们点击新建,这样一个规则就建立好了,接下来我们测试一下,用MQTT.fx链接emqx,然后emqx后触发规则并将数据传到web服务中:
我们在服务打个断点查看数据:
web服务已经接受到了信息,接下来我们可以再web服务中处理、存库等操作