RDD,指的是一个只读的,可分区的分布式数据集。这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用;

1. RDD 特性

  • 转换运算
  • 动作运算
  • 持久化
  • 转换1 运算产生 RDD1 只记录操作命令;
  • RDD1 执行转换2 只记录操作命令;
  • RDD2 执行转换3 只记录操作命令;
  • RDD2 执行转换4 只记录操作命令;
  • RDD3 执行动作1 实际执行 转换1+转换2+转换3+动作1;
  • RDD4 执行动作2 实际执行 转换1+转换2+转换4+动作2(若 转换1+转换2 前步骤已经执行 则只执行 转换4+动作2);
  • local 模式测试 RDD
    • local 模式运行 Spark
    • 创建新 ipynb 测试 RDD
  1. cd ~/pythonwork/jupyternotebook
  2. PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark

2. RDD 转换运算

2.1 RDD 基本转换运算

  1. 创建 RDD ```python

    创建 listRDD,parallelize 方法输入一个 list 参数

    listRDD = sc.parallelize([3,1,2,5,5])

intRDD 转换为 List,动作运算,立即执行

intRDD.collect()

  1. 2. map
  2. > map 运算通过传入的函数将每个元素经过函数运算产生另外一个 RDD
  3. - 使用具名函数
  4. ```python
  5. # 创建 addOne 函数
  6. def addOne(x):
  7. return (x+1)
  8. intRDD=sc.parallelize([3,1,2,5,5])
  9. intRDD.map(addOne).colletc()
  • 使用匿名函数
    1. intRDD.map(lambda x:x+1).collect()
  1. filter ```python

    筛选数字小于3

    intRDD.filter(lambda x:x<3).collect()

字符串运算

stringRDD=sc.parallelize([“Item1”,”Item2”,”Item3”]) stringRDD.filter(lambda x:”It”in x).collect()

  1. 4. distinct
  2. ```python
  3. # 删除重复元素
  4. intRDD.distinct().collect()
  1. randomSplit
    将整个集合元素以随机数的方式按比例为多个RDD
    1. # 4:6 分配
    2. sRDD=intRDD.randomSplit([0.4,0.6])
    3. # RDD 1
    4. sRDD[0].collect()
    5. # RDD 2
    6. sRDD[1].collect()
  1. groupBy ```python

    groupBy 按照传入匿名函数规则将数据分为多个 List

    gRDD=intRDD.groupBy(lambda x:”even”if(x%2==0) else “odd”).collect()

print(gRDD[0][0],sorted(gRDD[0][1])) print(gRDD[1][0],sorted(gRDD[1][1]))

  1. <a name="9132beda"></a>
  2. ### 2.2 多个 RDD 转换运算
  3. 1. 创建范例
  4. ```python
  5. intRDD1=sc.parallelize([3,1,2,5,5])
  6. intRDD2=sc.parallelize([5,6])
  7. intRDD3=sc.parallelize([2,7])
  1. union
    1. # 并集运算
    2. intRDD1.union(intRDD2).union(intRDD3).collect()
  1. intersection
    1. # 交集运算
    2. intRDD1.intersection(intRDD2).collect()
  1. subtract
    1. # 差集运算
    2. intRDD1.subtract(intRDD2).collect()
  1. cartesian
    1. # 笛卡尔乘积
    2. intRDD1.cartesian(intRDD2).collect()

2.3 RDD Key-Value 基本转换运算

Key-Value 运算是 Map/Reduce 的基础

  1. 创建 Key-Value RDD
    1. # (3,4) key=3 value=4
    2. kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
    3. kvRDD1.collect()
  1. 列出
    1. # 列出 key 值
    2. kvRDD1.keys().collect()
    3. # 列出 value 值
    4. kvRDD1.values().collect()
  1. 筛选 filter
    1. # 筛选 key <5
    2. kvRDD1.filter(lambda keyValue:keyValue[0]<5).collect()
    3. # 筛选 value >5
    4. kvRDD1.filter(lambda keyValue:keyValue[1]>5).collect()
  1. mapValues 运算
    mapValues 运算针对 RDD 内每一组(Key,Value)运算,产生另外一个RDD
    1. # value 每一个值进行平方运算
    2. kvRDD1.mapValues(lambda x:x*x).collect()
  1. sortByKey 排序
    1. # key 值从小到大排序
    2. kvRDD1.sortByKey(ascending=True).collect()
    3. kvRDD1.sortByKey().collect()
    4. # key 值从大到小排序
    5. kvRDD1.sortByKey(ascending=False).collect()
  1. reduceByKey
    按照 Key 值进行 reduce 运算,相同 key值,将 value 相加
    1. # 相同 key 值相加
    2. kvRDD1.reduceByKey(lambda x,y:x+y).collect()
    3. # 相同 key 值数据合并
    4. kvRDD1.reduceByKey((x,y)=>x+y)

2.4 多个 RDD Key-Value 基本转换运算

  1. 创建范例 ```python kvRDD1=sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2=sc.parallelize([(3,8)])

kvRDD1.collect() kvRDD2.collect()

  1. 2. Key-Value RDD join 运算<br />按相同 key join value
  2. ```python
  3. kvRDD1.join(kvRDD2).collect()
  1. Key-Value leftOuterJoin 运算
    左边集合对应到右边集合,显示所有左边集合中的元素
    1. kvRDD1.leftOuterJoin(kvRDD2).collect()
  1. Key-Value RDD rightOuterJoin 运算
    从右边的集合对应到左边的集合,并显示所有右边集合中的元素
    1. kvRDD1.rightOuterJoin(kvRDD2).collect()
  1. Key-value subtractByKey 运算
    删除相同 key 值的数据
    1. kvRDD1.subtractByKey(kvRDD2).collect()

