Spark运行逻辑

通过 wordCount 程序, 分析Spark程序的运行逻辑

  1. object ScalaWordCount {
  2. def main(args: Array[String]): Unit = {
  3. //创建 Scala 的配置, 设置应用程序的名字
  4. //local[*] local 代表使用本地模式运行, * 代表使用的线程数量
  5. val conf: SparkConf = new SparkConf() .setAppName("ScalaWordCount").setMaster("local[*]")
  6. val sc = new SparkContext(conf)
  7. //读取文件
  8. val liens: RDD[String] = sc.textFile(args(0))
  9. //切分压平
  10. val words: RDD[String] = liens.flatMap(_.split(" "))
  11. //单词映射
  12. val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
  13. //分组聚合运算
  14. val reduceRdd: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
  15. //文件写出到 hdfs
  16. reduceRdd.saveAsTextFile(args(1))
  17. //释放资源
  18. sc.stop()
  19. }
  20. }

wordCount RDD个数

通过查看DAG图,图中的一个原点表示一个RDD
wordCount程序有6个RDD
image.png
textFile()

  • HadoopRDD[LongWritable, Text]:调用Hadoop读取HDFS的文件,形成元组,元组内(偏移量,行内容),偏移量几乎用不着;LongWritable 和 Text 都是Hadoop的类型
  • MapPartitionsRDD[String]:通过 map ,把偏移量去掉,只留下行内容
  • 也即,textFile 最终返回 MapPartitionsRDD[String]

flatMap

  • 接收 MapPartitionsRDD[String],返回 MapPartitionsRDD[String]

map

  • 接收 MapPartitionsRDD[String],返回 MapPartitionsRDD[(String, Int)]

reduceByKey

  • 接收 MapPartitionsRDD[(String, Int)],返回 ShuffleRDD[(String, Int)]
  • 先局部聚合,结果会写入Executor 的本地磁盘中,而不是保存到内存
  • 进行全局聚合,根据分区器(对key进行操作,得到各个分区的编号),将本地的计算结果进行网络传输,相同的Key分到相同的 Executor,然后在Executor中进行各自Key汇总计算
  • ShuffleRDD[(String, Int)] 内部是进行了网络传输的的

saveAsTextFile

  • 接收 ShuffleRDD[(String, Int)],返回 MapPartitionsRDD[NullWritable, Text] :NullWritable 和 Text 都是Hadoop的类型
  • 输出内容到 HDFS 中

image.png

WordCount RDD 流转图形化

Shuffle 操作时

  • 局部聚合操作的结果会先存到Executor的磁盘中,成为文件,而不是内存中
  • 全局聚合时,Executor 读取自己磁盘的数据,将key按照分区器的划分,得到分区号,网络传输到分区所在的Executor中

Shuffle 触发时机

  • 只要是返回ShuffleRDD的算子, 就会触发Shuffle操作

image.png

WordCount Stage 划分

以RDD写入磁盘的操作作为 stage 的划分标准
分别命名为:

  • ShuffleMapStage
  • ResultStage

不管是 ShuffleMapStage 还是 ResultStage,也不管其内有多少个算子,Spark 都会将 stage 作为一个完整的 Task 交给 Executor 来执行
优势

  • 算子执行连贯,优化方便
  • 输入输出标准,都是从磁盘(HDFS)中获取数据,最后输出到磁盘中(HDFS)
  • Task 运行中其他的RDD转换都是基于内存的,不会和磁盘做交互

Spark 执行流程(1)及缓存 - 图4

Stage 在 SparkUI 的信息

ShuffleMapStage 的输入输出

  • Input:从数据源中获取的数据大小
  • Shuffle Write:所有Executor 在 Shuffle 写入到磁盘的总大小

ResultStage 的输入输出

  • Shuffle Read:所有Executor 在 Shuffle 从磁盘读取的总大小
  • Output:输出到数据源的数据总大小

image.png
每个Stage都是划分为多个任务的,一般为一个分区一个任务,每个分区的任务分到一个Executor中执行
ResultStage每个Task的输出都是独立的,如果输出到文件,则是不同的文件,这个和MR是一致的
image.png

