这一课,我们继续完成处理其他事件的代码,但是因为本课程的篇幅有限,这里只完成 IotHub 目前需要的 “client.disconnected” 和 “message.publish” 事件的处理代码,其他事件的处理很简单,只需要依葫芦画瓢就可以了。有需要的话,大家可以自行进行扩展。

处理 “client.disconnected” 事件

这个事件的处理和 “client.connected” 事件,不过需要过滤掉 client 因为用户名和密码没有通过认证,触发的 “client.disconnected”:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook.erl
  2. on_client_disconnected(#{}, auth_failure, _Env) ->
  3. ok;
  4. on_client_disconnected(#{client_id := ClientId, username := Username}, ReasonCode, _Env) ->
  5. Reason = if
  6. is_atom(ReasonCode) ->
  7. ReasonCode;
  8. true ->
  9. unknown
  10. end,
  11. Doc = {
  12. client_id, ClientId,
  13. username, Username,
  14. disconnected_at, emqx_time:now_ms(),
  15. reason, Reason
  16. },
  17. emqx_rabbitmq_hook_cli:publish(bson_binary:put_document(Doc), <<"client.disconnected">>),
  18. ok.

我们通过参数的模式匹配,没通过认证的 “client.disconnected” 事件会落入第一个 on_client_disconnected 函数中,不作任何处理。

处理 “message.publish” 事件

在处理这个事件时,需要过滤掉来自系统主题的 Publish 事件:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook.erl
  2. on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
  3. {ok, Message};
  4. on_message_publish(Message = #message{topic = Topic, flags = #{retain := Retain}}, _Env) ->
  5. Username = case maps:find(username, Message#message.headers) of
  6. {ok, Value} -> Value;
  7. _ -> undefined
  8. end,
  9. Doc = {
  10. client_id, Message#message.from,
  11. username, Username,
  12. topic, Topic,
  13. qos, Message#message.qos,
  14. retained, Retain,
  15. payload, {bin, bin, Message#message.payload},
  16. published_at, emqx_time:now_ms(Message#message.timestamp)
  17. },
  18. emqx_rabbitmq_hook_cli:publish(bson_binary:put_document(Doc), <<"message.publish">>),
  19. {ok, Message}.

同样地,这里使用参数的模式匹配,来自于系统主题的 “message.publish” 事件会落入第一个 on_message_publish 函数中,不作任何处理。
这里使用 emqx_time:now_ms 函数获取到消息发布的以毫秒为单位的时间,这样可以解决之前 NTP 服务中以秒为单位而导致的计时不够精确的问题。

插件配置文件

.config 配置文件

插件的配置文件是放在emqx_rabbitmq_hook/etc/ 下的,默认情况下是一个 Erlang 风格的 .config 文件,这种配置文件实际上就是 Erlang 的源文件,内容是一个 Erlang 的列表,例如:

  1. %% emqx_rabbitmq_hook/etc/emqx_rabbitmq_hook.config
  2. [
  3. {emqx_rabbitmq_hook, [{enabled, true}]}
  4. ].

EMQ X 在启动的时候会加载这个列表,在插件里可以通过下面的方式读取到这个列表里元素的值:

  1. (emqx@127.0.0.1)1> application:get_env(emqx_rabbitmq_hook, enabled).
  2. {ok,true}

这种风格的配置文件对 Erlang 用户来说是没什么问题的,但是对非 Erlang 的用户来说,可读性还是稍微差了一点,EMQ X 3.0 以后提供了非 Erlang 格式的 .conf 配置文件,我们在之前的课程中已经见到过了:

  1. xxx.xx.xx = xxx

这种配置文件需要配置一个映射规则,在 EMQ X 启动时通过 cuttlefish 转换成上面的 Erlang 列表。接下来我们来看怎么做。

.conf 配置文件

映射文件

以是否监听 “client.connected” 事件的配置为例,首先新增配置文件:

  1. ### emqx_rabbitmq_hook/etc/emqx_rabbitmq_hook.conf
  2. hook.rabbitmq.client.connected = on

然后,新增映射规则:

  1. %% emqx_rabbitmq_hook/priv/emqx_rabbitmq_hook.schema
  2. {mapping, "hook.rabbitmq.client.connected", "emqx_rabbitmq_hook.client_connected", [
  3. {default, on},
  4. {datatype, flag}
  5. ]}.

映射规则文件其实也是一个 Erlang 的源文件,上面的代码将 “hook.rabbitmq.client.connected” 进行映射,并指定它的默认值和类型。
在 emqx-rabbitmq-hook 的 Makefile 里面指明了用 cuttlefish 对配置文件和映射文件进行处理:

  1. emqx_rabbitmq_hook/Makefile
  2. app.config::
  3. ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emqx_rabbitmq_hook.conf -i priv/emqx_rabbitmq_hook.schema -d data

重新编译后,运行 emqx-rel/_build/emqx/rel/emqx/bin/emqx console,在控制台中输入application:get_env(emqx_rabbitmq_hook, client_connected).,就可以获取这个配置项的值:

  1. emqx@127.0.0.1)1> application:get_env(emqx_rabbitmq_hook, client_connected).
  2. {ok,true}

函数的参数 (emqx_rabbitmq_hook, client_connected) 和在映射文件里面配置的”emqx_rabbitmq_hook.client_connected”是对应的。

更复杂的映射

在映射某些配置项时,还需要写一点代码,比如配置发布事件的 exchange 名时,RabbitMQ Erlang Client接受的 exchange 参数是二进制串,比如<<mqtt.events>>,而从 .conf 配置文件只能读取到字符串值,所以需要再做一个转化:

  1. %% emqx_rabbitmq_hook/priv/emqx_rabbitmq_hook.schema
  2. {mapping, "hook.rabbitmq.exchange", "emqx_rabbitmq_hook.exchange", [
  3. {default, "mqtt.events"},
  4. {datatype, string}
  5. ]}.
  6. {translation, "emqx_rabbitmq_hook.exchange", fun(Conf) ->
  7. list_to_binary(cuttlefish:conf_get("hook.rabbitmq.exchange", Conf))
  8. end}.

连接池 ecpool 的初始化方法接受的是一个配置项的列表,所以需要将配置文件中的 key-value 对转换成一个列表:

  1. %% emqx_rabbitmq_hook/priv/emqx_rabbitmq_hook.schema
  2. {translation, "emqx_rabbitmq_hook.server", fun(Conf) ->
  3. Pool = cuttlefish:conf_get("hook.rabbitmq.pool", Conf),
  4. Host = cuttlefish:conf_get("hook.rabbitmq.host", Conf),
  5. Port = cuttlefish:conf_get("hook.rabbitmq.port", Conf),
  6. [{pool_size, Pool},
  7. {host, Host},
  8. {port, Port}
  9. ]
  10. end}.

修改完配置映射文件后,需要重新编译。我们可以按照上面的规则继续添加更多的配置项。

使用配置项

最后一步是在代码里读取这些配置项,然后根据配置项的值进行相应操作。
初始化连接池:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook_sub.erl
  2. init([]) ->
  3. {ok, PoolOpts} = application:get_env(?APP, server),
  4. PoolSpec = ecpool:pool_spec(?APP, ?APP, emqx_rabbitmq_hook_cli, PoolOpts),
  5. {ok, {{one_for_one, 10, 100}, [PoolSpec]}}.

设置 exchange:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook_cli.erl
  2. ensure_exchange() ->
  3. {ok, ExchangeName} = application:get_env(?APP, exchange),
  4. ensure_exchange(ExchangeName).
  5. publish(Payload, RoutingKey) ->
  6. {ok, ExchangeName} = application:get_env(?APP, exchange),
  7. publish(ExchangeName, Payload, RoutingKey).

注册 Hook: 这里首先实现一个根据配置进行 hook 注册的工具方法:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook.erl
  2. hookup(Event, ConfigName, Func, InitArgs) ->
  3. case application:get_env(?APP, ConfigName) of
  4. {ok, true} -> emqx:hook(Event, Func, InitArgs);
  5. _ -> ok
  6. end.

然后在插件加载时调用:

  1. %% emqx_rabbitmq_hook/src/emqx_rabbitmq_hook.erl
  2. load(Env) ->
  3. ...
  4. hookup('client.connected', client_connected, fun ?MODULE:on_client_connected/4, [Env]),
  5. hookup('client.disconnected', client_disconnected, fun ?MODULE:on_client_disconnected/3, [Env]),
  6. hookup('message.publish', message_publish, fun ?MODULE:on_message_publish/2, [Env]).

重新编译之后,加载插件,修改几个配置项以后再重新加载插件,可以观察配置项是否生效。

这一节我们完成了 emqx-rabbitmq-hook 的全部功能,下一节我们将在 IotHub 中使用 emqx-rabbitmq-hook。