spark_day01学习笔记

1、课程目标

  • 1、掌握spark相关概念
  • 2、掌握搭建一个spark集群
  • 3、掌握简单的spark应用程序开发

2、spark概述

2.1 什么是spark
  • Apache Spark™ is a unified analytics engine for large-scale data processing.

  • apache的spark是一个针对于大规模数据处理的统一分析引擎

    1. spark是基于内存的计算框架,计算速度非常快,但是这里仅仅只涉及到数据的计算,并没有涉及到数据的存储。
    2. 后期需要进行数据的计算,这里就可以对接不同的外部数据源(比如hdfs

2.2 为什么要学习spark
  • 就是由于spark的处理速度比mapreduce要快很多,后期很受企业青睐。

3、spark的四大特性

3.1、速度快
  • spark比mapreduce在内存中快100倍,比mapreduce在磁盘中快10倍

  • spark比mapreduce快很多的主要的2个原因: ``` (1)mapreduce在处理任务的时候,每一个job 的输出结果都会落地到磁盘,后续有其他的job需要依赖于前面job的输出结果,这个时候只能够从磁盘中加载得到,需要大量的磁盘io操作; spark在处理任务的时候,每一个job的输出结果可以保存在内存中,后续有其他的job需要依赖于前面job的输出结果,这个时候就可以直接从内存获取得到,大大减少磁盘io操作,最后性能肯定是大大的提升。l

例如:hivesql—-> select name,age from (select * from user where age >30);
job2<—————————-job1 map 0% reduce 0% map 10% reduce 0%

(2) mapreduce任务是以进程的方式运行在yarn集群中。比如说一个mapreduce任务中有100个MapTask, 后期要处理这100个MapTask就需要开启100个进程; spark任务是以线程的方式运行在spark集群中进程里面,比如说一个spark任务有100个MapTask, 这个时候可以极端一点,只需要启动一个进程,在一个进程中运行100个线程中就可以,进程的启动与线程的启动代价肯定是不一样,一个进程的启动需要的调度时间和资源远远大于一个线程。

  1. <a name="13935083"></a>
  2. ##### 3.2、易用性
  3. - 可以快速写一个spark应用程序,通过java、scala、python、R、sql等不同的语言进行代码开发。
  4. <a name="3a95d61b"></a>
  5. ##### 3.3、通用性
  6. -
  7. spark框架不再是一个简单的框架,它发展成一个生态系统
  8. -
  9. 就包括了sparksql、sparkStreaming、Mlib、Graphx不同的子项目
  10. -
  11. 后期按照公司当前的业务需求就可以灵活的使用到这些不同的子项目
  12. -
  13. 一站式解决所有应用场景
  14. - 离线
  15. - 实时
  16. - 机器学习算法库
  17. - 图计算、图挖掘
  18. <a name="52bbfc24"></a>
  19. ##### 3.4、兼容性
  20. - spark任务就是一个计算程序,哪里可以给当前这个任务提供计算资源,我们就可以把这个任务提交到哪里去运行。
  21. - standAlone
  22. - 它是spark集群自带的模式,整个任务的资源分配由Master老大去负责
  23. - yarn
  24. - 可以把spark程序提交到yarn中运行,整个任务的资源分配由ResourceManager负责
  25. - mesos
  26. - 它是一个apache开源的类似于yarn的资源调度平台
  27. <a name="0618a78a"></a>
  28. #### 4、spark集群安装部署
  29. -
  30. 1、下载对应的安装包
  31. - [http://mirrors.shu.edu.cn/apache/spark/spark-2.1.3/spark-2.1.3-bin-hadoop2.7.tgz](http://mirrors.shu.edu.cn/apache/spark/spark-2.1.3/spark-2.1.3-bin-hadoop2.7.tgz)
  32. - spark-2.1.3-bin-hadoop2.7.tgz
  33. -
  34. 2、规划安装目录
  35. - /export/servers
  36. -
  37. 3、上传安装包到服务器中
  38. -
  39. 4、解压安装包到指定的规划目录
  40. - tar -zxvf spark-2.1.3-bin-hadoop2.7.tgz -C /export/servers
  41. -
  42. 5、重命名解压目录
  43. - mv spark-2.1.3-bin-hadoop2.7 spark
  44. -
  45. 6、修改配置文件
  46. -
  47. 进入到spark安装目录下的conf文件夹中
  48. -
  49. vim spark-env.sh ( mv spark-env.sh.template spark-env.sh)

配置java环境变量

export JAVA_HOME=/export/servers/jdk

配置master地址

export SPARK_MASTER_HOST=node01

配置master的端口

export SPARK_MASTER_PORT=7077

  1. -
  2. vim slaves (mv slaves.template slaves)

指定哪些节点是小弟worker

node02 node03

  1. -
  2. 7、配置spark的环境变量
  3. -
  4. vim /etc/profile

export SPARK_HOME=/export/servers/spark export PATH=$PATH:$SPARK_HOME/sbin:$SPARK_HOME/bin

  1. -
  2. 8、分发spark的安装目录和环境变量

scp -r spark node02:/export/servers scp -r spark node03:/export/servers

scp /etc/profile node02:/etc scp /etc/profile node03:/etc

  1. -
  2. 9、让所有spark节点的环境变量生效
  3. - 在所有节点上执行
  4. - source /etc/profile
  5. <a name="7dd7a26c"></a>
  6. #### 5、spark集群的启动和停止
  7. <a name="da9b97d5"></a>
  8. ##### 5.1 启动
  9. - 在主节点上进入到sbin目录下
  10. - ./start-all.sh
  11. <a name="6473f8d8"></a>
  12. ##### 5.2 停止
  13. -
  14. 在主节点上进入到sbin目录下
  15. - ./stop-all.sh
  16. <a name="e5daef57"></a>
  17. #### 6、spark集群的web管理界面
  18. namenode: hdfs://node01:8020-----------------> node01:50070
  19. master: spark://node01:7077 --------------------> node01:8080
  20. -
  21. 启动spark集群
  22. -
  23. 访问
  24. - master所在的主机名或者是ip地址:8080
  25. - 可以看到spark集群的所有信息
  26. - spark集群总的资源信息(有多少core 有多少内存)
  27. - spark集群已经使用的资源信息
  28. - spark集群还剩的资源信息
  29. - spark集群对应的worker信息
  30. - spark集群正在运行的任务信息
  31. - spark集群已经完成的任务信息

在实际工作中我们会搭建一个非常多的服务器组成一个spark集群,每一台服务器都有对应的资源信息 10台worker,每一个worker的cpu核数20 内存大小100G worker会参与任务的计算,整个spark集群所有的资源信息就是把所有的worker节点资源信息进行累加

spark集群总的资源信息: 内存: 10100G=1T cpu核数:1020=200核

后期会把大量的spark任务提交到spark集群去运行,这个时候就需要考虑spark集群还剩的资源信息,以及后面可能还会跑其他的任务,他们也需要对应的资源。资源分配的时候要合理点。

  1. <a name="2a438413"></a>
  2. #### 7、基于zookeeper构建高可用的spark集群
  3. -
  4. 1、前置依赖
  5. - 依赖于zk,事先需要搭建zk集群
  6. -
  7. 2、node01上修改配置文件
  8. -
  9. vim spark-env.sh

手动注释掉配置的master地址

export SPARK_MASTER_HOST=node01

引入zk 构建高可用spark集群

export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,n ode2:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark”

  1. -
  2. 分发到其他机器

scp spark-env.sh node02:/export/servers/spark/conf scp spark-env.sh node03:/export/servers/spark/conf

  1. -
  2. 3、启动高可用spark集群
  3. -
  4. 1、启动zk集群
  5. ```shell
  6. #!/bin/sh
  7. for host in node01 node02 node03
  8. do
  9. ssh $host "source /etc/profile;nohup zkServer.sh start > /dev/null 2>&1 &"
  10. echo "$host zk is running"
  11. done
  • 2、可以在任意一台机器来执行脚本(有前提条件:需要实现任意2台机器之间ssh免密登录)

    • start-all.sh

      • 你在那台机器执行这个脚本,它就会在当前机器启动一个Master进程
      • 整个集群中的worker进程的启动由slaves文件决定
  • 3、后期可以在其他机器单独启动master进程

    • start-master.sh
  • 4、高可用集群部署梳理 ``` 这个时候引入了zk,构建了一个高可用的spark集群,在这里就有很多个master,其中有一个master被选举成活着的master,其他的多个master都处理standBy,活着的master会提供服务,处理standBy中的master不会提供服务,同时也把整个spark集群的元数据信息通过zk集群中的”/spark”节点去保存。

如果此时活着的master挂掉了,首先zk会感知到,然后在所有处理standBy中的master进行选举,最后产生一个新的活着的master,这个新的活着的master会读取到zk集群中保存spark集群元数据信息的节点,最后恢复到上一次挂掉的状态。整个恢复阶段需要一定时间的。一般就是1-2分钟。

整个集群活着的master挂掉了,目前正在处于恢复阶段,对任务有什么影响? (1)对于正在运行的任务有没有影响? 没有,既然任务正在运行,就说明它是已经获取得到了资源,既然有了资源,就不需要master了,继续运行就可以了。

(2)对于在恢复阶段提交的任务有没有影响? 有,对于这个任务来说,由于整个spark集群没有这样一个活着的Master存在,任务也就分配不到计算资源,这个任务拿不到资源,任务就无法运行。

  1. <a name="fcdaea70"></a>
  2. #### 8、spark的角色
  3. ![](spark.png#alt=)
  4. - 1、Driver
  5. - 它会运行客户端写好的main方法,并且它会创建SparkContext对象,该对象是所有spark程序的执行入口
  6. - 2、Application
  7. - 它就是一个应用程序,它包括了Driver端的代码逻辑和任务在运行的时候需要的资源信息
  8. - 3、ClusterManager
  9. - 它既是给当前任务提供计算资源的外部服务
  10. - standAlone
  11. - 它是spark自带的集群模式,整个任务的资源分配由Master负责
  12. - yarn
  13. - spark程序可以提交到yarn中去运行,整个任务的资源分配由ResourceManager负责
  14. - mesos
  15. - 就是一个apache开源的类似于yarn资源调度平台
  16. - 4、Master
  17. - 它是整个spark集群的老大。它负责资源的分配
  18. - 5、Worker
  19. - 它是整个spark集群的小弟,它负责任务的计算节点
  20. - 6、Executor
  21. - 它是一个进程,它会在Worker节点上启动对应的executor进程
  22. - 7、task
  23. - 它就是一个线程,它是以线程的方式运行在worker节点的executor进程中
  24. <a name="d8a81ebc"></a>
  25. #### 9、初识spark程序
  26. -
  27. 1、普通模式下提交任务
  28. - 就是我们已经知道了活着的master地址

bin/spark-submit \ —class org.apache.spark.examples.SparkPi \ —master spark://node01:7077 \ —executor-memory 1G \ —total-executor-cores 2 \ examples/jars/spark-examples_2.11-2.1.3.jar \ 10

—class :指定程序的主类 —master:指定master地址 —executor-memory :指定每一个executor需要的内存大小 —total-executor-cores :执行总的cpu核数

  1. -
  2. 2、高可用模式下提交任务
  3. - 就是我们并不知道哪一个master是活着的master

bin/spark-submit \ —class org.apache.spark.examples.SparkPi \ —master spark://node01:7077,node02:7077,node03:7077 \ —executor-memory 1G \ —total-executor-cores 2 \ examples/jars/spark-examples_2.11-2.1.3.jar \ 10

在高可用模式下提交任务,需要把所有的master地址进行罗列 —master spark://node01:7077,node02:7077,node03:7077 后期程序后依次轮训整个master列表,最后找到活着的master,然后向这个活着的master去提交任务。

  1. <a name="4ee24e0c"></a>
  2. #### 10、spark-shell 使用
  3. <a name="616fb2a7"></a>
  4. ##### 10.1 通过 spark-shell --master local[N] 读取本地数据文件实现单词统计
  5. -
  6. --master local[N]
  7. -
  8. local表示本地运行,跟集群没有任何关系,方便做一些测试和学习
  9. -
  10. N 表示一个正整数
  11. -
  12. local[N] 表示本地采用N个线程去运行任务
  13. -
  14. spark-shell --master local[2]
  15. - 它会产生一个SparkSubmit进程
  16. ```scala
  17. sc.textFile("file:///root/words.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
  18. sc.textFile("file:///root/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

10.2 通过 spark-shell —master local[N] 读取HDFS上数据文件实现单词统计
  • spark-shell —master local[2]

    • spark整合HDFS

      • vim spark-env.sh
        1. #spark整合HDFS
        2. export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
  1. sc.textFile("hdfs://node01:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  2. sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

10.3 通过 spark-shell —master 指定master为集群中活着的master
  • 需求:实现读取HDFS上的数据进行单词统计,最后把结果数据保存到HDFS上的目录中

  • spark-shell —master spark://node02:7077 —executor-memory 1g —total-executor-cores 4

    1. sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")

11、通过IDEA开发spark的wordCount程序

  • 构建一个maven工程

  • 引入依赖

    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-core_2.11</artifactId>
    4. <version>2.1.3</version>
    5. </dependency>

11.1 通过scala语言开发spark的wordcount程序(本地运行)
  • 1、代码开发 ```scala package cn.itcast.wordCount

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}

//todo:利用scala语言开发spark的wordcount程序 object WordCount { def main(args: Array[String]): Unit = { //1、创建SparkConf对象 设置appName和master地址 local[2]表示本地采用2个线程去运行 val sparkConf: SparkConf = new SparkConf().setAppName(“WordCount”).setMaster(“local[2]”)

  1. //2、创建SparkContext对象 它是所有spark程序的执行入口,它内部会构建DAGScheduler和TaskScheduler
  2. val sc = new SparkContext(sparkConf)
  3. //设置日志输出级别
  4. sc.setLogLevel("warn")
  5. //3、读取本地数据文件
  6. val data: RDD[String] = sc.textFile("E:\\words.txt")
  7. //4、切分每一行,获取所有的单词
  8. val words: RDD[String] = data.flatMap(_.split(" "))
  9. //5、每个单词计为1
  10. val wordAndOne: RDD[(String, Int)] = words.map((_,1))
  11. //6、相同单词出现的1累加
  12. val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
  13. //按照单词出现的次数降序排列,默认是第二个参数是true是升序,改为false 就是降序
  14. val sortedRDD: RDD[(String, Int)] = result.sortBy(x=>x._2,false)
  15. //7、收集打印
  16. val finalResult: Array[(String, Int)] = sortedRDD.collect()
  17. finalResult.foreach(println)
  18. //
  19. //8、关闭sc
  20. sc.stop()

} }

  1. <a name="fc5d26be"></a>
  2. ##### 11.2 通过scala语言开发spark的wordcount程序(集群运行)
  3. - 1、代码开发
  4. ```scala
  5. package cn.itcast.wordCount
  6. import org.apache.spark.{SparkConf, SparkContext}
  7. import org.apache.spark.rdd.RDD
  8. //todo:通过scala语言开发spark的wordcount程序(集群运行)
  9. object WordCount_Online {
  10. def main(args: Array[String]): Unit = {
  11. //1、创建SparkConf对象 设置appName
  12. val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")
  13. //2、创建SparkContext对象 它是所有spark程序的执行入口,它内部会构建DAGScheduler和TaskScheduler
  14. val sc = new SparkContext(sparkConf)
  15. //设置日志输出级别
  16. sc.setLogLevel("warn")
  17. //3、读取本地数据文件
  18. val data: RDD[String] = sc.textFile(args(0))
  19. //4、切分每一行,获取所有的单词
  20. val words: RDD[String] = data.flatMap(_.split(" "))
  21. //5、每个单词计为1
  22. val wordAndOne: RDD[(String, Int)] = words.map((_,1))
  23. //6、相同单词出现的1累加
  24. val result: RDD[(String, Int)] = cwordAndOne.reduceByKey(_+_)
  25. //7、保存结果数据到HDFS上
  26. result.saveAsTextFile(args(1))
  27. //8、关闭sc
  28. sc.stop()
  29. }
  30. }
  • 2、打成jar包提交到集群中运行
    1. spark-submit --master spark://node03:7077 --class cn.itcast.wordCount.WordCount_Online --executor-memory 1g --total-executor-cores 4 original-spark_class15-1.0-SNAPSHOT.jar /words.txt /out_spark

11.3 通过java语言开发spark的wordcount程序(本地运行)
  • 1、代码开发 ```java package cn.itcast.wordCount;

import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2;

import java.util.Arrays; import java.util.Iterator; import java.util.List;

//todo;通过java语言开发实现spark的wordcount程序 public class WordCount_Java { public static void main(String[] args) { //1、创建SparkConf对象 SparkConf sparkConf = new SparkConf().setAppName(“WordCount_Java”).setMaster(“local[2]”);

  1. //2、创建JavaSparkContext
  2. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  3. //3、读取数据文件
  4. JavaRDD<String> data = jsc.textFile("E:\\words.txt");
  5. //4、切分每一行,获取所有的单词
  6. JavaRDD<String> words = data.flatMap(new FlatMapFunction<String, String>() {
  7. public Iterator<String> call(String line) throws Exception {
  8. String[] words = line.split(" ");
  9. return Arrays.asList(words).iterator();
  10. }
  11. });
  12. //5、每个单词计为1
  13. JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
  14. public Tuple2<String, Integer> call(String word) throws Exception {
  15. return new Tuple2<String, Integer>(word, 1);
  16. }
  17. });
  18. //6、相同单词出现的1累加
  19. JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
  20. public Integer call(Integer v1, Integer v2) throws Exception {
  21. return v1 + v2;
  22. }
  23. });
  24. //按照单词出现的次数降序排列 (单词,次数)------->(次数,单词).sortByKey(false)------>(单词,次数)
  25. JavaPairRDD<Integer, String> reverseRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  26. public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
  27. return new Tuple2<Integer, String>(t._2, t._1);
  28. }
  29. });
  30. JavaPairRDD<String, Integer> sortedRDD = reverseRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
  31. public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
  32. return new Tuple2<String, Integer>(t._2, t._1);
  33. }
  34. });
  35. //7、收集打印
  36. List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
  37. for (Tuple2<String, Integer> tuple : finalResult) {
  38. System.out.println("单词:"+tuple._1+" 次数:"+tuple._2);
  39. }
  40. //8、关闭jsc
  41. jsc.stop();
  42. }

} ```