笔记内容选自慕课网《大数据开发工程师》体系课

9.1 什么是Spark?

  • Spark是一个用于大规模数据处理的统一计算引擎
    • 可以做MapReduce的离线数据计算
    • 还可以做实时计算
    • 还有实现类似Hive的SQL计算
  • Spark中一个最重要的特性就是基于内存进行计算
    • 从而让它的速度可以达到 MapReduce 的几十倍甚至上百倍

9.1.1 Spark的特点?

  • Speed:速度快

image.png
由于Spark是基于内存进行计算的,所以它的计算性能理论上可以比MapReduce快100倍
Spark使用最先进的DAG调度器查询优化器和物理执行引擎,实现了高性能的批处理和流处理
注意:批处理其实就是离线计算流处理就是实时计算,只是说法不一样罢了,意思是一样的

  • Easy of Use:易用性

image.png
Spark的易用性主要体现在两个方面
1. 可以使用多种编程语言快速编写应用程序,例如JavaScala、Python、R和SQL
2. Spark提供了80多个高阶函数,可以轻松构建Spark任务

  • Generality:通用性

image.png
Spark提供了CoreSQLStreamingMLlibGraphX等技术组件
可以一站式地完成大数据领域的离线批处理SQL交互式查询流式实时计算机器学习图计算等常见的任务

  • Runs Everywhere:到处运行

image.png
你可以在Hadoop YARNMesosKubernetes使用Spark集群
并且可以访问HDFSAlluxioApache CassandraApache HBaseApache Hive和数百个其它数据源中的数据

9.1.2 Spark vs Hadoop

  • 综合能力
    • Spark是一个综合性质的计算引擎
    • Hadoop既包含MapReduce(计算),还包含HDFS(存储)YARN(资源管理)
  • 计算模型
    • Spark任务可以包含多个计算操作,轻松实现复杂迭代计算
    • HadoopMap中ReduceReduce任务只包含Map和阶段Reduce阶段,不够灵活
  • 处理速度
    • Spark任务的数据是存放在内存中的
    • 而 Hadoop中 Map Reduce 任务是基于磁盘的


9.1.3 Hadoop+Spark

image.png

  • 底层是Hadoop的HDFS和YARN
  • Spark core指的是Spark的离线批处理
  • Spark Streaming指的是Spark的实时流计算
  • SparkSQL指的是Spark中的SQL计算
  • Spark Mllib指的是Spark中的机器学习库,这里面集成了很多机器学习算法
  • Spark GraphX是指图计算

9.1.4 Spark的应用场景

  • 低延时的海量数据计算需求
    • 这个说的就是针对Spark core的应用
  • 低延时SQL交互查询需求
    • 这个说的就是针对Spark SQL的应用
  • 准实时(秒级)海量数据计算需求
    • 这个说的就是针对Spark Streaming的应用

