Transformation算子
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
map
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
# 定义方法, 作为算子的传入函数体
def add(data):
return data * 10
print(rdd.map(add).collect())
# 更简单的方式 是定义lambda表达式来写匿名函数
print(rdd.map(lambda data: data * 10).collect())
"""
对于算子的接受函数来说, 两种方法都可以
lambda表达式 适用于 一行代码就搞定的函数体, 如果是多行, 需要定义独立的方法.
"""
[10, 20, 30, 40, 50, 60]
[10, 20, 30, 40, 50, 60]
flatMap
功能:对rdd执行map操作,然后进行解除嵌套操作
rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])
rdd2 = rdd.map(lambda line: line.split(" "))
print(rdd2.collect())
# 得到所有的单词, 组成RDD, flatMap的传入参数 和map一致, 就是给map逻辑用的, 解除嵌套无需逻辑(传参)
rdd3 = rdd.flatMap(lambda line: line.split(" "))
print(rdd3.collect())
[['hadoop', 'spark', 'hadoop'], ['spark', 'hadoop', 'hadoop'], ['hadoop', 'flink', 'spark']]
['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']
reduceByKey
功能:针对KV型RDD,自动按照key分组,然后根据聚合逻辑,完成组内数据value的聚合操作。
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])
# reduceByKey 对相同key 的数据执行聚合相加
print(rdd.reduceByKey(lambda a, b: a + b).collect())
[('b', 2), ('a', 3)]
mapValues
功能:针对KV型RDD,对内部的二元元组的Value执行map操作
rdd = sc.parallelize([("a",1),("a",2),("a",3),("b",3),("b",5)])
print(rdd.mapValues(lambda x:x*10).collect())
[('a', 10), ('a', 20), ('a', 30), ('b', 30), ('b', 50)]
groupBy
功能:将rdd的数据进行分组
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 2), ('b', 3)])
# 通过groupBy对数据进行分组
# groupBy传入的函数的 意思是: 通过这个函数, 确定按照谁来分组(返回谁即可)
# 分组规则 和SQL是一致的, 也就是相同的在一个组(Hash分组)
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t:(t[0], list(t[1]))).collect())
[('b', [('b', 1), ('b', 2), ('b', 3)]), ('a', [('a', 1), ('a', 1)])]
filter
功能:过滤想要的数据进行保留
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# 通过Filter算子, 过滤奇数
result = rdd.filter(lambda x: x % 2 == 1)
print(result.collect())
[1, 3, 5]
distinct
功能:对RDD数据进行去重,返回新的RDD
rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])
# distinct 进行RDD数据去重操作
print(rdd.distinct().collect())
rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])
print(rdd2.distinct().collect())
[1, 2, 3]
[('a', 1), ('a', 3)]
union
rdd1 = sc.parallelize([1, 1, 3, 3])
rdd2 = sc.parallelize(["a", "b", "a"])
rdd3 = rdd1.union(rdd2)
print(rdd3.collect())
"""
1. 可以看到 union算子是不会去重的
2. RDD的类型不同也是可以合并的.
"""
[1, 1, 3, 3, 'a', 'b', 'a']
join
功能:对两个RDD执行JOIN操作(可实现SQL的内\外连接)
注意:join算子只能用于二元元组
#rdd.join(other_rdd) # 内连接
#rdd.leftOuterJoin(other_rdd) # 左外
#rdd.rightOuterJoin(other_rdd) #右外
rdd1 = sc.parallelize([ (1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu") ])
rdd2 = sc.parallelize([ (1001, "销售部"), (1002, "科技部")])
# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元元组的key来进行关联
print(rdd1.join(rdd2).collect())
# 左外连接, 右外连接 可以更换一下rdd的顺序 或者调用rightOuterJoin即可
print(rdd1.leftOuterJoin(rdd2).collect())
[(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部'))]
[(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部')), (1003, ('wangwu', None)), (1004, ('zhaoliu', None))]
intersection
功能:求2个rdd的交集,返回一个新rdd
rdd1 = sc.parallelize([('a', 1), ('a', 3)])
rdd2 = sc.parallelize([('a', 1), ('b', 3)])
# 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)
print(rdd3.collect())
[('a', 1)]
glom
功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
print(rdd.glom().flatMap(lambda x: x).collect())
[1, 2, 3, 4, 5, 6, 7, 8, 9]
groupByKey
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
rdd2 = rdd.groupByKey()
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
RDD.sortBy(keyfunc, ascending=True,numPartitions=None)
rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)
# 使用sortBy对rdd执行排序
# 按照value 数字进行排序
# 参数1函数, 表示的是 , 告知Spark 按照数据的哪个列进行排序
# 参数2: True表示升序 False表示降序
# 参数3: 排序的分区数
"""注意: 如果要全局有序, 排序分区数请设置为1"""
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())
# 按照key来进行排序
print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=1).collect())
Action算子
定义: 返回值 不是RDD的就是action算子.
对于这两类算子来说, Transformation算子,相当于在构建执行计划, action是一个
指令让这个执行计划开始工作.
如果没有action, Transformation算子之间的迭代关系,就是一个没有通电的 流水线.
只有action到来,这个数据处理的流水线才开始工作.
countByKey
功能:统计key出现的次数
collect
功能:将RDD各个分区内的数据统一收集到Driver中,形成一个List对象
fold
功能:和reduce一样,接受传入逻辑进行聚合,聚合是带初始值的
rdd = sc.parallelize(range(1, 10), 3)
print(rdd.fold(10, lambda a, b: a + b))
from operator import add
sc.parallelize(range(1, 10), 3).fold(10, add)
85
1+2+3+10=16
4+5+6+10=25
7+8+9+10=34
16+25+34+10=85
RDD.first()
功能,取出rdd的第一个元素
RDD.takeSample(withReplacement, num, seed=None)
功能:随机取出一个
RDD.takeOrdered(num, key=None)
功能:取出前N个数
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
[10, 9, 7, 6, 5, 4]
控制算子
Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。。当有Action算子出现时,他才会真正的执行
其中延迟计算就是懒执行的意思,就像是创建了一个视图,他并不是把查询好的数据放入视图了,而是当你需要这些数据时,查看视图时,他才执行定义视图时候的SQL语句
[
](https://blog.csdn.net/weixin_44694973/article/details/95616615)
