一、WordCount示例

  1. // 基本实现
  2. package com.pg.learning.demo
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. object FirstSparkDemo {
  6. def main(args: Array[String]): Unit = {
  7. //通过SparkContext建立与spark之间的连接,设置对应的属性
  8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  9. val sc = new SparkContext(sparkConf)
  10. //通过连接进行操作的业务
  11. val line: RDD[String] = sc.textFile("words.txt")
  12. //val line: RDD[String] = sc.wholeTextFile(path , minPartitions=2)// 读取小文件可用,通过设置minPartitions避免过多的分区
  13. val groupRDD: RDD[(String, Iterable[(String, Int)])] = line.flatMap(_.split(" ")).map((_,1)).groupBy(_._1)
  14. val result: RDD[(String, Int)] = groupRDD.map{case(word,list) => {(word,list.size)}}
  15. //输出
  16. val tuples: Array[(String, Int)] = result.collect()
  17. tuples.foreach(println)
  18. //关闭连接
  19. sc.stop()
  20. }
  21. }
  22. //使用spark的api实现
  23. object WordCountWithSparkAPI {
  24. def main(args: Array[String]): Unit = {
  25. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCountWithSparkAPI")
  26. val sc = new SparkContext(sparkConf)
  27. val line: RDD[String] = sc.textFile("words.txt")
  28. val mapRDD: RDD[(String, Int)] = line.flatMap(_.split(" ")).map((_,1))
  29. //reduceByKey:相同key的数据,可以对value进行reduce聚合
  30. val result: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
  31. val tuples: Array[(String, Int)] = result.collect()
  32. tuples.foreach(println)
  33. sc.stop()
  34. }
  35. }

二、spark运行模式

2.1、local模式

1、通过命令进行命令行模式:/bin/spark-shell ,可以在命令行工具执行spark语句。
2、通过sumbit提交:

  1. spark-submit \
  2. --master local[2] \
  3. --class T1 \
  4. --name T1 \
  5. --driver-memory 2g \
  6. --driver-cores 2 \
  7. --num-executors 10 \
  8. --executor-cores 1 \
  9. --executor-memory 2g \
  10. --conf spark.yarn.executor.memoryOverhead=4096 \
  11. --conf spark.network.timeout=10000000 \
  12. --conf spark.executor.heartbeatInterval=10000000 \
  13. --conf spark.shuffle.service.enabled=true \
  14. --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=1g -XX:+UseConcMarkSweepGC" \
  15. --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" \
  16. /data/batchJob/XXXHdfs.jar
  17. ## 通过spark-shell --help 可以查看帮助参数

