本文首发于泊浮目的专栏:https://segmentfault.com/blog/camile

版本 日期 备注
1.0 2017.12.19 文章首发
1.1 2021.5.21 添加小结

前言

在ZStack(或者说产品化的IaaS软件)中的任务通常有很长的执行路径,错误可能发生在路径的任意一处。为了保证系统的正确性,需提供一种较为完善的回滚机制——在ZStack中,通过一个工作流引擎,ZStack的每一个步骤都被包裹在独立的工作流中,可以在出错的时候回滚。此外,通过在配置文件中组装工作流的方式,关键的执行路径可以被配置,这使得架构的耦合度进一步降低。

系统解耦合的手段除了之前文章所提到的分层、分割、分布等,还有一个重要手段是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分成多个阶段,每个阶段之间通过共享数据的方式异步执行进行协作。

这即是一种在业务设计原则中——流程可定义原则的具象化。接触过金融行业的同学肯定知道,不同的保险理赔流程是不一样的。而承保流程和理赔流程是分离的,在需要时进行关联,从而可以复用一些理赔流程,并提供一些个性化理赔流程。

演示代码

就以创建VM为例,在ZStack中大致可以分以下几个步骤:

  1. <bean id="VmInstanceManager" class="org.zstack.compute.vm.VmInstanceManagerImpl">
  2. <property name="createVmWorkFlowElements">
  3. <list>
  4. <value>org.zstack.compute.vm.VmImageSelectBackupStorageFlow</value>
  5. <value>org.zstack.compute.vm.VmAllocateHostFlow</value>
  6. <value>org.zstack.compute.vm.VmAllocatePrimaryStorageFlow</value>
  7. <value>org.zstack.compute.vm.VmAllocateVolumeFlow</value>
  8. <value>org.zstack.compute.vm.VmAllocateNicFlow</value>
  9. <value>org.zstack.compute.vm.VmInstantiateResourcePreFlow</value>
  10. <value>org.zstack.compute.vm.VmCreateOnHypervisorFlow</value>
  11. <value>org.zstack.compute.vm.VmInstantiateResourcePostFlow</value>
  12. </list>
  13. </property>
  14. <!-- 还有很多,介于篇幅不再列出 -->

可以说是代码即文档了。在这里,ZStack显式声明这些Flow在Spring XML中,这些属性将会被注入到createVmWorkFlowElements中。每一个Flow都被拆成了一个个较小的单元,好处不仅是将业务操作分成了多个阶段易于回滚,还是可以有效复用这些Flow。这也是编程思想中“组合”的体现。

如何使用

除了这种配置型声明,还可以在代码中灵活的使用这些FlowChain。在这里,我们将以Case来说明这些FlowChain的用法,避免对ZStack业务逻辑不熟悉的读者看的一头雾水。

一共有两种可用的FlowChain:

  • SimpleFlowChain
  • ShareFlowChain

SimpleFlowChain

我们先来看一个Case

  1. @Test
  2. public void test() {
  3. FlowChain chain = FlowChainBuilder.newShareFlowChain();
  4. chain.then(new ShareFlow() {
  5. int a;
  6. @Override
  7. public void setup() {
  8. flow(new NoRollbackFlow() {
  9. @Override
  10. public void run(FlowTrigger trigger, Map data) {
  11. a = 1;
  12. increase();
  13. trigger.next();
  14. }
  15. });
  16. flow(new NoRollbackFlow() {
  17. @Override
  18. public void run(FlowTrigger trigger, Map data) {
  19. a = 2;
  20. increase();
  21. trigger.next();
  22. }
  23. });
  24. }
  25. }).done(new FlowDoneHandler(null) {
  26. @Override
  27. public void handle(Map data) {
  28. success = true;
  29. }
  30. }).start();
  31. Assert.assertTrue(success);
  32. expect(2);
  33. }

