第一章.Spark概述
1.什么是Spark
回顾:Hadoop主要解决海量数据的存储和海量数据的分析计算
Spark是一种基于内存的快速,通用,可扩展的大数据分析计算引擎
2.Hadoop与Spark历史
![$01[Spark概述_运行模式_WordCount案例] - 图1](/uploads/projects/liuye-6lcqc@gws1uf/c6fec402979b012f43a9e897f0f1bbb3.png)
![$01[Spark概述_运行模式_WordCount案例] - 图2](/uploads/projects/liuye-6lcqc@gws1uf/d17b0eeb0c24f94cd3a777b6c972e974.png)
3.MR与Spark框架对比
![$01[Spark概述_运行模式_WordCount案例] - 图3](/uploads/projects/liuye-6lcqc@gws1uf/cb0fa971fcb1165958c8cff07d9fe547.png)
4.Spark内置模块
![$01[Spark概述_运行模式_WordCount案例] - 图4](/uploads/projects/liuye-6lcqc@gws1uf/21454917a92b9a0f0b23a06e65bd6e1d.png)
- Spark Core: 实现Spark的基本功能,包含任务调度,内存管理,错误恢复,与存储系统交互等模块,Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义
- Spark SQL:是Spark用来操作结构化数据的程序包,通过spark Sql,我们可以使用SQL或者Apache Hive版本的HQL来查询数据,Spark SQL 支持多种数据源,如Hive表,Parquet以及JSON等
- Spark Streaming: 是Spark提供的对实时数据进行流式计算的组件,提供了用来操作数据库流的API,并且与Spark Core中的RDD API高度适应
- Spark MLlib: 提供常见的机器学习功能的程序库,包括分类,回归,聚类,协同过滤,还提供了模型评估,数据,导入等额外的支持功能
- Spark GraphX: 主要用于图形并行计算和图挖掘系统的组件
5.Spark特点
![$01[Spark概述_运行模式_WordCount案例] - 图5](/uploads/projects/liuye-6lcqc@gws1uf/da6ef0c796034fbe36546a8f0579d3a4.png)
第二章.Spark运行模式
部署Spark集群大体上分为两种模式:单机模式与集群模式
大多数分布式框架都支持单机模式,方便开发者调试框架的运行环境,但是在生产环境中,并不会使用单机模式,因此,后续直接按照集群模式部署Spark集群
下面详细列举了Spark目前支持的部署模式
- Local模式:在本地部署单个Spark服务
- Standalone模式:Spark自带的任务调度模式(国内常用)
- YARN模式:Spark使用Hadoop的Yarn组件进行资源与任务调度(国内常用)
- Mesos模式: Spark使用Mesos平台进行资源与任务的调度
文档查看地址:https://spark.apache.org/docs/3.0.0/
下载地址:https://spark.apache.org/downloads.html
[https://archive.apache.org/dist/spark/](https://archive.apache.org/dist/spark/)
1.Local模式
Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试
一.安装与使用
- 上传并解压Spark安装包
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/modulemv spark-3.0.0-bin-hadoop3.2 spark-local
- 官方求PI案例
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master local[2] \./examples/jars/spark-examples_2.12-3.0.0.jar \10
- —class: 表示要指定程序的主类
- —master local[2]
- local:没有指定线程数,则所有计算都运行在一个线程当中,没有任何并行计算
- local[K]:指定使用K个Core来运行计算,比如local[2]就是运行两个Core来执行
- local[*]:自动帮你按照CPU最多核来设置线程数,比如CPU 有8核,Spark帮你自动设置8个线程计算
- spark-example_2.12-3.0.0.jar:要运行的程序
- 10:要运行程序的输入参数(计算圆周率π的次数,计算次数越多,准确率越高)
二.官方WordCount案例
- 需求:读取多个输入文件,统计每个单词出现的总次数- 需求分析
![$01[Spark概述_运行模式_WordCount案例] - 图6](/uploads/projects/liuye-6lcqc@gws1uf/73f580431b0b1346110e691951a111c3.png)
- 准备文件 wc.txt /opt/module/spark-local/input
- 启动spark-shell
![$01[Spark概述_运行模式_WordCount案例] - 图7](/uploads/projects/liuye-6lcqc@gws1uf/62e7f4b515ebcbe1412605652eeefde3.png)
注意:sc是SparkCore程序的入口,Spark是SparkSQL程序入口;master=local[*]表示本地模式运行
- 再开启一个hadoop102远程连接窗口,发现一个SparkSubmit进程
![$01[Spark概述_运行模式_WordCount案例] - 图8](/uploads/projects/liuye-6lcqc@gws1uf/9b51d8bb7f9ecb8ed176575381ac2921.png)
运行任务方式说明: spark-submit,是将jar上传到集群,执行spark任务,Spark-shell,相当于命令行工具,本身也是一个Application
- 登录hadoop102:4040,查看程序运行情况(注意spark-shell窗口关闭后,则hadoop102:4040页面关闭)
![$01[Spark概述_运行模式_WordCount案例] - 图9](/uploads/projects/liuye-6lcqc@gws1uf/03dfe0f44774a8d4594bd72ac1530b69.png)
说明:本地模式下,默认的调度器是FIFO
- 运行WordCount程序
sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
![$01[Spark概述_运行模式_WordCount案例] - 图10](/uploads/projects/liuye-6lcqc@gws1uf/5d7f416b8be6b2c0f0134327bfc7675a.png)
注意:只有collect开始执行时,才会加载数据
![$01[Spark概述_运行模式_WordCount案例] - 图11](/uploads/projects/liuye-6lcqc@gws1uf/0d5936274ff167c85d2fca55a5ad0f5e.png)
2.集群角色
一.Master和Worker集群资源管理
![$01[Spark概述_运行模式_WordCount案例] - 图12](/uploads/projects/liuye-6lcqc@gws1uf/0c18153edf8db3368350ea6cb6ece84c.png)
二.Driver和Executor任务的管理者
![$01[Spark概述_运行模式_WordCount案例] - 图13](/uploads/projects/liuye-6lcqc@gws1uf/e7631e6e55c286f11e124844b2d72f96.png)
3.Standalone模式
StandLone模式是spark自带的资源调动引擎,构建一个由Master+Slave构成的spark集群,Spark运行在集群中这个要和Hadoop的Standalone区别开来,这里的Standalone是指只用spark来搭建一个集群,不需要借助Hadoop的Yarn和Mesos等其他框架
一.安装使用
- 集群规划 | | hadoop102 | hadoop103 | hadoop104 | | —- | —- | —- | —- | | Spark | Master Worker | Worker | Worker |
- 再解压一份spark安装包,并修改解压后的文件夹名称为spark-standalone
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/mv spark-3.0.0-bin-hadoop3.2 spark-standalone
- 进入Spark的配置目录/opt/module/spark-standalone/conf
- 修改slave文件,添加work节点
mv slaves.template slavesvim slaves# 添加work节点hadoop102hadoop103hadoop104
- 修改spark-env.sh文件,添加master节点
mv spark-env.sh.template spark-env.shvim spark-env.sh# 添加master节点SPARK_MASTER_HOST=hadoop102SPARK_MASTER_PORT=7077
- 分发spark-standalone包
xsync spark-standalone/
- 启动spark集群
sbin/start-all.sh
查看三台服务器运行进程
![$01[Spark概述_运行模式_WordCount案例] - 图14](/uploads/projects/liuye-6lcqc@gws1uf/3ed6a0a3c62741458bafe6cd2b051663.png)
- 官方求π案例
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://hadoop102:7077 \./examples/jars/spark-examples_2.12-3.0.0.jar \10
二.参数说明
| 参数 | 解释 | 可选值举例 |
|---|---|---|
| —class | Spark程序中包含主函数的类 | |
| —master | Spark程序运行的模式 | 本地模式:local[*]、spark://hadoop102:7077、yarn |
| —executor-memory 1G | 指定每个executor可用内存为1G | |
| —total-executor-cores 2 | 指定所有executor使用的cpu核数为2个 | |
| application-jar | 打包好的应用jar,包含依赖。这个URL在集群中全局可见。 比如hdfs:// 共享存储系统,如果是file:// path,那么所有的节点的path都包含同样的jar | |
| application-arguments | 传给main()方法的参数 |
三.配置历史服务器
由于spark-shell停止掉后,hadoop102:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况
- 修改spark-default.conf.template名称
mv spark-defaults.conf.template spark-defaults.conf
- 修改spark-default.conf文件,配置日志存储路径(写),并分发
spark.eventLog.enabled truespark.eventLog.dir hdfs://hadoop102:8020/directory
注意:需要启动hadoop集群.HDFS上的目录需要提前存在
- 修改spark-env.sh添加如下配置
vi spark-env.shexport SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory-Dspark.history.retainedApplications=30"
- 分发配置文件
xsync spark-env.sh
- 启动历史任务
sbin/start-history-server.sh
- 再次执行任务
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://hadoop102:7077 \--executor-memory 1G \--total-executor-cores 2 \./examples/jars/spark-examples_2.12-3.0.0.jar \10
- 查看spark历史服务网址
http: hadoop102:18080
四.配置高可用
- 高可用原理
![$01[Spark概述_运行模式_WordCount案例] - 图15](/uploads/projects/liuye-6lcqc@gws1uf/c73835bdf6936ff24ee3c79badf4ce4a.png)
- 配置高可用
- 停止集群
sbin/stop-all.sh
- 启动Zookeeper
zookeeper.sh start
- 修改spark-env.sh文件添加如下配置
#注释掉如下内容:#SPARK_MASTER_HOST=hadoop102#SPARK_MASTER_PORT=7077#添加上如下内容。配置由Zookeeper管理Master,在Zookeeper节点中自动创建/spark目录,用于管理:export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104-Dspark.deploy.zookeeper.dir=/spark"#添加如下代码#Zookeeper3.5的AdminServer默认端口是8080,和Spark的WebUI冲突export SPARK_MASTER_WEBUI_PORT=8989
- 分发配置文件
xsync spark-env.sh
- 在hadoop102上启动所有节点
sbin/start-all.sh
- 在hadoop103上单独启动master节点
sbin/start-master.sh
- Spark HA集群访问
bin/spark-shell \--master spark://hadoop102:7077,hadoop103:7077 \--executor-memory 2g \--total-executor-cores 2
参数:—master spark://hadoop102:7077指定要连接的集群的master
五.运行流程
Spark有standalone-client和standalone-cluster两种模式,主要区别在于:Driver程序的运行节点。
- 客户端模式
[atguigu@hadoop102 spark-standalone]$ bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://hadoop102:7077,hadoop103:7077 \--executor-memory 2G \--total-executor-cores 2 \--deploy-mode client \./examples/jars/spark-examples_2.12-3.0.0.jar \10
--deploy-mode client,表示Driver程序运行在本地客户端
![$01[Spark概述_运行模式_WordCount案例] - 图16](/uploads/projects/liuye-6lcqc@gws1uf/88971f953d076444ce1131e25c8231a4.png)
- 集群模式
[atguigu@hadoop102 spark-standalone]$ bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://hadoop102:7077,hadoop103:7077 \--executor-memory 2G \--total-executor-cores 2 \--deploy-mode cluster \./examples/jars/spark-examples_2.12-3.0.0.jar \10
--deploy-mode cluster,表示Driver程序运行在集群
![$01[Spark概述_运行模式_WordCount案例] - 图17](/uploads/projects/liuye-6lcqc@gws1uf/2b8d53e7d795d64235dbacd55de3dcb1.png)
4.Yarn模式
spark客户端直接连接Yarn,不需要额外构建Spark集群
一.安装使用
- 停止Standalone模式下的spark集群
[atguigu@hadoop102 spark-standalone]$ sbin/stop-all.sh[atguigu@hadoop102 spark-standalone]$ zk.sh stop[atguigu@hadoop103 spark-standalone]$ sbin/stop-master.sh
- 为了防止和Standalone模式冲突,再单独解压一份spark
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
- 进入到/opt/module目录,修改spark-3.0.0-bin-hadoop3.2名称为spark-yarn
mv spark-3.0.0-bin-hadoop3.2/ spark-yarn
- 修改hadoop配置文件/opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml,添加如下内容
[atguigu@hadoop102 hadoop]$ vi yarn-site.xml<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true --><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property><!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true --><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
- 分发配置文件
xsync /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml
- 修改/opt/module/spark-yarn/conf/spark-env.sh,添加YARN_CONF_DIR配置,保证后续运行任务的路径都变成集群路径
[atguigu@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh[atguigu@hadoop102 conf]$ vi spark-env.shYARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
- 启动HDFS以及YARN集群
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
二.配置历史服务
由于是重新解压的Spark压缩文件,所以需要针对Yarn模式,再次配置一下历史服务器。
- 修改spark-default.conf.template名称
mv spark-defaults.conf.template spark-defaults.conf
- 修改spark-default.conf文件,配置日志存储路径(写)
[atguigu@hadoop102 conf]$ vi spark-defaults.confspark.eventLog.enabled truespark.eventLog.dir hdfs://hadoop102:8020/directory
- 修改spark-env.sh文件,添加如下配置:
[atguigu@hadoop102 conf]$ vi spark-env.shexport SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory-Dspark.history.retainedApplications=30"
三.配置查看历史日志
为了从Yarn上关联到Spark历史服务器,需要配置关联路径
- 修改配置文件/opt/module/spark-yarn/conf/spark-defaults.conf
#添加如下内容spark.yarn.historyServer.address=hadoop102:18080spark.history.ui.port=18080
- 重启Spark历史服务
[atguigu@hadoop102 spark-yarn]$ sbin/stop-history-server.sh[atguigu@hadoop102 spark-yarn]$ sbin/start-history-server.sh
- 提交任务到Yarn执行
bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \./examples/jars/spark-examples_2.12-3.0.0.jar \10
- Web页面查看日志:http://hadoop103:8088/cluster
四.运行流程
spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点
- yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。
- yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster适用于生产环境。
- 客户端模式(默认)
[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \--deploy-mode client \./examples/jars/spark-examples_2.12-3.0.0.jar \10
![$01[Spark概述_运行模式_WordCount案例] - 图18](/uploads/projects/liuye-6lcqc@gws1uf/8b0591d1e91446e44fc623ad7d07d59d.png)
- 集群模式
[atguigu@hadoop102 spark-yarn]$ bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \--deploy-mode cluster \./examples/jars/spark-examples_2.12-3.0.0.jar \10
![$01[Spark概述_运行模式_WordCount案例] - 图19](/uploads/projects/liuye-6lcqc@gws1uf/e1497331efb18b1223610b55545df5fb.png)
注意:在测试Standalone模式,cluster运行流程的时候,阿里云用户访问不到Worker,因为Worker是从Master内部跳转的,这是正常的,实际工作中我们不可能通过客户端访问的,这些端口对外都会禁用,需要的时候会通过授权到Master访问Worker
5.Mesos模式(了解)
Spark客户端直接连接Mesos,不需要额外构建Spark集群,国内应用比较少,更多的是运用Yarn调度
6.几种模式对比
| 模式 | Spark安装机器数 | 需启动的进程 | 所属者 |
|---|---|---|---|
| Local | 1 | 无 | Spark |
| Standlone | 3 | Master及Worker | Spark |
| Yarn | 1 | Yarn及HDFS | Hadoop |
7.端口号总结
- Spark查看当前Spark-shell运行任务情况端口号:4040
- Spark Master内部通信服务端口号:7077 (类比于Hadoop的9820(8020)端口)
- Spark Standalone模式Master Web端口号:8080(类比于Hadoop YARN任务运行情况查看端口号:8088)
- Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888)
第三章.WordCount案例实操
Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在idea中编制程序,然后打成jar包,提交到集群,最常用的是创建一个Maven项目,,利用Maven来管理Jar包的依赖
1.编写程序
- 新建一个Maven工程,并添加scala支持
![$01[Spark概述_运行模式_WordCount案例] - 图20](/uploads/projects/liuye-6lcqc@gws1uf/cc83b49438d2762f5c80a665811c25cf.png)
- 在main下创建scala文件夹,并右键mark directory as Sources Root
- 准备测试文件
- 导入项目依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
- 编写代码(本地)
package com.atguigu.spark.day01import org.apache.spark.{SparkConf, SparkContext}object $01_WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[4]").setAppName("test")val sc = new SparkContext(conf)//读取数据val rdd1 = sc.textFile("datas/wc.txt")//切割+压平val rdd2 = rdd1.flatMap(_.split(" "))//按照单词分组val rdd3 = rdd2.groupBy(x => x)//聚合val rdd4 = rdd3.map(x => {(x._1, x._2.size)})val arr = rdd4.collect()//结果展示println(arr.toList)}}
- 编写代码(集群)
package com.atguigu.spark.day01import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object $02_WordCountCluster {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("test")val sc = new SparkContext(conf)//读取数据val rdd1:RDD[String] = sc.textFile(args(0))//切割+压平val rdd2 = rdd1.flatMap(_.split(" "))//按照单词分组val rdd3 = rdd2.groupBy(x => x)//聚合val rdd4 = rdd3.map(x => {(x._1, x._2.size)})val arr = rdd4.collect()//结果展示println(arr.toList)}}
- 打包到集群进行测试
bin/spark-submit --class com.atguigu.spark.day01.\$02_WordCountCluster --master yarn /opt/module/original-spark-demo-1.0-SNAPSHOT.jar /input /output
发现报错:
![$01[Spark概述_运行模式_WordCount案例] - 图21](/uploads/projects/liuye-6lcqc@gws1uf/7b75c0580c6b7f0a4cb9c62c82700963.png)
原因:Spark on Yarn会默认使用Hadoop集群配置文件设置编码方式,但是Spark在自己的spark-yarn/jars 包里面没有找到支持lzo压缩的jar包,所以报错。
- 解决方案:拷贝lzo的包到/opt/module/spark-yarns/jar,并重新执行
cp /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar /opt/module/spark-yarn/jars
![$01[Spark概述_运行模式_WordCount案例] - 图22](/uploads/projects/liuye-6lcqc@gws1uf/af94b37eb4e26f6e90a80c94ec07dabf.png)
