Inheritance
    @PublicEvolving
    public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
            extends RichSinkFunction<IN>
            implements CheckpointedFunction, CheckpointListener { ... }
- Extends RichSinkFunction
- Implements CheckpointedFunction
- Implements CheckpointListener
Two-Phase Commit (2PC)
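In a distributed system, each node needs to know how the transaction is progressing on the other nodes. To make this possible, a central node is introduced to coordinate the execution logic of all nodes. This central node is called the coordinator, and the business nodes it schedules are called participants.
[Figure: coordinator and participants]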
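As its name suggests, 2PC splits a distributed transaction into two phases: the commit request (voting) phase and the commit (execution) phase. The coordinator decides whether the transaction should actually be committed based on the participants' responses. The flow is as follows.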
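1. Pre-commit phase (Prepare)
   - The coordinator sends a prepare request, together with the transaction content, to all participants, asks whether they can prepare to commit, and waits for their responses.
   - Each participant executes the operations contained in the transaction and writes undo and redo logs (for rollback and replay), but does not actually commit.
   - Each participant returns the result of executing the operations: Yes on success, No on failure.
2. Commit phase (Commit / Rollback)
   - If all participants return Yes, the transaction can be committed:
     - The coordinator sends a commit request to all participants.
     - On receiving the commit request, each participant actually commits the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
     - Once the coordinator has received acks from all participants, the transaction has completed successfully.
   - If any participant returns No, or does not respond before the timeout, the transaction is aborted and must be rolled back:
     - The coordinator sends a rollback request to all participants.
     - On receiving the rollback request, each participant uses its undo log to return to the state before the transaction started, releases the transaction resources it holds, and returns an ack to the coordinator.
     - Once the coordinator has received acks from all participants, the rollback is complete.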
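The flow above can be sketched as a small in-memory model. This is only an illustration under simplifying assumptions: there is no network, no timeout handling, and no ack tracking, and every name in it (`TwoPhaseCommitDemo`, `Participant`, `InMemoryParticipant`, `runTransaction`) is hypothetical rather than part of Flink or any other library.

```java
import java.util.Arrays;
import java.util.List;

/** Minimal in-memory model of two-phase commit; all names are illustrative. */
class TwoPhaseCommitDemo {

    /** A participant votes in phase 1, then commits or rolls back in phase 2. */
    interface Participant {
        boolean prepare(String payload); // execute without committing; true means "Yes"
        void commit();
        void rollback();
    }

    /** Participant that stages a value and only makes it visible on commit. */
    static class InMemoryParticipant implements Participant {
        private final boolean voteYes; // fixed vote, to simulate success or failure
        private String staged;         // prepared but not yet visible
        String committed;              // visible state; stays null until a commit

        InMemoryParticipant(boolean voteYes) {
            this.voteYes = voteYes;
        }

        @Override
        public boolean prepare(String payload) {
            if (!voteYes) {
                return false;
            }
            staged = payload;
            return true;
        }

        @Override
        public void commit() {
            committed = staged;
            staged = null;
        }

        @Override
        public void rollback() {
            staged = null; // discard the staged value, visible state is untouched
        }
    }

    /** Coordinator logic: ask everyone to prepare, then commit only if all voted Yes. */
    static boolean runTransaction(List<? extends Participant> participants, String payload) {
        boolean allYes = true;
        for (Participant p : participants) {
            boolean vote = p.prepare(payload); // phase 1: every participant is asked
            allYes = allYes && vote;
        }
        for (Participant p : participants) {   // phase 2: one decision for everyone
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }

    public static void main(String[] args) {
        InMemoryParticipant a = new InMemoryParticipant(true);
        InMemoryParticipant b = new InMemoryParticipant(false);
        boolean ok = runTransaction(Arrays.asList(a, b), "order-1");
        System.out.println("committed=" + ok + ", a sees " + a.committed);
    }
}
```

The key property is that a single No vote keeps every participant's visible state unchanged, including participants that voted Yes.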
Flink's Two-Phase Commit Implementation, Based on Kafka

Flink two-phase protocol: pre-commit phase


Two-Phase Commit: Implementing a File-Based Sink (Test)

TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction abstract methods
    // ------ methods that should be implemented in child class to support two phase commit
    // algorithm ------

    /** Write value within a transaction. */
    protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

    /**
     * Method that starts a new transaction.
     *
     * @return newly created transaction.
     */
    protected abstract TXN beginTransaction() throws Exception;

    /**
     * Pre commit previously created transaction. Pre commit must make all of the necessary steps to
     * prepare the transaction for a commit that might happen in the future. After this point the
     * transaction might still be aborted, but underlying implementation must ensure that commit
     * calls on already pre committed transactions will always succeed.
     *
     * <p>Usually implementation involves flushing the data.
     */
    protected abstract void preCommit(TXN transaction) throws Exception;

    /**
     * Commit a pre-committed transaction. If this method fail, Flink application will be restarted
     * and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
     * same transaction.
     */
    protected abstract void commit(TXN transaction);

    /**
     * Invoked on recovered transactions after a failure. User implementation must ensure that this
     * call will eventually succeed. If it fails, Flink application will be restarted and it will be
     * invoked again. If it does not succeed eventually, a data loss will occur. Transactions will
     * be recovered in an order in which they were created.
     */
    protected void recoverAndCommit(TXN transaction) {
        commit(transaction);
    }

    /** Abort a transaction. */
    protected abstract void abort(TXN transaction);

    /** Abort a transaction that was rejected by a coordinator after a failure. */
    protected void recoverAndAbort(TXN transaction) {
        abort(transaction);
    }
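To make the contract of these methods more concrete, here is a minimal plain-Java sketch of a file-backed transaction. It does not extend `TwoPhaseCommitSinkFunction` and has no Flink dependency; it only reuses the method names. The class `FileTransactionSketch`, the use of a file name as the transaction handle, and the temp/target directory layout are all assumptions made for illustration. It also assumes both directories are on the same file system, so that an atomic move is possible.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical file-backed transaction; mirrors the method names only, not the Flink class. */
class FileTransactionSketch {
    private final Path tempDir;   // uncommitted data lives here
    private final Path targetDir; // data becomes visible to readers here
    private int counter = 0;

    FileTransactionSketch(Path tempDir, Path targetDir) {
        this.tempDir = tempDir;
        this.targetDir = targetDir;
    }

    /** Starts a transaction; the handle is the name of a fresh temp file. */
    String beginTransaction() throws IOException {
        String txn = "txn-" + (counter++) + ".txt";
        Files.createFile(tempDir.resolve(txn));
        return txn;
    }

    /** Appends one record to the transaction's temp file. */
    void invoke(String txn, String value) throws IOException {
        Files.write(tempDir.resolve(txn),
                (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }

    /** Nothing is buffered in memory in this sketch, so there is nothing left to flush. */
    void preCommit(String txn) {
    }

    /** Publishes the file with an atomic move; a repeated call is a no-op, so retries are safe. */
    void commit(String txn) throws IOException {
        Path source = tempDir.resolve(txn);
        if (Files.exists(source)) {
            Files.move(source, targetDir.resolve(txn), StandardCopyOption.ATOMIC_MOVE);
        }
    }

    /** Discards the uncommitted temp file, if it still exists. */
    void abort(String txn) throws IOException {
        Files.deleteIfExists(tempDir.resolve(txn));
    }

    /** Runs one committed and one aborted transaction; returns the committed lines. */
    static List<String> demo() {
        try {
            Path base = Files.createTempDirectory("txn-sketch");
            Path temp = Files.createDirectory(base.resolve("tmp"));
            Path target = Files.createDirectory(base.resolve("out"));
            FileTransactionSketch sink = new FileTransactionSketch(temp, target);

            String first = sink.beginTransaction();
            sink.invoke(first, "a");
            sink.invoke(first, "b");
            sink.preCommit(first);
            sink.commit(first);
            sink.commit(first); // simulated retry after a restart

            String second = sink.beginTransaction();
            sink.invoke(second, "c");
            sink.abort(second); // "c" never reaches the target directory

            return Files.readAllLines(target.resolve(first));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Making `commit` idempotent matters because, as the Javadoc above states, a failed commit leads to a restart and `recoverAndCommit` is then called again for the same transaction.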
PreCommit
    // TODO: pre-commit phase
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        checkState(
                currentTransactionHolder != null,
                "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug(
                "{} - checkpoint {} triggered, flushing transaction '{}'",
                name(),
                context.getCheckpointId(),
                currentTransactionHolder);

        preCommit(currentTransactionHolder.handle);
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        state.clear();
        state.add(
                new State<>(
                        this.currentTransactionHolder,
                        new ArrayList<>(pendingCommitTransactions.values()),
                        userContext));
    }
Commit
    // TODO: commit phase
    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
                pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info(
                    "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(),
                    checkpointId,
                    pendingTransaction,
                    pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException(
                    "Committing one of transactions failed, logging first encountered failure",
                    firstError);
        }
    }
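The interplay between `snapshotState` (pre-commit) and `notifyCheckpointComplete` (commit) can be modeled with a toy class. This is a simplified sketch, not Flink code: `CheckpointCommitSketch` is a hypothetical name, a transaction is reduced to a list of strings, and error handling, state persistence, and transaction timeouts are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of checkpoint-driven two-phase commit; all names are illustrative. */
class CheckpointCommitSketch {
    // checkpoint ID -> pre-committed transaction, kept in insertion order
    private final Map<Long, List<String>> pending = new LinkedHashMap<>();
    private List<String> current = new ArrayList<>(); // the open transaction
    final List<String> committed = new ArrayList<>(); // externally visible output

    /** Writes a record into the currently open transaction. */
    void invoke(String value) {
        current.add(value);
    }

    /** Pre-commit: park the open transaction under the checkpoint ID, then open a new one. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Commit every pending transaction whose checkpoint ID is not newer than the completed one. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committed.addAll(entry.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        CheckpointCommitSketch sink = new CheckpointCommitSketch();
        sink.invoke("a");
        sink.snapshotState(1);
        sink.invoke("b");
        sink.snapshotState(2);
        sink.notifyCheckpointComplete(2); // commits the transactions of checkpoints 1 and 2
        System.out.println(sink.committed);
    }
}
```

Because a completion notification for checkpoint N commits everything up to and including N, a missed notification for an earlier checkpoint does not leave its transaction stuck.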
