在微服务架构中,应用程序中的服务应该是相互隔离的,这样它们就可以独立地被开发、托管、升级和扩展。消息传递是一种重要的技术,可以在这些松散耦合的服务之间实现灵活的通信模式。作为一个分布式运行时,Dapr 为开发者设计事件驱动的微服务应用提供了内置的消息传递支持。
事件驱动编程
我们的世界充满了事件 —— 事实上,任何发生的事情都可以被认为是一个事件。恒星的碰撞、火山的爆发、书页的翻动、眼睛的眨动;这些都是事件,有不同程度的意义与之相关。有些事件,如眨眼,很少引起任何重大反应。而其他事件,如自然灾害,则肯定会引起反应。
事件驱动编程对事件如何触发反应进行建模。它已经成为 GUI 程序的普遍编程模型,因为它提供了一种自然的方式来模拟人机交互 —— 有人点击了一个按钮,就有事情发生。
事件驱动编程在由松散耦合的组件组成的系统中也很适用。一个组件通过引发一个事件来通知其他组件事情已经发生,而这个事件可以被一个或多个感兴趣的监听器接收并触发额外的动作。
本章和下一章介绍了 Dapr 如何使用触发器、绑定和发布/订阅支持事件驱动编程。
消息与事件
事件和消息都是携带一些信息的数据包。然而,它们有一些微妙的区别,特别是在处理它们的方式上。
事件表明某些事情已经发生。你不能否认一个事件的发生,因为它发生在过去。例如,你通过电子邮件收到一个新的营销手册的事实就是一个事件。你可以选择忽略这个事件,也许还有小册子本身,但你不能改变它被发送给你的事实。触发事件的一方通常并不关心该事件是否被任何监听者所接收。它发送事件是为了通知任何感兴趣的人;接收者(如果有的话)做什么回应不是它关心的。
另一方面,消息是由一个发送者发送给特定的接收者的。在这种情况下,信息发送者有意向已知的一方发送一段数据。而且,它通常也期望从收件人那里得到一个回应。
因此,我们谈论的是发布一个事件和发送一个消息。
现在让我们把注意力转向接收者方面。为了接收某些类型的事件,接收者需要调到相应的频道,在这里通常被称为主题。这就像把收音机调到一个特定的电台来听你喜欢的音乐。在编程中,一个事件监听器经常在等待事件的过程中阻塞,它在事件来临时处理事件。另一方面,一个消息接收者不需要主动收听一个频道。它可以偶尔检查一下消息,并逐个或成批地处理收到的消息。当然,消息接收者也可以选择在等待新消息时进行阻塞,但它不需要监听一个主题,因为消息将直接传递给它的接收端点。
消息可以是推送的,也可以是拉取的。当发件人知道收件人的地址时,它可以将消息推送给所有指定的收件人。反之,知道发件人地址的收件人可以主动轮询发件人,把消息拉下来。一条消息通常由一个收件人处理,因为一旦消息被一个收件人检索到,它就不能被其他收件人使用。这与事件不同,事件通常是广播的,由多个事件处理程序处理。
当一个消息经过一系列的发送者和接收者时,它被说成是经过一个管道或一个工作流。这有时也被称为反应式编程(Reactive Programming)。
在本章的其余部分,我们将不严格区分这两个术语,因为传递消息和事件的机制是一样的。然而,保持对它们之间区别的清晰理解是有帮助的。
输入绑定和输出绑定
网络服务是消息或事件的接收者。接收消息很简单,服务只需监听其端点并等待新消息的到来。然而,获取事件则比较麻烦。事件通常通过不同的消息传递骨干网发布,这些骨干网使用不同的认证和授权方法、不同的通信协议、不同的 API 和不同的交付方式(如推送与拉取)。
这就是输入绑定和输出绑定的作用。输入绑定将一个服务附加到一个事件源上,这样该服务就可以被来自该源的事件所触发。通过绑定,网络服务可以使用相同的处理程序对消息和外部事件做出反应,因为绑定获得了事件并将事件作为消息直接传递给服务的端点。输出绑定允许服务将消息推送到目的地。服务也可以从输出绑定中获得响应。换句话说,输入绑定是从外部系统到你的应用程序的单向通信渠道,而输出绑定是你的应用程序和外部系统之间的双向通信渠道。
绑定定义对外部系统的所有细节进行了抽象。网络服务可以被附加到任何事件源和任何事件目的地(或 “槽” Sink),而不需要进行任何修改。这是一个强大的机制,允许系统在运行时动态地适应不同的消息流。
我(Haishi)很喜欢通过说一些他不喜欢的话来惹恼 Yaron,比如 “《星际迷航》比《星球大战》好”,“Go 不是一种真正的编程语言,因为它没有继承和泛型”。玩笑归玩笑,我真诚地希望 Go 有一天真的支持泛型。如果你仔细想想,绑定机制的工作原理有点像泛型 —— 它允许你设计一个泛型处理管道,然后通过绑定将该管道附加到不同的事件流上。
绑定允许多个系统通过定义的管道被 “粘” 在一起。而连接到不同的系统可能会产生完全不同的结果。例如,图 3-1 显示了一个有三个连接点的简单图像处理流水线。将这些点与不同的服务绑定会产生不同的情况:
- 一个典型的 Web 服务:
- 绑定到一个 HTTP 输入绑定
- 连接到一个人工智能模型(如 GPT-3),生成摘要文本
- 连接到出站的 HTTP 以返回结果
- 一个人脸检测流水线:
- 绑定到一个事件集线器,传输摄像头拍摄的图像
- 连接到一个检测人脸的人工智能模型(如 YOLO)
- 连接到一个数据库以保存结果
- 一个智能合约流水线:
- 绑定到一个传输扫描文件的事件中心
- 连接到光学字符识别(OCR)服务以检测文本
- 将检测到的文本发送到一个智能合约,将识别的文件内容归档
图 3-1. 使用绑定的图像处理流水线
发布/订阅
我们在 第 1 章 中简单地谈到了发布/订阅。正如你所看到的,Dapr 本身并不是一个消息总线。相反,Dapr 集成了流行的消息总线(感谢我们伟大的社区贡献!)来传递消息。
Dapr 选择使用 CloudEvents v1.0(一个 CNCF 项目)作为通用的事件信封格式,以提高连接服务的互操作性。CloudEvents 将事件定义为代表一个发生的背景和数据。事件可以通过行业标准(如 HTTP、AMQP、MQTT 和 SMTP)来传递。下面是 CloudEvents 资源库中的一个例子(被序列化为 JSON 文档):
{"specversion" : "1.0","type" : "com.github.pull.create","source" : "https://github.com/cloudevents/spec/pull","subject" : "123","id" : "A234-1234-1234","time" : "2018-04-05T17:31:00Z","comexampleextension1" : "value","comexampleothervalue" : 5,"datacontenttype" : "text/xml","data" : "<much wow=\"xml\"/>"}
Dapr 使用 Redis Streams 作为默认的发布/订阅消息传递主干。Redis Stream 是一个仅有附加的数据结构。它的有趣之处在于,它允许多个消费者阻塞并等待新数据。这些消费者被加入到不同的消费者组中,这使得同一消息流的一部分可以被不同的消费者连接起来,而不会相互干扰。
要发布一个事件到一个流中,你可以使用 XADD 命令和一个键/值对列表:XADD somestream * device-id 123 reading 35235.6。
为了监听流中的新项目,你可以使用带有特殊的 BLOCK 选项的 XREAD 命令,其超时时间为 0 毫秒,这意味着阻塞直到新数据到达:XREAD BLOCK 0 STREAMS somestream $。
请注意,一旦一个消息被读取,它就被认为是处于等待状态,但不会被从流中删除。消费者需要发出 XACK 命令来彻底地删除该消息。这种额外的确认通常是消息传递系统所要求的,以确保消息至少被处理一次。从本质上讲,消费者使用 XACK 来表示消息的成功处理。如果消费者在发送 XACK 命令之前就崩溃了,它可以在下一次恢复的时候拿起同样的消息。
Redis 还提供了一个 XCLAIM 命令,以从永久失效的消费者那里回收待处理的消息。在此,我们不会进一步讨论该命令。
这就是我们要讨论的理论性的东西。现在是将发布/订阅付诸行动的时候了。
使用 Dapr 来发布/订阅
在本讲中,我们将使用 PowerShell 编写一个脚本,订阅一个主题并在控制台中打印出收到的消息。
用 PowerShell 脚本进行实现
PowerShell 脚本需要做两件事。首先,它需要监听 /dapr/ 订阅路由,并在请求时以 JSON 数组的形式返回要订阅的任何主题的名称(在下面的例子中的 A)。第二,它需要监听相关路由(这里是 /A),等待 Dapr 发布事件。创建一个新的 app.ps1 文件,内容如下:
$httpServer = [System.Net.HttpListener]::new()$httpServer.Prefixes.Add("http://localhost:3000/")$httpServer.Start()while ($httpServer.IsListening) {$context = $httpServer.GetContext()if ($context.Request.HttpMethod -eq 'GET' -and $context.Request.RawUrl -eq '/dapr/subscribe') {$buffer = [System.Text.Encoding]::UTF8.GetBytes("[{topic: 'A', route: '/A']")$context.Response.ContentLength64 = $buffer.Length$context.Response.ContentType = "application/json"$context.Response.OutputStream.Write($buffer, 0, $buffer.Length)$context.Response.OutputStream.Close()}if ($context.Request.HttpMethod -eq 'POST' -and $context.Request.RawUrl -eq '/A') {$length = $context.Request.ContentLength64$buffer = [System.Byte[]]::CreateInstance([System.Byte],$length)$context.Request.InputStream.Read($buffer,0,$length)$message = [System.Text.Encoding]::UTF8.GetString($buffer)write-host $message$buffer = [System.Text.Encoding]::UTF8.GetBytes("OK")$context.Response.ContentLength64 = $buffer.Length$context.Response.OutputStream.Write($buffer, 0, $buffer.Length)$context.Response.OutputStream.Close()}}
这个脚本非常简单,它启动了一个 HTTP 监听器,并监听上述路由。当它收到对 /dapr/subscribe 路由的 GET 请求时,它返回一个 JSON 数组,其中包含对路由 /A 的主题 A 的订阅。当它收到对 /A 路由的 POST 请求时,它将打印出收到的消息。
用 Dapr CLI 测试发布/订阅
Dapr CLI 有一个内置的命令,可以用来发布消息到一个主题。这对于测试发布/订阅非常方便。我们将在这个练习中使用该命令:
- 确保你有 Redis 在本地运行。如果没有,你可以使用 Docker 启动一个新的 Redis 服务器:
docker run --name redis -d redis; 使用 Dapr 启动 PowerShell 脚本:
$ dapr run --app-id ps --port 3500 --app-port 3000 --protocol http cmd /c "powershell -f app.ps1"
使用 Dapr CLI 发送一个测试信息:
$ dapr publish --topic A --payload "{ \"message\": \"This is a test\" }"
你应该看到 PowerShell 控制台中显示的信息。
:::info 对于感兴趣的读者,Dapr 的 样例仓库 包括一个广泛的例子,涉及两个订阅者和一个 GUI 发布者。 :::
Dapr 发布/订阅行为
如前所述,Dapr 保证了至少一次的消息传递。如果你的应用程序需要准确地处理一次消息,你将需要实现你自己的跟踪方法 —— 例如使用状态存储 —— 以确保你不会多次处理相同的传输动作。无论如何,你的应用程序可能会被触发相同的消息,你将不得不在你的应用逻辑中处理重复数据。
一条消息被发布给所有订阅者。然而,如果你有多个具有相同应用 ID 的 Dapr 实例,只有其中一个实例会得到消息。当你使用竞争的消费者模式来消耗一个有多个实例的主题时,这很有用。因为每条消息将只发送给其中一个实例,这些实例可以通过分担工作量来集体处理待处理的消息。
扩展 Dapr 发布/订阅
Dapr 发布/订阅组件的工作方式与其他 Dapr 组件类型类似,如状态存储组件。在撰写本文时,Dapr 社区已经贡献了一些发布/订阅组件,包括 Redis Streams、NATS、Azure Service Bus 和 RabbitMQ。
Dapr 为发布/订阅定义了一个简单的接口,如下面的代码片断所示:
type PubSub interface {Init(metadata Metadata) errorPublish(req *PublishRequest) errorSubscribe(req SubscribeRequest, handler func(msg *NewMessage) error) error}
你应该在 Init 方法中初始化与消息传递骨干的连接。你还应该在 Publish 方法和 Subscribe 方法中自动提供主题。下面的代码片断显示了默认的 Redis 实现:
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {_, err := r.client.XAdd(&redis.XAddArgs{Stream: req.Topic,Values: map[string]interface{}{"data": req.Data},}).Result()if err != nil {return fmt.Errorf("redis streams: error from publish: %s", err)}return nil}func (r *redisStreams) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {err := r.client.XGroupCreateMkStream(req.Topic, r.metadata.consumerID, "0").Err()if err != nil {log.Warnf("redis streams: %s", err)}go r.beginReadingFromStream(req.Topic, r.metadata.consumerID, handler)return nil}
在写这篇文章的时候,Dapr 还不支持动态主题订阅。因为这是一个经常被要求的功能,它可能很快就会被实现(当你读到这段文字的时候,可能已经实现了)。然而,在此之前,你的应用程序需要在启动时配置自己的路由,以订阅选定的主题,然后监听相应的路由。
使用 Dapr 进行输入输出绑定
输入和输出绑定是 Dapr 中的组件类型。输入绑定监控事件源并在收到新消息时触发应用逻辑。输出绑定将消息写到外部消息汇中。要使用一个输入绑定,你需要定义一个描述绑定的清单文件。然后,为了从绑定的主题中获取消息,你需要监听一个与主题同名的路径,用于 POST 请求。要向输出绑定发送消息,请向 Dapr 边车的 /v1.0/bind ings/<topic name> 端点发出 POST 请求。
在写这篇文章的时候,Dapr 支持几个输入和输出绑定,如 表 3-1 所总结。
表 3-1. Dapr 输入输出绑定
| 名称 | 输入绑定 | 输出绑定 |
|---|---|---|
| HTTP | ✅ | |
| Kafka | ✅ | ✅ |
| Kubernetes 事件 | ✅ | |
| MQTT | ✅ | ✅ |
| RabbitMQ | ✅ | ✅ |
| Redis | ✅ | |
| Twilio SMS | ✅ | |
| AWS DynamoDB | ✅ | |
| AWS S3 | ✅ | |
| AWS SNS | ✅ | |
| AWS SQS | ✅ | ✅ |
| Azure Blob 存储 | ✅ | |
| Azure Cosmos DB | ✅ | |
| Azure Event Hub | ✅ | ✅ |
| Azure Service Bus 队列 | ✅ | ✅ |
| Azure SignalR | ✅ | |
| GCP Cloud Pub/Sub | ✅ | ✅ |
| Google Cloud Storage 存储 | ✅ |
使用输入绑定
下面的 YAML 文件是一个输入绑定清单样本,描述了一个与 topic1 主题绑定的 Kafka 输入绑定:
apiVersion: dapr.io/v1alpha1kind: Componentmetadata:name: myEventspec:type: bindings.kafkametadata:- name: topicsvalue: topic1- name: brokersvalue: localhost:9092- name: consumerGroupvalue: group1
当收到新的消息时,Dapr 通过 POST 调用将消息发布到你的应用程序所托管的相应终端。例如,要订阅一个 sample-topic触发器,你的代码需要监听一个 /sample-topic路由,如下面的 Node.js 代码片段所示:
app.post('/sample-topic', (req, res) => {console.log(req.body);res.status(200).send();});
为了通知 Dapr 你的应用程序已经成功地处理了一条消息,你需要在处理完消息后返回一个 200 状态代码。否则,Dapr 会认为处理失败,并会尝试重新发送消息。尽管 Dapr 团队正在努力为所有 Dapr 组件提供自动重试,包括绑定。Dapr 有一些重试逻辑,但消息的交付并不是自动保证的。相反,由输入绑定实现来选择消息的交付方式,比如至少一次或最佳尝试。
使用输出绑定
为了向输出绑定发送消息,你将在 JSON 文档的数据字段中编码的消息有效载荷发送到端点 /v1.0/bindings/<topic name>,如下面来自 Dapr 样例仓库 的 Python 代码所示:
import timeimport requestsimport osdapr_port = os.getenv("DAPR_HTTP_PORT", 3500)dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)n = 0while True:n += 1payload = { "data": {"orderId": n}}print(payload, flush=True)try:response = requests.post(dapr_url, json=payload)print(response.text, flush=True)except Exception as e:print(e)time.sleep(1)
实现输入绑定
输入绑定是由一个 InputBinding 接口定义的:
type InputBinding interface {Init(metadata Metadata) errorRead(handler func(*ReadResponse) error) error}
读取方法应该是一个阻塞的方法 —— 它应该阻塞,并且在整个生命周期内不返回。它等待来自消息源的消息,并在新的消息到达时调用指定的回调函数(通常在一个单独的 Go 例程中)。下面的代码片段展示了 Kafka 触发器的实现。你可以看到读取方法如何不断循环以消耗来自主题的事件,直到该进程被系统中断信号强行终止:
func (k *Kafka) Read(handler func(*bindings.ReadResponse) error) error {config := sarama.NewConfig()config.Version = sarama.V1_0_0_0if k.authRequired {updateAuthInfo(config, k.saslUsername, k.saslPassword)}c := consumer {callback: handler,ready: make(chan bool),}client, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, config)if err != nil {return err}ctx, cancel := context.WithCancel(context.Background())wg := &sync.WaitGroup{}wg.Add(1)go func() {defer wg.Done()for {if err = client.Consume(ctx, k.topics, &c); err != nil {log.Errorf("error from c: %s", err)}// check if context was cancelled, signaling that the c should stopif ctx.Err() != nil {return}c.ready = make(chan bool)}}()<-c.readysigterm := make(chan os.Signal, 1)signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)<-sigtermcancel()wg.Wait()if err = client.Close(); err != nil {return err}return nil}
实现输出绑定
输出绑定接口也同样简单:
type OutputBinding interface {Init(metadata Metadata) errorInvoke(req *InvokeRequest) (InvokeResponse, error)Operations() []OperationKind}
为了实现输出绑定,你在 Init 方法中初始化一个与下游事件目标的连接,并使用 Invoke 方法将给定的事件写入目标。如果 Invoke 方法返回时没有任何错误,那么消息就被认为已经成功发送。
实现一个输出绑定是非常直接的。一天下午,Mark Russinovich 决定试一试。虽然他当时对 Go 语言不是很熟悉,但他在几个小时内就实现了一个实用的 Twilio 短信输出绑定。该实现使用 Twilio 的 HTTP REST API 向输出绑定元数据中配置的指定收件人发送 SMS 消息。
func (t *SMS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {toNumberValue := t.metadata.toNumberif toNumberValue == "" {toNumberFromRequest, ok := req.Metadata[toNumber]if !ok || toNumberFromRequest == "" {return nil, errors.New("twilio missing \"toNumber\" field")}toNumberValue = toNumberFromRequest}v := url.Values{}v.Set("To", toNumberValue)v.Set("From", t.metadata.fromNumber)v.Set("Body", string(req.Data))vDr := *strings.NewReader(v.Encode())twilioURL := fmt.Sprintf("%s%s/Messages.json", twilioURLBase, t.metadata.accountSid)httpReq, err := http.NewRequest("POST", twilioURL, &vDr)if err != nil {return nil, err}httpReq.SetBasicAuth(t.metadata.accountSid, t.metadata.authToken)httpReq.Header.Add("Accept", "application/json")httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")resp, err := t.httpClient.Do(httpReq)if err != nil {return nil, err}defer resp.Body.Close()if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {return nil, fmt.Errorf("error from Twilio: %s", resp.Status)}return nil, nil}
Invoke 方法可以通过 bindings.InvokeResponse 返回参数从连接的系统返回结果。这允许应用程序从外部系统获得响应,同时将消息分派给这些系统。
一个输出绑定也可以通过 Operations 方法定义它支持的操作类型。例如,下面的代码片断定义了一个输出绑定支持创建操作:
func (c *CosmosDB) Operations() []bindings.OperationKind {return []bindings.OperationKind{bindings.CreateOperation}}
使用 KEDA 的自动缩放功能
基于 Kubernetes 的事件驱动的自动缩放(KEDA)是 Yaron 发起的另一个项目,根据消息骨干的队列长度自动缩放 Pod,例如 Azure Event Hubs、Kafka、NATS 或 RabbitMQ。
使得 KEDA 格外有趣的是其从零开始/到零的扩展能力。当没有事件进入时,KEDA 可以移除所有的应用 Pod,这样你的应用在闲置时的托管成本就会降到最低。当有新的事件进入时,KEDA 在旋转 Pod 时暂时保留这些事件,然后将保留的事件分配给 Pod。然后,它作为一个 Kubernetes 指标服务器,将指标反馈给 Kubernetes Horizontal Pod Autoscaler,以进一步扩大规模。
你通过 ScaleObject 自定义资源来控制 KEDA 的行为。下面是一个 ScaleObject 的例子,当一个项目出现在 myQueItem Azure Storage 队列上时,它将扩展消息处理器的部署。
apiVersion: keda.k8s.io/v1alpha1kind: ScaledObjectmetadata:name: message-processornamespace: defaultlabels:deploymentName: message-processorspec:scaleTargetRef:deploymentName: message-processortriggers:- type: azure-queuemetadata:name: myQueueItemtype: queueTriggerqueueName: itemsqueueLength: "1"connection: AzureWebJobsStorage
你可以使用 KEDA 来实现诸如竞争消费者的模式。图 3-2 说明了一个消息驱动的系统,它使用绑定来交换生产者和消费者之间的数据。在这种情况下,消费者是由 KEDA 自动扩展的。当生产者不产生任何事件时,所有的消费者都被终止了。随着队列长度的增加,越来越多的消费者被旋转起来,以迅速耗尽请求队列。Dapr 只向共享同一应用ID的消费者实例中的一个发送每个消息的副本。换句话说,消费者实例在争夺消息的处理权 —— 这就是 “竞争的消费者”(Competing Consumer)模式的名称。
图 3-2. 与 KEDA 自动缩放相竞争的消费模式
当没有事件需要处理时,你也可以使用 KEDA 将整个处理管道扩展为零,如 图 3-3 所示。这适合那些偶尔被触发的系统,或者有季节性或周期性流量的系统。当系统处于空闲状态时,你所支付的只是一个监控舱(假设你使用的是理想的无服务器托管环境,按 Pod 分钟收费)。
图 3-3. 使用 KEDA 来扩展整个管道
接下来,我们将通过一些消息传递模式,并解释如何使用 Dapr 来实现它们。外面有很多消息传递模式,我们的选择有些随意。我们希望这个选择能代表一组不同的消息传递场景,但这绝不是一个全面的列表。
消息传递模式
基于消息的系统就像乐高积木一样:你可以将较小的碎片组装成有趣的图案。这一节涵盖了一些你可以使用 Dapr 的触发器和连接器组装的模式。
Saga 模式
许多复杂的工作流程涉及到从不同的服务提供者那里调用分布式服务。例如,为了预订一次旅行,你需要执行几个分布式事务,包括预订航班、预订酒店和预订租车。如果任何一个事务失败,你可能想重试失败的事务,或者你可能想回滚所有的事务。调用几个服务并不困难。处理不同的失败条件是很棘手的。本节讨论了如何使用 Dapr 连接器用 Saga 模式实现分布式事务。
Saga 模式的最初想法是将一个长效事务(LLT,Long-lived Transaction)分解成一系列较小的事务,以避免对数据库记录的长时间保留。这种模式在分布式系统中效果很好,因为在这些系统中不可能有这样的长期保留(或通用锁)。
Saga 模式中的事务可以由一个中央协调者协调,也可以由事件触发器协调。在中央协调器的情况下,协调器调用每个服务并跟踪整个交易的状态。在事件触发器的情况下,当一个事务完成后,它通过发送一个事件来触发下一个事务。
使用一个中央协调人
图 3-4 说明了中央协调器如何协调分布式事务。协调器依次调用各个服务来完成整个工作流程,并将其状态保存在一个持久化的状态存储中,以便它能从崩溃中恢复。编写协调器的一种方法是使用带有 Dapr 状态存储的 Dapr 角色。当你使用 Dapr 行为体时,每个分布式事务都由一个唯一的行为体 ID 来识别。这使得多个分布式事务可以并行执行。为了使事务自我驱动,你可以使用行为体提醒或定时器,让它定期检查自己的状态,并采取必要的行动来驱动事务的完成。
如果任何一个步骤失败,协调者将调用相应的服务程序来取消交易。你也可以在事务消息模式中应用灵活的策略,例如在不能进行某项预订时寻找替代方案。当然,这意味着要写更多的代码,但你会得到更精细的控制权作为回报。
图 3-4. 有中央协调人的分布式事务
使用事件
另一种实现 Saga 模式的方法是在工作流进行过程中使用事件来触发下一步。图 3-5 显示了一种通过一系列事件来触发事务性步骤的设计。工作流是由一个外部事件发起的,例如来自 Web 界面的提交。然后,第一个事务被执行。完成后,第一个事务会引发一个事件来触发第二个事务,以此类推。
图 3-5. 事件驱动的分布式事务
这种策略对于有几个事务的简单工作流来说通常很有效。这种设计允许最大限度的并行化,而且它允许在不影响整个工作流程的情况下更换不同的服务提供者。例如,预订酒店事务处理程序可以被一个更高级的处理程序所取代,该处理程序订阅相同的事件,但要尝试几个酒店进行预订。
你也可以临时改变处理系统的整体拓扑结构。例如,你可以让多个事务处理程序订阅同一个 On_BookTrip 事件,并试图并行地执行多个预订。
然而,当工作流程变得更加复杂时,跟踪正在发生的事情就变得很困难。由于系统的分布式性质,测试也更加困难。图 3-6 显示了在考虑故障时对消息系统的更全面的看法。当任何一个事务处理程序不能进行预约时,它就会引发一个单独的事件(如 On_CarOutOfStock)来否定该事务。最终,一个计费服务在事件链的末端作出反应,并向用户退款。正如你所看到的,当你考虑不同的可能性时,事件的数量和交易处理程序之间互动的复杂性迅速增加。
图 3-6. 带有取消事件的分布式事务
如果一些信息丢失,工作流程也可能停顿。而且,由于至少有一次的交付行为,你必须确保你不会重复预订。
总之,建立一个高度并行的、高度可靠的、带有分布式事务的工作流系统需要相当多的工作。在写这篇文章的时候,Dapr 团队正在积极研究使用 Dapr Actor 作为有状态的协调器(Dapr Workflow)的工作流功能。我们还将与现有的产品和开源项目合作,以提高不同工作流系统的互操作性。这可能需要采用一种通用的工作流描述语言,或一系列能够在描述格式之间进行转换的转换层。
无论如何,Dapr 的工作流引擎的目标是自成一体、轻量级和高度并行化。例如,在我们建立了工作流行动的依赖树之后,我们将尽可能地平行执行行动,并且我们将允许 “N 个行动中的任何一个” 模式,其中一个行动可以等待,并在任何一个上游行动完成后继续。
基于内容的路由
聆听事件源是有一定代价的。至少,你需要维持一个与事件源的连接,以便接收新的事件。当你有许多不同类型的消息处理程序对同一事件源感兴趣时,保持与同一事件源的许多并发连接是低效或不切实际的。在这种情况下,你可能想建立一个中央消息分配器,订阅事件源并根据消息头或消息体将消息分配给不同的处理程序。图 3-7 显示了一个消息分配器根据消息内容向不同的处理程序发送消息。
图 3-7. 基于内容的路由
当你有多个消息处理程序候选者来处理一个消息时,基于内容的路由很有用。请记住,每一个处理程序的设计都可能没有考虑到任何整合。当一个处理程序看到一个无法识别的消息格式时,它可能会简单地失败。如果它们都订阅了原始的 On_Message 事件,并独自行动,那么就很难跟踪一条消息是否被多次处理,或者是否被处理。当你使用基于内容的路由模式时,调度器会检查传入的消息,并将它们分配给最合适的处理程序。
调度器是无状态的,因为检查单个消息与其他消息没有关系。你可以扩大调度器的规模(例如,通过使用竞争的消费者模式),以避免它成为一个瓶颈。
你也可以用一个死信队列(Dead Letter Queue)来扩展这个模式。当调度器检测到一个不能被任何处理程序处理的消息时,它可以将该消息排入死信队列,该队列将由一个离线进程检查和清理。
路由单
在一个复杂的基于消息的系统中,消息处理程序通过消息主干网相互连接,形成具有许多不同路径的错综复杂的消息管道。如果你想控制消息选择的确切路径,你可以给消息附加一个路由单(Routing Slip)。路由单定义了消息应该经过的路径。当处理程序完成其操作时,它在路由单上勾选自己,并将信息发送到列表中的下一个收件人。路由单上的条目可以是处理程序的地址,但这将使处理程序紧密地结合在一起。相反,这些条目可以是消息主题,处理者通过这些主题进行连接,如 图 3-8 所示。
图 3-8. 路由单
路由单模式的主要好处是能够动态地配置每条消息的路线。该模式允许分布式的、异步的、平行的消息处理,同时确保预定义的流程在没有中央消息协调人的情况下精确执行。
智能代理
如前所述,Dapr 使用一个边车结构来提供通用的功能,如状态管理和消息传递给应用程序。如果我们扩展 Dapr 并允许它向应用程序提供任意的功能,如人脸检测,会怎么样呢?换句话说,如果一个应用程序可以通过 Dapr 动态地获取和消费任何需要的功能呢?
这的确是一个迷人的想法。然而,要使它成为现实,还需要大量的工作。一个应用程序如何描述它的意图?意图如何被转化为匹配能力的搜索标准?能力是如何被消耗的?所有这些问题都必须被回答。我们认为需要一个新的架构范式,我们称之为面向能力的架构(COA,Capability Oriented Architecture),以系统地解决这些问题。对 COA 的详细讨论超出了本书的范围。相反,我们将讨论一个更简单但同样有用的模式:智能代理模式。
一个智能代理暴露了一个本地端点供客户调用,然后使用各种技术的组合来带来所需的能力。例如,它可以直接调用一个远程服务,或者向下游处理程序发送一个消息并等待响应。它甚至可以调用一个复杂的工作流程,下载一个 Docker 镜像,启动它,然后调用一个由 Docker 容器暴露的端点。不管代理背后发生了什么,调用代理的客户端只看到通过本地主机进行的简单、直接的服务调用。
图 3-9 显示了一个作为智能代理的 Dapr 边车的可能配置。你可以看到代理使用不同的技术来调用所需的服务,并在单一服务调用的背景下将结果返回给调用者。
图 3-9. 智能代理
MapReduce
MapReduce 模式将一个大任务分割成小任务,并并行运行小任务。然后,小任务的结果被汇总到原始任务的最终输出中。
有几种不同的方法可以用 Dapr 实现 MapReduce 模式。例如,你可以用 Dapr Actor 来实现竞争的消费者模式。当行为体收到大任务时,它可以把任务分割成小任务,并把这些小任务排到队列中。然后订阅该队列的任务处理器可以竞争完成所有的任务。当一个处理器完成了一个小任务,它就会回调给行为者报告结果。每次行动者收到回调时,它都会增加一个内部计数器。当计数器与生成的任务数相匹配时,行为体就会将结果汇总并生成最终的输出,如 图 3-10 所示。
:::tips 基于回合的并发性确保了对行为体的并发调用是有顺序的,所以在增加计数器时不会有冲突。关于 Actor 的更多信息,请参见 第 5 章。 :::
图 3-10. MapReduce 模式
当数据量巨大时,你可以选择将数据保存到外部存储,只将数据索引发送到任务处理器。同样地,数据处理器可以将结果写回外部数据库。当所有的任务完成后,行为者可以从数据库中组合出结果。
这就结束了我们对消息传递模式的讨论。同样,我们只是从几十种有用的消息传递模式中挑选了少数几种;对于有兴趣学习更多知识的读者,我们推荐 Gregor Hohpe 和 Bobby Woolf(Addison-Wesley Professional)的《企业集成模式》(Enterprise Integration Patterns)。
总结
消息传递是设计松散耦合的微服务系统的一个非常强大的工具。Dapr 提供了常见的消息传递结构,如发布/订阅和绑定,但它并没有创建自己的消息传递骨干网。相反,它与流行的消息传递骨干网集成以提供这些功能。
Dapr 的发布/订阅允许消息发布者将消息发布到一个主题,而该主题的所有子抄写者都能得到消息的副本。Dapr 确保一个消息至少被处理一次。
Dapr 输入绑定可用于通过任何支持的事件源的事件来触发应用程序。Dapr 输出绑定允许应用程序连接到流行的外部系统来交换数据。绑定允许你定义一个通用的处理管道,可以动态地连接到不同的外部系统。
在下一章中,我们将介绍另一个基本主题:安全。