在spark中, 提交一个任务时,
在数据层面上, 会被拆分成分区 进行执行
在执行流程上, 也会被分成两种任务类型
ShuffleMapTask 和 ResultTask
只要是返回ShuffleRDD的算子, 就会触发Shuffle操作, 就一定会生成ShuffleMapTask任务
但是否会生成ResultTask, 要看后续是否有Action算子, 将结果输出
如果后续又有一个会触发Shuffle的算子, 则中间这段依旧是ShuffleMapTask任务
ShuffleMapTask: 即只要数据集的数据从内存落到Executor(执行的机器的容器)的本地磁盘上, 那么从这个操作往前的任务都会被视为ShuffleMapTask, 分界岭就是具有Shuffer操作的前半段
ShuffleMapTask任务在物理上, 也是被串联在一起执行的, 如上图, map, flatMap, map再到reduceByKey的局部聚合落到磁盘上的这一整个操作, 都是一个完整的任务, 会在一个程序内完成
ResultTask: 从Executor的本地磁盘上读取数据, 然后执行, 最后输出
ResultTask任务在物理上也是一个完整的程序任务, 在一个程序内
注意,如果一个任务中,遇到了两次shuffle,则会有两个ShuffleMapTask,和一个ResultTask

image.png
从Spark界面的DAG(有向无环图)中就可以清晰看到, wordCount 任务分为两个stage, 一个是shuffleMapTask, 一个是resultTask, 且stage本身是一个连续执行的完整子程序, 不会说执行一下停下来等算子, 而是持续执行的
算子内的节点表示RDD, 可以清楚看到, textFile中有两个节点,这两个节点分别表示HadoopRDD和mapPartitionsRDD
虽然reduceByKey被分到了stage2, 但其实其局部聚合和写入本地磁盘是在stage1中的

image.png
Input: 从HDFS写入, 这里是指写入了110.0B
Shuffle Write: 指shuffleMapTask任务结束后写入到本地磁盘的量 —> 所有分区写入的总和
Shuffle Read: 指后续的从所有Executor的本地磁盘读取的总量, 通过http的方式读取各个Executor的本地磁盘文件
Output: 写出到HDFS的文件大小

Spark 执行流程分析

image.png
主要执行步骤

  • 当我们执行 bin/spark-submit --master --executor-memory 2g --total-executor-cores 6 --class cn.wolfcode.spark.ScalaWordCount original-SparkDemo-1.0.0.jar 命令时,本地程序执行 SparkSubmit 程序,该程序会告诉 Master 需要创建多少个 executor 进程
    • executor 的个数由分区数决定,而分区数可以自定义,也可以默认,默认时由读取的HDFS文件个数,每个文件的区块数决定
  • Master 通知对应的 Worker,让 Worker 创建对应的 executor 进程
    • 通过 RPC
  • executor 进程创建完毕,建立和 sparkSubmit 进程之间的通信
    • 通过RPC
  • 开始在 SprkSubmit 进程中执行对应的程序,生成对应的 RDD(不会实际拉取数据,也不会实际运算,只是保存计算的逻辑),包括对文件分区的划分
  • 当执行 Action 命令执行的时候,会生成对应的 job 任务,并且把 job 任务分发到对应的 executor 进程中执行
    • 执行到有shuffle运算的算子,算子内部会有一个方法 runJob() ,此时就会通过 rpc 让 executor 执行拉取 sparkSubmit 中的 rdd 的算子计算逻辑,到 executor 中进行实际的运算:从 HDFS 拉取该 executor 对应的文件区块,执行各个算子的计算
    • 在 ShuffleMapTask 阶段,executor 执行的是文件分区的局部运算,算完之后调用分区器,计算好每行数据接下来需要去的分区号,存储到磁盘
    • 在 ResultTask 阶段,executor 根据自己分配到的分区号,去各个 executor的磁盘中获取对应分区号的数据,拉取到自己的内存中,进行汇总计算,并输出 —> 这个汇总输出的意思是以分区号汇总数据计算,并输出
  • 如果在 executor 计算完,最终不是需要分区计算输出,而是要把全量的数据汇总进行计算的话(如计算条数 count),一般会再通过 rpc 把各个 executor 的条数传回给 SparkSubmit (Driver)程序所在的机器,也即提交任务的客户端机器所在,进行汇总计算输出

    2. Spark 的 Cache

    2.1. Spark 的缓存的引入

    我们上传我们的文件 teacher.txt 文件(800000 万数据)复制 10 份上传到 hdfs 的指定目录 /cache,接下来我们进行单词函数的一个统计(使用 spark-shell 的操作)
    再执行下面的操作

    1. val words=sc.textFile("hdfs://lab150:9000/cache/")
    2. words.count

    我们会发现第一次执行的比较慢,但是第二次肯定会比第一次快
    因为在我们的 job 结束以后,但是如果我们的任务程序 submit 程序没有退出的话,我们 job 程序会在 executor 上面缓存上次的一些结果在磁盘(主动),所以第二次我们在执行的时候就快很多,
    那么对于这个结果其实还可以提升的更快,那就是使用我们的 Spark 的内存缓存

    2.2. Spark 的内存缓存

    当我们在一个处理任务的程序中,如果需要对中间结果进行多次的访问统计分析,那么我们可以把中间结果缓存到我们的 executor 内存当中,极大提高我们二次查询的性能
    具体操作

  • 调用 rdd 的 cache 方法,默认会把 rdd 的处理的数据存放到我们的 executor 的内存当中

  • cache 并不是一个 Action 算子,执行时只会标记这个RDD需要进行缓存,等执行到 Action 算子时,才会在 Executor 中缓存

