Temporal 是可伸缩且可靠的运行时,用于可重入进程,称为 Temporal Workflow Executions.
The Temporal System

Temporal 应用

Temporal 应用程序是一组 Temporal Workflow Executions. 每个 Temporal Workflow Executions 都可以独占访问其本地状态,与所有 Temporal Workflow Executions 同时执行,并通过消息传递与其他 Temporal Workflow Executions 进行通信。

Temporal 应用程序可以包含数百万到数十亿的 Workflow Executions 。Workflow Executions 是轻量级组件。Workflow Executions 消耗很少的计算资源; 如果Workflow Executions 被暂停 (例如处于等待状态),则不消耗任何计算资源。

可重入执行
Temporal Workflow Execution 是可重入。可重入进程是 Resumable、Recoverable 和 Reactive的。

  • Resumable: 进程在执行暂停 _awaitable_后继续执行的能力 。
  • Recoverable: 进程在执行失败 _failure _后继续执行的能力 。
  • Reactive: 进程对外部事件做出反应的能力.

因此, 无论您的代码在存在任意负荷和任意故障,无论需要执行几秒钟还是几年 Temporal Workflow Execution 会执行 Temporal Workflow Definition, 也称为 Temporal Workflow Function,即您的应用程序代码,只会恰好一次执行并最终执行完成。提供了 exactly once 的语义。

Temporal Platform

Temporal Platform 由 Temporal ClusterWorker Processes。这些组件一起为 Workflow Executions 创建运行时。

The Temporal Platform (runtime)
Temporal Cluster 是开源的,可以由您操作并自托管。Temporal Cloud 是我们托管操作的一组集群。
Worker Processes 由您托管并执行您的代码。它们通过 gRPC 与 Temporal Cluster 通信。

Temporal SDK

Temporal SDK 是一个特定于语言的库,提供 api 来执行以下操作:

  1. 构造并使用 Temporal 客户端与 Temporal 集群通信。
  2. 开发工作流定义
  3. 开发 Worker 进程

Temporal SDK 使您能够使用编程语言的全部功能来编写应用程序代码,而Temporal 平台可以处理应用程序的持久性、可用性和可伸缩性。
Sdk提供以下语言版本:

  • Go
  • Java
  • PHP
  • TypeScript

    Why Temporal?

    Temporal 系统的一个方面是它抽象了分布式系统的复杂性。随着系统潜在负载的变化,分布式系统的存在是为了跨多台机器扩展计算。理论上,分布式系统有助于可靠且高性能的应用程序。

然而,应用程序的等待下游部分响应的时候,如果遇到失败都会使事情变得非常复杂,尤其是在大规模的情况下。

Distributed application failures
如果没有响应【比如超时】,应用程序的下游部分将如何知道在状态更改之前是否存在故障或之后是否存在故障?应用程序将如何跟踪和协调不一致的状态?

在传统系统中,通常会进行大量投资以维护每个组件的运行状况,可视化整个系统的运行状况,为计算定义超时约束,为失败的计算协调重试,并保持一致的状态。

这些系统通常是无状态服务、数据库、cron作业和队列的混合体。随着这些系统的扩展,响应多个异步事件、与不可靠的外部资源通信或跟踪非常复杂的事物的状态变得越来越具有挑战性。

Temporal 将服务、数据库、cron作业、队列、主机进程和sdk的使用重组为 Temporal 的平台,并直接统一解决这些故障。

在传统系统中,服务的存在是为了生成函数并执行。 Temporal 平台通过 Workflow Executions 来处理这部分逻辑.

Temporal vs Traditional system
尽管乍一看这两个系统看起来很相似,但它们在几个重要方面有所不同。

失败
在传统系统中,服务函数的执行既易变又短暂。

  • 如果函数执行失败,则无法恢复,因为所有执行状态都丢失。函数执行等待的时间越长,失败的可能性就越大。
  • 传统函数执行的寿命通常有限,通常以分钟为单位。

使用 Temporal ,Workflow Executions是可恢复的。

  • 失败后,工作流执行完全可恢复。
  • Temporal 对工作流执行没有规定截止日期。

状态
对于传统系统,停止或失败意味着所有执行状态都丢失。您的应用程序 (或支持性组件) 必须监视服务的响应,以启动服务执行的重试。而且是从最 状态重新开始重试 。

Temporal,则可以恢复 最新 状态。保留之前已经做过的所有进度。

通讯
在传统系统中,您无法与函数执行进行通信。

用 Temporal, 信号查询 允许将数据发送到工作流执行或从工作流执行中提取数据。

