Description
Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, and so on. Currently Spark code finds the SparkEnv through a global variable, so all threads can access the same SparkEnv. It can be obtained via SparkEnv.get (after a SparkContext has been created). NOTE: this is not intended for external use; it may become private in a future release.
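For example, once a SparkContext exists, any code in the same JVM can reach the shared environment. A minimal sketch (the component accessed here is just for illustration):

import org.apache.spark.SparkEnv

// After SparkContext has been created, every thread sees the same SparkEnv:
val env = SparkEnv.get
val blockManager = env.blockManager  // e.g., grab the node-local BlockManager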
Main functions
- createDriverEnv
  - Creates the SparkEnv for the driver
- createExecutorEnv
  - Creates the SparkEnv for an executor
  - In coarse-grained mode, the executor supplies an already-instantiated RpcEnv
- create
  - Helper method used by both of the above to build a SparkEnv for either the driver or an executor (the entry-point signatures are sketched below)
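For reference, the two entry points look roughly like this in the Spark 2.x sources (abridged; the parameter lists differ slightly between versions):

private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

private[spark] def createExecutorEnv(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    numCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    isLocal: Boolean): SparkEnv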
The create function, step by step
- Create the security manager, SecurityManager
- Create the rpcEnv
- Create the serializer manager, serializerManager
- Create the broadcast manager, broadcastManager
- Create the map output tracker, mapOutputTracker
- Create the ShuffleManager
- Create the memoryManager
- Create the blockTransferService
- Create the blockManagerMaster
- Create the blockManager
- Create the metricsSystem
- Create the outputCommitCoordinator
- Create the outputCommitCoordinatorRef
- Create envInstance = new SparkEnv
Creating the SecurityManager
// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
  Authenticator.setDefault(
    new Authenticator() {
      override def getPasswordAuthentication(): PasswordAuthentication = {
        var passAuth: PasswordAuthentication = null
        val userInfo = getRequestingURL().getUserInfo()
        if (userInfo != null) {
          val parts = userInfo.split(":", 2)
          passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
        }
        return passAuth
      }
    }
  )
}
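The Authenticator block above runs inside the SecurityManager constructor; SparkEnv.create itself only has to instantiate the manager (roughly, per the Spark 2.x sources; the ioEncryptionKey argument exists only in versions with I/O encryption support):

val securityManager = new SecurityManager(conf, ioEncryptionKey)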
Creating the rpcEnv
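Roughly, from the Spark 2.x sources (the exact argument list varies between minor versions): the driver and the executors use different system names, and executors create their RpcEnv in client mode, so they do not run a server of their own for driver endpoints:

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, !isDriver)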
Creating the serializerManager
Component that configures serialization, compression, and encryption for the various Spark components, including the automatic selection of which serializer to use for shuffles.
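In SparkEnv.create this amounts to the following (sketched from the Spark 2.x sources; instantiateClassFromConf is a local helper defined inside create):

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)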
Creating the broadcastManager
For the detailed implementation, see org.apache.spark.broadcast.TorrentBroadcast.
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  synchronized {
    if (!initialized) {
      broadcastFactory = new TorrentBroadcastFactory
      broadcastFactory.initialize(isDriver, conf, securityManager)
      initialized = true
    }
  }
}
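After initialization, every new broadcast variable is delegated to the factory with a monotonically increasing id (sketched from the Spark 2.x BroadcastManager):

private val nextBroadcastId = new AtomicLong(0)

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}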
Creating the mapOutputTracker
MapOutputTrackerMaster
Driver-side class that keeps track of the locations of the map output of a stage.
The DAGScheduler uses this class to register and unregister map output statuses, and to look up statistics for locality-aware reduce-task scheduling.
ShuffleMapStage uses this class to track available and missing outputs, in order to determine which tasks need to be rerun.
// HashMap for storing shuffleStatuses in the driver.
// Statuses are dropped only by explicit de-registering.
// Exposed for testing
// Holds the status of every shuffle
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

// Inside ShuffleStatus: holds the MapStatus for each partition
val mapStatuses = new Array[MapStatus](numPartitions)

// Queue of incoming requests for map output statuses
private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

// Thread pool that serves the map output status requests
private val threadpool: ThreadPoolExecutor = {
  val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
/** Message loop used for dispatching messages. */
private class MessageLoop
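The loop itself is simple; a simplified sketch, abridged from the Spark 2.x sources (error handling trimmed; PoisonPill is a sentinel request defined in the same class):

private class MessageLoop extends Runnable {
  override def run(): Unit = {
    while (true) {
      val data = mapOutputRequests.take()  // block until a request arrives
      if (data == PoisonPill) {
        // Put PoisonPill back so that the other MessageLoops also exit.
        mapOutputRequests.offer(PoisonPill)
        return
      }
      // Reply with the serialized map statuses for the requested shuffle.
      val shuffleStatus = shuffleStatuses.get(data.shuffleId).head
      data.context.reply(
        shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast))
    }
  }
}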
MapOutputTrackerWorker
The executor-side client for fetching map output information from the driver's MapOutputTrackerMaster. Note that this is not used in local mode; instead, local-mode executors access the MapOutputTrackerMaster directly (which is possible because the master and the worker share a common superclass).
// Registers or looks up the MapOutputTrackerMasterEndpoint:
// the driver registers it, executors look it up
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
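This helper is then used to wire the tracker to its endpoint; roughly, from the Spark 2.x sources:

mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))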
Creating the ShuffleManager
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
ShuffleManager
Pluggable interface for shuffle systems. A ShuffleManager is created in the SparkEnv on the driver and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles with it, and executors (or tasks running locally in the driver) can ask it to read and write data.
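Since both short names resolve to SortShuffleManager, a custom implementation is selected by giving its fully qualified class name instead (the class below is hypothetical, purely for illustration):

val conf = new SparkConf()
  .set("spark.shuffle.manager", "com.example.shuffle.MyShuffleManager")  // hypothetical class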
Creating the memoryManager
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
An abstract memory manager that enforces how memory is shared between execution and storage. In this context, execution memory refers to memory used for computation in shuffles, joins, sorts, and aggregations, while storage memory refers to memory used for caching and for propagating internal data across the cluster. There is one MemoryManager per JVM.
For details, see the subclasses StaticMemoryManager and UnifiedMemoryManager.
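For the unified manager, the sizing logic can be sketched as follows (a rough reconstruction of UnifiedMemoryManager's Spark 2.x defaults: a 300 MB reserve, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5):

// Rough reconstruction of how UnifiedMemoryManager sizes its pools.
val systemMemory   = Runtime.getRuntime.maxMemory          // JVM heap
val reservedMemory = 300L * 1024 * 1024                    // RESERVED_SYSTEM_MEMORY_BYTES
val usableMemory   = systemMemory - reservedMemory
val maxMemory      = (usableMemory * 0.6).toLong           // spark.memory.fraction
val storageRegion  = (maxMemory * 0.5).toLong              // spark.memory.storageFraction
// Execution and storage share maxMemory; storageRegion is only a soft boundary
// that execution may borrow from while storage is not using it.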
Creating the blockTransferService
The NettyBlockTransferService is used. Built on Netty's asynchronous, event-driven networking framework, it provides both the service and the client used to fetch sets of blocks from remote nodes.
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)
Creating the blockManagerMaster
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
Creating the blockManager
A manager running on every node (the driver and the executors) which provides interfaces for putting and retrieving blocks, both locally and remotely, into various stores (memory, disk, and off-heap).
Note: initialize() must be called before the BlockManager is usable.
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
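The initialize() call happens later, once the application ID is known; roughly, from the Spark 2.x sources:

// In SparkContext (driver side):
_env.blockManager.initialize(_applicationId)
// In Executor (executor side):
env.blockManager.initialize(conf.getAppId)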
Creating the metricsSystem
The metrics system is created differently on the driver and on the executors:
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
Spark's metrics system is created for a specific instance and is composed of sources and sinks; it periodically polls the metric data from the sources and pushes it to the sink destinations.
// Initialize the metrics configuration
metricsConfig.initialize()

// start() registers the sources and sinks, then starts the sinks
def start() {
  require(!running, "Attempting to start a MetricsSystem that is already running")
  running = true
  StaticSources.allSources.foreach(registerSource)
  registerSources()
  registerSinks()
  sinks.foreach(_.start)
}
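For illustration, this is the shape of a source (note: the Source trait is private[spark] in Spark 2.x, so code like this only compiles inside Spark's own packages; MySource is hypothetical):

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Hypothetical source, for illustration only (Source is private[spark]).
class MySource extends Source {
  override val sourceName = "mySource"
  override val metricRegistry = new MetricRegistry
  metricRegistry.register(MetricRegistry.name("answer"), new Gauge[Int] {
    override def getValue: Int = 42
  })
}

// e.g., registered once the metrics system is running:
SparkEnv.get.metricsSystem.registerSource(new MySource)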
Creating the outputCommitCoordinator
Authority that decides whether tasks can commit output to HDFS, using a "first committer wins" policy. It is instantiated on both the driver and the executors.
On executors it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output are forwarded to the driver's OutputCommitCoordinator. This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for the broader design discussion.
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
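The "first committer wins" decision is made in one place on the driver; a simplified sketch of the handler (abridged from the Spark 2.x OutputCommitCoordinator; the real code also tracks failed attempts):

private def handleAskPermissionToCommit(
    stage: Int, partition: Int, attemptNumber: Int): Boolean = synchronized {
  stageStates.get(stage) match {
    case Some(state) =>
      if (state.authorizedCommitters(partition) == NO_AUTHORIZED_COMMITTER) {
        state.authorizedCommitters(partition) = attemptNumber  // first committer wins
        true
      } else {
        false  // another attempt already holds the commit right for this partition
      }
    case None =>
      false  // unknown stage (e.g., already finished): deny
  }
}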
Creating the outputCommitCoordinatorRef
Likewise: created on the driver side; the executors hold a reference to it.
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
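Finally, everything is bundled into the SparkEnv instance that SparkEnv.get will return (roughly, per the Spark 2.x sources):

val envInstance = new SparkEnv(
  executorId,
  rpcEnv,
  serializer,
  closureSerializer,
  serializerManager,
  mapOutputTracker,
  shuffleManager,
  broadcastManager,
  blockManager,
  securityManager,
  metricsSystem,
  memoryManager,
  outputCommitCoordinator,
  conf)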