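Today I was investigating a Flink job in which one task stayed stuck in the CANCELING state, and I came across the following code. When org.apache.flink.runtime.executiongraph.Execution#cancel runs, it checks whether the current state is RUNNING or DEPLOYING. If it is, it tries to move the execution to CANCELING and, if that succeeds, sends an RPC call to cancel the task: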
// these two are the common cases where we need to send a cancel call
else if (current == RUNNING || current == DEPLOYING) {
    // try to transition to canceling, if successful, send the cancel call
    if (transitionState(current, CANCELING)) {
        sendCancelRpcCall();
        return;
    }
    // else: fall through the loop
}
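To make the guard concrete, here is a simplified, hypothetical model of this state check. The enum, the class and the RPC counter are illustrative and are not Flink's types. The compare-and-set ensures that only one caller wins the transition and sends the cancel RPC. Note that in this model nothing moves the state from CANCELING to CANCELED: that has to come from some later event, such as a report from the remote side. Without that event the execution simply stays in CANCELING, which is the symptom being investigated here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified states; not Flink's ExecutionState.
enum ExecState { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED }

// Hypothetical model of the cancel() loop; not Flink's Execution class.
class SimpleExecution {
    private final AtomicReference<ExecState> state;
    // Counts how many cancel RPCs would have been sent to the TaskExecutor.
    final AtomicInteger cancelRpcCalls = new AtomicInteger();

    SimpleExecution(ExecState initial) {
        this.state = new AtomicReference<>(initial);
    }

    ExecState state() {
        return state.get();
    }

    void cancel() {
        while (true) {
            ExecState current = state.get();
            if (current == ExecState.CANCELING || current == ExecState.CANCELED
                    || current == ExecState.FINISHED) {
                return; // already canceling or terminal: nothing to do
            } else if (current == ExecState.CREATED) {
                // never deployed, so there is no remote task to cancel
                if (state.compareAndSet(current, ExecState.CANCELED)) {
                    return;
                }
            } else if (current == ExecState.RUNNING || current == ExecState.DEPLOYING) {
                // only the caller that wins the CAS sends the cancel RPC
                if (state.compareAndSet(current, ExecState.CANCELING)) {
                    cancelRpcCalls.incrementAndGet(); // stands in for sendCancelRpcCall()
                    return;
                }
            }
            // lost the CAS to a concurrent transition: re-read the state and loop again
        }
    }
}

class CancelDemo {
    public static void main(String[] args) {
        SimpleExecution running = new SimpleExecution(ExecState.RUNNING);
        running.cancel();
        running.cancel(); // second call sees CANCELING and returns without another RPC
        System.out.println(running.state() + " rpcs=" + running.cancelRpcCalls.get()); // CANCELING rpcs=1
    }
}
```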
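The RPC request is ultimately issued through the taskManagerGateway of the slot assigned to the execution. The call is wrapped in FutureUtils.retry (NUM_CANCEL_CALL_TRIES), and if it still fails, the execution is failed with "Task could not be canceled.":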
CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
    () -> taskManagerGateway.cancelTask(attemptId, rpcTimeout),
    NUM_CANCEL_CALL_TRIES,
    executor);
cancelResultFuture.whenCompleteAsync(
    (ack, failure) -> {
        if (failure != null) {
            fail(new Exception("Task could not be canceled.", failure));
        }
    },
    executor);
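The retry-then-fail pattern can be sketched with plain CompletableFuture. This assumes a hypothetical RetrySketch.retry helper that only mirrors the idea of re-running the operation while the returned future fails. It is not Flink's FutureUtils implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper in the spirit of FutureUtils.retry; not Flink's implementation.
final class RetrySketch {
    private RetrySketch() {}

    // Runs the operation; on failure re-runs it up to `retries` more times.
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, executor, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Executor executor,
            CompletableFuture<T> result) {
        operation.get().whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                attempt(operation, retriesLeft - 1, executor, result); // try again
            } else {
                result.completeExceptionally(failure); // retries exhausted
            }
        }, executor);
    }
}

class RetryDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Executor direct = Runnable::run; // run callbacks on the calling thread
        CompletableFuture<String> ack = RetrySketch.retry(() -> {
            if (calls.incrementAndGet() < 3) { // the first two "RPCs" time out
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new RuntimeException("rpc timeout"));
                return failed;
            }
            return CompletableFuture.completedFuture("ACK");
        }, 3, direct);
        // same shape as the whenCompleteAsync callback above
        ack.whenCompleteAsync((value, failure) -> {
            if (failure != null) {
                System.out.println("Task could not be canceled: " + failure);
            }
        }, direct);
        System.out.println(ack.join() + " after " + calls.get() + " attempts"); // ACK after 3 attempts
    }
}
```

The direct executor only keeps the demo deterministic. A real caller would pass an executor that does not run callbacks on the RPC thread.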
The implementation used here is RpcTaskManagerGateway, which simply delegates the cancel to taskExecutorGateway:
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
    return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
}
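This Execution lives inside the JobMaster, so there should be an RPC call somewhere in between. Yet in the runtime code the only implementation of TaskExecutorGateway is TaskExecutor itself. So how does calling cancelTask on this gateway object get forwarded to the TaskExecutor on a remote node?
The answer is Java's dynamic proxy mechanism, which forwards the call.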
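A minimal sketch of the idea, using only java.lang.reflect.Proxy. The caller holds nothing but an interface reference. The handler turns each call into a message (method name, parameter types, arguments), and a "server" resolves that message by reflection against the real object. CancelGateway, RemoteTaskExecutor, Invocation and the in-process transport are hypothetical. A real RPC layer would serialize the message and send it over the network instead of calling serve directly.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical gateway interface, standing in for TaskExecutorGateway.
interface CancelGateway {
    String cancelTask(String attemptId);
}

// Hypothetical "remote" implementation, standing in for TaskExecutor; the caller never references it.
class RemoteTaskExecutor implements CancelGateway {
    @Override
    public String cancelTask(String attemptId) {
        return "canceled " + attemptId;
    }
}

// Description of one call: enough information to replay it on the other side.
final class Invocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    Invocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

// Client side: every interface call becomes an Invocation handed to the transport.
final class ForwardingHandler implements InvocationHandler {
    private final Function<Invocation, Object> transport;

    ForwardingHandler(Function<Invocation, Object> transport) {
        this.transport = transport;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        return transport.apply(new Invocation(method.getName(), method.getParameterTypes(), args));
    }
}

class ProxyDemo {
    // Server side: look the method up by name and parameter types, then run it on the real object.
    static Object serve(Object target, Invocation inv) {
        try {
            Method m = target.getClass().getMethod(inv.methodName, inv.parameterTypes);
            return m.invoke(target, inv.args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds a proxy that implements CancelGateway without any concrete gateway class.
    static CancelGateway connect(Object remoteTarget) {
        InvocationHandler handler = new ForwardingHandler(inv -> serve(remoteTarget, inv));
        return (CancelGateway) Proxy.newProxyInstance(
                CancelGateway.class.getClassLoader(),
                new Class<?>[] {CancelGateway.class},
                handler);
    }

    public static void main(String[] args) {
        CancelGateway gateway = connect(new RemoteTaskExecutor());
        System.out.println(gateway.cancelTask("attempt-1")); // canceled attempt-1
    }
}
```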
AkkaRpcService
org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer
When the RPC server is started, the following logic runs:
if (rpcEndpoint instanceof FencedRpcEndpoint) {
    // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
    akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
        akkaAddress,
        hostname,
        actorRef,
        timeout,
        maximumFramesize,
        terminationFuture,
        ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
    implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
    akkaInvocationHandler = new AkkaInvocationHandler(
        akkaAddress,
        hostname,
        actorRef,
        timeout,
        maximumFramesize,
        terminationFuture);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class. That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
@SuppressWarnings("unchecked")
RpcServer server = (RpcServer) Proxy.newProxyInstance(
    classLoader,
    implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
    akkaInvocationHandler);
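The InvocationHandler implementation is chosen by endpoint type: FencedAkkaInvocationHandler for a FencedRpcEndpoint, AkkaInvocationHandler otherwise. With a JDK dynamic proxy, every method call on the proxy is dispatched to the handler's invoke method, so the handler is where the actual handling logic lives. The gateway that the JobMaster holds for a TaskExecutor is built the same way: AkkaRpcService#connect wraps an AkkaInvocationHandler for the remote actor in a proxy that implements the requested gateway interface.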
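The handler selection and the collected interface list can be sketched in isolation. In this hypothetical example, ServerControl, JobGateway, FencedExecutable, NamingHandler and the endpoint classes are illustrative stand-ins, not Flink types. A fenced endpoint gets a handler that knows its token plus one extra interface, loosely mirroring how FencedMainThreadExecutable is added above. The single proxy object can then be cast to any interface in the list.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for RpcServer, an RPC gateway and FencedMainThreadExecutable.
interface ServerControl { String describe(); }
interface JobGateway { String ping(); }
interface FencedExecutable { String fencingToken(); }

class Endpoint {}

class FencedEndpoint extends Endpoint {
    final String token;
    FencedEndpoint(String token) { this.token = token; }
}

// Answers every interface call with the method name plus a suffix (e.g. the fencing token).
final class NamingHandler implements InvocationHandler {
    private final String suffix;

    NamingHandler(String suffix) { this.suffix = suffix; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args); // equals/hashCode/toString answered by the handler
        }
        return method.getName() + suffix;
    }
}

final class ServerFactory {
    private ServerFactory() {}

    static ServerControl startServer(Endpoint endpoint) {
        List<Class<?>> interfaces = new ArrayList<>();
        interfaces.add(ServerControl.class);
        interfaces.add(JobGateway.class);
        InvocationHandler handler;
        if (endpoint instanceof FencedEndpoint) {
            // fenced endpoint: token-aware handler plus one extra interface
            handler = new NamingHandler(" [token=" + ((FencedEndpoint) endpoint).token + "]");
            interfaces.add(FencedExecutable.class);
        } else {
            handler = new NamingHandler("");
        }
        return (ServerControl) Proxy.newProxyInstance(
                ServerFactory.class.getClassLoader(),
                interfaces.toArray(new Class<?>[0]),
                handler);
    }
}

class ServerDemo {
    public static void main(String[] args) {
        ServerControl plain = ServerFactory.startServer(new Endpoint());
        ServerControl fenced = ServerFactory.startServer(new FencedEndpoint("t-1"));
        System.out.println(plain.describe());                  // describe
        System.out.println(((JobGateway) fenced).ping());      // ping [token=t-1]
        System.out.println(plain instanceof FencedExecutable);  // false
        System.out.println(fenced instanceof FencedExecutable); // true
    }
}
```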
AkkaInvocationHandler
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Class<?> declaringClass = method.getDeclaringClass();
    Object result;
    if (declaringClass.equals(AkkaBasedEndpoint.class) ||
        declaringClass.equals(Object.class) ||
        declaringClass.equals(RpcGateway.class) ||
        declaringClass.equals(StartStoppable.class) ||
        declaringClass.equals(MainThreadExecutable.class) ||
        declaringClass.equals(RpcServer.class)) {
        result = method.invoke(this, args);
    } else if (declaringClass.equals(FencedRpcGateway.class)) {
        throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
            method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
            "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
            "retrieve a properly FencedRpcGateway.");
    } else {
        result = invokeRpc(method, args);
    }
    return result;
}
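The declaring-class check can be reproduced with a small hypothetical handler. Calls declared by Object or by a local Lifecycle interface are invoked on the handler itself, and everything else is recorded as an outgoing message. Lifecycle, NotifyGateway, DispatchingHandler and the outbox list are illustrative assumptions, not Flink types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical local-control interface, playing the role of StartStoppable/RpcServer.
interface Lifecycle {
    void start();
    boolean isStarted();
}

// Hypothetical remote gateway whose calls should leave the process.
interface NotifyGateway {
    void notifyHeartbeat(String from);
}

final class DispatchingHandler implements InvocationHandler, Lifecycle {
    // Messages that would be sent to the remote actor.
    final List<String> outbox = new ArrayList<>();
    private boolean started;

    @Override public void start() { started = true; }
    @Override public boolean isStarted() { return started; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(Object.class) || declaringClass.equals(Lifecycle.class)) {
            return method.invoke(this, args); // local: executed on the handler itself
        }
        // remote: describe the call and "send" it
        outbox.add(method.getName() + Arrays.toString(args));
        return null; // NotifyGateway methods are all void
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        DispatchingHandler handler = new DispatchingHandler();
        Object proxy = Proxy.newProxyInstance(
                DispatchDemo.class.getClassLoader(),
                new Class<?>[] {Lifecycle.class, NotifyGateway.class},
                handler);
        ((Lifecycle) proxy).start();                     // handled locally
        ((NotifyGateway) proxy).notifyHeartbeat("tm-1"); // forwarded
        System.out.println(((Lifecycle) proxy).isStarted() + " " + handler.outbox); // true [notifyHeartbeat[tm-1]]
    }
}
```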
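invoke looks at the class that declares the called method to decide whether the call runs locally or remotely. Methods declared by Object, RpcGateway, RpcServer and the other infrastructure interfaces in that list are invoked on the handler itself. Gateway methods such as cancelTask go to invokeRpc. Inside invokeRpc, the method's return type decides how the message is sent through Akka: tell for void methods, ask for methods that return a CompletableFuture, and ask followed by a blocking get for any other return type. This keeps the RPC details entirely hidden behind the gateway interface: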
Class<?> returnType = method.getReturnType();
final Object result;
if (Objects.equals(returnType, Void.TYPE)) {
    tell(rpcInvocation);
    result = null;
} else if (Objects.equals(returnType, CompletableFuture.class)) {
    // execute an asynchronous call
    result = ask(rpcInvocation, futureTimeout);
} else {
    // execute a synchronous call
    CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
    result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
}
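The return-type dispatch can be sketched with a single-threaded executor standing in for the remote actor. This is a simplified sketch, not Akka. Here "tell" submits the work and discards the result, "ask" returns a CompletableFuture, and the synchronous path blocks on that future with a timeout. EchoGateway and ReturnTypeHandler are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical gateway with one method per dispatch style.
interface EchoGateway {
    void fireAndForget(String msg);                  // -> tell
    CompletableFuture<String> echoAsync(String msg); // -> ask
    String echoSync(String msg);                     // -> ask + blocking get
}

final class ReturnTypeHandler implements InvocationHandler {
    private final ExecutorService remote; // single thread standing in for the actor's mailbox
    private final long timeoutMillis;
    final AtomicInteger told = new AtomicInteger(); // number of completed fire-and-forget calls

    ReturnTypeHandler(ExecutorService remote, long timeoutMillis) {
        this.remote = remote;
        this.timeoutMillis = timeoutMillis;
    }

    // Stand-in for the work done on the remote side: echo the first argument.
    private static String handleRemotely(Method method, Object[] args) {
        return method.getName() + ":" + args[0];
    }

    private void tell(Method method, Object[] args) {
        remote.execute(() -> {
            handleRemotely(method, args); // result is discarded
            told.incrementAndGet();
        });
    }

    private CompletableFuture<String> ask(Method method, Object[] args) {
        return CompletableFuture.supplyAsync(() -> handleRemotely(method, args), remote);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass().equals(Object.class)) {
            return method.invoke(this, args);
        }
        Class<?> returnType = method.getReturnType();
        if (Objects.equals(returnType, Void.TYPE)) {
            tell(method, args);
            return null;
        } else if (Objects.equals(returnType, CompletableFuture.class)) {
            return ask(method, args); // asynchronous call
        } else {
            return ask(method, args).get(timeoutMillis, TimeUnit.MILLISECONDS); // synchronous call
        }
    }
}

class ReturnTypeDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService remote = Executors.newSingleThreadExecutor();
        ReturnTypeHandler handler = new ReturnTypeHandler(remote, 1000);
        EchoGateway gateway = (EchoGateway) Proxy.newProxyInstance(
                EchoGateway.class.getClassLoader(), new Class<?>[] {EchoGateway.class}, handler);
        gateway.fireAndForget("a");                       // returns immediately
        System.out.println(gateway.echoAsync("b").get()); // echoAsync:b
        System.out.println(gateway.echoSync("c"));        // echoSync:c
        remote.shutdown();
    }
}
```

Blocking on the synchronous path is why the real handler needs a timeout. A caller that cannot afford to block should declare a CompletableFuture return type instead.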