范围
对于传统系统,服务功能执行最多可以代表一条业务流程。通常,它仅代表某条业务流程的一部分。

Temporal Workflow Execution可以表示业务流程或整个业务对象。【workflow as code 给予了最强的领域建模能力,DDD 实践者了】

订阅 用例

让我们看一个基于订阅的用例,以比较基于 Temporal 的 APP 程序 和其他传统方法之间的差异。

基本业务步骤如下:

  1. 客户注册具有试用期的服务。
  2. 试用期过后,如果客户没有取消,应每月收取一次费用。
  3. 必须通过电子邮件通知客户费用,并且应该能够随时取消订阅。

这种业务逻辑不是很复杂,可以用几十行代码来表达。任何实际实施都必须确保业务流程是容错的可扩展的

以数据库为中心的设计方案

第一种方法可能是将所有内容都集中在数据库周围,在该数据库中,应用程序进程将定期扫描数据库表以查找处于特定状态的客户,执行必要的操作,并更新数据库以反映更改。

然而,有各种各样的缺点。

  • 最明显的一点是,客户状态的应用状态机很快变得极其复杂。例如,如果由于下游系统不可用而导致信用卡收费尝试失败或发送电子邮件失败,则该状态现在处于不确定的中间状态。
  • 失败的调用可能需要重试很长时间,并且需要限制这些调用以避免外部资源过载。
  • 需要有逻辑来处理损坏的客户记录,以避免阻塞整个过程。
  • 此外,数据库具有性能和可伸缩性限制 (最终需要分片),对于需要持续轮询的场景效率不高。

队列系统设计方案

下一个常用的方法是使用计时器服务和队列。更新被推送到队列中,而服务一次消耗一个更新,然后更新数据库,同时可能将更多消息推送到其他下游队列。会使用一个计时器服务可用于调度队列轮询或数据库操作。

虽然这种方法已经显示出更好的扩展,但编程模型可能会变得非常复杂且容易出错,因为排队系统、计时器服务、和数据库之间通常没有事务更新。【引入分布式事务是不是错误率更高了】

Temporal 设计方案

Temporal 旨在以简单的函数或对象方法封装和实现整个业务逻辑。由于有了 Temporal ,函数/方法具有持久的状态,并且实现者不需要使用任何额外的系统来确保一致性和容错性。

以下是在Java、Go和PHP中实现订阅管理用例的示例工作流定义:

