spark 可视化web端:http://192.168.19.137:4040 hadoop 可视化web端:http://192.168.19.137:50070/ ————————— Spark的开启方式: 先开启hadoop服务器,再运行Spark。 命令如下:
cd sbin ./start-dfs.sh pyspark sc 即可成功运行Spark并使用SparkContext服务。 ————————— mobaxterm远程连接jupyter notebook方式: cd bigdata cd spark cd sbin ./start-master.sh -h 192.168.19.137 ./start-slave.sh spark://192.168.19.137:7077 cd ~ source activate py365 jupyter notebook --ip 0.0.0.0 --allow-root

一.Spark-core

1.什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.

  • Dataset:一个数据集,简单的理解为集合,用于存放数据的
  • Distributed:它内部的元素进行了分布式存储,方便后期进行和分布式计算。
  • Resilient:弹性的
    • RDD的数据可以保存在内存或是磁盘中。
    • 数据分布式也是弹性的
    • 弹性:并不是指他可以动态扩展,而是容错机制。
  • 不可变
  • 可分区
  • 并行计算

2.RDD的五大特性

2.1 A list of partitions

一个分区(Partition)列表,组成了该RDD的数据。
这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。

用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。(比如:读取HDFS上数据文件产生的RDD分区数跟block的个数相等)

RDD的分区其实可以简单这样理解,比如说我现在要来一个 wordCount,这个文本的大小是300M,那按照我们 HDFS 的套路,每128M是一个 block 块,那这个300M的文件就是3个 block ,然后我们的RDD会按照你这个文件的拥有的 block 块数来决定 RDD 的分区数,此时 RDD 的分区数就是3,但是如果我这个文件本身就小于128M呢,那 RDD 就会默认为2个分区数。

2.2 A function for computing each split

每个分区的计算函数都算是一个RDD —- Spark 中 RDD 的计算是以分区为单位的,每个RDD都会实现 compute 函数以达到这个目的。

2.3 A list of dependencies on other RDDs

一个rdd会依赖于其他多个rdd —- rdd与rdd之间的依赖关系,spark 任务的容错机制就是根据这个特性而来。

2.4 Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

针对key-value类型的RDD才有分区函数,分区函数其实就是把计算的结果丢到不同的分区中。

当前Spark中实现了两种类型的分区函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner

只有对于key-value的RDD,并且产生shuffle,才会有 Partitioner,非key-value的RDD的 Parititioner 的值是None。
HashPartitioner 的套路在 MapReduce 中有描述,其实大数据的那些分区套路好多都是这个套路。
hash.png
RangePartitioner 就是类似规定了多少到多少丢这个分区,多少到多少又丢那个分区。

2.5 Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

计算任务的位置优先为存储每个Partition 的位置 (可自定义)

这里涉及到数据的本地性,数据块位置最优。简单点说也就是说哪里有数据我们就在哪里做计算的意思。

spark任务在调度的时候会优先注意只是优先,而不是必然)考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。

3.RDD的创建

创建RDD之前要有spark context.

官方解释:SparkContext是spark功能的主要入口。其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。

通过内存中的数据创建RDD

  1. data = [1,2,3,4,5]
  2. rdd1 = sc.parallelize(data)
  3. rdd2 = sc.parallelize(data,5) # 5是partition的数量,一个partition会对应一个task
  4. rdd1.reduce(lambda x,y:x+y)
  5. rdd2.reduce(lambda x,y;x+y)

后面两行代码均输出15。
reduce 将RDD中的元素两两传递给函数,同时产生新的值,新产生的值会被递归调用。因此与lambda一起使用的作用效果是数据累加。
tasks.png
上图显示出两次 parallelize 操作产生的 Tasks 数量不同,以及因此对 Duration (运行时间) 产生的影响。

4 RDD 的算子分类

4.1 transformation(转换)

根据已经存在的 rdd 转换生成一个新的 rdd , 它是延迟加载,它不会立即执行。
比如,map 就是一个 transformation 操作,把数据集中的每一个元素传给一个函数并返回一个新的 RDD,代表 transformation 操作的结果。

4.2 action (动作)

它会真正触发任务的运行。将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中。
比如,reduce 就是一个 action 操作,使用某个函数聚合 RDD 所有元素的操作,并返回最终计算的结果

5.RDD的常见算子

5.1 transformation算子

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
coalesce(numPartitions) 减少 RDD 的分区数到指定值。
repartition(numPartitions) 重新给 RDD 分区
repartitionAndSortWithinPartitions(partitioner) 重新给 RDD 分区,并且每个分区内以记录的 key 排序

5.2 action算子

动作 含义
reduce(func) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func
foreachPartition(func) 在数据集的每一个分区上,运行函数func

二.Spark SQL

1.简介

Spark Sql 是 Spark 中用于处理结构化数据的一个模块。它主要用于结构化数据处理和对Spark数据执行类SQL的查询。通过Spark SQL,可以针对不同格式的数据执行ETL操作(如JSON,Parquet,数据库)然后完成特定的查询操作。

2.DataFrame

2.1介绍

DataFrame是一个分布式的行集合
为了支持结构化数据的处理,Spark SQL 提供了新的数据结构 DataFrame。DataFrame 是一个由具名列组成的数据集。它在概念上等同于关系数据库中的表或 R/Python 语言中的 data frame

2.2DataFrame 对比 RDD

DataFrame 和 RDDs 最主要的区别在于一个面向的是结构化数据,一个面向的是非结构化数据,它们内部的数据结构如下:
Snipaste_2021-05-23_10-26-40_看图王.png
DataFrame 内部的有明确 Scheme 结构,即列名、列字段类型都是已知的,这带来的好处是可以减少数据读取以及更好地优化执行计划,从而保证查询效率。

  • DataFrame相当于是一个带着schema的RDD。
  • DataFrame还引入了off-heap,意味着JVM堆以外的内存。
  • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。

2.3DataFrame的创建

  • 创教DataFrame先需要一个spark session。 ```python SPARK_APP_NAME = “dataframetest” SPARK_URL = “spark://192.168.19.137:7077”

conf = SparkConf() # 创建spark config对象 config = ( (“spark.app.name”, SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 (“spark.executor.memory”, “2g”), # 设置该app启动时占用的内存用量,默认1g (“spark.master”, SPARK_URL), # spark master的地址 (“spark.executor.cores”, “2”), # 设置spark executor使用的CPU核心数

conf.setAll(config)

利用config对象,创建spark session

spark = SparkSession.builder.config(conf=conf).getOrCreate()

  1. - CSV文件创建DataFrame
  2. ```python
  3. df = spark.read.format('csv').option ('header true').load('/iris. csv') # csv文件默认从hdfs找