This article is based on Spark 3.4.0-SNAPSHOT.

Overview

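As a distributed computing framework, Spark relies on network communication in many places:

  • message exchange between Spark components
  • uploading user files and JAR packages
  • the shuffle process between nodes
  • replication and backup of Block data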

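Before Spark 1.6, Spark RPC was implemented on top of Akka, an asynchronous messaging framework written in Scala. Because Akka is not suited to transferring large files, file transfer was handled by a Jetty-based HttpFileServer. Spark 1.6 then stopped using Akka for RPC, for the following reasons:

  • Many Spark users also use Akka, but different Akka versions cannot communicate with each other. Users therefore had to use exactly the same Akka version as Spark and could not upgrade Akka on their own.
  • Spark's Akka configuration is tuned for Spark itself and may conflict with the Akka configuration in the user's own code.
  • Spark uses only a few Akka features, and those are easy to implement directly. The resulting code is much smaller than Akka and easier to debug, and a bug can be fixed immediately instead of waiting for an upstream Akka release. Moreover, because of the first point, upgrading Akka inside Spark would force users to upgrade their own Akka as well, which is not realistic.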

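Spark 2.0.0 also removed the Jetty-based HttpFileServer. Borrowing from Akka's design, Spark 2.0.0 rebuilt the RPC framework on Netty, so both RPC and large-file transfer now go through Netty.

:::tips
Note:

  1. Akka is a concurrent, distributed framework built on the Actor concurrency model. It is written in Scala and offers both Java and Scala APIs, relieving developers of many concurrency details and ensuring eventual consistency of distributed calls.
  2. Jetty is an open-source Servlet container that provides a runtime environment for Java-based web components such as JSP and Servlets. It is written in Java, and its API is published as a set of JAR files. Developers can instantiate a Jetty container as an object and quickly give standalone Java applications network and web connectivity.
  3. Netty is an NIO-based client/server programming framework originally provided by JBoss. With Netty you can develop network applications quickly and simply, for example client and server applications that implement a particular protocol.
:::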

Basic Concepts

[Spark RPC] Overview and Basic Concepts - Figure 1

The important concepts are introduced below:
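  • RpcEnv: the abstract class RpcEnv represents an RPC environment, the runtime environment that every RPC endpoint depends on. It manages the whole lifecycle of the RpcEndpoints registered with it.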
    /**
     * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
     * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
     * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
     * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
     * sender, or logging them if no such sender or `NotSerializableException`.
     *
     * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
     */
    private[spark] abstract class RpcEnv(conf: SparkConf) {
      // ...
    }
    [Spark RPC] Overview and Basic Concepts - Figure 2
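  • NettyRpcEnv: the only implementation of RpcEnv.
  • RpcEndpoint: an RPC endpoint. Spark calls every communicating entity an RPC endpoint, and each one implements the RpcEndpoint trait, for example DriverEndpoint and BlockManagerMasterEndpoint. Each endpoint defines its own messages and business logic according to its needs.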

    /**
     * An end point for the RPC that defines what functions to trigger given a message.
     *
     * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
     *
     * The life-cycle of an endpoint is:
     *
     * {@code constructor -> onStart -> receive* -> onStop}
     *
     * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
     * [[ThreadSafeRpcEndpoint]]
     *
     * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
     * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
     */
    private[spark] trait RpcEndpoint {
      // ...
    }
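The lifecycle contract in the javadoc above (constructor -> onStart -> receive* -> onStop, with errors routed to onError) can be illustrated with a small Java sketch. SimpleEndpoint, LifecycleDriver and RecordingEndpoint are hypothetical names, not Spark classes; the real RpcEndpoint is a private Scala trait whose callbacks are driven by the Inbox.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Spark's private[spark] RpcEndpoint trait.
interface SimpleEndpoint {
    default void onStart() {}
    void receive(Object message);
    default void onStop() {}
    default void onError(Throwable cause) {}
}

// Drives one endpoint through: constructor -> onStart -> receive* -> onStop.
final class LifecycleDriver {
    private LifecycleDriver() {}

    static void run(SimpleEndpoint endpoint, List<?> messages) {
        safely(endpoint, endpoint::onStart);
        for (Object message : messages) {
            safely(endpoint, () -> endpoint.receive(message));
        }
        safely(endpoint, endpoint::onStop);
    }

    // An error from any callback is handed to onError; an error from onError itself is ignored.
    private static void safely(SimpleEndpoint endpoint, Runnable callback) {
        try {
            callback.run();
        } catch (RuntimeException e) {
            try {
                endpoint.onError(e);
            } catch (RuntimeException ignored) {
                // swallowed on purpose, mirroring "RpcEnv will ignore it"
            }
        }
    }
}

// Example endpoint that records the order in which its callbacks fire.
final class RecordingEndpoint implements SimpleEndpoint {
    final List<String> events = new ArrayList<>();

    @Override public void onStart() { events.add("onStart"); }

    @Override public void receive(Object message) {
        if ("boom".equals(message)) {
            throw new IllegalStateException("boom");
        }
        events.add("receive:" + message);
    }

    @Override public void onStop() { events.add("onStop"); }

    @Override public void onError(Throwable cause) { events.add("onError:" + cause.getMessage()); }
}
```

Running it with the messages "a", "boom" and "b" records onStart, receive:a, onError:boom, receive:b, onStop: in this sketch a failing receive is reported to onError and processing continues with the next message.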
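  • Dispatcher: the message dispatcher, responsible for routing RPC messages to the corresponding RpcEndpoint. For each incoming message it uses the endpoint name specified by the sender to find that endpoint's Inbox and posts the message into it; the Inbox is then put into a LinkedBlockingQueue held by a MessageLoop. Because the queue is blocking, the MessageLoop's worker threads (the Dispatcher's thread pool) block while there is no message and start working as soon as one arrives.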

    /**
     * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
     *
     * @param numUsableCores Number of CPU cores allocated to the process, for sizing the thread pool.
     *                       If 0, will consider the available CPUs on the host.
     */
    private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
      // ...
    }
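A minimal sketch of the routing described above, assuming a single shared message loop: the dispatcher looks up the inbox by endpoint name, appends the message, marks the inbox active by putting it into a LinkedBlockingQueue, and worker threads block on take() until work arrives. MiniDispatcher and its members are hypothetical; Spark's real Dispatcher also handles registration checks, dedicated loops, stop signals and error replies, none of which is modeled here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified model of Dispatcher + one shared MessageLoop (not Spark code).
final class MiniDispatcher {
    // One inbox per registered endpoint name.
    private static final class MiniInbox {
        final Consumer<Object> endpoint;
        final LinkedBlockingQueue<Object> messages = new LinkedBlockingQueue<>();

        MiniInbox(Consumer<Object> endpoint) { this.endpoint = endpoint; }

        // synchronized: an endpoint never handles two messages at the same time
        synchronized void process() {
            Object message;
            while ((message = messages.poll()) != null) {
                endpoint.accept(message);
            }
        }
    }

    // Sentinel telling a worker thread to exit.
    private static final MiniInbox POISON = new MiniInbox(m -> { });

    private final Map<String, MiniInbox> inboxes = new ConcurrentHashMap<>();
    // Inboxes with pending messages; workers block on take() while it is empty.
    private final LinkedBlockingQueue<MiniInbox> active = new LinkedBlockingQueue<>();
    private final ExecutorService workers;
    private final int numThreads;

    MiniDispatcher(int numThreads) {
        this.numThreads = numThreads;
        this.workers = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            workers.execute(this::loop);
        }
    }

    void register(String name, Consumer<Object> endpoint) {
        inboxes.put(name, new MiniInbox(endpoint));
    }

    // Route by endpoint name: enqueue into that endpoint's inbox, then mark the inbox active.
    boolean post(String endpointName, Object message) {
        MiniInbox inbox = inboxes.get(endpointName);
        if (inbox == null) {
            return false; // unknown endpoint; a real dispatcher would report an error back
        }
        inbox.messages.add(message);
        active.add(inbox);
        return true;
    }

    private void loop() {
        try {
            while (true) {
                MiniInbox inbox = active.take(); // blocks until some inbox has work
                if (inbox == POISON) {
                    return;
                }
                inbox.process();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Lets the workers finish everything posted so far, then stops them.
    void shutdown() {
        for (int i = 0; i < numThreads; i++) {
            active.add(POISON);
        }
        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because each inbox is processed under its own lock, messages to one endpoint are handled one at a time and in order even with several worker threads, while different endpoints can be served in parallel.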
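  • Inbox: each local endpoint has one inbox. An Inbox holds a linked list of InboxMessage objects, and InboxMessage has several subclasses:

    • RpcMessage: a remote call that expects a reply
    • OneWayMessage: a fire-and-forget one-way message from a remote caller
    • OnStart, OnStop: tell the endpoint that it is starting or stopping (connection events are delivered separately, e.g. as RemoteProcessConnected and RemoteProcessDisconnected)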
    /**
     * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.
     */
    private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
      extends Logging {
      // ...
    }
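The inbox behavior above can be sketched as follows, under the assumption (matching our reading of Spark's Inbox) that OnStart is queued by the constructor so it is always processed first, and that messages posted after stop are dropped. All class names are hypothetical stand-ins, and the sketch serializes all processing, which corresponds to a ThreadSafeRpcEndpoint.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified InboxMessage hierarchy (not Spark's classes).
abstract class InboxMsg { }

final class OnStartMsg extends InboxMsg { }

final class OnStopMsg extends InboxMsg { }

final class OneWayMsg extends InboxMsg {
    final Object content;
    OneWayMsg(Object content) { this.content = content; }
}

final class RpcMsg extends InboxMsg {
    final Object content;
    final List<Object> replies; // stands in for the RpcCallContext used to answer

    RpcMsg(Object content, List<Object> replies) {
        this.content = content;
        this.replies = replies;
    }
}

// An inbox that always processes OnStart first and drops messages posted after stop().
final class SketchInbox {
    private final LinkedList<InboxMsg> messages = new LinkedList<>();
    private boolean stopped = false;
    final List<String> log = new ArrayList<>();

    SketchInbox() {
        messages.add(new OnStartMsg()); // queued by the constructor, so it is always first
    }

    synchronized void post(InboxMsg message) {
        if (!stopped) {
            messages.add(message);
        }
    }

    synchronized void stop() {
        if (!stopped) {
            stopped = true;
            messages.add(new OnStopMsg());
        }
    }

    // Processes everything queued so far, one message at a time.
    synchronized void process() {
        InboxMsg message;
        while ((message = messages.poll()) != null) {
            if (message instanceof OnStartMsg) {
                log.add("onStart");
            } else if (message instanceof RpcMsg) {
                RpcMsg rpc = (RpcMsg) message;
                log.add("rpc:" + rpc.content);
                rpc.replies.add("ack:" + rpc.content); // an RpcMessage gets a reply
            } else if (message instanceof OneWayMsg) {
                log.add("oneway:" + ((OneWayMsg) message).content); // no reply
            } else if (message instanceof OnStopMsg) {
                log.add("onStop");
            }
        }
    }
}
```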
  • RpcEndpointRef: a remote reference to an RpcEndpoint, through which messages can be sent to the remote RpcEndpoint.

    /**
     * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
     */
    private[spark] abstract class RpcEndpointRef(conf: SparkConf)
      extends Serializable with Logging {
      // ...
    }
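To contrast the two ways of talking to an endpoint through a reference, here is a hypothetical in-process LocalEndpointRef: send is fire-and-forget (like a OneWayMessage) and ask returns a future completed by the reply (like an RpcMessage). Spark's RpcEndpointRef offers send, ask and askSync with similar intent, but the class, signatures and timeout handling below are illustrative assumptions, not Spark's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical in-process "endpoint ref" contrasting one-way sends with request/response asks.
final class LocalEndpointRef {
    private final String name;
    private final Function<Object, Object> endpoint; // stands in for the remote endpoint's handler

    LocalEndpointRef(String name, Function<Object, Object> endpoint) {
        this.name = name;
        this.endpoint = endpoint;
    }

    // Fire-and-forget, like a OneWayMessage: the caller never sees a result.
    void send(Object message) {
        CompletableFuture.runAsync(() -> endpoint.apply(message));
    }

    // Request/response, like an RpcMessage: the reply completes the returned future.
    CompletableFuture<Object> ask(Object message) {
        return CompletableFuture.supplyAsync(() -> endpoint.apply(message));
    }

    // Blocking variant with a timeout, similar in spirit to askSync.
    Object askSync(Object message, long timeoutMs) {
        try {
            return ask(message).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while asking " + name, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("ask to " + name + " failed", e);
        }
    }
}
```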
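  • NettyRpcEndpointRef: the implementation of RpcEndpointRef. Its behavior depends on where it is created:

    • On the node that owns the RpcEndpoint, it is a simple wrapper around the RpcEndpointAddress instance.
    • On other machines that receive a serialized copy of the reference, it keeps track of the TransportClient that sent the reference, so messages to the endpoint are sent over that existing client connection instead of opening a new one.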
    /**
     * The NettyRpcEnv version of RpcEndpointRef.
     *
     * This class behaves differently depending on where it's created. On the node that "owns" the
     * RpcEndpoint, it's a simple wrapper around the RpcEndpointAddress instance.
     *
     * On other machines that receive a serialized version of the reference, the behavior changes. The
     * instance will keep track of the TransportClient that sent the reference, so that messages
     * to the endpoint are sent over the client connection, instead of needing a new connection to
     * be opened.
     *
     * The RpcAddress of this ref can be null; what that means is that the ref can only be used through
     * a client connection, since the process hosting the endpoint is not listening for incoming
     * connections. These refs should not be shared with 3rd parties, since they will not be able to
     * send messages to the endpoint.
     *
     * @param conf Spark configuration.
     * @param endpointAddress The address where the endpoint is listening.
     * @param nettyEnv The RpcEnv associated with this ref.
     */
    private[netty] class NettyRpcEndpointRef(
        @transient private val conf: SparkConf,
        private val endpointAddress: RpcEndpointAddress,
        @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
      // ...
    }
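  • RpcEndpointAddress: mainly contains the RpcAddress (host and port) and the name of the RPC endpoint.

  • Outbox: there is one outbox per remote address; NettyRpcEnv holds a ConcurrentHashMap[RpcAddress, Outbox]. After a message is put into an Outbox, the Outbox sends it out through a TransportClient (connecting first if necessary).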
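A hedged sketch of the per-address outbox map described above: messages for the same RpcAddress share one outbox and leave in the order they were queued. Addr, SketchOutbox and OutboxRegistry are hypothetical, and the BiConsumer stands in for a TransportClient; the real Outbox also connects lazily on a background thread and fails its queued messages if the connection cannot be established.

```java
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical stand-in for RpcAddress: host + port, usable as a map key.
final class Addr {
    final String host;
    final int port;

    Addr(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof Addr)) return false;
        Addr other = (Addr) o;
        return port == other.port && host.equals(other.host);
    }

    @Override public int hashCode() { return Objects.hash(host, port); }

    @Override public String toString() { return host + ":" + port; }
}

// One outbox per remote address: queue the message, then drain the queue through the client.
final class SketchOutbox {
    private final Addr address;
    private final BiConsumer<Addr, String> client; // stands in for a connected TransportClient
    private final LinkedList<String> pending = new LinkedList<>();

    SketchOutbox(Addr address, BiConsumer<Addr, String> client) {
        this.address = address;
        this.client = client;
    }

    synchronized void send(String message) {
        pending.add(message);
        String next;
        while ((next = pending.poll()) != null) {
            client.accept(address, next);
        }
    }
}

// Like NettyRpcEnv's ConcurrentHashMap[RpcAddress, Outbox]: one outbox per address, created lazily.
final class OutboxRegistry {
    private final ConcurrentHashMap<Addr, SketchOutbox> outboxes = new ConcurrentHashMap<>();
    private final BiConsumer<Addr, String> client;

    OutboxRegistry(BiConsumer<Addr, String> client) { this.client = client; }

    SketchOutbox outboxFor(Addr address) {
        return outboxes.computeIfAbsent(address, a -> new SketchOutbox(a, client));
    }

    void send(Addr address, String message) {
        outboxFor(address).send(message);
    }
}
```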
  • TransportContext:
    • creates the TransportServer and the TransportClientFactory
    • provides the context for setting up Netty channel pipelines with TransportChannelHandler

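TransportClient provides two communication protocols: control-plane RPCs and data-plane chunk fetching. RPCs are handled by the rpcHandler that the user passes to the TransportContext constructor, and this rpcHandler is also responsible for setting up streams, which can be transferred through the data plane in chunks using zero-copy IO. TransportServer and TransportClientFactory both create a TransportChannelHandler for every channel. Because each TransportChannelHandler contains a TransportClient, a server process can send messages back to the client over the existing channel.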

    /**
     * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
     * setup Netty Channel pipelines with a
     * {@link org.apache.spark.network.server.TransportChannelHandler}.
     *
     * There are two communication protocols that the TransportClient provides, control-plane RPCs and
     * data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
     * TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams
     * which can be streamed through the data plane in chunks using zero-copy IO.
     *
     * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
     * channel. As each TransportChannelHandler contains a TransportClient, this enables server
     * processes to send messages back to the client on an existing channel.
     */
    public class TransportContext implements Closeable {
      // ...
    }
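  • TransportServer: the server side of the RPC framework, providing efficient, low-level streaming services.
  • TransportServerBootstrap: defines the contract for server-side bootstraps. A server bootstrap runs on the client channel held by the server once a client has connected, for example to perform SASL authentication. Bootstraps are supplied when the TransportServer is created.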

    /**
     * A bootstrap which is executed on a TransportServer's client channel once a client connects
     * to the server. This allows customizing the client channel to allow for things such as SASL
     * authentication.
     */
    public interface TransportServerBootstrap {
      // ...
    }
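  • TransportClientFactory: the factory that creates TransportClients. It maintains a connection pool to other hosts and returns the same TransportClient for the same remote host whenever possible, and all TransportClients share a single worker thread pool.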

    /**
     * Factory for creating {@link TransportClient}s by using createClient.
     *
     * The factory maintains a connection pool to other hosts and should return the same
     * TransportClient for the same remote host. It also shares a single worker thread pool for
     * all TransportClients.
     *
     * TransportClients will be reused whenever possible. Prior to completing the creation of a new
     * TransportClient, all given {@link TransportClientBootstrap}s will be run.
     */
    public class TransportClientFactory implements Closeable {
      // ...
    }
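The pooling behavior described in the javadoc can be sketched as a map from host to a small array of client slots: a random slot is picked per call, and its client is reused while it is still active. PooledClientFactory and FakeClient are hypothetical; the slot count plays the role of Spark's numConnectionsPerPeer setting, and a real factory would open a Netty connection and run the TransportClientBootstraps where the sketch simply creates an object.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client handle; "active" stands in for a live Netty channel.
final class FakeClient {
    final String host;
    final int id;
    volatile boolean active = true;

    FakeClient(String host, int id) {
        this.host = host;
        this.id = id;
    }
}

// Per-host pool of at most connectionsPerPeer clients, reused while they stay active.
final class PooledClientFactory {
    private static final class ClientPool {
        final FakeClient[] clients;
        final Object[] locks;

        ClientPool(int size) {
            clients = new FakeClient[size];
            locks = new Object[size];
            for (int i = 0; i < size; i++) {
                locks[i] = new Object();
            }
        }
    }

    private final ConcurrentHashMap<String, ClientPool> pools = new ConcurrentHashMap<>();
    private final int connectionsPerPeer;
    final AtomicInteger created = new AtomicInteger();

    PooledClientFactory(int connectionsPerPeer) {
        this.connectionsPerPeer = connectionsPerPeer;
    }

    FakeClient createClient(String host) {
        ClientPool pool = pools.computeIfAbsent(host, h -> new ClientPool(connectionsPerPeer));
        // Pick a random slot so load spreads over the connections to this host.
        int slot = ThreadLocalRandom.current().nextInt(connectionsPerPeer);
        synchronized (pool.locks[slot]) {
            FakeClient client = pool.clients[slot];
            if (client == null || !client.active) {
                // A real factory would connect and run its client bootstraps here.
                client = new FakeClient(host, created.incrementAndGet());
                pool.clients[slot] = client;
            }
            return client;
        }
    }
}
```

Locking per slot rather than per factory means that a slow connection attempt to one host does not block callers that need a different host or a different slot.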
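  • TransportClient: the client of the RPC framework, used to fetch consecutive chunks of a pre-negotiated stream. It is designed for efficient transfer of large amounts of data, split into chunks ranging from hundreds of KB to a few MB. Put simply, TransportClient is the lowest-level client class in Spark RPC: it sends RPC requests to the server and fetches stream chunks from it.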

    /**
     * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
     * efficient transfer of a large amount of data, broken up into chunks with size ranging from
     * hundreds of KB to a few MB.
     *
     * Note that while this client deals with the fetching of chunks from a stream (i.e., data plane),
     * the actual setup of the streams is done outside the scope of the transport layer. The convenience
     * method "sendRPC" is provided to enable control plane communication between the client and server
     * to perform this setup.
     *
     * For example, a typical workflow might be:
     * client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
     * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
     * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
     * ...
     * client.sendRPC(new CloseStream(100))
     *
     * Construct an instance of TransportClient using {@link TransportClientFactory}. A single
     * TransportClient may be used for multiple streams, but any given stream must be restricted to a
     * single client, in order to avoid out-of-order responses.
     *
     * NB: This class is used to make requests to the server, while {@link TransportResponseHandler} is
     * responsible for handling responses from the server.
     *
     * Concurrency: thread safe and can be called from multiple threads.
     */
    public class TransportClient implements Closeable {
      // ...
    }
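  • TransportClientBootstrap: a client-side bootstrap executed on a TransportClient before it is returned to the user. It performs one-time initialization when a connection is established, such as authentication or encryption. These operations are often expensive, but because connections are reused as much as possible, the cost is paid rarely. Bootstraps are used to initialize TransportClients.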

    /**
     * A bootstrap which is executed on a TransportClient before it is returned to the user.
     * This enables an initial exchange of information (e.g., SASL authentication tokens) on a once-per-
     * connection basis.
     *
     * Since connections (and TransportClients) are reused as much as possible, it is generally
     * reasonable to perform an expensive bootstrapping operation, as they often share a lifespan with
     * the JVM itself.
     */
    public interface TransportClientBootstrap {
      // ...
    }
  • TransportChannelHandler: delegates requests to TransportRequestHandler and responses to TransportResponseHandler. Because every channel in the transport layer is bidirectional, both the client and the server must handle requests as well as responses.

    /**
     * The single Transport-level Channel handler which is used for delegating requests to the
     * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
     *
     * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
     * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
     * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
     * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
     * Client.
     * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
     * for the Client's responses to the Server's requests.
     *
     * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
     * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
     * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
     * timeout if the client is continuously sending but getting no responses, for simplicity.
     */
    public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
      // ...
    }
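The request/response split performed by TransportChannelHandler comes down to a type check on each incoming message. The sketch below uses hypothetical WireMessage, WireRequest and WireResponse types in place of Spark's Message, RequestMessage and ResponseMessage and only records which side would have handled each message; it omits the idle-timeout logic, and where Spark passes unrecognized messages further down the pipeline, this sketch rejects them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message hierarchy mirroring the Message / RequestMessage / ResponseMessage split.
interface WireMessage { }

interface WireRequest extends WireMessage { }

interface WireResponse extends WireMessage { }

final class RpcRequestMsg implements WireRequest {
    final long requestId;
    RpcRequestMsg(long requestId) { this.requestId = requestId; }
}

final class RpcResponseMsg implements WireResponse {
    final long requestId;
    RpcResponseMsg(long requestId) { this.requestId = requestId; }
}

// One handler per channel: requests go to the request side, responses to the response side.
final class DuplexChannelHandler {
    final List<String> requestSide = new ArrayList<>();
    final List<String> responseSide = new ArrayList<>();

    void channelRead(WireMessage message) {
        if (message instanceof WireRequest) {
            requestSide.add(describe(message));   // TransportRequestHandler's job
        } else if (message instanceof WireResponse) {
            responseSide.add(describe(message));  // TransportResponseHandler's job
        } else {
            throw new IllegalArgumentException("unknown message type: " + message);
        }
    }

    private static String describe(WireMessage message) {
        if (message instanceof RpcRequestMsg) return "RpcRequest#" + ((RpcRequestMsg) message).requestId;
        if (message instanceof RpcResponseMsg) return "RpcResponse#" + ((RpcResponseMsg) message).requestId;
        return message.getClass().getSimpleName();
    }
}
```

For example, a client-side handler can receive both a response to its own RpcRequest and a new request initiated by the server; the first goes to the response side and the second to the request side.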
  • TransportResponseHandler: processes server responses to requests issued by a TransportClient, by tracking the outstanding requests and their callbacks.

    /**
     * Handler that processes server responses, in response to requests issued from a
     * [[TransportClient]]. It works by tracking the list of outstanding requests (and their callbacks).
     *
     * Concurrency: thread safe and can be called from multiple threads.
     */
    public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
      // ...
    }
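Tracking "outstanding requests and their callbacks" can be sketched as a concurrent map from request id to callback: the entry is removed when the matching response or failure arrives, and everything still pending is failed if the channel goes away. ResponseTracker is hypothetical and uses a simple counter for ids; Spark's handler keeps comparable maps (for example for outstanding RPCs and chunk fetches) and generates request ids differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical tracker of outstanding RPCs, keyed by request id.
final class ResponseTracker {
    private static final class Callback {
        final Consumer<String> onSuccess;
        final Consumer<String> onFailure;

        Callback(Consumer<String> onSuccess, Consumer<String> onFailure) {
            this.onSuccess = onSuccess;
            this.onFailure = onFailure;
        }
    }

    private final Map<Long, Callback> outstanding = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Called when a request is sent; the id travels with the request and comes back in the response.
    long register(Consumer<String> onSuccess, Consumer<String> onFailure) {
        long id = nextId.incrementAndGet();
        outstanding.put(id, new Callback(onSuccess, onFailure));
        return id;
    }

    // Returns false for a response whose request is unknown (already answered or failed).
    boolean onResponse(long requestId, String body) {
        Callback cb = outstanding.remove(requestId);
        if (cb == null) {
            return false;
        }
        cb.onSuccess.accept(body);
        return true;
    }

    boolean onFailure(long requestId, String error) {
        Callback cb = outstanding.remove(requestId);
        if (cb == null) {
            return false;
        }
        cb.onFailure.accept(error);
        return true;
    }

    // When the channel goes away, every request still waiting is failed.
    void failAll(String reason) {
        for (Long id : outstanding.keySet()) {
            Callback cb = outstanding.remove(id);
            if (cb != null) {
                cb.onFailure.accept(reason);
            }
        }
    }

    int numOutstanding() {
        return outstanding.size();
    }
}
```

The number of outstanding entries is also what lets a channel handler decide whether a silent connection has actually timed out: with nothing outstanding, an idle channel is not an error.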
  • TransportRequestHandler: processes requests from clients and writes chunk data back. Each handler is attached to a single Netty channel.

    /**
     * A handler that processes requests from clients and writes chunk data back. Each handler is
     * attached to a single Netty channel, and keeps track of which streams have been fetched via this
     * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
     *
     * The messages should have been processed by the pipeline setup by {@link TransportServer}.
     */
    public class TransportRequestHandler extends MessageHandler<RequestMessage> {
      // ...
    }
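  • MessageEncoder: encodes a message (frame length, message type, header and body) before it is written to the channel pipeline, so that the other end can split the byte stream back into messages and parse them correctly.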

    /**
     * Encoder used by the server side to encode server-to-client responses.
     * This encoder is stateless so it is safe to be shared by multiple threads.
     */
    @ChannelHandler.Sharable
    public final class MessageEncoder extends MessageToMessageEncoder<Message> {
      // ...
    }
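To show what "encoding" means concretely, here is a sketch of the frame layout for an RPC request as we read MessageEncoder and RpcRequest: an 8-byte frame length that counts itself, a 1-byte message type, a type-specific header (request id and body length for an RPC), then the body. FrameEncoderSketch and its type id value are illustrative assumptions; Spark writes these fields into Netty ByteBufs and can keep a large body outside the header buffer (MessageWithHeader) instead of copying it.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the frame layout MessageEncoder produces for an RPC request.
final class FrameEncoderSketch {
    static final byte RPC_REQUEST_TYPE = 3; // illustrative type id; check Message.Type before relying on it

    private FrameEncoderSketch() {}

    // [frameLength: 8 bytes][type: 1 byte][requestId: 8 bytes][bodyLength: 4 bytes][body]
    static ByteBuffer encodeRpcRequest(long requestId, byte[] body) {
        int headerLength = 8 + 1 + 8 + 4;
        long frameLength = headerLength + body.length; // the length field counts itself
        ByteBuffer buf = ByteBuffer.allocate(headerLength + body.length);
        buf.putLong(frameLength);
        buf.put(RPC_REQUEST_TYPE);
        buf.putLong(requestId);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip(); // switch from writing to reading
        return buf;
    }
}
```

For a 2-byte body the frame is 8 + 1 + 8 + 4 + 2 = 23 bytes, and its first 8 bytes hold the value 23.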
  • MessageDecoder: parses the ByteBufs read from the channel back into Message objects.

    /**
     * Decoder used by the client side to encode server-to-client responses.
     * This encoder is stateless so it is safe to be shared by multiple threads.
     */
    @ChannelHandler.Sharable
    public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
      // ...
    }
  • TransportFrameDecoder: splits the ByteBufs read from the channel into data frames.

    /**
     * A customized frame decoder that allows intercepting raw data.
     * <p>
     * This behaves like Netty's frame decoder (with hard coded parameters that match this library's
     * needs), except it allows an interceptor to be installed to read data directly before it's
     * framed.
     * <p>
     * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
     * decoded, instead of building as many frames as the current buffer allows and dispatching
     * all of them. This allows a child handler to install an interceptor if needed.
     * <p>
     * If an interceptor is installed, framing stops, and data is instead fed directly to the
     * interceptor. When the interceptor indicates that it doesn't need to read any more data,
     * framing resumes. Interceptors should not hold references to the data buffers provided
     * to their handle() method.
     */
    public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
      // ...
    }
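The core job of a frame decoder, turning an arbitrarily split byte stream back into whole frames, can be sketched with plain byte arrays, assuming an 8-byte length prefix that includes itself (as in the frames MessageEncoder writes). FrameDecoderSketch is hypothetical; Spark's TransportFrameDecoder works on Netty ByteBufs without copying, hands frames to the next handler one at a time and supports interceptors, none of which is modeled here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of length-prefixed framing over an arbitrarily split byte stream.
final class FrameDecoderSketch {
    static final int LENGTH_SIZE = 8;
    private byte[] buffered = new byte[0]; // bytes received but not yet forming a full frame

    // Feed the bytes of one network read; returns the payload of every frame they complete.
    List<byte[]> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(buffered, buffered.length + chunk.length);
        System.arraycopy(chunk, 0, data, buffered.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (data.length - pos >= LENGTH_SIZE) {
            long frameLength = ByteBuffer.wrap(data, pos, LENGTH_SIZE).getLong();
            if (frameLength < LENGTH_SIZE || frameLength > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("invalid frame length " + frameLength);
            }
            if (data.length - pos < frameLength) {
                break; // incomplete frame: wait for the next read
            }
            frames.add(Arrays.copyOfRange(data, pos + LENGTH_SIZE, pos + (int) frameLength));
            pos += (int) frameLength;
        }
        buffered = Arrays.copyOfRange(data, pos, data.length);
        return frames;
    }

    // Helper that builds a frame in the same format.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_SIZE + payload.length);
        buf.putLong(LENGTH_SIZE + payload.length); // the length includes the 8-byte prefix
        buf.put(payload);
        return buf.array();
    }
}
```

Feeding the bytes of two frames one byte at a time yields exactly two payloads: a partial frame simply stays buffered until a later read completes it.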
  • StreamManager: serves ChunkFetchRequest and StreamRequest requests.

    /**
     * The StreamManager is used to fetch individual chunks from a stream. This is used in
     * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
     * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
     * by only one client connection, meaning that getChunk() for a particular stream will be called
     * serially and that once the connection associated with the stream is closed, that stream will
     * never be used again.
     */
    public abstract class StreamManager {
      // ...
    }
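The stream contract in the javadoc (a stream is read by one connection, chunks are requested serially, and a stream is dead once its connection closes) can be sketched like this. SketchStreamManager is hypothetical and only loosely follows Spark's OneForOneStreamManager; in particular, the in-order check and the string connection ids are illustrative assumptions.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stream manager: a stream is a list of chunks read in order by one connection.
final class SketchStreamManager {
    private static final class StreamState {
        final Iterator<byte[]> chunks;
        final String connectionId;
        int nextChunk = 0;

        StreamState(Iterator<byte[]> chunks, String connectionId) {
            this.chunks = chunks;
            this.connectionId = connectionId;
        }
    }

    private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
    private final AtomicLong nextStreamId = new AtomicLong();

    // Stream setup happens outside the transport layer, e.g. while handling an RPC.
    long registerStream(List<byte[]> chunks, String connectionId) {
        long streamId = nextStreamId.incrementAndGet();
        streams.put(streamId, new StreamState(chunks.iterator(), connectionId));
        return streamId;
    }

    byte[] getChunk(long streamId, int chunkIndex) {
        StreamState state = streams.get(streamId);
        if (state == null) {
            throw new IllegalStateException("unknown stream " + streamId);
        }
        if (chunkIndex != state.nextChunk) {
            throw new IllegalStateException("out-of-order chunk " + chunkIndex + ", expected " + state.nextChunk);
        }
        if (!state.chunks.hasNext()) {
            throw new IllegalStateException("no chunk " + chunkIndex + " in stream " + streamId);
        }
        state.nextChunk++;
        byte[] chunk = state.chunks.next();
        if (!state.chunks.hasNext()) {
            streams.remove(streamId); // fully consumed
        }
        return chunk;
    }

    // Streams opened by a connection are useless once it closes, so drop them.
    void connectionTerminated(String connectionId) {
        streams.values().removeIf(s -> s.connectionId.equals(connectionId));
    }

    int openStreams() {
        return streams.size();
    }
}
```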
  • RpcHandler: handles RpcRequest and OneWayMessage requests.

    /**
     * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
     */
    public abstract class RpcHandler {
      // ...
    }
  • Message: the abstract interface for messages. Every message implementation directly or indirectly implements either the RequestMessage or the ResponseMessage interface.

    /** An on-the-wire transmittable message. */
    public interface Message extends Encodable {
      // ...
    }