这里只展示 golang 的更多请看链接

  1. package subscription
  2. import (
  3. "log"
  4. "time"
  5. "go.temporal.io/sdk/workflow"
  6. )
  7. func SubscriptionWorkflow(ctx workflow.Context, customer Customer) (string, error) {
  8. workflowCustomer := customer
  9. subscriptionCancelled := false
  10. billingPeriodNum := 0
  11. actResult := ""
  12. QueryCustomerIdName := "customerid"
  13. QueryBillingPeriodNumberName := "billingperiodnumber"
  14. QueryBillingPeriodChargeAmountName := "billingperiodchargeamount"
  15. logger := workflow.GetLogger(ctx)
  16. // Define query handlers
  17. // Register query handler to return trip count
  18. err := workflow.SetQueryHandler(ctx, QueryCustomerIdName, func() (string, error) {
  19. return workflowCustomer.Id, nil
  20. })
  21. if err != nil {
  22. logger.Info("QueryCustomerIdName handler failed.", "Error", err)
  23. return "Error", err
  24. }
  25. err = workflow.SetQueryHandler(ctx, QueryBillingPeriodNumberName, func() (int, error) {
  26. return billingPeriodNum, nil
  27. })
  28. if err != nil {
  29. logger.Info("QueryBillingPeriodNumberName handler failed.", "Error", err)
  30. return "Error", err
  31. }
  32. err = workflow.SetQueryHandler(ctx, QueryBillingPeriodChargeAmountName, func() (int, error) {
  33. return workflowCustomer.Subscription.BillingPeriodCharge, nil
  34. })
  35. if err != nil {
  36. logger.Info("QueryBillingPeriodChargeAmountName handler failed.", "Error", err)
  37. return "Error", err
  38. }
  39. // end defining query handlers
  40. // Define signal channels
  41. // 1) billing period charge change signal
  42. chargeSelector := workflow.NewSelector(ctx)
  43. signalCh := workflow.GetSignalChannel(ctx, "billingperiodcharge")
  44. chargeSelector.AddReceive(signalCh, func(ch workflow.ReceiveChannel, _ bool) {
  45. var chargeSignal int
  46. ch.Receive(ctx, &chargeSignal)
  47. workflowCustomer.Subscription.BillingPeriodCharge = chargeSignal
  48. })
  49. // 2) cancel subscription signal
  50. cancelSelector := workflow.NewSelector(ctx)
  51. cancelCh := workflow.GetSignalChannel(ctx, "cancelsubscription")
  52. cancelSelector.AddReceive(cancelCh, func(ch workflow.ReceiveChannel, _ bool) {
  53. var cancelSubSignal bool
  54. ch.Receive(ctx, &cancelSubSignal)
  55. subscriptionCancelled = cancelSubSignal
  56. })
  57. // end defining signal channels
  58. ao := workflow.ActivityOptions{
  59. StartToCloseTimeout: time.Minute * 5,
  60. }
  61. ctx = workflow.WithActivityOptions(ctx, ao)
  62. logger.Info("Subscription workflow started for: " + customer.Id)
  63. var activities *Activities
  64. // Send welcome email to customer
  65. err = workflow.ExecuteActivity(ctx, activities.SendWelcomeEmail, workflowCustomer).Get(ctx, &actResult)
  66. if err != nil {
  67. log.Fatalln("Failure executing SendWelcomeEmail", err)
  68. }
  69. // Start the free trial period. User can still cancel subscription during this time
  70. workflow.AwaitWithTimeout(ctx, workflowCustomer.Subscription.TrialPeriod, func() bool {
  71. return subscriptionCancelled == true
  72. })
  73. // If customer cancelled their subscription during trial period, send notification email
  74. if subscriptionCancelled == true {
  75. err = workflow.ExecuteActivity(ctx, activities.SendCancellationEmailDuringTrialPeriod, workflowCustomer).Get(ctx, &actResult)
  76. if err != nil {
  77. log.Fatalln("Failure executing SendCancellationEmailDuringTrialPeriod", err)
  78. }
  79. // We have completed subscription for this customer.
  80. // Finishing workflow execution
  81. return "Subscription finished for: " + workflowCustomer.Id, err
  82. }
  83. // Trial period is over, start billing until
  84. // we reach the max billing periods for the subscription
  85. // or sub has been cancelled
  86. for {
  87. if billingPeriodNum >= workflowCustomer.Subscription.MaxBillingPeriods {
  88. break
  89. }
  90. // Charge customer for the billing period
  91. err = workflow.ExecuteActivity(ctx, activities.ChargeCustomerForBillingPeriod, workflowCustomer).Get(ctx, &actResult)
  92. if err != nil {
  93. log.Fatalln("Failure executing ChargeCustomerForBillingPeriod", err)
  94. }
  95. // Wait 1 billing period to charge customer or if they cancel subscription
  96. // whichever comes first
  97. workflow.AwaitWithTimeout(ctx, workflowCustomer.Subscription.BillingPeriod, func() bool {
  98. return subscriptionCancelled
  99. })
  100. // If customer cancelled their subscription send notification email
  101. for cancelSelector.HasPending() {
  102. cancelSelector.Select(ctx)
  103. }
  104. if subscriptionCancelled {
  105. err = workflow.ExecuteActivity(ctx, activities.SendCancellationEmailDuringActiveSubscription, workflowCustomer).Get(ctx, &actResult)
  106. if err != nil {
  107. log.Fatalln("Failure executing SendCancellationEmailDuringActiveSubscription", err)
  108. }
  109. break
  110. }
  111. billingPeriodNum++
  112. for chargeSelector.HasPending() {
  113. chargeSelector.Select(ctx)
  114. }
  115. }
  116. // if we get here the subscription period is over
  117. // notify the customer to buy a new subscription
  118. if !subscriptionCancelled {
  119. err = workflow.ExecuteActivity(ctx, activities.SendSubscriptionOverEmail, workflowCustomer).Get(ctx, &actResult)
  120. if err != nil {
  121. log.Fatalln("Failure executing SendSubscriptionOverEmail", err)
  122. }
  123. }
  124. return "Completed Subscription Workflow", err
  125. }

同样,需要注意的是,这是直接实现业务逻辑的工作应用程序代码。如果任何操作需要很长时间,代码不需要改变。
**chargeCustomerForBillingPeriod **阻塞完全可行,比如下游处理服务关闭或没有响应,则持续一天或更长时间。甚至,直接在工作流代码中休眠 30 天是完全正常的操作。这是可行的,因为基础设施故障不会影响工作流状态-包括线程阻塞调用任何变量。
Temporal 实际上对开放工作流执行的数量没有可伸缩性限制,因此即使您的应用程序拥有数亿客户,也可以反复使用此代码。