注意:

  • cache 返回的是原RDD,不接收也行,cache 并不会返回新的RDD
    1. val cacheRDD=words.cache
    2. cacheRDD.count # 在执行统计的时候会把结果统计同时保存到 executo 的内存当中
    在管理控制台查看缓存信息
    image.png
    Spark 缓存执行示意图
    image.png
  1. Spark 的 RDD 第一次从 HDFS 读取数据,然后缓存在 Executor 中,返回缓存后的 RDD
  2. 当第二次需要使用到相同的RDD时,如果程序调用的时缓存后的RDD,Executor 就会从内存中拿出缓存的RDD进行后续的计算

image.png
注意:缓存只是针对当前的一个会话(SparkContext),如果我们关闭 spark-shell,然后在重新执行任务的话缓存是没有, 只有重新运行程序把数据缓存到内存中后才可以使用缓存

2.3. Spark 的内存的缓存的引用

在我们编写的第二个 TopN 的统计程序中,我们是多次提交任务的,但是对于每次提交的任务都会重新去hdfs 文件系统中读取,我们把数据缓存到内存则会极大的提高后面的 Task 程序的运行效率(3s->87ms)
image.png

2.4. Spark 的缓存部分数据

如果需要缓存的数据量比 executor 的内存还大,那么就会缓存部分数据
注意:如果需要缓存的RDD没有全部缓存到内存,那基本无意义,剩余部分仍然需要去HDFS中获取

  1. bin/spark-shell --master spark://lab150:7077 --executor-memory 1g --total-executor-cores 6
  2. val words=sc.textFile("hdfs://lab150:9000/cache/")
  3. val cacheRDD=words.cache
  4. cacheRDD.count

image.png

