Description

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and it can be used to create RDDs, accumulators, and broadcast variables on that cluster. Only one SparkContext may be active per JVM; you must stop() the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for details.
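
As a quick orientation before walking through the constructor, the sketch below (a minimal standalone example, not part of the SparkContext source) shows the three kinds of objects a SparkContext creates on the cluster: an RDD, an accumulator, and a broadcast variable.

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkContextExample {
      def main(args: Array[String]): Unit = {
        // Only one SparkContext may be active per JVM.
        val conf = new SparkConf().setAppName("SparkContextExample").setMaster("local[2]")
        val sc = new SparkContext(conf)

        val rdd     = sc.parallelize(1 to 100)         // RDD
        val evens   = sc.longAccumulator("evenCount")  // accumulator
        val divisor = sc.broadcast(2)                  // broadcast variable

        rdd.foreach(n => if (n % divisor.value == 0) evens.add(1))
        println(s"even numbers: ${evens.value}")

        sc.stop()  // stop the active context before creating a new one
      }
    }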

Initialization Flow in Detail

  1. Create the Spark execution environment SparkEnv
  2. Create and initialize the SparkUI
  3. Set up Hadoop-related configuration and executor environment variables
  4. Register the HeartbeatReceiver
  5. Create the task scheduler TaskScheduler
  6. Create and start the DAGScheduler
  7. Start the TaskScheduler
  8. Initialize the block manager BlockManager
  9. Start the metrics system MetricsSystem
  10. Create and start the executor allocation manager ExecutorAllocationManager
  11. Create the RDD metadata cleaner metadataCleaner
  12. Create and start the ContextCleaner
  13. Post Spark environment updates
  14. Create the DAGSchedulerSource and BlockManagerSource
  15. Mark the SparkContext as active
  16. Add the shutdown hook

Create the Spark execution environment SparkEnv

    // This function allows components created by SparkEnv to be mocked in unit tests:
    private[spark] def createSparkEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus): SparkEnv = {
      SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
    }
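
The constructor assigns the result of this helper to `_env` and registers it globally via `SparkEnv.set`. Once the application is up, the driver-side environment can be inspected through the `@DeveloperApi` accessor `SparkEnv.get`; the short sketch below is an illustrative example, not Spark source.

    import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

    object SparkEnvPeek {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("env-peek").setMaster("local[2]"))
        // SparkEnv.get returns the environment of the current process; on the driver
        // it is the SparkEnv built by createSparkEnv / SparkEnv.createDriverEnv.
        val env = SparkEnv.get
        println(env.executorId)                 // "driver" on the driver side
        println(env.conf.get("spark.app.name")) // the SparkConf captured by the environment
        sc.stop()
      }
    }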

Create and initialize the SparkUI

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
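
Whether the UI is created at all is controlled by `spark.ui.enabled`, and the bound address is later exposed to applications as `SparkContext.uiWebUrl`. A small illustrative example (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}

    object UiConfigExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("ui-example")
          .setMaster("local[2]")
          .set("spark.ui.enabled", "true")  // "false" would leave _ui as None
        val sc = new SparkContext(conf)
        // uiWebUrl is Some(url) once the UI has been bound, None when it is disabled.
        println(sc.uiWebUrl.getOrElse("UI disabled"))
        sc.stop()
      }
    }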

Hadoop-related configuration and executor environment variables

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }
    if (files != null) {
      files.foreach(addFile)
    }
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser
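
From the application side, the values consumed here typically come from SparkConf: `spark.executor.memory` feeds `_executorMemory`, and `SparkConf.setExecutorEnv` populates the map merged in via `getExecutorEnv`. An illustrative example (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExecutorEnvExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("executor-env-example")
          .setMaster("local[2]")
          .set("spark.executor.memory", "2g")       // picked up by _executorMemory above
          .setExecutorEnv("DATA_DIR", "/tmp/data")  // merged into executorEnvs via getExecutorEnv
        val sc = new SparkContext(conf)
        println(sc.getConf.get("spark.executor.memory"))
        sc.stop()
      }
    }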

Register the HeartbeatReceiver

    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
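
The HeartbeatReceiver is the driver-side RPC endpoint that executors report to. Its behavior is mainly tuned through the standard settings `spark.executor.heartbeatInterval` and `spark.network.timeout`; the snippet below is an illustrative configuration sketch, not part of the SparkContext source.

    import org.apache.spark.{SparkConf, SparkContext}

    object HeartbeatTuningExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("heartbeat-example")
          .setMaster("local[2]")
          // How often each executor sends a heartbeat to the driver (default 10s).
          .set("spark.executor.heartbeatInterval", "10s")
          // Executors silent for longer than this are considered lost (default 120s).
          .set("spark.network.timeout", "120s")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 10).count()
        sc.stop()
      }
    }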

Create the task scheduler TaskScheduler

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
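
`createTaskScheduler` pattern-matches on the master URL to decide which TaskScheduler/SchedulerBackend pair to build. The sketch below is a simplified, hypothetical illustration of that dispatch (names and details differ from the real implementation):

    object MasterUrlSketch {
      private val LocalN   = """local\[([0-9]+|\*)\]""".r
      private val SparkUrl = """spark://(.+)""".r

      // Hypothetical mapping for illustration only; the real createTaskScheduler
      // builds a TaskSchedulerImpl plus a matching SchedulerBackend.
      def backendFor(master: String): String = master match {
        case "local"         => "LocalSchedulerBackend with a single thread"
        case LocalN(threads) => s"LocalSchedulerBackend with $threads threads"
        case SparkUrl(_)     => "StandaloneSchedulerBackend against a standalone master"
        case other           => s"external cluster manager for '$other' (e.g. yarn, mesos, k8s)"
      }

      def main(args: Array[String]): Unit =
        Seq("local", "local[4]", "spark://host:7077", "yarn")
          .foreach(m => println(s"$m -> ${backendFor(m)}"))
    }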

Create and start the DAGScheduler

    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

Start the TaskScheduler

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

Initialize the block manager BlockManager

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)
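
After this point the application ID chosen by the task scheduler is available both as the `spark.app.id` property and through `SparkContext.applicationId`, and the BlockManager has been initialized with it. Illustrative example (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}

    object AppIdExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("app-id-example").setMaster("local[2]"))
        // The ID the task scheduler produced and that was written into spark.app.id above.
        println(sc.applicationId)
        println(sc.getConf.get("spark.app.id"))
        sc.stop()
      }
    }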

Start the metrics system MetricsSystem

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
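
Sinks and sources for the MetricsSystem are normally configured in `metrics.properties`; the same settings can also be passed through SparkConf using the `spark.metrics.conf.` prefix. A hedged illustration assuming a console sink (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}

    object MetricsSinkExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("metrics-example")
          .setMaster("local[2]")
          // Equivalent to entries in metrics.properties, passed via the spark.metrics.conf. prefix.
          .set("spark.metrics.conf.*.sink.console.class",
            "org.apache.spark.metrics.sink.ConsoleSink")
          .set("spark.metrics.conf.*.sink.console.period", "10")
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 1000).count()
        Thread.sleep(15000) // give the console sink time to report at least once
        sc.stop()
      }
    }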

Create and start the executor allocation manager ExecutorAllocationManager

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())
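
The ExecutorAllocationManager is only created when dynamic allocation is enabled and the scheduler backend implements ExecutorAllocationClient (which local mode does not). A typical, illustrative configuration for a YARN deployment (not Spark source) looks like this:

    import org.apache.spark.{SparkConf, SparkContext}

    object DynamicAllocationExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("dynamic-allocation-example")
          .setMaster("yarn") // requires a real YARN environment; shown only for illustration
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.shuffle.service.enabled", "true") // keeps shuffle data available when executors are removed
          .set("spark.dynamicAllocation.minExecutors", "1")
          .set("spark.dynamicAllocation.maxExecutors", "20")
          .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
        val sc = new SparkContext(conf)
        // ... run jobs; executors are added and removed based on pending tasks and idle time ...
        sc.stop()
      }
    }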

Create and start the ContextCleaner

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
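
The ContextCleaner tracks RDDs, shuffles, broadcasts, and accumulators with weak references and cleans up their state asynchronously once they become unreachable; it is enabled by default through `spark.cleaner.referenceTracking`. An illustrative example (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    object ContextCleanerExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("cleaner-example")
          .setMaster("local[2]")
          .set("spark.cleaner.referenceTracking", "true") // default; "false" skips the ContextCleaner
        val sc = new SparkContext(conf)

        var cached: RDD[Int] = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
        println(cached.count())
        cached = null // once unreachable, the cleaner can drop its cached blocks asynchronously
        System.gc()   // hint a GC so the cleaner's weak-reference queue is populated sooner

        sc.stop()
      }
    }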

Post Spark environment updates

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()
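
setupAndStartListenerBus also registers any listeners named in `spark.extraListeners`, and the two post* calls publish SparkListenerEnvironmentUpdate and SparkListenerApplicationStart events on the listener bus. The illustrative example below (not Spark source) shows the event types involved; a listener added this late only sees subsequent events.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, SparkListenerEnvironmentUpdate}

    object ListenerExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("listener-example").setMaster("local[2]"))
        sc.addSparkListener(new SparkListener {
          override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit =
            println("environment updated")
          override def onApplicationStart(event: SparkListenerApplicationStart): Unit =
            println(s"application started: ${event.appName}")
        })
        sc.parallelize(1 to 10).count()
        sc.stop()
      }
    }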

Create the DAGSchedulerSource and BlockManagerSource

    // Post init
    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

Add the shutdown hook

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }
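
The hook is only a safety net for JVMs that exit cleanly; applications should still stop the context explicitly. An illustrative pattern (not Spark source):

    import org.apache.spark.{SparkConf, SparkContext}

    object ExplicitStopExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("explicit-stop").setMaster("local[2]"))
        try {
          println(sc.parallelize(1 to 100).sum())
        } finally {
          // Prefer an explicit stop(); the shutdown hook only covers clean JVM exits.
          sc.stop()
        }
      }
    }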