7.3.1 运行资源调优概述

7.3 运行资源调优 - 图1

在开发完 Spark 作业之后,就该为作业配置合适的资源了。Spark 的资源参数,基本都可以在 spark-submit 命令中作为参数设置。很多 Spark 初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。

总之,无论是哪种情况,都会导致 Spark 作业的运行效率低下,甚至根本无法运行。因此我们必须对 Spark 作业的资源使用原理有一个清晰的认识,并知道在 Spark 作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

7.3.2 Spark 作业基本运行原理

image.png

详细原理见上图。我们使用 spark-submit 提交一个 Spark 作业之后,这个作业就会启动一个对应的 Driver 进程。根据你使用的部署模式(deploy-mode)不同,Driver 进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver 进程本身会根据我们设置的参数,占有一定数量的内存和 CPU core。而 Driver 进程要做的第一件事情,就是向集群管理器(可以是 Spark Standalone 集群,也可以是其他的资源管理集群,美团 - 大众点评使用的是 YARN 作为资源管理集群)申请运行 Spark 作业需要使用的资源,这里的资源指的就是 Executor 进程。YARN 集群管理器会根据我们为 Spark 作业设置的资源参数,在各个工作节点上,启动一定数量的 Executor 进程,每个 Executor 进程都占有一定数量的内存和 CPU core。

在申请到了作业执行所需的资源之后,Driver 进程就会开始调度和执行我们编写的作业代码了。Driver 进程会将我们编写的 Spark 作业代码分拆为多个 stage,每个 stage 执行一部分代码片段,并为每个 stage 创建一批 task,然后将这些 task 分配到各个 Executor 进程中执行。task 是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个 task 处理的数据不同而已。一个 stage 的所有 task 都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后 Driver 就会调度运行下一个 stage。下一个 stage 的 task 的输入数据就是上一个 stage 输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark 是根据 shuffle 类算子来进行 stage 的划分。如果我们的代码中执行了某个 shuffle 类算子(比如 reduceByKey、join 等),那么就会在该算子处,划分出一个 stage 界限来。可以大致理解为,shuffle 算子执行之前的代码会被划分为一个 stage,shuffle 算子执行以及之后的代码会被划分为下一个 stage。因此一个 stage 刚开始执行的时候,它的每个 task 可能都会从上一个 stage 的 task 所在的节点,去通过网络传输拉取需要自己处理的所有 key,然后对拉取到的所有相同的 key 使用我们自己编写的算子函数执行聚合操作(比如 reduceByKey() 算子接收的函数)。这个过程就是 shuffle。

当我们在代码中执行了 cache/persist 等持久化操作时,根据我们选择的持久化级别的不同,每个 task 计算出来的数据也会保存到 Executor 进程的内存或者所在节点的磁盘文件中。

因此 Executor 的内存主要分为三块:第一块是让 task 执行我们自己编写的代码时使用,默认是占 Executor 总内存的 20%;第二块是让 task 通过 shuffle 过程拉取了上一个 stage 的 task 的输出后,进行聚合等操作时使用,默认也是占 Executor 总内存的 20%;第三块是让 RDD 持久化时使用,默认占 Executor 总内存的 60%

task 的执行速度是跟每个 Executor 进程的 CPU core 数量有直接关系的。一个 CPU core 同一时间只能执行一个线程。而每个 Executor 进程上分配到的多个 task,都是以每个 task 一条线程的方式,多线程并发运行的。如果 CPU core 数量比较充足,而且分配到的 task 数量比较合理,那么通常来说,可以比较快速和高效地执行完这些 task 线程。

以上就是 Spark 作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

7.3.3 运行资源中的几种情况

几种情况:

  • 实践中跑的 Spark job,有的特别,并且查看 CPU,发现 CPU 利用率很低,可以尝试减少每个 executor 占用 CPU core 的数量增加并行的 executor 数量,同时配合增加分片,整体上增加了 CPU 的利用率,加快数据处理速度。
  • 发现某 job 很容易发生内存溢出,我们就增大分片数量,从而减少了每片数据的规模,同时可以减少并行的 executor 数量,这样相同的内存资源分配给数量更少的 executor,相当于增加了每个 task 的内存分配,这样运行速度可能慢了些,但是总比 OOM 强。
  • 数据量特别少,有大量的小文件生成就减少文件分片,没必要创建那么多 task,这种情况,如果只是最原始的 input 比较小,一般都能被注意到;但是,如果是在运算过程中,比如应用某个 reduceBy 或者某个 filter 以后,数据大量减少,这种低效情况就很少被留意到。

7.3.4 运行资源参数调优

了解完了 Spark 作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的 Spark 资源参数调优,其实主要就是对 Spark 运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升 Spark 作业的执行性能。以下参数就是 Spark 中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors — YARN-only

