第一章.Spark概述

1.什么是Spark

回顾:Hadoop主要解决海量数据的存储和海量数据的分析计算

Spark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎

2.Hadoop与Spark历史

$01[Spark概述_运行模式_WordCount案例] - 图1

$01[Spark概述_运行模式_WordCount案例] - 图2

3.MR与Spark框架对比

$01[Spark概述_运行模式_WordCount案例] - 图3

4.Spark内置模块

$01[Spark概述_运行模式_WordCount案例] - 图4

  • Spark Core: 实现Spark的基本功能,包含任务调度,内存管理,错误恢复,与存储系统交互等模块,Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
  • Spark SQL:是Spark用来操作结构化数据的程序包,通过spark Sql,我们可以使用SQL或者Apache Hive版本的HQL来查询数据,Spark SQL 支持多种数据源,如Hive表,Parquet以及JSON等
  • Spark Streaming: 是Spark提供的对实时数据进行流式计算的组件,提供了用来操作数据库流的API,并且与Spark Core中的RDD API高度适应
  • Spark MLlib: 提供常见的机器学习功能的程序库,包括分类,回归,聚类,协同过滤,还提供了模型评估,数据,导入等额外的支持功能
  • Spark GraphX: 主要用于图形并行计算和图挖掘系统的组件

5.Spark特点

$01[Spark概述_运行模式_WordCount案例] - 图5

第二章.Spark运行模式

部署Spark集群大体上分为两种模式:单机模式与集群模式

大多数分布式框架都支持单机模式,方便开发者调试框架的运行环境,但是在生产环境中,并不会使用单机模式,因此,后续直接按照集群模式部署Spark集群

下面详细列举了Spark目前支持的部署模式

  1. Local模式:在本地部署单个Spark服务
  2. Standalone模式:Spark自带的任务调度模式(国内常用)
  3. YARN模式:Spark使用Hadoop的Yarn组件进行资源与任务调度(国内常用)
  4. Mesos模式: Spark使用Mesos平台进行资源与任务的调度

官网地址:http://spark.apache.org/

文档查看地址:https://spark.apache.org/docs/3.0.0/

