事件系统
异步系统
任务处理系统
watermill内置了很多订阅-发布实现,最简单、直接的要属GoChannel。我们就以这个实现为例介绍watermill的特性。
Go library for building event-driven applications.
github.com/ThreeDotsLabs/watermill
https://watermill.io/docs/getting-started/
Watermill is a Golang library for working efficiently with message streams. It is intended for building event-driven applications. It can be used for event sourcing, RPC over messages, sagas, and whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog, if that fits your use case.
It comes with a set of Pub/Sub implementations and can be easily extended by your own.
Watermill also ships with standard middlewares like instrumentation, poison queue, throttling, correlation, and other tools used by every message-driven application.
watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill内置了多种订阅-发布实现,包括Kafka/RabbitMQ,甚至还支持HTTP/MySQL binlog。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。
https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/1-your-first-app
Golang CQRS implementation in Watermill.
https://watermill.io/docs/cqrs
watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill内置了多种订阅-发布实现,包括Kafka/RabbitMQ,甚至还支持HTTP/MySQL binlog。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。

路由其实管理多个订阅者,每个订阅者在一个独立的goroutine中运行,彼此互不干扰。订阅者收到消息后,交由注册时指定的处理函数(HandlerFunc)。路由还可以设置插件(plugin)和中间件(middleware),插件是定制路由的行为,而中间件是定制处理器的行为。处理器处理消息后会返回若干消息,这些消息会被路由重新发布到(另一个)管理器中。
watermill中内置了几个比较常用的中间件:
IgnoreErrors:可以忽略指定的错误;Throttle:限流,限制单位时间内处理的消息数量;Poison:将处理失败的消息以另一个主题发布;Retry:重试,处理失败可以重试;Timeout:超时,如果消息处理时间超过给定的时间,直接失败。InstantAck:直接调用消息的Ack()方法,不管后续成功还是失败;RandomFail:随机抛出错误,测试时使用;Duplicator:调用两次处理函数,两次返回的消息都重新发布出去,double~Correlation:处理函数生成的消息都统一设置成原始消息中的correlation id,方便追踪消息来源;Recoverer:捕获处理函数中的panic,包装成错误返回。
Event-Driven
https://github.com/ThreeDotsLabs/event-driven-example
- github.com/ThreeDotsLabs/watermill/message/message.go
// ...type Message struct {// UUID 是消息的唯一标识符。//// 它仅由 Watermill 用于调试。// UUID 可以为空。UUID string// Metadata 包含消息元数据。//// 可以用于存储不需要对整个有效负载 unmarshaling 的数据。// 它类似于 HTTP 请求的头。//// Metadata 将被 marshal,并将保存到 PubSub。Metadata Metadata// Payload 是消息的有效负载。Payload Payload// 当收到确认时,ack 关闭。ack chan struct{}// noACk is closed, when negative acknowledge is received.// 当收到否定应答时,noACk 关闭。noAck chan struct{}ackMutex sync.MutexackSentType ackTypectx context.Context}// ...
 
