1 编译与CDH集成

  1. 下载Spark源码:Spark3.2.1<br /> 官方下载:[https://spark.apache.org/downloads.html](https://spark.apache.org/downloads.html)<br />选择源码包:<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25487103/1651733021741-3f24293c-88b9-4a2d-a96c-688e6ccf40a5.png#clientId=u83c595f9-1277-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=232&id=u6f0c6470&margin=%5Bobject%20Object%5D&name=image.png&originHeight=232&originWidth=1003&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41529&status=done&style=stroke&taskId=uc094f5ef-e69b-40a7-ba03-fcd7fa86edc&title=&width=1003)<br />注意:Spark3只支持Scala2.12+<br />指定CDH环境hadoop、hive、scala版本并编译Spark<br />编译命令:<br />./dev/make-distribution.sh --name 3.0.0-cdh6.3.2 --tgz -Phive -Phive-thriftserver -Pyarn -Phadoop-3.0.0 -Dhadoop.version=3.0.0-cdh6.3.2 -DskipTests clean package -X<br />编译好的gz包在当前目录下,上传至部署服务器中

2 部署Spark

解压gz包:
tar -zxvf /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2.tar.gz
赋予root权限:
chown -R root:root /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2
建立软连接方便管理:
ln -s /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2 /opt/spark3
修改配置:
cp -r /opt/spark3/conf/spark-defaults.conf.template /opt/spark3/conf/spark-defaults.conf
cp -r /opt/spark3/conf/spark-env.sh.template /opt/spark3/conf/spark-env.sh
cp -r /opt/spark3/conf/workers.template /opt/spark3/conf/workers
拷贝cdh自带spark的默认配置spark-default.conf并修改为如下配置:

  1. spark.authenticate=false
  2. spark.driver.log.dfsDir=/user/spark3/driverLogs
  3. spark.driver.log.persistToDfs.enabled=true
  4. spark.dynamicAllocation.enabled=true
  5. spark.dynamicAllocation.executorIdleTimeout=60
  6. spark.dynamicAllocation.cachedExecutorIdleTimeout=600
  7. spark.dynamicAllocation.minExecutors=0
  8. spark.dynamicAllocation.maxExecutors=100
  9. spark.dynamicAllocation.schedulerBacklogTimeout=1
  10. spark.eventLog.enabled=true
  11. spark.io.encryption.enabled=false
  12. spark.network.crypto.enabled=false
  13. spark.serializer=org.apache.spark.serializer.KryoSerializer
  14. spark.shuffle.service.enabled=true
  15. spark.shuffle.service.port=7338
  16. spark.ui.enabled=true
  17. spark.ui.killEnabled=true
  18. spark.lineage.log.dir=/var/log/spark3/lineage
  19. spark.lineage.enabled=true
  20. spark.master=yarn
  21. spark.submit.deployMode=client
  22. spark.eventLog.dir=hdfs://hdfs/user/spark3/applicationHistory
  23. spark.yarn.historyServer.address=http://data-master2.bj.sm:18088
  24. spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
  25. spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
  26. spark.yarn.am.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
  27. spark.yarn.config.gatewayPath=/opt/cloudera/parcels
  28. spark.yarn.config.replacementPath={{HADOOP_COMMON_HOME}}/../../..
  29. spark.yarn.historyServer.allowTracking=true

spark的环境配置spark-env.sh并修改为如下配置:

  1. #shumei spark env properties
  2. export SPARK_HOME=/opt/spark3
  3. export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
  4. export HADOOP_CONF_DIR=/etc/hadoop/conf
  5. export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn

workers中添加所有datanode节点:

  1. data-slave1.bj.sm
  2. data-slave2.bj.sm
  3. data-slave3.bj.sm
  4. data-slave4.bj.sm
  5. data-slave5.bj.sm
  6. data-slave6.bj.sm
  7. data-slave7.bj.sm
  8. data-slave8.bj.sm
  9. data-slave9.bj.sm
  10. data-slave10.bj.sm
  11. data-slave11.bj.sm
  12. data-slave12.bj.sm
  13. data-slave13.bj.sm
  14. data-slave14.bj.sm
  15. data-slave15.bj.sm
  16. data-slave16.bj.sm
  17. data-slave17.bj.sm
  18. data-slave18.bj.sm
  19. data-slave19.bj.sm
  20. data-slave20.bj.sm
  21. data-slave21.bj.sm
  22. data-slave22.bj.sm
  23. data-slave23.bj.sm
  24. data-slave24.bj.sm
  25. data-slave25.bj.sm
  26. data-slave26.bj.sm

在conf下添加cdh环境配置文件软连接:
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/core-site.xml
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/hdfs-site.xml
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/hive-site.xml
测试spark-shell是否正常启动:
/opt/spark3/bin/spark-shell
在需要Spark的机器上部署即可,无需分布式部署

3 配置

3.1 调度配置
调度相关的配置主要有:
- 数据本地性相关,可以控制判断数据本地性的时间 - 任务调度的模式,如FIFO, FAIR等
- 黑名单机制,如失败多少次这个executor拉入黑名单、失败多少次节点拉入黑名单、黑名单后多长时间能再次启用
- 推测执行相关,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。
- 任务相关,比如使用的CPU、重试的次数等

  1. # 开启speculative,默认关闭
  2. spark.speculation=true
  3. # 检测周期,单位毫秒
  4. spark.speculation.interval=100
  5. # 任务完成的百分比,比如同一个stage中task的完成占比
  6. spark.speculation.quantile=0.75
  7. # 任务延迟的比例,比如当75%的task都完成,那么取他们的中位数跟还未执行完的任务作对比。如果超过1.5倍,则开启推测执行。
  8. spark.speculation.multiplier=1.5
  9. spark.cores.max
  10. 当运行在standalonemesos,应用可以使用的最大核数。如果没有配置,默认是 spark.deploy.defaultCores
  11. spark.locality.wait
  12. 默认值,3s
  13. 启动数据本地任务等待多长时间后启动非本地节点。数据本地性有多种级别,进程本地性、节点本地性、机架本地性或其他。也可以根据不同的级别配置等待时间,如 spark.locality.wait.node 等。如果任务时间长或者很少看见本地性可以调整该配置。
  14. spark.locality.wait.node
  15. 默认值,spark.locality.wait
  16. 定义节点本地性的等待时间,比如,可以配置0跳过节点本地性,直接使用rack本地性。
  17. spark.locality.wait.process
  18. 默认值,spark.locality.wait
  19. 定义进程本地性的等待时间
  20. spark.locality.wait.rack
  21. 定义机架本地性的等待时间
  22. spark.scheduler.maxRegisteredResourcesWaitingTime
  23. 默认值,30s
  24. 调度任务前等待资源注册的时间
  25. spark.scheduler.minRegisteredResourcesRatio
  26. 默认值,yarn模式下0.8standalonemesos模式下0.0
  27. spark.scheduler.mode
  28. 默认值,FIFO
  29. 在相同的SparkContext中提交的任务调度模式,默认先进先出,也可以是FAIR公平调度
  30. spark.scheduler.revive.interval
  31. 默认值,1s
  32. 检查worker资源准备的间隔时间
  33. spark.scheduler.listenerbus.eventqueue.capacity
  34. 默认值,10000
  35. Spark监听总线的事件队列长度,必须大于0.如果监听的事件被丢弃,可以增加该值。增加会使driver需要更多的内存。
  36. spark.scheduler.blacklist.unschedulableTaskSetTimeout
  37. 默认值,120s
  38. spark.blacklist.enabled
  39. 默认值,false
  40. 如果为true,阻止spark在失败多次而进入黑名单的executor 调度任务。
  41. spark.blacklist.timeout
  42. 默认值,1h
  43. 当节点或执行者被当做黑名单时的时间,之后会从黑名单移除正常参与执行任务。
  44. spark.blacklist.task.maxTaskAttemptsPerExecutor
  45. 默认值,1
  46. 试验特性,对于一个task,在一个executor进入黑名单前可以执行重试多少次。
  47. spark.blacklist.task.maxTaskAttemptsPerNode
  48. 默认值,2
  49. 试验特性,对于一个task,在一个node成为黑名单前可以执行重试多少次。
  50. spark.blacklist.stage.maxFailedTasksPerExecutor
  51. 默认值,2
  52. 试验特性,executor针对某个stage成为黑名单需要失败多少个任务。
  53. spark.blacklist.stage.maxFailedExecutorsPerNode
  54. 默认值,2
  55. 试验特性,在一个节点上,有几个executor针对某个stage成为黑名单,这个node才算黑名单。
  56. spark.blacklist.application.maxFailedTasksPerExecutor
  57. 默认值,2
  58. 试验特性,有多少不同的任务在同一个executor失败后,这个executor针对整个应用成为黑名单。黑名单的executor spark.blacklist.timeout 超时后,会被自动添加到资源池中。注意如果使用了动态分配,那么executor可能会被标记为空闲,从而被资源调度框架回收。
  59. spark.blacklist.application.maxFailedExecutorsPerNode
  60. 默认值,2
  61. 试验特性,针对整个应用,有多少不同的executor成为黑名单后,这个节点也会被标记为黑名单。
  62. spark.blacklist.killBlacklistedExecutors
  63. 默认值,false
  64. 如果为true,允许spark立即删除黑名单的executor。如果node被标记为黑名单,那么上面那所有的executor都会被kill
  65. spark.blacklist.application.fetchFailure.enabled
  66. 默认值,false
  67. 如果配置为true,当executor执行fetch操作报错时,将直接会拉入黑名单。如果使用外部shuffle服务,那么整个节点将会被拉入黑名单。
  68. spark.speculation
  69. 默认值,false
  70. 如果为true,将会针对任务进行推测执行。比如某个任务执行缓慢,会再开启一个任务,哪个执行快用哪个作为结果。
  71. spark.speculation.interval
  72. 默认值,100ms
  73. 针对任务进行探测的间隔时间
  74. spark.speculation.multiplier
  75. 默认值,1.5
  76. 任务比均值慢多少将会执行推测
  77. spark.speculation.quantile
  78. 默认值,0.75
  79. 在进行推测前任务需要完成多少
  80. spark.task.cpus
  81. 默认值,1
  82. 每个task允许使用的核数
  83. spark.task.maxFailures
  84. 默认值,4
  85. 任务在放弃执行前可以允许的失败次数。不同的任务的失败次数,不会导致job失败。
  86. spark.task.reaper.enabled
  87. 默认值,false
  88. 启用任务的停止监控。当配置成true,任何任务被kill,都会被executor监控到,直到任务完成。参考 spark.task.reaper.* 配置。
  89. spark.task.reaper.pollingInterval
  90. 默认值,10s
  91. enabledtrue时,这个配置控制executor多长时间检测一次。
  92. spark.task.reaper.threadDump
  93. 默认值,true
  94. 当任务停止后dump出日志。
  95. spark.task.reaper.killTimeout
  96. 默认值,-1
  97. 用来配置任务无法终止时,使用JVM进行停止的等待时间。-1为禁用。
  98. spark.stage.maxConsecutiveAttempts
  99. 默认值,4
  100. stage停止前可以尝试的次数。