9.1.5 Spark集群安装部署

  • 下载地址:https://archive.apache.org/dist/spark/
  • Standalone:独立集群
  • ON YARN:共用 Hadoop集群资源【推荐使用
    • 先保证有一个Hadoop集群,然后只需要部署一个Spark的客户端节点即可,不需要启动任何进程
    • 注意
      • Spark的客户端节点同时也需要是Hadoop的客户端节点,因为Spark需要依赖于Hadoop ```basic

        ON YARN安装方式,把 spark-2.4.3-bin-hadoop2.7.tgz 上传到客户端节点bigdata04

        [root@bigdata04 soft]# tar -zxvf spark-2.4.3-bin-hadoop2.7.tgz

进入目录

cd spark-2.4.3-bin-hadoop2.7/conf

给配置改名

mv spark-env.sh.template spark-env.sh

修改配置

vi spark-env.sh … export JAVA_HOME=/data/soft/jdk1.8 export HADOOP_CONF_DIR=/data/soft/hadoop-3.2.0/etc/hadoop

配置SPARK_HOME环境变量

vi /etc/profile … export JAVA_HOME=/data/soft/jdk1.8 export HADOOP_HOME=/data/soft/hadoop-3.2.0 export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7 export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HOME/bin:$PATH

source /etc/profile

切换bigdata01启动Hadoop集群

[root@bigdata01 soft]# /data/soft/hadoop-3.2.0/sbin/start-all.sh

切换bigdata04客户端节点,启动spark

cd /data/soft/spark-2.4.3-bin-hadoop2.7 [root@bigdata04 spark-2.4.3-bin-hadoop2.7]# bin/spark-submit —class org.apache.spark.examples.SparkPi —master yarn —deploy-mode cluster examples/jars/spark-examples_2.11-2.4.3.jar 2

  1. - 访问:[http://bigdata01:8088/cluster](http://bigdata01:8088/cluster)
  2. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1580562/1629300585436-04766664-f6e6-423a-b762-a85f6e749801.png#clientId=ud9ac7dda-1f62-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=552&id=ue7a2b448&margin=%5Bobject%20Object%5D&name=image.png&originHeight=552&originWidth=1912&originalType=binary&ratio=1&rotation=0&showTitle=false&size=140320&status=done&style=none&taskId=ua4ac584a-b57a-4f2f-a81a-5b7000956b9&title=&width=1912)
  3. <a name="nL8Xi"></a>
  4. ## 9.2 Spark的底层实现
  5. <a name="UU260"></a>
  6. ### 9.2.1 Spark的工作原理
  7. 1. 首先通过Spark客户端**提交任务到Spark集群**
  8. 1. 然后Spark任务在执行的时候会**读取数据源HDFS中的数据**,**将数据加载到内存中**,**转化为RDD**
  9. 1. 然后针**对RDD调用一些高阶函数对数据进行处理**, 中间可以**调用多个高阶函数**
  10. 1. 此时经过flatmap计算之后,**前面RDD的数据传输到后面节点上面这个过程是不需要经过shue的**,可以**通过网络直接拷贝过去**
  11. 1. **RDD节点是一一对应的**,如节点1的数据拷贝→节点4,节点2的数据拷贝→节点5
  12. 1. 最终把计算出来的**结果数据写到HDFS中**
  13. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1580562/1629301057285-33eb04dd-9f26-4692-a132-aa1ec0c5b87a.png#clientId=ubd9270b3-017b-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=831&id=u8c7223bf&margin=%5Bobject%20Object%5D&name=image.png&originHeight=831&originWidth=1773&originalType=binary&ratio=1&rotation=0&showTitle=false&size=118260&status=done&style=shadow&taskId=u0274b7ff-ef82-4f74-a170-655ab439a20&title=&width=1773)
  14. <a name="e7atm"></a>
  15. ### 9.2.2 什么是RDD?
  16. - RDD通常通过Hadoop上的文件
  17. - 即**HDFS文件进行创建**
  18. - 也可以**通过程序中的集合来创建**
  19. - 还可以**通过其他数据源创建**
  20. - RDDSpark提供的核心抽象
  21. - 全称为 **Resillient Distributed Dataset**
  22. - **弹性分布式数据集**
  23. - RDD的特点
  24. - **弹性**
  25. - RDD数据默认情况下存放在内存中,但是在**内存资源不足时**, Spark也会**自动将RDD数据写入磁盘**
  26. - **分布式**
  27. - RDD在抽象上来说**是一种元素数据的集合**,它是**被分区的**,**每个分区分布在集群中的不同节点上**,从而让RDD中的数据**可以被并行操作**
  28. - **容错性**
  29. - RDD最重要的特性就是提供了容错性,可以**自动从节点失败中恢复过来**
  30. <a name="rLrSL"></a>
  31. ### 9.2.3 Spark架构相关进程
  32. - Driver
  33. - 编写的Spark程序由Driver进程负责执行
  34. - Driver进程所在的节点可以是Spark集群的某一个节点或者就是我们提交Spark程序的客户端节点
  35. - Master
  36. - 集群的主节点中启动的进程
  37. - 主要负责集群资源的管理和分配,还有集群的监控等
  38. - Worker
  39. - 集群的从节点中启动的进程
  40. - 主要负责启动其它进程来执行具体数据的处理和计算任务
  41. - Executor
  42. - Worker负责启动的进程
  43. - 主要为了执行数据处理和计算
  44. - Task
  45. - Executor负责启动的线程
  46. - 它主要负责执行数据处理和计算,它是真正干活的
  47. - flatmapmap那些任务具体执行的时候,最终就会转化成这种Task
  48. <a name="X4g5q"></a>
  49. ### 9.2.4 Spark架构原理
  50. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1580562/1629303325255-8f0d92cf-feb8-4a47-9686-49c1348f51c7.png#clientId=u9eec52ac-3b0e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=626&id=uc854ba3e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=626&originWidth=1268&originalType=binary&ratio=1&rotation=0&showTitle=false&size=150324&status=done&style=shadow&taskId=u85877294-f182-4210-9c42-618324f5a76&title=&width=1268)
  51. 1. 首先我们在spark的客户端机器上通过driver进程执行我们的Spark代码 当我们通过spark-submit脚本提交Spark任务的时候Driver进程就启动了。
  52. 1. Driver进程启动之后,会做一些初始化的操作,会找到集群master进程,对Spark应用程序进行注册
  53. 1. Master收到Spark程序的注册申请之后,会发送请求给Worker,进行资源的调度和分配
  54. 1. Worker收到Master的请求之后,会为Spark应用启动Executor进程会启动一个或者多个Executor,具体启动多少个,会根据你的配置来启动
  55. 1. Executor启动之后,会向Driver进行反注册,这样Driver就知道哪些Executor在为它服务了
  56. 1. Driver会根据我们对RDD定义的操作,提交一堆的taskExecutor上执行task里面执行的其实就是具体的map、flatMap这些操作
  57. <a name="E74RI"></a>
  58. ## 9.3 Spark上手体验
  59. <a name="lx27N"></a>
  60. ### 9.3.1 WordCount For Scala
  61. - 计算每个单词出现的次数
  62. <a name="LkqQj"></a>
  63. #### 1、准备数据
  64. ```basic
  65. vi hello.txt
  66. hello you
  67. hello me

2、创建空的Maven项目,引入Scala

image.png

3、引入依赖

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.11</artifactId>
  4. <version>2.4.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba</groupId>
  8. <artifactId>fastjson</artifactId>
  9. <version>1.2.68</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-sql_2.11</artifactId>
  14. <version>2.4.3</version>
  15. </dependency>

4、编写Scala操作spark的代码

package com.jade.scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:单词计数
 * Created by xuwei
 */
object WordCountScala {

  def main(args: Array[String]): Unit = {
    //第一步:创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("WordCountScala")//设置任务名称
      .setMaster("local")//local表示在本地测试执行

    val sc = new SparkContext(conf)

    //第二步:加载数据
    var path = "/Users/angel/Desktop/data/hello.txt"
    if(args.length==1){
      path = args(0)
    }
    val linesRDD = sc.textFile(path)

    //第三步:对数据进行切割,把一行数据切分成一个一个的单词
    val wordsRDD = linesRDD.flatMap(_.split(" "))

    //第四步:迭代words,将每个word转换为(word,1)这种形式
    val pairRDD = wordsRDD.map((_,1))

    //第五步:根据key(其实就是word)进行分组聚合统计
    val wordCountRDD = pairRDD.reduceByKey(_ + _)

    //第六步:将结果打印到控制台
    //注意:只有当任务执行到这一行代码的时候,任务才会真正开始执行计算
    //如果任务中没有这一行代码,前面的所有算子是不会执行的
    wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))

    //第七步:停止SparkContext
    sc.stop()
  }
}

5、关键步骤具体分析

