公众号:秦夏 漫谈大数据 网址:https://mp.weixin.qq.com/s/tYvlAM9b9dpkYfCtPAInpg



这是《漫谈分布式系统》系列的第 13 篇,预计会写 30 篇左右。每篇文末有为懒人准备的 TL;DR,还有给勤奋者的关联阅读。扫描文末二维码,关注公众号,听我娓娓道来。也欢迎转发朋友圈分享给更多人。

从 ACID 到 BASE

在业务很简单,数据量不大的时候,传统单机关系数据库就够用了。在各个关系数据库里,很早就实现了单机版的事务,并总结成了 ACID 四个特性。

  • Atomity,原子性,事务内的所有操作要么全部执行,要么全部不执行。
  • Consistency,一致性,事务执行完后,数据库仍然保持合法的状态,如不违反主外键约束等。
  • Isolation,隔离性,事务之间互相隔离,并发执行时也不受影响。
  • Durability,持久性,事务提交后,涉及的变化都会被持久化下来,不会因系统故障而丢失。

另一个关联很大,但容易混淆的概念,是前面文章讲过的的 CAP。

  • Consistency,一致性,不同时刻的不同请求都能返回同样的数据。
  • Availability,可用性,每个外部请求都能得到系统的有效响应。
  • Partition tolerance,分区容忍性,在出现网络分区时,系统仍然能正常运转。

可以看到,ACID 和 CAP 里的 C 有着完全不一样的含义。前者强调数据库状态始终合法有效,后者强调不同副本上的数据对外而言始终一样。

以 MySQL 为典型的单机关系型数据库能很好的支持 ACID,但是,当数据量和请求并发膨胀到一定程度后,必然会横向扩展为分布式数据库。

典型的实现是在单机数据库的基础上,做多库多表。无论是纵向的对同一类数据的切分,比如把用户表切分后放到 100 个数据库实例;还是横向的对不同类型的数据做切分,比如把用户表和商品表分别放到不同的数据库实例,都是难免的。

数据库变成分布式之后,我们自然能像前面文章介绍的那样,在单机事务的基础上,通过 2PC 的方式实现分布式事务。

而出于高可用的需要,还可以采用 binlog 等方式把数据复制到 slave 机器上,这个过程也可以通过 2PC 实现。

但是,任何分布式系统都逃不过 CAP 的诅咒。

前面文章已经详细说明过,2PC 模式的分布式事务,多轮多节点的协商导致性能不佳,并且无法提供分区容忍性。

虽然 ACID 中的 C 和 CAP 中的 C 含义并不一样,但引申到分布式语境下,ACID 确实也隐含了强一致性保证。而基于 2PC 的分布式事务则在分布式的场景下延续了对强一致性的追求,可以称之为 ACID on 2PC。哪怕我们继续优化,比如 ACID on 3PC,也没法从根本上解决问题。

同时,上篇文章说过,在大数据量和高并发的场景下,有时候,可用性和性能(也可以看作可用性的体现)的重要性并不比一致性弱。一个动不动不响应或者响应非常慢的系统,数据再一致,也很难大规模应用起来。

一方面,一致性难以完全保证,另一方面,可用性和性能又不能不管,那对分布式事务而言,到底有没有更好的出路呢?

答案是肯定的。并且已经有人将这个思路总结为 BASE 理论:

  • BA,Basically Available,基本可用,不追求完整的可用性。部分可用也好过完全不可用。
  • Soft state,软状态,不追求状态机那样机械的状态转换,允许出现中间状态。所谓「柔性事务」也是这个意思。
  • Eventually consistency,最终一致,不追求无时无刻的强一致。

简单讲,就是牺牲一部分一致性,来换取可用性。非常重要也非常典型的 trade-off。

在化学术语中,ACID 是酸的意思,而 BASE 则是碱的意思。从名字上也能看出二者的关系。

基于 Dynamo 的分布式事务

对了啊!上篇文章不是刚介绍过 Dynamo 吗,现成的因果(弱)一致性分布式数据库,那干嘛不直接在 Dynamo 的基础上做事务?!

在客户端实现事务

Amazon 官方曾经提供过一个叫 dynamodb-transactions 的库,来帮助应用在客户端实现分布式事务。

大致来说,是一个多步提交(multi-phase commit protocol)的实现:

  • create,创建一个主键唯一的 TX record,保存为 Dynamo 对象。
  • add,把事务相关的对象添加到 TX 对象的对象列表里。
  • lock,逐个把相关对象的 lock 设置为本 TX id。
  • save,保存相关对象的副本,以备回滚。
  • verify,重读 TX record,确保状态仍是 pending,以防止和其他事务产生竞争。
  • apply,执行事务对应的操作。修改操作会直接改动原对象,删除操作这时不会执行。
  • commit,TX record 状态从 pending 改为 commited。
  • complete,释放事务相关对象的锁,并删除 save 阶段保存的副本。
  • clean,TX record 状态更新为 complete。
  • delete,删除 TX record。