我们可以看到,这就是一个工作流。完成一个工作流的时候(回调触发时)执行下一个工作流——由trigger.next触发。不仅如此,还可以添加Rollback属性

  1. @Test
  2. public void test() throws WorkFlowException {
  3. final int[] count = {0};
  4. new SimpleFlowChain()
  5. .then(new Flow() {
  6. @Override
  7. public void run(FlowTrigger chain, Map data) {
  8. count[0]++;
  9. chain.next();
  10. }
  11. @Override
  12. public void rollback(FlowRollback chain, Map data) {
  13. count[0]--;
  14. chain.rollback();
  15. }
  16. })
  17. .then(new Flow() {
  18. @Override
  19. public void run(FlowTrigger chain, Map data) {
  20. count[0]++;
  21. chain.next();
  22. }
  23. @Override
  24. public void rollback(FlowRollback chain, Map data) {
  25. count[0]--;
  26. chain.rollback();
  27. }
  28. })
  29. .then(new Flow() {
  30. @Override
  31. public void run(FlowTrigger chain, Map data) {
  32. chain.fail(null);
  33. }
  34. @Override
  35. public void rollback(FlowRollback chain, Map data) {
  36. count[0]--;
  37. chain.rollback();
  38. }
  39. })
  40. .start();
  41. Assert.assertEquals(-1, count[0]);
  42. }

rollback由FlowTrigger的fail触发。这样我们可以保证在发生一些错误的时候及时回滚,防止我们的系统处于一个有脏数据的中间状态。同时,Map也可以用来在Flow之间传递上下文。

ShareFlowChain

  1. public class TestShareFlow {
  2. int[] count = {0};
  3. boolean success;
  4. private void increase() {
  5. count[0]++;
  6. }
  7. private void decrease() {
  8. count[0]--;
  9. }
  10. private void expect(int ret) {
  11. Assert.assertEquals(count[0], ret);
  12. }
  13. @Test
  14. public void test() {
  15. FlowChain chain = FlowChainBuilder.newShareFlowChain();
  16. chain.then(new ShareFlow() {
  17. int a;
  18. @Override
  19. public void setup() {
  20. flow(new NoRollbackFlow() {
  21. @Override
  22. public void run(FlowTrigger trigger, Map data) {
  23. a = 1;
  24. increase();
  25. trigger.next();
  26. }
  27. });
  28. flow(new NoRollbackFlow() {
  29. @Override
  30. public void run(FlowTrigger trigger, Map data) {
  31. a = 2;
  32. increase();
  33. trigger.next();
  34. }
  35. });
  36. }
  37. }).done(new FlowDoneHandler(null) {
  38. @Override
  39. public void handle(Map data) {
  40. success = true;
  41. }
  42. }).start();
  43. Assert.assertTrue(success);
  44. expect(2);
  45. }
  46. @Before
  47. public void setUp() throws Exception {
  48. new BeanConstructor().build();
  49. }
  50. }

比起SimpleFlowChain,ShareFlowChain则是一个Inner class,在相同的作用域里,传递数据变得更加的方便了。

它的实现

在ZStack中,FlowChain作为核心库,其实现也是非常的简单(可以直接参考SimpleFlowChainShareFlowChain),本质就是将任务放入List中,由内部方法进行迭代,在此基础上做了一系列操作。下面将开始分析它的源码。

从接口说起

  1. public interface FlowChain {
  2. List<Flow> getFlows();
  3. FlowChain insert(Flow flow);
  4. FlowChain insert(int pos, Flow flow);
  5. FlowChain setFlowMarshaller(FlowMarshaller marshaller);
  6. FlowChain then(Flow flow);
  7. FlowChain done(FlowDoneHandler handler);
  8. FlowChain error(FlowErrorHandler handler);
  9. FlowChain Finally(FlowFinallyHandler handler);
  10. FlowChain setData(Map data);
  11. FlowChain putData(Map.Entry... es);
  12. FlowChain setName(String name);
  13. void setProcessors(List<FlowChainProcessor> processors);
  14. Map getData();
  15. void start();
  16. FlowChain noRollback(boolean no);
  17. FlowChain allowEmptyFlow();
  18. }

接口的名字非常的易懂,那么在这里就不多作解释了。FlowChain仅仅定义了一个Flow最小应有的行为。

  1. //定义了Flow的回滚操作接口
  2. public interface FlowRollback extends AsyncBackup {
  3. //回滚操作
  4. void rollback();
  5. //设置跳过回滚操作
  6. void skipRestRollbacks();
  7. }
  1. //定义了触发器的行为接口
  2. public interface FlowTrigger extends AsyncBackup {
  3. //触发失败,调用errorHandle
  4. void fail(ErrorCode errorCode);
  5. //触发下一个flow
  6. void next();
  7. //setError后,在下次调用next的时才会调用errorHandle
  8. void setError(ErrorCode error);
  9. }

源码解析

Flow

  1. public interface Flow {
  2. void run(FlowTrigger trigger, Map data);
  3. void rollback(FlowRollback trigger, Map data);
  4. }

Flow的定义其实非常的简单——一组方法。执行和对应的回滚,一般在ZStack中都以匿名内部类的方式传入。

