1、背景

MapReduce的局限性:

  • 仅支持Map和Reduce两种操作
  • 编程复杂度和学习及使用成本略高
  • 处理效率低
    • Map中间结果写磁盘,Reduce写HDFS,多个MR之间通过HDFS交换数据
    • 任务调度及启动开销大
  • 在机械学习和图计算等方面支持有限,性能效率表现较差

    • MapReduce的机械学习框架称为mahout

      2、定义

      专为大规模数据处理而设计的快速通用的计算引擎,并形成一个高速 发展应用广泛的生态系统。

      3、特点

  • 速度快

    • 内存计算下,Spark比Hadoop快100倍
  • 易用性
    • 80 多个高级运算符
    • 跨语言:使用 Java,Scala,Python,R 和 SQL 快速编写应用程序
  • 通用性
    • Spark 提供了大量的库,包括 SQL、DataFrames、MLib、GraphX、 Spark Streaming。
    • 开发者可以在同一个应用程序中无缝组合使用这些库
  • 支持多种资源管理器
    • Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理 器
  • 生态组件丰富与成熟

    • spark streaming:实时数据处理
    • shark/sparkSQL:用 sql 语句操作 spark 引擎
    • sparkR:用 R 语言操作 Spark
    • mlib:机器学习算法库
    • graphx:图计算组件

      4、在生态圈中的位置

      4.1、hadoop生态圈

      image.png

      4.2、spark生态圈

      image.png

      4.3、Spark Core

      包含Spark的基本功能;尤其是定义RDD(弹性分布式数据集,resilient distributed dataset)的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的

      4.4、Spark Core

      提供通过 Apache Hive 的 SQL 变体 Hive 查询语言 (HiveQL)与 Spark 进行交互的 API。每个数据库表被当做一 个 RDD,Spark SQL 查询被转换为 Spark 操作

      4.5、Spark Core

      对实时数据流进行处理和控制。Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据

      4.6、 MLib

      一个常用机器学习算法库,算法被实现为对 RDD 的 Spark 操作。这个库包含可扩展的学习算法,比如分类、回归 等需要对大量数据集进行迭代的操作。

      4.7、 GraphX

      控制图、并行图操作和计算的一组算法和工具的集 合。GraphX 扩展了 RDD API,包含控制图、创建子图、访问路 径上所有顶点的操作

      4.8、 SparkR

      是一个提供从 R 中使用 Spark 的轻量级前端的 R 包。 在 Spark1.6 以后,SparkR 提供了分布式数据框架,它支持 selection,filtering,aggregation 等操作。也支持使用 MLib 分布式机器学习

      5、Spark Core

      5.1、架构设计图

      image.png

      5.2、相关术语

  • RDD(Resilient Distributed DataSet)

    • 弹性分布式数据集,是对数据集在spark存储和计算过程中的一种抽象
  • Partition(分区)
    • 计算是以partition为单位进行的,提供了一种划分数据的方式
  • 算子
    • 对任何函数进行某一项操作都可以认为是一个算子
  • Transformation类算子
    • 从一个RDD 转换生成另一个 RDD 的转换操作
  • Action类算子
    • 会触发 Spark 提交作业(Job),并将数据输出 Spark系统
  • 窄依赖
    • 一个父RDD的每个分区只被子RDD的一个分区使用
  • 宽依赖
    • 一个父RDD的每个分区要被子RDD 的多个分区使用
  • Application
    • 用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码
  • Driver
    • 运行main函数并且创建SparkContext的程序
  • ClusterManager
    • 集群的资源管理器,在集群上获取资源的服务
  • WorkerNode
    • 集群中任何一个可以运行spark应用代码的节点
  • Executor
    • Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上
  • Task
    • 分配到executor上的基本工作单元,执行实际的计算任务
  • Job(作业)
    • 每个action的计算会生成一个job
  • Stage(阶段)

    • Job的组成部分,每个Job可以包含1个或者多个Stage

      5.3、运行模式

  • 本地运行模式(local)

    • 单台机器多线程来模拟spark分布式计算
  • 伪分布式模式
    • 单台机器多进程来模拟spark分布式计算
  • standalone(client)
    • 独立布署spark计算集群,driver运行在spark submit client端
  • standalone(cluster)
    • 独立布署spark计算集群,driver运行在spark worker node端
  • spark on yarn(yarn-client)
    • 以yarn集群为基础,driver运行在yarn client上
  • spark on yarn(yarn-cluster)
    • 以yarn集群为基础,driver运行在集群的am contianer中
  • spark on mesos/ec2

    • 与spark on yarn类似

      5.4、用户交互方式

  • spark-shell

    • spark命令行方式来操作spark作业
  • spark-submit
    • 通过程序脚本,提交相关的代码、依赖等来操作spark作业
  • spark-sql
    • 通过sql的方式操作spark作业
  • spark-class
    • 最底层的调用方式,其它调用方式多是最终转化到该方式中去提交
  • sparkR,sparkPython

    • 通过其它非java、非scala语言直接操作spark作业的方式

      5.5、spark-shell

      1. # 启动spark
      2. spark-shell --master local[*]
      3. # 直接构建一个rdd
      4. var listRdd = sc.parallelize(Seq(1,2,3,4,5,6))
      5. # 通过本地文件构建一个rdd
      6. var listRdd = sc.textfile("file:///home/spark/input.txt")
      7. # 通过HDFS文件构建一个rdd
      8. var listRdd = sc.textfile("hdfs:///home/spark/input.txt")
      9. # 进行相关算子操作
      10. listRdd.foreach(println)

      5.6、Java实现spark版wordcount

      5.6.1、POM配置

      ```xml <project xmlns=”http://maven.apache.org/POM/4.0.0

      1. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

      homework com.codelx 1.0-SNAPSHOT 4.0.0 spark_wordcount_java 8 8

      1. <id>nexus-aliyun</id>
      2. <name>Nexus aliyun</name>
      3. <url>http://maven.aliyun.com/nexus/content/groups/public</url>

      org.apache.spark

      1. <artifactId>spark-core_2.11</artifactId>
      2. <version>2.3.2</version>
      3. <scope>provided</scope>

      1. <groupId>log4j</groupId>
      2. <artifactId>log4j</artifactId>
      3. <version>1.2.17</version>
      4. <scope>provided</scope>

      spark_study maven-compiler-plugin 2.3.2 1.8 1.8 UTF-8 maven-assembly-plugin jar-withdependencies make-assembly package assembly

  1. <a name="Zxmb7"></a>
  2. ### 5.6.2、具体实现
  3. ```java
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.JavaPairRDD;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.JavaSparkContext;
  8. import org.apache.spark.api.java.function.FlatMapFunction;
  9. import org.apache.spark.api.java.function.Function2;
  10. import org.apache.spark.api.java.function.PairFunction;
  11. import scala.Tuple2;
  12. import java.util.Arrays;
  13. import java.util.List;
  14. public class WordCount {
  15. public static void main(String[] args) {
  16. // 创建配置文件
  17. SparkConf sparkConf = new SparkConf();
  18. sparkConf.setAppName("spark_wordcount_java");
  19. // 创建上下文
  20. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  21. // 定义输入文件
  22. if (args[0] == null || args.length != 1) {
  23. System.err.println("传参有误 ,请检查!!!");
  24. System.exit(-1);
  25. }
  26. // 构建file rdd
  27. JavaRDD<String> fileRdd = jsc.textFile(args[0]);
  28. // line rdd -> word rdd
  29. JavaRDD<String> wordOneRdd = fileRdd.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split("\\s+")).iterator());
  30. // word rdd -> (word,1) rdd
  31. JavaPairRDD<String, Integer> wordRdd = wordOneRdd.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
  32. // (word,1) rdd -> (word,freq) rdd
  33. JavaPairRDD<String, Integer> javaPairRdd = wordRdd.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum);
  34. // 将(word,freq) rdd -> collect
  35. List<Tuple2<String, Integer>> list= javaPairRdd.collect();
  36. // 打印结果
  37. list.forEach(System.out::println);
  38. // 关闭上下文
  39. jsc.close();
  40. }
  41. }