//第二步:加载数据
val linesRDD = sc.textFile("D:\\hello.txt")
linesRDD中的数据是这样的:
hello you
hello me


//第三步:对数据进行切割,把一行数据切分成一个一个的单词
val wordsRDD = linesRDD.flatMap(_.split(" "))
wordsRDD中的数据是这样的:
hello
you
hello
me


//第四步:迭代words,将每个word转换为(word,1)这种形式
val pairRDD = wordsRDD.map((_,1))
pairRDD中的数据是这样的:
(hello,1)
(you,1)
(hello,1)
(me,1)

//第五步:根据key(其实就是word)进行分组聚合统计
val wordCountRDD = pairRDD.reduceByKey(_ + _)
wordCountRDD中的数据是这样的:
(hello,2)
(you,1)
(me,1)

9.3.2 WordCount For Java

package com.jade.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * 需求:单词计数
 * Created by xuwei
 */
public class WordCountJava {
    public static void main(String[] args) {
        //第一步:创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCountJava").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //第二步:加载数据
        String path = "/Users/angel/Desktop/data/hello.txt";
        if(args.length==1){
            path = args[0];
        }
        JavaRDD<String> linesRDD = sc.textFile(path);
        //第三步:对数据进行切割,把一行数据切分成一个一个的单词
        //注意:FlatMapFunction的泛型,第一个参数表示输入数据类型,第二个表示是输出数据类型
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        //第四步:迭代words,将每个word转换为(word,1)这种形式
        //注意:PairFunction的泛型,第一个参数是输入数据类型
        //第二个是输出tuple中的第一个参数类型,第三个是输出tuple中的第二个参数类型
        //注意:如果后面需要使用到....ByKey,前面都需要使用mapToPair去处理
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        //第五步:根据key(其实就是word)进行分组聚合统计
        JavaPairRDD<String, Integer> wordCountRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });
        //第六步:将结果打印到控制台
        wordCountRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tup) throws Exception {
                System.out.println(tup._1+"--"+tup._2);
            }
        });
        //第七步:停止SparkContext
        sc.stop();
    }
}

9.3.3 Spark任务的三种提交方式

  1. 直接在idea中执行,方便在本地环境调试代码
  2. 使用 spark-submit,提交到集群执行【实际工作中使用
  3. 使用 spark-shell,方便在集群环境调试代码

1、在maven添加打包插件

<build>
    <plugins>
        <!-- java编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- scala编译插件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.12</scalaVersion>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- 打包插件 -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2、过滤掉依赖打包

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.3</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.68</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.3</version>
  <scope>provided</scope>
</dependency>

3、注释Java和Scala代码中的本地测试方法

//local表示在本地测试执行
//.setMaster("local")

4、打包编译

image.png

5、创建文件夹,上传包

[root@bigdata04 /]# mkdir /data/soft/sparkjars

image.png

6、编写Spark-Submit脚本

[root@bigdata04 sparkjars]# vi wordCountJob.sh

spark-submit \
--class com.jade.scala.WordCountScala \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--num-executors 1 \
db_scala-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://bigdata01:9000/test/hello.txt

7、切换bigdata01启动Hadoop集群和historyserver进程

[root@bigdata01 ~]# /data/soft/hadoop-3.2.0/sbin/start-all.sh
[root@bigdata01 ~]# /data/soft/hadoop-3.2.0/bin/mapred --daemon start historyserver

[root@bigdata02 ~]# /data/soft/hadoop-3.2.0/bin/mapred --daemon start historyserver

[root@bigdata03 ~]# /data/soft/hadoop-3.2.0/bin/mapred --daemon start historyserver

8、切换bigdata04启动脚本

[root@bigdata04 sparkjars]# sh -x wordCountJob.sh

9、查看结果

image.png

9.3.4 Spark提交任务之Spark-Shell

  • 使用Local模式开启本地集群
  • 使用on yarn模式,「比较常用

1、local模式

[root@bigdata04 /]# spark-shell
2021-08-20 18:39:22,969 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://bigdata04:4040
Spark context available as 'sc' (master = local[*], app id = local-1629455973389).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

2、On Yarn模式

[root@bigdata04 /]# spark-shell --master yarn --deploy-mode client
2021-08-20 19:09:10,832 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-08-20 19:09:19,176 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://bigdata04:4040
Spark context available as 'sc' (master = yarn, app id = application_1629453518219_0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

image.png

  • 这个时候执行的代码就是在YARN上执行的了 ```scala scala> val linesRDD = sc.textFile(“/test/hello.txt”) linesRDD: org.apache.spark.rdd.RDD[String] = /test/hello.txt MapPartitionsRDD[1] at textFile at :24

scala> val wordsRDD = linesRDD.flatMap(_.split(“ “)) wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at :25

scala> val pairRDD = wordsRDD.map((_,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at :25

scala> val wordCountRDD = pairRDD.reduceByKey( + ) wordCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at :25

scala> wordCountRDD.foreach(wordCount=>println(wordCount._1+”—“+wordCount._2)) [Stage 0:> (0 [Stage 0:===================> (1 [Stage 0:=======================================> (2



**_「注意:On Yarn模式是查看不到输出结果的,需要到Yarn查看日志」_**


<a name="ydRSf"></a>
#### 3、Spark开启historyServer服务
```basic
# 切换目录到conf
[root@bigdata04 /]# cd /data/soft/spark-2.4.3-bin-hadoop2.7/conf

# 改配置名
mv spark-defaults.conf.template spark-defaults.conf

# 添加第一个配置信息
vi spark-defaults.conf
...
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=http://bigdata04:18080

# 添加第二个配置信息
vi spark-env.sh
...
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs"

「注意:在哪个节点上启动 spark 的 historyserver 进程, spark.yarn.historyServer.address 的值里 面就指定哪个节点的主机名信息」

4、启动historyServer服务

# 启动服务
[root@bigdata04 /]# /data/soft/spark-2.4.3-bin-hadoop2.7/sbin/start-history-server.sh

# 验证服务
[root@bigdata04 /]# jps
7654 Jps
7613 HistoryServer

5、启动spark脚本

# 进入目录
[root@bigdata04 /]# cd /data/soft/sparkjars

# 启动脚本
[root@bigdata04 /]# sh -x wordCountJob.sh

image.png

image.png

image.png

image.png

9.4 Spark RDD编程实战

9.4.1 创建RDD的三种方式

  • RDD是 Spark编程的核心,在进行 Spark编程时,首要任务是创建一个初始的RDD
  • Spark提供三种创建RDD方式
    • 集合
      • 主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构 造一些测试数据,来测试后面的spark应用程序的流程
    • 本地文件
      • 主要用于临时性地处理一些存储了大量数据的文件
    • HDFS文件「工作常用」
      • 是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进 行离线批处理操作

9.4.2 集合创建RDD

  • 通过 SparkContextparallelize的()方法将集合转化为RDD
  • 可以通过 parallelize() 方法的第二个参数来设置RDD的partition数量
    • Spark会为每一个partition运行一个task来进行处理

1、Scala实现

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: 在本地用数组创建RDD
 * @projectName: db_scala
 * @since: com.jade.scala
 * @author: jade
 * @createTime: 2021/8/24 6:59 下午
 */
object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArrayScala").setMaster("local")
    val sc = new SparkContext(conf)

    //创建集合
    val arr = Array(1,2,3,4,5)
    //基于集合创建RDD
    val rdd = sc.parallelize(arr)
    //对集合中的数据求和
    val sum = rdd.reduce(_ + _)

    //注意:这行println代码是在driver进程中执行的
    println(sum)
  }
}

