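While debugging a Flink job in which one task stayed stuck in the CANCELING state, I came across the following code. In org.apache.flink.runtime.executiongraph.Execution#cancel, if the current state is RUNNING or DEPLOYING, the execution first tries to transition to CANCELING and, if that succeeds, sends an RPC call to cancel the task: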

    // these two are the common cases where we need to send a cancel call
    else if (current == RUNNING || current == DEPLOYING) {
        // try to transition to canceling, if successful, send the cancel call
        if (transitionState(current, CANCELING)) {
            sendCancelRpcCall();
            return;
        }
        // else: fall through the loop
    }
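The "transition first, then send" pattern above can be sketched with an atomic compare-and-set. This is only an illustration: CancelSketch, its State enum, and the cancelCallsSent counter are hypothetical names, and Flink's real Execution handles more states and side effects.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for Execution; not Flink's actual class.
class CancelSketch {
    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED }

    private final AtomicReference<State> state;
    // Counts how many cancel "RPCs" were sent, so the behaviour is observable.
    final AtomicInteger cancelCallsSent = new AtomicInteger();

    CancelSketch(State initial) {
        this.state = new AtomicReference<>(initial);
    }

    State getState() {
        return state.get();
    }

    // Atomic compare-and-set: succeeds only if nobody changed the state in between.
    private boolean transitionState(State expected, State target) {
        return state.compareAndSet(expected, target);
    }

    void cancel() {
        while (true) {
            State current = state.get();
            if (current == State.CANCELING
                    || current == State.CANCELED
                    || current == State.FINISHED) {
                return; // already canceling or done: nothing to do
            } else if (current == State.RUNNING || current == State.DEPLOYING) {
                if (transitionState(current, State.CANCELING)) {
                    cancelCallsSent.incrementAndGet(); // stands in for sendCancelRpcCall()
                    return;
                }
                // else: the state changed concurrently, loop and re-read it
            } else {
                // not deployed yet: no RPC needed, go straight to CANCELED
                if (transitionState(current, State.CANCELED)) {
                    return;
                }
            }
        }
    }
}
```

In this sketch the state is already CANCELING before any acknowledgement comes back, which fits the symptom of a task sitting in CANCELING when the remote side never confirms.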

Eventually the request is sent as an RPC through the TaskManagerGateway held in the slot information:

    CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
        () -> taskManagerGateway.cancelTask(attemptId, rpcTimeout),
        NUM_CANCEL_CALL_TRIES,
        executor);
    cancelResultFuture.whenCompleteAsync(
        (ack, failure) -> {
            if (failure != null) {
                fail(new Exception("Task could not be canceled.", failure));
            }
        },
        executor);
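To make the retry step concrete, here is a minimal sketch of retrying an operation that returns a CompletableFuture. RetrySketch and its methods are hypothetical and only mimic the idea behind FutureUtils.retry; Flink's actual implementation differs in its details (cancellation handling, for example).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper that mimics the idea behind FutureUtils.retry; not Flink's code.
class RetrySketch {
    // Runs the operation and re-runs it up to `retries` more times if it fails.
    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation, int retries, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, executor, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Executor executor,
            CompletableFuture<T> result) {
        operation.get().whenCompleteAsync((value, failure) -> {
            if (failure == null) {
                result.complete(value); // success: hand the value to the caller
            } else if (retriesLeft > 0) {
                attempt(operation, retriesLeft - 1, executor, result); // try again
            } else {
                result.completeExceptionally(failure); // out of retries: report the last failure
            }
        }, executor);
    }
}
```

The caller only sees a single future, so a completion callback like the whenCompleteAsync above fires once, after the last attempt.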

The implementation used here is RpcTaskManagerGateway, which simply delegates the cancel to its taskExecutorGateway:

    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
    }

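Execution runs inside the JobMaster, so in theory there has to be an RPC hop at this point. Yet in the code, the only implementation class of TaskExecutorGateway is TaskExecutor. So how does a cancelTask call on the gateway object get forwarded to the TaskExecutor on a remote node?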

My guess was that a dynamic proxy is used to forward the request.
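As a reminder of how a JDK dynamic proxy can forward calls made on an interface, here is a minimal sketch. All names in it (ProxySketch, TaskGateway, RemoteTaskExecutor, ForwardingHandler) are hypothetical, and the "network hop" is simulated by a direct reflective call rather than a real message.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {
    // Hypothetical gateway interface, standing in for TaskExecutorGateway.
    interface TaskGateway {
        String cancelTask(String attemptId);
    }

    // The only real implementation; imagine it lives in another process.
    static class RemoteTaskExecutor implements TaskGateway {
        @Override
        public String cancelTask(String attemptId) {
            return "canceled " + attemptId;
        }
    }

    // Every call on the proxy lands here. A real handler would serialize the
    // method name and arguments and send them over the network.
    static class ForwardingHandler implements InvocationHandler {
        final List<String> sentMessages = new ArrayList<>();
        private final TaskGateway remote;

        ForwardingHandler(TaskGateway remote) {
            this.remote = remote;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args); // toString/equals/hashCode stay local
            }
            sentMessages.add(method.getName());  // record what would go on the wire
            return method.invoke(remote, args);  // simulated "network hop"
        }
    }

    // The caller gets an object that implements TaskGateway but contains no
    // business logic of its own.
    static TaskGateway connect(ForwardingHandler handler) {
        return (TaskGateway) Proxy.newProxyInstance(
                ProxySketch.class.getClassLoader(),
                new Class<?>[] {TaskGateway.class},
                handler);
    }
}
```

The caller programs against TaskGateway only, which is why a single implementation class on the receiving side is enough.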

AkkaRpcService

org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer

When the RPC server is started, the following logic runs:

    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
            akkaAddress,
            hostname,
            actorRef,
            timeout,
            maximumFramesize,
            terminationFuture,
            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler = new AkkaInvocationHandler(
            akkaAddress,
            hostname,
            actorRef,
            timeout,
            maximumFramesize,
            terminationFuture);
    }
    // Rather than using the System ClassLoader directly, we derive the ClassLoader
    // from this class. That works better in cases where Flink runs embedded and all Flink
    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
    ClassLoader classLoader = getClass().getClassLoader();
    @SuppressWarnings("unchecked")
    RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
        akkaInvocationHandler);

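An InvocationHandler implementation is chosen according to the endpoint type. In the dynamic proxy pattern, the InvocationHandler is where every call made on the proxy object is actually handled. Gateways to remote endpoints obtained through AkkaRpcService#connect are proxies backed by the same handler classes.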

AkkaInvocationHandler

org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler#invoke

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        Object result;
        if (declaringClass.equals(AkkaBasedEndpoint.class) ||
            declaringClass.equals(Object.class) ||
            declaringClass.equals(RpcGateway.class) ||
            declaringClass.equals(StartStoppable.class) ||
            declaringClass.equals(MainThreadExecutable.class) ||
            declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) {
            throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
                method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
                "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
                "retrieve a properly FencedRpcGateway.");
        } else {
            result = invokeRpc(method, args);
        }
        return result;
    }

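invoke looks at the class that declares the method to decide whether the call is handled locally by the handler itself or sent to the remote endpoint. invokeRpc then picks the call style from the method's return type: a void method is sent with Akka's tell, a method returning CompletableFuture uses ask, and any other return type uses ask and then blocks for the result. This keeps the RPC machinery neatly hidden behind plain interface calls.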

    Class<?> returnType = method.getReturnType();
    final Object result;
    if (Objects.equals(returnType, Void.TYPE)) {
        tell(rpcInvocation);
        result = null;
    } else if (Objects.equals(returnType, CompletableFuture.class)) {
        // execute an asynchronous call
        result = ask(rpcInvocation, futureTimeout);
    } else {
        // execute a synchronous call
        CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
        result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
    }
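The return-type dispatch can be tried out without Akka. The sketch below is a rough approximation under stated assumptions: DispatchSketch, CounterGateway, Counter, and Handler are hypothetical names, and a single-threaded executor plays the role of the actor's mailbox instead of a real actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchSketch {
    // Hypothetical gateway with one method per call style.
    interface CounterGateway {
        void increment();                      // fire-and-forget ("tell")
        CompletableFuture<Integer> getAsync(); // asynchronous ("ask")
        int getBlocking();                     // synchronous (ask, then wait)
    }

    // Stands in for the remote endpoint; only touched by the mailbox thread.
    static class Counter {
        private int value;
        void increment() { value++; }
        int get() { return value; }
    }

    static class Handler implements InvocationHandler {
        private final Counter target = new Counter();
        // A single daemon thread plays the role of the actor's mailbox (FIFO).
        private final ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mailbox");
            t.setDaemon(true);
            return t;
        });

        private void tell(Runnable message) {
            mailbox.execute(message);
        }

        private CompletableFuture<Integer> ask() {
            return CompletableFuture.supplyAsync(target::get, mailbox);
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args); // handled locally
            }
            Class<?> returnType = method.getReturnType();
            if (returnType == Void.TYPE) {
                tell(target::increment);          // no reply expected
                return null;
            } else if (returnType == CompletableFuture.class) {
                return ask();                     // caller gets the future
            } else {
                return ask().get(5, TimeUnit.SECONDS); // block until the reply arrives
            }
        }
    }

    static CounterGateway create() {
        return (CounterGateway) Proxy.newProxyInstance(
                DispatchSketch.class.getClassLoader(),
                new Class<?>[] {CounterGateway.class},
                new Handler());
    }
}
```

Because the mailbox processes messages in order, a blocking read issued after two increment() calls observes both of them, even though increment() itself returns immediately.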

Dynamic proxies

https://juejin.im/post/5bac34b4e51d450e5d0b236b
https://www.atatech.org/articles/81885
https://juejin.im/post/5a99048a6fb9a028d5668e62
https://www.cnkirito.moe/rpc-dynamic-proxy/