看起来列了很多步,有点吓人,实际上只是操作细节而已,和 2PC 的 Prepare-Commit 差不多。

上面是没有出现冲突时的正常执行流程。一旦出现冲突,则会进入另外的处理流程:

  • decide,从对象锁拿到占用它的 TX id,判断该 TX 状态,如果是 pending,说明没开始,则改成 roll-back(所以上面正常流程才有 verify 这步)。
  • complete,如果该事务状态是 committed,则继续执行正常的提交流程;如果是 roll-back,则执行回滚流程。
  • clean,同上的 clean 操作。

可以看到,事务之间是可以互相帮忙推进流程的,这个比较激进的机制当然有利于推动事务尽快完成,但也加剧了竞争。频繁的互相回滚是可以预见的局面。所以可以考虑在 decide 步骤前加入等待等办法来缓解。

除了事务之间的竞争,DynamoDB 还支持多个 coordinator 处理同一个事务,不过解决竞争的思路类似,就不再赘述了。

需要注意的是,apply 阶段的修改操作会直接更新对象,即使事务没提交,这个修改也是可见的。当然,可以通过加读锁来避免。

在服务端实现事务

客户端自己实现总是不方便且容易出错,于是在 2018 年,DynamoDB 终于在服务端集成了事务功能。

  1. / 以下省略 checkCustomerValidmarkItemSold createOrder 的定义代码
  2. Collection<TransactWriteItem> actions = Arrays.asList(
  3. new TransactWriteItem().withConditionCheck(checkCustomerValid),
  4. new TransactWriteItem().withUpdate(markItemSold),
  5. new TransactWriteItem().withPut(createOrder));
  6. TransactWriteItemsRequest placeOrderTransaction = new TransactWriteItemsRequest()
  7. .withTransactItems(actions)
  8. .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);
  9. // Execute the transaction and process the result.
  10. try {
  11. client.transactWriteItems(placeOrderTransaction);
  12. System.out.println("Transaction Successful");
  13. } catch (ResourceNotFoundException rnf) {
  14. System.err.println("One of the table involved in the transaction is not found" + rnf.getMessage());
  15. } catch (InternalServerErrorException ise) {
  16. System.err.println("Internal Server Error" + ise.getMessage());
  17. } catch (TransactionCanceledException tce) {
  18. System.out.println("Transaction Canceled " + tce.getMessage());
  19. }

无论是客户端还是服务端的实现,由于 DynamoDB 采用了 W+R 的读写模式,即使事务执行成功,其他请求仍有可能读到过时的副本。不过,可以设置 ConsistentRead 参数强制读足够多的副本来获取最新的数据。

如上图的示例代码,通过以 transactWriteItemstransactGetItems 为核心的事务 API,几行熟悉的代码,就能实现事务的效果。

另外,为了降低网络问题导致的事务大量失败带来的用户体验问题,DynamoDB 只支持同一个 region(简单理解为 AWS 上的一个机房) 下的事务,不支持跨 region 的 global table 上的事务。

基于 BASE 的分布式事务

DynamoDB 虽好,但是如果我们已有的系统没用 DynamoDB 呢,或者用了 DynamoDB,但事务里面还包含其他数据库和服务(heterogeneous distributed transactions)呢,又该怎么办?

牺牲一致性

再回过头想想,既然 BASE 想要做的是牺牲一致性来换取可用性和性能(某种程度看也属于可用性)。那就先牺牲起来,怎么牺牲一致性呢?

回顾前面几篇文章,我们提到的单主同步、2PC 和 Paxos 这些强一致性模型,虽然差别很大,但都有一个共性:通过同步的方式做数据交互。

那想要牺牲一致性,只要改同步为异步就好了。这符合我们在系列第 8 篇讲到的内容,也正好解决了提升性能的需求。

而要做异步,最常用的就是 MQ。

举一个例子说明(来自 Ebay 的 Dan Pritchett)。

有一个电商系统,其中有 user 和 transaction 两种表,分别记录用户和交易信息。

每产生一次交易,都会生成一条 transaction 记录,并更新 user 表里的 amt_sold 或 amt_bought 字段。很明显,这两个表的数据是需要关联一致的。

一开始,ACID 下的事务就够用了:

  1. begin transaction
  2. insert into transaction(xid, seller_id, buyer_id, amount);
  3. update user set amt_sold=amt_sold+$amount where id=$seller_id;
  4. update user set amt_bought=amt_bought+$amount where id=$buyer_id;
  5. end transaction

改造成 BASE 之后的伪代码大概会是这样:

  1. begin transaction
  2. insert into transaction(xid, seller_id, buyer_id, amount);
  3. queue message "update user("seller", seller_id, amount)";
  4. queue message "update user("buyer", buyer_id, amount)";
  5. end transaction
  6. for each message in queue
  7. begin transaction
  8. dequeue message
  9. if message.balance = "seller"
  10. update user set amt_sold=amt_sold+message.amount where id=message.id
  11. else
  12. update user set amt_bought=amt_bought+message.amount where id=message.id
  13. end if
  14. end transaction
  15. end for