3. RDD 动作运算

3.1 RDD 基本动作运算

  1. 读取元素
    1. # 取出第1项数据
    2. intRDD.first()
    3. # 取出第2项数据
    4. intRDD.take(2)
    5. # 从小到大排序取出前三项数据
    6. intRDD.takeOrdered(3)
    7. # 从大到小排序取出前三项数据
    8. intRDD.takeOrdered(3,key=lambda x:-x)
  1. 统计数据
    1. # 统计
    2. intRDD.stats()
    3. # 最小
    4. intRDD.min()
    5. # 最大
    6. intRDD.max()
    7. # 标准差
    8. intRDD.stdev()
    9. # 计数
    10. intRDD.count()
    11. # 总和
    12. intRDD.sum()
    13. # 平均
    14. intRDD.mean()

3.3 Key-Value 动作运算

  1. Key-Value first 运算 ```python kvRDD1.first()

取前 n 项数据

kvRDD1.take(2)

  1. 2. 统计每个 key 值项数
  2. ```python
  3. kvRDD1.countByKey()
  1. collectAsMap 创建 Key-Value 字典
    1. kv=kvRDD1.collectAsMap()
    2. kv
  1. lookup 运算
    输入 key 查找 value
    1. kvRDD1.lookup(3)
    2. kvRDD1.lookup(5)

4. Shared variable 共享变量

  • 在Spark中,当任何函数传递给转换操作时,它将在远程集群节点上执行;
  • 适用于函数中使用的所有变量的不同副本,将这些变量将复制到每台计算机,并且远程计算机上的变量更新不会恢复到驱动程序;

共享变量:

  • Broadcast 广播变量
  • accumulator 累加器

广播变量允许开发人员在每个节点上缓存一个只读变量,无需每个任务中拷贝。可用于高效的为每个节点拷贝一份大的输入数据集;

4.1 Bordcast 广播变量范例

Broadcast 使用规则:

  • 使用 SparkContext.broadcats([初始值]) 创建;
  • 使用 .value 读取变量值;
  • Broadcas 广播变量创建后不能修改;

4.1.1 不使用广播变量创建范例

  1. # 创建 RDD
  2. kvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])
  3. # 创建字典
  4. fruitMap=kvFruit.collectAsMap()
  5. # 创建 fruitIds
  6. fruitIds=sc.parallelize([2,4,3,1])
  7. # 水果名称字典 fruitNames
  8. fruitNames=fruitIds.map(lambda x:fruitMap[x]).collect()
  9. print(str(fruitNames))

范例在并行处理过程中每执行一次转换,都需要将 fruitIds 和 fruitMap 上传到Worker Node 才能执行转换,将耗费大量内存和时间,因此使用 Broadcast 广播变量。

4.1.2 使用 Broadcast 广播变量的范例

  1. # 创建 RDD
  2. kvFruit=sc.parallelize([(1,"orange"),(2,"apple"),(3,"banana"),(4,"watermelon")])
  3. # 创建字典
  4. fruitMap=kvFruit.collectAsMap()
  5. # 创建 fruitMap 字典转换为 bcFruitMap 广播变量
  6. bcFruitMap=sc.broadcast(fruitMap)
  7. # 创建 fruitIds
  8. fruitIds=sc.parallelize([2,4,3,1])
  9. # 使用 bcFruitMap.value 字典进行转换
  10. fruitNames=fruitIds.map(lambda x:bcFruitMap.value[x]).collect()
  11. print(str(fruitNames))

并行处理中 bcFruitMap 广播变量将会传送到 Worker Node 机器,并存储在内存中,后续 Worker Node都可以使用这个 bcFruitMap 广播变量执行转换,以节省内存和传送时间;

4.2 accumulator 累加器

  • SparkContext.accumulator([初始值]) 创建;
  • 使用 .add() 进行累加;
  • 在 task 中不能读取累加器的值;
  • 只有在驱动程序,即循环外才可用 .value 读取累加器结果

范例:

  1. intRDD=sc.parallelize([3,1,2,5,5])
  2. # 创建累加器
  3. total=sc.accumulator(0.0)
  4. num=sc.accumulator(0)
  5. intRDD.foreach(lambda i:[total.add(1),num.add(1)])
  6. print(total.value)
  7. print(num.value)

5. RDD Persistence 持久化

Spark 持久化用于将重复运算的RDD存储在内存中,来提高运算效率

  • RDD.persist(存储等级) # 默认 MEMORY_ONLY 存储在内存中
  • RDD.unpersist() # 取消持久化
  1. 持久化
    1. intRDDMemory=sc.parallelize([3,1,2,5,5])
    2. intRDDMemory.persist()
    3. # 检查是否已经缓存
    4. intRDDMemory.is_cached
  1. 取消持久化
    1. intRDDMemory.unpersist()
    2. # 检查是否已经缓存
    3. intRDDMemory.is_cached

6. Spark 创建 WordCount

  1. 创建测试文件 ```bash mkdir -p ~/pythonwork/jupyternotebook/wordcount cd ~/pythonwork/jupyternotebook/wordcount

vim test.txt

Apple Apple Orange Banana Grape Grape

  1. 2. 使用 jupyter notebook 进行测试
  2. ```python
  3. # 读取文件
  4. textFile=sc.textFile("test.txt")
  5. # flatMap 空格符分割单词,并读取
  6. stringRDD=textFile.flatMap(lambda line:line.split(" "))
  7. # map reduce 计算单词出现次数
  8. countsRDD=stringRDD.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
  9. # 保存计算结果
  10. countsRDD.saveAsTextFile("output")


查看测试结果