1 Spark概述

1.1 什么是Spark(官网:http://spark.apache.org

Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
内存 + 磁盘

1.2 为什么要学Spark

中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果
Spark安装 - 图2

Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷,(spark与hadoop的差异)具体如下:
首先,Spark把中间数据放到内存中,迭代运算效率高。MapReduce中计算结果需要落地,保存到磁盘上,这样势必会影响整体速度,而Spark支持DAG图的分布式并行计算的编程框架,减少了迭代过程中数据的落地,提高了处理效率。(延迟加载)

其次,Spark容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即允许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错。

最后,Spark更加通用。不像Hadoop只提供了Map和Reduce两种操作,Spark提供的数据集操作类型有很多种,大致分为:Transformations和Actions两大类。Transformations包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort等多种操作类型,同时还提供Count, Actions包括Collect、Reduce、Lookup和Save等操作。

Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

1.3 Spark特点

1.3.1 快

与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
Spark安装 - 图3

1.3.2 易用

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
Spark安装 - 图4

1.3.3 通用

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
Spark安装 - 图5

1.3.4 兼容性

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
Spark安装 - 图6

2 Spark集群安装

2.1 安装

注意:安装spark时,无需安装scala

2.1.1 下载Spark安装包

Spark安装 - 图7

上传spark-安装包到Linux上
解压安装包到指定位置
# tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C apps/
# ln -s /root/apps/spark-2.2.0-bin-hadoop2.7/ /root/apps/spark
Spark安装包目录结构:

bin 可执行脚本
conf 配置文件
data 示例程序使用数据
examples 示例程序
jars 依赖jar包
LICENSE
licenses
NOTICE
python pythonAPI
R R语言API
README.md
RELEASE
sbin 集群管理命令

2.1.2 机器部署

准备4台Linux服务器,安装好JDK
hdp-01 hdp-02 hdp-03 hdp-04

2.1.3 配置Spark standalone集群

确保集群中各节点的防火墙是关闭的。

查看防火墙状态
# service iptables status
关闭防火墙
# service iptables stop
永久关闭防火墙
# chkconfig iptables off

确保主节点到各从节点的免密登录配置好了

进入到Spark安装目录
# cd apps/spark
进入conf目录并重命名并修改spark-env.sh.template文件
# cd conf/
# mv spark-env.sh.template spark-env.sh
# vim spark-env.sh
在该配置文件中添加如下配置

export JAVA_HOME=/usr/local/jdk
export SPARK_MASTER_HOST=hdp-01
export SPARK_MASTER_PORT=7077

保存退出
重命名并修改slaves.template文件
# mv slaves.template slaves
# vim slaves
思考?为什么要修改slaves配置文件
在该文件中添加子节点所在的位置(Worker节点)

hdp-01
hdp-02
hdp-03

保存退出
将配置好的Spark拷贝到其他节点上
# scp -r /root/apps/spark-2.2.0-bin-hadoop2.7/ hdp-02:/root/apps/
# ssh hdp-02 “ln -s /root/apps/spark-2.2.0-bin-hadoop2.7/ /root/apps/spark”

Spark集群配置完毕,目前是1个Master,3个Work,在hdp-01上启动Spark集群

为了能方便使用,配置一下环境变量:
Spark安装 - 图8

启动:
单独启动master(在hdp-01上):
# start-master.sh

启动众worker
# start-slaves.sh

停止:
# stop-slaves.sh
# stop-master.sh

批量脚本启动:
# start-all.sh
停止:
# stop-all.sh

启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进程,
登录Spark管理界面查看集群状态(主节点):http://hdp-01:8080/
Spark安装 - 图9
Spark安装 - 图10
默认情况下:spark会占用机器上的所有cores,
memory呢,会默认的使用ram – 1G
默认配置:
http://spark.apache.org/docs/latest/spark-standalone.html
Spark安装 - 图11
到此为止,Spark集群安装完毕。

2.1.4 HA 集群

Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:
Spark集群重新规划:(添加一台hdp-05节点,新增一台master节点)
hdp-01,hdp-02是Master;
hdp-01 hdp-02 hdp-03 hdp-04 hdp-05是Worker
安装配置zk集群,并启动zk集群,验证zk集群
Spark安装 - 图12

停止spark所有服务,在master节点上执行(hdp-01)
# stop-all.sh
修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_HOST并添加如下配置
# vim /root/apps/spark/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181 -Dspark.deploy.zookeeper.dir=/spark”
Spark安装 - 图13

  1. 在hdp-01节点上修改slaves配置文件内容指定worker节点

vim /root/apps/spark/conf/slaves
Spark安装 - 图14

  1. 在hdp-01上执行start-all.sh脚本,然后在hdp-02上执行start-master.sh启动第二个Master

Spark安装 - 图15
Spark安装 - 图16

3 执行Spark程序

3.1 执行第一个spark示例程序

spark-submit \
—class org.apache.spark.examples.SparkPi \
/root/apps/spark/examples/jars/spark-examples_2.11-2.2.0.jar 100
该算法是利用蒙特·卡罗算法求PI

3.2 启动Spark Shell

spark-shell 用命令行的方式提交任务到集群的一个客户端。spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

3.2.1 启动spark shell

直接启动spark-shell默认使用的是local模式,和spark集群无关
Spark安装 - 图17
local模式没有指定master地址,仅在本机启动一个进程(SparkSubmit),没有与集群建立联系。但是也可以正常启动spark shell和执行spark shell中的程序

指定集群模式启动:
hdfs://hdp-01:9000
spark的协议URI:spark://hdp-01:7077
# spark-shell —master
Spark安装 - 图18

在webUI界面,可以查看到正在运行的程序:
Spark安装 - 图19

还可以指定启动的参数:
# spark-shell —master spark://hdp-01:7077,hdp-02:7077 —executor-memory 512m —total-executor-cores 2
或者多行:
# spark-shell \
—master spark://hdp-01:7077,hdp-02:7077 \
—executor-memory 2g \
—total-executor-cores 2

参数说明:
—master spark://hdp-01:7077 指定Master的地址
—executor-memory 2g 指定每个executor可用内存为2G( 512m)
—total-executor-cores 2 指定整个集群使用的cup核数为2个
注意:如果worker节点的内存不足,那么在启动spark-shell的时候,就不能为executor分配超出worker可用的内存容量。

Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

3.2.2 在spark shell中编写WordCount程序

  1. 首先启动hdfs
  2. 向hdfs上传一个文件到hdfs://hdp-01:9000/wordcount/input/a.txt
  3. 在spark shell中用scala语言编写spark程序

scala> sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“)
spark是懒加载的,所以这里并没有真正执行任务。可使用collect方法快速查看数据。
lazy执行的,只有调用了action方法,才正式开始运行。