2、Java实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import java.util.Arrays;
import java.util.List;

/**
 * 需求:使用集合创建RDD
 * Created by jade
 */
public class CreateRddByArrayJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava")
            .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //创建集合
        List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(arr);
        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        System.out.println(sum);

        sc.stop();
    }
}

9.4.3 HDFS文件创建RDD

  • 通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD
  • textFile() 方法支持针对目录、压缩文件以及通配符创建RDD
  • Spark默认会为HDFS文件的每一个Block创建一个partition
    • 也可以通过textFile()的第二个参数手动设置分区数量
    • 只能比Block数量多,不能比Block数量少

1、Scala实现

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: 用文件创建RDD
 * @projectName: db_scala
 * @since: com.jade.scala
 * @author: jade
 * @createTime: 2021/8/24 7:05 下午
 */
object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // 本地文件
    var path = "/Users/angel/Desktop/data/hello.txt"
    // HDFS文件
    path = "hdfs://bigdata01:9000/test/hello.txt"
    //读取文件数据,可以在textFile中指定生成的RDD的分区数量
    val rdd = sc.textFile(path,2)

    //获取每一行数据的长度,计算文件内数据的总长度
    val length = rdd.map(_.length).reduce(_ + _)

    println(length)

    sc.stop()
  }
}

2、Java实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

/**
 * 需求:通过文件创建RDD
 * Created by jade
 */
public class CreateRddByFileJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "/Users/angel/Desktop/data/hello.txt";
        path = "hdfs://bigdata01:9000/test/hello.txt";
        JavaRDD<String> rdd = sc.textFile(path, 2);
        //获取每一行数据的长度
        JavaRDD<Integer> lengthRDD = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String line) throws Exception {
                return line.length();
            }
        });

        //计算文件内数据的总长度
        Integer length = lengthRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        System.out.println(length);

        sc.stop();
    }
}

9.4.4 Transformation和Action

  • Spark支持两种RDD操作,也称为算子
    • Transformation
    • Action
  • Transformation会针对已有的RDD创建一个新的RDD
    • 比如:map算子flatMap算子filter算子
  • Action主要对RDD进行最后的操作
    • 比如:遍历reduce算子保存到文件
    • 并且还可以把结果返回给Driver程序
  • 有一个特性
    • lazy
      • transformation 是不会触发 spark 任务的执行, 它们只是记录了对 RDD 所做的操作, 不会执行
      • 只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行
      • park通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。
  • Action的特性
    • 执行Action操作才会触发一个Spark Job的运行,从而触发这个Action之前所有的Transformation的执行

1、常用Transformation

算子 介绍
map 将RDD中的每个元素进行处理,一进一出
filter 对RDD中每个元素进行判断,返回true则保留
flatMap 与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey 对每个相同key对应的value进行排序操作(全局排序)
join 对两个包含对的RDD进行join操作
distinct 对RDD中的元素进行全局去重

2、Transformation之Scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:Transformation实战
 * map:对集合中每个元素乘以2
 * filter:过滤出集合中的偶数
 * flatMap:将行拆分为单词
 * groupByKey:对每个大区的主播进行分组
 * reduceByKey:统计每个大区的主播数量
 * sortByKey:对主播的音浪收入排序
 * join:打印每个主播的大区信息和音浪收入
 * distinct:统计当天开播的大区信息
 */
object TransformationOpScala {

  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //map:对集合中每个元素乘以2
    //mapOp(sc)
    //filter:过滤出集合中的偶数
    //filterOp(sc)
    //flatMap:将行拆分为单词
    //flatMapOp(sc)
    //groupByKey:对每个大区的主播进行分组
    //groupByKeyOp(sc)
    //groupByKeyOp2(sc)
    //reduceByKey:统计每个大区的主播数量
    //reduceByKeyOp(sc)
    //sortByKey:对主播的音浪收入排序
    //sortByKeyOp(sc)
    //join:打印每个主播的大区信息和音浪收入
    //joinOp(sc)
    //distinct:统计当天开播的大区信息
    //distinctOp(sc)

