Spark Core

1、Spark架构设计

1.1 架构设计图

2 Spark Core - 图1

1.2 相关术语名词解释

  • 1、RDD (Resilient Distributed DataSet)
    • 弹性分布式数据集,是对数据集在spark存储和计算过程中的一种抽象。
    • 是一组只读、可分区的分布式数据集合。
    • 一个RDD 包含多个分区Partition(类似于MapReduce中的InputSplit中的block),分区是依照一定的规则的,将具有相同规则的属性的数据记录放在一起。
    • 横向上可切分并行计算,以分区Partition为切分后的最小存储和计算单元。
    • 纵向上可进行内外存切换使用,即当数据在内存不足时,可以用外存磁盘来补充。
  • 2、Partition(分区)
    • Partition类似hadoop的Split中的block,计算是以partition为单位进行的,提供了一种划分数据的方式。
    • partition的划分依据有很多,常见的有Hash分区、范围分区等,也可以自己定义的,像HDFS文件,划分的方式就和MapReduce一样,以文件的block来划分不同的partition。
    • 一个Partition交给一个Task去计算处理
  • 3、算子
    • 英文简称:Operator,简称op
    • 广义上讲,对任何函数进行某一项操作都可以认为是一个算子
    • 通俗上讲,算子即为映射、关系、变换。
    • MapReduce算子,主要分为两个,即为Map和Reduce两个主要操作的算子,导致灵活可用性比较差。
    • Spark算子,分为两大类,即为Transformation和Action类,合计有80多个。
  • 4、Transformation类算子
    • 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
    • 细分类
      • Value数据类型的Transformation算子
      • Key-Value数据类型的Transfromation算子
  • 5、Action类算子
    • 会触发 Spark 提交作业(Job),并将数据输出 Spark系统。
  • 6、窄依赖
    • 如果一个父RDD的每个分区只被子RDD的一个分区使用 ——> 一对一关系
  • 7、宽依赖
    • 如果一个父RDD的每个分区要被子RDD 的多个分区使用 ——> 一对多关系
  • 8、Application
    • Spark Application的概念和MapReduce中的job或者yarn中的application类似,指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码
    • 一般是指整个Spark项目从开发、测试、布署、运行的全部。
  • 9、Driver
    • 运行main函数并且创建SparkContext的程序。
    • 称为驱动程序,Driver Program类似于hadoop的wordcount程序中的driver类的main函数。
  • 10、Cluster Manager
    • 集群的资源管理器,在集群上获取资源的服务。如Yarn、Mesos、Spark Standalone等。
    • 以Yarn为例,驱动程序会向Yarn申请计算我这个任务需要多少的内存,多少CPU等,后由Cluster Manager会通过调度告诉驱动程序可以使用,然后驱动程序将任务分配到既定的Worker Node上面执行。
  • 11、WorkerNode
    • 集群中任何一个可以运行spark应用代码的节点。
    • Worker Node就是物理机器节点,可以在上面启动Executor进程。
  • 12、Executor
    • Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立专享的一批Executor。
    • Executor即为spark概念的资源容器,类比于yarn的container容器,真正承载Task的运行与管理,以多线程的方式运行Task,更加高效快速。
  • 13、Task
    • 与Hadoop中的Map Task或者Reduce Task是类同的。
    • 分配到executor上的基本工作单元,执行实际的计算任务。
    • Task分为两类,即为ShuffleMapTask和ResultTask。
      • ShuffleMapTask:即为Map任务和发生Shuffle的任务的操作,由Transformation操作组成,其输出结果是为下个阶段任务(ResultTask)进行做准备,不是最终要输出的结果。
      • ResultTask:即为Action操作触发的Job作业的最后一个阶段任务,其输出结果即为Application最终的输出或存储结果。
  • 14、Job(作业)
    • Spark RDD里的每个action的计算会生成一个job。
    • 用户提交的Job会提交给DAGScheduler(Job调度器),Job会被分解成Stage去执行,每个Stage由一组相同计算规则的Task组成,该组Task也称为TaskSet,实际交由TaskScheduler去调度Task的机器执行节点,最终完成作业的执行。
  • 15、Stage(阶段)
    • Stage是Job的组成部分,每个Job可以包含1个或者多个Stage。
    • Job切分成Stage是以Shuffle作为分隔依据,Shuffle前是一个Stage,Shuffle后是一个Stage。即为按RDD宽窄依赖来划分Stage。
    • 每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业可以被分为一个或多个阶段

