nsqd

nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。它在 2 个 TCP 端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。

命令行选项

param description
-auth-http-address=[addr]:[port] 查询授权服务器 (可能会给多次)
-broadcast-address=”” 通过 lookupd 注册的地址(默认名是 OS)
-config=”” 配置文件路径
-data-path=”” 缓存消息的磁盘路径
-deflate=true 运行协商压缩特性(客户端压缩)
-e2e-processing-latency-percentile= 消息处理时间的百分比(通过逗号可以多次指定,默认为 none)
-e2e-processing-latency-window-time=10m0s 计算这段时间里,点对点时间延迟(例如,60s 仅计算过去 60 秒)
-http-address=”0.0.0.0:4151” 为 HTTP 客户端监听 [addr]:[port]
-https-address=”” 为 HTTPS 客户端 监听 [addr]:[port]
-lookupd-tcp-address= 解析 TCP 地址名字 (可能会给多次)
-max-body-size=5123840 单个命令体的最大尺寸
-max-bytes-per-file=104857600 每个磁盘队列文件的字节数
-max-deflate-level=6 最大的压缩比率等级(] values == ] nsqd CPU usage)
-max-heartbeat-interval=1m0s 在客户端心跳间,最大的客户端配置时间间隔
-max-message-size=1024768 (弃用 —max-msg-size) 单个消息体的最大字节数
-max-msg-size=1024768 单个消息体的最大字节数
-max-msg-timeout=15m0s 消息超时的最大时间间隔
-max-output-buffer-size=65536 最大客户端输出缓存可配置大小(字节)
-max-output-buffer-timeout=1s 在 flushing 到客户端前,最长的配置时间间隔。
-max-rdy-count=2500 客户端最大的 RDY 数量
-max-req-timeout=1h0m0s 消息重新排队的超时时间
-mem-queue-size=10000 内存里的消息数(per topic/channel)
-msg-timeout=”60s” 自动重新队列消息前需要等待的时间
-snappy=true 打开快速选项 (客户端压缩)
-statsd-address=”” 统计进程的 UDP [addr]:[port]
-statsd-interval=”60s” 从推送到统计的时间间隔
-statsd-mem-stats=true 切换发送内存和 GC 统计数据
-statsd-prefix=”nsq.%s” 发送给统计keys 的前缀(%s for host replacement)
-sync-every=2500 磁盘队列 fsync 的消息数
-sync-timeout=2s 每个磁盘队列 fsync 平均耗时
-tcp-address=”0.0.0.0:4150” TCP 客户端 监听的 [addr]:[port]
-tls-cert=”” 证书文件路径
-tls-client-auth-policy=”” 客户端证书授权策略 (‘require’ or ‘require-verify’)
-tls-key=”” 私钥路径文件
-tls-required=false 客户端连接需求 TLS -tls-root-ca-file=””
-verbose=false 打开日志
-version=false 打印版本
-worker-id=0 进程的唯一码(默认是主机名的哈希值)

HTTP API

API description
/ping 活跃度
/info 版本
/stats 检查综合运行
/pub 发布消息到话题(topic)
/mpub 发布多个消息到话题(topic)
/debug/pprof pprof 调试入口
/debug/pprof/profile 生成 pprof CPU 配置文件
/debug/pprof/goroutine 生成 pprof 计算配置文件
/debug/pprof/heap 生成 pprof 堆配置文件
/debug/pprof/block 生成 pprof 块配置文件
/debug/pprof/threadcreate 生成 pprof OS 线程配置文件
/topic/create 创建一个新的话题(topic)
/topic/delete 删除一个话题(topic)
/topic/empty 清空话题(topic)
/topic/pause 暂停话题(topic)的消息流
/topic/unpause 恢复话题(topic)的消息流
/channel/create 创建一个新的通道(channel)
/channel/delete 删除一个通道(channel)
/channel/empty 清空一个通道(channel)
/channel/pause 暂停通道(channel)的消息流
/channel/unpause 恢复通道(channel)的消息流