    sc.stop()
  }

  def distinctOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
    //由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息
    dataRDD.map(_._2).distinct().foreach(println(_))
  }

  def joinOp(sc: SparkContext): Unit = {
    val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
    val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))

    val joinRDD = dataRDD1.join(dataRDD2)
    //joinRDD.foreach(println(_))
    joinRDD.foreach(tup=>{
      //用户id
      val uid = tup._1
      val area_gold = tup._2
      //大区
      val area = area_gold._1
      //音浪收入
      val gold = area_gold._2
      println(uid+"\t"+area+"\t"+gold)
    })
  }

  def sortByKeyOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100)))
    //由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置和互换
    /*dataRDD.map(tup=>(tup._2,tup._1))
      .sortByKey(false)//默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false
      .foreach(println(_))*/
    //sortBy的使用:可以动态指定排序字段,比较灵活
    dataRDD.sortBy(_._2,false).foreach(println(_))
    dataRDD.sortByKey()
  }

  def reduceByKeyOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
    //由于这个需求只需要使用到大区信息,所以在mao操作的时候只保留大区信息即可
    dataRDD.map(tup=>(tup._2,1)).reduceByKey(_ + _).foreach(println(_))
  }

  def groupByKeyOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))

    //需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作
    //此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value
    //注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2这种形式
    //此时map算子之后生成的新的数据格式是这样的:("US",150001)
    //如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2
    dataRDD.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{
      //获取大区信息
      val area = tup._1
      print(area+":")
      //获取同一个大区对应的所有用户id
      val it = tup._2
      for(uid <- it){
        print(uid+" ")
      }
      println()
    })
  }

  /**
   * TODO: 考虑使用grouBy去实现
   * @param sc
   */
  def groupByKeyOp2(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array((150001, "US","male"), (150002, "CN","female"), (150003, "CN","male"), (150004, "IN","female")))
    //如果tuple中的数据列数超过了2列怎么办?
    //把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下
    //此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male"))
    //注意:如果你的数据结构比较负责,你可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式
    dataRDD.map(tup=>(tup._2,(tup._1,tup._3))).groupByKey().foreach(tup=>{
      //获取大区信息
      val area = tup._1
      print(area+":")
      //获取同一个大区对应的所有用户id和性别信息
      val it = tup._2
      for((uid,sex) <- it){
        print("<"+uid+","+sex+"> ")
      }
      println()
    })
  }

    def flatMapOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array("good good study","day day up"))
    dataRDD.flatMap(_.split(" ")).foreach(println(_))
  }

  def filterOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //满足条件的保留下来
    dataRDD.filter(_ % 2 == 0).foreach(println(_))
  }

  def mapOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    dataRDD.map(_ * 2).foreach(println(_))
  }

  def getSparkContext = {
    val conf = new SparkConf()
    conf.setAppName("WordCountScala")
      .setMaster("local")
    new SparkContext(conf)
  }

}

3、Transformation之Java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.Iterator;

/**
 * 需求:Transformation实战
 * map:对集合中每个元素乘以2
 * filter:过滤出集合中的偶数
 * flatMap:将行拆分为单词
 * groupByKey:对每个大区的主播进行分组
 * reduceByKey:统计每个大区的主播数量
 * sortByKey:对主播的音浪收入排序
 * join:打印每个主播的大区信息和音浪收入
 * distinct:统计当天开播的大区信息
 */
public class TransformationOpJava {

    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //map:对集合中每个元素乘以2
        //mapOp(sc);
        //filter:过滤出集合中的偶数
        //filterOp(sc);
        //flatMap:将行拆分为单词
        //flatMapOp(sc);
        //groupByKey:对每个大区的主播进行分组
        //groupByKeyOp(sc);
        //groupByKeyOp2(sc);
        //reduceByKey:统计每个大区的主播数量
        //reduceByKeyOp(sc);
        //sortByKey:对主播的音浪收入排序
        //sortByKeyOp(sc);
        //join:打印每个主播的大区信息和音浪收入
        //joinOp(sc);
        //distinct:统计当天开播的大区信息
        //distinctOp(sc);