2、spark运行模式与用户交互方式

2.1运行模式

即作业以什么样的模式去执行,主要是单机、分布式两种方式的细节选择。

序号 模式名称 特点 应用场景
1 本地运行模式(local) 单台机器多线程来模拟spark分布式计算 机器资源不够
测试验证程序逻辑的正确性
2 伪分布式模式 单台机器多进程来模拟spark分布式计算 机器资源不够
测试验证程序逻辑的正确性
3 standalone(client) 独立布署spark计算集群
自带clustermanager
driver运行在spark submit client端
机器资源充分
纯用spark计算框架
任务提交后在spark submit client端实时查看反馈信息
数据共享性弱
测试使用还可以,生产环境极少使用该种模式
4 standalone(cluster) 独立布署spark计算集群
自带clustermanager
driver运行在spark worker node端
机器资源充分
纯用spark计算框架
任务提交后将退出spark submit client端
数据共享性弱
测试和生产环境均可以自由使用,但更多用于生产环境
5 spark on yarn
(yarn-client)
以yarn集群为基础
只添加spark计算框架相关包
driver运行在yarn client上
机器资源充分
多种计算框架混用
数据共享性强
任务提交后在yarn client端实时查看反馈信息
6 spark on yarn
(yarn-cluster)
以yarn集群为基础
只添加spark计算框架相关包
driver运行在集群的am contianer中
机器资源充分
多种计算框架混用
数据共享性强
任务提交后将退出yarn client端
7 spark on mesos/ec2 与spark on yarn类似 与spark on yarn类似
在国内应用较少

2.2用户交互方式

1、spark-shell:spark命令行方式来操作spark作业。

  • 多用于简单的学习、测试、简易作业操作。

2、spark-submit:通过程序脚本,提交相关的代码、依赖等来操作spark作业。

  • 最多见的提交任务的交互方式,简单易用、参数齐全。

3、spark-sql:通过sql的方式操作spark作业。

  • sql相关的学习、测试、生产环境研发均可以使用该直接操作交互方式。

4、spark-class:最低层的调用方式,其它调用方式多是最终转化到该方式中去提交。

  • 直接使用较少

5、sparkR,sparkPython:通过其它非java、非scala语言直接操作spark作业的方式。

  • R、python语言使用者的交互方式。

    2.2.1重要交互方式使用介绍

    重点说明spark-shell,spark-submit两大方式,spark-sql后有专门章节介绍,其它小众方式不做介绍。
    1、spark-shell
  • 交互方式定位
    • 一个强大的交互式数据操作与分析的工具,提供一个简单的方式快速学习spark相关的API。
  • 启动方式
    • 前置环境:已将spark-shell等交互式脚本已加入系统PATH变量,可在任意位置使用。
    • 以本地2个线程来模拟运行spark相关操作,该数量一般与本机的cpu核数相一致为最佳spark-shell —master local[2]
  • 相关参数
    • 1.参数列表获取方式:spark-shell —help
    • 2.其参数非常多,但由于该方式主要是简单学习使用,故其参数使用极少,故不做详解。
  • 使用示例介绍
    • 交互式入口

2 Spark Core - 图2

  • 构建一个scala列表,并输出

2 Spark Core - 图3

  • 通过scala列表,构造一个rdd,并进行基本操作

2 Spark Core - 图4

  • 通过本地文本文件构建rdd,并进行基本操作

2 Spark Core - 图5

  • 通过hdfs文本文件构建rdd,并进行基本操作

2 Spark Core - 图6

  • 对rdd进行整型过滤操作

2 Spark Core - 图7

  • 对rdd进行求最大值操作

2 Spark Core - 图8
2 Spark Core - 图9

  • 对输入进行wordcount计算-无排序

2 Spark Core - 图10

  • 对输入进行wordcount计算-按词频降序排列输出

2 Spark Core - 图11
2、spark-submit

  • 交互方式定位
    • 最常用的通过程序脚本,提交相关的代码、依赖等来操作spark作业的方式。
  • 启动方式
    • spark-submit提交任务的模板

