Description
SparkContext is the entry point to Spark's main functionality. It represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext may be active per JVM; you must stop the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for details.
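As a quick orientation before the walkthrough, here is a minimal sketch of that lifecycle (the application name and master are illustrative): a single SparkContext is created, used to build an RDD, an accumulator and a broadcast variable, and stopped before any further context could be created in the same JVM.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
val sc = new SparkContext(conf)          // the single active SparkContext in this JVM

val rdd = sc.parallelize(1 to 100)       // an RDD
val acc = sc.longAccumulator("counter")  // an accumulator
val bc  = sc.broadcast(Map("k" -> 1))    // a broadcast variable

sc.stop()                                // must stop before creating another SparkContext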
Initialization flow
- Create the Spark execution environment SparkEnv
- Create and initialize the SparkUI
- Set up Hadoop-related configuration and executor environment variables
- Register the HeartbeatReceiver
- Create the task scheduler TaskScheduler
- Create and start the DAGScheduler
- Start the TaskScheduler
- Initialize the block manager BlockManager
- Start the metrics system MetricsSystem
- Create and start the executor allocation manager ExecutorAllocationManager
- Create the RDD metadata cleaner metadataCleaner
- Create and start the ContextCleaner
- Post the Spark environment update
- Create the DAGSchedulerSource and BlockManagerSource
- Mark the SparkContext as active
- Add the shutdown hook
Creating the Spark execution environment SparkEnv
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
    SparkContext.numDriverCores(master, conf))
}
Creating and initializing the SparkUI
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
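As the snippet shows, the UI only exists when spark.ui.enabled is true (the default). A minimal sketch of turning it off through the configuration (application name and master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-ui-demo")
  .setMaster("local[2]")
  .set("spark.ui.enabled", "false")   // _ui stays None and nothing is bound
val sc = new SparkContext(conf)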
Hadoop-related configuration and executor environment variables
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}

_executorMemory = _conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
  .orElse(Option(System.getenv("SPARK_MEM"))
  .map(warnSparkMem))
  .map(Utils.memoryStringToMb)
  .getOrElse(1024)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
  executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
  executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
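The executor memory resolution above falls back from spark.executor.memory to the SPARK_EXECUTOR_MEMORY and SPARK_MEM environment variables and finally to 1024 MB. A sketch of the configuration side (the environment variable name and path are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("mem-demo")
  .set("spark.executor.memory", "2g")        // parsed by Utils.memoryStringToMb to 2048 MB
  .setExecutorEnv("MY_LIB_PATH", "/opt/lib") // merged into executorEnvs via getExecutorEnv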
Registering the HeartbeatReceiver
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
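The HeartbeatReceiver is the RPC endpoint executors report to. A sketch of the related, documented settings, assuming their usual meanings: the reporting interval, and the timeout after which a silent executor is treated as lost.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "10s") // how often executors send heartbeats to the driver
  .set("spark.network.timeout", "120s")           // executors silent longer than this are considered lost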
Creating the task scheduler TaskScheduler
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
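createTaskScheduler pattern-matches on the master URL (together with the deploy mode) to pick the scheduler/backend pair, so the same application can target different cluster managers by changing only this setting. A sketch (host and port are illustrative):

import org.apache.spark.SparkConf

val local      = new SparkConf().setMaster("local[4]")          // local threads: LocalSchedulerBackend
val standalone = new SparkConf().setMaster("spark://host:7077") // standalone cluster: StandaloneSchedulerBackend
val onYarn     = new SparkConf().setMaster("yarn")              // YARN: backend chosen by deployMode ("client"/"cluster")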
Creating and starting the DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
Starting the TaskScheduler
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
Initializing the block manager BlockManager
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
  System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
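Once the scheduler has produced the application ID, it becomes the namespace for the BlockManager and the UI and is exposed on the context. A small sketch (the application name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("appid-demo").setMaster("local[2]"))
println(sc.applicationId)   // e.g. "local-1565551234567" for a local master
sc.stop()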
Starting the metrics system MetricsSystem
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
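Besides a metrics.properties file, sinks can be configured through spark.metrics.conf.* properties as described in the Spark monitoring documentation. A sketch, assuming that property form, which attaches a ConsoleSink to every instance:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")   // report every 10 seconds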
Creating and starting the executor allocation manager ExecutorAllocationManager
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
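Utils.isDynamicAllocationEnabled checks spark.dynamicAllocation.enabled, so the manager above is only created when dynamic allocation is switched on. A sketch of a typical configuration (the executor bounds are illustrative; real clusters also need the external shuffle service):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")      // keeps shuffle files available when executors are removed
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")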
Creating and starting the ContextCleaner
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
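The cleaner is governed by spark.cleaner.referenceTracking (enabled by default, as the getBoolean default above shows); it asynchronously removes shuffle, RDD and broadcast state once the corresponding driver-side objects are garbage-collected. A sketch of the switch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking", "true")   // set to "false" to skip creating the ContextCleaner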
Posting the Spark environment update
setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
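The environment-update and application-start events posted here travel over the listener bus, so a custom SparkListener registered on the context (or via spark.extraListeners) observes them. A minimal sketch; StartupListener is an illustrative name:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerEnvironmentUpdate}

class StartupListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
    println(s"application started: ${event.appName}")
  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit =
    println("environment updated")
}

// sc.addSparkListener(new StartupListener())   // or set spark.extraListeners to the class name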
Creating the DAGSchedulerSource and BlockManagerSource
// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
Adding the shutdown hook
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  try {
    stop()
  } catch {
    case e: Throwable =>
      logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
  }
}
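The hook is only a safety net; applications should still call stop() explicitly so the event logs and the UI are closed deterministically. A small sketch (calling stop() here is safe even though the hook runs later, since stopping an already-stopped context is a no-op):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
try {
  sc.parallelize(1 to 10).count()
} finally {
  sc.stop()   // explicit stop; the shutdown hook's later stop() simply returns
}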
