- 1 Spark概述
- 2 Spark集群安装
- ">vim /root/apps/spark/conf/slaves
- 3 执行Spark程序
- hadoop fs -ls /wordcount/outspark1
说明:
sc是SparkContext对象,该对象是提交spark程序的入口
textFile(hdfs://hdp-01:9000/wordcount/intput/a.txt)是hdfs中读取数据
flatMap(.split(“ “))先map再压平
map((,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://hdp-01:9000/outspark1”)将结果写入到hdfs中
1 Spark概述
1.1 什么是Spark(官网:http://spark.apache.org)
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。
内存 + 磁盘
1.2 为什么要学Spark
中间结果输出:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果
Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷,(spark与hadoop的差异)具体如下:
首先,Spark把中间数据放到内存中,迭代运算效率高。MapReduce中计算结果需要落地,保存到磁盘上,这样势必会影响整体速度,而Spark支持DAG图的分布式并行计算的编程框架,减少了迭代过程中数据的落地,提高了处理效率。(延迟加载)
其次,Spark容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即允许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错。
最后,Spark更加通用。不像Hadoop只提供了Map和Reduce两种操作,Spark提供的数据集操作类型有很多种,大致分为:Transformations和Actions两大类。Transformations包括Map、Filter、FlatMap、Sample、GroupByKey、ReduceByKey、Union、Join、Cogroup、MapValues、Sort等多种操作类型,同时还提供Count, Actions包括Collect、Reduce、Lookup和Save等操作。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
1.3 Spark特点
1.3.1 快
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
1.3.2 易用
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
1.3.3 通用
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
1.3.4 兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
2 Spark集群安装
2.1 安装
2.1.1 下载Spark安装包
上传spark-安装包到Linux上
解压安装包到指定位置
# tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C apps/
# ln -s /root/apps/spark-2.2.0-bin-hadoop2.7/ /root/apps/spark
Spark安装包目录结构:
bin 可执行脚本 conf 配置文件 data 示例程序使用数据 examples 示例程序 jars 依赖jar包 LICENSE licenses NOTICE python pythonAPI R R语言API README.md RELEASE sbin 集群管理命令 |
---|
2.1.2 机器部署
准备4台Linux服务器,安装好JDK
hdp-01 hdp-02 hdp-03 hdp-04
2.1.3 配置Spark standalone集群
确保集群中各节点的防火墙是关闭的。
查看防火墙状态 # service iptables status 关闭防火墙 # service iptables stop 永久关闭防火墙 # chkconfig iptables off |
---|
确保主节点到各从节点的免密登录配置好了
进入到Spark安装目录
# cd apps/spark
进入conf目录并重命名并修改spark-env.sh.template文件
# cd conf/
# mv spark-env.sh.template spark-env.sh
# vim spark-env.sh
在该配置文件中添加如下配置
export JAVA_HOME=/usr/local/jdk export SPARK_MASTER_HOST=hdp-01 export SPARK_MASTER_PORT=7077 |
---|
保存退出
重命名并修改slaves.template文件
# mv slaves.template slaves
# vim slaves
思考?为什么要修改slaves配置文件
在该文件中添加子节点所在的位置(Worker节点)
hdp-01 hdp-02 hdp-03 |
---|
保存退出
将配置好的Spark拷贝到其他节点上
# scp -r /root/apps/spark-2.2.0-bin-hadoop2.7/ hdp-02:/root/apps/
# ssh hdp-02 “ln -s /root/apps/spark-2.2.0-bin-hadoop2.7/ /root/apps/spark”
Spark集群配置完毕,目前是1个Master,3个Work,在hdp-01上启动Spark集群
为了能方便使用,配置一下环境变量:
启动:
单独启动master(在hdp-01上):
# start-master.sh
启动众worker
# start-slaves.sh
停止:
# stop-slaves.sh
# stop-master.sh
批量脚本启动:
# start-all.sh
停止:
# stop-all.sh
启动后执行jps命令,主节点上有Master进程,其他子节点上有Work进程,
登录Spark管理界面查看集群状态(主节点):http://hdp-01:8080/
默认情况下:spark会占用机器上的所有cores,
memory呢,会默认的使用ram – 1G
默认配置:
http://spark.apache.org/docs/latest/spark-standalone.html
到此为止,Spark集群安装完毕。
2.1.4 HA 集群
Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:
Spark集群重新规划:(添加一台hdp-05节点,新增一台master节点)
hdp-01,hdp-02是Master;
hdp-01 hdp-02 hdp-03 hdp-04 hdp-05是Worker
安装配置zk集群,并启动zk集群,验证zk集群
停止spark所有服务,在master节点上执行(hdp-01)
# stop-all.sh
修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_HOST并添加如下配置
# vim /root/apps/spark/conf/spark-env.sh
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181 -Dspark.deploy.zookeeper.dir=/spark”
- 在hdp-01节点上修改slaves配置文件内容指定worker节点
vim /root/apps/spark/conf/slaves
![Spark安装 - 图14](/uploads/projects/xiaohuolu-bag4u@fxmgs3/0519adaa8732da2b105af349010261ac.png)
- 在hdp-01上执行start-all.sh脚本,然后在hdp-02上执行start-master.sh启动第二个Master
3 执行Spark程序
3.1 执行第一个spark示例程序
spark-submit \
—class org.apache.spark.examples.SparkPi \
/root/apps/spark/examples/jars/spark-examples_2.11-2.2.0.jar 100
该算法是利用蒙特·卡罗算法求PI
3.2 启动Spark Shell
spark-shell 用命令行的方式提交任务到集群的一个客户端。spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。
3.2.1 启动spark shell
直接启动spark-shell默认使用的是local模式,和spark集群无关
local模式没有指定master地址,仅在本机启动一个进程(SparkSubmit),没有与集群建立联系。但是也可以正常启动spark shell和执行spark shell中的程序
指定集群模式启动:
hdfs://hdp-01:9000
spark的协议URI:spark://hdp-01:7077
# spark-shell —master
在webUI界面,可以查看到正在运行的程序:
还可以指定启动的参数:
# spark-shell —master spark://hdp-01:7077,hdp-02:7077 —executor-memory 512m —total-executor-cores 2
或者多行:
# spark-shell \
—master spark://hdp-01:7077,hdp-02:7077 \
—executor-memory 2g \
—total-executor-cores 2
参数说明:
—master spark://hdp-01:7077 指定Master的地址
—executor-memory 2g 指定每个executor可用内存为2G( 512m)
—total-executor-cores 2 指定整个集群使用的cup核数为2个
注意:如果worker节点的内存不足,那么在启动spark-shell的时候,就不能为executor分配超出worker可用的内存容量。
Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可
3.2.2 在spark shell中编写WordCount程序
- 首先启动hdfs
- 向hdfs上传一个文件到hdfs://hdp-01:9000/wordcount/input/a.txt
- 在spark shell中用scala语言编写spark程序
scala> sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“)
spark是懒加载的,所以这里并没有真正执行任务。可使用collect方法快速查看数据。
lazy执行的,只有调用了action方法,才正式开始运行。
scala>sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“).flatMap(.split(“ “)).map((,1)).reduceByKey( + ).sortBy(_._2,false).collect
注意:这些flatMap,map等方法是RDD上的方法,要区分于原生的scala方法。
和原生scala的方法名称有的相同,但属于不通的类的方法,底层实现完全不一致。
原生的方法: 对单机的数组或集合进行操作。
RDD上的方法:
RDD是spark的计算模型,RDD上有很多的方法,这些方法通常称为算子,主要有两类算子,一类是transform,一类是action,transform是懒加载的。
scala>sc.textFile(“hdfs://hdp-01:9000/wordcount/input/“).flatMap(.split(“ “)).map((,1)).reduceByKey(+).saveAsTextFile(“hdfs://hdp-01:9000/wordcount/outspark1”)
- 使用hdfs命令查看结果
hadoop fs -ls /wordcount/outspark1
说明:
sc是SparkContext对象,该对象是提交spark程序的入口
textFile(hdfs://hdp-01:9000/wordcount/intput/a.txt)是hdfs中读取数据
flatMap(.split(“ “))先map再压平
map((,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://hdp-01:9000/outspark1”)将结果写入到hdfs中
3.3 在IDEA中编写WordCount程序
spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中开发程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。
1.创建一个项目
2.选择Maven项目,然后点击next
3.填写maven的GAV,然后点击next
- 填写项目名称,然后点击finish
5.创建好maven项目后,点击Enable Auto-Import
- 配置Maven的pom.xml
详见pom.xml文件
- 新建一个scala object
- 编写spark程序 | | | —- |
3.4 打包并上传到集群
点击idea右侧的Maven Project选项
点击Lifecycle,选择clean和package,然后点击Run Maven Build
- 选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上(任意节点即可)
确保启动了hdfs集群和spark集群
# hdfs启动(在namenode节点上) # /root/apps/hadoop/sbin/start-dfs.sh # spark启动(在master节点上) # start-all.sh |
---|
3.5 提交任务
- 使用spark-submit命令提交Spark应用(注意参数的顺序) | spark-submit —master spark://hdp-01:7077,hdp-02:7077 —executor-memory 2g —total-executor-cores 4 —class cn.edu360.spark.WordCount sparkcore-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount/input hdfs://hdp-01:9000/wordcount/output | | —- |
可以分多行写:
spark-submit \
—class cn.edu360.spark.WordCount \
—master spark://hdp-01:7077,hdp-02:7077 \
—executor-memory 2G \
—total-executor-cores 4 \
/root/sparkcore-1.0-SNAPSHOT.jar \
hdfs://hdp-01:9000/wordcount/input \
hdfs://hdp-01:9000/wordcount/output
查看程序执行过程:
在web页面查看程序运行状态:http://hdp-01:8080
使用jps命令查看进行信息
查看hdfs文件结果
hdfs dfs -cat hdfs://hdp-01:9000/output/part-00000
如果内存或者cores不足,启动spark-submit就会报错,任务不能正常执行。
3.6 spark集群各角色简介
通过指定参数(2g,2cores)启动了spark-shell后,可以在webUI界面监控到:
只有部分的节点资源被使用了。
使用jps查看进程,发现在资源被使用的机器上多了进程:
在提交spark-shell命令的机器上有进程:
这一个超长的进程通常叫做Executor,被worker进程启动。真正负责任务的运行。
提交任务的节点,通常称为Dirver
当执行spark-shell时,会携带参数,并向master发送任务请求,master在接收到请求之后,会根据客户端需要的任务资源,选择出合适的Worker节点,然后向worker发送任务指令,接收到任务之后,worker会启动一个executor进程,
executor启动后,会等待分配计算任务,那然后呢,executor会向driver通信,
当有driver任务要执行时,任务就会分发到executor上, 然后并行执行。