事件系统
异步系统
任务处理系统

watermill内置了很多订阅-发布实现,最简单、直接的要属GoChannel。我们就以这个实现为例介绍watermill的特性。
Go library for building event-driven applications.
github.com/ThreeDotsLabs/watermill
https://watermill.io/docs/getting-started/

Watermill is a Golang library for working efficiently with message streams. It is intended for building event-driven applications. It can be used for event sourcing, RPC over messages, sagas, and whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog, if that fits your use case.
It comes with a set of Pub/Sub implementations and can be easily extended by your own.
Watermill also ships with standard middlewares like instrumentation, poison queue, throttling, correlation, and other tools used by every message-driven application.

watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill内置了多种订阅-发布实现,包括Kafka/RabbitMQ,甚至还支持HTTP/MySQL binlog。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。
https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/basic/1-your-first-app

Golang CQRS implementation in Watermill.
https://watermill.io/docs/cqrs

watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill内置了多种订阅-发布实现,包括Kafka/RabbitMQ,甚至还支持HTTP/MySQL binlog。当然也可以编写自己的订阅-发布实现。此外,它还提供了监控、限流等中间件。

image.png
路由其实管理多个订阅者,每个订阅者在一个独立的goroutine中运行,彼此互不干扰。订阅者收到消息后,交由注册时指定的处理函数(HandlerFunc)。路由还可以设置插件(plugin)和中间件(middleware),插件是定制路由的行为,而中间件是定制处理器的行为。处理器处理消息后会返回若干消息,这些消息会被路由重新发布到(另一个)管理器中。

watermill中内置了几个比较常用的中间件:

  • IgnoreErrors:可以忽略指定的错误;
  • Throttle:限流,限制单位时间内处理的消息数量;
  • Poison:将处理失败的消息以另一个主题发布;
  • Retry:重试,处理失败可以重试;
  • Timeout:超时,如果消息处理时间超过给定的时间,直接失败。
  • InstantAck:直接调用消息的Ack()方法,不管后续成功还是失败;
  • RandomFail:随机抛出错误,测试时使用;
  • Duplicator:调用两次处理函数,两次返回的消息都重新发布出去,double~
  • Correlation:处理函数生成的消息都统一设置成原始消息中的correlation id,方便追踪消息来源;
  • Recoverer:捕获处理函数中的panic,包装成错误返回。

Event-Driven

https://github.com/ThreeDotsLabs/event-driven-example
image.png

  • github.com/ThreeDotsLabs/watermill/message/message.go
    1. // ...
    2. type Message struct {
    3. // UUID 是消息的唯一标识符。
    4. //
    5. // 它仅由 Watermill 用于调试。
    6. // UUID 可以为空。
    7. UUID string
    8. // Metadata 包含消息元数据。
    9. //
    10. // 可以用于存储不需要对整个有效负载 unmarshaling 的数据。
    11. // 它类似于 HTTP 请求的头。
    12. //
    13. // Metadata 将被 marshal,并将保存到 PubSub。
    14. Metadata Metadata
    15. // Payload 是消息的有效负载。
    16. Payload Payload
    17. // 当收到确认时,ack 关闭。
    18. ack chan struct{}
    19. // noACk is closed, when negative acknowledge is received.
    20. // 当收到否定应答时,noACk 关闭。
    21. noAck chan struct{}
    22. ackMutex sync.Mutex
    23. ackSentType ackType
    24. ctx context.Context
    25. }
    26. // ...