引入 MQ 解耦后,两张表的更新不在一个事务内完成,数据的强一致性就没有保障了,但性能得到大幅提升。

找回一致性

牺牲一致性是暂时的妥协,不是放弃,最终一致性还是要保证的。

怎么保证呢?要解决问题就要先找到问题。

先看看可能出现一致性的几个地方:

  • 插入 user 表和向 MQ 发送消息,这两个操作的事务性怎么保障
  • dequeue message 和向 user 表插入数据,这两个操作的事务性又怎么保证

其实就是数据库和 MQ 的异构事务性保证

第一个办法,很容易想到 2PC 不然之前白讲了呵呵)。这样就需要 MQ 支持消息的预提交,比如 RocketMQ 就支持。

  • 第一阶段,发起数据库事务,并向 MQ 预提交消息。
  • 第二阶段,如果数据库事务执行成功,则向 MQ 正式提交消息,否则取消消息或不正式提交后超时丢弃。

当然,由于数据库已经支持事务了,实际写法上不是严格的两阶段,而是把消息的预提交和正式提交/回滚嵌入到数据库的事务代码内。伪代码简单示例如下:

  1. begin transaction
  2. try
  3. database.update_row()
  4. mq.prepare_message()
  5. except
  6. database.rollback()
  7. mq.cancle_message()
  8. else
  9. database.commit()
  10. mq.commit_message()
  11. end transaction

生产消息的事务解决了,消费消息的事务也类似,利用 MQ 的 ACK 功能实现。

可是,如果系统里使用的 MQ 不支持消息预提交呢,又怎么实现异构事务?

  1. begin transaction
  2. try
  3. database.update_row()
  4. mq.commit_message()
  5. except
  6. database.rollback()
  7. else
  8. database.commit()
  9. end transaction

所谓幂等,就是同样的操作无论执行多少次,产生的结果都是一样的。比如 a = 1 就是幂等的,a++ 就不是幂等的。

所以有第二个办法,需要下游消息的消费方支持 幂等 操作。

所谓幂等,就是同样的操作无论执行多少次,产生的结果都是一样的。比如 a = 1 就是幂等的,a++ 就不是幂等的。

一个最简单的保证幂等的办法,就是给每条消息一个唯一的 id,下游维护一个已经消费过的消息 id 的缓存,每次消费消息的时候,检查这个 id,如果在缓存里,表示已经处理过,就应该丢弃掉。

(这里又涉及到我们已经提到过好几次的 exactly once 问题,更加完善的处理方法,还是等到后面系统再聊。)

这样,一个提供 BASE 保证的分布式系统就成型了。

我们就通过 MQ 异步解耦,提升了系统整体性能。与之而来的代价,就是只能提供最终一致性(E),流程中会出现中间状态(S)。

而解耦之后的系统,部分组件的失败,也只会导致部分的不可用,而能最大程度保留系统整体的可用性(BA)。


TL;DR

  • 单机事务自动演化到分布式场景下,得到类似 ACID on 2PC 的方案,但性能和可用性都不够好。
  • BASE 降低了对一致性的追求,以便获得更好的可用性和性能。
  • 上面文章介绍过 Dynamo,是典型的最终一致性的分布式数据库,很容易想到用来实现 BASE。
  • DynamoDB 可以有客户端和服务端两种方式实现事务。
  • 如果没有用或不只用了 DynamoDB,需要自己实现异构事务。
  • 可以基于 2PC 实现异构 BASE 事务。
  • 也可以通过支持幂等来实现异构 BASE 事务。

系列第 10 篇我们介绍了强一致性下的分布式事务,这篇又了解了弱一致性下的分布式事务。

紧接着就面临一个问题:什么时候该用哪个?

(如果你还在问 哪个更好 这样的问题,那你需要逐渐习惯,往往没有最好,只有 trade-off 下的更好。)

两种模型的优缺点都非常明显,所以都没办法一统天下,也没办法被取代。需要我们自己,根据应用场景灵活选择。

比如 Amazon 上的商品详情页里的库存数量,不需要时时准确但访问量巨大,就可以用 BASE;而如果你的系统里有现金转账的功能,你对赔钱的担忧又远超对性能的抱怨,那就可以考虑继续 2PC 下的 ACID。

好了,分布式事务的介绍就到此为止。有心的人应该可以发现,我是从数据一致性逐渐引出分布式事务这个主题的。这是我写这个系列的思路的延续,所以会有侧重点。

所以,我并没有想要穷举所有分布式事务的实现方案,比如 TCC 只提了下,SAGA 甚至都没提。也没有仔细去讲事务里面非常重要的隔离性(Isolation level)在各个实现下的体现。有兴趣的同学,可以再额外去了解。