Description
Holds all the runtime environment objects for a running Spark instance (the master or a worker), including the serializer, the RpcEnv, the block manager, the map output tracker, and so on. Spark code currently locates the SparkEnv through a global variable, so all threads can access the same SparkEnv. It can be obtained with SparkEnv.get (after a SparkContext has been created). NOTE: this is not intended for external use; it may become private in a future release.
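A quick illustration of the global-lookup pattern described above (assuming a SparkContext already exists in the JVM):

import org.apache.spark.SparkEnv

// Any thread in the same JVM can reach the shared runtime environment
// through the global accessor once SparkContext has been created.
val env = SparkEnv.get
val conf = env.conf                 // the SparkConf backing this environment
val blockManager = env.blockManager // component handles hang off the same instance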
Main functions
- createDriverEnv
  - Creates the SparkEnv for the driver
- createExecutorEnv
  - Creates the SparkEnv for an executor
  - In coarse-grained mode, the executor provides an already instantiated RpcEnv
- create
  - Helper method that creates a SparkEnv for either the driver or an executor.
Walkthrough of the create function
- Create the security manager SecurityManager
- Create the rpcEnv
- Create the serializer manager serializerManager
- Create the broadcast manager broadcastManager
- Create the map output tracker mapOutputTracker
- Create the ShuffleManager
- Create the memoryManager
- Create the blockTransferService
- Create the blockManagerMaster
- Create the blockManager
- Create the metricsSystem
- Create the outputCommitCoordinator
- Create the outputCommitCoordinatorRef
- Create envInstance = new SparkEnv
Creating the SecurityManager
// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
  Authenticator.setDefault(
    new Authenticator() {
      override def getPasswordAuthentication(): PasswordAuthentication = {
        var passAuth: PasswordAuthentication = null
        val userInfo = getRequestingURL().getUserInfo()
        if (userInfo != null) {
          val parts = userInfo.split(":", 2)
          passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
        }
        return passAuth
      }
    }
  )
}
Creating the rpcEnv
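This step simply delegates to the RpcEnv factory. The call in SparkEnv.create looks roughly like the following (Spark 2.x shape; treat the exact argument list as an assumption, it varies slightly across versions):

// Sketch of the RpcEnv creation in SparkEnv.create (Spark 2.x-era shape).
val systemName = if (isDriver) "sparkDriver" else "sparkExecutor"
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, clientMode = !isDriver)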
Creating the serializerManager
Configures serialization, compression, and encryption for the various Spark components, including automatically choosing which serializer to use for shuffles.
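For context, the surrounding code in SparkEnv.create looks roughly like this (Spark 2.x; a sketch rather than an exact copy):

// Sketch of the serializer setup in SparkEnv.create (Spark 2.x-era shape).
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
// Closures are always serialized with plain Java serialization.
val closureSerializer = new JavaSerializer(conf)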
Creating the broadcastManager
For the detailed implementation, see org.apache.spark.broadcast.TorrentBroadcast.
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  synchronized {
    if (!initialized) {
      broadcastFactory = new TorrentBroadcastFactory
      broadcastFactory.initialize(isDriver, conf, securityManager)
      initialized = true
    }
  }
}
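From user code, broadcasts created through SparkContext go through this manager. A minimal usage example (assuming an existing SparkContext named sc; the lookup table and RDD are made up for illustration):

// Broadcast a small lookup table to all executors via TorrentBroadcast.
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val counts = sc.parallelize(Seq("a", "b", "a"))
  .map(key => lookup.value.getOrElse(key, 0))  // read the broadcast value on executors
  .collect()
lookup.unpersist()                             // drop the cached copies on executors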
Creating the mapOutputTracker
MapOutputTrackerMaster
Driver-side class that keeps track of the locations of a stage's map output.
The DAGScheduler uses this class to register and unregister map output statuses and to look up statistics used for locality-aware scheduling of reduce tasks.
ShuffleMapStage uses this class to track available and missing outputs in order to determine which tasks need to be rerun.
// HashMap for storing shuffleStatuses in the driver.
// Statuses are dropped only by explicit de-registering.
// Exposed for testing
// Holds the ShuffleStatus for each shuffle
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

// Holds the MapStatus for each partition (defined inside ShuffleStatus)
val mapStatuses = new Array[MapStatus](numPartitions)

// Queue of incoming requests for map output statuses
private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

// Thread pool that serves the map output status requests
private val threadpool: ThreadPoolExecutor = {
  val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}

/** Message loop used for dispatching messages. */
private class MessageLoop
MapOutputTrackerWorker
Executor-side client for fetching map output information from the driver's MapOutputTrackerMaster. Note that this is not used in local mode; instead, local-mode executors access the MapOutputTrackerMaster directly (which is possible because the master and the worker share a common superclass).
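In SparkEnv.create, the choice between the two implementations is a simple driver/executor branch, roughly (Spark 2.x shape):

// Sketch of the branch in SparkEnv.create (Spark 2.x-era shape).
val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}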
// Register the endpoint on the driver, or look it up from an executor:
// the driver registers the MapOutputTrackerMasterEndpoint, executors obtain a reference to it.
def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
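The helper is then used to wire the tracker to its RPC endpoint, roughly (Spark 2.x shape):

// Sketch of how SparkEnv.create attaches the tracker to its endpoint.
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))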
Creating the ShuffleManager
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
ShuffleManager
Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles with it, and executors (or tasks running locally in the driver) ask it to read and write data.
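As the snippet above shows, spark.shuffle.manager accepts either a built-in short name or a fully qualified class name. For example (the custom class name here is hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort")  // built-in short name (also the default)
  // .set("spark.shuffle.manager", "com.example.MyShuffleManager")  // hypothetical custom ShuffleManager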
Creating the memoryManager
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
An abstract memory manager that enforces how memory is shared between execution and storage. In this context, execution memory refers to memory used for computation in shuffles, joins, sorts, and aggregations, while storage memory refers to memory used for caching and propagating internal data across the cluster. There is one MemoryManager per JVM.
For details, see the subclasses StaticMemoryManager and UnifiedMemoryManager.
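As a rough illustration of how UnifiedMemoryManager sizes the unified execution/storage region, the sketch below uses the documented Spark 2.x defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, roughly 300 MB reserved); treat the exact constants as an assumption:

// Back-of-the-envelope sketch, not the actual Spark code.
val systemMemory = 4L * 1024 * 1024 * 1024   // e.g. a 4 GB executor heap
val reservedMemory = 300L * 1024 * 1024      // memory reserved for Spark internals
val memoryFraction = 0.6                     // spark.memory.fraction default
val storageFraction = 0.5                    // spark.memory.storageFraction default

val unifiedRegion = ((systemMemory - reservedMemory) * memoryFraction).toLong
val storageSoftLimit = (unifiedRegion * storageFraction).toLong
// Execution and storage borrow from each other within unifiedRegion;
// storageSoftLimit is only the boundary below which cached blocks are protected from eviction.
println(s"unified region: ${unifiedRegion >> 20} MB, storage soft limit: ${storageSoftLimit >> 20} MB")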
Creating the blockTransferService
NettyBlockTransferService is used here. Built on Netty's asynchronous, event-driven network framework, it provides both the server side and the client side for fetching sets of blocks from remote nodes.
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)
Creating the blockManagerMaster
val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
Creating the blockManager
A manager running on every node (the driver and the executors) that provides interfaces for putting and retrieving blocks stored locally or remotely in various stores (memory, disk, off-heap).
Note: initialize() must be called before the BlockManager is usable.
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
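The deferred initialization happens once the application ID is known; roughly, SparkContext does this on the driver and Executor does it on the workers (treat the exact call site as an assumption):

// Sketch: called from SparkContext / Executor after the application ID is available.
env.blockManager.initialize(applicationId)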
Creating the metricsSystem
The metrics system is created as follows:
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
Spark's metrics system, created for a specific instance, is a combination of sources and sinks; it periodically polls metric data from the sources and delivers it to the sink destinations.
// Initialize the configuration: metricsConfig.initialize()
// Register the sources
// Register the sinks
// Start the sinks
def start() {
  require(!running, "Attempting to start a MetricsSystem that is already running")
  running = true
  StaticSources.allSources.foreach(registerSource)
  registerSources()
  registerSinks()
  sinks.foreach(_.start)
}
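On the driver, as the comment in the create snippet notes, start() is deferred until the task scheduler has produced an application ID; SparkContext then does roughly the following (a sketch, the exact sequence varies by version):

// Sketch: the driver records the app ID and only then starts its metrics system,
// so that sources and sinks can embed the ID in the metrics they report.
conf.set("spark.app.id", applicationId)
env.metricsSystem.start()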
Creating the outputCommitCoordinator
Decides whether tasks may commit their output to HDFS, using a "first committer wins" policy. It is instantiated on both the driver and the executors.
On executors it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output are forwarded to the driver's OutputCommitCoordinator. This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for the broader design discussion.
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
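At commit time, the Hadoop commit path asks the coordinator for permission before committing a task's output. The check looks roughly like this (the exact parameter list of canCommit has changed across Spark versions, so treat the arguments as an assumption):

// Hypothetical task-attempt coordinates, for illustration only.
val stageId = 3; val stageAttempt = 0; val partitionId = 7; val attemptNumber = 1
val allowed = outputCommitCoordinator.canCommit(stageId, stageAttempt, partitionId, attemptNumber)
// true  -> this attempt proceeds with the Hadoop OutputCommitter's commitTask
// false -> another attempt already won the commit race and this attempt's output is discarded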
Creating the outputCommitCoordinatorRef
As before: created on the driver, while the executors hold a reference to it.
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
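Finally, all of the components assembled above are passed to the SparkEnv constructor (the "envInstance = new SparkEnv" step in the list at the top). The argument list looks roughly like this in Spark 2.x (exact parameters differ slightly across versions):

// Sketch of the final assembly in SparkEnv.create (Spark 2.x-era shape).
val envInstance = new SparkEnv(
  executorId,
  rpcEnv,
  serializer,
  closureSerializer,
  serializerManager,
  mapOutputTracker,
  shuffleManager,
  broadcastManager,
  blockManager,
  securityManager,
  metricsSystem,
  memoryManager,
  outputCommitCoordinator,
  conf)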