scala>sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“).flatMap(.split(“ “)).map((,1)).reduceByKey( + ).sortBy(_._2,false).collect
注意:这些flatMap,map等方法是RDD上的方法,要区分于原生的scala方法。
和原生scala的方法名称有的相同,但属于不通的类的方法,底层实现完全不一致。
原生的方法: 对单机的数组或集合进行操作。
RDD上的方法:
RDD是spark的计算模型,RDD上有很多的方法,这些方法通常称为算子,主要有两类算子,一类是transform,一类是action,transform是懒加载的。

scala>sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“).flatMap(.split(“ “)).map((,1)).reduceByKey(+).saveAsTextFile(“hdfs://hdp-01:9000/wordcount/outspark1”)

  1. 使用hdfs命令查看结果

hadoop fs -ls /wordcount/outspark1
说明:
sc是SparkContext对象,该对象是提交spark程序的入口
textFile(hdfs://hdp-01:9000/wordcount/intput/a.txt)是hdfs中读取数据
flatMap(.split(“ “))先map再压平
map((
,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://hdp-01:9000/outspark1”)将结果写入到hdfs中

3.3 在IDEA中编写WordCount程序

spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中开发程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

1.创建一个项目
Spark安装 - 图20

2.选择Maven项目,然后点击next
Spark安装 - 图21

3.填写maven的GAV,然后点击next
Spark安装 - 图22

  1. 填写项目名称,然后点击finish

Spark安装 - 图23

5.创建好maven项目后,点击Enable Auto-Import
Spark安装 - 图24

  1. 配置Maven的pom.xml

详见pom.xml文件

  1. 新建一个scala object
  2. 编写spark程序 | | | —- |

3.4 打包并上传到集群

点击idea右侧的Maven Project选项
点击Lifecycle,选择clean和package,然后点击Run Maven Build
Spark安装 - 图25

  1. 选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上(任意节点即可)

Spark安装 - 图26
Spark安装 - 图27
确保启动了hdfs集群和spark集群

# hdfs启动(在namenode节点上)
# /root/apps/hadoop/sbin/start-dfs.sh
# spark启动(在master节点上)
# start-all.sh

3.5 提交任务

  1. 使用spark-submit命令提交Spark应用(注意参数的顺序) | spark-submit —master spark://hdp-01:7077,hdp-02:7077 —executor-memory 2g —total-executor-cores 4 —class cn.edu360.spark.WordCount sparkcore-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount/input hdfs://hdp-01:9000/wordcount/output | | —- |

可以分多行写:
spark-submit \
—class cn.edu360.spark.WordCount \
—master spark://hdp-01:7077,hdp-02:7077 \
—executor-memory 2G \
—total-executor-cores 4 \
/root/sparkcore-1.0-SNAPSHOT.jar \
hdfs://hdp-01:9000/wordcount/input \
hdfs://hdp-01:9000/wordcount/output
Spark安装 - 图28
查看程序执行过程:
在web页面查看程序运行状态:http://hdp-01:8080
使用jps命令查看进行信息
查看hdfs文件结果
hdfs dfs -cat hdfs://hdp-01:9000/output/part-00000

如果内存或者cores不足,启动spark-submit就会报错,任务不能正常执行。
Spark安装 - 图29

3.6 spark集群各角色简介

通过指定参数(2g,2cores)启动了spark-shell后,可以在webUI界面监控到:
Spark安装 - 图30
只有部分的节点资源被使用了。
使用jps查看进程,发现在资源被使用的机器上多了进程:
Spark安装 - 图31
在提交spark-shell命令的机器上有进程:
Spark安装 - 图32

这一个超长的进程通常叫做Executor,被worker进程启动。真正负责任务的运行。
提交任务的节点,通常称为Dirver
当执行spark-shell时,会携带参数,并向master发送任务请求,master在接收到请求之后,会根据客户端需要的任务资源,选择出合适的Worker节点,然后向worker发送任务指令,接收到任务之后,worker会启动一个executor进程,
executor启动后,会等待分配计算任务,那然后呢,executor会向driver通信,
当有driver任务要执行时,任务就会分发到executor上, 然后并行执行。