5.7、Scala实现spark版wordcount

5.7.1、创建scala的maven项目

新增
image.png
image.png
GroupId:net.alchim31.maven
ArtifactId: scala-archetype-simple
Version: 1.7
Repository:https://maven.aliyun.com/repository/central
点击ok后出现以下界面
image.png
点击下一步填写相关信息
image.png
点击下一步,然后点击完成等待项目构建完成
修改POM文件信息
image.png

5.7.2 具体实现

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    //  创建配置文件
    val sparkConf = new SparkConf()
    sparkConf.setAppName("spark_wordcount_scala")
    //    sparkConf.setMaster("local[*]")
    //  创建上下文
    val sc = new SparkContext(sparkConf)
    //  创建文件输入
    var filePath = ""
    if (args.length == 0 || args(0) == null) {
      sc.stop()
      println("请输入文件路径")
    } else {
      filePath = args(0)
    }
    //  创建rdd
    var rdd = sc.textFile(filePath)
    //  Scala处理流程
    var list = rdd.flatMap(_.split(" ")).map(item => (item, 1)).reduceByKey((a, b) => a + b).collect()
    //    结果展示及关闭流
    list.foreach(println)
    sc.stop()
  }
}

6、常用算子

6.1、 转换算子(Transformation)

6.1.1、Value型

  • map
    • 类比于 mapreduce 中的 map 操作,给定一个输入通过 map 函数映到成 一个新的元素输出
  • flatMap
    • 给定一个输入,将返回的所有结果打平成一个一维集合结构
  • mapPartitions
    • 以分区为单位进行计算处理,而 map 是以每个元素为单位进行计算处 理
    • 当在 map 过程中需要频繁创建额外对象时,如文件输出流操作、jdbc 操作、Socket 操作等时,当用 mapPartitions 算子
  • glom
    • 以分区为单位,将每个分区的值形成一个数组
  • union 算子
    • 将两个 RDD 合并成一个 RDD,并不去重
    • 会发生多分区合并成一个分区的情况
  • groupBy 算子
    • 输入分区与输出分区多对多型,聚合
  • filter 算子
    • 输出分区为输入分区子集型 ,过滤
  • distinct 算子
    • 输出分区为输入分区子集型,全局去重
  • cache 算子

    • cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能

      6.1.1、Key-Value型

  • mapValues 算子

    • 输入分区与输出分区一对一
    • 针对(Key,Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理
  • combineByKey 算子
    • createCombiner:对每个分区内的同组元素如何聚合,形成一个累加 器
    • mergeValue:将前边的累加器与新遇到的值进行合并的方法
    • mergeCombiners:每个分区都是独立处理,故同一个键可以有多个累 加器。如果有两个或者更多的分区都有对应同一个键的累加器,用方 法将各个分区的结果进行合并。
  • reduceByKey 算子
    • 按 key 聚合后对组进行归约处理,如求和、连接等操作
  • join 算子

    • 对 Key-Value 结构的 RDD 进行按 Key 的 join 操作,最后将 V 部分做 flatmap 打平操作

      6.1、 行动算子(Action)

      此种算子会触发 SparkContext 提交作业,触发了 RDD DAG 的 执行
  • foreach

    • 循环遍历
  • saveAsTextFile 算子
    • 指定本地保存的目录 first.saveAsTextFile(“file:///home/spark/text”)
    • 指定 hdfs 保存的目录,默认路径是 hdfs 系统中/user/当前用户路径下 first.saveAsTextFile(“spark_shell_output_1”)
  • collect 算子
    • 相当于 toArray 操作,将分布式 RDD 返回成为一个 scala array 数组 结果,实际是 Driver 所在的机器节点,再针对该结果操作
  • collectAsMap 算子
    • 相当于 toMap 操作,将分布式 RDD 的 kv 对形式返回成为一个的 scala map 集合,实际是 Driver 所在的机器节点,再针对该结果操作
  • lookup 算子
    • 对(Key,Value)型的 RDD 操作,返回指定 Key 对应的元素形成的 Seq。
  • reduce 算子
    • 先对两个元素进行 reduce 函数操作,然后将结果和迭代器取出的下一 个元素进行 reduce 函数操作,直到迭代器遍历完所有元素,得到最后 结果。
  • fold 算子

    • fold 算子签名: def fold(zeroValue: T)(op: (T, T) => T): T
    • 其实就是先对 rdd 分区的每一个分区进行 op 函数,在调用 op 函数过 程中将 zeroValue 参与计算,最后在对所有分区的结果调用 op 函数, 同理此处 zeroValue 再次参与计算。

      6.3、其他经典算子

  • cartesian 算子

    • 计算两个RDD的笛卡尔积
  • subtract 算子
    • 返回在RDD中出现,并且不在otherRDD中出现的元素,不去重
  • sample 算子
    • sample算子是用来抽样用的,其有3个参数
    • withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复
    • fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30%
    • seed:表示一个种子,根据这个seed随机抽取,一般情况下只用前两个参数就可以,这个参数一般用于调试,有时候不知道是程序出问题还是数据出了问题,就可以将这个参数设置为定值
  • takeSample 算子
    • 返回此RDD的固定大小的采样子集。返回Array[T]
    • 注意:仅当预期结果数组较小时才应使用此方法,因为所有数据均已加载到驱动程序的内存中
  • persist 算子
    • MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。
    • MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中
    • MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。
    • MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。
    • DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
    • MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。
  • cogroup 算子
    • 相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空
  • leftOuterJoin 算子
    • 左外连接
  • rightOuterJoin 算子
    • 右外连接
  • saveAsObjectFile 算子
    • 将元素序列化成对象,存储到文件中
  • count 算子
    • 计算个数
  • top 算子
    • top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
  • aggregate 算子

    • 接受一个初始值,在各个分区之间应用f1函数,然后分区之间应用f2函数最后得到一个结果进行返回

      7、Spark-Sql

      7.1、前世今生

  • 先出现 MapReduce,后本着 sql on mr 的思路,产生了 Hive

  • MapReduce 执行效率太慢,效率升级后产生了 Tez,再效率升级 后产生了 Spark
  • Spark 为了方便操作,本着 sql on spark 的思路产生 Shark, 但 Shark 太多的借鉴和依赖 Hive,制约了 Spark 的 One Stack Rule Them All 既定方针,制约了 Spark 各个组件的相互集成
  • 最后被 Spark 团队推翻和停更,推出了 sql on spark 新项目 SparkSql,使 sql on spark 的发展方向上,得到质的提升

    7.2、简介

  • Spark SQL 是 Spark 处理数据的一个模块

  • 专门用来处理结构化数据的模块,像 json,parquet,avro,csv,普 通表格数据等均可
  • 与基础 RDD 的 API 不同,Spark SQL 中提供的接口将提供给更多关于结 构化数据和计算的信息,并针对这些信息,进行额外的处理优化

    7.3、操作方式

    7.3.1、SparkSql Shell

    类似于 hive shell

    //直接输入 spark-sql+自己想要添加的参数即可,与 spark-shell 相似
    spark-sql [options]
    //如指定运行模式
    spark-sql local[*]
    //如指定运行 spark webui 服务的端口,解决多人共用一个入口机时候的进入时候报 port bind exception 的问题
    spark-sql --conf spark.ui.port=4075
    //也可以用于似于 hive -e 的方式,直接直接一段 sparksql 代码
    spark-sql –e “sparksql code”
    

    7.3.2、 DataFrames API

  • 最早专为 sql on spark 设计的数据抽象,与 RDD 相似,增加了数据 结构 scheme 描述信息部分

  • 写 spark 代码,面向 DF(DataFrams 缩写)编程,可以与其它 Spark 应 用代码无缝集成
  • 比 RDD 更丰富的算子,更有利于提升执行效率、减少数据读取、执行 计划优化
    ```scala import org.apache.spark.sql.{DataFrame, SparkSession}

object TestDF { def main(args: Array[String]): Unit = { // 创建sql上下文 val ss = SparkSession .builder() .appName(“Test DF”) .master(“local[]”) .getOrCreate() // 创建df var df: DataFrame = ss.read.json(“C:\idea_workspace\homework\data\data.json”) df.select(“content”, “createTime”, “weiboUrl”) .orderBy(“createTime”) .show() // 也可以将 df 对象直接注册成一张临时表 df. createTempView (“weibo_doc”); // 然后直接写sql var resultDF =ss.sql(“select from weibo_doc”) // 数据持久化 resultDF.repartition(1).write.format(“parquet”).save(“F:\test _sbt\FirstSpark4Scala\save3”) } }

**DataFrame 数据持久化**

-  parquet 数据格式,默认的输入和输出均为该格式
- spark 自带:天然集成,强力推荐的数据格式 

**parquet 产生背景 ** 

-  面向分析型业务的列式存储格式
- 由 Twitter 和 Cloudera 合作开发,2015 年 5 月从 Apache 的孵化器里 毕业成为 Apache 顶级项目
- Twitter 的日志结构是复杂的嵌套数据类型,需要设计一种列式存储格 式,既能支持关系型数据(简单数据类型),又能支持复杂的嵌套类 型的数据,同时能够适配多种数据处理框架

**parquet 的优点**

-  压缩数据,内部自带 gzip 压缩 
- 不失真 
- 减少 IO 吞吐量 
- 高效的查询 
- 多数据处理平台,均支持 parquet,包括 hive 等。  

**常用命令 **<br />show、printSchema、select、filter、groupBy、count、orderBy<br />**其它操作类比于 sql 即可**
<a name="s77aX"></a>
### 7.3.3、 DataSets API 

-   集成了 RDD 强类型和 DataFrames 结构化的优点,官方正强力打造的 新数据抽象类型
- 写 spark 代码,面向 DS 编程,可以与其它 Spark 应用代码无缝集 成 
- 比 RDD 更丰富的算子,更有利于提升执行效率、减少数据读取、执行 计划优化
```scala
import org.apache.spark.sql.SparkSession
case class Student(name: String, age: Long, address: String)
/**
 * 抽象数据类型 DataSet 测试类
 */
object TestSparkSqlDataSet {
  def main(args: Array[String]): Unit = {
    //1、构建 spark session
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSql-2.3.2-TestCase")
      .master("local[*]")
      .getOrCreate()
    //引入自动隐式类型转换
    import sparkSession.implicits._
    // 从基础数据对象类型创建 DataSet
    val primitiveDS = Seq(1, 2, 3).toDS()
    val col = primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
    col.foreach(println)
    println("-----------------")
    primitiveDS.show()
    // 已为样例类 case class 创建完成编码类 Encoder
    val caseClassDS = Seq(Student("脱口秀大会", 3, "北京")).toDS()
    caseClassDS.show()

    // 指定相应的文件导入形成样例类对应的 DataSet,通过 json 的 key 和 样例类的字段名称对应即可
    val path = "C:\\idea_workspace\\homework\\data\\student_data.txt"
    val peopleDS = sparkSession.read.json(path).as[Student]
    peopleDS.select("name", "age", "address").show()
    //关停会话上下文
    sparkSession.stop()
  }
}

7.3.4、 面向程序接口对接的操作

通过 jdbc、odbc 链接后,发送相关的 sparksql 请求,实现基于 sparksql 功能开发

7.3.5、Rdd转DF

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
object TestRddToDataFrame {
 def main(args: Array[String]): Unit = {
 //1、构建 spark session
 val sparkSession = SparkSession
 .builder()
 .appName("SparkSql-2.3.2-TestCase")
 .master("local[*]")
 .getOrCreate()
 // For implicit conversions like converting RDDs to DataFrames
 import sparkSession.implicits._
 //2、构建 scheme
 val schema =
 StructType(
 "stdNo name classNo className".split(" ").map(fieldName =>
StructField(fieldName, StringType, true)))
 //3、构建 rdd,linesRDD
 val studentRDD =
sparkSession.sparkContext.textFile("F:\\test_sbt\\FirstSpark4Scala\\stu
dent_mysql.txt")
 //4、将 linesRDD 转换成 Row rdd
 val rowRDD = studentRDD.map(_.split("\\t")).map(p => Row(p(0),
p(1), p(2), p(3)))
 //5、创建 df,由 row rdd + scheme
 val studentDataFrame = sparkSession.createDataFrame(rowRDD, schema)
 //6、df 算子操作
 studentDataFrame.printSchema()
 studentDataFrame.show()
 //7、停掉相关会话
 sparkSession.stop()
 }
}

7.4、 RDD、DataFrame、DataSet对比分析

相同点

  • 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便 利
  • 三者都有惰性机制,在进行 Transform 操作时不会立即执行,在遇到 Action 操作时会正式提交作业执行。
  • 均采用 spark 的内存运算和优化策略,内存使用和执行效率上均可以 得到保障。
  • 均有 partition 的概念,便于分布式并行计算处理,达到分而治之。
  • 均有许多共同的函数,如 map、filter、sort 等。
  • 在进行三者的相关操作时候,个别特殊操作时必须引入一个相同的包 依赖。( 早期称为 import sqlContext.implicits.,最新版本称 为 import spark.implicits.)
  • DF 和 DS 均可以通过模式匹配获取内部的变量类型和值。
  • DF 和 DS 产生于 SparkSql,天然支持 SparkSql

    区别点

  • RDD

    • 不支持 SparkSql 操作,均需进行转成 DF 或是 DS 才行。
    • 类型是安全的,编译时候即可检查出类型错误。(强类型)
    • 机器间通信、IO 操作均需要序列化、反序列化对象,性能开销大。
  • DataFrame
    • 有 scheme 的 RDD:比 RDD 增加了数据的描述信息。
    • 比 RDD 的 API 更丰富,增加了针对结构化数据 API。
    • 只有一个固定类型的 DataSet,即为 DataFrame=DataSet[Row]
    • 序列化和反序列化时做了结构化优化,减少了不必要的结构化信息的 序列化,提高了执行效率。
  • DataSet
    • 强类型的 DataFrame,与 DF 有完全相同的成员函数。
    • 每行的类型不固定,需要使用模式匹配 case class 后,获取实际的 类信息、字段类型、字段值。
    • 访问对象数据时,比 DF 更加直接简单。
    • 在序列化和反序列化时,引入了 Encoder 机制,达到按需序列化和反 序列化,不必像之前整个对象操作了,进一步提高了效率。

应用场景

  • 使用 RDD 场景 
    • 数据为非结构化,如流媒体等数据
    • 对数据集进行底层的转换、处理、控制
    • 不需要列式处理,而是通过常规的对象.属性来使用数据。
    • 对 DF、DS 带来的开发效率、执行效率提升不敏感时
  • 使用 DF(必须)
    • R 或是 python 语言开发者,使用 DF
  • 使用 DS(必须)
    • 在编译时就有高度的类型安全,想要有类型的 JVM 对象,用上 Catalyst 优化,并得益于 Tungsten 生成的高效代码
  • 使用 DF、DS 场景

    • 需要丰富的语义、高级抽象和特定领域专用的 API 时
    • 处理需要对半结构化数据进行高级处理,如 filter、map、 aggregation、average、sum、SQL 查询、列式访问或使用 lambda 函 数
    • 在不同的 Spark 库之间使用一致和简化的 API

      8、Spark-Streaming

      8.1、概述

  • 数据处理类型分类

    • 静态数据
      • 数据源是不变的、有限的、显式离散的
      • 多适用于批量计算、离线计算
    • 流数据
      • 数据是变动的、无限的、连续的
      • 多适用于实时计算,能在秒级、秒内处理完成
    • sparkstreaming 是什么
      • 一句话总结:微批处理的流式(数据)实时计算框架。
      • 原理:是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短 到秒级时,即可用于处理实时数据流。
      • 优点
        • 可以和 spark core、sparksql 等无缝集成
        • 支持从多种数据源获取数据,包括 Kafka、Flume、Twitter、 ZeroMQ、Kinesis 以及 TCP sockets,然后可以使用诸如 map、 reduce、join 等高级函数进行复杂算法的处理,最后可以将处 理结果存储到 HDFS 文件系统,数据库等。
    • 重要概念说明
      • StreamingContext
        • 类比于 SparkContext,SparkSqlContext
        • 流计算框架中的中枢类,负责各种环境信息、分发调度等任 务。
      • 数据源 
        • 简称:Source,意为 DataSource 的缩写
        • 指流数据的来源是哪里,如文件,Socket 输入、Kafka 等。
      • 离散流
        • 英文称 Discretized Stream,简称 DStream,即为 sparkstreaming 微批处理当中的数据抽象单位。
        • 是继 spark core 的 RDD、spark sql 的 DataFrame 和 DataSet 后又一基础的数据类型,是 spark streaming 特有的数据类 型。
      • 输入离散流
        • 英文简称:Input DStream
        • 将 Spark Streaming 连接到一个外部 Source 数据源来读取数 据的统称
      • 批数据
        • 英文称 Batch Data
        • 连续数据离散化的步骤:将流式实时连续的数据整体转化成以 时间片为单位进行分批,即将流式数据转化成时间片为单位数 据进行批数据处理,随着时间推移,这些处理结果即形成结果 数据流,即流处理引擎形成。
      • 时间片或批处理时间间隔
        • 英文称 batch interval
        • 人为对流数据进行定量的标准,以时间片作为拆分流数据的依 据。
        • 一个时间片的数据对应一个 RDD 实例。
      • 窗口长度
        • 英文称 window length
        • 一个窗口覆盖的流数据的时间长度,必须是批处理时间间隔的 倍数。
      • 滑动窗口时间间隔
        • 滑动窗口:简称 Sliding window
        • 前一个窗口到后一个窗口所经过的时间长度间隔。必须是批处 理时间间隔的倍数
  • 处理流程

image.png

  • 内部工作流程

Spark Streaming 接收实时输入数据流并将数据分成批处理,然后由 SparkCore 引擎处理,以批量生成最终结果流
image.png

8.2、代码实现

POM依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.codelx</groupId>
  <artifactId>2022_03_07</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2018</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.11</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <spec2.version>4.2.0</spec2.version>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>21.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.7</version>
      <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.7</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>3.0.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>${spec2.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- 因为往往是scala和java在一起混合开发,故需要设置多个源文件目录,故需要maven新插件build-helper-maven-plugin来支持设置多个源文件夹,也可以设置多个资源路径 -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
                <source>${basedir}/src/main/java</source>
                <source>${basedir}/src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>${basedir}/src/test/scala</source>
                <source>${basedir}/src/test/java</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <!-- <archive>  <manifest>  <mainClass>com.tl.job017.hdfs.HdfsFileRead</mainClass>
            </manifest>  </archive> -->
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>


      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.21.0</version>
        <configuration>
          <!-- Tests will be run with scalatest-maven-plugin instead -->
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>TestSuiteReport.txt</filereports>
          <!-- Comma separated list of JUnit test class names to execute -->
          <jUnitClasses>samples.AppTest</jUnitClasses>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

具体实现

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // 要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,
    //它是所有 Spark Streaming 功能的主要入口点。
    //一切都从 SparkConf 开始
    val conf = new SparkConf().setMaster("local[*]")
      .setAppName("NetworkWordCount")
    //指定时间间隔的 ssc 初始化
    val ssc = new StreamingContext(conf, Seconds(5))
    //ssc 指定来自 TCP 源作为输入数据源,即链接一个指定主机的已打开的 TCP 端口,从该端口中读取文本数据,每行以”\n”作为每行的结尾。
    val lines = ssc.socketTextStream("192.168.2.57", 9999)
    //将 DStream 进行打平处理,实际是对其内部的离散的 rdd 进行打平处理
    val words = lines.flatMap(_.split(" "))
    // 将单列的 word 转化为双列的 kv 结构,用于后边的 wc 操作
    val pairs = words.map(word => (word, 1))
    //对 kv 的输出进行 wc 计算
    val wordCounts = pairs.reduceByKey(_ + _)
    //打印 wc 结果到控制台
    wordCounts.print()
    //正式开始计算
    ssc.start()
    //等待计算结束,一般流式计算没有异常或人为干预是一直保持运行状态的
    ssc.awaitTermination()
  }
}

8.3、常见问题

  • 输入 DStream 和 Receivers
    • 输入 DStreams :即为从数据流源接收的输入数据流的 DStream。
      • 内置两类流媒体源
        • 基本流数据源:包括本地或是 hdfs 文件系统、Socket 套接字 链接、Akka actor 等。
        • 高级流数据源:包括 Kafka、Flume、Kinesis、ZeroMQ 等数据 源,可以通过引入第三方工具库来使用该数据源。
    • Receivers:每个输入 DStream(除文件流之外)均与 Receiver 相对关 联,该对象负责从流源接收数据并将其存储在 Spark 的内存中进行处 理。
  • 关于多路输入流的处理 o
    • SparkStreaming 对多路输入流进行自然和谐的支持。
    • 通过 ssc 可以同时创建多路输入流,并直接提供 api 进行各种的 join、leftOuterJoin、rightOuterJoin 等操作。
  • DStreams 的输出操作
    • print():在运行流应用程序的驱动程序节点上打印 DStream 中每批数 据的前十个元素。
    • saveAsTextFiles:将此 DStream 的内容保存为文本文件
    • saveAsObjectFiles:将此 DStream 的内容保存为 SequenceFiles 序列 化 Java 对象。
    • saveAsHadoopFiles:将此 DStream 的内容保存为 Hadoop 文件。
    • foreachRDD:最通用的输出运算符,它将函数 func 应用于从流生成的 每个 RDD。此函数应将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或通过网络将其写入数据库。请注意,函数 func 在运行 流应用程序的驱动程序 Driver 进程中执行。
    • foreach 和 foreachPartion 运行在 Worker 节点。
  • 关于 SparkStreaming 本地运行时线程数量的设置
    • 在本地运行 Spark Streaming 程序时,不能使用“local”或“local [1]”作为主 URL。
    • 该设置即只有一个线程将用于本地运行任务。如果正使用基于接收器 的输入 DStream,则必须使用单个线程来运行接收器,而无法留下用于 处理接收数据的线程。故本地运行时,始终使用“local [ n ]”作为 主 URL,其中 n >要运行的接收器数量
  • 关于 SparkStreaming 在集群运行时 CPU 逻辑核心数设置
    • 一个逻辑 CPU 的资源相当于可以开启一个线程的能力。
    • 在集群上运行时,分配给 SparkStreaming 应用程序的核心数必须大于 接收器数。否则系统将只能接收数据,但无法处理数据