- 一.Spark-core
- 1.什么是RDD
- 2.RDD的五大特性
- 2.1 A list of partitions
- 2.2 A function for computing each split
- 2.3 A list of dependencies on other RDDs
- 2.4 Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- 2.5 Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
- 3.RDD的创建
- 4 RDD 的算子分类
- 5.RDD的常见算子
- 二.Spark SQL
- 利用config对象,创建spark session
spark 可视化web端:http://192.168.19.137:4040 hadoop 可视化web端:http://192.168.19.137:50070/ ————————— Spark的开启方式: 先开启hadoop服务器,再运行Spark。 命令如下:
cd sbin
./start-dfs.sh
pyspark
sc
即可成功运行Spark并使用SparkContext服务。 ————————— mobaxterm远程连接jupyter notebook方式:cd bigdata
cd spark
cd sbin
./start-master.sh -h 192.168.19.137
./start-slave.sh spark://192.168.19.137:7077
cd ~
source activate py365
jupyter notebook --ip 0.0.0.0 --allow-root
一.Spark-core
1.什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
- Dataset:一个数据集,简单的理解为集合,用于存放数据的
- Distributed:它内部的元素进行了分布式存储,方便后期进行和分布式计算。
- Resilient:弹性的
- RDD的数据可以保存在内存或是磁盘中。
- 数据分布式也是弹性的
- 弹性:并不是指他可以动态扩展,而是容错机制。
- 不可变
- 可分区
- 并行计算
2.RDD的五大特性
2.1 A list of partitions
一个分区(Partition)列表,组成了该RDD的数据。
这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。
用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。(比如:读取HDFS上数据文件产生的RDD分区数跟block的个数相等)
RDD的分区其实可以简单这样理解,比如说我现在要来一个 wordCount,这个文本的大小是300M,那按照我们 HDFS 的套路,每128M是一个 block 块,那这个300M的文件就是3个 block ,然后我们的RDD会按照你这个文件的拥有的 block 块数来决定 RDD 的分区数,此时 RDD 的分区数就是3,但是如果我这个文件本身就小于128M呢,那 RDD 就会默认为2个分区数。
2.2 A function for computing each split
每个分区的计算函数都算是一个RDD —- Spark 中 RDD 的计算是以分区为单位的,每个RDD都会实现 compute 函数以达到这个目的。
2.3 A list of dependencies on other RDDs
一个rdd会依赖于其他多个rdd —- rdd与rdd之间的依赖关系,spark 任务的容错机制就是根据这个特性而来。
2.4 Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
针对key-value类型的RDD才有分区函数,分区函数其实就是把计算的结果丢到不同的分区中。
当前Spark中实现了两种类型的分区函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。
只有对于key-value的RDD,并且产生shuffle,才会有 Partitioner,非key-value的RDD的 Parititioner 的值是None。
HashPartitioner 的套路在 MapReduce 中有描述,其实大数据的那些分区套路好多都是这个套路。
RangePartitioner 就是类似规定了多少到多少丢这个分区,多少到多少又丢那个分区。
2.5 Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
计算任务的位置优先为存储每个Partition 的位置 (可自定义)
这里涉及到数据的本地性,数据块位置最优。简单点说也就是说哪里有数据我们就在哪里做计算的意思。
spark任务在调度的时候会优先(注意只是优先,而不是必然)考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。
3.RDD的创建
创建RDD之前要有spark context.
官方解释:SparkContext是spark功能的主要入口。其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。
通过内存中的数据创建RDD
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
rdd2 = sc.parallelize(data,5) # 5是partition的数量,一个partition会对应一个task
rdd1.reduce(lambda x,y:x+y)
rdd2.reduce(lambda x,y;x+y)
后面两行代码均输出15。reduce
将RDD中的元素两两传递给函数,同时产生新的值,新产生的值会被递归调用。因此与lambda一起使用的作用效果是数据累加。
上图显示出两次 parallelize 操作产生的 Tasks 数量不同,以及因此对 Duration (运行时间) 产生的影响。
4 RDD 的算子分类
4.1 transformation(转换)
根据已经存在的 rdd 转换生成一个新的 rdd , 它是延迟加载,它不会立即执行。
比如,map 就是一个 transformation 操作,把数据集中的每一个元素传给一个函数并返回一个新的 RDD,代表 transformation 操作的结果。
4.2 action (动作)
它会真正触发任务的运行。将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中。
比如,reduce 就是一个 action 操作,使用某个函数聚合 RDD 所有元素的操作,并返回最终计算的结果。
5.RDD的常见算子
5.1 transformation算子
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。 |
repartition(numPartitions) | 重新给 RDD 分区 |
repartitionAndSortWithinPartitions(partitioner) | 重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
5.2 action算子
动作 | 含义 |
---|---|
reduce(func) | reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
二.Spark SQL
1.简介
Spark Sql 是 Spark 中用于处理结构化数据的一个模块。它主要用于结构化数据处理和对Spark数据执行类SQL的查询。通过Spark SQL,可以针对不同格式的数据执行ETL操作(如JSON,Parquet,数据库)然后完成特定的查询操作。
2.DataFrame
2.1介绍
DataFrame是一个分布式的行集合。
为了支持结构化数据的处理,Spark SQL 提供了新的数据结构 DataFrame。DataFrame 是一个由具名列组成的数据集。它在概念上等同于关系数据库中的表或 R/Python 语言中的 data frame
。
2.2DataFrame 对比 RDD
DataFrame 和 RDDs 最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据,它们内部的数据结构如下:
DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。
- DataFrame相当于是一个带着schema的RDD。
- DataFrame还引入了off-heap,意味着JVM堆以外的内存。
- RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。
2.3DataFrame的创建
- 创教DataFrame先需要一个spark session。 ```python SPARK_APP_NAME = “dataframetest” SPARK_URL = “spark://192.168.19.137:7077”
conf = SparkConf() # 创建spark config对象 config = ( (“spark.app.name”, SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 (“spark.executor.memory”, “2g”), # 设置该app启动时占用的内存用量,默认1g (“spark.master”, SPARK_URL), # spark master的地址 (“spark.executor.cores”, “2”), # 设置spark executor使用的CPU核心数
conf.setAll(config)
利用config对象,创建spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
- 从CSV文件创建DataFrame
```python
df = spark.read.format('csv').option ('header true').load('/iris. csv') # csv文件默认从hdfs找