两阶段提交实现

本文档概述了rocksdb中两阶段提交的实现。

本项目可分为五个重点领域: 1.WAL格式修改 2.现有事务API的扩展 3.修改写入路径 4.修改恢复路径 5.与MyRocks集成

WAL格式修改

WAL由一个或多个日志组成。每个日志都是一个或多个序列化的writebatch。 在恢复期间,writebatch从日志中重建。要修改WAL格式或扩展其功能,我们必须只关注WriteBatch。

WriteBatch是一组有序的记录(Put(k,v)、Merge(k,v)、Delete(k)、SingleDelete(k)),它们表示RocksDB的写操作。 每个记录都有一个二进制字符串表示。当记录被添加到WriteBatch时,它们的二进制表示被附加到WriteBatch的二进制字符串表示中。这个二进制字符串的前缀是批处理的起始序列号,后面跟着批处理中包含的记录数。 如果操作不应用于默认列族,则可以在每个记录前面加上一个列族修饰符记录。

可以通过扩展WriteBatch::Handler遍历WriteBatch。 MemTableInserter是WriteBatch::Handler的扩展,它将WriteBatch中包含的操作插入到适当的列族MemTable中。

现有的WriteBatch可能具有以下逻辑表示:Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);

修改2PC的WriteBatch格式包括添加4条新记录。

  • Prepare(xid)
  • EndPrepare()
  • Commit(xid)
  • Rollback(xid)

一个支持2PC的WriteBatch可以有以下逻辑表示: Sequence(0);NumRecords(6);Prepare(foo);Put(A,b);

可以看出,Prepare(xid)和EndPrepare()类似于匹配括号,其中包含属于ID ‘foo’事务的操作。 Commit(xid)和Rollback(xid)标记属于ID xid事务的操作应该提交或回滚。

序列ID分配

当WriteBatch(通过MemTableInserter)插入memtable时,每个操作的序列ID等于WriteBatch的序列ID加上WriteBatch中该操作之前使用的序列ID记录的数量。 添加2PC后,WriteBatch中序列id的隐式映射将不再有效。包含在Prepare()附件中的操作将使用序列id,就好像它们是从相对的Commit()标记的位置开始插入的一样。 这个Commit()标记可能位于不同的WriteBatch或日志中,与它所应用的准备操作不同。

向后兼容性

WAL格式没有版本控制,所以我们需要注意向后兼容性。当前版本的RocksDB无法从包含2PC标记的WAL文件中恢复自身。 事实上,对于无法识别的记录id,这将是致命的。然而,要修补当前版本的RocksDB,使其能够从这种新的WAL格式中恢复过来,只需跳过准备好的部分和未知的标记,这是非常简单的。

现有的进展

在这方面已取得进展,有关讨论可在 https://reviews.facebook.net/D54093 找到

现有事务API的扩展

目前我们只关注悲观事务的2PC。如果客户端打算使用两阶段提交语义,则必须提前指定。 例如,客户端代码可以设想为:

  1. TransactionDB* db;
  2. TransactionDB::Open(Options(), TransactionDBOptions(), "foodb", &db);
  3. TransactionOptions txn_options;
  4. txn_options.two_phase_commit = tr
  5. txn_options.xid = "12345";
  6. Transaction* txn = db->BeginTransaction(write_options, txn_options);
  7. txn->Put(...);
  8. txn->Prepare();
  9. txn->Commit();

事务对象现在可以占据更多的状态,所以我们的状态枚举现在变成:

  1. enum ExecutionStatus {
  2. STARTED = 0,
  3. AWAITING_PREPARE = 1,
  4. PREPARED = 2,
  5. AWAITING_COMMIT = 3,
  6. COMMITED = 4,
  7. AWAITING_ROLLBACK = 5,
  8. ROLLEDBACK = 6,
  9. LOCKS_STOLEN = 7,
  10. };