—master local:表示本地运行,只有一个工作进程,无并行计算能力。
—master local[K]:表示本地运行,有K个工作进程,通常K设置为CPU的核数。
—master local[*]:表示本地运行,工作进程数等于机器的CPU核数数量。[

](https://blog.csdn.net/silentanytime/article/details/107035491)

2.2、Standlone模式

独立部署(Standalone)模式是Spark 自身节点运行的集群模式,Spark 的Standalone 模式体现了经典的master-slave 模式。集群一般规划如下:

Linux1 Linux2 Linux3
Spark Worker Master Worker Worker

具体安装步骤如下:

  1. 1、解压缩文件
  2. spark-3.0.0-bin-hadoop3.2.tgz 文件上传到Linux 并解压缩在指定位置
  3. tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module cd /opt/module
  4. mv spark-3.0.0-bin-hadoop3.2 spark-standalone
  5. 2、修改配置文件
  6. 1)进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
  7. mv slaves.template slaves
  8. 2)修改 slaves 文件,添加work节点,内容为如下:
  9. linux1
  10. linux2
  11. linux3
  12. 3)修改 spark-env.sh.template 文件名为 spark-env.sh
  13. mv spark-env.sh.template spark-env.sh
  14. 4)修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点,内容为:
  15. export JAVA_HOME=/opt/module/jdk1.8.0_144
  16. SPARK_MASTER_HOST=linux1 # 指定master的ip
  17. SPARK_MASTER_PORT=7077 # 指定提交任务的端口
  18. SPARK_MASTER_WEBUI_PORT=8080
  19. HADOOP_CONF_DIR=... #读取hadoop配置的路径
  20. YARN_CONF_DIR=... #读取yarn配置的路径
  21. 注意:7077 端口,相当于 hadoop3 内部通信的 8020 端口,此处的端口需要确认自己的Hadoop配置
  22. 5)分发 spark-standalone 目录
  23. xsync spark-standalone
  24. 3、启动集群
  25. 1)执行脚本命令:
  26. sbin/start-all.sh
  27. 4、可以查看三台服务器的进程
  28. ================linux1================
  29. 3330 Jps
  30. 3238 Worker
  31. 3163 Master
  32. ================linux2================
  33. 2966 Jps
  34. 2908 Worker
  35. ================linux3================
  36. 2978 Worker
  37. 3036 Jps
  38. 5、查看 Master 资源监控Web UI 界面: http://linux1:8080
  39. 6、提交应用命令:
  40. bin/spark-submit \
  41. --class org.apache.spark.examples.SparkPi \
  42. --master spark://linux1:7077 \
  43. ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
  44. 参数说明:
  45. 1)--class 表示要执行程序的主类
  46. 2)--master spark://linux1:7077 独立部署模式,连接到Spark 集群,一般有模式:local[*]、spark://linux1:7077、Yarn
  47. 3)spark-examples_2.12-3.0.0.jar 运行类所在的 jar 包,包含依赖。这个URL在集群中全局可见。比如 hdfs://共享存储系统,如果是file:// path, 那么所有的节点的
  48. path 都包含同样的 jar
  49. 4)数字 10 表示程序的入口参数,用于设定当前应用的任务数量
  50. 5)--executor-memory 1G:指定每个executor可用内存为 1G
  51. 6)--total-executor-cores 2:指定所有executor使用的cpu 核数为2
  52. 7)--executor-cores:指定每个executor 使用的cpu 核数
  53. 8)application-arguments:传给 main()方法的参数
  54. 7、执行任务时,会产生多个 Java 进程:
  55. 一般会有:SparkSubmitCoarseGrainedExecutorBackend

2.3、配置历史服务器

由于 spark-shell 停止掉后,集群监控 linux1:4040 页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况。

  1. 1)修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
  2. mv spark-defaults.conf.template spark-defaults.conf
  3. 2)修改 spark-default.conf 文件,配置日志存储路径
  4. spark.eventLog.enabled true
  5. spark.eventLog.dir hdfs://linux1:8020/directory
  6. 注意:需要启动 hadoop 集群,HDFS 上的directory 目录需要提前存在。
  7. sbin/start-dfs.sh
  8. hadoop fs -mkdir /directory
  9. 3)修改 spark-env.sh 文件, 添加日志配置
  10. export SPARK_HISTORY_OPTS="
  11. -Dspark.history.ui.port=18080
  12. -Dspark.history.fs.logDirectory=hdfs://linux1:8020/directory
  13. -Dspark.history.retainedApplications=30"
  14. ⚫参数 1 含义:WEB UI 访问的端口号为 18080
  15. ⚫参数 2 含义:指定历史服务器日志存储路径
  16. ⚫参数 3 含义:指定保存Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
  17. 3.1)如果是yarn模式的还需要修改配置spark-defaults.conf
  18. spark.yarn.historyServer.address=linux1:18080
  19. spark.history.ui.port=18080
  20. 4)分发配置文件
  21. 5)重新启动集群和历史服务
  22. sbin/start-all.sh
  23. sbin/start-history-server.sh
  24. 6)重新执行任务,就可以在:http://linux1:18080,查看历史服务。

2.4、配置高可用( HA)

所谓的高可用是因为当前集群中的 Master 节点只有一个,所以会存在单点故障问题。所以为了解决单点故障问题,需要在集群中配置多个 Master 节点,一旦处于活动状态的 Master 发生故障时,由备用 Master 提供服务,保证作业可以继续执行。这里的高可用一般采用Zookeeper 设置,zookeeper配置如下:

Linux1 Linux2 Linux3
Spark Master Zookeeper
Worker
Master Zookeeper
Worker


Zookeeper Worker
  1. 1)停止集群
  2. sbin/stop-all.sh
  3. 2)启动Zookeeper
  4. xstart zk
  5. 3)修改 spark-env.sh 文件添加如下配置
  6. #SPARK_MASTER_HOST=linux1
  7. #SPARK_MASTER_PORT=7077
  8. 添加如下内容:
  9. #Master 监控页面默认访问端口为 8080,但是可能会和 Zookeeper 冲突,所以改成 8989,也可以自定义,访问 UI 监控页面时请注意
  10. SPARK_MASTER_WEBUI_PORT=8989
  11. export SPARK_DAEMON_JAVA_OPTS="
  12. -Dspark.deploy.recoveryMode=ZOOKEEPER
  13. -Dspark.deploy.zookeeper.url=linux1:2181,linux2:2181,linux3:2181
  14. -Dspark.deploy.zookeeper.dir=/spark"
  15. 4)分发配置文件
  16. xsync conf/
  17. 5)启动集群
  18. sbin/start-all.sh
  19. 6)启动 linux2 的单独 Master 节点,此时 linux2 节点 Master 状态处于备用状态
  20. sbin/start-master.sh
  21. 7)提交应用到高可用集群
  22. bin/spark-submit \
  23. --class org.apache.spark.examples.SparkPi \
  24. --master spark://linux1:7077,linux2:7077 \
  25. ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10

2.5、Yarn模式

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是和其他专业的资源调度框架集成会更靠谱一些。
SparkOnYarn根据Driver运行在哪里可以区分两种模式:client模式和cluster模式。

  1. 1)修改 hadoop 配置文件/opt/module/hadoop/etc/hadoop/yarn-site.xml
  2. <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是 true -->
  3. <property>
  4. <name>yarn.nodemanager.pmem-check-enabled</name>
  5. <value>false</value>
  6. </property>
  7. <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是 true -->
  8. <property>
  9. <name>yarn.nodemanager.vmem-check-enabled</name>
  10. <value>false</value>
  11. </property>
  12. 2)修改 conf/spark-env.sh,添加 JAVA_HOME YARN_CONF_DIR 配置
  13. mv spark-env.sh.template spark-env.sh
  14. export JAVA_HOME=/opt/module/jdk1.8.0_144
  15. YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
  16. HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop #需要使用hdfs时配置
  17. 3)启动yarnhdfs
  18. 4)启动spark
  19. 5)提交作业
  20. bin/spark-submit \
  21. --class org.apache.spark.examples.SparkPi \
  22. --master yarn \
  23. --deploy-mode cluster \
  24. ./examples/jars/spark-examples_2.12-3.0.0.jar \ 10
  25. master 需要指定为yarn
  26. deploy-mode 可以指定为cluster模式或者client模式,client模式会把结果打印在控制台,而cluster模式不会。

2.5.1、client模式

client模式下:spark的Driver驱动程序,运行在提交任务的客户端上,和集群的通讯成本高。因为Driver在客户端,所以在Driver中的程序结果输出可以在客户端的控制台看到。
image.png

2.5.2、cluster模式

cluster模式下:Spark的Driver驱动程序,运行在yarn集群上,和集群的通讯成本低。且Driver是交给Yarn管理的,如果失败了会由yarn重启,因为Driver运行在yarn上,所以本地客户端控制台看不到输出的结果,需要在Yarn日志中查看。
image.png

2.6、部署模式对比

模式 Spark 安装机器数 需启动的进程 所属者 应用场景
Local 1 Spark 测试
Standalone 3 Master 及Worker Spark 单独部署
Yarn 1 Yarn 及HDFS Hadoop 混合部署

2.7、常用端口号

Ø Spark 查看当前Spark-shell 运行任务情况端口号:4040(计算)
Ø Spark Master 内部通信服务端口号:7077
Ø Standalone 模式下,Spark Master Web 端口号:8080(资源)
Ø Spark 历史服务器端口号:18080
Ø Hadoop YARN 任务运行情况查看端口号:8088