参数说明该参数用于设置 Spark 作业总共要用多少个 Executor 进程来执行。Driver 在向 YARN 集群管理器申请资源时,YARN 集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的 Executor 进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的 Executor 进程,此时你的 Spark 作业的运行速度是非常慢的。

参数调优建议:每个 Spark 作业的运行一般设置 50~100 个左右的 Executor 进程比较合适,设置太少或太多的 Executor 进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

参数说明:
该参数用于设置每个 Executor 进程的内存**。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联。

参数调优建议:每个 Executor 进程的内存设置 4G~8G 较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors * executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2,避免你自己的 Spark 作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores — Spark standalone and YARN only

参数说明:该参数用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力。因为每个 CPU core 同一时间只能执行一个 task 线程,因此每个 Executor 进程的 CPU core 数量越多,越能够快速地执行完分配给自己的所有 task 线程。

参数调优建议:Executor 的 CPU core 数量设置为 2~4 个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大 CPU core 限制是多少,再依据设置的 Executor 数量,来决定每个 Executor 进程可以分配到几个 CPU core。同样建议,如果是跟他人共享这个队列,那么 num-executors * executor-cores 不要超过队列总 CPU core 的 1/3~1/2 左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

参数说明:
该参数用于设置 Driver 进程的内存**。

参数调优建议:Driver 的内存通常来说不设置,或者设置 1G 左右应该就够了。唯一需要注意的一点是,如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理,那么必须确保 Driver 的内存足够大,否则会出现 OOM 内存溢出的问题。

spark.default.parallelism

参数说明:
该参数用于设置每个 stage 的默认 task 数量**。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。

参数调优建议:Spark 作业的默认 task 数量为 500~1000 个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致 Spark 自己根据底层 HDFS 的 block 数量来设置 task 的数量,默认是一个 HDFS block 对应一个 task。通常来说,Spark 默认设置的数量是偏少的(比如就几十个 task),如果 task 数量偏少的话,就会导致你前面设置好的 Executor 的参数都前功尽弃。试想一下,无论你的 Executor 进程有多少个,内存和 CPU 有多大,但是 task 只有 1 个或者 10 个,那么 90% 的 Executor 进程可能根本就没有 task 执行,也就是白白浪费了资源!因此 Spark 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 2~3 倍 较为合适,比如 Executor 的总 CPU core 数量为 300 个,那么设置 1000 个 task 是可以的,此时可以充分地利用 Spark 集群的资源。

扩展知识:

假设一个父 RDD 要执行 reduceByKey 任务,我们可以显式的指定分区器 val rdd_child = rdd_parent.reduceByKey(new HashPartitioner(3), _+_)。HashPartitioner 构造参数 3 就是分区数量,也是启动的 reduce task 数量,也是 reduceByKey 结果返回的子 RDD 的 partitions 方法返回的数组的长度。

如果没有显式指定分区器,则会调用 org.apache.spark 包下伴生对象 Partitioner 的 defaultPartitioner 静态方法返回的分区器作为默认分区器。

defaultPartitioner 返回默认分区器的过程:尝试利用父 RDD 的 partitioner,如果父 RDD 没有 partitioner,则会查看 sparkConf 中是否定义了 spark.default.parallelism 配置参数,如果定义了就返回 new HashPartitioner(sc.defaultParallelism) 作为默认分区器,如果没定义就返回 new HashPartitioner(rdd_parent.partitions.length) 作为默认分区器。

defaultPartitioner 源码:

  1. //org.apache.spark包下伴生对象object Partitioner的方法
  2. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  3. val rdds = (Seq(rdd) ++ others)
  4. val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
  5. if (hasPartitioner.nonEmpty) {
  6. hasPartitioner.maxBy(_.partitions.length).partitioner.get
  7. } else {
  8. if (rdd.context.conf.contains("spark.default.parallelism")) {
  9. new HashPartitioner(rdd.context.defaultParallelism)
  10. } else {
  11. new HashPartitioner(rdds.map(_.partitions.length).max)
  12. }
  13. }
  14. }

spark.storage.memoryFraction

参数说明:该参数用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6。也就是说,默认 Executor 60% 的内存,可以用来保存持久化的 RDD 数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘

参数调优建议:如果 Spark 作业中,有较多的 RDD 持久化操作该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果 Spark 作业中的 shuffle 类操作比较多而持久化操作比较少那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的 gc 导致运行缓慢(通过 spark web ui 可以观察到作业的 gc 消耗),意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

参数说明:该参数用于设置 shuffle 过程中一个 task 拉取到上个 stage 的 task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认是 0.2。也就是说,Executor 默认只有 20% 的内存用来进行该操作。shuffle 操作在进行聚合时,如果发现使用的内存超出了这个 20% 的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