事务API将获得一个新的成员函数Prepare()。Prepare()将调用WriteImpl,并在它自身的上下文中提供WriteImpl和WriteThread对执行状态、XID和WriteBatch的访问权。 WriteImpl将插入Prepare(xid)标记,后面跟着WriteBatch的内容,后面跟着EndPrepare()标记。不会发出memtable插入。 同样,当同一个事务实例发出commit时,它调用WriteImpl()。这一次,只有一个Commit()标记被代表它插入到WAL中,WriteBatch的内容被插入到适当的memtables中。 当调用事务上的Rollback()时,将清除事务的内容,如果事务处于准备状态,则调用WriteImpl插入Rollback(xid)标记。

这些所谓”meta标记”(Prepare(xid)、EndPrepare()、Commit(xid)、Rollback(xid))永远不会直接插入到写批处理中。 写入路径(WriteImpl())将具有它正在写入的事务的上下文。它使用这个上下文将相关标记直接插入到WAL中(因此它们在插入到WAL之前插入到聚合WriteBatch中,但不插入其他WriteBatch)。 在恢复过程中,MemTableInserter将遇到这些标记,他将使用这些标记重构以前准备好的事务。

事务Wallclock过期

当前,在事务提交时,有一个回调函数,如果事务已过期,该回调函数的写操作将失败。 类似地,如果一个事务已经过期,那么它现在有资格让其他事务窃取它的锁。这些机制应该仍然适用于2PC—不同之处在于在准备时将调用过期回调。 如果事务在准备时没有过期,则不能在提交时过期。

TransactionDB修改

要使用事务,客户机必须打开TransactionDB。然后使用这个TransactionDB实例创建事务。 这个TransactionDB现在跟踪从XID到已创建的所有两阶段事务的映射。当一个事务被删除或回滚时,它将从这个映射中删除。 还有一个API用于查询所有未完成的准备事务。这是在MyRocks恢复期间使用的。

TransactionDB还跟踪包含准备部分的所有日志号的最小堆。 当一个事务被”准备”时,它的WriteBatch被写到一个日志中,然后这个日志号存储在事务对象中,然后存储在最小堆中。 当一个事务被提交时,它的日志号将从最小堆中删除,但不会被忘记!现在,每个memtable都有责任跟踪它需要保存的最古老的日志,直到成功地将其刷新到L0为止。

修改写入路径

写路径可以分解为两个主要的关注点。DBImpl: WriteImpl(…)和MemTableInserter。 多个客户机线程将调用WriteImpl。第一个线程将被指定为”leader”,随后的多个线程将被指定为”follower”。 领导者和追随者都将被组合成一个逻辑组,称为”写组”。leader将获取写组的所有writebatch,将它们连接在一起,并将这个blob写到WAL。 根据写组的大小和当前memtables支持并行写的意愿,领导可以将所有WriteBatch插入memtable,或者让每个线程将自己的WriteBatch插入memtable。

所有memtable插入都由MemTableInserter处理。这是WriteBatch::Handler的实现 — 一个WriteBatch迭代器处理程序。 这个处理程序遍历WriteBatch (Put、Delete、Merge等)中的所有元素,并对当前MemTable进行适当的调用。 MemTableInserter还将处理就地合并、删除和更新。

写入路径的修改将包括向DBImpl::WriteImpl添加一个可选参数。这个可选参数将是一个指针,指向正在编写数据的两阶段事务实例。 这个对象将使写路径能够洞察两阶段事务的当前状态。一个2PC事务将调用WriteImpl一次用于准备,一次用于提交,一次用于回滚——尽管提交和回滚显然是独占操作。

  1. Status DBImpl::WriteImpl(
  2. const WriteOptions& write_options,
  3. WriteBatch* my_batch,
  4. WriteCallback* callback,
  5. Transaction* txn
  6. ) {
  7. WriteThread::Writer w;
  8. //...
  9. w.txn = txn; // writethreads also have txn context for memtable insert
  10. // we are now the group leader
  11. int total_count = 0;
  12. uint64_t total_byte_size = 0;
  13. for (auto writer : write_group) {
  14. if (writer->CheckCallback(this)) {
  15. if (writer->ShouldWriteToMem())
  16. total_count += WriteBatchInternal::Count(writer->batch)
  17. }
  18. }
  19. const SequenceNumber current_sequence = last_sequence + 1;
  20. last_sequence += total_count;
  21. // now we produce the WAL entry from our write group
  22. for (auto writer : write_group) {
  23. // currently only optimistic transactions use callbacks
  24. // and optimistic transaction do not support 2pc
  25. if (writer->CallbackFailed()) {
  26. continue;
  27. } else if (writer->IsCommitPhase()) {
  28. WriteBatchInternal::MarkCommit(merged_batch, writer->txn->XID_);
  29. } else if (writer->IsRollbackPhase()) {
  30. WriteBatchInternal::MarkRollback(merged_batch, writer->txn->XID_);
  31. } else if (writer->IsPreparePhase()) {
  32. WriteBatchInternal::MarkBeginPrepare(merged_batch, writer->txn->XID_);
  33. WriteBatchInternal::Append(merged_batch, writer->batch);
  34. WriteBatchInternal::MarkEndPrepare(merged_batch);
  35. writer->txn->log_number_ = logfile_number_;
  36. } else {
  37. assert(writer->ShouldWriteToMem());
  38. WriteBatchInternal::Append(merged_batch, writer->batch);
  39. }
  40. }
  41. //now do MemTable Inserts for WriteGroup
  42. }