Chain的用法

在之前的SimpleFlowChain的case中。我们可以看到一系列的链式调用,大致如下:

  1. new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();

then本质是往List flows里添加一个flow。

  1. public SimpleFlowChain then(Flow flow) {
  2. flows.add(flow);
  3. return this;
  4. }

再来看看start

  1. @Override
  2. public void start() {
  3. // 检测flow中是否设置了processors。一般用来打trace
  4. if (processors != null) {
  5. for (FlowChainProcessor p : processors) {
  6. p.processFlowChain(this);
  7. }
  8. }
  9. //如果flows为空但是之前在设置中允许为空,那么就直接直接done部分的逻辑。不然就报错
  10. if (flows.isEmpty() && allowEmptyFlow) {
  11. callDoneHandler();
  12. return;
  13. }
  14. if (flows.isEmpty()) {
  15. throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose");
  16. }
  17. //每个flow必须有一个map,用来传递上下文
  18. if (data == null) {
  19. data = new HashMap<String, Object>();
  20. }
  21. //标记为已经开始
  22. isStart = true;
  23. //如果没有名字的话给flow 取一个名字,因为很有可能是匿名使用的flow
  24. if (name == null) {
  25. name = "anonymous-chain";
  26. }
  27. logger.debug(String.format("[FlowChain(%s): %s] starts", id, name));
  28. //打印trace,方便调试
  29. if (logger.isTraceEnabled()) {
  30. List<String> names = CollectionUtils.transformToList(flows, new Function<String, Flow>() {
  31. @Override
  32. public String call(Flow arg) {
  33. return String.format("%s[%s]", arg.getClass(), getFlowName(arg));
  34. }
  35. });
  36. logger.trace(String.format("execution path:\n%s", StringUtils.join(names, " -->\n")));
  37. }
  38. //生成一个迭代器
  39. it = flows.iterator();
  40. //从it中获取一个不需要跳过的flow开始执行。如果没有获取到,就执行done逻辑
  41. Flow flow = getFirstNotSkippedFlow();
  42. if (flow == null) {
  43. // all flows are skipped
  44. callDoneHandler();
  45. } else {
  46. runFlow(flow);
  47. }
  48. }

再来看一下runFlow中的代码

  1. private void runFlow(Flow flow) {
  2. try {
  3. //看报错信息就可以猜到在做什么防御措施了:如果一个transaction在一个flow中没有被关闭而跳到下一个flow时,会抛出异常。这个防御机制来自于一个实习生写的bug,当时被排查出来的时候花了非常大的力气——现象非常的诡异。所以现在被写在了这里。
  4. if (TransactionSynchronizationManager.isActualTransactionActive()) {
  5. String flowName = null;
  6. String flowClassName = null;
  7. if (currentFlow != null) {
  8. flowName = getFlowName(currentFlow);
  9. flowClassName = currentFlow.getClass().getName();
  10. }
  11. throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName));
  12. }
  13. //toRun就是一个当前要run的flow
  14. Flow toRun = null;
  15. if (flowMarshaller != null) {
  16. //flowMarshaller 实际上是一个非常恶心的玩意儿。尤其在一些配置好掉的xml flow突然因为一些条件而改变接下来执行的flow令人很无语...但是也提供了一些灵活性。
  17. toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(),
  18. flow.getClass().getName(), this, data);
  19. if (toRun != null) {
  20. logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]",
  21. id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass()));
  22. }
  23. }
  24. if (toRun == null) {
  25. toRun = flow;
  26. }
  27. if (CoreGlobalProperty.PROFILER_WORKFLOW) {
  28. //对flow的监视。比如flow的执行时间等
  29. stopWatch.start(toRun);
  30. }
  31. currentFlow = toRun;
  32. String flowName = getFlowName(currentFlow);
  33. String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName);
  34. logger.debug(info);
  35. //在flow中还允许定义afterDone afterError afterFinal的行为。稍后将会介绍
  36. collectAfterRunnable(toRun);
  37. //终于到了run,这里就是调用者传入的行为来决定run中的逻辑
  38. toRun.run(this, data);
  39. //fail的逻辑稍后解析
  40. } catch (OperationFailureException oe) {
  41. String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : "";
  42. logger.warn(errInfo, oe);
  43. fail(oe.getErrorCode());
  44. } catch (FlowException fe) {
  45. String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : "";
  46. logger.warn(errInfo, fe);
  47. fail(fe.getErrorCode());
  48. } catch (Throwable t) {
  49. logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback",
  50. id, name, flow.getClass().getName()), t);
  51. fail(errf.throwableToInternalError(t));
  52. }
  53. }

