Inheritance

    @PublicEvolving
    public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
            implements CheckpointedFunction, CheckpointListener { ... }

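Two-Phase Commit (2PC)

  • In a distributed system, each node needs to know how the transaction is progressing on the other nodes. To make this possible, a central node is introduced to drive the execution logic of all nodes. This central node is called the coordinator, and the business nodes it schedules are called participants. [Figure: coordinator and participants]

  • Now for 2PC itself. As the name suggests, 2PC splits a distributed transaction into two phases: the commit-request (voting) phase and the commit (execution) phase. The coordinator decides, based on the participants' responses, whether the transaction should actually be carried out. The flow is as follows.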

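1. Pre-commit phase (Prepare)

    1. The coordinator sends a prepare request, together with the transaction content, to all participants, asks whether they can prepare to commit, and waits for their responses.
    2. Each participant executes the operations contained in the transaction and writes undo and redo logs (for rollback and replay), but does not actually commit.
    3. Each participant returns the result of executing the transaction: Yes on success, No on failure.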


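2. Commit phase (Commit / Rollback)

    1. If every participant returns Yes, the transaction can be committed.
    2. The coordinator sends a commit request to all participants.
    3. On receiving the commit request, each participant actually commits the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
    4. Once the coordinator has received acks from all participants, the transaction has completed successfully.


    1. If any participant returns No, or does not respond before the timeout, the transaction is aborted and must be rolled back.
    2. The coordinator sends a rollback request to all participants.
    3. On receiving the rollback request, each participant uses its undo log to return to the state before the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
    4. Once the coordinator has received acks from all participants, the rollback is complete.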
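
The two phases above can be sketched in plain Java. This is a minimal in-memory illustration, not a production protocol: the names `Participant`, `InMemoryParticipant`, and `TwoPhaseCoordinator` are hypothetical, timeouts and acks are not modeled, and setting a `staged` field stands in for executing the work and writing undo/redo logs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical participant: votes in phase 1, then commits or rolls back in phase 2. */
interface Participant {
    boolean prepare(String payload); // true = Yes, false = No
    void commit();
    void rollback();
}

/** In-memory participant used only for this illustration. */
class InMemoryParticipant implements Participant {
    private final boolean voteYes;
    private String staged; // prepared but not yet visible
    final List<String> committed = new ArrayList<>();

    InMemoryParticipant(boolean voteYes) {
        this.voteYes = voteYes;
    }

    @Override
    public boolean prepare(String payload) {
        if (!voteYes) {
            return false;
        }
        staged = payload; // stands in for executing the work and writing undo/redo logs
        return true;
    }

    @Override
    public void commit() {
        if (staged != null) {
            committed.add(staged); // make the prepared work visible
            staged = null;
        }
    }

    @Override
    public void rollback() {
        staged = null; // stands in for undoing the work from the undo log
    }
}

class TwoPhaseCoordinator {
    /** Returns true if the transaction committed, false if it was rolled back. */
    static boolean run(List<? extends Participant> participants, String payload) {
        boolean allYes = true;
        for (Participant p : participants) { // phase 1: collect votes
            if (!p.prepare(payload)) {
                allYes = false;
                break;
            }
        }
        for (Participant p : participants) { // phase 2: same decision for everyone
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }
}
```

Calling `TwoPhaseCoordinator.run(...)` with participants that all vote Yes makes the payload visible on every one of them; a single No vote leaves every participant unchanged.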

Two-phase commit in Flink, based on Kafka



Flink two-phase commit protocol: pre-commit phase



Two-phase commit: implementing a file-based sink (test)



TwoPhaseCommitSinkFunction

Abstract methods of TwoPhaseCommitSinkFunction

    // ------ methods that should be implemented in child class to support two phase commit
    // algorithm ------

    /** Write value within a transaction. */
    protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

    /**
     * Method that starts a new transaction.
     *
     * @return newly created transaction.
     */
    protected abstract TXN beginTransaction() throws Exception;

    /**
     * Pre commit previously created transaction. Pre commit must make all of the necessary steps to
     * prepare the transaction for a commit that might happen in the future. After this point the
     * transaction might still be aborted, but underlying implementation must ensure that commit
     * calls on already pre committed transactions will always succeed.
     *
     * <p>Usually implementation involves flushing the data.
     */
    protected abstract void preCommit(TXN transaction) throws Exception;

    /**
     * Commit a pre-committed transaction. If this method fail, Flink application will be restarted
     * and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
     * same transaction.
     */
    protected abstract void commit(TXN transaction);

    /**
     * Invoked on recovered transactions after a failure. User implementation must ensure that this
     * call will eventually succeed. If it fails, Flink application will be restarted and it will be
     * invoked again. If it does not succeed eventually, a data loss will occur. Transactions will
     * be recovered in an order in which they were created.
     */
    protected void recoverAndCommit(TXN transaction) {
        commit(transaction);
    }

    /** Abort a transaction. */
    protected abstract void abort(TXN transaction);

    /** Abort a transaction that was rejected by a coordinator after a failure. */
    protected void recoverAndAbort(TXN transaction) {
        abort(transaction);
    }
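
To make the contract of these hooks concrete, here is a plain-Java sketch of a file-backed transaction. It borrows the hook names from the listing above but deliberately does not extend the Flink class, so it runs without Flink on the classpath. The class name `FileTransactionSketch`, the use of a `String` file name as the transaction handle, and the two-directory layout are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

/** Hypothetical file-backed transaction that mirrors the hook names above. */
class FileTransactionSketch {
    private final Path tempDir;   // staging area, not read by consumers
    private final Path targetDir; // committed files become visible here

    FileTransactionSketch(Path tempDir, Path targetDir) {
        this.tempDir = tempDir;
        this.targetDir = targetDir;
    }

    /** beginTransaction: the transaction handle is just a fresh file name. */
    String beginTransaction() throws IOException {
        String txn = UUID.randomUUID().toString();
        Files.createFile(tempDir.resolve(txn));
        return txn;
    }

    /** invoke: append one record to the staging file of the open transaction. */
    void invoke(String txn, String value) throws IOException {
        Files.write(
                tempDir.resolve(txn),
                (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }

    /** preCommit: records are written straight to disk, so nothing is left to flush. */
    void preCommit(String txn) {
    }

    /**
     * commit: move the staging file into place. Skipping a missing staging file
     * makes a repeated commit of the same transaction a no-op.
     * ATOMIC_MOVE assumes both directories are on the same file system.
     */
    void commit(String txn) throws IOException {
        Path staged = tempDir.resolve(txn);
        if (Files.exists(staged)) {
            Files.move(staged, targetDir.resolve(txn), StandardCopyOption.ATOMIC_MOVE);
        }
    }

    /** abort: discard the staging file. */
    void abort(String txn) throws IOException {
        Files.deleteIfExists(tempDir.resolve(txn));
    }
}
```

The design point is that `commit` can safely be called twice for the same handle, which matters because `recoverAndCommit` may re-invoke `commit` for a transaction after a restart.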


PreCommit

    // Pre-commit phase
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        checkState(
                currentTransactionHolder != null,
                "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug(
                "{} - checkpoint {} triggered, flushing transaction '{}'",
                name(),
                context.getCheckpointId(),
                currentTransactionHolder);

        preCommit(currentTransactionHolder.handle);
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        state.clear();
        state.add(
                new State<>(
                        this.currentTransactionHolder,
                        new ArrayList<>(pendingCommitTransactions.values()),
                        userContext));
    }

Commit

    // Commit phase
    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
                pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info(
                    "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(),
                    checkpointId,
                    pendingTransaction,
                    pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException(
                    "Committing one of transactions failed, logging first encountered failure",
                    firstError);
        }
    }
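
The interplay between `snapshotState` and `notifyCheckpointComplete` is easier to follow in a stripped-down model. The sketch below is plain Java with no Flink dependency: the class name `CheckpointBookkeepingSketch` is hypothetical, a transaction is reduced to a list of records, and state persistence, logging, and error handling are left out. It only shows how a transaction moves from "open" to "pending under a checkpoint ID" to "committed".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the checkpoint bookkeeping; a transaction is just a record list. */
class CheckpointBookkeepingSketch {
    private List<String> current = new ArrayList<>(); // open transaction
    private final Map<Long, List<String>> pending = new LinkedHashMap<>(); // pre-committed, by checkpoint ID
    final List<String> committed = new ArrayList<>(); // records visible downstream

    /** Write a record into the open transaction. */
    void invoke(String value) {
        current.add(value);
    }

    /** Mirrors snapshotState: park the open transaction under the checkpoint ID, then open a new one. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Mirrors notifyCheckpointComplete: commit every pending transaction up to checkpointId. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committed.addAll(entry.getValue());
            it.remove();
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the loop commits every pending entry whose checkpoint ID is less than or equal to the notified one, a single notification for checkpoint 2 also commits the transaction parked under checkpoint 1 if that earlier notification never arrived.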


Reference: https://www.jianshu.com/p/02d6d1103746