spark-submit \
—class \
—master \
—jars jar_list_by_comma \
—conf = \
… # other options
\
[application-arguments]

  • spark-submit 详细参数说明 | 参数名 | 参数说明 | | —- | —- | | —master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local | | —deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client | | —class | 应用程序的主类,仅针对 java 或 scala 应用 | | —name | 应用程序的名称 | | —jars | 用逗号分隔的本地jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 | | —packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 | | —exclude-packages | 为了避免冲突 而指定不包含的 package | | —repositories | 远程 repository | | —conf PROP=VALUE | 指定 spark 配置属性的值,
    例如 -conf spark.executor.extraJavaOptions=”-XX:MaxPermSize=256m” | | —properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf | | —driver-memory | Driver内存,默认 1G | | —driver-java-options | 传给 driver 的额外的 Java 选项 | | —driver-library-path | 传给 driver 的额外的库路径 | | —driver-class-path | 传给 driver 的额外的类路径 | | —driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 | | —executor-memory | 每个 executor 的内存,默认是1G | | —total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 | | —num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 | | —executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |

  • 关于—master取值的特别说明 | local | 本地worker线程中运行spark,完全没有并行 | | —- | —- | | local[K] | 在本地work线程中启动K个线程运行spark | | local[*] | 启动与本地work机器的core个数相同的线程数来运行spark | | spark://HOST:PORT | 连接指定的standalone集群的master,
    默认7077端口 | | mesos://HOST:PORT | 连接到mesos集群,国内用的极少 | | yarn | 使用yarn的cluster或者yarn的client模式连接。取决于—deploy-mode参数,由deploy-mode的取值为client或是cluster来最终决定。
    也可以用yarn-client或是yarn-cluster进行二合一参数使用,保留—master去掉—deploy-mode参数亦可。
    —master yarn-client,相当于—master yarn –deploy-mode client的二合一 |

3、java实现spark wordcount示例

3.1 开发与测试环境准备

  • 配置winutils.exe
    • 意义
      • hadoop基于linux开发和布署运行,故不能将hadoop环境原始运行在windows上。
    • 操作系统环境差异说明
      • linux环境不需要
      • windows下运行需要该文件
      • 配置到运行环境当前目录下的null/bin目录下
      • 若为eclipse开发,则配置到项目根目录下的null/bin/下即可
    • winutils下载
      • 天亮教育官方云盘可直接下载

2 Spark Core - 图12

  • maven pom配置
    • 配置项
      • eclipse-jee版
      • spark-2.3.2
      • jdk1.8.x(自spark2.2.0起,jdk1.7将不再支持)
      • log4j
    • pom配置模板


4.0.0
study
spark_study
0.1

UTF-8




nexus-aliyun
Nexus aliyun
http://maven.aliyun.com/nexus/content/groups/public



org.apache.spark
spark-core_2.11
2.3.2
provided



log4j
log4j
1.2.17
provided



spark_study


maven-compiler-plugin
2.3.2

1.8
1.8
UTF-8



maven-assembly-plugin


jar-with-dependencies




make-assembly
package

assembly






3.2 java操作spark api实现wordcount代码编写

package com.tl.job011.spark.demo;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/
java版实现的spark word count

@author zel

@date 2020年8月5日
/
public class SparkWordCount4Java {
public static void main(String[] args) {
/

##总步骤划分#
1、初始化spark conf,设置app name
2、构建java spark context
3、定义输入文件,本地或是hdfs等均可
4、构建file rdd,其实质是以文本行为单位的组织形式,也可以称为lines rdd
5、lines rdd -> word rdd
6、word rdd -> (word,1)的rdd
7、(word,1)的rdd -> (word,freq)的rdd
8、将(word,freq) rdd -> collect落地形成本地对象
9、将对象数据打印出来
10、关闭上下文环境
/

// 1、初始化spark conf,设置app name
SparkConf conf = new SparkConf();
conf.setAppName(“JavaSparkWordCount-4-job011”);
// conf.setMaster(“local[2]”);
// conf.setMaster(“yarn-client”);

// 2、构建java spark context
JavaSparkContext jsc = new JavaSparkContext(conf);

// 3、定义输入文件,本地或是hdfs等均可
// String localFilePath = “file:////D:/temp/input.txt”;
if (args == null || args.length != 1) {
System.err.println(“传参有误 ,请检查!!!”);
System.exit(-1);
}
String localFilePath = args[0];

// 4、构建file rdd=line rdd
JavaRDD fileRDD = jsc.textFile(localFilePath);

// 5、lines rdd -> word rdd
JavaRDD wordRDD = fileRDD
.flatMap(new FlatMapFunction() {

@Override
public Iterator call(String line) throws Exception {
return Arrays.asList(line.split(“\\s+”)).iterator();
}

});

// 6、word rdd -> (word,1)的rdd
JavaPairRDD kvPairRDD = wordRDD
.mapToPair(new PairFunction() {

@Override
public Tuple2 call(String word)
throws Exception {
return new Tuple2(word, 1);
}

});

// 7、(word,1)的rdd -> (word,freq)的rdd
JavaPairRDD wcPairRDD = kvPairRDD
.reduceByKey(new Function2() {

@Override
public Integer call(Integer v1, Integer v2)
throws Exception {
return v1 + v2;
}
});

// 8、将(word,freq) rdd -> collect落地形成本地对象
List> pairList = wcPairRDD.collect();

// 9、将对象数据打印出来
for (Tuple2 tuple2 : pairList) {
System.out.println(tuple2);
}

// 10、关闭上下文环境
jsc.stop();

System.out.println(“done!!”);

}
}

