Description

Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, and so on. Spark code currently finds the SparkEnv through a global variable, so all threads see the same SparkEnv. It can be accessed via SparkEnv.get (after a SparkContext has been created). Note: this is not intended for external use and may be made private in a future release.
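
As a small usage sketch: once a SparkContext exists, any code running in the same JVM can reach these components through SparkEnv.get. The local[*] master and the fields printed here are just illustrative choices, not anything prescribed by the walkthrough below.

    import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

    object SparkEnvDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("spark-env-demo").setMaster("local[*]"))
        // After the SparkContext is up, the per-JVM environment is globally reachable.
        val env = SparkEnv.get
        println(env.executorId)                  // "driver" on the driver side
        println(env.blockManager)                // this node's BlockManager
        println(env.conf.get("spark.app.name"))  // the SparkConf held by the environment
        sc.stop()
      }
    }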

Main Functions

  1. createDriverEnv
    • Creates the SparkEnv for the driver
  2. createExecutorEnv
    • Creates the SparkEnv for an executor
    • In coarse-grained mode, the executor provides an already instantiated RpcEnv
  3. create
    • Helper method that creates a SparkEnv for either the driver or an executor


The create Function in Detail

  1. Create the security manager, SecurityManager
  2. Create the rpcEnv
  3. Create the serializer manager, serializerManager
  4. Create the broadcast manager, broadcastManager
  5. Create the map output tracker, mapOutputTracker
  6. Create the ShuffleManager
  7. Create the memoryManager
  8. Create the blockTransferService
  9. Create the blockManagerMaster
  10. Create the blockManager
  11. Create the metricsSystem
  12. Create the outputCommitCoordinator
  13. Create the outputCommitCoordinatorRef
  14. Create envInstance = new SparkEnv


Creating the SecurityManager

    // Set our own authenticator to properly negotiate user/password for HTTP connections.
    // This is needed by the HTTP client fetching from the HttpServer. Put here so its
    // only set once.
    if (authOn) {
      Authenticator.setDefault(
        new Authenticator() {
          override def getPasswordAuthentication(): PasswordAuthentication = {
            var passAuth: PasswordAuthentication = null
            val userInfo = getRequestingURL().getUserInfo()
            if (userInfo != null) {
              val parts = userInfo.split(":", 2)
              passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
            }
            return passAuth
          }
        }
      )
    }

Creating the rpcEnv

[SparkEnv - Figure 1]
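
Besides the figure, the wiring in SparkEnv.create is short. Roughly, based on the Spark 2.x sources (the exact parameter list varies a little across versions; driverSystemName and executorSystemName are the constants "sparkDriver" and "sparkExecutor"):

    // Driver and executor RpcEnvs differ only in name and in client mode.
    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)   // clientMode = !isDriver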

Creating the serializerManager

Component that configures serialization, compression, and encryption for the various Spark components, including automatically choosing which Serializer to use for shuffles.
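
In SparkEnv.create this looks roughly as follows (Spark 2.x; instantiateClassFromConf is a local helper inside create, and ioEncryptionKey is the optional I/O encryption key generated on the driver):

    // The data serializer is configurable (Java serialization by default, Kryo via spark.serializer).
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    // Closures are always serialized with Java serialization.
    val closureSerializer = new JavaSerializer(conf)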

Creating the broadcastManager

For the detailed implementation, see org.apache.spark.broadcast.TorrentBroadcast.

    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    // Inside BroadcastManager: called by SparkContext or Executor before using Broadcast
    private def initialize() {
      synchronized {
        if (!initialized) {
          broadcastFactory = new TorrentBroadcastFactory
          broadcastFactory.initialize(isDriver, conf, securityManager)
          initialized = true
        }
      }
    }
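
From the user's point of view this machinery is hidden behind SparkContext.broadcast, which with the default TorrentBroadcastFactory produces a TorrentBroadcast. A small usage sketch (sc is an existing SparkContext):

    // Ship a lookup table to every executor once instead of with every task.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(s => lookup.value(s))   // read the broadcast value inside the task
      .collect()                   // Array(1, 2, 1)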

创建map任务输出跟踪器mapOutputTracker

MapOutputTrackerMaster
Driver-side class that keeps track of the locations of the map output of a stage.
The DAGScheduler uses this class to register and unregister map output statuses and to look up statistics for locality-aware reduce task scheduling.
ShuffleMapStage uses this class to track available and missing outputs in order to decide which tasks need to be re-run.

    // HashMap for storing shuffleStatuses in the driver.
    // Statuses are dropped only by explicit de-registering.
    // Exposed for testing
    // Holds the status of every shuffle
    val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

    // Inside each ShuffleStatus: the MapStatus of every partition
    // mapStatuses = new Array[MapStatus](numPartitions)

    // Queue of incoming requests for map output statuses
    private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

    // Thread pool that serves map output status requests
    private val threadpool: ThreadPoolExecutor = {
      val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
      val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
      for (i <- 0 until numThreads) {
        pool.execute(new MessageLoop)
      }
      pool
    }

    /** Message loop used for dispatching messages. */
    private class MessageLoop

MapOutputTrackerWorker
Executor-side client that fetches map output information from the driver's MapOutputTrackerMaster. Note that this is not used in local mode; instead, local-mode executors access the MapOutputTrackerMaster directly (which is possible because the master and the worker share a common superclass).

    // Registers (on the driver) or looks up (on an executor) the MapOutputTrackerMasterEndpoint
    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }
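
In SparkEnv.create the tracker and its endpoint are then wired together roughly like this (Spark 2.x; MapOutputTracker.ENDPOINT_NAME is the constant "MapOutputTracker"):

    // Master on the driver, worker (client) on executors.
    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // The endpoint needs the tracker itself, so it is assigned afterwards:
    // the driver registers MapOutputTrackerMasterEndpoint, executors look it up.
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))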

Creating the ShuffleManager

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

ShuffleManager
Pluggable interface for shuffle systems. A ShuffleManager is created in the SparkEnv of the driver and of every executor, based on the spark.shuffle.manager setting. The driver registers shuffles with it, and executors use it to read and write shuffle data.

For details see the subclass SortShuffleManager.
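
For reference, the interface itself is small; the sketch below is abridged from the Spark 2.x sources (the driver calls registerShuffle, map tasks call getWriter, reduce tasks call getReader):

    private[spark] trait ShuffleManager {
      // Called on the driver when a ShuffleDependency is registered.
      def registerShuffle[K, V, C](
          shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
      // Called on executors by map tasks.
      def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
      // Called on executors by reduce tasks.
      def getReader[K, C](
          handle: ShuffleHandle, startPartition: Int, endPartition: Int,
          context: TaskContext): ShuffleReader[K, C]
      def unregisterShuffle(shuffleId: Int): Boolean
      def shuffleBlockResolver: ShuffleBlockResolver
      def stop(): Unit
    }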

Creating the memoryManager

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

An abstract memory manager that enforces how memory is shared between execution and storage. Here, execution memory is the memory used for computation in shuffles, joins, sorts, and aggregations, while storage memory is the memory used for caching and propagating internal data across the cluster. There is one MemoryManager per JVM.
For details see the subclasses StaticMemoryManager and UnifiedMemoryManager.
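
As a back-of-the-envelope sketch of how UnifiedMemoryManager sizes its region in Spark 2.x (300 MB are reserved for the system; spark.memory.fraction defaults to 0.6 and spark.memory.storageFraction to 0.5):

    // Simplified from UnifiedMemoryManager.getMaxMemory (defaults shown, spark.testing overrides omitted).
    val systemMemory   = Runtime.getRuntime.maxMemory        // JVM heap
    val reservedMemory = 300L * 1024 * 1024                   // RESERVED_SYSTEM_MEMORY_BYTES
    val usableMemory   = systemMemory - reservedMemory
    val maxHeapMemory  = (usableMemory * 0.6).toLong          // spark.memory.fraction
    val storageRegion  = (maxHeapMemory * 0.5).toLong         // spark.memory.storageFraction
    // Execution and storage borrow from each other within maxHeapMemory;
    // storageRegion is only the soft boundary that execution may evict cached blocks down to.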

Creating the blockTransferService

A NettyBlockTransferService is used. Built on Netty's asynchronous, event-driven network framework, it provides both the server side and the client side for fetching sets of blocks from remote nodes.

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

Creating the blockManagerMaster

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

Creating the blockManager

Manager that runs on every node (driver and executors) and provides interfaces for putting and retrieving blocks, both locally and remotely, in various stores (memory, disk, and off-heap).
Note: initialize() must be called before the BlockManager is usable.

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
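
Where that later initialization happens, roughly (Spark 2.x): the driver does it in SparkContext once the application ID is known, and executors do it in the Executor constructor.

    // In SparkContext, after the TaskScheduler has produced an application ID:
    _env.blockManager.initialize(_applicationId)

    // In Executor (non-local mode), using the app ID propagated through the SparkConf:
    env.blockManager.initialize(conf.getAppId)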

Creating the metricsSystem

The metrics system is created here; note that the driver defers starting it until an application ID is available, while executors start it immediately.

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
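
The deferred driver-side start happens later in SparkContext, roughly (Spark 2.x), once spark.app.id has been set from the TaskScheduler's application ID:

    // In SparkContext: the driver's MetricsSystem can only start after the app ID exists,
    // because reported metric names are namespaced by it.
    _applicationId = _taskScheduler.applicationId()
    _conf.set("spark.app.id", _applicationId)
    _env.metricsSystem.start()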

Spark's metrics system is created for a specific instance and combines sources and sinks: it periodically polls the metric data of the sources and delivers it to the sink destinations.

    // Initialization: metricsConfig.initialize()
    // start() registers the sources, registers the sinks, and then starts the sinks
    def start() {
      require(!running, "Attempting to start a MetricsSystem that is already running")
      running = true
      StaticSources.allSources.foreach(registerSource)
      registerSources()
      registerSinks()
      sinks.foreach(_.start)
    }

Creating the outputCommitCoordinator

Authority that decides whether tasks may commit output to HDFS, using a "first committer wins" policy. It is instantiated on both the driver and the executors.
On executors it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output are forwarded to the driver's OutputCommitCoordinator. This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
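
The policy itself is easy to picture. The following sketch is purely illustrative (not Spark's actual class or API): a per-(stage, partition) record in which the first attempt to ask wins and every later attempt is refused.

    import scala.collection.mutable

    // Hypothetical sketch of a "first committer wins" table, keyed by (stage, partition).
    class FirstCommitterWins {
      private val winners = mutable.Map.empty[(Int, Int), Int]   // -> winning attempt number

      def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
        winners.get((stage, partition)) match {
          case Some(winner) => winner == attempt        // only the first authorized attempt may commit
          case None =>
            winners((stage, partition)) = attempt       // first request wins
            true
        }
      }
    }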

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }

Creating the outputCommitCoordinatorRef

As before: the endpoint is created on the driver, while executors hold a reference to it.

    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

Creating the SparkEnv instance

[SparkEnv - Figure 2]
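
The last step simply bundles everything created above into the SparkEnv constructor; roughly, as in the Spark 2.x sources:

    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)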