2.5. Spark 缓存的应用场景

  1. 要求计算速度快
  2. 集群的资源需要足够大,可以存放足够多的数据的缓存
  3. 被 Cache 的数据会多次触发 Action 的操作,否则就没有缓存的意义
  4. 先会对原始数据进行过滤,然后在存放到缓存中

    2.6. Spark 缓存的释放

    当我们的任务都执行完成以后,我们需要释放对应的缓存资源,那么就可以直接调用一个方法 ```java cacheRDD.unpersist(true) # 其中 true 表示异步执行, false 表示同步执行
  1. 释放完成以后我们在管理控制台的 storage 里面是没有对应的信息
  2. <a name="iwbXl"></a>
  3. ## 2.7. Spark 的缓存的源码分析、
  4. 调用的 cache 方法,实际上就是调用 `persist()` 方法<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634525770617-d1212bd8-a260-4197-aef2-fa1d483bf063.png#clientId=u1e185ee5-5f6d-4&from=paste&height=98&id=ubdeddf43&margin=%5Bobject%20Object%5D&name=image.png&originHeight=289&originWidth=2010&originalType=binary&ratio=1&size=395198&status=done&style=shadow&taskId=u3cf2825e-f6d0-4241-af6f-436e1d6acc2&width=683)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634525802302-37098cc3-0307-47d4-8460-e196f2729e48.png#clientId=u1e185ee5-5f6d-4&from=paste&height=494&id=ub321b62e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1301&originWidth=2145&originalType=binary&ratio=1&size=2877477&status=done&style=shadow&taskId=u0440cf6f-1965-4a4c-b4c6-8941ce98a56&width=814)<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1774803/1634525831718-863c835f-f365-4b08-9632-172c413499d6.png#clientId=u1e185ee5-5f6d-4&from=paste&height=249&id=u960518c7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=619&originWidth=1748&originalType=binary&ratio=1&size=315986&status=done&style=shadow&taskId=u64860030-abae-4fa7-a16f-84ba02614c8&width=704)
  5. <a name="pMcJw"></a>
  6. # 3. Spark 的 CheckPoint 机制
  7. 对于一些机器学习、人工智能的一些应用,往往需要经过无数次对中间结果的一些迭代计算。那么对于这种迭代计算的中间结果,我们可以使用 cache 存放到一个内存中,提高计算效率。<br />但是这个并不是我们的最优的解决方案, 主要存在以下几个问题:
  8. 1. **局部不安全**:数据是保存在当前的 executor 中的,如果这个 executor 挂掉那么对应的缓存的数据就没有了,那么在对于中间数据比较重要的应用是灾难性的
  9. 1. **全局不安全**:对于迭代计算,很可能在运行阶段我们的 **SparkSubmit 程序挂掉**,**那么这个时候所有的中间数据都会丢失**
  10. 因为存在上面两种情况,我们应该把中间结果持久化保存,这个时候我们可以把我们的数据保存到分布式文件系统(HDFS)上面,这样的话我们的数据就比较安全
  11. <a name="SOGLP"></a>
  12. ## 3.1. CheckPoint 的实现
  13. 设置缓存目录
  14. ```java
  15. sc.setCheckpointDir("hdfs://lab150:9000/ckdir") # 设置缓存的目录, 目录不存在,则会创建

设置需要缓存的文件

  1. bigdataRDD.checkpoint() # 设置需要缓存的 RDD 的数据, Action 触发的时候回执行 checkpoint 操作
  2. bigdataRDD.count

3.2. CheckPoint 的使用场景

  1. 需要求保证数据的安全,不可丢失
  2. 相对于使用 cache 来说,速度要求不高
  3. 将中间结果保存到 HDFS 分布式文件系统

对于我们 checkPoint 保存后的数据,下次在进行统计分析的时候对于 checkPoint 的之前的 RDD 的运算就不会在重新读取,直接从 checkPoint 的 RDD 开始进行操作计算
image.png

3.3. Checkpoint 源码分析

image.png
image.png
如果当前的 RDD 既做了 cache,又做了 checkpoint,那么会优先使用 cache 的数据

4. 综合案例

4.1. 根据 Ip 找到对应的省份信息的需求

在现在的业务系统,很多广告投放的时候回指定区域投放广告,比如当前的访问用户是海南的用户,如果你给他推送一个羽绒服的广告显然是在浪费钱,如果当前的访问用户是新疆的用户,我们给用户推送猪肉系列的零食显然是有问题的
所以在很多的网站,现在基本上都会通过用户的相关信息进行用户的行为分析,通过后台发送日志到日志服务器
比如京东、爱奇艺、淘宝等网站基本上都会通过日志来收集用户的相关的行为信息分析

4.2. 需求分析

根据访问用户的日志信息中的 Ip 地址计算其请求的归属地,并且按照省份统计每个省份的访问次数,然后将计算好的结果写入到 MySQL 中

  1. 整理用户请求的日志数据,切分出对应的 IP 地址,并且转换为十进制数据
  2. 加载并整理规则,取出有用的字段,然后将数据缓存到内存中(Executor 内存)
  3. 将访问日志的 ip 信息去规则中查找对应的省份信息(二分法查找)
  4. 取出对应的省份名称,然后将其和 1 组合在一起
  5. 按照省份名称分组聚合
  6. 将聚合后的数据写入到 MySQl 中

