NSQ 初识
NSQ 是一个基于 Go 语言的分布式实时消息平台,它基于 MIT 开源协议发布,由 bitly 公司开源出来的一款简单易用的消息中间件。
NSQ 可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。
NSQ 具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。
NSQ架构
Topic 消息的逻辑关键词
topic 是 NSQ 消息发布的 逻辑关键词 ,可以理解为人为定义的一种消息类型。当程序初次发布带 topic 的消息时,如果 topic 不存在,则会在 nsqd中创建。
Producer 消息的生产者/发布者
producer 通过 HTTP API 将消息发布到 nsqd 的指定 topic ,一般有 pub/mpub 两种方式, pub 发布一个消息, mpub 一个往返发布多个消息。
- producer 也可以通过 nsqd客户端 的 TCP接口 将消息发布给 nsqd 的指定 topic 。
当生产者 producer 初次发布带 topic 的消息给 nsqd 时,如果 topic 不存在,则会在 nsqd 中创建 topic.
Channel 消息传递的通道
当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个 channel 中, channel 起到队列的作用。
- channel 与 consumer(消费者) 相关,是消费者之间的负载均衡,消费者通过这个特殊的 channel 读取消息。
- 在 consumer 想单独获取某个 topic 的消息时,可以 subscribe(订阅)一个自己单独命名的 nsqd 中还不存在的 channel, nsqd 会为这个 consumer创建其命名的 channel
- channel 会将消息进行排列,如果没有 consumer 读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。可以在配置中配置具体参数。
- 一个 channel 一般会有多个 consumer 连接。假设所有已连接的 consumer 处于准备接收消息的状态,每个消息将被传递到一个随机的 consumer。
Go 语言中的 channel 是表达队列的一种自然方式,因此一个 NSQ 的 topic/channel,其核心就是一个存放消息指针的 Go-channel 缓冲区。缓冲区的大小由 —mem-queue-size 配置参数确定。
Consumer 消息的消费者
consumer 通过 TCP subscribe 自己需要的 channel, topic 和 channel 都没有预先配置。 topic 由第一次发布消息到命名 topic 的 producer 创建或第一次通过 subscribe 订阅一个命名 topic 的 consumer 来创建。 channel 被 consumer 第一次 subscribe 订阅到指定的 channel 创建。
- 多个 consumer subscribe一个 channel,假设所有已连接的客户端处于准备接收消息的状态,每个消息将被传递到一个 随机 的 consumer。
- NSQ 支持延时消息, consumer 在配置的延时时间后才能接受相关消息。
Channel 在 consumer 退出后并不会删除,这点需要特别注意。
NSQ 四个组件
NSQ 四个组件:nsqd、nsqlookupd、nsqadmin 和 utilities。
nsqd
nsqd 是消息队列的主体,对消息的接收、处理、把消息分发到客户端。
nsq 上有一个叫 “clicks” 的 topic, clicks 下面有三条 channel, 也就是三个消费者组,其中 channel 名称为 “metrics” 的有三个实例。消息 A 来到 nsq 后,被复制到三条 channel.
- 接着在 metrics 上的那个 A,被推送到了第二个实例上。
- 接着又来了一个叫 B 的消息,这一次 B 被推送给了第一个实例进行处理。
首先,假设我的会员系统,部署了三台实例,他们都订阅了 topic 为 “order_created” 的消息,那么一旦有订单创建,这三台实例就都会收到消息,并且去更新会员积分信息,而其实我只需要更新一次就ok了。
这就涉及到一个消费者组(Comsumer Group)的概念。消费者组是 Kafka 里提到的,在 Nsq 对应的术语是 channel.
会员系统的三个实例,当它们收到消息时,要做的事情是一样的,并且只需要有一个实例执行,那么它们就是一个消费者组里面的,要标识为同一个 channel,比如说叫 “update_memeber_credit” 的 channel,而短信系统和推荐系统,也要有自己的 channel,用来和会员系统作区分,比如说叫 “send_msg” 和 “update_user_interesting_goods”.
当 nsq 收到消息时,会给每个 channel 复制一份消息,然后 channel 再给对应的消费者组,推送一条消息。消费者组里有多个实例,那么要推给谁呢?这就涉及到负载均衡,比如有一个消费者组里有 ABC 三个实例,这次推给了 A,那么下次有可能是推送给 B,再下次,也许就是 C …
nsqlookupd
是 NSQ 的一个助手进程,提供了字典式的查询服务。消费者可以通过 nsqlookup 查询他们感兴趣订阅的 topic 所对应的 nsqd 实例的地址。从配置层面上而言,这种方式解耦了消费者与生产者,两者都只需要与nsqlookupd通信,双方不需要直接绑定配置,减少了复杂度和维护成本,而且每个 nsqd 都与 nsqlookupd 有着长期的通信,使得 nsqd 持续将自己的状态反映给 nsqlookupd。这些状态数据由 nsqlookupd 发送给消费者,告知他们最新的nsqd的地址。
nsqadmin
nsqadmin 设计用来方便查看,管理集群的。它提供一个 Web 用户界面来浏览 topic/channel/consumer 的层次结构,并检查每一层的深度和其他关键性的统计信息。同时还支持一些管理命令,比如删除或者清空 channel 等。
含义:
- NSQd Host: 节点的地址
- depth: 当前消息数,即当前的积压量。 最老的待确认的 ack 消息条数 距离 最新的条数的间隔
- DepthTimestamp: 最新收到的 ack 确认信息时间戳
- RecentDelayed: 延迟队列中下一条应该投递的消息的延迟到期时间
- memory+disk: 内存+硬盘分别积压的消息数
- In-Flight:当前未完成的消息数:包括发送但未返回 FIN(消费成功)/ 重新入队列 REQ / 超时 TIMEOUT 三种消息数之和,代表已经投递还未消费掉的消息 待 ack 的消息条数,包括正在处理的和内存延迟的消息
- Deferred:在内存中的延迟消息, 如果过多会移动至磁盘延迟队列
- DelayedQueue: 延迟队列中的消息条数
- Requeued:累计重试的消息条数
- Timed Out:累计超时的消息条数
- Messages:队列中的消息总条数
-
utilities
有赞NSQ整体架构
nsq的总体架构如下图所示:
较之原有的NSQ,它有以下新的特性: 动态发现NSQ。原生的NSQ中,生产者和NSQD是一一对应的,会造成一定的资源浪费。现在,生产者可以像消费者一样,通过nsqlookup查找NSQ。
- 支持数据备份。原生NSQ使用内存存储消息,只有当消息超过一定限制时才会存入磁盘。有赞借鉴了Kafka的Partition机制,消息可以存放到多个Partition中,提高了数据的安全性。同时,会将所有数据写入磁盘。
- 支持顺序消费。通过生产者和消费者的配合,可以支持消息顺序消费。
1、生产者动态发现 nsq
nsq 官方推荐的集群策略,要求每个生产者都配置一个nsq,这样有两个问题:
- 每增加一个生产者,就要增加一个nsq,有点浪费
- 如果生产者配套的 nsq 挂了,这个生产者就不能发布消息
解决方案:让生产者像消费者那样,通过 nsqlookup 来动态查找 nsq 的消息。
2、数据缺少备份
nsq 选择把消息放到内存中,只有当队列里消息的数量超过--mem-queue-size
配置的限制时,才会对消息进行持久化,把消息保存到磁盘中,简称”刷盘“。
但是即使将 —mem-queue-size 设置为 0,即每条消息都会保存到磁盘(当然这样会很影响效率),也不能保证数据的安全,一旦 nsq 节点磁盘受损,数据还是会丢失。
需要对数据进行复制,才能实现消息的真正可靠性。
有赞借鉴了 Kafka 的 partition 机制,把消息复制到多个 nsq 的 partition 中,比如 topic A 的消息,配置了两个 partition,一个在 nsq A,另一个在 nsq C,那么一旦有新的 topic A 的消息被生产,消息就会被复制到这两个 nsq 中,原理和 Kafka 的一致,消息先被发布到 leader partition,leader partition 再把消息复制到其他 partition。
同时,有赞也对进来 nsq 的消息,直接进行刷盘,不再等队列里消息的数量超过 —mem-queue-size 配置的限制时,才会对消息进行持久化,channel 主动过来磁盘读取消息,下面是改造前和改造后,消息读取方式的对比:
3、无法实现顺序消费
有赞 nsq 实现顺序消费,同样借鉴了 Kafka 的实现思路:
- 相同 ID 的,只会去到相同的 partition
- 调整并发消费策略, 保证同一时刻只会投递一条消息进行消费, 等待上一次 ack 成功后才继续投递下一条消息