参数调优建议:如果 Spark 作业中的 RDD 持久化操作较少shuffle 操作较多时建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的 gc 导致运行缓慢,意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

小结

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括 Spark 作业中的 shuffle 操作数量、RDD 持久化操作数量以及 spark web ui 中显示的作业 gc 情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

一个 CPU core 同一时间只能执行一个线程。而每个 Executor 进程上分配到的多个 task,都是以每个 task 一条线程的方式,多线程并发运行的。

资源参数参考示例

以下是一份 spark-submit 命令的示例,大家可以参考一下,并根据自己的
实际情况进行调节**:

  1. ./bin/spark-submit \
  2. --master yarn-cluster \
  3. --num-executors 100 \
  4. --executor-memory 6G \
  5. --executor-cores 4 \
  6. --driver-memory 1G \
  7. --conf spark.default.parallelism=1000 \
  8. --conf spark.storage.memoryFraction=0.5 \
  9. --conf spark.shuffle.memoryFraction=0.3 \

bin/spark-submit -help 帮助参数如下:

  1. $ bin/spark-submit -help
  2. Error: Unrecognized option: -help
  3. Usage: spark-submit [options] <app jar | python file> [app arguments]
  4. Usage: spark-submit --kill [submission ID] --master [spark://...]
  5. Usage: spark-submit --status [submission ID] --master [spark://...]
  6. Usage: spark-submit run-example [options] example-class [example args]
  7. Options:
  8. --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
  9. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
  10. on one of the worker machines inside the cluster ("cluster")
  11. (Default: client).
  12. --class CLASS_NAME Your application's main class (for Java / Scala apps).
  13. --name NAME A name of your application.
  14. --jars JARS Comma-separated list of local jars to include on the driver
  15. and executor classpaths.
  16. --packages Comma-separated list of maven coordinates of jars to include
  17. on the driver and executor classpaths. Will search the local
  18. maven repo, then maven central and any additional remote
  19. repositories given by --repositories. The format for the
  20. coordinates should be groupId:artifactId:version.
  21. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while
  22. resolving the dependencies provided in --packages to avoid
  23. dependency conflicts.
  24. --repositories Comma-separated list of additional remote repositories to
  25. search for the maven coordinates given with --packages.
  26. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
  27. on the PYTHONPATH for Python apps.
  28. --files FILES Comma-separated list of files to be placed in the working
  29. directory of each executor.
  30. --conf PROP=VALUE Arbitrary Spark configuration property.
  31. --properties-file FILE Path to a file from which to load extra properties. If not
  32. specified, this will look for conf/spark-defaults.conf.
  33. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  34. --driver-java-options Extra Java options to pass to the driver.
  35. --driver-library-path Extra library path entries to pass to the driver.
  36. --driver-class-path Extra class path entries to pass to the driver. Note that
  37. jars added with --jars are automatically included in the
  38. classpath.
  39. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  40. --proxy-user NAME User to impersonate when submitting the application.
  41. This argument does not work with --principal / --keytab.
  42. --help, -h Show this help message and exit.
  43. --verbose, -v Print additional debug output.
  44. --version, Print the version of current Spark.
  45. Spark standalone with cluster deploy mode only:
  46. --driver-cores NUM Cores for driver (Default: 1).
  47. Spark standalone or Mesos with cluster deploy mode only:
  48. --supervise If given, restarts the driver on failure.
  49. --kill SUBMISSION_ID If given, kills the driver specified.
  50. --status SUBMISSION_ID If given, requests the status of the driver specified.
  51. Spark standalone and Mesos only:
  52. --total-executor-cores NUM Total cores for all executors.
  53. Spark standalone and YARN only:
  54. --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, #注意:设置这个参数的时候会出现 bug,分配的 executor 核心数不起作用!!!
  55. or all available cores on the worker in standalone mode)
  56. YARN-only:
  57. --driver-cores NUM Number of cores used by the driver, only in cluster mode
  58. (Default: 1).
  59. --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
  60. --num-executors NUM Number of executors to launch (Default: 2).
  61. If dynamic allocation is enabled, the initial number of
  62. executors will be at least NUM.
  63. --archives ARCHIVES Comma separated list of archives to be extracted into the
  64. working directory of each executor.
  65. --principal PRINCIPAL Principal to be used to login to KDC, while running on
  66. secure HDFS.
  67. --keytab KEYTAB The full path to the file that contains the keytab for the
  68. principal specified above. This keytab will be copied to
  69. the node running the Application Master via the Secure
  70. Distributed Cache, for renewing the login tickets and the
  71. delegation tokens periodically.