1、心跳
客户端在连接到MQTT服务器时(connect方法),可在options中指定keepalive字段进行配置,默认为60s。称为最大空闲时间T,当客户端检测到当前连接空闲时间超过最大空闲时间时,则会想Broker发送一个结构为
{cmd: “pingreq”}
的心跳包,Broker会在收到消息后,返回一个心跳响应。结构为:
{
cmd: "pingresp"
dup: false
length: 0
payload: null
qos: 0
retain: false
topic: null
}
如果Broker在此客户端连接时设定的1.5T的时间内仍未收到心跳包,则会断开此客户端的连接。并投递 遗嘱消息 到订阅方。如果客户端在1.5T内未收到pingresp包,也会主动断开与服务器的连接。这样做的好处是弱化对带宽的依赖。
2、保留消息
在服务端收到一个retain标志位true的消息时,服务端在正常转发的情况下,会保留一份到本地。每个主题的保留消息只能有一条,就得会被新的所覆盖。当一个连接订阅这个主题时,保留消息会第一时间推送给订阅者。
EMQ保留消息功能由 emqx_retainer插件提供,这个插件是默认开启的
配置方式:
1.要禁用保留消息需要在etc/emqx.conf中修改mqtt.retain_available 为 false
如果在Broker端禁用了保留消息的情况下,任然收到了来自客户端的保留消息,会返回原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。
2.通过修改emqx_retainer 插件的配置,可以修改储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。
保留消息可以存储到外部数据库(企业版)
3、共享订阅
共享订阅在多个订阅者之间实现负载均衡的订阅方式。
EMQX提供两种方式的共享订阅:
实例 | 前缀 | 真实主题 |
---|---|---|
$queue/t/1 | $queue/ | t/1 |
$share/abc/t/1 | $share/abc | t/1 |
上表中$ queue和 $share/abc称为共享订阅前缀。
3.1 $share//
这种形式的共享订阅成为带群组的共享订阅,属于同一个群组的订阅者会以负载均衡的方式接收消息,但EMQX会在不同的群组之间广播消息。看一下EMQ文档中的一个例子:
实验:
1.材料:部署到linux的EMQ服务器,EMQ后台DashBoard,MQTTX客户端,VUE前端
2.步骤:
1)三个客户端都连接到EMQ,DashBoard和VUE前端订阅一个命名为$ share/group1/abc的主题,MQTTX订阅一个名为 $share/group2/abc的主题
2)在MQTTX客户端连续发布一个topic为abc的消息
3.预期:MQTTX一定会受到消息,因为group2群组只有它自己。DashBoard和VUE中每条消息有且只有一个会收到。因为他们在同一分组,遵循负载均衡的方式。
4.结果:
MQTTX端:
VUE端:
DashBoard端:
预期成立。
3.2$queue/
不带群组的共享订阅,与所有人都在同一个群组相同。
3.3 EMQ的均衡策略与派发ACK
EMQ的均衡策略和派发ACK可以在 etc/emqx.conf进行配置:
# etc/emqx.conf
# 均衡策略
broker.shared_subscription_strategy = random
# 适用于 QoS1 QoS2 消息,启用时在其中一个组离线时,将派发给另一个组
broker.shared_dispatch_ack_enabled = false
均衡策略配置可选项:
均衡策略 | 描述 |
---|---|
random | 在所有订阅者中随机选择 |
round_robin | 按照订阅顺序 |
sticky | 一直发往上次选取的订阅者 |
hash | 按照发布者 ClientID 的哈希值 |
4、延迟发布
延迟发布功能由 emqx_mod_delayed模块实现,默认启用
延迟发布与共享订阅类似,使用特殊主体前缀实现。主题结构为:
$delayed/{DelayInterval}/{TopicName}
- $delayed: 延迟发布主题前缀,使用此前缀的主题的消息都被视为延迟发布的消息
- {DelayInterval}:时间间隔,单位为秒,最大为4294967,若 {DelayInterval}无法被解析成一个整形数字,则这个消息会被丢弃。
- {TopicName}:真实主题名
案例:

5、代理订阅
代理订阅是在在客户端登录时,不需要发送额外的订阅消息即可自动建立预设好的订阅关系。
代理订阅功能由emqx_mod_subscription
内置模块提供。
配置方式(linux):编辑‘/etc/emqx/emqx.conf’ 文件,在最下方添加代理订阅规则即可。
代理订阅规则:
## 代理订阅的主题
module.subscription.<number>.topic = <topic>
## 代理订阅的订阅选项:QoS
## 可选值: 0、1、2
## 默认值:1
module.subscription.<number>.qos = <qos>
## 代理订阅的订阅选项:No Local
## 可选值: 0、1
## 默认值:0
module.subscription.<number>.nl = <nl>
## 代理订阅的订阅选项:Retain As Published
## 可选值: 0、1
## 默认值:0
module.subscription.<number>.rap = <rap>
## 代理订阅的订阅选项:Retain Handling
## 可选值: 0、1、2
## 默认值:0
module.subscription.<number>.rh = <rh>
举例:
module.subscription.1.topic = client/%c
module.subscription.2.topic = user/%u
module.subscription.2.qos = 2
module.subscription.2.nl = 1
module.subscription.2.rap = 1
module.subscription.2.rh = 1
// %c 和%u是通配符,%c代表clientid ,%u代表username
//订阅选项中的 No Local、Retain As Published、Retain Handling 仅支持 MQTT V5 协议
//在当前使用的协议版本小于5的时候,只有topic和qos生效。
//除topic外,别的配置选项都有默认值
6、集群部署
配置方式:
- 确保两台服务器可以相互通信
- 使用static方式创建集群,修改etc/emqx/emqx.conf文件中如下两处:
3.重启服务
7、消息桥接
EMQ的消息桥接分为两种:
- RPC模式:使用 Erlang RPC 协议的桥接方式,适合与另外一台EMQ服务器桥接,使用比较简单
- MQTT模式,使用 MQTT 协议、适合与另外一台MQTT服务器或者EMQ服务器桥接,如RocketMQ等。
下文中主要说明RPC模式.
发布者可以通过桥接发送消息到远程的EMQ服务器