        sc.stop();
    }

    private static void distinctOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<>(150004, "IN");
        JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        dataRDD.map(new Function<Tuple2<Integer, String>, String>() {
            @Override
            public String call(Tuple2<Integer, String> tup) throws Exception {
                return tup._2;
            }
        }).distinct().foreach(new VoidFunction<String>() {
            @Override
            public void call(String area) throws Exception {
                System.out.println(area);
            }
        });
    }

    private static void joinOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<>(150004, "IN");
        Tuple2<Integer, Integer> t5 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t6 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t7 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t8 = new Tuple2<Integer, Integer>(150004, 100);

        JavaRDD<Tuple2<Integer, String>> dataRDD1 = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        JavaRDD<Tuple2<Integer, Integer>> dataRDD2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));

        JavaPairRDD<Integer, String> dataRDD1Pair = dataRDD1.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<Integer, String> tup)
                    throws Exception {
                return new Tuple2<Integer, String>(tup._1, tup._2);
            }
        });

        JavaPairRDD<Integer, Integer> dataRDD2Pair = dataRDD2.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup)
                    throws Exception {
                return new Tuple2<Integer, Integer>(tup._1, tup._2);
            }
        });

        dataRDD1Pair.join(dataRDD2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> tup)
                    throws Exception {
                System.out.println(tup);
            }
        });

    }

    private static void sortByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150001, 400);
        Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150002, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150003, 300);
        Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
        JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        /*dataRDD.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup)
                    throws Exception {
                return new Tuple2<Integer, Integer>(tup._2,tup._1);
            }
        }).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });*/
        //使用sortBy
        dataRDD.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> tup) throws Exception {
                return tup._2;
            }
        },false,1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tup)
                    throws Exception {
                System.out.println(tup);
            }
        });
    }

    private static void reduceByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<>(150004, "IN");

        JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)
                    throws Exception {
                return new Tuple2<String, Integer>(tup._2,1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tup) throws Exception {
                System.out.println(tup);
            }
        });
    }


    private static void groupByKeyOp(JavaSparkContext sc) {
        Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(150002, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(150003, "CN");
        Tuple2<Integer, String> t4 = new Tuple2<>(150004, "IN");

        JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)
                    throws Exception {
                return new Tuple2<String, Integer>(tup._2,tup._1);
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tup)
                    throws Exception {
                //获取大区信息
                String area = tup._1;
                System.out.print(area+":");
                //获取同一个大区对应的所用用户id
                Iterable<Integer> it = tup._2;
                for(Integer uid : it){
                    System.out.print(uid+" ");
                }
                System.out.println();
            }
        });
    }

    private static void groupByKeyOp2(JavaSparkContext sc) {
        Tuple3<Integer, String,String> t1 = new Tuple3<Integer, String,String>(150001, "US","male");
        Tuple3<Integer, String,String> t2 = new Tuple3<Integer, String,String>(150002, "CN","female");
        Tuple3<Integer, String,String> t3 = new Tuple3<Integer, String,String>(150003, "CN","male");
        Tuple3<Integer, String,String> t4 = new Tuple3<Integer, String,String>(150004, "IN","female");
        JavaRDD<Tuple3<Integer, String, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));

        dataRDD.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<Integer,String>>() {
            @Override
            public Tuple2<String, Tuple2<Integer, String>> call(Tuple3<Integer, String, String> tup)
                    throws Exception {
                //("US",(150001,"male"))
                return new Tuple2<String, Tuple2<Integer, String>>(tup._2(),new Tuple2<Integer, String>(tup._1(),tup._3()));
            }
        }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Tuple2<Integer, String>>> tup)
                    throws Exception {
                //获取大区信息
                String area = tup._1;
                System.out.print(area+":");
                //获取同一个大区对应的所用用户id和性别信息
                Iterable<Tuple2<Integer, String>> it = tup._2;
                for(Tuple2<Integer,String> tu : it){
                    System.out.print("<"+tu._1+","+tu._2+"> ");
                }
                System.out.println();
            }
        });
    }


    private static void flatMapOp(JavaSparkContext sc) {
        JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("good good study","day day up"));
        dataRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words).iterator();
            }
        }).foreach(new VoidFunction<String>() {
            @Override
            public void call(String word) throws Exception {
                System.out.println(word);
            }
        });
    }

    private static void filterOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        dataRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i1) throws Exception {
                return i1 % 2 == 0;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer i1) throws Exception {
                System.out.println(i1);
            }
        });
    }

    private static void mapOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        dataRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer i1) throws Exception {
                return i1 * 2;
            }
        }).foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer i1) throws Exception {
                System.out.println(i1);
            }
        });
    }

    private static JavaSparkContext getSparkContext() {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("TransformationOpJava")
                .setMaster("local");
        return new JavaSparkContext(conf);
    }
}

4、常用Action

算子 介绍
reduce 聚合计算
collect 获取元素集合
take(n 获取前n个元素
count 获取元素总数
saveAsTextFile 保存文件
countByKey 统计相同的key出现多少次
foreach 迭代遍历元素

5、Action之Scala

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:Action实战
 * reduce:聚合计算
 * collect:获取元素集合
 * take(n):获取前n个元素
 * count:获取元素总数
 * saveAsTextFile:保存文件
 * countByKey:统计相同的key出现多少次
 * foreach:迭代遍历元素
 */
object ActionOpScala {

  def main(args: Array[String]): Unit = {
    val sc = getSparkContext
    //reduce:聚合计算
    //reduceOp(sc)
    //collect:获取元素集合
    //collectOp(sc)
    //take(n):获取前n个元素
    //takeOp(sc)
    //count:获取元素总数
    //countOp(sc)
    //saveAsTextFile:保存文件
    //saveAsTextFileOp(sc)
    //countByKey:统计相同的key出现多少次
    //countByKeyOp(sc)
    //foreach:迭代遍历元素
    //foreachOp(sc)

    sc.stop()
  }

  def foreachOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的
    //实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach
    //在里面实现具体向外部输出数据的代码
    dataRDD.foreach(println(_))
  }

  def countByKeyOp(sc: SparkContext): Unit = {
    val daraRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))
    //返回的是一个map类型的数据
    val res = daraRDD.countByKey()
    for((k,v) <- res){
      println(k+","+v)
    }
  }

  def saveAsTextFileOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //指定HDFS的路径信息即可,需要指定一个不存在的目录
    dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out001")
  }

  def countOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    val res = dataRDD.count()
    println(res)
  }

  def takeOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //从RDD中获取前2个元素
    val res = dataRDD.take(2)
    for(item <- res){
      println(item)
    }
  }

  def collectOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    //collect返回的是一个Array数组
    //注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点
    //如果想要获取几条数据,查看一下数据格式,可以使用take(n)
    val res = dataRDD.collect()
    for(item <- res){
      println(item)
    }
  }

  def reduceOp(sc: SparkContext): Unit = {
    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    val num = dataRDD.reduce(_ + _)
    println(num)
  }

  def getSparkContext = {
    val conf = new SparkConf()
    conf.setAppName("ActionOpScala")
      .setMaster("local")
    new SparkContext(conf)
  }

}

5、Action之Java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * 需求:Action实战
 * reduce:聚合计算
 * collect:获取元素集合
 * take(n):获取前n个元素
 * count:获取元素总数
 * saveAsTextFile:保存文件
 * countByKey:统计相同的key出现多少次
 * foreach:迭代遍历元素
 */
public class ActionOpJava {