日志采集框架图
image.png
image.png

  1. 前端记录用户信息,发送Get请求,通常会将接口地址伪装成资源地址,如 xxx.gif
  2. 用户信息请求频率高,对应的api服务应当使用负载均衡
    1. 使用 nginx 主从进行请求负载均衡转发
    2. api服务多套部署
  3. api 服务接收到用户信息后,进行信息解析
  4. 解析后的信息作为日志写入到本地的日志文件中 —> 这是原始信息,应当写入文件中,而不是写入数据库中
  5. 另外起一个 HDFS-Client,定时将各个 api 服务机器上的文件写入 HDFS 中,然后清除机器上的日志文件
  6. 日志分析 ETL 启动,从 HDFS 上拉取文件

    4.3. 具体实现步骤

    IP 地址可以在 api 服务中直接获取
    这里只是模拟日志分析ETL的实现
    先完成一个单机版的
  • 接收一个 ip i地址
  • 转成 Long 数值
  • 根据 Long 数值,查规则文件,返回地址

    4.3.1.完成对于 Ip 地址的一个转换功能

    IP 本身是一个 32 位的二进制数据,而 java Long 类型也是32位的,因此可以将 IP 转为 Long 数值,方便进行比对
    帮助类

    1. //把 IP 转换为 Long 类型数据
    2. def ip2Long(ip: String): Long = {
    3. val fragments = ip.split("[.]")
    4. var ipNum = 0L
    5. for (i <- 0 until fragments.length){
    6. ipNum = fragments(i).toLong | ipNum << 8L
    7. }
    8. ipNum
    9. }

    4.3.2.把规则加载到内存并返回一个数组

    1. //读取规则
    2. def readRules(path: String): Array[(Long, Long, String)] = {
    3. //读取 ip 规则
    4. val bf: BufferedSource = Source.fromFile(path)
    5. val lines: Iterator[String] = bf.getLines()
    6. //对 ip 规则进行整理, 并放入到内存
    7. val rules: Array[(Long, Long, String)] = lines.map(line => {
    8. val fileds = line.split("[|]")
    9. val startNum = fileds(2).toLong
    10. val endNum = fileds(3).toLong
    11. val province = fileds(6)
    12. (startNum, endNum, province)
    13. }).toArray
    14. rules
    15. }

    4.3.3.二分法查找

    1. //二分法查找数据 返回查找到对应的索引,没有找到返回-1
    2. def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {
    3. var low = 0
    4. var high = lines.length - 1
    5. while (low <= high) {
    6. val middle = (low + high) / 2
    7. if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
    8. return middle
    9. if (ip < lines(middle)._1)
    10. high = middle - 1
    11. else {
    12. low = middle + 1
    13. }
    14. }
    15. -1
    16. }

    4.3.4. 单机测试

    1. def main(args: Array[String]): Unit = {
    2. //数据是在内存中
    3. val rules: Array[(Long, Long, String)] = readRules("d:/ip/ip.txt")
    4. //将 ip 地址转换成十进制
    5. val ipNum = ip2Long("223.242.48.0")
    6. //查找
    7. val index = binarySearch(rules, ipNum)
    8. //根据脚本到 rules 中查找对应的数据
    9. val tp = rules(index)
    10. val province = tp._3
    11. println(province)
    12. }

    4.3.5.日志统计业务处理流程图

    image.png

  • 在我们的 Driver 编写客户端代码,实现 RDD 的封装和依赖关系的转换

  • 在 Driver 端读取出 IP 归属地的数据到内存
  • 把 Driver 端的 IP 的数据的广播到所有的 Executor 机器上面
  • 在 Executor 执行任务把数据保存到 MySQL 数据库中

    4.3.6. 读取本地的规则

    注意:
    规则是在本地读取的,也即是在 sparkSubmit(Driver)中读取的,而 runJob 时,只会发送RDD的运算逻辑到 Executor,读取的文件并不会主动发送过去
    因此不能直接使用于 RDD 的操作中,会报错的
    需要先将本地读取的文件内容进行广播,广播到各个 Executor 中后续才能在 RDD 中使用

    1. //读取本地的规则进行数据的查找
    2. object IPLocaltion01 {
    3. def main(args: Array[String]): Unit = {
    4. val conf = new SparkConf().setAppName("IPLocaltion01").setMaster("local[4]")
    5. val sc = new SparkContext(conf)
    6. //第一步, 先把所有的规则读取出来
    7. val rules: Array[(Long, Long, String)] = MyUtils.readRules(args(0))
    8. // 把规则广播到对应的所有的 Executor 进程
    9. val rulesRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules)
    10. //开始读取 hdfs 的中日志数据
    11. val lines: RDD[String] = sc.textFile(args(1))
    12. val func=(line:String)=>{
    13. //对每行日志进行处理
    14. //223.223.206.0|223.223.206.255|3755986432|3755986687| 亚 洲 | 中 国| 北 京 | 北 京|| 联 通 |110100|China|CN|116.405285|39.904989
    15. val fields: Array[String] = line.split("[|]")
    16. val ip=fields(1)
    17. val ipNum=MyUtils.ip2Long(ip)
    18. val rules: Array[(Long, Long, String)] = rulesRef.value
    19. var province="未知"
    20. val index:Int = MyUtils.binarySearch(rules,ipNum)
    21. if(index != -1){
    22. province=rules(index)._3
    23. }
    24. (province,1)
    25. }
    26. //映射操作
    27. val provinceAndOne: RDD[(String, Int)] = lines.map(func)
    28. //聚合操作
    29. val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(_+_)
    30. //打印收集结果
    31. println(reduced.collect().toBuffer)
    32. sc.stop()
    33. }
    34. }

    4.3.7.读取 HDFS 文件系统的规则

    读取HDFS的内容都是在各个Executor上的,Driver不会实际运行RDD
    存在问题:

  • 各个Executor 读取的HDFS文件,读取的时文件的区块,并不是 ip地域规则的完整文件,拿这个去匹配的话,会导致大量的ip匹配不上

