Refer

Cadence 的各种概念

该示例的工作流会提交一个账单请求到另一个面板等待请求允许或者拒绝。但是 活动 activity 并不会等待批准,而是通过返回一个特殊错误来完成这一步

workflow 的下一步是需要使用 一个轮询 或者 listener 监听请求是否被拒绝,调用WorkflowClient.CompleteActivity() 来完成这个 activity,现实世界这样做,在该实验中,会自动执行这个工作。

Prequire

语雀内容

Step

源参考

  1. 启动 dummy server
    1. $ ./bin/dummy

如果 not found ,在 cadence-samples 目录下 执行 make 命令

  1. 在另一个终端中命令,部署 workflow 和 worker

    1. $ ./bin/expense -m worker
  2. 在另一个终端中触发工作流程

    1. $ ./bin/expense -m trigger
  3. 查看 localhost:8099/list

image.png

  1. 如果时间较久可以发现该 workflow close 了,同时状态时显示 超时。

image.png
如果你够快,那么该 workflow 中的 activity 能完成。
image.png

Analytics

Go 代码写作指导-官网

该 workflow 是用 Go 写的,下图是目录结构。
image.png

Dummy - 一个 web server

dummy 账单操作服务器,对于现实世界,应该是实际的业务服务器,同时可能负担着对于数据库的操作,在这里, Dummy server 的作用是 list expenses, create new expense, update expense state and checking expense state。路由如下:
image.png

Workflow

workflow 指导
注意: 在 cadence 中 go, channel, select, time.Now(), time.Sleep() 需要使用 官方提供的 sdk

  1. Coroutine related constructs:
  2. workflow.Go : This is a replacement for the the go statement.
  3. workflow.Channel : This is a replacement for the native chan type. Cadence provides support for both buffered and unbuffered channels.
  4. workflow.Selector : This is a replacement for the select statement.
  5. Time related functions:
  6. workflow.Now() : This is a replacement for time.Now().
  7. workflow.Sleep() : This is a replacement for time.Sleep().
  1. 需要注册你的 workflow

image.png

  1. 查看 workflow 内容,第一步配置 该 workflow 的一些配置如 超时时间。
  • 可以看到 workflow 由多个 workflow.WithActivityOptions() 来组成核心骨架,同时他们的参数createExpenseActivity表示了他们是和业务交互的部分,其他是围绕这些操作做的限制,日志,超时,判断状态等操作。
  • 注意 workflow 中使用的信号,如最后的”COMPLETED“ 不难猜出其中使用的信号机制,可以去看这方面的源代码。

image.png

Activity

注册实际上是构造了一个函数映射表,注册了之后其他地方就能使用,如上文中的 workflow 中使用的createExpenseActivity
image.png

其中重点是 waitForDecisionActivity 的操作,这是一个异步的 Activity 因为他一开始是注定失败的,同时返回一个错误 activity.ErrResultPending 向 cadence 表明自己未完成,直到超时或者 CompleteActivity 方法被调用,在这个例子中,这个方法是在 dummy expense server 中调用的。

  1. status := string(body)
  2. if status == "SUCCEED" {
  3. // register callback succeed
  4. logger.Info("Successfully registered callback.", zap.String("ExpenseID", expenseID))
  5. // ErrActivityResultPending is returned from activity's execution to indicate the activity is not completed when it returns.
  6. // activity will be completed asynchronously when Client.CompleteActivity() is called.
  7. return "", activity.ErrResultPending
  8. }

所以需要将这个 activity 的信息传给 dummy server

  1. // save current activity info so it can be completed asynchronously when expense is approved/rejected
  2. activityInfo := activity.GetInfo(ctx)
  3. formData := url.Values{}
  4. formData.Add("task_token", string(activityInfo.TaskToken))
  5. registerCallbackURL := expenseServerHostPort + "/registerCallback?id=" + expenseID
  6. resp, err := http.PostForm(registerCallbackURL, formData)
  7. if err != nil {
  8. logger.Info("waitForDecisionActivity failed to register callback.", zap.Error(err))
  9. return "", err
  10. }

而在 dummy server 中,当某一个账单申请被 同意(approve) 或者 拒绝(reject)的时候,会调用对应账单的 id,找到他对应的 activity 信息并调用 CompleteActivity 方法
image.png

notifyExpenseStateChange 方法在 /action Handler 中调用

  1. if oldState == created && (allExpense[id] == approved || allExpense[id] == rejected) {
  2. // report state change
  3. notifyExpenseStateChange(id, string(allExpense[id]))
  4. }

Main

整个流程就是这样,开启这个 workflow,之后 通过 trigger 触发这个 workflow,其中,workflow 首先创建账单,然后等待 dummy server 通知或者超时,最后完成。我们讲解了使用 cadence 如何为这个业务创建一个 等幂,可控,可视的流程(workflow)

  1. // This needs to be done as part of a bootstrap step when the process starts.
  2. // The workers are supposed to be long running.
  3. func startWorkers(h *common.SampleHelper) {
  4. // Configure worker options.
  5. workerOptions := worker.Options{
  6. MetricsScope: h.Scope,
  7. Logger: h.Logger,
  8. }
  9. h.StartWorkers(h.Config.DomainName, ApplicationName, workerOptions)
  10. }
  11. func startWorkflow(h *common.SampleHelper, expenseID string) {
  12. workflowOptions := client.StartWorkflowOptions{
  13. ID: "expense_" + uuid.New(),
  14. TaskList: ApplicationName,
  15. ExecutionStartToCloseTimeout: time.Minute,
  16. DecisionTaskStartToCloseTimeout: time.Minute,
  17. }
  18. h.StartWorkflow(workflowOptions, SampleExpenseWorkflow, expenseID)
  19. }
  20. func main() {
  21. var mode string
  22. flag.StringVar(&mode, "m", "trigger", "Mode is worker or trigger.")
  23. flag.Parse()
  24. var h common.SampleHelper
  25. h.SetupServiceConfig()
  26. switch mode {
  27. case "worker":
  28. startWorkers(&h)
  29. // The workers are supposed to be long running process that should not exit.
  30. // Use select{} to block indefinitely for samples, you can quit by CMD+C.
  31. select {}
  32. case "trigger":
  33. startWorkflow(&h, uuid.New())
  34. }
  35. }