事件系统
异步系统
任务处理系统
watermill
内置了很多订阅-发布实现,最简单、直接的要属GoChannel
。我们就以这个实现为例介绍watermill
的特性。
Go library for building event-driven applications.
github.com/ThreeDotsLabs/watermill
https://watermill.io/docs/getting-started/
Watermill is a Golang library for working efficiently with message streams. It is intended for building event-driven applications. It can be used for event sourcing, RPC over messages, sagas, and whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog, if that fits your use case.
It comes with a set of Pub/Sub implementations and can be easily extended by your own.
Watermill also ships with standard middlewares like instrumentation, poison queue, throttling, correlation, and other tools used by every message-driven application.
watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill
内置了多种订阅-发布实现,包括Kafka/RabbitMQ
,甚至还支持HTTP/MySQL binlog
。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。
https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/1-your-first-app
Golang CQRS implementation in Watermill.
https://watermill.io/docs/cqrs
watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill
内置了多种订阅-发布实现,包括Kafka/RabbitMQ
,甚至还支持HTTP/MySQL binlog
。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。
路由其实管理多个订阅者,每个订阅者在一个独立的goroutine
中运行,彼此互不干扰。订阅者收到消息后,交由注册时指定的处理函数(HandlerFunc)。路由还可以设置插件(plugin)和中间件(middleware),插件是定制路由的行为,而中间件是定制处理器的行为。处理器处理消息后会返回若干消息,这些消息会被路由重新发布到(另一个)管理器中。
watermill
中内置了几个比较常用的中间件:
IgnoreErrors
:可以忽略指定的错误;Throttle
:限流,限制单位时间内处理的消息数量;Poison
:将处理失败的消息以另一个主题发布;Retry
:重试,处理失败可以重试;Timeout
:超时,如果消息处理时间超过给定的时间,直接失败。InstantAck
:直接调用消息的Ack()
方法,不管后续成功还是失败;RandomFail
:随机抛出错误,测试时使用;Duplicator
:调用两次处理函数,两次返回的消息都重新发布出去,double~Correlation
:处理函数生成的消息都统一设置成原始消息中的correlation id
,方便追踪消息来源;Recoverer
:捕获处理函数中的panic
,包装成错误返回。
Event-Driven
https://github.com/ThreeDotsLabs/event-driven-example
- github.com/ThreeDotsLabs/watermill/message/message.go
// ...
type Message struct {
// UUID 是消息的唯一标识符。
//
// 它仅由 Watermill 用于调试。
// UUID 可以为空。
UUID string
// Metadata 包含消息元数据。
//
// 可以用于存储不需要对整个有效负载 unmarshaling 的数据。
// 它类似于 HTTP 请求的头。
//
// Metadata 将被 marshal,并将保存到 PubSub。
Metadata Metadata
// Payload 是消息的有效负载。
Payload Payload
// 当收到确认时,ack 关闭。
ack chan struct{}
// noACk is closed, when negative acknowledge is received.
// 当收到否定应答时,noACk 关闭。
noAck chan struct{}
ackMutex sync.Mutex
ackSentType ackType
ctx context.Context
}
// ...