然后可以修改WriteBatchInternal::InsertInto,使其只遍历没有关联事务或事务处于提交状态的写入器。

修改写入路径的MemTableInserter

正如您在上面所看到的,当一个事务被准备好时,这个事务会记录下它所准备的部分所在的日志号。 在插入时,每个MemTable必须跟踪包含已插入到它的准备数据的最小日志号。 这个修改将在MemTableInserter中进行。我们将在log lifecycle部分讨论如何使用这个值。

修改恢复路径

当前的恢复路径已经非常适合两阶段提交。它按时间顺序遍历所有日志中的所有批,并沿着日志号将它们输入MemTableInserter。 然后MemTableInserter遍历这些批并将值插入到正确的MemTable中。每个MemTable都知道可以忽略哪些值,以便根据当前正在恢复的日志号进行插入。

要使恢复工作为2PC,我们必须只修改MemTableInserter,以了解我们的四个新的”meta标记”。

请记住:当提交一个两阶段事务时,它包含将作用于多个CFs(多个memtables)的插入。这些内存表将在不同的时间刷新。我们仍然使用CF日志号来避免重复插入已恢复的、两阶段的、已提交的事务。

考虑以下场景:

1.两个阶段的事务TXN插入到CFA和CFB中 2.TXN准备LOG 1 3.TXN在LOG 2中被标记为已提交 4.TXN被插入到MemTables中 5.CFA被刷新为L0 6.CFA log_number现在是LOG 3 7.CFB没有被刷新,它仍然引用LOG 1 prep节 8.崩溃恢复 9.LOG 1仍然存在,因为CFB引用了LOG 1 prep部分 10.迭代从LOG 1开始的日志 11.CFB已将准备好的值重新插入mem,再次引用LOG 1 prep部分 12.CFA跳过了LOG 2中的提交标记的插入,因为它与LOG 3是一致的 13.CFB被刷新到L0,现在与LOG 3保持一致 14.LOG 1 , LOG 2现在可以释放了

重建事务

如前所述,修改恢复路径只需要修改MemTableInserter来处理新的元标记。因为在恢复时,我们不能访问TransactionDB的完整实例,所以必须重新创建空的”shill”事务。 这本质上是对所有回收的准备事务的XID→(WriteBatch, log_number)的映射。当我们点击Commit(xid)标记时,我们尝试为这个xid查找shill事务并重新插入Mem。 如果我们点击回滚(xid)标记,我们将删除shill事务。在恢复结束时,我们剩下一组shill格式的所有准备好的事务。 然后,我们从这些对象重新创建完整的事务,获得所需的锁。RocksDB现在的状态与crash/shutdown之前相同。

日志的寿命

要找到必须保留的最小日志,首先要找到每个列族的最小lognumber

我们还必须考虑TransactionDB中准备好的部分堆中的最小值。这表示包含尚未提交的prep部分的最早日志。

我们还必须考虑所有未被刷新的MemTables和ImmutableMemTables引用的最小prep节日志。

这三个值中的最小值是仍然包含未刷新到L0的数据的最早日志。

与MyRocks集成

  1. 略。