3.3 代码运行

  • 本地方式-local-开发环境
    • 直接在主类中,run as application即可

2 Spark Core - 图13

  • linux本地方式-local-生产环境
    • 开发环境打包:跟之前的maven打包完全一致
    • 编写Shell运行脚本
      • 注意修改代码中的setMaster代码,将之前代码设置注释掉

! /bin/sh
# 配置成hadoop配置文件存放目录
export HADOOP_CONF_DIR=/usr/hdp/3.1.0.0-78/hadoop/conf/
spark-submit \
—class com.tl.job014.spark.SparkWordCount4Java \
—master local[2] \
—driver-memory 512m \
—executor-memory 512m \
—num-executors 2 \
/home/zel/job014/FirstSparkWordCount4Java/FirstSpark4Java-jar-with-dependencies.jar \
file:///home/zel/job014/FirstSparkWordCount4Java/input.txt
#hdfs://cluster0.hadoop:8020/tmp/spark/input.txt

  • 输出效果

2 Spark Core - 图14

  • 集群方式
    • yarn-client
      • 注意修改代码中的setMaster代码,将之前代码设置注释掉

! /bin/sh
# 配置成hadoop配置文件存放目录
export HADOOP_CONF_DIR=/usr/hdp/3.1.0.0-78/hadoop/conf/
spark-submit \
—class com.tl.job014.spark.SparkWordCount4Java \
—master yarn-client \ #此处有变化
—driver-memory 512m \
—executor-memory 512m \
—num-executors 2 \
/home/zel/job014/FirstSparkWordCount4Java/FirstSpark4Java-jar-with-dependencies.jar \
file:///home/zel/job014/FirstSparkWordCount4Java/input.txt
#hdfs://cluster0.hadoop:8020/tmp/spark/input.txt

  • 运行效果
    • 注意输入路径,不能再为本地路径输入,应改为hdfs等分布式文件系统的路径
    • yarn-client等于—master设置成yarn,deploy-mode设置成client
      • yarn-cluster

! /bin/sh
# 配置成hadoop配置文件存放目录
export HADOOP_CONF_DIR=/usr/hdp/3.1.0.0-78/hadoop/conf/
spark-submit \
—class com.tl.job014.spark.SparkWordCount4Java \
—master yarn-cluster \ #此处有变化
—driver-memory 512m \
—executor-memory 512m \
—num-executors 2 \
/home/zel/job014/FirstSparkWordCount4Java/FirstSpark4Java-jar-with-dependencies.jar \
file:///home/zel/job014/FirstSparkWordCount4Java/input.txt
#hdfs://cluster0.hadoop:8020/tmp/spark/input.txt

  • 运行效果

    • 注意输入路径,不能再为本地路径输入,应改为hdfs等分布式文件系统的路径
    • yarn-cluster等于—master设置成yarn,—deploy-mode设置成cluster

      4、scala实现spark wordcount示例

  • 详见“maven构建scala项目”专题篇。

    5、经典习题(spark-shell实现即可)

  • 给定一个hdfs文件,内部均为空格隔开的数值,求最大值。

  • 给定一个hdfs文件,内部均为空格隔开的数值,求最小值。
  • 给定一个hdfs文件,内部均为空格隔开的任意字符串,求字符串中包含”天亮教育”四个字的字符串有多少个?