    public static void main(String[] args) {
        JavaSparkContext sc = getSparkContext();
        //reduce:聚合计算
        //reduceOp(sc);
        //collect:获取元素集合
        //collectOp(sc);
        //take(n):获取前n个元素
        //takeOp(sc);
        //count:获取元素总数
        //countOp(sc);
        //saveAsTextFile:保存文件
        //saveAsTextFileOp(sc);
        //countByKey:统计相同的key出现多少次
        //countByKeyOp(sc);
        //foreach:迭代遍历元素
        //foreachOp(sc);

        sc.stop();
    }

    private static void foreachOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        dataRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer i) throws Exception {
                System.out.println(i);
            }
        });
    }

    private static void countByKeyOp(JavaSparkContext sc) {
        Tuple2<String, Integer> t1 = new Tuple2<>("A", 1001);
        Tuple2<String, Integer> t2 = new Tuple2<>("B", 1002);
        Tuple2<String, Integer> t3 = new Tuple2<>("A", 1003);
        Tuple2<String, Integer> t4 = new Tuple2<>("C", 1004);

        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
        //想要使用countByKey,需要先使用mapToPair对RDD进行转换
        Map<String, Long> res = dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> tup)
                    throws Exception {
                return new Tuple2<String, Integer>(tup._1, tup._2);
            }
        }).countByKey();
        for(Map.Entry<String,Long> entry : res.entrySet()){
            System.out.println(entry.getKey()+","+entry.getValue());
        }

    }

    private static void saveAsTextFileOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out002");
    }

    private static void countOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        long res = dataRDD.count();
        System.out.println(res);
    }

    private static void takeOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> res = dataRDD.take(2);
        for(Integer item : res){
            System.out.println(item);
        }
    }

    private static void collectOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        List<Integer> res = dataRDD.collect();
        for(Integer item : res){
            System.out.println(item);
        }
    }

    private static void reduceOp(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        Integer num = dataRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        System.out.println(num);
    }

    private static JavaSparkContext getSparkContext() {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("ActionOpJava")
                .setMaster("local");
        return new JavaSparkContext(conf);
    }
}

9.4.5 RDD持久化

1、RDD持久化原理

  • Spark中有一个非常重要的功能就是可以对RDD进行持久化

    • 当对RDD执行持久化操作时,每个节点都会 将自己操作的RDD的partition数据持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存中缓存的partition数据
    • 针对一个 RDD 反复执行多个操作的场景, 就只需要对 RDD 计算一次即可, 后面直接使用该 RDD,而不需要反复计算多次该RDD,特别是对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的
    • 在该RDD第一次被计算出来时,就会直接缓存在每个节点中
      • 而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition数据丢失了,那么Spark会自动通过其源RDD,使用transformation算子重新计算该partition的数据
    • 要持久化一个RDD,只需要调用它的 cache() 或者 persist() 方法即可
    • cache()和persist()的区别在于:
      • cache() 是 persist() 的一种简化方式, cache() 的底层就是调用的 persist() 的无参版本
      • 也就是调用 persist(MEMORY_ONLY),将数据持久化到内存中
      • 如果需要从内存中清除缓存,那么可以使用 unpersist() 方法

        2、RDD持久化策略

        | 策略 | 介绍 | | —- | —- | | MEMORY_ONLY | 以非序列化的方式持久化在JVM内存中 | | MEMORY_AND_DISK | 同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中 | | MEMORY_ONLY_SER | 同MEMORY_ONLY,但是会序列化 | | MEMORY_AND_DISK_SER | 同MEMORY_AND_DSK,但是会序列化 | | DISK_ONLY | 以非序列化的方式完全存储到磁盘上 | | MEMORY_ONLY_2、MEMORY_AND_DISK_2等 | 尾部加了2的持久化级别,表示会将持久化数据复制 |
  • MEMORY_ONLY

    • 以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有
  • MEMORY_AND_DISK
    • 当某些 partition 无法存储在内存中时, 会持久化到磁盘中。 下次需要使用这些 partition时,需要从磁盘上读取,不需要重新计算
  • MEMORY_ONLY_SER
    • 同MEMORY_ONLY,但是会使用Java的序列化方式,将Java对象序列化后进行持 久化。可以减少内存开销,但是在使用的时候需要进行反序列化,因此会增加CPU开销
  • MEMORY_AND_DISK_SER
    • 同MEMORY_AND_DSK。但是会使用序列化方式持久化Java对象
  • DISK_ONLY
    • 使用非序列化Java对象的方式持久化,完全存储到磁盘上
  • MEMORY_ONLY_2、MEMORY_AND_DISK_2
    • 如果是尾部加了2的持久化级别,表示会将持久化数据 复制一份,保存到其它节点,从而在数据丢失时,不需要重新计算,只需要使用备份数据即可

