RDD,指的是一个只读的,可分区的分布式数据集。这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用;
1. RDD 特性
- 转换运算
- 动作运算
- 持久化
- 转换1 运算产生 RDD1 只记录操作命令;
- RDD1 执行转换2 只记录操作命令;
- RDD2 执行转换3 只记录操作命令;
- RDD2 执行转换4 只记录操作命令;
- RDD3 执行动作1 实际执行 转换1+转换2+转换3+动作1;
- RDD4 执行动作2 实际执行 转换1+转换2+转换4+动作2(若 转换1+转换2 前步骤已经执行 则只执行 转换4+动作2);
- local 模式测试 RDD
- local 模式运行 Spark
- 创建新 ipynb 测试 RDD
cd ~/pythonwork/jupyternotebookPYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
2. RDD 转换运算
2.1 RDD 基本转换运算
intRDD 转换为 List,动作运算,立即执行
intRDD.collect()
2. map> map 运算通过传入的函数将每个元素经过函数运算产生另外一个 RDD- 使用具名函数```python# 创建 addOne 函数def addOne(x):return (x+1)intRDD=sc.parallelize([3,1,2,5,5])intRDD.map(addOne).colletc()
- 使用匿名函数
intRDD.map(lambda x:x+1).collect()
字符串运算
stringRDD=sc.parallelize([“Item1”,”Item2”,”Item3”]) stringRDD.filter(lambda x:”It”in x).collect()
4. distinct```python# 删除重复元素intRDD.distinct().collect()
- randomSplit
将整个集合元素以随机数的方式按比例为多个RDD# 4:6 分配sRDD=intRDD.randomSplit([0.4,0.6])# RDD 1sRDD[0].collect()# RDD 2sRDD[1].collect()
- groupBy
```python
groupBy 按照传入匿名函数规则将数据分为多个 List
gRDD=intRDD.groupBy(lambda x:”even”if(x%2==0) else “odd”).collect()
print(gRDD[0][0],sorted(gRDD[0][1])) print(gRDD[1][0],sorted(gRDD[1][1]))
<a name="9132beda"></a>### 2.2 多个 RDD 转换运算1. 创建范例```pythonintRDD1=sc.parallelize([3,1,2,5,5])intRDD2=sc.parallelize([5,6])intRDD3=sc.parallelize([2,7])
- union
# 并集运算intRDD1.union(intRDD2).union(intRDD3).collect()
- intersection
# 交集运算intRDD1.intersection(intRDD2).collect()
- subtract
# 差集运算intRDD1.subtract(intRDD2).collect()
- cartesian
# 笛卡尔乘积intRDD1.cartesian(intRDD2).collect()
2.3 RDD Key-Value 基本转换运算
Key-Value 运算是 Map/Reduce 的基础
- 创建 Key-Value RDD
# (3,4) key=3 value=4kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])kvRDD1.collect()
- 列出
# 列出 key 值kvRDD1.keys().collect()# 列出 value 值kvRDD1.values().collect()
- 筛选 filter
# 筛选 key <5kvRDD1.filter(lambda keyValue:keyValue[0]<5).collect()# 筛选 value >5kvRDD1.filter(lambda keyValue:keyValue[1]>5).collect()
- mapValues 运算
mapValues 运算针对 RDD 内每一组(Key,Value)运算,产生另外一个RDD# value 每一个值进行平方运算kvRDD1.mapValues(lambda x:x*x).collect()
- sortByKey 排序
# key 值从小到大排序kvRDD1.sortByKey(ascending=True).collect()kvRDD1.sortByKey().collect()# key 值从大到小排序kvRDD1.sortByKey(ascending=False).collect()
- reduceByKey
按照 Key 值进行 reduce 运算,相同 key值,将 value 相加# 相同 key 值相加kvRDD1.reduceByKey(lambda x,y:x+y).collect()# 相同 key 值数据合并kvRDD1.reduceByKey((x,y)=>x+y)
2.4 多个 RDD Key-Value 基本转换运算
- 创建范例 ```python kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2=sc.parallelize([(3,8)])
kvRDD1.collect() kvRDD2.collect()
2. Key-Value RDD join 运算<br />按相同 key 值 join value```pythonkvRDD1.join(kvRDD2).collect()
- Key-Value leftOuterJoin 运算
左边集合对应到右边集合,显示所有左边集合中的元素kvRDD1.leftOuterJoin(kvRDD2).collect()
- Key-Value RDD rightOuterJoin 运算
从右边的集合对应到左边的集合,并显示所有右边集合中的元素kvRDD1.rightOuterJoin(kvRDD2).collect()
- Key-value subtractByKey 运算
删除相同 key 值的数据kvRDD1.subtractByKey(kvRDD2).collect()
3. RDD 动作运算
3.1 RDD 基本动作运算
- 读取元素
# 取出第1项数据intRDD.first()# 取出第2项数据intRDD.take(2)# 从小到大排序取出前三项数据intRDD.takeOrdered(3)# 从大到小排序取出前三项数据intRDD.takeOrdered(3,key=lambda x:-x)
- 统计数据
# 统计intRDD.stats()# 最小intRDD.min()# 最大intRDD.max()# 标准差intRDD.stdev()# 计数intRDD.count()# 总和intRDD.sum()# 平均intRDD.mean()
3.3 Key-Value 动作运算
- Key-Value first 运算 ```python kvRDD1.first()
取前 n 项数据
kvRDD1.take(2)
2. 统计每个 key 值项数```pythonkvRDD1.countByKey()
- collectAsMap 创建 Key-Value 字典
kv=kvRDD1.collectAsMap()kv
- lookup 运算
输入 key 查找 valuekvRDD1.lookup(3)kvRDD1.lookup(5)
4. Shared variable 共享变量
- 在Spark中,当任何函数传递给转换操作时,它将在远程集群节点上执行;
- 适用于函数中使用的所有变量的不同副本,将这些变量将复制到每台计算机,并且远程计算机上的变量更新不会恢复到驱动程序;
共享变量:
- Broadcast 广播变量
- accumulator 累加器
广播变量允许开发人员在每个节点上缓存一个只读变量,无需每个任务中拷贝。可用于高效的为每个节点拷贝一份大的输入数据集;
4.1 Bordcast 广播变量范例
Broadcast 使用规则:
- 使用 SparkContext.broadcats([初始值]) 创建;
- 使用 .value 读取变量值;
- Broadcas 广播变量创建后不能修改;
4.1.1 不使用广播变量创建范例
# 创建 RDDkvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])# 创建字典fruitMap=kvFruit.collectAsMap()# 创建 fruitIdsfruitIds=sc.parallelize([2,4,3,1])# 水果名称字典 fruitNamesfruitNames=fruitIds.map(lambda x:fruitMap[x]).collect()print(str(fruitNames))
范例在并行处理过程中每执行一次转换,都需要将 fruitIds 和 fruitMap 上传到Worker Node 才能执行转换,将耗费大量内存和时间,因此使用 Broadcast 广播变量。
4.1.2 使用 Broadcast 广播变量的范例
# 创建 RDDkvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])# 创建字典fruitMap=kvFruit.collectAsMap()# 创建 fruitMap 字典转换为 bcFruitMap 广播变量bcFruitMap=sc.broadcast(fruitMap)# 创建 fruitIdsfruitIds=sc.parallelize([2,4,3,1])# 使用 bcFruitMap.value 字典进行转换fruitNames=fruitIds.map(lambda x:bcFruitMap.value[x]).collect()print(str(fruitNames))
并行处理中 bcFruitMap 广播变量将会传送到 Worker Node 机器,并存储在内存中,后续 Worker Node都可以使用这个 bcFruitMap 广播变量执行转换,以节省内存和传送时间;
4.2 accumulator 累加器
- SparkContext.accumulator([初始值]) 创建;
- 使用 .add() 进行累加;
- 在 task 中不能读取累加器的值;
- 只有在驱动程序,即循环外才可用 .value 读取累加器结果
范例:
intRDD=sc.parallelize([3,1,2,5,5])# 创建累加器total=sc.accumulator(0.0)num=sc.accumulator(0)intRDD.foreach(lambda i:[total.add(1),num.add(1)])print(total.value)print(num.value)
5. RDD Persistence 持久化
Spark 持久化用于将重复运算的RDD存储在内存中,来提高运算效率
- RDD.persist(存储等级) # 默认 MEMORY_ONLY 存储在内存中
- RDD.unpersist() # 取消持久化
- 持久化
intRDDMemory=sc.parallelize([3,1,2,5,5])intRDDMemory.persist()# 检查是否已经缓存intRDDMemory.is_cached
- 取消持久化
intRDDMemory.unpersist()# 检查是否已经缓存intRDDMemory.is_cached
6. Spark 创建 WordCount
- 创建测试文件 ```bash mkdir -p ~/pythonwork/jupyternotebook/wordcount cd ~/pythonwork/jupyternotebook/wordcount
vim test.txt
Apple Apple Orange Banana Grape Grape
2. 使用 jupyter notebook 进行测试```python# 读取文件textFile=sc.textFile("test.txt")# flatMap 空格符分割单词,并读取stringRDD=textFile.flatMap(lambda line:line.split(" "))# map reduce 计算单词出现次数countsRDD=stringRDD.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)# 保存计算结果countsRDD.saveAsTextFile("output")
查看测试结果