解决方法

  • 各个 Executor 读取完后,将读取的RDD归拢到Driver,再有Driver进行广播到各个Executor,进行后续的计算

操作

  • textFile —> collect —> broadcast ```scala package day04.cn.wolfcode.spark import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}

object IPLocaltion02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(“IPLocaltion02”).setMaster(“local[4]”) val sc = new SparkContext(conf) //第一步, 从 HDFS 文件系统读取规则并且回收到 Driver 端 val ipRulesLines: RDD[String] = sc.textFile(args(0)) val ipRulesRDD: RDD[(Long, Long, String)] = ipRulesLines.map(line => { val fields: Array[String] = line.split(“[|]”) val startNum = fields(2).toLong val endNum = fields(3).toLong val province = fields(6) (startNum, endNum, province) }) val ipRules: Array[(Long, Long, String)] = ipRulesRDD.collect() val ipRulesRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipRules) val func=(line:String)=>{ //对每行日志进行处理 //223.223.206.0|223.223.206.255|3755986432|3755986687| 亚 洲 | 中 国| 北 京 | 北 京|| 联 通|110100|China|CN|116.405285|39.904989 val fields: Array[String] = line.split(“[|]”) val ip=fields(1) val ipNum=MyUtils.ip2Long(ip) val rules: Array[(Long, Long, String)] = ipRulesRef.value var province=”未知” val index:Int = MyUtils.binarySearch(rules,ipNum) if(index != -1){ province=rules(index).3 } (province,1) } val lines: RDD[String] = sc.textFile(args(1)) //映射操作 val provinceAndOne: RDD[(String, Int)] = lines.map(func) //聚合操作 val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(+_) //打印收集结果 println(reduced.collect().toBuffer) sc.stop() } }

  1. <a name="RNdyv"></a>
  2. ### 4.3.8. 保存数据到 MySQL
  3. 导入连接驱动程序
  4. ```xml
  5. <dependency>
  6. <groupId>mysql</groupId>
  7. <artifactId>mysql-connector-java</artifactId>
  8. <version>5.1.38</version>
  9. </dependency>

实现代码

  1. //保存数据到 MySQL 数据库
  2. reduced.foreachPartition(it=>MyUtils.data2MySQL(it))