下载地址:https://spark.apache.org/downloads.html

  1. [https://archive.apache.org/dist/spark/](https://archive.apache.org/dist/spark/)

1.Local模式

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试

一.安装与使用

  1. 上传并解压Spark安装包
  1. tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
  2. mv spark-3.0.0-bin-hadoop3.2 spark-local
  1. 官方求PI案例
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master local[2] \
  4. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  5. 10
  • —class: 表示要指定程序的主类
  • —master local[2]
  • local:没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算
  • local[K]:指定使用K个Core来运行计算,比如local[2]就是运行两个Core来执行
  • local[*]:自动帮你按照CPU最多核来设置线程数,比如CPU 有8核,Spark帮你自动设置8个线程计算
  • spark-example_2.12-3.0.0.jar:要运行的程序
  • 10:要运行程序的输入参数(计算圆周率π的次数,计算次数越多,准确率越高)

二.官方WordCount案例

  1. - 需求:读取多个输入文件,统计每个单词出现的总次数
  2. - 需求分析

$01[Spark概述_运行模式_WordCount案例] - 图6

  1. 准备文件 wc.txt /opt/module/spark-local/input
  2. 启动spark-shell

$01[Spark概述_运行模式_WordCount案例] - 图7

注意:sc是SparkCore程序的入口,Spark是SparkSQL程序入口;master=local[*]表示本地模式运行

  1. 再开启一个hadoop102远程连接窗口,发现一个SparkSubmit进程

$01[Spark概述_运行模式_WordCount案例] - 图8

运行任务方式说明: spark-submit,是将jar上传到集群,执行spark任务,Spark-shell,相当于命令行工具,本身也是一个Application

  1. 登录hadoop102:4040,查看程序运行情况(注意spark-shell窗口关闭后,则hadoop102:4040页面关闭)

$01[Spark概述_运行模式_WordCount案例] - 图9

说明:本地模式下,默认的调度器是FIFO

  1. 运行WordCount程序
  1. sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

$01[Spark概述_运行模式_WordCount案例] - 图10

注意:只有collect开始执行时,才会加载数据

$01[Spark概述_运行模式_WordCount案例] - 图11

2.集群角色

一.Master和Worker集群资源管理

$01[Spark概述_运行模式_WordCount案例] - 图12

二.Driver和Executor任务的管理者

$01[Spark概述_运行模式_WordCount案例] - 图13

3.Standalone模式

  1. StandLone模式是spark自带的资源调动引擎,构建一个由Master+Slave构成的spark集群,Spark运行在集群中
  2. 这个要和HadoopStandalone区别开来,这里的Standalone是指只用spark来搭建一个集群,不需要借助HadoopYarnMesos等其他框架

一.安装使用

  1. 集群规划 | | hadoop102 | hadoop103 | hadoop104 | | —- | —- | —- | —- | | Spark | Master Worker | Worker | Worker |
  1. 再解压一份spark安装包,并修改解压后的文件夹名称为spark-standalone
  1. tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
  2. mv spark-3.0.0-bin-hadoop3.2 spark-standalone
  1. 进入Spark的配置目录/opt/module/spark-standalone/conf
  2. 修改slave文件,添加work节点
  1. mv slaves.template slaves
  2. vim slaves
  3. # 添加work节点
  4. hadoop102
  5. hadoop103
  6. hadoop104
  1. 修改spark-env.sh文件,添加master节点
  1. mv spark-env.sh.template spark-env.sh
  2. vim spark-env.sh
  3. # 添加master节点
  4. SPARK_MASTER_HOST=hadoop102
  5. SPARK_MASTER_PORT=7077
  1. 分发spark-standalone包
  1. xsync spark-standalone/
  1. 启动spark集群
  1. sbin/start-all.sh

查看三台服务器运行进程

$01[Spark概述_运行模式_WordCount案例] - 图14

  1. 官方求π案例
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://hadoop102:7077 \
  4. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  5. 10

二.参数说明

参数 解释 可选值举例
—class Spark程序中包含主函数的类
—master Spark程序运行的模式 本地模式:local[*]、spark://hadoop102:7077、yarn
—executor-memory 1G 指定每个executor可用内存为1G
—total-executor-cores 2 指定所有executor使用的cpu核数为2个
application-jar 打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar
application-arguments 传给main()方法的参数

三.配置历史服务器

由于spark-shell停止掉后,hadoop102:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况

  1. 修改spark-default.conf.template名称
  1. mv spark-defaults.conf.template spark-defaults.conf
  1. 修改spark-default.conf文件,配置日志存储路径(写),并分发
  1. spark.eventLog.enabled true
  2. spark.eventLog.dir hdfs://hadoop102:8020/directory

注意:需要启动hadoop集群.HDFS上的目录需要提前存在

  1. 修改spark-env.sh添加如下配置
  1. vi spark-env.sh
  2. export SPARK_HISTORY_OPTS="
  3. -Dspark.history.ui.port=18080
  4. -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
  5. -Dspark.history.retainedApplications=30"
  1. 分发配置文件
  1. xsync spark-env.sh
  1. 启动历史任务
  1. sbin/start-history-server.sh
  1. 再次执行任务
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://hadoop102:7077 \
  4. --executor-memory 1G \
  5. --total-executor-cores 2 \
  6. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  7. 10
  1. 查看spark历史服务网址

http: hadoop102:18080

四.配置高可用

  1. 高可用原理

$01[Spark概述_运行模式_WordCount案例] - 图15

  1. 配置高可用
  2. 停止集群
  1. sbin/stop-all.sh
  1. 启动Zookeeper
  1. zookeeper.sh start
  1. 修改spark-env.sh文件添加如下配置
  1. #注释掉如下内容:
  2. #SPARK_MASTER_HOST=hadoop102
  3. #SPARK_MASTER_PORT=7077
  4. #添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:
  5. export SPARK_DAEMON_JAVA_OPTS="
  6. -Dspark.deploy.recoveryMode=ZOOKEEPER
  7. -Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
  8. -Dspark.deploy.zookeeper.dir=/spark"
  9. #添加如下代码
  10. #Zookeeper3.5的AdminServer默认端口是8080,和Spark的WebUI冲突
  11. export SPARK_MASTER_WEBUI_PORT=8989
  1. 分发配置文件
  1. xsync spark-env.sh
  1. 在hadoop102上启动所有节点
  1. sbin/start-all.sh
  1. 在hadoop103上单独启动master节点
  1. sbin/start-master.sh
  1. Spark HA集群访问
  1. bin/spark-shell \
  2. --master spark://hadoop102:7077,hadoop103:7077 \
  3. --executor-memory 2g \
  4. --total-executor-cores 2

参数:—master spark://hadoop102:7077指定要连接的集群的master

五.运行流程

Spark有standalone-client和standalone-cluster两种模式,主要区别在于:Driver程序的运行节点。

  1. 客户端模式
  1. [atguigu@hadoop102 spark-standalone]$ bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://hadoop102:7077,hadoop103:7077 \
  4. --executor-memory 2G \
  5. --total-executor-cores 2 \
  6. --deploy-mode client \
  7. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  8. 10
  1. --deploy-mode client,表示Driver程序运行在本地客户端

$01[Spark概述_运行模式_WordCount案例] - 图16

  1. 集群模式
  1. [atguigu@hadoop102 spark-standalone]$ bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master spark://hadoop102:7077,hadoop103:7077 \
  4. --executor-memory 2G \
  5. --total-executor-cores 2 \
  6. --deploy-mode cluster \
  7. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  8. 10
  1. --deploy-mode cluster,表示Driver程序运行在集群

$01[Spark概述_运行模式_WordCount案例] - 图17

4.Yarn模式

spark客户端直接连接Yarn,不需要额外构建Spark集群

一.安装使用

  1. 停止Standalone模式下的spark集群
  1. [atguigu@hadoop102 spark-standalone]$ sbin/stop-all.sh
  2. [atguigu@hadoop102 spark-standalone]$ zk.sh stop
  3. [atguigu@hadoop103 spark-standalone]$ sbin/stop-master.sh
  1. 为了防止和Standalone模式冲突,再单独解压一份spark
  1. tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
  1. 进入到/opt/module目录,修改spark-3.0.0-bin-hadoop3.2名称为spark-yarn
  1. mv spark-3.0.0-bin-hadoop3.2/ spark-yarn
  1. 修改hadoop配置文件/opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml,添加如下内容
  1. [atguigu@hadoop102 hadoop]$ vi yarn-site.xml
  2. <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  3. <property>
  4. <name>yarn.nodemanager.pmem-check-enabled</name>
  5. <value>false</value>
  6. </property>
  7. <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  8. <property>
  9. <name>yarn.nodemanager.vmem-check-enabled</name>
  10. <value>false</value>
  11. </property>
  1. 分发配置文件
  1. xsync /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
  1. 修改/opt/module/spark-yarn/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径
  1. [atguigu@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh
  2. [atguigu@hadoop102 conf]$ vi spark-env.sh
  3. YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
  1. 启动HDFS以及YARN集群
  1. [atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
  2. [atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

二.配置历史服务

由于是重新解压的Spark压缩文件,所以需要针对Yarn模式,再次配置一下历史服务器。

  1. 修改spark-default.conf.template名称
  1. mv spark-defaults.conf.template spark-defaults.conf
  1. 修改spark-default.conf文件,配置日志存储路径(写)
  1. [atguigu@hadoop102 conf]$ vi spark-defaults.conf
  2. spark.eventLog.enabled true
  3. spark.eventLog.dir hdfs://hadoop102:8020/directory
  1. 修改spark-env.sh文件,添加如下配置:
  1. [atguigu@hadoop102 conf]$ vi spark-env.sh
  2. export SPARK_HISTORY_OPTS="
  3. -Dspark.history.ui.port=18080
  4. -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
  5. -Dspark.history.retainedApplications=30"

三.配置查看历史日志

为了从Yarn上关联到Spark历史服务器,需要配置关联路径

  1. 修改配置文件/opt/module/spark-yarn/conf/spark-defaults.conf
  1. #添加如下内容
  2. spark.yarn.historyServer.address=hadoop102:18080
  3. spark.history.ui.port=18080
  1. 重启Spark历史服务
  1. [atguigu@hadoop102 spark-yarn]$ sbin/stop-history-server.sh
  2. [atguigu@hadoop102 spark-yarn]$ sbin/start-history-server.sh
  1. 提交任务到Yarn执行
  1. bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master yarn \
  4. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  5. 10
  1. Web页面查看日志:http://hadoop103:8088/cluster

四.运行流程

spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点

  • yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。
  • yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster适用于生产环境。
  1. 客户端模式(默认)
  1. [atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master yarn \
  4. --deploy-mode client \
  5. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  6. 10

$01[Spark概述_运行模式_WordCount案例] - 图18

  1. 集群模式
  1. [atguigu@hadoop102 spark-yarn]$ bin/spark-submit \
  2. --class org.apache.spark.examples.SparkPi \
  3. --master yarn \
  4. --deploy-mode cluster \
  5. ./examples/jars/spark-examples_2.12-3.0.0.jar \
  6. 10

$01[Spark概述_运行模式_WordCount案例] - 图19

注意:在测试Standalone模式,cluster运行流程的时候,阿里云用户访问不到Worker,因为Worker是从Master内部跳转的,这是正常的,实际工作中我们不可能通过客户端访问的,这些端口对外都会禁用,需要的时候会通过授权到Master访问Worker

5.Mesos模式(了解)

Spark客户端直接连接Mesos,不需要额外构建Spark集群,国内应用比较少,更多的是运用Yarn调度

6.几种模式对比

模式 Spark安装机器数 需启动的进程 所属者
Local 1 Spark
Standlone 3 Master及Worker Spark
Yarn 1 Yarn及HDFS Hadoop

7.端口号总结

  • Spark查看当前Spark-shell运行任务情况端口号:4040
  • Spark Master内部通信服务端口号:7077 (类比于Hadoop的9820(8020)端口)
  • Spark Standalone模式Master Web端口号:8080(类比于Hadoop YARN任务运行情况查看端口号:8088)
  • Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888)

第三章.WordCount案例实操

Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在idea中编制程序,然后打成jar包,提交到集群,最常用的是创建一个Maven项目,,利用Maven来管理Jar包的依赖

1.编写程序

  1. 新建一个Maven工程,并添加scala支持

$01[Spark概述_运行模式_WordCount案例] - 图20

  1. 在main下创建scala文件夹,并右键mark directory as Sources Root
  2. 准备测试文件
  3. 导入项目依赖
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>
  7. </dependencies>
  8. <build>
  9. <sourceDirectory>src/main/scala</sourceDirectory>
  10. <plugins>
  11. <plugin>
  12. <groupId>org.apache.maven.plugins</groupId>
  13. <artifactId>maven-compiler-plugin</artifactId>
  14. <version>3.0</version>
  15. <configuration>
  16. <source>1.8</source>
  17. <target>1.8</target>
  18. <encoding>UTF-8</encoding>
  19. </configuration>
  20. </plugin>
  21. <plugin>
  22. <groupId>net.alchim31.maven</groupId>
  23. <artifactId>scala-maven-plugin</artifactId>
  24. <version>3.2.0</version>
  25. <executions>
  26. <execution>
  27. <goals>
  28. <goal>compile</goal>
  29. <goal>testCompile</goal>
  30. </goals>
  31. <configuration>
  32. <args>
  33. <arg>-dependencyfile</arg>
  34. <arg>${project.build.directory}/.scala_dependencies</arg>
  35. </args>
  36. </configuration>
  37. </execution>
  38. </executions>
  39. </plugin>
  40. <plugin>
  41. <groupId>org.apache.maven.plugins</groupId>
  42. <artifactId>maven-shade-plugin</artifactId>
  43. <version>3.1.1</version>
  44. <executions>
  45. <execution>
  46. <phase>package</phase>
  47. <goals>
  48. <goal>shade</goal>
  49. </goals>
  50. <configuration>
  51. <filters>
  52. <filter>
  53. <artifact>*:*</artifact>
  54. <excludes>
  55. <exclude>META-INF/*.SF</exclude>
  56. <exclude>META-INF/*.DSA</exclude>
  57. <exclude>META-INF/*.RSA</exclude>
  58. </excludes>
  59. </filter>
  60. </filters>
  61. <transformers>
  62. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  63. <mainClass></mainClass>
  64. </transformer>
  65. </transformers>
  66. </configuration>
  67. </execution>
  68. </executions>
  69. </plugin>
  70. </plugins>
  71. </build>
  1. 编写代码(本地)
  1. package com.atguigu.spark.day01
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $01_WordCount {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf().setMaster("local[4]").setAppName("test")
  6. val sc = new SparkContext(conf)
  7. //读取数据
  8. val rdd1 = sc.textFile("datas/wc.txt")
  9. //切割+压平
  10. val rdd2 = rdd1.flatMap(_.split(" "))
  11. //按照单词分组
  12. val rdd3 = rdd2.groupBy(x => x)
  13. //聚合
  14. val rdd4 = rdd3.map(x => {
  15. (x._1, x._2.size)
  16. })
  17. val arr = rdd4.collect()
  18. //结果展示
  19. println(arr.toList)
  20. }
  21. }
  1. 编写代码(集群)
  1. package com.atguigu.spark.day01
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object $02_WordCountCluster {
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf().setAppName("test")
  7. val sc = new SparkContext(conf)
  8. //读取数据
  9. val rdd1:RDD[String] = sc.textFile(args(0))
  10. //切割+压平
  11. val rdd2 = rdd1.flatMap(_.split(" "))
  12. //按照单词分组
  13. val rdd3 = rdd2.groupBy(x => x)
  14. //聚合
  15. val rdd4 = rdd3.map(x => {
  16. (x._1, x._2.size)
  17. })
  18. val arr = rdd4.collect()
  19. //结果展示
  20. println(arr.toList)
  21. }
  22. }
  1. 打包到集群进行测试
  1. bin/spark-submit --class com.atguigu.spark.day01.\$02_WordCountCluster --master yarn /opt/module/original-spark-demo-1.0-SNAPSHOT.jar /input /output

发现报错:

$01[Spark概述_运行模式_WordCount案例] - 图21

原因:Spark on Yarn会默认使用Hadoop集群配置文件设置编码方式,但是Spark在自己的spark-yarn/jars 包里面没有找到支持lzo压缩的jar包,所以报错。

  1. 解决方案:拷贝lzo的包到/opt/module/spark-yarns/jar,并重新执行
  1. cp /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar /opt/module/spark-yarn/jars

$01[Spark概述_运行模式_WordCount案例] - 图22