Class hierarchy
```java
@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener { ... }
```
- Extends RichSinkFunction
- Implements CheckpointedFunction
- Implements CheckpointListener
Two-Phase Commit (2PC)
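In a distributed system, each node needs to know how the transaction is progressing on the other nodes. To achieve this, a central node is introduced to drive the execution logic of all nodes. This central node is called the coordinator, and the business nodes it schedules are called participants.
Figure: Coordinator and participants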
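Now for 2PC itself. As the name suggests, 2PC splits a distributed transaction into two phases: the commit-request (voting) phase and the commit (execution) phase. The coordinator uses the participants' responses to decide whether the transaction is actually carried out. The flow is as follows.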
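1. Prepare phase (pre-commit)
- The coordinator sends a prepare request, together with the transaction's content, to all participants. It asks whether they can prepare to commit and waits for their responses.
- Each participant executes the operations contained in the transaction and writes undo and redo logs (for rollback and replay), but does not actually commit.
- Each participant reports the result of executing its part: Yes if it succeeded, No if it failed.
2. Commit phase (commit / rollback)
- If every participant returns Yes, the transaction can be committed:
  - The coordinator sends a commit request to all participants.
  - On receiving the commit request, each participant actually commits the transaction, releases the resources the transaction holds, and returns an ack to the coordinator.
  - Once the coordinator has received acks from all participants, the transaction has completed successfully.
- If any participant returns No, or fails to respond before a timeout, the transaction is aborted and must be rolled back:
  - The coordinator sends a rollback request to all participants.
  - On receiving the rollback request, each participant uses its undo log to restore the state from before the transaction, releases the resources the transaction holds, and returns an ack to the coordinator.
  - Once the coordinator has received acks from all participants, the rollback is complete.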
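The protocol above can be simulated in a few lines of plain Java. This is a minimal sketch under simplifying assumptions:

- participants are local objects rather than remote nodes;
- a method returning normally stands in for an ack;
- timeouts are not modeled;
- the undo log is reduced to a single saved value.

The class names Participant and Coordinator are hypothetical and not taken from any library.

```java
import java.util.List;

/** A participant that executes work tentatively and can later commit or roll it back. */
class Participant {
    private final boolean canPrepare; // simulated outcome of the local work: true = vote Yes
    private String value;             // the participant's data
    private String undoLog;           // pre-transaction value, used for rollback
    private boolean prepared;         // true between a Yes vote and commit/rollback

    Participant(String initialValue, boolean canPrepare) {
        this.value = initialValue;
        this.canPrepare = canPrepare;
    }

    /** Phase 1: execute without committing, keep an undo record, then vote. */
    boolean prepare(String newValue) {
        if (!canPrepare) {
            return false;             // vote No: nothing was changed
        }
        undoLog = value;              // how to get back to the old state
        value = newValue;             // tentative change, not final yet
        prepared = true;
        return true;                  // vote Yes
    }

    /** Phase 2, commit: make the change final and release the undo record. */
    void commit() {
        undoLog = null;
        prepared = false;
    }

    /** Phase 2, rollback: restore the old state if this participant changed anything. */
    void rollback() {
        if (prepared) {
            value = undoLog;
        }
        undoLog = null;
        prepared = false;
    }

    String value() {
        return value;
    }
}

/** Drives both phases; returns true if the transaction was committed. */
class Coordinator {
    static boolean execute(List<Participant> participants, String newValue) {
        boolean allYes = true;
        // Phase 1: send prepare to each participant and collect the votes.
        for (Participant p : participants) {
            if (!p.prepare(newValue)) {
                allYes = false;
                break; // one No decides the outcome; later participants are not asked
            }
        }
        // Phase 2: commit everywhere only if every vote was Yes, otherwise roll back.
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }
}
```

A single No vote is enough to roll back every participant that had already prepared. In a real system a timeout has the same effect.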
Flink's two-phase commit, using Kafka as an example
Flink two-phase commit protocol: pre-commit phase
Two-phase commit: a file-based sink (test)
TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction: abstract methods and recovery hooks
```java
// ------ methods that should be implemented in child class to support two phase commit
// algorithm ------

/** Write value within a transaction. */
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

/**
 * Method that starts a new transaction.
 *
 * @return newly created transaction.
 */
protected abstract TXN beginTransaction() throws Exception;

/**
 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to
 * prepare the transaction for a commit that might happen in the future. After this point the
 * transaction might still be aborted, but underlying implementation must ensure that commit
 * calls on already pre committed transactions will always succeed.
 *
 * <p>Usually implementation involves flushing the data.
 */
protected abstract void preCommit(TXN transaction) throws Exception;

/**
 * Commit a pre-committed transaction. If this method fail, Flink application will be restarted
 * and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
 * same transaction.
 */
protected abstract void commit(TXN transaction);

/**
 * Invoked on recovered transactions after a failure. User implementation must ensure that this
 * call will eventually succeed. If it fails, Flink application will be restarted and it will be
 * invoked again. If it does not succeed eventually, a data loss will occur. Transactions will
 * be recovered in an order in which they were created.
 */
protected void recoverAndCommit(TXN transaction) {
    commit(transaction);
}

/** Abort a transaction. */
protected abstract void abort(TXN transaction);

/** Abort a transaction that was rejected by a coordinator after a failure. */
protected void recoverAndAbort(TXN transaction) {
    abort(transaction);
}
```
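The sketch below is a file-based transactional sink in plain Java. It shows what each abstract method is responsible for, in the spirit of the file-sink example often used to explain this API:

- beginTransaction creates a temporary file;
- invoke appends records to it;
- preCommit flushes and closes it;
- commit atomically moves it into the target directory;
- abort deletes it.

Assumptions:

- FileTxn and FileTwoPhaseSink are hypothetical names.
- The class does not extend TwoPhaseCommitSinkFunction, so it compiles without Flink.
- The temporary and target directories are on the same file system, so ATOMIC_MOVE is supported.

A real subclass would also have to wrap IOException in an unchecked exception inside commit and abort, because those abstract methods do not declare throws Exception.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/** Transaction handle (plays the role of TXN): one temporary file and its writer. */
class FileTxn {
    final Path tmpFile;
    BufferedWriter writer; // null once the transaction has been pre-committed or aborted

    FileTxn(Path tmpFile, BufferedWriter writer) {
        this.tmpFile = tmpFile;
        this.writer = writer;
    }
}

/** Hypothetical sketch: method names mirror TwoPhaseCommitSinkFunction, no Flink dependency. */
class FileTwoPhaseSink {
    private final Path tmpDir;
    private final Path targetDir;

    FileTwoPhaseSink(Path tmpDir, Path targetDir) {
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
    }

    /** beginTransaction: open a fresh temporary file. */
    FileTxn beginTransaction() throws IOException {
        Path tmp = tmpDir.resolve("part-" + UUID.randomUUID());
        return new FileTxn(tmp, Files.newBufferedWriter(tmp, StandardCharsets.UTF_8));
    }

    /** invoke: append one record to the transaction's temporary file. */
    void invoke(FileTxn txn, String value) throws IOException {
        txn.writer.write(value);
        txn.writer.newLine();
    }

    /** preCommit: flush and close; this file is never written again. */
    void preCommit(FileTxn txn) throws IOException {
        txn.writer.flush();
        txn.writer.close();
        txn.writer = null;
    }

    /** commit: atomically publish the pre-committed file; safe to call again. */
    void commit(FileTxn txn) throws IOException {
        Path target = targetDir.resolve(txn.tmpFile.getFileName());
        if (Files.notExists(txn.tmpFile) && Files.exists(target)) {
            return; // already committed, e.g. a retry after recovery
        }
        Files.move(txn.tmpFile, target, StandardCopyOption.ATOMIC_MOVE);
    }

    /** abort: discard the temporary file. */
    void abort(FileTxn txn) throws IOException {
        if (txn.writer != null) {
            txn.writer.close();
            txn.writer = null;
        }
        Files.deleteIfExists(txn.tmpFile);
    }
}
```

If the temporary file is gone but the target exists, commit returns early. A repeated commit of the same transaction, as recoverAndCommit may issue after a restart, therefore succeeds instead of failing.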
PreCommit
```java
// Pre-commit phase
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction

    checkState(
            currentTransactionHolder != null,
            "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug(
            "{} - checkpoint {} triggered, flushing transaction '{}'",
            name(),
            context.getCheckpointId(),
            currentTransactionHolder);

    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    state.add(
            new State<>(
                    this.currentTransactionHolder,
                    new ArrayList<>(pendingCommitTransactions.values()),
                    userContext));
}
```
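Stripped of logging and state serialization, snapshotState does three things:

1. pre-commit the current transaction;
2. park it in pendingCommitTransactions under the checkpoint id;
3. immediately begin a fresh transaction for records that arrive after the checkpoint.

The sketch below simulates only that bookkeeping. PreCommitSimulator is a hypothetical name, and a String stands in for TXN. An insertion-ordered LinkedHashMap is used so that pending transactions stay in creation order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of snapshotState() bookkeeping; String stands in for TXN. */
class PreCommitSimulator {
    // checkpoint id -> pre-committed transaction, in creation order
    final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
    final List<String> preCommitted = new ArrayList<>(); // records preCommit calls
    private int txnCounter = 0;
    private String currentTransaction;

    PreCommitSimulator() {
        currentTransaction = beginTransaction(); // a transaction is always open
    }

    private String beginTransaction() {
        return "txn-" + txnCounter++;
    }

    private void preCommit(String txn) {
        preCommitted.add(txn); // a real sink would flush the transaction here
    }

    String currentTransaction() {
        return currentTransaction;
    }

    /** Mirrors snapshotState(): pre-commit, park under the checkpoint id, open a new one. */
    void snapshotState(long checkpointId) {
        if (currentTransaction == null) {
            throw new IllegalStateException("no transaction object when performing state snapshot");
        }
        preCommit(currentTransaction);
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();
        // Flink would now write currentTransaction and the pending list to operator state.
    }
}
```

Between two checkpoints, every record lands in exactly one open transaction. Each completed snapshot leaves one more pre-committed transaction waiting for notifyCheckpointComplete.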
Commit
```java
// Commit method
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
            pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info(
                "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(),
                checkpointId,
                pendingTransaction,
                pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException(
                "Committing one of transactions failed, logging first encountered failure",
                firstError);
    }
}
```
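Three details of notifyCheckpointComplete are easy to miss:

- It commits every pending transaction whose checkpoint id is less than or equal to the completed one, so a later notification also covers earlier checkpoints whose notification was missed.
- It keeps committing after a failure and rethrows only the first error.
- It removes each entry from the map even when its commit failed. The retry is left to recoverAndCommit after the job restarts from state.

The sketch below reproduces just that loop, under these assumptions:

- CommitLoop is a hypothetical name, and a String stands in for TXN.
- Only RuntimeException is caught, because a Consumer cannot throw checked exceptions, whereas Flink catches Throwable.
- IllegalStateException stands in for FlinkRuntimeException.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of the commit loop in notifyCheckpointComplete(). */
final class CommitLoop {
    private CommitLoop() {}

    static void notifyCheckpointComplete(
            Map<Long, String> pending, long checkpointId, Consumer<String> commit) {
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        RuntimeException firstError = null;
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a checkpoint that has not completed yet
            }
            try {
                commit.accept(entry.getValue());
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e; // keep only the first failure, continue with the rest
                }
            }
            it.remove(); // removed even on failure; recovery from state retries it
        }
        if (firstError != null) {
            throw new IllegalStateException("Committing one of transactions failed", firstError);
        }
    }
}
```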