NOTE: 这些结束点返回 “wrapped” JSON:

  1. {"status_code":200, "status_text":"OK", "data":{...}}

发送 Accept: application/vnd.nsq; version=1.0 头将会协商使用未封装的 JSON 响应格式 (as of nsqd v 0.2.29+ )。

/pub

  1. 发布一个消息
  2. 参数:
  3. topic - the topic to publish to
  4. POST body - the raw message bytes
  5. $ curl -d "<message>" http://127.0.0.1:4151/pub?topic=message_topic`

/mpub

  1. 一个往返发布多个消息
  2. 参数:
  3. topic - 发布到的话题(topic)
  4. binary - bool ('true' or 'false') 允许二进制模式
  5. POST body - `\n` 分离原始消息字节

注意:默认的 /mpub 希望消息使用 \n 切割,使用 ?binary=true 查询参数来允许二进制模式,希望发送的消息体能成为以下的格式(HTTP ‘Content-Length’ 头必须是将要发送的消息体的总大小):

  1. [ 4-byte num messages ]
  2. [ 4-byte message #1 size ][ N-byte binary data ]
  3. ... (repeated <num_messages> times)
  4. $ curl -d "<message>\n<message>\n<message>" http://127.0.0.1:4151/mpub?topic=message_topic`

/topic/create

  1. 创建一个话题(topic)
  2. 参数:
  3. topic - 将要创建的话题

/topic/delete

  1. 删除一个已经存在的话题(topic) (和所有的通道(channel))
  2. 参数:
  3. topic - 现有的话题

/channel/create

  1. 为现有的话题(topic) 创建一个通道(channel)
  2. 参数:
  3. topic - 现有的话题
  4. channel - 要创建的频道

/channel/delete

  1. 删除现有的话题(topic) 一个的通道(channel)
  2. 参数:
  3. topic - 现有的话题
  4. channel - 待删除的通道

/topic/empty

  1. 清空现有话题(topic) 队列中所有的消息(内存和磁盘中)
  2. 参数:
  3. topic - 待清空的话题

/channel/empty

  1. 清空现有通道(channel) 队列中所有的消息(内存和磁盘中)
  2. 参数:
  3. topic - 现有的话题(topic)
  4. channel - 待清空的通道(channel)

/topic/pause

  1. 暂停已有话题(topic) 的所有通道(channel)的消息(消息将会在话题(topic) 里排队)
  2. 参数:
  3. topic - 现有的话题(topic)

/topic/unpause

  1. 为现有的话题(topic) 的通道(channel) 重启消息流
  2. 参数:
  3. topic - 现有的话题(topic)

/channel/pause

  1. 暂停发送已有的通道(channel) 给消费者(消息将会队列)
  2. 参数:
  3. topic - 现有的话题(topic)
  4. channel - 已有的通道(channel)将会被暂停

/channel/unpause

  1. 重新发送通道(channel) 里的消息给消费者
  2. 参数:
  3. topic - 现有的话题(topic)
  4. channel - 将要暂停的通道(channel)

/stats

  1. 返回内部统计数据
  2. 参数
  3. format - (可选) `text` or `json` (默认 = `text`)

/ping

  1. 监控结束点,必须返回 OK 。如果有问题返回 500。同时,如果写消息到磁盘失败将会返回错误状态。

/info

  1. 返回版本信息

/debug/pprof

  1. 可用的调试节点的页码

/debug/pprof/profile

  1. 开始 30秒的 pprof CPU 配置,并通过请求返回。
  2. 注意,因为它在运行时的性能和时间,这个结束点并没在 /debug/pprof 页面列表中。

/debug/pprof/goroutine

  1. 为所有运行的 goroutines 返回栈记录。

/debug/pprof/heap

  1. 返回堆和内存配置信息(前面的内容可作为 pprof 配置信息)

/debug/pprof/block

  1. 返回 goroutine 块配置信息

/debug/pprof/threadcreate

  1. 返回 goroutine 栈记录

Debugging and Profiling

nsqd 提供一套节点的配置信息,直接通过 Go 的 pprof 工具。如果你有 go 工具套装,只要运行:

  1. # memory profiling
  2. $ go 工具 pprof http://localhost:4151/debug/pprof/heap
  3. # cpu profiling
  4. $ go 工具 pprof http://localhost:4151/debug/pprof/profile

TLS

为了加强安全性,可以通过 —tls-cert 和 —tls-key 客户端配置 nsqd ,升级他们的链接为 TLS。 另外,你可以要求客户端使用 —tls-required ( nsqd v0.2.28+ )协商 TLS。
你可以通过 —tls-client-auth-policy ( require 或 require-verify )配置一个 nsqd 客户端证书:

  • require - 客户端必须提供一个证书,否则将会被拒绝
  • require-verify - 客户端必须提供一个有效的证书,根据 —tls-root-ca-file 指定的链接或者默认的 C

A,否则将会被拒绝。
可以当做客户端授权的表单( nsqd v0.2.28+ )。 如果你想生成一个 password-less,自签名证书,用:

  1. $ openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes

AUTH

注意: 在 nsqd v0.2.29+ 可用
通过使用一个遵从 Auth HTTP 协议的授权服务器,指定 -auth-http-address=host:port 标志,你可以配置 nsqd 。
注意: 希望当仅有 nsqd TCP 协议暴露给外部客户端时使用授权,而不是 HTTP(S) 节点。参见底下说明: Auth 服务器必须接受 HTTP 请求:

  1. /auth?remote_ip=...&tls=...&auth_secret=...

返回结果格式如下:

  1. {
  2. "ttl": 3600,
  3. "identity": "username",
  4. "identity_url": "http://....",
  5. "authorizations": [{
  6. "permissions": [
  7. "subscribe",
  8. "publish"
  9. ],
  10. "topic": ".*",
  11. "channels": [
  12. ".*"
  13. ]
  14. }]
  15. }

注意话题(topic) 和通道(channel) 字符串必须用 nsqd 的正则表达式来申请授权。 nsqd 将会为 TTL 间 隔,并会在这个间隔时间里重新请求授权。

通常情况,将会使用 TLS 来加强安全性。 nsqd 和 授权服务器间通过信任的网络通信(并没被加密)。如果一 个授权服务器通过远程 IP 信息来授权,客户端可以使用占位符(比如 . ),作为 AUTH 命令(Auth 服务器忽 略)。

授权服务器例子 pynsqauthd。

帮助服务器暴露 nsqlookupd 和 nsqd /stats 数据给客户端,从授权服务器通过权限过滤,在以下可以找到
nsqauthfilter。

当使用命令行工具,可以通过使用 —reader-opt 标志来授权。

  1. $ nsq_tail ... -reader-opt="tls_v1,true" -reader-opt="auth_secret,$SECRET"

点对点处理延迟

你可以选择设置 nsqd 来收集和发射点对点信息处理延迟,通过 —e2e-processing-latency-percentile 标志位来配置百分比。

使用概率百分比技术(参见 Effective Computation of Biased Quantiles over Data Streams)来计算值。我 们通过 bmizerany 来使用 perks 包,它能实现这个算法。

我们内部维持 2 个通道(channel),每个通道(channel)存储 N/2 分钟的延迟数据。每个 N/2 分钟我们重置了每个通道(channel)(并开始插入新的数据)。

因为我们仅在通道级别收集数据,对于话题我们聚合并合并所有的通道数量的 quantiles。如果数据在同一个 nsqd 实例上时,可以使用这个技术。然而当数据已经精确的通过 nsqd (通过 nsqlookupd ),我们为每个 nsqd 取平均值。为了维持统计的精确性,除了平均值,我们也提供最大最小值。

注意: 如果没有消费者连接,不能更新值,尽管消息队列的点对点时间会缓慢增长。这是因为仅在 nsqd 收到从客户端发来 FIN 消息时才会重新计算。当消费者重新连接,这些值将会重新调整。

  1. Statsd / Graphite Integration

当使用 —statsd-address 来为 statsd (或类似 statsdaemon)指定 UDP : 时, nsqd 将会在 —statsd-interval 定期推送数据给 statsd(注意:这个间隔必须始终小于等于 graphite 的刷入间隔)。设置 nsqadmin 可以显示图标。

推荐以下配置(但是这些选择必须建立在你的可用资源和要求上)。同时,statsd 的刷入间隔必须小于或者等于 storage-schemas.conf 的最小值,并且 nsqd 必须通过 —statsd-interval 来确认刷入时间小于等于时间间
隔。

  1. # storage-schemas.conf
  2. [nsq]
  3. pattern = ^nsq\..*
  4. retentions = 1m:1d,5m:30d,15m:1y
  5. # storage-aggregation.conf
  6. [default_nsq]
  7. pattern = ^nsq\..*
  8. xFilesFactor = 0.2
  9. aggregationMethod = average

nsqd 实例将会推送给以下 statsd 路径:

  1. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.backend_depth [gauge]
  2. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.depth [gauge]
  3. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.message_count
  4. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.backend_depth [gauge]
  5. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.clients [gauge]
  6. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.deferred_count [gauge]
  7. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.depth [gauge]
  8. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.in_flight_count [gauge]
  9. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.message_count nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.requeue_count nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.timeout_count
  10. # if --statsd-mem-stats is enabled nsq.<nsqd_host>_<nsqd_port>.mem.heap_objects [gauge]
  11. nsq.<nsqd_host>_<nsqd_port>.mem.heap_idle_bytes [gauge]
  12. nsq.<nsqd_host>_<nsqd_port>.mem.heap_in_use_bytes [gauge]
  13. nsq.<nsqd_host>_<nsqd_port>.mem.heap_released_bytes [gauge]
  14. nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_100 [gauge]
  15. nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_99 [gauge]
  16. nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_95 [gauge]
  17. nsq.<nsqd_host>_<nsqd_port>.mem.mem.next_gc_bytes [gauge]
  18. nsq.<nsqd_host>_<nsqd_port>.mem.gc_runs
  19. # if --e2e-processing-latency-percentile is specified, for each percentile
  20. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.e2e_processing_latency_<percent> [gauge]
  21. nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.e2e_processing_latency_<percent> [gauge]

nsqlookupd

nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。

有两个接口:TCP 接口, nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。

命令行选项

param description
-http-address=”0.0.0.0:4161” [addr]:[port] 监听 HTTP 客户端
-inactive-producer-timeout=5m0s 从上次 ping 之后,生产者驻留在活跃列表中的时长
-tcp-address=”0.0.0.0:4160” TCP 客户端监听的 [addr]:[port]
-broadcast-address 这个 lookupd 节点的外部地址, (默认是 OS 主机名)
-tombstone-lifetime=45s 生产者保持 tombstoned 的时长
-verbose=false 允许输出日志
-version=false 打印版本信息

HTTP 接口

/lookup

  1. 返回某个话题(topic)的生产者列表。
  2. 参数:
  3. topic - 指定的话题

/topics

  1. 返回所有已知的话题(topic)

/channels

  1. 返回已知话题(topic)里的通道(channel)
  2. 参数:
  3. topic - 指定的话题

/nodes

  1. 返回所有已知的 nsqd 列表

/delete_topic

  1. 删除一个已存在的话题(topic)
  2. 参数:
  3. topic - 需要删除的话题(topic)

/delete_channel

  1. 删除一个已存在话题(topic)的通道(channel)
  2. 参数:
  3. topic - 已经存在的话题(topic)
  4. channel - 将要删除的通道(channel)

/tombstone_topic_producer

  1. 逻辑删除(Tombstones)某个话题(topic)的生产者。参见 deletion and tombstones
  2. 参数:
  3. topic - 已经存在的话题(topic)
  4. node - 将要逻辑删除(tombstones)的生产者(nsqd) (通过 <broadcast_address>:<http_port> 识别)

/ping

  1. 监控端点,必须返回 OK

/info

  1. 返回版本信息

删除和逻辑删除(Tombstones)

当一个话题(topic)不再全局生产,相对简单的操作是从集群里清理这个消息。假设所有的应用生产的消息下 降,使用 /delete_topic 结束 nsqlookupd 实例的,是必须要完成的操作。(内部来说,它将会识别 nsqd 生产者,并对这些节点执行合适的操作)。

全局来看,通道(channel)删除进程都很类似,不同点是你需用 /delete_channel 结束 nsqlookupd 实例,并且你必须保证所有的订阅了通道(channel)得消费者已经下降(downed)。

然而,当话题(topic)不再在节点的子集上生产的时候情况比较复杂。因为消费者查询 nsqlookupd 的方法并且连接到所有生产者,你加入的竞争环境尝试移除集群的信息,消费者发现这些节点并重新连接。(因此推送更新,话题(topic)仍然在节点上生产)。解决办法就是逻辑删除(tombstones)。逻辑删除(tombstone s)在 nsqlookupd 上下文是特定的生产者和最后的配置 —tombstone-lifetime 时间。在这个窗口中,生产者不会在 /lookup 查询中列出,允许节点删除话题(topic),扩散这些信息到 nsqlookupd (接着逻辑删除(to mbstoned)生产者),并阻止生产者重新发现这个节点。

nsqadmin

nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。

命令行选项

param description
-graphite-url=”” URL to graphite HTTP 地址
-http-address=”0.0.0.0:4171” : HTTP clients 监听的地址和端口
-lookupd-http-address=[] lookupd HTTP 地址 (可能会提供多次)
-notification-http-endpoint=”” HTTP 端点 (完全限定) ,管理动作将会发送到
-nsqd-http-address=[] nsqd HTTP 地址 (可能会提供多次)
-proxy-graphite=false Proxy HTTP requests to graphite
-template-dir=”” 临时目录路径
-use-statsd-prefixes=true expect statsd prefixed keys in graphite (ie
-version=false 打印版本信息

statsd / Graphite Integration

使用 nsqd —statsd-address=… 的时候,你可以指定一个 nsqadmin —graphite-url=http://graphite.yourdo main.com 允许 nsqadmin 上的 graphite 图表。如果使用一个统计克隆 (例如 statsdaemon),它没有前缀的 键值,也可以指定 —use-statsd-prefix=false 。

Admin 通知

如果设置了 —notification-http-endpoint 标志,每次 admin 动作执行的时候(例如暂停一个通道(channel)), nsqadmin 将会发送一个 POST 请求到指定(完全限定)端点。 请求的内容包含的动作信息,例如:

  1. {
  2. "action": "unpause_channel",
  3. "channel": "mouth",
  4. "topic": "beer",
  5. "timestamp": 1357683731,
  6. "user": "df",
  7. "user_agent": "Mozilla/5.0 (Macintosh; Iphone 8)",
  8. "remote_ip": "1.2.3.4:5678"
  9. }

如果在请求时用户名可用, user 字段将会填充,如果之前使用 htpasswd 授权,或者google-auth-proxy 之 后,否则卫空字符串。当不可用时 channel 字段也会为空。

提示: 通过设置 —notification-http-endpoint 为 http://addr.of.nsqd/put?topic=admin_actions ,你可以创建 一个 admin 的动作通知 NSQ 流,话题(topic)名为 admin_actions 。

工具

这些工具辅助到数据流的通用功能和内部检查。

nsq_stat

为所有的话题(topic)和通道(channel)的生产者轮询 /stats ,并显示统计数据:

  1. -----------depth-----------+--------------metadata---------------
  2. total mem disk inflt def | req t-o msgs clients
  3. 24660 24660 0 0 20 | 102688 0 132492418 1
  4. 25001 25001 0 0 20 | 102688 0 132493086 1
  5. 21132 21132 0 0 21 | 102688 0 132493729 1

命令行参数

param description
-channel=”” NSQ 通道(channel)
-lookupd-http-address= lookupd HTTP 地址 (可能会给多次)
-nsqd-http-address= nsqd HTTP 地址 (可能会给多次)
-status-every=2s 轮询/打印输出见的时间间隔
-topic=”” NSQ 话题(topic)
-version=false 打印版本

nsq_tail

消费指定的话题(topic)/通道(channel),并写到 stdout (和 tail(1) 类似)。

命令行参数

param description
-channel=”” NSQ 通道(channel)
-consumer-opt= 传递给 nsq.Consumer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-lookupd-http-address= lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200 允许进入 flight 状态最大的消息数
-n=0 显示的消息总数(消息不足时会阻塞)
-nsqd-tcp-address= nsqd TCP 地址 (可能会给多次)
-topic=”” NSQ 话题(topic)
-version=false 打印版本信息

nsq_to_file

消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。

命令行参数

param description
-channel=”nsq_to_file” nsq 通道(channel)
-consumer-opt= 传递给 nsq.Consumer 的参数 (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-datetime-format=”%Y-%m-%d_%H” strftime,和 filename 里格式兼容
-filename-format=”…log” output 文件名格式
-gzip=false gzip 输出文件
-gzip-level=6 gzip 压缩级别 (1-9, 1=BestSpeed, 9=BestCompression)
-host-identifier=”” 输出到 log 文件,提供主机名。 是有效的替换者
-lookupd-http-address= lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200 允许进入 flight 状态最大的消息数
-nsqd-tcp-address= nsqd TCP 地址 (可能会给多次)
-output-dir=”/tmp” 输出文件所在的文件夹
-skip-empty-files=false 忽略写空文件
-topic= nsq 话题(topic) (可能会给多次)
-topic-refresh=1m0s 话题(topic)列表刷新的频率
-version=false 打印版本信息

nsq_to_http

消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。

命令行参数

param description
-channel=”nsq_to_http” nsq 通道(channel)
-consumer-opt= 参数,通过 nsq.Consumer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-content-type=”application/octet-stream” the Content-Type 使用 d for POST requests
-get= Get HTTP 地址
-http-timeout=20s HTTP 超时
-lookupd-http-address= lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200 允许进入 flight 状态最大的消息数
-mode=”round-robin” 数据流方式: multicast, round-robin, hostpool
-n=100 发送并发数
-nsqd-tcp-address= nsqd TCP 地址 (可能会给多次)
-post= POST HTTP 地址
-sample=1 发布的消息百分比
-status-every=250 记录 status 时间间隔
-topic=”” nsq 话题(topic)
-version=false 打印版本信息

nsq_to_nsq

消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。

命令行参数

param description
-channel=”nsq_to_nsq” nsq 通道(channel)
-consumer-opt= 参数,通过 nsq.Consumer (可能会给多次, see http://godoc.org/github.com/bitly/go-nsq#Config)
-destination-nsqd-tcp-address= destination nsqd TCP 地址 (可能会给多次)
-destination-topic=”” destination nsq 话题(topic)
-lookupd-http-address= lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200 允许 flight 最大的消息数
-mode=”round-robin” 上行请求的参数: round-robin (默认), hostpool
-nsqd-tcp-address= nsqd TCP 地址 (可能会给多次)
-producer-opt= 传递到 nsq.Producer (可能会给多次, 参见 http://godoc.org/github.com/bitly/go-nsq#Config)
-require-json-field=”” JSON 消息: 仅传递消息,包含这个参数
-require-json-value=”” JSON 消息: 仅传递消息要求参数有这个值
-status-every=250 # 请求日志的状态(每个目的地), 0 不可用
-topic=”” nsq 话题(topic)
-version=false 打印版本信息
-whitelist-json-field= JSON 消息: 传递这个字段 (可能会给多次)

to_nsq

采用 stdin 流,并分解到新行(默认),通过 TCP 重新发布到目的地 nsqd 。

命令行参数

param description
-delimiter=”\n” 分割字符串(默认’\n’)
-nsqd-tcp-address= 目的地 nsqd TCP 地址 (可能会给多次)
-producer-opt= 参数,通过 nsq.Producer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-topic=”” 发布到的 NSQ 话题(topic)