1 编译与CDH集成
下载Spark源码:Spark3.2.1<br /> 官方下载:[https://spark.apache.org/downloads.html](https://spark.apache.org/downloads.html)<br />选择源码包:<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/25487103/1651733021741-3f24293c-88b9-4a2d-a96c-688e6ccf40a5.png#clientId=u83c595f9-1277-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=232&id=u6f0c6470&margin=%5Bobject%20Object%5D&name=image.png&originHeight=232&originWidth=1003&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41529&status=done&style=stroke&taskId=uc094f5ef-e69b-40a7-ba03-fcd7fa86edc&title=&width=1003)<br />注意:Spark3只支持Scala2.12+<br />指定CDH环境hadoop、hive、scala版本并编译Spark<br />编译命令:<br />./dev/make-distribution.sh --name 3.0.0-cdh6.3.2 --tgz -Phive -Phive-thriftserver -Pyarn -Phadoop-3.0.0 -Dhadoop.version=3.0.0-cdh6.3.2 -DskipTests clean package -X<br />编译好的gz包在当前目录下,上传至部署服务器中
2 部署Spark
解压gz包:
tar -zxvf /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2.tar.gz
赋予root权限:
chown -R root:root /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2
建立软连接方便管理:
ln -s /opt/spark-3.2.1-bin-3.0.0-cdh6.3.2 /opt/spark3
修改配置:
cp -r /opt/spark3/conf/spark-defaults.conf.template /opt/spark3/conf/spark-defaults.conf
cp -r /opt/spark3/conf/spark-env.sh.template /opt/spark3/conf/spark-env.sh
cp -r /opt/spark3/conf/workers.template /opt/spark3/conf/workers
拷贝cdh自带spark的默认配置spark-default.conf并修改为如下配置:
spark.authenticate=false
spark.driver.log.dfsDir=/user/spark3/driverLogs
spark.driver.log.persistToDfs.enabled=true
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.cachedExecutorIdleTimeout=600
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=100
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.eventLog.enabled=true
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7338
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.lineage.log.dir=/var/log/spark3/lineage
spark.lineage.enabled=true
spark.master=yarn
spark.submit.deployMode=client
spark.eventLog.dir=hdfs://hdfs/user/spark3/applicationHistory
spark.yarn.historyServer.address=http://data-master2.bj.sm:18088
spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
spark.yarn.am.extraLibraryPath=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
spark.yarn.config.gatewayPath=/opt/cloudera/parcels
spark.yarn.config.replacementPath={{HADOOP_COMMON_HOME}}/../../..
spark.yarn.historyServer.allowTracking=true
spark的环境配置spark-env.sh并修改为如下配置:
#shumei spark env properties
export SPARK_HOME=/opt/spark3
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
workers中添加所有datanode节点:
data-slave1.bj.sm
data-slave2.bj.sm
data-slave3.bj.sm
data-slave4.bj.sm
data-slave5.bj.sm
data-slave6.bj.sm
data-slave7.bj.sm
data-slave8.bj.sm
data-slave9.bj.sm
data-slave10.bj.sm
data-slave11.bj.sm
data-slave12.bj.sm
data-slave13.bj.sm
data-slave14.bj.sm
data-slave15.bj.sm
data-slave16.bj.sm
data-slave17.bj.sm
data-slave18.bj.sm
data-slave19.bj.sm
data-slave20.bj.sm
data-slave21.bj.sm
data-slave22.bj.sm
data-slave23.bj.sm
data-slave24.bj.sm
data-slave25.bj.sm
data-slave26.bj.sm
在conf下添加cdh环境配置文件软连接:
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/core-site.xml
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/hdfs-site.xml
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/hive-site.xml
测试spark-shell是否正常启动:
/opt/spark3/bin/spark-shell
在需要Spark的机器上部署即可,无需分布式部署
3 配置
3.1 调度配置
调度相关的配置主要有:
- 数据本地性相关,可以控制判断数据本地性的时间 - 任务调度的模式,如FIFO, FAIR等
- 黑名单机制,如失败多少次这个executor拉入黑名单、失败多少次节点拉入黑名单、黑名单后多长时间能再次启用
- 推测执行相关,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。
- 任务相关,比如使用的CPU、重试的次数等
# 开启speculative,默认关闭
spark.speculation=true
# 检测周期,单位毫秒
spark.speculation.interval=100
# 任务完成的百分比,比如同一个stage中task的完成占比
spark.speculation.quantile=0.75
# 任务延迟的比例,比如当75%的task都完成,那么取他们的中位数跟还未执行完的任务作对比。如果超过1.5倍,则开启推测执行。
spark.speculation.multiplier=1.5
spark.cores.max
当运行在standalone和mesos,应用可以使用的最大核数。如果没有配置,默认是 spark.deploy.defaultCores。
spark.locality.wait
默认值,3s
启动数据本地任务等待多长时间后启动非本地节点。数据本地性有多种级别,进程本地性、节点本地性、机架本地性或其他。也可以根据不同的级别配置等待时间,如 spark.locality.wait.node 等。如果任务时间长或者很少看见本地性可以调整该配置。
spark.locality.wait.node
默认值,spark.locality.wait
定义节点本地性的等待时间,比如,可以配置0跳过节点本地性,直接使用rack本地性。
spark.locality.wait.process
默认值,spark.locality.wait
定义进程本地性的等待时间
spark.locality.wait.rack
定义机架本地性的等待时间
spark.scheduler.maxRegisteredResourcesWaitingTime
默认值,30s
调度任务前等待资源注册的时间
spark.scheduler.minRegisteredResourcesRatio
默认值,yarn模式下0.8,standalone和mesos模式下0.0
spark.scheduler.mode
默认值,FIFO
在相同的SparkContext中提交的任务调度模式,默认先进先出,也可以是FAIR公平调度
spark.scheduler.revive.interval
默认值,1s
检查worker资源准备的间隔时间
spark.scheduler.listenerbus.eventqueue.capacity
默认值,10000
Spark监听总线的事件队列长度,必须大于0.如果监听的事件被丢弃,可以增加该值。增加会使driver需要更多的内存。
spark.scheduler.blacklist.unschedulableTaskSetTimeout
默认值,120s
spark.blacklist.enabled
默认值,false
如果为true,阻止spark在失败多次而进入黑名单的executor上 调度任务。
spark.blacklist.timeout
默认值,1h
当节点或执行者被当做黑名单时的时间,之后会从黑名单移除正常参与执行任务。
spark.blacklist.task.maxTaskAttemptsPerExecutor
默认值,1
试验特性,对于一个task,在一个executor进入黑名单前可以执行重试多少次。
spark.blacklist.task.maxTaskAttemptsPerNode
默认值,2
试验特性,对于一个task,在一个node成为黑名单前可以执行重试多少次。
spark.blacklist.stage.maxFailedTasksPerExecutor
默认值,2
试验特性,executor针对某个stage成为黑名单需要失败多少个任务。
spark.blacklist.stage.maxFailedExecutorsPerNode
默认值,2
试验特性,在一个节点上,有几个executor针对某个stage成为黑名单,这个node才算黑名单。
spark.blacklist.application.maxFailedTasksPerExecutor
默认值,2
试验特性,有多少不同的任务在同一个executor失败后,这个executor针对整个应用成为黑名单。黑名单的executor当 spark.blacklist.timeout 超时后,会被自动添加到资源池中。注意如果使用了动态分配,那么executor可能会被标记为空闲,从而被资源调度框架回收。
spark.blacklist.application.maxFailedExecutorsPerNode
默认值,2
试验特性,针对整个应用,有多少不同的executor成为黑名单后,这个节点也会被标记为黑名单。
spark.blacklist.killBlacklistedExecutors
默认值,false
如果为true,允许spark立即删除黑名单的executor。如果node被标记为黑名单,那么上面那所有的executor都会被kill。
spark.blacklist.application.fetchFailure.enabled
默认值,false
如果配置为true,当executor执行fetch操作报错时,将直接会拉入黑名单。如果使用外部shuffle服务,那么整个节点将会被拉入黑名单。
spark.speculation
默认值,false
如果为true,将会针对任务进行推测执行。比如某个任务执行缓慢,会再开启一个任务,哪个执行快用哪个作为结果。
spark.speculation.interval
默认值,100ms
针对任务进行探测的间隔时间
spark.speculation.multiplier
默认值,1.5
任务比均值慢多少将会执行推测
spark.speculation.quantile
默认值,0.75
在进行推测前任务需要完成多少
spark.task.cpus
默认值,1
每个task允许使用的核数
spark.task.maxFailures
默认值,4
任务在放弃执行前可以允许的失败次数。不同的任务的失败次数,不会导致job失败。
spark.task.reaper.enabled
默认值,false
启用任务的停止监控。当配置成true,任何任务被kill,都会被executor监控到,直到任务完成。参考 spark.task.reaper.* 配置。
spark.task.reaper.pollingInterval
默认值,10s
当enabled为true时,这个配置控制executor多长时间检测一次。
spark.task.reaper.threadDump
默认值,true
当任务停止后dump出日志。
spark.task.reaper.killTimeout
默认值,-1
用来配置任务无法终止时,使用JVM进行停止的等待时间。-1为禁用。
spark.stage.maxConsecutiveAttempts
默认值,4
在stage停止前可以尝试的次数。