TCP 协议规范
NSQ 协议足够简单,用任何语言编译客户端都很容易。我们提供官方的 Go 和 Python 客户端库。 nsqd
进程通过监听配置的 TCP 端口来接受客户端连接。
连接后,客户端必须发送一个 4 字节的 “magic” 标识码,表示通讯协议的版本。
V2
(4 个字节的 ASCII[space][space][V][2]
) 消费用到的推送流协议(和发布用到的请求/响应协议)
认证后,客户端可以发送 IDENTIFY
命令来停供常用的元数据(比如,更多的描述标识码)和协商特性。为了 消费消息,客户端必须 SUB
到一个通道(channel)。
订阅的时候,客户端的 RDY
状态为 0。意味着没有消息会被发送到客户端。当客户端已经准备好接受消息时,需要把 RDY
设置为 #。比如设置为 100,不需要任何附加命令,将会有 100 条消息推送到客户端(每次服务端都会相应的减少 RDY
的值)。
V2 版本的协议让客户端拥有心跳功能。每隔 30 秒(默认设置), nsqd
将会发送一个 _heartbeat_
响应,并期待返回。如果客户端空闲,发送 NOP
命令。如果 2 个 _heartbeat_
响应没有被应答, nsqd
将会超时,并且强制关闭客户端连接。 IDENTIFY
命令可以用来改变/禁用这个行为。
注意
- 除非 stated,所有的传输的二级制大小/整数都是网络字节顺序。(列如. big endian)
- 有效的话题(topic)和通道(channel)名必须是字符
[.a-zA-Z0-9_-]
和数字1 < length <= 64
(在nsqd 0.2.28
版本前最长32
位)命令
IDENTIFY
更新服务器上的客户端元数据和协商功能。
注意: 这个命令包含 JSON 的相关内容,包括IDENTIFY\n
[ 4-byte size in bytes ][ N-byte JSON data ]
client_id | 这个标示符用来消除客户端的歧义 (比如. 一些指定给消费者) |
---|---|
hostname | 部署了客户端的主机名 |
feature_negotiation | 用来标示客户端支持的协商特性。如果服务器接受,将会以 JSON 的形式发送支持的特性和元数据 |
heartbeat_interval | 心跳的毫秒数 |
output_buffer_size | 当 nsqd 写到这个客户端时将会用到的缓存的大小(字节数) |
output_buffer_timeout | 超时后,nsqd 缓冲的数据都会刷新到此客户端 |
tls_v1 | 允许 TLS 来连接,如果服务器支持 TLS,将会回复 “tls_v1”: true 。 客户端读取 IDENTIFY 响应后,必须立即开始 TLS 握手。 完成 TLS 握手后服务器将会响应 OK |
snappy | 允许 snappy 压缩这次连接 |
deflate | 允许 deflate 压缩这次连接 |
deflate_level | 配置 deflate 压缩这次连接的级别 |
sample_rate | 投递此次连接的消息接收率 |
user_agent | 这个客户端的代理字符串 |
msg_timeout | 配置服务端发送消息给客户端的超时时间 |
成功后响应:
OK
注意: 如果客户端发送了 feature_negotiation (并且服务端支持),响应体将会是 JSON。 错误后的响应内容:
E_INVALID
E_BAD_BODY
SUB
订阅话题(topic) /通道(channel)
SUB <topic_name> <channel_name>\n
<topic_name> - 字符串 (建议包含 #ephemeral 后缀)
<channel_name> - 字符串 (建议包含 #ephemeral 后缀)
成功后响应:
OK
错误后响应:
E_INVALID E_BAD_TOPIC
E_BAD_CHANNEL
PUB
发布一个消息到话题(topic):
PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]
<topic_name> - 字符串 (建议 having #ephemeral suffix)
成功后响应:
OK
错误后响应:
E_INVALID E_BAD_TOPIC
E_BAD_MESSAGE E_PUB_FAILED
MPUB
发布多个消息到 话题(topic) (自动)
MPUB <topic_name>\n
[ 4-byte body size ]
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (repeated <num_messages> times)
<topic_name> - 字符串 (建议 having #ephemeral suffix)
成功后响应:
OK
错误后响应:
E_INVALID
E_BAD_TOPIC
E_BAD_BODY
E_BAD_MESSAGE
E_MPUB_FAILED
RDY
更新 RDY 状态 (表示你已经准备好接收 N 消息)
注意: nsqd v0.2.20+ 使用 --max-rdy-count
表示这个值
RDY <count>\n
<count> - a string representation of integer N where 0 < N <= configured_max
注意: 这个没有成功后响应
错误后响应:
E_INVALID
FIN
完成一个消息 (表示成功处理)
FIN <message_id>\n
<message_id> - message id as 16-byte hex string
注意: 这里没有成功后响应
错误后响应:
E_INVALID
E_FIN_FAILED
REQ
重新将消息队列(表示处理失败)
这个消息放在队尾,表示已经发布过,但是因为很多实现细节问题,不要严格信赖这个,将来会改进。
简单来说,消息在传播途中,并且超时就表示 REQ 。
REQ <message_id> <timeout>\n
<message_id> - message id as 16-byte hex string
<timeout> - a string representation of integer N where N <= configured max timeout
0 is a special case that will not defer re-queueing
注意: 这里没有成功后响应
错误后响应:
E_INVALID
E_REQ_FAILED
TOUCH
重置传播途中的消息超时时间
TOUCH <message_id>\n
<message_id> - the hex id of the message
注意: 这里没有成功后响应
错误后响应:
E_INVALID
E_TOUCH_FAILED
CLS
清除连接(不再发送消息)
CLS\n
成功后响应:
CLOSE_WAIT
错误后响应:
E_INVALID
NOP
No-op
NOP\n
AUTH
如果 IDENTIFY
响应中有 auth_required=true
,客户端必须在 SUB
, PUB
或 MPUB
命令前前发送 AUTH
。否则,客户端不需要认证。
当 nsqd
接收到 AUTH
命令,它通过执行 HTTP 配置 --auth-http-address
,这个请求包括以下查询参 数:连接的远程地址,TLS 状态,支持的认证密码。更多细节参见: AUTH
AUTH\n
[4-byte size in bytes ][ N-byte Auth Secret ]
成功后响应:
JSON 包含授权给客户端的身份,可选的 URL,和授权过的权限列表。
{"identity":"...", "identity_url":"...", "permission_count":1}
错误后响应:
E_AUTH_FAILED - An error occurred contacting an auth server
E_UNAUTHORIZED - No permissions found
数据格式
数据异步传输给客户端,并且支持各种回复体,比如
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame type data
客户端必须是以下类型之一:
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
客户端必须是以下类型之一:
FrameTypeResponse int32 = 0
FrameTypeError int32 = 1
FrameTypeMessage int32 = 2
以及消息格式:
[x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
| (int64) || || (hex string encoded in ASCII) || (binary)
| 8-byte || || 16-byte || N-byte
------------------------------------------------------------------------------------------...
nanosecond timestamp ^^ message ID message body
(uint16)
2-byte
attempts
客户端库
支持下列客户端
Name | Language | SUB | PUB | Discovery | Backoff | TLS | Snappy | Sampling | AUTH | Notes |
---|---|---|---|---|---|---|---|---|---|---|
nsqd | HTTP | √ | built-in | |||||||
go-nsq | Go | √ | √ | √ | √ | √ | √ | √ | √ | official |
pynsq | Python | √ | √ | √ | √ | √ | √ | √ | √ | official |
nsqjs | JavaScript (CoffeeScript) | √ | √ | √ | √ | √ | √ | √ | √ | official |
nsq-py | Python | √ | √ | √ | √ | √ | √ | √ | √ | |
gnsq | Python | √ | √ | √ | √ | √ | √ | √ | √ | |
krakow | Ruby | √ | √ | √ | √ | √ | √ | √ | ||
JavaNSQClient | Java | √ | √ | √ | √ | √ | √ | √ | ||
ensq | Erlang | √ | √ | √ | √ | |||||
nsq.js | JavaScript | √ | √ | √ | ||||||
TrendrrNSQClient | Java | √ | √ | √ | ||||||
nsqjava | Java | √ | √ | |||||||
nsqphp | PHP | √ | √ | √ | ||||||
node-nsqueue | JavaScript | √ | √ | |||||||
ruby_nsq | Ruby | √ | √ | √ | ||||||
libnsq | C | √ | official | |||||||
nsq-ruby | Ruby | √ | √ | √ | ||||||
NsqSpinner | Python | √ | √ | √ | √ | √ | √ | √ | ||
nsq-java | Java | √ | √ | √ | ||||||
NSQnet | .NET | √ | √ | √ | ||||||
nsq-client | JavaScript | √ | √ | |||||||
hsnsq | Haskell | √ | √ | |||||||
perl-anyevent-nsq | Perl | √ | √ | √ | ||||||
nsq-clojure | Clojure | |||||||||
nsqie | Scala | √ | √ | |||||||
nodensq | JavaScript | √ | √ |
编译客户端库
NSQ 将一些功能集成到客户端库中,以便维持集群的健壮性和性能。
这篇文章试图列出客户端库通常需要完成的功能。因为发布到 nsqd 非常的琐碎(仅用 HTTP POST /put
节点就可),这个文档主要关注消费者。
配置
从高层看,配置相关的设计理念是希望系统能支持不同的工作负载,使用相同的默认值能立即可用,并且能将拨号数最小化。
消费者通过 TCP 连接到 nsqd
实例,订阅通道(channel
) 上的 话题(topic
)。每个连接只能订阅一个话题(topic),因此消费多个话题(topic),必须响应的结构化。
使用 nsqlookupd
来发现是方案之一,所以客户端库必须支持消费者直接连接一个或多个 nsqd 实例,或者它可用轮询一个或多个 nsqlookupd
实例。当消费者轮询 nsqlookupd
的时候,时间间隔必须是可配置的。另外,因为 NSQ 的标准部署是分布式环境,包含很多消费者和生产者,客户端库必须根据配置值得随机性自动添加抖动。更多细节参考发现.
对于消费者来说,在 nsqd 响应前能接收到多少消息是非常重要的指标。这个管道促进缓存,批处理,异步消息处理。这个值称为 max_in_flight
,并且它影响了 RDY
状态。更多细节参见 RDY
状态。
设计系统时通常会考虑优雅处理失败,客户端库希望能实现失败消息的重试,并提供边界参数来处理每个消息尝试次数。更多细节参见消息处理。
当消息处理失败的时候,客户端库能自动将消息重新队列。NSQ 支持使用 REQ 命令发送延迟。客户端库需要能提供延迟的初始化值(第一次失败时),以及重新队列失败该如何改变。更多细节参见 Backoff.
最重要的时,客户端库必须支持消息处理的回调函数配置。这些回调函数必须简单,通常都支持一个参数(消息对象的实例)。
发现
nsqlookupd
是 NSQ 的重要组成部分,它为消费者发现服务提供来定位 nsqd
,它在运行时提供一个指定话题(topic)。
虽然使用 nsqlookupd
能大幅减少配置数目,但是需要维持并放大一个巨大的分布式 NSQ 集群。
当消费者使用 nsqlookupd
来发现时,客户端库必须管理轮询所有 nsqlookupd 实例的进程,最新的 nsqd
组合以问题形式提供了话题(topic),并且管理到这些 nsqd 的连接。
查询一个 nsqlookupd
实例非常的简单。执行一个 HTTP 请求,使用消费者试图发现的话题(topic) 作为查询参数来查找节点(例如/lookup?topic=clicks
). 响应体是 JSON:
{
"status_code": 200,
"status_txt": "OK",
"data": {
"channels": ["archive", "science", "metrics"],
"producers": [
{
"broadcast_address": "clicksapi01.routable.domain.net",
"hostname": "clicksapi01.domain.net",
"tcp_port": 4150,
"http_port": 4151,
"version": "0.2.18"
},
{
"broadcast_address": "clicksapi02.routable.domain.net",
"hostname": "clicksapi02.domain.net",
"tcp_port": 4150,
"http_port": 4151,
"version": "0.2.18"
}
]
}
}
broadcast_address
和 tcp_port
必须用来连接 nsqd。 因为从设计上来说 nsqlookupd
实例不会分享或协调他们的数据,客户端库必须联合它接收到得所有 nsqlookupd
查询列表来建立 nsqd
最终列表。使用 broadcast_address:tcp_port
作为这个联合的唯一 KEY。
必须用周期性的计时器来重复的轮询 nsqlookupd
的配置,这样消费者能自动的发现新的 nsqd
。客户端库必须自动的初始化到所有新发现的实例的连接。
当客户端库开始执行的时候,它必须通过踢开配置 nsqlookupd
实例的一组请求,来引导轮询。
连接处理
一旦消费者有一个 nsqd
可以连接(通过发现或手工配置), 它就必须打开一个 TCP 连接到 broadcast_address:port
。一个单独的 TCP 连接必须能让消费者可以订阅到每个 nsqd
的话题(topic)。
当连接到一个 nsqd
实例时,客户端库必须发送以下数据,顺序是:
- 魔术标识符
- 一个
IDENTIFY
命令 (和负载) 和读/验证响应 - 一个
SUB
命令 (指定需要的话题(topic)) 和读/验证响应 - 一个初始化
RDY
值 1
(低级别的细节参见 spec)
重新连接
客户端库必须通过以下方法自动重新连接:
- 如果消费者通过特定的
nsqd
列表指定,重新连接必须通过延迟重试来处理。(列如,8s, 16s, 32s, 等等, 到最大值后重试)。 - 如果消费者通过
nsqlookupd
来发现实例,必须通过轮询间隔来自动处理重新连接(例如,如果消费者断开和nsqd
的连接,客户端库仅在随后的nsqlookupd
轮询发现的实例后重新连接)。这能保证消费者了解nsqd
。特性协商
IDENTIFY
命令可以用来设置nsqd
端的元数据,修改客户端设置,并特性协商,它满足亮点:
- 某些情况下,客户端可能会修改
nsqd
的交互方式(比如,修改客户端的心跳间隔,并允许压缩,TLS,输出缓存,等等-完整列表参见 spec) nsqd
使用 JSON payload 来响应IDENTIFY
命令,它包含了重要的服务端配置值,客户端和之交互时必须遵守。
连接后,根据用户的配置, 客户端库必须发送一个 IDENTIFY
命令, 它的内容是 JSON payload:
{
"client_id": "metrics_increment",
"hostname": "app01.bitly.net",
"heartbeat_interval": 30000,
"feature_negotiation": true
}
feature_negotiation
位表示客户端可以接受返回值是 JSON payload。 client_id
和 hostname
是随意的文本字段,nsqd
(和 nsqadmin
)会用来区别客户端。 heartbeat_interval
配置每个客户端的心跳间隔。
nsqd
必须响应 OK, 否则:
{
"max_rdy_count": 2500,
"version": "0.2.20-alpha"
}
数据流和心跳
一旦消费者处于订阅状态,NSQ 协议里的数据流时异步的。对于消费者来说,这就是说如果想建立一个健壮并高效的客户端库,就必须使用异步的网络 IO 循环和/或“线程”(线程表示 OS 级别的线程和用户空间(userland)的进程,比如协同程序(coroutines))。
另外,期望客户端能响应它们连接到的 nsqd
实例的周期性心跳。通常这个周期是 30 秒。客户端可以使用任何命令响应,不过通常方便起见,使用 NOP
响应心跳。更多细节参见 protocol spec。
“进程”必须专注于读取 TCP socket 的数据,解包帧数据,并执行多路逻辑来传输。这也是处理心跳最佳点。从最低级别看,读取协议包括以下步骤:
- 读取 4 字节 big endian uint32 大小
- 读取字节大小数据
- 解包数据
- …
- profit
- goto 1
一个和错误相关小插曲
根据系统的异步特性,会采用更多的状态来追踪相关协议的由命令产生的错误。我们会采用“快速错误”(”fail fast”)方法,所以大量协议级别错误处理都是致命的。这意味着如果客户端发送一个无效命令(或者自己是无效状态),通过强制关闭连接(如果可能,发送一个错误给客户端),它连接到的nsqd
实例将会保护自己(和系统)。和之前提到的连接处理相配合,使得系统更加健壮和稳定。
仅有的几个非致命错误是:
E_FIN_FAILED
-FIN
命令, 无效的消息 IDE_REQ_FAILED
-REQ
命令 无效的消息 IDE_TOUCH_FAILED
-TOUCH
命令 无效的消息 ID
因为这些错误通常和时间有关,所以不当做致命错误。这些错误通常发生在 nsqd 端消息超时,重新队列时,和投递到其他消费者时。原先的接受者不再允许响应这个消息。
消息处理
当 IO 循环解包包含消息的帧数据时,它必须路由这个消息给配置处理函数来处理。
发送 nsqd,在配置消息超时时希望收到回复(默认:60秒)。可能有以下场景:
- 处理函数表示消息已经成功处理
- 处理函数表示消息正处理成功
- 处理函数表示需要更多的时间来处理消息
- in-flight 超时,并且
nsqd
自动重新队列消息
前 3 个情况,客户端库必须发送合适消费者方面的命令 (FIN
,REQ
,和 TOUCH
)。
FIN
命令最简单。它告诉 nsqd
它能安全的抛弃消息。FIN
也能抛弃那些你不想处理或重试的消息。
REQ
命令告诉 nsqd
,消息必须重新队列(可选参数指定了重试的次数)。如果消费者没有指定可选参数,客户端库必须自动算出相关联的消息处理的时长(通常设置为多倍,这样效率更高)。客户端库必须抛弃超过最多重试次数的消息。当它发生的时候,必须执行用户提供的回调来通知,并运行特定的回调。
如果消息处理函数需要的时间超过配置的超时时间,可以用 TOUCH
命令来重置 nsqd
端的计时器。可以重复这个动作,直到消息 FIN
或 REQ
,或发送 nsqd
的配置属性 max_msg_timeout
。客户端库不能自动 TOUCH
代表消费者。
如果发送 nsqd
实例没有接收到响应,消息将会超时,并会自动重新队列来投递到可用的消费者。
最后,每个消息的属性是尝试次数。客户端库必须比较这个值和配置的最大值,并且抛弃已经超过这个值得消息。当消息已经抛弃的时候,需要触发回调。通常这个回调的实现必须包括写入磁盘,日志等等。用户必须能重写默认的处理函数。
RDY 状态
因为消息是从 nsqd
推送到消费者那,我们必须拥有一个方法来管理数据流,而不仅依赖于低级别的 TCP 语法。消费者的 RDY
状态是 NSQ
的流控制机制。
如配置中列出的内容,通过 max_in_flight
配置消费者。这是并行的并且性能 knob。比如一些 downstream 系统可以更加容易进行消息批处理,并对更高级的 max-in-flight
有利。
当消费者连接到 nsqd
(并且订阅) ,RDY
初始化状态为 0。不会投递任何消息。
客户端库拥有很少的责任:
- 引导并最终分布配置
max_in_flight
到所有的连接。 - 永远不允许汇集所有连接
RDY
的和(total_rdy_count
),为超过max_in_flight
的配置。 - 永远不要超过每个连接
nsqd
配置的max_rdy_count
。 - 暴露一个 API 方法给值得信赖的消息流。
1. 引导和分布
为连接选择RDY
值,需要考虑的因素很少(最终分布为max_in_flight
):
- 连接 # 是动态的,通常并不知道次数(例如,当通过
nsqlookupd
发现nsqd
)。 max_in_flight
可能会小于你的连接数
为了开始消息流,客户端库必须发送一个初始的 RDY
值。因为最终的连接数并不知道(通常从 ‘1’ 开始),所以客户端库必能公平对待每个连接。
另外,每个消息处理后,客户端库必须评估什么时候更新 RDY
状态。如果当前值是 ‘0’,或者低于最后发送的值的 25% 必须触发更新。客户端库必须一直尝试最终分布 RDY
值到所有的连接。通常来说,它可以通过 max_in_flight
/ num_conns
实现。
然而,当 max_in_flight < num_conns
这个简单的公式无效的时候。客户端库必须执行一个动态的运行评估,自从通过之前的连接接收到得消息后,连接的 nsqd
‘活跃度’的时间。当配置到期后,他必须重新分布,不论 RDY
值是否对于新的 nsqd
有效。这么做,你能保证你可以通过消息找到 nsqd
。这些会有延迟的影响。
2. 维护 max_in_flight
客户端库必须维护指定消费者的消息in flight
的最大值。尤其,汇集每个连接的 RDY
值永远不能超过配置的 max_in_flight
值。
底下的 Python 代码,它指出 RDY
值是否对于指定的连接有效。
def send_ready(reader, conn, count):
if (reader.total_ready_count + count) > reader.max_in_flight:
return
conn.send_ready(count)
conn.rdy_count = count
reader.total_ready_count += count
3. nsqd 最大 RDY 值
每个 nsqd
通过 --max-rdy-count
配置,如果消费者发送的 RDY
值超过了可接受的范围,它的连接将强制关闭。为了向后兼容,这个值必须假设为 2500
,如果 nsqd
实例不能支持特性协商。
4. 消息流 Starvation
最终,客户端库必须提供一个 API 方法,来表示消息流 starvation
。对于消费者(消费者处理函数)来说,简单比较 in-flight
的消息数和 max_in_flight
值,来决定是否”批处理“不太合适。有两种情况有问题:
当消费者配置 max_in_flight > 1
, 根据变量 num_conns
,max_in_flight
除 num_conns
除不尽。因为你永远不能超过max_in_flight
,你必须降低,并且在 RDY
值少于 max_in_flight
时结束。
如果仅仅 nsqd
的子集有消息,因为 even distribution
的 RDY
预期值, 这些活跃 nsqd
仅有 max_in_flight
的片段。
以上两种情况,消费者实际上永远不会接受消息的 max_in_flight
。因此,客户端库必须暴露一个方法 is_starved
,表示任何连接是否 starved
,如下:
def is_starved(conns):
for c in conns:
# the constant 0.85 is designed to *anticipate* starvation rather than wait for it
if c.in_flight > 0 and c.in_flight >= (c.last_ready * 0.85):
return True
return False
is_starved
方法必须由消息处理函数使用,来发现什么时候处理批量消息。
Backoff
消息处理失败的时候如何处理是一个非常复杂的问题。消息处理章节介绍了客户端库动作,它会处理和时间相关的失败的消息。其他的问题是是否减少吞吐量。这两个功能对于整个系统的稳定性至关重要。
通过减慢处理的速率,或者 “backing off”,消费者允许 downstream 系统回收传输失败。然而这个行为必须是可配置的,因为不是什么时候都能称心如意,这种情况下延迟必须优先处理。
Backoff 必须通过发送 RDY 0
到合适的 nsqd
来实现,停止消息流。这个状态的时长通过重试的失败来计算。处理成功会减少这个时长,直到 reader 不再是 backoff 状态。
当 reader 是 backoff 状态时,超时后,客户端库必须仅发送过 RDY 1
,而不是 max_in_flight
。 在返回完整的 throttle 前,这是有效的 “tests the waters”。另外,backoff 超时时,客户端库必须忽略任何和计算 backoff 时间成功或者失败结果。(比如,每次超时时它仅信任一个结果)
图片 3.1 nsq_客户端_flow
加密/压缩
NSQ 支持加密和/或压缩特性协商,通过 IDENTIFY
命令。 TLS 用来加密。 Snappy 和 DEFLATE 都支持压缩。Snappy 可作为第三方库使用,但是基本所有的语言都支持 DEFLATE
。
收到 IDENTIFY
响应时,并且你通过 tls_v1
标志位请求 TLS,你得到的东西和以下内容类似:
{
"deflate": false,
"deflate_level": 0,
"max_deflate_level": 6,
"max_msg_timeout": 900000,
"max_rdy_count": 2500,
"msg_timeout": 60000,
"sample_rate": 0,
"snappy": true,
"tls_v1": true,
"version": "0.2.28"
}
确认 tls_v1
为 true
后(意味着服务器支持 TLS),在接受和发送任何消息前,你需要初始化 TLS 握手(例如,Python 使用 ssl.wrap_socket
表示完成)。TLS 握手成功后,你必须立即读取一个 NSQ 加密的 OK 响应。
如果你想压缩,可以设置 snappy
或 deflate
为 true
,并且使用合适压缩(解压缩)调用读写。同样的你必须立即读取一个 NSQ 压缩的 OK
响应。
这些压缩特性是互斥的。
你不能阻止缓存直到加密/压缩协商完成,或者确保小心的读取到内存。
汇总
分布式系统非常有意思。不同的 NSQ 集群部门间交互在一个平台上,它健壮,高性能,并且稳定。希望您能这篇文章里了解到客户端是多么重要。
这些细节的实现,我们将 pynsq
和 go-nsq
作为代码基础。pynsq
可以切割为 2 个部分:
Message
- 高级别的消息对象,它暴露了状态方法,来响应nsqd
(FIN
,REQ
,TOUCH
等等),同时元数据包含目的和时间戳。Connection
- 高级别的封装,包含 TCP 连接到一个指定的nsqd
,它包含 flight 消息,RDY
状态,协商特性,和不同时间。消费者
- 和用户打交道的 API,它处理发现,创建连接(和订阅),引导和管理RDY
状态,解析收到的数据,创建消息对象,和分发消息给处理函数。Producer
-和用户打交道的 API,处理发布。
我们很高兴能帮助任何对编写客户端库有兴趣的人。我们希望大家能加入到社区,扩展目前已经存在的库。社区 已经开源很多客户端库。