3.3.1 aggregate
aggregate 详解
aggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U): U 对给定的数据集进行方法设定 zeroValue_ 运算符的每个分区的累积结果的初始值,以及combOp运算符的不同分区的合并结果的初始值-这通常是中性元素(例如,对于列表串联为零,对于求和为0) seqOp–用于在分区内累积结果的运算符 commOp–用于组合来自不同分区的结果的关联运算符
aggregate 代码示例
object ChapterRDD1 {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")// 生成一个RDD[Int] 并且规定numSlices为1 这样就代表一个节点1val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 1)// 传入初始值为0 传入math.max方法 聚合结果 _+_// Array(1, 2, 3, 4, 5, 6) 聚合来结果为 6val i1: Int = rdd1.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))println(i1)// 得出结果为6// 生成一个RDD[Int] 并且规定numSlices为2 这样就代表一个节点2// Array(1,2,3) Array(4,5,6)val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)// 传入初始值为0 传入math.max方方法// Array(1,2,3) Array(4,5,6) 两个节点得出结果 3 , 6 _+_ 聚合之后就变成了 9val i2: Int = rdd2.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))println(i2)// 得出结果为 9// 加深印象 操作字符串val rdd3: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e", "f"), 1)val str: String = rdd3.aggregate("")((_: String) + (_: String), (_: String) + (_: String))println(str)}}
上面的案列不仅仅使我们更加了解 aggregate 也了解了 SparkContext.parallelize 如何创建RDD 和 parallelize的分区
3.3.2 提前计算的cache方法
cache 详解
cache方法的作用是将数据内容计算并保存在计算节点的内存中,这个方法的使用是针对Spark的Lazy数据处理模式
在Lazy模式中,数据在i安逸和未使用时是不进行计算的,而仅仅保存其存储地址,只有在Action方法到来时才正式计算。这样做的好处在于可以极大的减少存储空间,从而·提高利用率,而有时必须要求数据及进行计算,此时就需要用cache方法, 可以直接调用
cache 代码示例
object ChapterRDD_cache {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")val rdd: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e", "f"), 1)val value: RDD[String] = rdd.filter((_: String) != "a").map((_: String).toLowerCase())// 这样写的话 每一次输入都要重新进行计算value.foreach(println)value.foreach(println)value.foreach(println)println("*************************")// 先把计算结果放入缓存 这样输出的时候 或者再一次进行操作 不需要对前面的操作重复value.cache()value.foreach(println)value.foreach(println)value.foreach(println)// 感兴趣的小伙伴可以用时间 来看看运行时间差}}
3.3.3 笛卡尔操作的cartesian方法
cartesian 详解
此方法是用于对不同的数组进行笛卡尔操作,要求是数据集的长度必须相同, 此结果作为一个新的数据集返回。
cartesian 代码示例
object ChapterRDD_cartesian {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {val rdd1: RDD[String] = sc.parallelize(Array("a", "b", "c"), 1)val rdd2: RDD[String] = sc.parallelize(Array("d", "e", "f"), 1)val value: RDD[(String, String)] = rdd1.cartesian(rdd2)value.foreach(println)// 输出的结果 就是每一个值 去匹配第二个rdd的每一个值// (a,d)// (a,e)// (a,f)// (b,d)// (b,e)// (b,f)// (c,d)// (c,e)// (c,f)}}
3.3.4 分片存储的coalesce
coalesce 详解
:::tips
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
// 第一个参数是将数据重新分成的片数
// 布尔值指的是将数据分成更小的片时使用 举例中将其设置为true
:::
coalesce 代码示例
object ChapterRDD_coaleasce {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 3)val rdd2Cpy: RDD[Int] = rdd2.coalesce(1, true)val ressult: Int = rdd2.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))val ressult2: Int = rdd2Cpy.aggregate(0)(math.max(_: Int, _: Int), (_: Int) + (_: Int))println(s"result: ${ressult}")println(s"rdd2.getNumPartitions: ${rdd2.getNumPartitions}")println(s"rdd2Cpy.getNumPartitions: ${rdd2Cpy.getNumPartitions}")println(s"result: ${ressult2}")//result: 12//rdd2.getNumPartitions: 3//rdd2Cpy.getNumPartitions: 1//result: 6}}
RDD还有一个repartition方法
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
} // 可以看到 repartition就是coalesce 为true的时候
3.3.5 以value计算的 countByValue方法
countByValue 详解
countByValue 方法是计算数据集某个数据出现的个数,并且以map的形式返回
countByValue代码示例
object ChapterRDD_countByValue {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "H:\\winutils\\winutils-master\\hadoop-2.6.0")// 生成一个RDD[Int] 并且规定numSlices为1 这样就代表一个节点1val rdd1: RDD[Int] = sc.parallelize(Array(1, 2))rdd1.countByValue().foreach(println)// 输出结果//(1,1)//(2,1)}}
3.3.6 以key计算的 countByKey方法
countByKey详解
countByKey方法与countByValue 方法有本质的区别。countByKey是计算数组中元数据键值对key出现的个数
countByKey代码示例
object ChapterRDD_countByKey {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {val rdd1: RDD[Int] = sc.parallelize(Array(5,5,6,6,7,7,5))// 1 map 把每一个元素变成 (element, 1)// 2 countByKey 进行key的计算val map: collection.Map[Int, Long] = rdd1.map(((_: Int),1)).countByKey()println(map)// 输出结果//Map(6 -> 2, 7 -> 2, 5 -> 3)}}
3.3.7 除去数据集中重复项的distinct方法
distinct 详解
distinct代码演示
3.3.8 过滤数据的filter方法
filter详解
filter代码演示
rdd.filter(bool) 保留满足bool判断的数据
3.3.9 flatMap & Map
flatMap & Map 详解
flatMap以行为单位 ,map以单个元素为单位,最终返回的就是一个数据集
flatmap会把结果里的集合展开。而map不会。
flatMap 就是先map 然后flat
flatMap & Map 代码示例
object ChapterRDD_flatMap_Map {// 环境val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test")val sc: SparkContext = new SparkContext(conf)def main(args: Array[String]): Unit = {var rdd1: RDD[Int] = sc.parallelize(Array(5,5,7,5))val resultMap: RDD[List[Int]] = rdd1.map((x: Int) =>List(x+2))resultMap.foreach(println)// 输出结果//List(7)//List(9)//List(7)//List(7)println("--------------------------")val resultFlatMap: RDD[Int] = rdd1.flatMap((x: Int) =>List(x+2))resultFlatMap.foreach(println)// 输出结果//7//7//7//9}}