3、如何选择RDD持久化策略

  • Spark提供了多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍
  • 以下建议:

    • 优先使用MEMORY_ONLY,纯内存速度最快,而且没有序列化不需要消耗CPU进行反序列化操作, 缺点就是比较耗内存
    • MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快,只是在使用的时候需要消耗CPU进行反序列化
    • 如果需要进行数据的快速失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了
    • 能不使用 DISK 相关的策略, 就不要使用, 因为有的时候, 从磁盘读取数据, 还不如重新计算一次

      4、案例:使用RDD的持久化

  • Scala实现 ```scala import org.apache.spark.{SparkConf, SparkContext}

/**

  • 需求:RDD持久化 */ object PersistRddScala {

    def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName(“PersistRddScala”) .setMaster(“local”) val sc = new SparkContext(conf)

    //注意cache的用法和位置 //cache默认是基于内存的持久化 // cache()=persist()=persist(StorageLevel.MEMORY_ONLY) val dataRDD = sc.textFile(“/Users/angel/Desktop/data/hello.txt”).cache() var start_time = System.currentTimeMillis() var count = dataRDD.count() println(count) var end_time = System.currentTimeMillis() println(“第一次耗时:”+(end_time-start_time))

    start_time = System.currentTimeMillis() count = dataRDD.count() println(count) end_time = System.currentTimeMillis() println(“第二次耗时:”+(end_time-start_time))

    sc.stop() } } ```

  • Java实现 ```java import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext;

/**

  • 需求:RDD持久化 */ public class PersistRddJava {

    public static void main(String[] args) {

     //创建JavaSparkContext
     SparkConf conf = new SparkConf();
     conf.setAppName("PersistRddJava")
             .setMaster("local");
     JavaSparkContext sc = new JavaSparkContext(conf);
    
     JavaRDD<String> dataRDD = sc.textFile("/Users/angel/Desktop/data/hello.txt").cache();
    
     long start_time = System.currentTimeMillis();
     long count = dataRDD.count();
     System.out.println(count);
     long end_time = System.currentTimeMillis();
     System.out.println("第一次耗时:"+(end_time-start_time));
    
    start_time = System.currentTimeMillis();
    count = dataRDD.count();
    System.out.println(count);
    end_time = System.currentTimeMillis();
    System.out.println("第二次耗时:"+(end_time-start_time));

    sc.stop();
}

}

<a name="dBDgY"></a>
### 
<a name="MTkd7"></a>
### 9.4.6 共享变量
<a name="mGM4h"></a>
#### 1、共享变量的工作原理

- 默认情况下,一个算子函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中,此时每个task只能操作自己的那份变量数据
- Spark提供了两种共享变量,一种是**Broadcast Variable(广播变量)**,另一种是**Accumulator(累加变量)**
<a name="cwIL5"></a>
#### 2、Broadcast Variable

- Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗
- 通过调用SparkContext的 **broadcast() **方法,针对某个变量创建广播变量【**广播变量是只读的**】,然后在算子函数内,**使用到广播变量时,每个节点只会拷贝一份副本。可以使用广播变量的 value() 方法 获取值**
- 如图所示

![image.png](https://cdn.nlark.com/yuque/0/2021/png/1580562/1629970245819-1a7352cd-f893-4c6a-8a21-e8dd8901b402.png#clientId=ud320ad91-4c1e-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=604&id=ub158b734&margin=%5Bobject%20Object%5D&name=image.png&originHeight=604&originWidth=1265&originalType=binary&ratio=1&rotation=0&showTitle=false&size=144122&status=done&style=shadow&taskId=ua651813b-d071-43d9-bde1-ec97bd892c3&title=&width=1265)
<a name="iIXUo"></a>
#### 

- Scala实现
```scala
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 需求:使用广播变量
 */
object BoradcastOpScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("BoradcastOpScala")
      .setMaster("local")
    val sc = new SparkContext(conf)

    val dataRDD = sc.parallelize(Array(1,2,3,4,5))
    val varable = 2
    //dataRDD.map(_ * varable)
    //1:定义广播变量
    val varableBroadcast = sc.broadcast(varable)

    //2:使用广播变量,调用其value方法
    dataRDD.map(_ * varableBroadcast.value).foreach(println(_))

    sc.stop()
  }
}
  • Java实现 ```java import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;

/**

  • 需求:使用广播变量 */ public class BroadcastOpJava {

    public static void main(String[] args) {

     //创建JavaSparkContext
     SparkConf conf = new SparkConf();
     conf.setAppName("BroadcastOpJava")
             .setMaster("local");
     JavaSparkContext sc = new JavaSparkContext(conf);
    
     JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     int varable = 2;
     //1:定义广播变量
     Broadcast<Integer> varableBroadcast = sc.broadcast(varable);
    
     //2:使用广播变量
     dataRDD.map(new Function<Integer, Integer>() {
         @Override
         public Integer call(Integer i1) throws Exception {
             return i1 * varableBroadcast.value();
         }
     }).foreach(new VoidFunction<Integer>() {
         @Override
         public void call(Integer i) throws Exception {
             System.out.println(i);
         }
     });
    
     sc.stop();
    

    } } ```

3、Accumulator

  • Accumulator:用于多个节点对一个变量进行共享性的操作

「注意:Accumulator只提供了累加的功能,在task中只能对 Accumulator进行累加操作,不能读取它的值。只有Driver 进程中才可以读取Accumulator的值」

  • Scala实现 ```scala import org.apache.spark.{SparkConf, SparkContext}

/**

  • 需求:使用累加变量 / object AccumulatorOpScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName(“AccumulatorOpScala”) .setMaster(“local”) val sc = new SparkContext(conf) val dataRDD = sc.parallelize(Array(1,2,3,4,5)) //这种写法是错误的,因为foreach代码是在worker节点上执行的 //var total = 0 和 println(total) 是在Driver进程中执行的 //所以无法实现累加操作 //并且foreach算子可能会在多个task中执行,这样foreach内部实现的累加也不是最终全局累加的结果 /var total = 0 dataRDD.foreach(num=>total += num) println(total)*/

    //所以此时想要实现累加操作就需要使用累加变量了 //1:定义累加变量 val sumAccumulator = sc.longAccumulator

    //2:使用累加变量 dataRDD.foreach(num=>sumAccumulator.add(num))

    //注意:只能在Driver进程中获取累加变量的结果 println(sumAccumulator.value)

    sc.stop() } } ```

  • Java实现 ```java import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**

  • 需求:使用累加变量 */ public class AccumulatorOpJava {

    public static void main(String[] args) {

     //创建JavaSparkContext
     SparkConf conf = new SparkConf();
     conf.setAppName("AccumulatorOpJava")
             .setMaster("local");
     JavaSparkContext sc = new JavaSparkContext(conf);
     JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     //1:定义累加变量
     LongAccumulator sumAccumulator = sc.sc().longAccumulator();
    
     //2:使用累加变量
     dataRDD.foreach(new VoidFunction<Integer>() {
         @Override
         public void call(Integer i) throws Exception {
             sumAccumulator.add(i);
         }
     });
    
     //获取累加变量的值
     System.out.println(sumAccumulator.value());
     sc.stop();
    

    } } ```