RDD,指的是一个只读的,可分区的分布式数据集。这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用;
1. RDD 特性
- 转换运算
- 动作运算
- 持久化
- 转换1 运算产生 RDD1 只记录操作命令;
- RDD1 执行转换2 只记录操作命令;
- RDD2 执行转换3 只记录操作命令;
- RDD2 执行转换4 只记录操作命令;
- RDD3 执行动作1 实际执行 转换1+转换2+转换3+动作1;
- RDD4 执行动作2 实际执行 转换1+转换2+转换4+动作2(若 转换1+转换2 前步骤已经执行 则只执行 转换4+动作2);
- local 模式测试 RDD
- local 模式运行 Spark
- 创建新 ipynb 测试 RDD
cd ~/pythonwork/jupyternotebook
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark
2. RDD 转换运算
2.1 RDD 基本转换运算
intRDD 转换为 List,动作运算,立即执行
intRDD.collect()
2. map
> map 运算通过传入的函数将每个元素经过函数运算产生另外一个 RDD
- 使用具名函数
```python
# 创建 addOne 函数
def addOne(x):
return (x+1)
intRDD=sc.parallelize([3,1,2,5,5])
intRDD.map(addOne).colletc()
- 使用匿名函数
intRDD.map(lambda x:x+1).collect()
字符串运算
stringRDD=sc.parallelize([“Item1”,”Item2”,”Item3”]) stringRDD.filter(lambda x:”It”in x).collect()
4. distinct
```python
# 删除重复元素
intRDD.distinct().collect()
- randomSplit
将整个集合元素以随机数的方式按比例为多个RDD# 4:6 分配
sRDD=intRDD.randomSplit([0.4,0.6])
# RDD 1
sRDD[0].collect()
# RDD 2
sRDD[1].collect()
- groupBy
```python
groupBy 按照传入匿名函数规则将数据分为多个 List
gRDD=intRDD.groupBy(lambda x:”even”if(x%2==0) else “odd”).collect()
print(gRDD[0][0],sorted(gRDD[0][1])) print(gRDD[1][0],sorted(gRDD[1][1]))
<a name="9132beda"></a>
### 2.2 多个 RDD 转换运算
1. 创建范例
```python
intRDD1=sc.parallelize([3,1,2,5,5])
intRDD2=sc.parallelize([5,6])
intRDD3=sc.parallelize([2,7])
- union
# 并集运算
intRDD1.union(intRDD2).union(intRDD3).collect()
- intersection
# 交集运算
intRDD1.intersection(intRDD2).collect()
- subtract
# 差集运算
intRDD1.subtract(intRDD2).collect()
- cartesian
# 笛卡尔乘积
intRDD1.cartesian(intRDD2).collect()
2.3 RDD Key-Value 基本转换运算
Key-Value 运算是 Map/Reduce 的基础
- 创建 Key-Value RDD
# (3,4) key=3 value=4
kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD1.collect()
- 列出
# 列出 key 值
kvRDD1.keys().collect()
# 列出 value 值
kvRDD1.values().collect()
- 筛选 filter
# 筛选 key <5
kvRDD1.filter(lambda keyValue:keyValue[0]<5).collect()
# 筛选 value >5
kvRDD1.filter(lambda keyValue:keyValue[1]>5).collect()
- mapValues 运算
mapValues 运算针对 RDD 内每一组(Key,Value)运算,产生另外一个RDD# value 每一个值进行平方运算
kvRDD1.mapValues(lambda x:x*x).collect()
- sortByKey 排序
# key 值从小到大排序
kvRDD1.sortByKey(ascending=True).collect()
kvRDD1.sortByKey().collect()
# key 值从大到小排序
kvRDD1.sortByKey(ascending=False).collect()
- reduceByKey
按照 Key 值进行 reduce 运算,相同 key值,将 value 相加# 相同 key 值相加
kvRDD1.reduceByKey(lambda x,y:x+y).collect()
# 相同 key 值数据合并
kvRDD1.reduceByKey((x,y)=>x+y)
2.4 多个 RDD Key-Value 基本转换运算
- 创建范例 ```python kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2=sc.parallelize([(3,8)])
kvRDD1.collect() kvRDD2.collect()
2. Key-Value RDD join 运算<br />按相同 key 值 join value
```python
kvRDD1.join(kvRDD2).collect()
- Key-Value leftOuterJoin 运算
左边集合对应到右边集合,显示所有左边集合中的元素kvRDD1.leftOuterJoin(kvRDD2).collect()
- Key-Value RDD rightOuterJoin 运算
从右边的集合对应到左边的集合,并显示所有右边集合中的元素kvRDD1.rightOuterJoin(kvRDD2).collect()
- Key-value subtractByKey 运算
删除相同 key 值的数据kvRDD1.subtractByKey(kvRDD2).collect()
3. RDD 动作运算
3.1 RDD 基本动作运算
- 读取元素
# 取出第1项数据
intRDD.first()
# 取出第2项数据
intRDD.take(2)
# 从小到大排序取出前三项数据
intRDD.takeOrdered(3)
# 从大到小排序取出前三项数据
intRDD.takeOrdered(3,key=lambda x:-x)
- 统计数据
# 统计
intRDD.stats()
# 最小
intRDD.min()
# 最大
intRDD.max()
# 标准差
intRDD.stdev()
# 计数
intRDD.count()
# 总和
intRDD.sum()
# 平均
intRDD.mean()
3.3 Key-Value 动作运算
- Key-Value first 运算 ```python kvRDD1.first()
取前 n 项数据
kvRDD1.take(2)
2. 统计每个 key 值项数
```python
kvRDD1.countByKey()
- collectAsMap 创建 Key-Value 字典
kv=kvRDD1.collectAsMap()
kv
- lookup 运算
输入 key 查找 valuekvRDD1.lookup(3)
kvRDD1.lookup(5)
4. Shared variable 共享变量
- 在Spark中,当任何函数传递给转换操作时,它将在远程集群节点上执行;
- 适用于函数中使用的所有变量的不同副本,将这些变量将复制到每台计算机,并且远程计算机上的变量更新不会恢复到驱动程序;
共享变量:
- Broadcast 广播变量
- accumulator 累加器
广播变量允许开发人员在每个节点上缓存一个只读变量,无需每个任务中拷贝。可用于高效的为每个节点拷贝一份大的输入数据集;
4.1 Bordcast 广播变量范例
Broadcast 使用规则:
- 使用 SparkContext.broadcats([初始值]) 创建;
- 使用 .value 读取变量值;
- Broadcas 广播变量创建后不能修改;
4.1.1 不使用广播变量创建范例
# 创建 RDD
kvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])
# 创建字典
fruitMap=kvFruit.collectAsMap()
# 创建 fruitIds
fruitIds=sc.parallelize([2,4,3,1])
# 水果名称字典 fruitNames
fruitNames=fruitIds.map(lambda x:fruitMap[x]).collect()
print(str(fruitNames))
范例在并行处理过程中每执行一次转换,都需要将 fruitIds 和 fruitMap 上传到Worker Node 才能执行转换,将耗费大量内存和时间,因此使用 Broadcast 广播变量。
4.1.2 使用 Broadcast 广播变量的范例
# 创建 RDD
kvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])
# 创建字典
fruitMap=kvFruit.collectAsMap()
# 创建 fruitMap 字典转换为 bcFruitMap 广播变量
bcFruitMap=sc.broadcast(fruitMap)
# 创建 fruitIds
fruitIds=sc.parallelize([2,4,3,1])
# 使用 bcFruitMap.value 字典进行转换
fruitNames=fruitIds.map(lambda x:bcFruitMap.value[x]).collect()
print(str(fruitNames))
并行处理中 bcFruitMap 广播变量将会传送到 Worker Node 机器,并存储在内存中,后续 Worker Node都可以使用这个 bcFruitMap 广播变量执行转换,以节省内存和传送时间;
4.2 accumulator 累加器
- SparkContext.accumulator([初始值]) 创建;
- 使用 .add() 进行累加;
- 在 task 中不能读取累加器的值;
- 只有在驱动程序,即循环外才可用 .value 读取累加器结果
范例:
intRDD=sc.parallelize([3,1,2,5,5])
# 创建累加器
total=sc.accumulator(0.0)
num=sc.accumulator(0)
intRDD.foreach(lambda i:[total.add(1),num.add(1)])
print(total.value)
print(num.value)
5. RDD Persistence 持久化
Spark 持久化用于将重复运算的RDD存储在内存中,来提高运算效率
- RDD.persist(存储等级) # 默认 MEMORY_ONLY 存储在内存中
- RDD.unpersist() # 取消持久化
- 持久化
intRDDMemory=sc.parallelize([3,1,2,5,5])
intRDDMemory.persist()
# 检查是否已经缓存
intRDDMemory.is_cached
- 取消持久化
intRDDMemory.unpersist()
# 检查是否已经缓存
intRDDMemory.is_cached
6. Spark 创建 WordCount
- 创建测试文件 ```bash mkdir -p ~/pythonwork/jupyternotebook/wordcount cd ~/pythonwork/jupyternotebook/wordcount
vim test.txt
Apple Apple Orange Banana Grape Grape
2. 使用 jupyter notebook 进行测试
```python
# 读取文件
textFile=sc.textFile("test.txt")
# flatMap 空格符分割单词,并读取
stringRDD=textFile.flatMap(lambda line:line.split(" "))
# map reduce 计算单词出现次数
countsRDD=stringRDD.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
# 保存计算结果
countsRDD.saveAsTextFile("output")
查看测试结果