今日内容:
- 1- 如何构建RDD
- 2- RDD算子相关的操作(记录)
- 3- 综合案例
1. 如何构建RDD
构建RDD对象的方式主要有二种:
通过 parallelize('''') API 来构建RDD对象
通过加载外部数据集的方式构建: textFile(....)
1.1 通过并行化的方式来构建RDD
代码演示:
# 演示如何创建RDD对象" 方式一
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示:如何创建RDD")
# 1- 构建SparkContext对象
conf = SparkConf().setMaster("local[3]").setAppName('create_rdd')
sc = SparkContext(conf=conf)
# 2- 构建RDD对象
rdd_init = sc.parallelize(['张三','李四','王五','赵六','田七','周八','李九'],5)
# 需求:给每一个元素添加一个 10序号
rdd_map = rdd_init.map(lambda name: name + '10')
# 如何查看RDD中每个分区的数据,以及分区的数量
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
print(rdd_map.glom().collect())
说明:
验证RDD是存在分区的, 而且每一个计算函数作用在每个分区上进行处理
如何查看RDD有多少个分区: rdd.getNumPartitions()
如何查看RDD中每一个分区的数据: rdd.glom()
当采用本地模拟数据(并行方式)构建RDD的时候, RDD分区数量取决于:
1- 默认: setMaster("local[N]") 取决于 N的值, 当N为*的时候, 表示为当前节点的CPU核心数
2- 同时也可以通过parallelize算子来设置分区数量, 设置多少就为多少个分区
1.2 通过读取外部数据方式构建RDD
代码实现:
# 演示: 如何构建RDD 方式二 通过外部文件的方式构建
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示构建RDD的方式二:")
# 1- 创建SparkContext对象:
conf = SparkConf().setMaster('local[*]').setAppName('create_rdd_2')
sc = SparkContext(conf=conf)
# 2- 读取外部的文件数据
# 路径写法: 协议 + 路径
# 本地路径: file:///
# HDFS: hdfs://node1:8020/
rdd_init = sc.textFile('file:///export/data/workspace/sz30_pyspark_parent/_02_pyspark_core/data/')
#rdd_init = sc.textFile('hdfs://node1:8020/pyspark_data/words.txt',15)
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
当读取后, 分区 过多, 在写入到HDFS的时候, 就会产生大量的小文件, 如果由于数据源文件比较多导致的, 可以采用在读取数据的时候, 尽量减少分区数:
# 演示: 如何构建RDD 方式二 小文件合并操作
from pyspark import SparkContext, SparkConf
import os
# 锁定远端python版本:
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("演示构建RDD的方式二:")
# 1- 创建SparkContext对象:
conf = SparkConf().setMaster('local[*]').setAppName('create_rdd_2')
sc = SparkContext(conf=conf)
# 2- 读取外部的文件数据
# 路径写法: 协议 + 路径
# 本地路径: file:///
# HDFS: hdfs://node1:8020/
# wholeTextFiles: 尽可能的减少分区数量, 从而减少最终输出到目的地文件数量
rdd_init = sc.wholeTextFiles('file:///export/data/workspace/sz30_pyspark_parent/_02_pyspark_core/data',1)
print(rdd_init.getNumPartitions())
print(rdd_init.glom().collect())
关于textFile在读取数据的时候, 分区数量确定:
确定分区数量取决于两个公式:
defaultMinPartitions = min(default.parallelism,2) | 设置minPartition的值
其中:
default.parallelism = setMaster('local[N]') = N值
设置minPartition的值 : textFile(path,minPartition)
如果设置minPartition的值, defaultMinPartitions 等于此值, 否则按照min(default.parallelism,2) 来计算
本地文件:
rdd分区数量 = max(文件分片数量,defaultMinPartitions)
HDFS文件:
rdd分区数量 = max(文件block块数量,defaultMinPartitions)
2. RDD算子相关的操作
在spark中, 将支持传递函数的或者说具有一些特殊功能的方法或者函数称为算子
2.1 RDD算子分类
在整个RDD算子中, 主要可以将算子分为两大类: transformation(转换算子) 和 action(动作算子)
转换算子:
1- 返回一个新的RDD
2- 所有的转换算子都是惰性的, 只有遇到action算子才会触发执行
3- RDD并不存储实际的数据, 只存储转换的规则, 当遇到action算子后, 根据规则对数据进行处理即可
动作算子:
1- 不会返回一个RDD, 要不然没有返回值, 要不返回其他的
2- 所有动作算子都是立即执行, 在代码中每一个action都会去触发一个JOB的任务
转换算子:
action算子:
整个Spark所有的RDD的算子文档: https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis
2.2 RDD的Transformation算子操作
值类型的算子: 主要是针对value进行处理相关的算子
- Map算子: 一对一的转换操作
- 作用: 根据用户传入的自定义转换规则(函数) 将数据一对一的转换称为一个新的RDD
- map算子是作用在每一个分区上每一个元素上,对每一个元素执行自定义的函数
案例演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 需求: 对元素中每一个数值进行 + 1 返回
rdd_init.map(lambda num: num + 1).collect()
结果为:
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
说明: 上述的自定义的匿名函数(lambda表达式),同样也可以使用普通函数来使用
def map_fn(num) -> int:
return num + 1
rdd_init.map(map_fn).collect()
- groupBy算子: 用于执行分组操作
- 格式: groupBy(fn)
- 作用: 根据传入的函数对数据进行分组操作, 每一组都是一个迭代器(列表)
案例演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 需求: 将数据分为偶数和奇数两部分
rdd_init.groupBy(lambda num : 'o' if(num % 2 == 0) else 'j').mapValues(list).collect()
结果为:
[('j', [1, 3, 5, 7, 9]), ('o', [2, 4, 6, 8, 10])]
说明:
mapValues(list): 对kv类型进行处理, 主要对其kv中value数据进行转换操作, 参数可以写一个列表对象即可 表示直接将其转换为list列表
- filter算子:
- 格式: filter(fn)
- 作用: 根据传入自定义函数对数据进行过滤操作, 自定义函数用于定义过滤规则
- 注意: 自定义函数返回false 表示过滤掉, 返回true表示保留此数据
案例演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 需求: 将>5的数据返回来
rdd_init.filter(lambda num: num > 5).collect()
结果为:
[6, 7, 8, 9, 10]
说明:
需要保留那些数据, 规则写: 保留数据的规则
- flatMap算子:
- 格式: flatMap(fn)
- 作用: 先对数据根据传入的函数进行map操作, 然后对map的结果进行扁平化
案例演示:
rdd_init = sc.parallelize(['张三 李四 王五','赵六 田七 周八'])
# 需求: 将每一个名字提取出来, 形成一个列表: 结果: ['张三', '李四, '王五','赵六', '田七', '周八']
rdd_init.flatMap(lambda names: names.split()).collect()
结果:
['张三', '李四', '王五', '赵六', '田七', '周八']
双值类型的相关算子:
- union 和 intersection :
- union: 并集 , 用于计算两个RDD的并集
- intersection: 交集, 用于计算两个RDD的交集
案例演示:
rdd1 = sc.parallelize([1,2,3,9,10])
rdd2 = sc.parallelize([1,4,5,6,7,10])
计算交集:
rdd1.intersection(rdd2).collect() 结果: [1, 10]
计算并集:
rdd1.union(rdd2).collect() 结果: [1, 2, 3, 9, 10, 1, 4, 5, 6, 7, 10]
rdd1.union(rdd2).distinct().collect() 结果: [4, 1, 9, 5, 2, 10, 6, 3, 7]
kv类型的相关的算子:
- groupByKey算子:
- 格式: groupByKey()
- 作用: 根据Key进行分组, 将value合并为一个迭代器(列表)
案例:
rdd_init = sc.parallelize([('c01','张三'),('c02','李四'),('c01','王五'),('c03','赵六'),('c02','田七'),('c01','周八')])
# 需求: 根据key分组,将同一个班级的数据进行聚合在一起
rdd_init.groupByKey().mapValues(list).collect()
结果:
[('c01', ['张三', '王五', '周八']), ('c02', ['李四', '田七']), ('c03', ['赵六']
- reduceBykey算子:
- 格式: reduceByKey(fn)
- 作用: 根据key进行分组, 将value合并为一个迭代器(列表).然后根据传入的自定义函数, 对每组中value数据进行聚合处理
案例:
rdd_init = sc.parallelize([('c01','张三'),('c02','李四'),('c01','王五'),('c03','赵六'),('c02','田七'),('c01','周八')])
# 需求: 根据key分组,将同一个班级的数据进行聚合在一起, 统计每组中有多少个
rdd_map = rdd_init.map(lambda tup: (tup[0],1))
rdd_map.reduceByKey(lambda agg,curr: agg + curr).collect()
结果:[('c01', 3), ('c02', 2), ('c03', 1)]
- sortByKey算子:
- 格式: sortByKey()
- 作用: 根据key进行排序操作, 默认为升序, 如果要倒序排列,需要设置 asc参数为 False
案例:
rdd_init = sc.parallelize([(100,'pv'),(300,'pv'),(200,'pv'),(500,'pv'),(800,'pv')])
# 需求: 根据key进行倒序排序
rdd_init.sortByKey().collect()
结果: [(100, 'pv'), (200, 'pv'), (300, 'pv'), (500, 'pv'), (800, 'pv')]
rdd_init.sortByKey(False).collect()
结果: [(800, 'pv'), (500, 'pv'), (300, 'pv'), (200, 'pv'), (100, 'pv')]
- countByValue(了解)
- 格式: countByValue()
- 作用: 对value进行count的数量统计
rdd_init = sc.parallelize([1,2,3,1,3,2,1,3,2,1,9,10])
rdd_init.countByValue()
结果为:
{1: 4, 2: 3, 3: 3, 9: 1, 10: 1}
2.3 RDD的action算子
- collect():
- 格式: collect()
- 作用: 将各个分区的处理完成的数据收集回来, 统一的放置在一个列表中
- reduce():
- 格式: reduce(fn)
- 作用: 对数据进行聚合操作, 根据传入的fn完成相关聚合统计
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 需求: 将整个结果进行求和计算:
rdd_init.reduce(lambda agg,curr: agg + curr)
结果: 55
- first:
- 格式: first()
- 作用: 获取第一个数据
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd_init.first()
# 1
- take:
- 格式: take(N)
- 作用: 获取前N个数据 类似于 SQL中limit
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd_init.take(5)
# [1, 2, 3, 4, 5]
- top:
- 格式: top(N,[fn])
- 作用: 首先对数据进行降序排序, 排序后, 获取前N个元素, 同时支持自定义排序规则
- 对于单值类型的数据, 直接降序排序, 对于kv类型的, 默认按照k进行倒序排序, 也可以自定义排序方案
案例:
rdd_init = sc.parallelize([5,6,1,2,3,4,7,9,10,8])
rdd_init.top(5)
# [10, 9, 8, 7, 6]
- count:
- 格式: count()
- 作用: 获取一共返回多少个元素
案例:
rdd_init = sc.parallelize([5,6,1,2,3,4,7,9,10,8])
rdd_init.count()
# 10
- takeSample:
- 格式: takeSample(是否允许重复抽取, 抽样数量[, 种子值])
- 作用: 用于对数据进行抽样
- 说明: 一旦设置种子值, 会导致每次采样的结果都是一致的
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 进行抽样
>>> rdd_init.takeSample(False,5,1)
[7, 9, 10, 8, 6]
>>> rdd_init.takeSample(False,5,1)
[7, 9, 10, 8, 6]
>>> rdd_init.takeSample(False,5)
[8, 3, 6, 5, 2]
>>> rdd_init.takeSample(False,5)
[1, 4, 7, 10, 2]
>>> rdd_init.takeSample(True,5)
[9, 1, 1, 1, 6]
- foreach:
- 格式: foreach(fn)
- 作用: 对数据执行遍历操作, fn表示每次遍历要执行什么操作,没有返回值的
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对数据进行遍历打印
rdd_init.foreach(lambda num:print(num))
2.4 RDD的重要算子
- 基本函数:
- 分区函数:
分区函数: 指的对rdd中每一个分区进行处理操作
普通函数: 指的对RDD中每一个分区下每一个元素进行操作
比如说: map 和 mapPartitions foreach 和foreachPartition
下面的两个图描述了分区函数和普通函数区别, 一个对每个分区中每个元素执行自定义函数, 而分区函数是直接将整个分区数据传递给了自定义函数, 从而导致 分区函数触发执行自定义函数次数更低一些, 普通函数自定义函数触发次数更多
分区函数的好处:
减少自定义函数触发的次数, 如果说自定义函数中有一些资源相关的操作(比如IO), 触发次数减少了 资源调度少了
举例说明:
比如说, 需要将每一个数据依次的写入到数据库中, 此时可以通过foreach相关函数来处理, 遍历一个, 然后将数据在自定义函数开启连接数据库连接 将数据写入到数据库中, 有多少个元素, 就需要开启多少次的连接
但是如何使用foreachPartition, 可能还需要开启2次即可解决问题
以后在工作中, 使用相关的算子的时候, 如果有分区函数, 建议优先使用分区函数
Map算子:
mapPartition算子:
map和mapPartitions
案例演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 需求: 对元素中每一个数值进行 + 1 返回
# map算子:
rdd_init.map(lambda num:num + 1).collect()
# mapPartitions
# 原始写法
def fn2(iter):
arr = []
for num in iter:
arr.append(num + 1)
return arr
# 便捷写法
def fn1(iter):
for num in iter:
yield num + 1
rdd_init.mapPartitions(fn1).collect()
foreach 和 foreachPartition
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
# 对数据进行遍历打印
rdd_init.foreach(lambda num:print(num))
# foreachPartition
def fn1(iter):
for num in iter:
print(num)
rdd_init.foreachPartition(fn1)
- 重分区的函数
作用: 对RDD中分区数量进行增大或者减少
在什么时候需要增大分区数量?
一个分区最终是通过一个线程来处理,当分区中数据量比较大的时候, 也就意味每个线程处理的数据量比较大,此时可以增大分区数量, 从而减少每个线程处理的数据量 提高并行度 提升效率
在什么时候需要减少分区数量呢?
当每个分区中数据量比较少的时候, 过多的分区增加了资源损耗, 此时可以减少分区的数量,减少资源的浪费
当需要将数据写出到文件系统的时候, 分区越多意味着输出的文件数量越多, 减少分区数量 从而减少输出文件数量,避免小文件过多
repartition算子:
作用; 增加或者减少分区的数量, 此算子会产生shuffle
格式: reparation(N)
演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
rdd_init.glom().collect()
各分区的数据: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
# 将分区变更为 5个
rdd = rdd_init.repartition(5)
rdd.getNumPartitions()
# 5
rdd.glom().collect()
# [[], [1, 2, 3], [7, 8, 9, 10], [4, 5, 6], []]
coalesce算子:
作用: 减少分区, 而且不会产生shuffle
格式: coalesce(N,ifShuffle=False) 参数2 表示是否有shuffle, 默认为False 表示没有 此时只能减少分区, 如果要增大分区, 需要设置为True
案例:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
# 首先看每一个分区的数据
rdd_init.glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
# 将三个分区, 合并为二个分区
rdd_init.coalesce(2).glom().collect()
结果: [[1, 2, 3], [4, 5, 6, 7, 8, 9, 10]]
尝试增大分区:
rdd_init.coalesce(5).glom().collect()
结果: [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]] 发现依然还是三个分区 所以无法增大分区
rdd_init.coalesce(5,True).glom().collect()
结果: [[], [1, 2, 3], [7, 8, 9, 10], [4, 5, 6], []] 参数2设置为True即可增加分区了
总结说明:
reparation 是coalesce的一种当参数2为True一种简写
coalesce 默认只能进行减少分区, 如果要增大分区, 需要将参数2设置为True, 一旦设置为True就会产生shuffle
区别:
1) 两个算子都是用于重分区的算子, 一个能增大也能减少, 而coalesce默认只能减少分区
2) reparation默认会产生shuffle 而coalesce默认没有shuffle
3) reparation 是coalesce的一种当参数2为True一种简写
专门用于针对kv类型的调整分区的函数: partitionBy
格式:
partitionBy(N[,fn]) 参数2是用于设置分区规则
演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10],3)
rdd_map = rdd_init.map(lambda num: (num,1))
#查看每个分区的数据
rdd_map.glom().collect()
结果: [
[(1, 1), (2, 1), (3, 1)],
[(4, 1), (5, 1), (6, 1)],
[(7, 1), (8, 1), (9, 1), (10, 1)]
]
# 进行重分区操作:
rdd_map.partitionBy(2).glom().collect()
结果: [[(2, 1), (4, 1), (6, 1), (8, 1), (10, 1)], [(1, 1), (3, 1), (5, 1), (7, 1), (9, 1)]]
说明: 默认根据key的hash值进行分区操作
rdd_map.partitionBy(2,lambda num : 0 if(num > 5) else 1).glom().collect()
结果: [
[(6, 1), (7, 1), (8, 1), (9, 1), (10, 1)],
[(1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]
]
聚合函数:
第一类: 单值聚合算子
reduce | fold | aggregate
格式:
reduce(fn) : 根据传入的自定义函数, 对整个数据集进行聚合统计 默认 agg初始值为 0
fold(default,fn): 根据传入的自定义函数, 对整个数据集进行聚合统计 并且可以设置agg的初始值
aggregate(default,fn1,fn2): 参数1 表示agg初始值 参数2: 对每个分区执行函数 参数3 : 对各个分区结果进行聚合函数
演示:
rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10]) 只有2个分区
# 需求: 对所有的数据进行求和计算
# reduce
rdd_init.reduce(lambda agg,curr: agg+curr) 结果 55
# fold
rdd_init.fold(10,lambda agg,curr: agg+curr) 结果 85
# aggregate(default,fn1,fn2)
def fn1(agg,curr):
return agg+curr
def fn2(agg,curr):
return agg+curr
rdd_init.aggregate(10,fn1,fn2) 结果为 85
说明: 当参数2和参数3处理逻辑是一样的, 可以简写为 fold方案, 当初始值为0的时候, 可以简写为reduce
第二类: kv类型聚合算子
相关算子: groupByKey() reduceByKey() foldByKey() aggregateByKey()
说明:
reduceByKey() foldByKey() aggregateByKey() 和 reduce | fold | aggregate 都是类型的时候, 只是比单值多了一个分组操作, 对每一个组内部聚合聚合
groupByKey 是仅分组 无聚合计算
面试题: reduceBykey === groupByKey + reduce 请问两种处理模式, 那种效率更高一些呢?
**reduceBykey的处理逻辑**
**groupByKey + reduce处理逻辑:**
肯定是reduceBykey效率更高一些, 因为可以现在每个分区中执行提前聚合计算, 从而减少中间shuffle的数据量 提升效率
关联函数:
涉及到函数格式:
join() : 内连接
leftOuterJoin() : 左关联
rightOuterJoin(): 右关联
fullOuterJoin() : 全外关联
针对数据类型: kv
案例:
rdd1 = sc.parallelize([('c01','大数据一班'),('c02','大数据二班'),('c03','大数据三班')])
rdd2 = sc.parallelize([('c01','张三'),('c02','李四'),('c01','王五'),('c04','赵六'),('c02','田七'),('c01','周八')])
# Join
rdd1.join(rdd2).collect()
结果: [
('c01', ('大数据一班', '张三')),
('c01', ('大数据一班', '王五')),
('c01', ('大数据一班', '周八')),
('c02', ('大数据二班', '李四')),
('c02', ('大数据二班', '田七'))
]
# leftJoin
rdd1.leftOuterJoin(rdd2).collect()
结果:
[
('c01', ('大数据一班', '张三')),
('c01', ('大数据一班', '王五')),
('c01', ('大数据一班', '周八')),
('c02', ('大数据二班', '李四')),
('c02', ('大数据二班', '田七')),
('c03', ('大数据三班', None))
]
# rightJoin
rdd1.rightOuterJoin(rdd2).collect()
结果:
[
('c04', (None, '赵六')),
('c01', ('大数据一班', '张三')),
('c01', ('大数据一班', '王五')),
('c01', ('大数据一班', '周八')),
('c02', ('大数据二班', '李四')),
('c02', ('大数据二班', '田七'))
]
# fullJoin
rdd1.fullOuterJoin(rdd2).collect()
结果:
[
('c04', (None, '赵六')),
('c01', ('大数据一班', '张三')),
('c01', ('大数据一班', '王五')),
('c01', ('大数据一班', '周八')),
('c02', ('大数据二班', '李四')),
('c02', ('大数据二班', '田七')),
('c03', ('大数据三班', None))
]
3 综合案例
3.1 搜索案例
数据集介绍:
访问时间 用户id []里面是用户输入搜索内容 url结果排名 用户点击页面排序 用户点击URL
字段与字段之间的分隔符号为 \t和空格 (制表符号)
需求一: 统计每个关键词出现了多少次
需求二: 统计每个用户每个搜索词点击的次数
需求三: 统计每个小时点击次数
3.2 点击流日志分析
点击流日志数据结构说明:
1- ip地址:
2- 用户标识cookie信息(- - 标识没有)
3- 访问时间(时间,时区)
4- 请求方式(get / post /Head ....)
5- 请求的URL路径
6- 请求的协议
7- 请求状态码: 200 成功
8- 响应的字节长度
9- 来源的URL( - 标识直接访问, 不是从某个页面跳转来的)
10- 访问的浏览器标识
- 需求一: 统计pv(访问次数) 和 uv(用户数量)
- 需求二: 统计每个访问的URL的次数, 找到前10个