fail

    @Override
    public void fail(ErrorCode errorCode) {
        isFailCalled = true;
        setErrorCode(errorCode);
        //放入Stack中,之后Rollback会根据Stack中的flow顺序来
        rollBackFlows.push(currentFlow);
        //rollback会对this.rollBackFlows中flow按照顺序调用rollback
        rollback();
    }

FlowTrigger

//定义了触发器的行为接口
public interface FlowTrigger extends AsyncBackup {
    //触发失败,调用errorHandle
    void fail(ErrorCode errorCode);
    //触发下一个flow
    void next();
    //setError后,在下次调用next的时才会调用errorHandle
    void setError(ErrorCode error);
}

之前已经看过fail的代码。接下来来看看nextsetError

    @Override
    public void next() {
        //如果flow没有run起来的情况下,是不能调用next的
        if (!isStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()",
                            id, name));
        }
        //当rollback开始的时候也不允许next
        if (isRollbackStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] rollback has started, you can't call next()", id, name));
        }
        //将当前flow的push进rollback用的stack
        rollBackFlows.push(currentFlow);

        logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow)));
        //获取下一个flow。在这里才是真正意义上的next
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }
    }

可以看一下getFirstNotSkippedFlow,本质上是利用了迭代器的特性。

    private Flow getFirstNotSkippedFlow() {
        Flow flow = null;
        while (it.hasNext()) {
            flow = it.next();
            if (!isSkipFlow(flow)) {
                break;
            }
        }

        return flow;
    }

接下来是setError

    @Override
    public void setError(ErrorCode error) {
        setErrorCode(error);
    }

//往下看
    private void setErrorCode(ErrorCode errorCode) {
        this.errorCode = errorCode;
    }

根据之前的next逻辑:

        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }

我们可以大致猜想到,如果在next的时候当前error不为空,则调用错误handle。这样在setError后还可以做一些事情。

无论是调用errorHandle还是doneHandle,都会调用finalHandle。finalHandle也允许用户定义这部分的逻辑,使flow更加的灵活。

更好的选择

由于该库是为ZStack定制而生,故此有一些防御性判断,源码显得略为verbose。如果有同学对此感兴趣,想将其应用到自己的系统中,笔者推荐使用:jdeferred

Java Deferred/Promise library similar to JQuery

由于JavaScript 中的代码都是异步调用的。简单说,它的思想是,每一个异步任务返回一个Promise对象,该对象有一个then方法,允许指定回调函数。

在这里列出几个较为简单的示范,或者有兴趣的读者也可以参考这里

import org.jdeferred.DeferredManager;
import org.jdeferred.Promise;
import org.jdeferred.impl.DefaultDeferredManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;


public class deferSimpleTest {

    private static int var = 0;
    final DeferredManager dm = new DefaultDeferredManager();

    @After
    public void cleanUp() {
        var = 0;
    }


    @Test
    public void test() {
        Promise p1 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        Promise p2 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        dm.when(p1, p2).done(Void -> var += 1);
        Assert.assertEquals(5, var);
    }

    @Test
    public void test2() {
        final DeferredManager dm = new DefaultDeferredManager();

        Promise promise = dm.when(() -> {
                var += 1;
            }).then(result -> {
                var += 1;
            });

        dm.when(promise).done(Void -> var += 1);
        Assert.assertEquals(3, var);
    }

    @Test
    public void testBadCallback() {
        Promise promise = dm.when(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        dm.when(promise).done(Void -> {
                    var += 1;
                    throw new RuntimeException("this exception is expected");
                }
        ).fail(Void -> {
            System.out.print("fail!");
            var -= 1;
        });
        Assert.assertEquals(0, var);

    }
}

如果你在使用Java8,那么也可以通过CompletableFuture来得到“类似”的支持。

小结

本文和大家一起了解了FlowChain的实现,但其实这并不是什么新颖的东西。该组件的思想参考了SAGA——SAGA 事务模式的历史十分悠久,比分布式事务的概念提出还要更早。SAGA 的意思是“长篇故事、长篇记叙、一长串事件”,它起源于 1987 年普林斯顿大学的赫克托 · 加西亚 · 莫利纳(Hector Garcia Molina)和肯尼斯 · 麦克米伦(Kenneth Salem)在 ACM 发表的一篇论文《SAGAS》(这就是论文的全名)。