This article is based on Spark 3.4.0-SNAPSHOT.
Overview
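As a distributed computing framework, Spark relies on network communication in many places:
- Message exchange between Spark components
- Uploading user files and JARs
- The shuffle process between nodes
- Replication and backup of Block data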
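Before Spark 1.6, Spark RPC was built on Akka, an asynchronous messaging framework written in Scala. Because Akka is not suited to transferring large files, Spark used a Jetty-based HttpFileServer for file transfer. Starting with Spark 1.6, Akka is no longer the default RPC implementation, and it was removed entirely in Spark 2.0.
The reasons can be summarized as follows:
- Many Spark users also use Akka, but different Akka versions cannot talk to each other. Users therefore had to use exactly the same Akka version as Spark and could not upgrade Akka on their own;
- Spark's Akka configuration is tuned for Spark itself and may conflict with the Akka configuration in users' own code;
- Spark uses only a handful of Akka features, and they are easy to implement in-house. That code is also much smaller than Akka and easier to debug: a bug can be fixed right away instead of waiting for a new upstream Akka release. Moreover, because of the first point, upgrading Akka inside Spark would force users to upgrade their own Akka as well, which is not realistic.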
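Spark 2.0.0 also removed the Jetty-based HttpFileServer (Jetty itself is still used for the web UI). Spark's RPC framework, whose design borrows from Akka, is now built entirely on Netty, so both RPC and large file transfers go through Netty.
:::tips
Note:
- Akka is a concurrent, distributed framework based on the Actor programming model. It is written in Scala and offers both Java and Scala APIs, sparing developers many low-level concurrency details.
- Jetty is an open-source Servlet container written in Java that provides a runtime for Java web components such as JSP and Servlets. Its API is distributed as a set of JAR files; developers can instantiate a Jetty container as an object to quickly give a standalone Java application network and web connectivity.
- Netty is an NIO-based client/server framework originally developed at JBoss. It makes it quick and easy to build network applications, such as clients and servers that implement a particular protocol.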
:::
Basic Concepts
The key concepts are introduced below:
- RpcEnv: the abstract class RpcEnv represents an RPC environment, i.e. the runtime environment that every RPC endpoint depends on. It manages the entire lifecycle of RpcEndpoints.
/**
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
 */
private[spark] abstract class RpcEnv(conf: SparkConf) {
  // ...
}
- NettyRpcEnv: the only implementation of RpcEnv.
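RpcEndpoint: an RPC endpoint. Spark calls every communicating entity an RPC endpoint, and each one implements the RpcEndpoint trait, for example DriverEndpoint and BlockManagerMasterEndpoint. Each endpoint defines its own messages and business logic according to its needs.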
/**
 * An end point for the RPC that defines what functions to trigger given a message.
 *
 * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
 *
 * The life-cycle of an endpoint is:
 *
 * {@code constructor -> onStart -> receive* -> onStop}
 *
 * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
 * [[ThreadSafeRpcEndpoint]]
 *
 * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
 * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
 */
private[spark] trait RpcEndpoint {
  // ...
}
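Dispatcher: the message dispatcher, responsible for routing RPC messages to the corresponding RpcEndpoint. Using the endpoint name specified by the sender, the Dispatcher finds the target endpoint, posts the message into that endpoint's Inbox, and then puts the Inbox into a MessageLoop's LinkedBlockingQueue to mark it as having pending work. Threads in the MessageLoop's thread pool take Inboxes off this queue and process their messages. Because the queue is blocking, the threads simply block while it is empty and start working as soon as something is enqueued.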
/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 *
 * @param numUsableCores Number of CPU cores allocated to the process, for sizing the thread pool.
 *                       If 0, will consider the available CPUs on the host.
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  // ...
}
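The routing described above can be sketched in Java. This is a simplified model, not Spark's code: `MiniDispatcher`, `MiniInbox`, and modeling an endpoint as a `Consumer<Object>` are assumptions for illustration. Posting only appends the message to the target inbox and enqueues that inbox into a `LinkedBlockingQueue`; worker threads block on `take()` and drain whichever inbox they receive, and a poison-pill inbox tells them to exit.

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical inbox: a FIFO list of messages for one named endpoint.
class MiniInbox {
    final String endpointName;
    private final Consumer<Object> endpoint;   // stands in for RpcEndpoint.receive
    private final LinkedList<Object> messages = new LinkedList<>();

    MiniInbox(String endpointName, Consumer<Object> endpoint) {
        this.endpointName = endpointName;
        this.endpoint = endpoint;
    }

    synchronized void post(Object message) { messages.add(message); }

    // Hand every message queued so far to the endpoint, oldest first.
    void process() {
        while (true) {
            Object next;
            synchronized (this) { next = messages.poll(); }
            if (next == null) return;
            endpoint.accept(next);
        }
    }
}

// Hypothetical dispatcher: routes by endpoint name; workers block on a queue of active inboxes.
class MiniDispatcher {
    private static final MiniInbox POISON_PILL = new MiniInbox("<poison>", m -> { });
    private final Map<String, MiniInbox> endpoints = new ConcurrentHashMap<>();
    private final LinkedBlockingQueue<MiniInbox> active = new LinkedBlockingQueue<>();
    private final ExecutorService pool;

    MiniDispatcher(int numThreads) {
        pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) pool.execute(this::receiveLoop);
    }

    void register(String name, Consumer<Object> endpoint) {
        if (endpoints.putIfAbsent(name, new MiniInbox(name, endpoint)) != null) {
            throw new IllegalStateException("There is already an endpoint called " + name);
        }
    }

    // Post into the target inbox, then mark that inbox as active.
    void postMessage(String endpointName, Object message) {
        MiniInbox inbox = endpoints.get(endpointName);
        if (inbox == null) throw new IllegalArgumentException("Could not find endpoint: " + endpointName);
        inbox.post(message);
        active.offer(inbox);
    }

    private void receiveLoop() {
        try {
            while (true) {
                MiniInbox inbox = active.take();     // blocks while there is no work
                if (inbox == POISON_PILL) {
                    active.offer(POISON_PILL);       // let the other workers exit too
                    return;
                }
                inbox.process();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stop once everything posted so far has been processed (the pill is queued last).
    void stop() {
        active.offer(POISON_PILL);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class DispatcherDemo {
    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher(1);
        dispatcher.register("echo", msg -> System.out.println("echo received " + msg));
        dispatcher.postMessage("echo", "hello");
        dispatcher.stop();
    }
}
```

The demo uses a single worker thread so messages are handled strictly in order. With several workers, the same inbox could be drained by two threads at once, which is the kind of concurrent `receive` that the RpcEndpoint Javadoc says `ThreadSafeRpcEndpoint` is meant to rule out.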
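Inbox: each local endpoint has one inbox. An Inbox holds a linked list of InboxMessage, which has several subclasses:
- RpcMessage: a remote call that expects a reply
- OneWayMessage: a fire-and-forget one-way message from a remote caller
- OnStart, OnStop: the endpoint is starting or stopping
- RemoteProcessConnected, RemoteProcessDisconnected, RemoteProcessConnectionError: a remote process connected, disconnected, or hit a connection error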
/**
 * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.
 */
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {
  // ...
}
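A single-threaded Java model of the ordering an Inbox provides. `SimpleInbox`, `LifecycleEndpoint`, and the `*Msg` classes are hypothetical and only loosely named after Spark's InboxMessage subclasses. The point is structural and mirrors, in simplified form, what Spark's Inbox does: `OnStart` is queued in the constructor, so it always comes first; `stop()` appends `OnStop` as the last message; anything posted after that is dropped. Together these make the `constructor -> onStart -> receive* -> onStop` lifecycle from the RpcEndpoint Javadoc hold.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical message types, loosely named after Spark's InboxMessage subclasses.
interface InboxMsg { }

final class OnStartMsg implements InboxMsg { }

final class OnStopMsg implements InboxMsg { }

final class OneWayMsg implements InboxMsg {
    final Object content;
    OneWayMsg(Object content) { this.content = content; }
}

interface LifecycleEndpoint {
    void onStart();
    void receive(Object message);
    void onStop();
}

// Single-threaded model of the Inbox ordering: OnStart first, OnStop last.
class SimpleInbox {
    private final LifecycleEndpoint endpoint;
    private final LinkedList<InboxMsg> messages = new LinkedList<>();
    private boolean stopped = false;
    final List<InboxMsg> dropped = new ArrayList<>();

    SimpleInbox(LifecycleEndpoint endpoint) {
        this.endpoint = endpoint;
        messages.add(new OnStartMsg());      // queued before anything else can be posted
    }

    synchronized void post(InboxMsg message) {
        if (stopped) {
            dropped.add(message);            // Spark's Inbox drops (and logs) late messages
        } else {
            messages.add(message);
        }
    }

    synchronized void stop() {
        if (!stopped) {
            stopped = true;
            messages.add(new OnStopMsg());   // nothing can be queued after this
        }
    }

    void process() {
        while (true) {
            InboxMsg msg;
            synchronized (this) { msg = messages.poll(); }
            if (msg == null) return;
            if (msg instanceof OnStartMsg) {
                endpoint.onStart();
            } else if (msg instanceof OneWayMsg) {
                endpoint.receive(((OneWayMsg) msg).content);
            } else if (msg instanceof OnStopMsg) {
                endpoint.onStop();
            }
        }
    }
}
```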
RpcEndpointRef: a remote reference to an RpcEndpoint, through which messages can be sent to that RpcEndpoint.
/**
 * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
 */
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  // ...
}
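The two ways of talking through a reference, fire-and-forget versus request-reply, can be sketched in-process. Spark's `RpcEndpointRef` exposes `send`, `ask` (returning a `Future`), and `askSync`; the Java class below only imitates those semantics with `CompletableFuture` and is not Spark's API. The class name `LocalEndpointRef` and its two handler functions are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-process reference imitating send / ask / askSync.
class LocalEndpointRef {
    private final Consumer<Object> onewayHandler;          // plays the role of RpcEndpoint.receive
    private final Function<Object, Object> replyHandler;   // plays the role of receiveAndReply

    LocalEndpointRef(Consumer<Object> onewayHandler, Function<Object, Object> replyHandler) {
        this.onewayHandler = onewayHandler;
        this.replyHandler = replyHandler;
    }

    // Fire-and-forget: the caller gets nothing back.
    void send(Object message) {
        CompletableFuture.runAsync(() -> onewayHandler.accept(message));
    }

    // Request-reply: the future completes with the endpoint's answer (or its exception).
    @SuppressWarnings("unchecked")
    <T> CompletableFuture<T> ask(Object message) {
        return CompletableFuture.supplyAsync(() -> (T) replyHandler.apply(message));
    }

    // Blocking variant with a timeout.
    <T> T askSync(Object message, long timeoutMs) {
        try {
            return this.<T>ask(message).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("ask failed: " + message, e);
        }
    }
}
```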
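NettyRpcEndpointRef: the NettyRpcEnv implementation of RpcEndpointRef
- On the node that owns the RpcEndpoint, it is a simple wrapper around an RpcEndpointAddress instance
- On other machines that receive a serialized copy of the reference, it keeps track of the TransportClient that sent the reference, so messages to the endpoint are sent over that client connection instead of opening a new one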
/**
 * The NettyRpcEnv version of RpcEndpointRef.
 *
 * This class behaves differently depending on where it's created. On the node that "owns" the
 * RpcEndpoint, it's a simple wrapper around the RpcEndpointAddress instance.
 *
 * On other machines that receive a serialized version of the reference, the behavior changes. The
 * instance will keep track of the TransportClient that sent the reference, so that messages
 * to the endpoint are sent over the client connection, instead of needing a new connection to
 * be opened.
 *
 * The RpcAddress of this ref can be null; what that means is that the ref can only be used through
 * a client connection, since the process hosting the endpoint is not listening for incoming
 * connections. These refs should not be shared with 3rd parties, since they will not be able to
 * send messages to the endpoint.
 *
 * @param conf Spark configuration.
 * @param endpointAddress The address where the endpoint is listening.
 * @param nettyEnv The RpcEnv associated with this ref.
 */
private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
  // ...
}
RpcEndpointAddress: mainly contains the RpcAddress (host and port) and the name of the RPC endpoint.
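The string form of an RpcEndpointAddress is a URL of the form `spark://name@host:port`, or `spark-client://name` when there is no listening address (the null-RpcAddress case mentioned in the NettyRpcEndpointRef Javadoc). The Java sketch below mirrors that format and its validation with `java.net.URI`; `EndpointAddress` is a hypothetical stand-in for Spark's Scala case class, and its checks are modeled on, not copied from, Spark's parser.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical Java mirror of RpcEndpointAddress; host == null means "not listening".
final class EndpointAddress {
    final String host;
    final int port;
    final String name;

    EndpointAddress(String host, int port, String name) {
        if (name == null) throw new IllegalArgumentException("RpcEndpoint name must be provided.");
        this.host = host;
        this.port = port;
        this.name = name;
    }

    // Accepts only spark://name@host:port with no path, query, or fragment.
    static EndpointAddress parse(String sparkUrl) {
        URI uri;
        try {
            uri = new URI(sparkUrl);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl, e);
        }
        boolean hasPath = uri.getPath() != null && !uri.getPath().isEmpty();
        if (!"spark".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() < 0
                || uri.getUserInfo() == null || hasPath
                || uri.getQuery() != null || uri.getFragment() != null) {
            throw new IllegalArgumentException("Invalid Spark URL: " + sparkUrl);
        }
        // The endpoint name travels in the user-info part of the URL.
        return new EndpointAddress(uri.getHost(), uri.getPort(), uri.getUserInfo());
    }

    @Override
    public String toString() {
        return host != null ? "spark://" + name + "@" + host + ":" + port : "spark-client://" + name;
    }

    public static void main(String[] args) {
        EndpointAddress driver = parse("spark://CoarseGrainedScheduler@10.0.0.5:7077");
        System.out.println(driver.name + " at " + driver.host + ":" + driver.port);
    }
}
```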
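- Outbox: one outbox per remote address. NettyRpcEnv holds a ConcurrentHashMap[RpcAddress, Outbox]; once a message is put into an Outbox, it is sent out through a TransportClient (if the Outbox has no connection yet, it establishes one first).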
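A minimal Java sketch of the Outbox pattern, assuming plain `host:port` strings in place of `RpcAddress` and a caller-supplied connector in place of `TransportClientFactory` (`MiniOutbox`, `OutboxRegistry`, and `RemoteClient` are hypothetical). Each address gets exactly one outbox via `computeIfAbsent`; messages are queued first and then flushed in FIFO order over a lazily created connection. Unlike this sketch, Spark establishes the connection asynchronously and keeps queuing while it is in progress.

```java
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical connection to one remote process (stands in for TransportClient).
interface RemoteClient {
    void send(Object message);
}

// One outbox per remote address: queue first, connect lazily, then drain in FIFO order.
class MiniOutbox {
    private final String address;
    private final Function<String, RemoteClient> connector;
    private final LinkedList<Object> messages = new LinkedList<>();
    private RemoteClient client;
    private boolean draining = false;

    MiniOutbox(String address, Function<String, RemoteClient> connector) {
        this.address = address;
        this.connector = connector;
    }

    void send(Object message) {
        synchronized (this) { messages.add(message); }
        drain();
    }

    private void drain() {
        RemoteClient c;
        synchronized (this) {
            if (draining) return;                 // another thread is already flushing
            if (client == null) {
                // Spark connects asynchronously; connecting inline keeps the sketch short.
                client = connector.apply(address);
            }
            c = client;
            draining = true;
        }
        while (true) {
            Object next;
            synchronized (this) {
                next = messages.poll();
                if (next == null) {
                    draining = false;
                    return;
                }
            }
            c.send(next);
        }
    }
}

// Mirrors the address-to-outbox map kept by NettyRpcEnv.
class OutboxRegistry {
    private final ConcurrentHashMap<String, MiniOutbox> outboxes = new ConcurrentHashMap<>();
    private final Function<String, RemoteClient> connector;

    OutboxRegistry(Function<String, RemoteClient> connector) { this.connector = connector; }

    void postToOutbox(String address, Object message) {
        outboxes.computeIfAbsent(address, a -> new MiniOutbox(a, connector)).send(message);
    }

    int size() { return outboxes.size(); }
}
```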
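- TransportContext:
- creates the TransportServer and the TransportClientFactory
- sets up the Netty channel pipeline using TransportChannelHandler
TransportClient provides two communication protocols: control-plane RPCs and data-plane chunk fetching. RPC requests are handled by the RpcHandler that the user passes to TransportContext's constructor; the RpcHandler is also responsible for setting up streams, which can then be transferred in chunks using zero-copy IO. TransportServer and TransportClientFactory both create a TransportChannelHandler for each channel. Since every TransportChannelHandler contains a TransportClient, the server process can send messages back to the client over the existing channel.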
/**
 * Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
 * setup Netty Channel pipelines with a
 * {@link org.apache.spark.network.server.TransportChannelHandler}.
 *
 * There are two communication protocols that the TransportClient provides, control-plane RPCs and
 * data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
 * TransportContext (i.e., by a user-provided handler), and it is responsible for setting up streams
 * which can be streamed through the data plane in chunks using zero-copy IO.
 *
 * The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
public class TransportContext implements Closeable {
  // ...
}
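- TransportServer: the server side of the RPC framework, providing an efficient, low-level streaming service.
TransportServerBootstrap: defines the contract for server-side bootstraps, which run on the server's channel to a client once that client has connected (for example, to perform SASL authentication). They are used when initializing the TransportServer.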
/**
 * A bootstrap which is executed on a TransportServer's client channel once a client connects
 * to the server. This allows customizing the client channel to allow for things such as SASL
 * authentication.
 */
public interface TransportServerBootstrap {
  // ...
}
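TransportClientFactory: the factory class that creates TransportClients. It maintains a connection pool and returns the same TransportClient for the same remote host whenever possible, and all TransportClients share a single worker thread pool.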
/**
 * Factory for creating {@link TransportClient}s by using createClient.
 *
 * The factory maintains a connection pool to other hosts and should return the same
 * TransportClient for the same remote host. It also shares a single worker thread pool for
 * all TransportClients.
 *
 * TransportClients will be reused whenever possible. Prior to completing the creation of a new
 * TransportClient, all given {@link TransportClientBootstrap}s will be run.
 */
public class TransportClientFactory implements Closeable {
  // ...
}
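The pooling and bootstrap behavior described in this Javadoc can be sketched as follows. `FakeClient`, `ClientBootstrap`, and `PooledClientFactory` are hypothetical; the sketch keeps a single connection per `host:port` behind one lock, whereas Spark's factory can keep several connections per peer and locks more finely. The essential points are that a live client is reused and that every bootstrap runs on a new client before the caller receives it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a TransportClient wrapping one connection.
class FakeClient {
    final String address;
    volatile boolean active = true;
    final List<String> bootstrapLog = new ArrayList<>();

    FakeClient(String address) { this.address = address; }
}

// Hypothetical hook, analogous to TransportClientBootstrap: runs once per new connection.
interface ClientBootstrap {
    void doBootstrap(FakeClient client);
}

// Pool keyed by remote address; a single lock keeps the sketch simple.
class PooledClientFactory {
    private final Map<String, FakeClient> pool = new HashMap<>();
    private final List<ClientBootstrap> bootstraps;
    int created = 0;

    PooledClientFactory(List<ClientBootstrap> bootstraps) { this.bootstraps = bootstraps; }

    synchronized FakeClient createClient(String host, int port) {
        String key = host + ":" + port;
        FakeClient cached = pool.get(key);
        if (cached != null && cached.active) {
            return cached;                      // reuse the live connection
        }
        FakeClient fresh = new FakeClient(key);
        created++;
        for (ClientBootstrap b : bootstraps) {
            b.doBootstrap(fresh);               // e.g. authentication, before the caller sees it
        }
        pool.put(key, fresh);
        return fresh;
    }
}
```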
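TransportClient: the client side of the RPC framework, used to fetch consecutive chunks of a pre-negotiated stream. It is designed for efficient transfer of large amounts of data, split into chunks ranging from a few hundred KB to a few MB. In short, TransportClient can be thought of as the lowest-level client class of Spark RPC: it mainly sends RPC requests to the server and fetches stream chunks from it.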
/**
 * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow
 * efficient transfer of a large amount of data, broken up into chunks with size ranging from
 * hundreds of KB to a few MB.
 *
 * Note that while this client deals with the fetching of chunks from a stream (i.e., data plane),
 * the actual setup of the streams is done outside the scope of the transport layer. The convenience
 * method "sendRPC" is provided to enable control plane communication between the client and server
 * to perform this setup.
 *
 * For example, a typical workflow might be:
 * client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
 * client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
 * client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
 * ...
 * client.sendRPC(new CloseStream(100))
 *
 * Construct an instance of TransportClient using {@link TransportClientFactory}. A single
 * TransportClient may be used for multiple streams, but any given stream must be restricted to a
 * single client, in order to avoid out-of-order responses.
 *
 * NB: This class is used to make requests to the server, while {@link TransportResponseHandler} is
 * responsible for handling responses from the server.
 *
 * Concurrency: thread safe and can be called from multiple threads.
 */
public class TransportClient implements Closeable {
  // ...
}
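TransportClientBootstrap: a client-side bootstrap executed on a TransportClient before it is returned to the user, used to prepare a newly established connection (for example, authentication or encryption). Such bootstrapping is often expensive, but because connections are reused, it typically runs only once per connection. It is used when initializing a TransportClient.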
/**
 * A bootstrap which is executed on a TransportClient before it is returned to the user.
 * This enables an initial exchange of information (e.g., SASL authentication tokens) on a once-per-
 * connection basis.
 *
 * Since connections (and TransportClients) are reused as much as possible, it is generally
 * reasonable to perform an expensive bootstrapping operation, as they often share a lifespan with
 * the JVM itself.
 */
public interface TransportClientBootstrap {
  // ...
}
TransportChannelHandler: delegates requests to TransportRequestHandler and responses to TransportResponseHandler. Because every channel is full duplex, both the client and the server need to handle requests as well as responses.
/**
 * The single Transport-level Channel handler which is used for delegating requests to the
 * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
 *
 * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
 * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
 * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
 * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
 * Client.
 * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
 * for the Client's responses to the Server's requests.
 *
 * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
 * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
 * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
 * timeout if the client is continuously sending but getting no responses, for simplicity.
 */
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
  // ...
}
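A Java sketch of the two jobs this Javadoc describes: routing each inbound message by kind, and the idle-timeout rule. All types here (`WireMessage`, `RequestMsg`, `ResponseMsg`, `ChannelRouter`) are hypothetical simplifications; Spark works with Netty's `ChannelHandlerContext` and idle-state events, which are left out.

```java
import java.util.function.Consumer;

// Hypothetical message hierarchy mirroring RequestMessage / ResponseMessage.
interface WireMessage { }
interface RequestMsg extends WireMessage { }
interface ResponseMsg extends WireMessage { }

final class PingRequest implements RequestMsg { }
final class PongResponse implements ResponseMsg { }

// Routes every inbound message to one of two handlers, as both ends of a duplex channel must.
class ChannelRouter {
    private final Consumer<RequestMsg> requestHandler;
    private final Consumer<ResponseMsg> responseHandler;

    ChannelRouter(Consumer<RequestMsg> requestHandler, Consumer<ResponseMsg> responseHandler) {
        this.requestHandler = requestHandler;
        this.responseHandler = responseHandler;
    }

    void channelRead(WireMessage msg) {
        if (msg instanceof RequestMsg) {
            requestHandler.accept((RequestMsg) msg);
        } else if (msg instanceof ResponseMsg) {
            responseHandler.accept((ResponseMsg) msg);
        } else {
            // Spark forwards other messages down the pipeline; the sketch just rejects them.
            throw new IllegalArgumentException("Unexpected message: " + msg);
        }
    }

    // The rule quoted in the Javadoc: pending requests and no traffic for requestTimeoutMs.
    static boolean isTimedOut(int outstandingRequests, long msSinceLastTraffic, long requestTimeoutMs) {
        return outstandingRequests > 0 && msSinceLastTraffic >= requestTimeoutMs;
    }
}
```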
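TransportResponseHandler: processes the server's responses to requests issued by a TransportClient, by tracking the outstanding requests and their callbacks.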
/**
 * Handler that processes server responses, in response to requests issued from a
 * [[TransportClient]]. It works by tracking the list of outstanding requests (and their callbacks).
 *
 * Concurrency: thread safe and can be called from multiple threads.
 */
public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
  // ...
}
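How a response finds its way back to the right caller can be sketched with a map from request ID to callback. `ResponseTracker` and `RpcCallback` are hypothetical names; the sequential counter used for request IDs is an assumption for simplicity (in Spark, the TransportClient generates the ID when it sends the request). The important detail is `failOutstandingRequests`: when a connection dies, every pending caller is failed instead of waiting forever.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical callback, similar in spirit to Spark's RpcResponseCallback.
interface RpcCallback {
    void onSuccess(String response);
    void onFailure(Throwable e);
}

// Tracks outstanding RPCs so each response is delivered to the caller that issued the request.
class ResponseTracker {
    private final Map<Long, RpcCallback> outstandingRpcs = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    long addRpcRequest(RpcCallback callback) {
        long id = nextId.incrementAndGet();
        outstandingRpcs.put(id, callback);
        return id;
    }

    void onRpcResponse(long requestId, String body) {
        RpcCallback cb = outstandingRpcs.remove(requestId);
        if (cb == null) {
            System.err.println("Ignoring response for RPC " + requestId + ": no outstanding request");
            return;
        }
        cb.onSuccess(body);
    }

    void onRpcFailure(long requestId, String error) {
        RpcCallback cb = outstandingRpcs.remove(requestId);
        if (cb != null) cb.onFailure(new RuntimeException(error));
    }

    // Connection lost: tell every pending caller, otherwise it would wait forever.
    void failOutstandingRequests(Throwable cause) {
        for (Long id : outstandingRpcs.keySet()) {
            RpcCallback cb = outstandingRpcs.remove(id);
            if (cb != null) cb.onFailure(cause);
        }
    }

    int numOutstandingRequests() { return outstandingRpcs.size(); }
}
```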
TransportRequestHandler: processes requests from clients and writes chunk data back.
/**
 * A handler that processes requests from clients and writes chunk data back. Each handler is
 * attached to a single Netty channel, and keeps track of which streams have been fetched via this
 * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
 *
 * The messages should have been processed by the pipeline setup by {@link TransportServer}.
 */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  // ...
}
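MessageEncoder: encodes a message before it is written into the channel, so that the receiving end can split the byte stream back into messages and parse them correctly.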
/**
 * Encoder used by the server side to encode server-to-client responses.
 * This encoder is stateless so it is safe to be shared by multiple threads.
 */
@ChannelHandler.Sharable
public final class MessageEncoder extends MessageToMessageEncoder<Message> {
  // ...
}
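The byte layout an encoder like this produces can be illustrated with plain `java.nio.ByteBuffer`. The sketch assumes the layout used by Spark's MessageEncoder: an 8-byte frame length that counts itself, a 1-byte message type, the message's own encoded header, then the body. `FrameEncoder` is hypothetical, its header fields are modeled on an RPC request (request ID plus body size), and the type value `3` is illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder following the frame layout:
// [frame length: 8 bytes, counting itself][type: 1 byte][requestId: 8 bytes][body size: 4 bytes][body]
final class FrameEncoder {
    static final int LENGTH_SIZE = 8;
    static final byte RPC_REQUEST_TYPE = 3;   // illustrative type id

    static ByteBuffer encodeRpcRequest(long requestId, byte[] body) {
        int headerLength = LENGTH_SIZE + 1 + 8 + 4;
        long frameLength = headerLength + body.length;
        ByteBuffer buf = ByteBuffer.allocate((int) frameLength);   // big-endian, like Netty's ByteBuf
        buf.putLong(frameLength);
        buf.put(RPC_REQUEST_TYPE);
        buf.putLong(requestId);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();                                                // ready for reading / writing out
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encodeRpcRequest(42L, "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame bytes: " + frame.remaining());
    }
}
```

Running `main` prints `frame bytes: 23`, i.e. 8 + 1 + 8 + 4 + 2. Putting the total length first is what lets the receiving side find frame boundaries in a TCP byte stream.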
MessageDecoder: parses the ByteBufs read from the channel back into Message objects.
/**
 * Decoder used by the client side to encode server-to-client responses.
 * This encoder is stateless so it is safe to be shared by multiple threads.
 */
@ChannelHandler.Sharable
public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  // ...
}
TransportFrameDecoder: splits the ByteBufs read from the channel into frames.
/**
 * A customized frame decoder that allows intercepting raw data.
 * <p>
 * This behaves like Netty's frame decoder (with hard coded parameters that match this library's
 * needs), except it allows an interceptor to be installed to read data directly before it's
 * framed.
 * <p>
 * Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
 * decoded, instead of building as many frames as the current buffer allows and dispatching
 * all of them. This allows a child handler to install an interceptor if needed.
 * <p>
 * If an interceptor is installed, framing stops, and data is instead fed directly to the
 * interceptor. When the interceptor indicates that it doesn't need to read any more data,
 * framing resumes. Interceptors should not hold references to the data buffers provided
 * to their handle() method.
 */
public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
  // ...
}
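The core job of a frame decoder, reassembling frames from arbitrarily split TCP reads, can be sketched without Netty. `SimpleFrameDecoder` is hypothetical and omits the interceptor mechanism described in the Javadoc. It assumes the same framing as Spark's transport layer: an 8-byte length prefix that includes itself. It waits until a whole frame has arrived and hands the frame on without the prefix.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified frame splitter (no interceptor support).
final class SimpleFrameDecoder {
    private static final int LENGTH_SIZE = 8;
    private byte[] pending = new byte[0];   // bytes of an incomplete frame carried over

    // Feed whatever bytes arrived; return the complete frames with the length prefix stripped.
    List<byte[]> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int offset = 0;
        while (data.length - offset >= LENGTH_SIZE) {
            long frameLength = ByteBuffer.wrap(data, offset, LENGTH_SIZE).getLong(); // counts the prefix
            if (frameLength < LENGTH_SIZE || frameLength > Integer.MAX_VALUE) {
                throw new IllegalStateException("Illegal frame length: " + frameLength);
            }
            if (data.length - offset < frameLength) {
                break;                                 // wait for the rest of this frame
            }
            frames.add(Arrays.copyOfRange(data, offset + LENGTH_SIZE, offset + (int) frameLength));
            offset += (int) frameLength;
        }
        pending = Arrays.copyOfRange(data, offset, data.length);
        return frames;
    }
}
```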
StreamManager: handles ChunkFetchRequest and StreamRequest requests.
/**
 * The StreamManager is used to fetch individual chunks from a stream. This is used in
 * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
 * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
 * by only one client connection, meaning that getChunk() for a particular stream will be called
 * serially and that once the connection associated with the stream is closed, that stream will
 * never be used again.
 */
public abstract class StreamManager {
  // ...
}
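A sketch in the spirit of Spark's `OneForOneStreamManager`; the class name `ChunkStreamManager` and its methods are hypothetical. A stream is registered outside the transport layer and receives an ID. Because a stream belongs to exactly one client connection, chunks must be requested strictly in order, and once the last chunk is handed out the stream state is released.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stream manager: one iterator of chunks per registered stream.
final class ChunkStreamManager {
    private static final class StreamState {
        final Iterator<byte[]> chunks;
        int curChunk = 0;                  // next index the client may ask for
        StreamState(Iterator<byte[]> chunks) { this.chunks = chunks; }
    }

    private final Map<Long, StreamState> streams = new ConcurrentHashMap<>();
    private final AtomicLong nextStreamId = new AtomicLong();

    // Called outside the transport layer, e.g. while handling an "open blocks" RPC.
    long registerStream(List<byte[]> chunks) {
        long id = nextStreamId.incrementAndGet();
        streams.put(id, new StreamState(chunks.iterator()));
        return id;
    }

    // One client per stream, so chunk indexes must arrive as 0, 1, 2, ...
    byte[] getChunk(long streamId, int chunkIndex) {
        StreamState state = streams.get(streamId);
        if (state == null) throw new IllegalStateException("Unknown stream " + streamId);
        if (chunkIndex != state.curChunk) {
            throw new IllegalStateException(
                "Out-of-order chunk index " + chunkIndex + " (expected " + state.curChunk + ")");
        }
        if (!state.chunks.hasNext()) throw new IllegalStateException("Chunk index beyond end of stream");
        state.curChunk++;
        byte[] chunk = state.chunks.next();
        if (!state.chunks.hasNext()) streams.remove(streamId);   // fully consumed: free the state
        return chunk;
    }
}
```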
RpcHandler: handles RpcRequest and OneWayMessage requests.
/**
 * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
 */
public abstract class RpcHandler {
  // ...
}
Message: the abstract interface for messages. Every message implementation directly or indirectly implements either the RequestMessage or the ResponseMessage interface.
/** An on-the-wire transmittable message. */
public interface Message extends Encodable {
  // ...
}
