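While debugging a Flink job in which one task stayed stuck in the CANCELING state, I came across the following piece of code. When org.apache.flink.runtime.executiongraph.Execution#cancel runs, it checks whether the execution is currently RUNNING or DEPLOYING. If so, it tries to move the state to CANCELING and then sends a cancel RPC call: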
    // these two are the common cases where we need to send a cancel call
    else if (current == RUNNING || current == DEPLOYING) {
        // try to transition to canceling, if successful, send the cancel call
        if (transitionState(current, CANCELING)) {
            sendCancelRpcCall();
            return;
        }
        // else: fall through the loop
    }
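The "fall through the loop" comment suggests a read, compare-and-set, retry pattern. Here is a minimal sketch of that pattern, assuming the state lives in an AtomicReference. The class CancelStateSketch, its reduced State enum, and the cancelCallsSent counter (standing in for sendCancelRpcCall()) are hypothetical, and the real Execution#cancel handles more states than shown here.

```java
import java.util.concurrent.atomic.AtomicReference;

class CancelStateSketch {
    /** Reduced, hypothetical set of states; the real enum has more. */
    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED }

    private final AtomicReference<State> state;

    /** Stands in for sendCancelRpcCall(): counts how often a cancel would be sent. */
    int cancelCallsSent = 0;

    CancelStateSketch(State initial) {
        this.state = new AtomicReference<>(initial);
    }

    State getState() {
        return state.get();
    }

    /** Succeeds only if nobody changed the state since it was read. */
    private boolean transitionState(State expected, State target) {
        return state.compareAndSet(expected, target);
    }

    void cancel() {
        while (true) {
            State current = state.get();
            if (current == State.CANCELING
                    || current == State.CANCELED
                    || current == State.FINISHED) {
                return; // already canceling or terminal: nothing to do
            } else if (current == State.RUNNING || current == State.DEPLOYING) {
                if (transitionState(current, State.CANCELING)) {
                    cancelCallsSent++; // the real code sends the RPC here
                    return;
                }
                // else: another thread changed the state, loop and re-read
            } else {
                // CREATED in this sketch: nothing was deployed, cancel locally
                if (transitionState(current, State.CANCELED)) {
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        CancelStateSketch execution = new CancelStateSketch(State.RUNNING);
        execution.cancel();
        System.out.println(execution.getState() + ", calls sent: " + execution.cancelCallsSent);
    }
}
```

The point of the loop is that a failed compare-and-set is not an error: it only means the state moved concurrently, so the method re-reads it and decides again.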
Eventually, the RPC request is issued through the taskManagerGateway obtained from the slot information:
    CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
        () -> taskManagerGateway.cancelTask(attemptId, rpcTimeout),
        NUM_CANCEL_CALL_TRIES,
        executor);

    cancelResultFuture.whenCompleteAsync(
        (ack, failure) -> {
            if (failure != null) {
                fail(new Exception("Task could not be canceled.", failure));
            }
        },
        executor);
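To make the retry step less opaque, here is a rough sketch of how a retry helper over CompletableFuture can be built. RetrySketch and its methods are hypothetical and are not Flink's FutureUtils. In this sketch, `retries` counts the re-attempts made after the first failure, which is an assumption and may not match how Flink interprets NUM_CANCEL_CALL_TRIES.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

class RetrySketch {
    /**
     * Runs the operation and, on failure, runs it again up to `retries` more times.
     * The returned future completes with the first success or with the last failure.
     */
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, executor, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Executor executor,
            CompletableFuture<T> result) {
        operation.get().whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (retriesLeft > 0) {
                // try again with one fewer retry available
                attempt(operation, retriesLeft - 1, executor, result);
            } else {
                result.completeExceptionally(failure);
            }
        }, executor);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        CompletableFuture<String> future = retry(() -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (++calls[0] < 2) {
                f.completeExceptionally(new RuntimeException("simulated RPC failure"));
            } else {
                f.complete("ack");
            }
            return f;
        }, 3, Runnable::run); // Runnable::run executes callbacks on the calling thread
        System.out.println(future.join() + " after " + calls[0] + " calls");
    }
}
```

Only when every attempt has failed does the failure reach the caller, which matches the snippet above, where fail(...) is invoked from the whenCompleteAsync callback on the retried future.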
The implementation used here is RpcTaskManagerGateway, which simply delegates the cancel to its taskExecutorGateway:
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
    }
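Execution runs inside the JobMaster, so in theory an RPC call has to happen somewhere along this path. Yet in the code, the only class that implements TaskExecutorGateway is TaskExecutor. So how does a cancelTask call on the gateway object get forwarded to the TaskExecutor on a remote node?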
My guess was that a dynamic proxy is used to forward the request.
AkkaRpcService
org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer
When the RPC server is started, the following logic runs:
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
            akkaAddress,
            hostname,
            actorRef,
            timeout,
            maximumFramesize,
            terminationFuture,
            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler = new AkkaInvocationHandler(
            akkaAddress,
            hostname,
            actorRef,
            timeout,
            maximumFramesize,
            terminationFuture);
    }

    // Rather than using the System ClassLoader directly, we derive the ClassLoader
    // from this class. That works better in cases where Flink runs embedded and all Flink
    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
    ClassLoader classLoader = getClass().getClassLoader();

    @SuppressWarnings("unchecked")
    RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
        akkaInvocationHandler);
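The invocation handler implementation is chosen according to the endpoint type. In the dynamic proxy pattern, the InvocationHandler is where every method call made on the proxy object is actually handled.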
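To illustrate the mechanism in isolation, here is a small sketch that uses only the JDK's java.lang.reflect.Proxy. The CancelGateway interface and RecordingHandler class are hypothetical stand-ins for TaskExecutorGateway and the Akka handler. Instead of sending anything over the network, the handler just records the call as a "message".

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ProxyForwardingSketch {
    /** Hypothetical gateway interface; no class implements it directly. */
    interface CancelGateway {
        String cancelTask(String attemptId);
    }

    /** Receives every call made on the proxy and turns it into a recorded message. */
    static class RecordingHandler implements InvocationHandler {
        final List<String> sentMessages = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass().equals(Object.class)) {
                // toString/hashCode/equals are answered locally by the handler itself
                return method.invoke(this, args);
            }
            // A real RPC handler would serialize this and send it to the remote side
            String message = method.getName() + Arrays.toString(args);
            sentMessages.add(message);
            return "ack:" + message;
        }
    }

    /** Builds a proxy object that implements CancelGateway by delegating to the handler. */
    static CancelGateway createGateway(InvocationHandler handler) {
        return (CancelGateway) Proxy.newProxyInstance(
            ProxyForwardingSketch.class.getClassLoader(),
            new Class<?>[] {CancelGateway.class},
            handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        CancelGateway gateway = createGateway(handler);
        System.out.println(gateway.cancelTask("attempt-1"));
        System.out.println(handler.sentMessages);
    }
}
```

The caller only ever sees the interface, which is why the JobMaster side can call cancelTask on a TaskExecutorGateway without any local class implementing the method's logic.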
AkkaInvocationHandler
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler#invoke
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

        if (declaringClass.equals(AkkaBasedEndpoint.class) ||
            declaringClass.equals(Object.class) ||
            declaringClass.equals(RpcGateway.class) ||
            declaringClass.equals(StartStoppable.class) ||
            declaringClass.equals(MainThreadExecutable.class) ||
            declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
                "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
                "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }

        return result;
    }
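invoke looks at which class declares the method being called to decide whether to execute it locally on the handler itself or to send it as a remote call. For remote calls, invokeRpc picks the Akka call style from the method's return type: tell for void, ask for CompletableFuture, and an ask followed by a blocking wait for anything else. This makes it easy to hide the RPC implementation behind a plain interface: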
    Class<?> returnType = method.getReturnType();

    final Object result;

    if (Objects.equals(returnType, Void.TYPE)) {
        tell(rpcInvocation);

        result = null;
    } else if (Objects.equals(returnType, CompletableFuture.class)) {
        // execute an asynchronous call
        result = ask(rpcInvocation, futureTimeout);
    } else {
        // execute a synchronous call
        CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);

        result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
    }
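The return-type dispatch can be tried out without Akka. The sketch below is an illustration only: DemoGateway, CallStyle, and the pre-built reply future are hypothetical, and the reply future stands in for whatever ask would return.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReturnTypeDispatchSketch {
    /** Hypothetical gateway with one method per call style. */
    interface DemoGateway {
        void notifyOnly(String message);
        CompletableFuture<String> askAsync(String message);
        String askBlocking(String message);
    }

    enum CallStyle { TELL, ASK, ASK_AND_BLOCK }

    /** Chooses the call style purely from the declared return type. */
    static CallStyle styleFor(Method method) {
        Class<?> returnType = method.getReturnType();
        if (returnType.equals(Void.TYPE)) {
            return CallStyle.TELL;          // fire and forget
        } else if (returnType.equals(CompletableFuture.class)) {
            return CallStyle.ASK;           // hand the future to the caller
        }
        return CallStyle.ASK_AND_BLOCK;     // wait for the reply, then return it
    }

    /** `reply` stands in for the future an ask would produce. */
    static Object dispatch(Method method, CompletableFuture<?> reply, long timeoutMillis) {
        switch (styleFor(method)) {
            case TELL:
                return null;
            case ASK:
                return reply;
            default:
                try {
                    return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    throw new IllegalStateException("RPC reply not available", e);
                }
        }
    }

    /** Looks up a DemoGateway method that takes a single String parameter. */
    static Method gatewayMethod(String name) {
        try {
            return DemoGateway.class.getMethod(name, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = CompletableFuture.completedFuture("ack");
        for (String name : new String[] {"notifyOnly", "askAsync", "askBlocking"}) {
            Method method = gatewayMethod(name);
            System.out.println(name + " -> " + styleFor(method));
            dispatch(method, reply, 100);
        }
    }
}
```

One consequence of this design is that a gateway method with a plain (non-future, non-void) return type blocks the calling thread until the reply arrives or the timeout expires.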
Dynamic proxies
https://juejin.im/post/5bac34b4e51d450e5d0b236b
https://www.atatech.org/articles/81885
https://juejin.im/post/5a99048a6fb9a028d5668e62
https://www.cnkirito.moe/rpc-dynamic-proxy/
