Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
接下来我们一起看看这三大数据结构是如何在数据处理中使用的。
一、RDD
1、介绍
1、概念
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
分布式:数据存储在大数据集群不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
RDD的数据处理方式类似于IO流,也有装饰者设计模式
RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前都不会
RDD是不保存数据的,但是IO可以临时保存一部分数据在缓冲区中。
2、核心属性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
- 分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
- 分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
- RDD 之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
- 分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
- 首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
3、执行原理
从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD的工作原理:
1、启动Yarn集群
2、Spark通过申请资源创建调度节点和计算节点
3、Spark框架根据需求计算逻辑根据分区划分成不同的任务
4、调度节点根据计算节点状态将任务发送到对应的计算节点进行计算
2、基础使用
1、创建RDD
在 Spark 中创建 RDD 的创建方式可以分为四种:
- 从集合(内存)中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
object RDD_Memory {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_MEMORY")
val context = new SparkContext(conf)
// 创建rdd
// seq: Seq[T],
// numSlices: Int = defaultParallelism
val seq: Seq[Int] = Seq(1, 2, 3)
val rdd: RDD[Int] = context.parallelize(seq);
// 其实makeRDD就是封装了parallelize方法
val rdd2: RDD[Int] = context.makeRDD(seq)
// rdd.collect.foreach(println)
rdd2.collect.foreach(println)
context.stop()
}
}
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
// 调用parallelize()方法
parallelize(seq, numSlices)
}
从外部存储(文件)创建 RDD
- textFile:以行为单位读取数据,不考虑文件来自哪里 ```scala object RDD_File {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster(“local[*]”).setAppName(“RDD_MEMORY”))
// 创建rdd // 1、path路径默认从当前环境的根路径为基准,可以绝对和相对路径 // val rdd1: RDD[String] = context.textFile(“datas/1.txt”) // 2、path路径可以是具体的文件,也可以是目录,读取该目录下所有文件 val rdd2: RDD[String] = context.textFile(“datas”) // 3、path路径可以使用通配符 val rdd3: RDD[String] = context.textFile(“datas/1.txt”) // 只读取1的文件 // 4、path路径也可以是分布式存储系统,如HDFS,Ceph、Nfs等 context.textFile(“hdfs://hadoop:9820/hello”)
rdd1.collect.foreach(println)
context.stop()
} }
- whoTextFiles:以文件为单位读取数据,读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
```scala
// 获取文件来源
val rdd5: RDD[(String, String)] = context.wholeTextFiles("datas")
rdd5.collect.foreach(x => println(x._2))
- 从其他 RDD 创建
- 主要是通过一个 RDD 运算完后,再产生新的 RDD。
- 直接创建 RDD(new)
- 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
2、并行度和分区
1、集合数据的分配
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
我们在使用parallelize方法的时候,可以传入第二个参数:numSlices: Int = defaultParallelism,默认并行度。
来到TaskScheduler特质中
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
def defaultParallelism(): Int
找其实现类
override def defaultParallelism(): Int = backend.defaultParallelism()
来到SchedulerBackend特质中
def defaultParallelism(): Int
继续找其实现类
分模式来找对应的实现
本地实现:找spark.default.parallelism
配置项的值,如果没有设置就用默认totalCores,本地所有cpu核数
override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
集群模式中,如果没有设置,就使用cpu总核数或者2中的最大值
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
可以使用saveAsObjectFile将rdd产出文件保存到目录中
val rdd2: RDD[Int] = context.makeRDD(seq)
rdd2.saveAsObjectFile("datas/output")
指定数量之后:
val rdd2: RDD[Int] = context.makeRDD(seq,3)
rdd2.saveAsObjectFile("datas/output")
那么这个切分,是按照什么切分呢?切分后的数据到底存放在哪里呢?
比如一个5长度的数组,切分3个分区,数据怎么分呢?
来深入源码分析:
此时5个元素,三个分片
1、先来到SparkContext的parallelize()方法
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
// 创建一个ParallelCollectionRDD对象,
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
2、创建ParallelCollectionRDD对象
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
// 在这里获取分区数量
override def getPartitions: Array[Partition] = {
// 获取分片
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
3、进入ParallelCollectionRDD.slice(data, numSlices)方法,求出数据如何分片
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
// 分片小于1抛出异常
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
// 求出位置 5 3
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
// 遍历所有分片,0, 1, 2
(0 until numSlices).iterator.map { i =>
// start = 0 * 5 / 3 = 0
// 分片 * 数据长度 / 分片数量,决定轮询到哪个分片
val start = ((i * length) / numSlices).toInt
// 0 + 1 * 数据长度 / 分片数量
val end = (((i + 1) * length) / numSlices).toInt
// 就成了 (0,1) (1,3) (3,5)
(start, end)
}
}
// 模式匹配
seq match {
// 不是范围
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
// 默认兜底处理
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
// 5,3
positions(array.length, numSlices).map {
// // 就成了 (0,1) (1,3) (3,5)
case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
4、slice最终分片
override def slice(from: Int, until: Int): Array[T] = {
// 获取数组
val reprVal = repr
//
val lo = math.max(from, 0)
val hi = math.min(math.max(until, 0), reprVal.length)
// 最大值减去最小值,决定每个分片存放数据的数量
val size = math.max(hi - lo, 0)
val result = java.lang.reflect.Array.newInstance(elementClass, size)
if (size > 0) {
// 通过copy方法,将原数组中的每个分片的位置拷贝size个元素
Array.copy(reprVal, lo, result, 0, size)
}
result.asInstanceOf[Array[T]]
}
5、copy数组元素
* @param src the source array.
* @param srcPos starting position in the source array.
* @param dest destination array.
* @param destPos starting position in the destination array.
* @param length the number of array elements to be copied.
*
* @see `java.lang.System#arraycopy`
*/
def copy(src: AnyRef, srcPos: Int, dest: AnyRef, destPos: Int, length: Int) {
val srcClass = src.getClass
if (srcClass.isArray && dest.getClass.isAssignableFrom(srcClass))
java.lang.System.arraycopy(src, srcPos, dest, destPos, length)
else
slowcopy(src, srcPos, dest, destPos, length)
}
2、文件数据的分配
读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异。
在读取文件的时候同样可以指定最小分区数量。
// minPartitions: Int = defaultMinPartitions 最小分区数量
val rdd1: RDD[String] = context.textFile("datas/1.txt",2)
// def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
我们这个1.txt文件如下:
看着是三个字节,其实是包含了7个字节,回车和换行符。
1、textFile进行读取
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
// 来到HadoopFile,其实是由Hadoop接管了
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
2、HadoopFile创建一个新的HadoopRDD对象。
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
// 我们看到了熟悉的FileInputFormat,这就解释了为什么是一行一行读取。
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
3、等到collect收集的时候
来到HadoopRDD的getPartitions方法
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
try {
// 从这里获取输入流,并且获取分片
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
val path = fileSplit.getPath
if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
val codecFactory = new CompressionCodecFactory(jobConf)
if (Utils.isFileSplittable(path, codecFactory)) {
logWarning(s"Loading one large file ${path.toString} with only one partition, " +
s"we can increase partition numbers for improving performance.")
} else {
logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
s"partition, because the file is compressed by unsplittable compression codec.")
}
}
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
} catch {
case e: InvalidInputException if ignoreMissingFiles =>
logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
s" partitions returned from this path.", e)
Array.empty[Partition]
}
}
4、来到Hadoop读取分片的getSplits方法,至此获取分片数量,完成任务
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
StopWatch sw = new StopWatch().start();
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
// 获取文件数据的总字节大小
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// goalSize就是总字节 / 有指定几个分区,就除几,获取每个分区应该存放多少字节
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 如果可以分片,进行分片操作
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize(); // 获取块大小,集群模式128m,local模式32m
// 获取切片大小,取块大小和目标分区大小的最小值
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}
分区的数据是如何分配的呢?
1、spark读取文件,是以hadoop的FileInputFormat为默认实现的,所以是一行一行读取。和字节数没有关系
2、数据读取时以偏移量为单位,并且不会重复读取
我们的数据是 1@@ ==> 012 2@@ ==> 345 3 ==> 6
3、数据分区的偏移量范围的计算
0 ==> [0,3] => 12 1 ==> [3,6] => 3 2 ==> [6,7] =>
这个读取范围两边都可以取到
,我们读取0,1,2,3。由于是按照一行一行读取的,所以读取到了数据1和2。由于能够读取到2,所以说2所在的一行已经被读取了,不能被重复读取。所以继续往下走,读取第三行的3到第二个分区中,所以第三个分区什么也没有
3、RDD算子
RDD的方法总共分两种,大致分为:
- 转换:功能的补充和封装,将旧的RDD包装成新的RDD,比如flatMap、map操作
- 行动:触发任务的调度和执行
RDD的方法也称为算子。
认知心理学认为解决问题其实是将问题的状态进行改变:
问题(初始) => 操作(算子) => 问题(审核中) => 操作(算子) => 问题(完成)
这个 操作翻译过来就叫做Operator,所以RDD的方法就叫做算子。
1、RDD转换算子
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
- Value类型
1、map
def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
object Transform_map {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
// 定义一个转换函数,将数值都乘以2返回
def double(num: Int): Int = {
num * 2
}
val value: RDD[Int] = rdd.map(double)
// 函数至简原则,能省则省,
// rdd.map((num: Int) => {num * 2})
// 最简单形式:
rdd.map(_*2)
value.collect.foreach(println)
}
}
小案例:收集Apache的log日志,并获取uri
apache.log
思路:日志的话,如果格式可能一直,ip地址可能不一致,所以偏移量切割不太可行。尽量按照空格等切割。apache的日志访问uri都在最后一个,其实也可以倒着切
object Transform_map_test {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[String] = context.textFile("datas/apache.log")
// 方便理解
rdd.map(
line => {
val strings: Array[String] = line.split(" ")
strings(6)
}
)
// 柯里化传值
val value: RDD[String] = rdd.map(_.split(" ")(6))
value.collect.foreach(println)
}
}
map的计算其实是并行的
演示:
object Transform_map_parallel_test {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),1)
val mappedRdd: RDD[Int] = rdd.map(num => {
println(Thread.currentThread.getName + "=======>" + num)
num
})
val mappedRdd1: RDD[Int] = mappedRdd.map(num => {
println(Thread.currentThread.getName + "#######》" + num)
num
})
mappedRdd1.collect.foreach(println)
context.stop()
}
}
Rdd的计算,在一个分区内是一个一个执行逻辑,只有前一个数据的逻辑执行完毕之后才可以执行下一个数据,分区内数据是有序的。
当只有一个分区的结果:
Executor task launch worker for task 0=======>1 Executor task launch worker for task 0#######》1 Executor task launch worker for task 0=======>2 Executor task launch worker for task 0#######》2 Executor task launch worker for task 0=======>3 Executor task launch worker for task 0#######》3 Executor task launch worker for task 0=======>4 Executor task launch worker for task 0#######》4
分区数量改为2之后,同一个分区内是有序的,但是执行顺序就是并行的,谁先执行就不知道了
Executor task launch worker for task 1=======>3 Executor task launch worker for task 1#######》3 Executor task launch worker for task 1=======>4 Executor task launch worker for task 1#######》4 Executor task launch worker for task 0=======>1 Executor task launch worker for task 0#######》1 Executor task launch worker for task 0=======>2 Executor task launch worker for task 0#######》2
2、mapPartitions
按照分区进行计算
def mapPartitionsU: ClassTag: RDD[U]
object MapPartition_Test {
def main(args: Array[String]): Unit = {
val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("mapPartition"))
val rdd = context.makeRDD(List(1, 2, 3, 4), 1)
// def mapPartitions[U: ClassTag](
// f: Iterator[T] => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U]
//
val value: RDD[Int] = rdd.mapPartitions(iter => {
println("--------------")
iter.map(_ * 2)
})
value.collect.foreach(println)
}
}
如果是两个分区的话就可以在每个分区执行一遍该操作。——-就会被输出两次。
小功能:获取每个分区的最大值
mapPartitions就可以轻松实现
def main(args: Array[String]): Unit = {
val context: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("mapPartition"))
val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
// 但是这个mapPartitions方法,占用内存,会导致引用指向内存中的数据导致GC不会释放。所以慎用,或者考虑使用软引用
// def mapPartitions[U: ClassTag](
// f: Iterator[T] => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U]
//
val value: RDD[Int] = rdd.mapPartitions(iter => {
// 最外层是迭代器,所以要包装一下
List(iter.max).iterator
})
value.collect.foreach(println)
}
}
思考一下:map和mapPartitions的区别?
- 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
- 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
- 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
3、mapPartitionsWithIndex
**
def mapPartitionsWithIndexU: ClassTag => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
获取第二个数据分区的数据
object Transform_mapPartitionsWithIndex {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1,2,3,4),2)
// mapPartitionsWithIndex可以根据索引选择
// 比如现在只要第二个分区的数据。
val mappedRdd: RDD[Int] = rdd.mapPartitionsWithIndex(
(index, iter) => {
if (index == 1) {
iter
} else {
// 其他分区返回null
Nil.iterator
}
}
)
mappedRdd.collect.foreach(println)
}
}
将数据根据索引放到不同分区
object Transform_mapPartitionsWithIndex_2 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1,2,3,4),2)
// mapPartitionsWithIndex可以根据索引选择
//
val mappedRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
// 将获取到的数据,根据分区索引,将数据放到不同的分区
(index, iter) => {
iter.map(num => (index, num))
}
)
mappedRdd.collect.foreach(println)
}
}
4、flatMap
def flatMapU: ClassTag: RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
object Transform_flatMap {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[List[Int]] = context.makeRDD(List(List(1,2),List(3,4)))
// 不能使用_
val mappedRdd = rdd.flatMap(list => list)
mappedRdd.collect.foreach(println)
}
}
我们进行wordCount的时候,拆分字符串就可以使用这个方法,因为String切割之后是一个个的Array[String],所以扁平映射将所有String拿出来。
小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作
这个是两个List并且混杂着一个Int,所以外面不能确定元素的类型,可以考虑使用模式匹配。你不是,让你变成List类型!
object Transform_flatMap_2 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(List(1, 2), 3, List(4, 5)))
// 不能使用_
val mappedRdd = rdd.flatMap{
// 不是集合,变成集合
case list: List[_] => list
case data => List(data)
}
mappedRdd.collect.foreach(println)
}
}
5、glom
def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
object Transform_glom {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
// glom:对同一个分区内的数据转换为相同类型数组处理,分区不变
val value: RDD[Array[Int]] = rdd.glom()
value.collect.foreach(array => println(array.mkString(", ")))
}
}
小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
object Transform_glom {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4), 2)
// glom:对同一个分区内的数据转换为相同类型数组处理,分区不变
val value = rdd.glom() // 分区内转成Array
.map( _.max) // 每个分区求出最大值
println(value.collect.sum) // 求和
}
}
6、groupBy
def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
**
object Transform_groupBy {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4), 2)
// groupBy分组,按照余数作为key分组
val value: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
value.collect.foreach(println)
}
}
小功能:按照首字母分组
object Transform_groupBy1 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[String] = context.makeRDD(List("Hello", "Spark", "Hadoop", "Storm"), 2)
// 根据首字母分组
val value: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
value.collect.foreach(println)
}
}
小功能:分析apache.log日志中时间段的统计量
object Transform_groupBy2 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[String] = context.textFile("datas/apache.log")
// 获取访问数量
val value: RDD[(String, Int)] = rdd.map(
f = line => {
// 方式一:如果日期格式固定,通过字符串截串
// 方式二:通过SimpleDateFormat格式转换后获取
// 17/05/2015:10:05:03
var time = line.split(" ")(3)
// 获取到之后再转换成只有小时的形式
val date: Date = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(time)
// 返回
val hour: String = new SimpleDateFormat("HH").format(date)
// 返回二元组形式
(hour, 1)
}
).groupBy(_._1) // 根据时间分组
.map { case (hour, iter) => (hour, iter.size)} // 分组之后将结果聚合,
value.collect.foreach(println)
}
}
7、filter
def filter(f: T => Boolean): RDD[T]
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
**
object Transform_filter {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4), 2)
// 过滤数据
val value: RDD[Int] = rdd.filter(_ % 2 == 0)
value.collect.foreach(println)
context.stop()
}
}
小功能:获取apache.log中访问时间在17/05/2015的
object Transform_filter_Test {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[String] = context.textFile("datas/apache.log")
// 过滤数据,===当然也可以使用map,但是数据会被丢弃,不如直接过滤
val value: RDD[String] = rdd.filter(
// 可以直接匹配以这个日期开头的
line => {
val time: String = line.split(" ")(3)
time.startsWith("17/05/2015")
}
)
value.collect.foreach(println)
context.stop()
}
}
8、sample
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement: 一个数是否能被抽取多次,即放回其中。
* @param fraction : 期待rdd中数据的概率
* 如果withReplacement设置为false,即不放回,表示每个元素被选中的可能性; fraction must be [0, 1]
* 如果withReplacement设置为true,放回数据: 表示每个元素被选中的次数; fraction must be greater than or equal to 0
* @param seed :随机生成的种子,同样种子每次产生的数据是相同的。默认是系统当前时间戳
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
// PoissonSampler算法保证放回
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
// BernoulliSampler算法保证不放回
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
根据指定的规则从数据集中抽取数据
object Transform_sample {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val value: RDD[Int] = rdd.sample(
false // 是否放回
, 0.3 // false,每个元素出现的概率,
// , 1 // 种子,相同种子的时间戳生成数据一样的
)
println(value.collect.mkString(", "))
context.stop()
}
}
**
思考一个问题:有啥用,抽奖吗?
其实不是这样的,主要是为了解决极端情况下的数据倾斜问题
9、distinct
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
将数据集中重复的数据去重.
object Transform_distinct {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4,1, 2, 3, 4))
// distinct去重
val value: RDD[Int] = rdd.distinct()
value.collect.foreach(println)
}
}
distinct是如何将数据去重的呢?
我们知道,scala中也有distinct函数,但是是使用底层HashSet数据结构实现的去重操作:
def distinct: Repr = {
val isImmutable = this.isInstanceOf[immutable.Seq[_]]
if (isImmutable && lengthCompare(1) <= 0) repr
else {
val b = newBuilder
// hashset
val seen = new mutable.HashSet[A]()
var it = this.iterator
var different = false
while (it.hasNext) {
val next = it.next
if (seen.add(next)) b += next else different = true
}
if (different || !isImmutable) b.result() else repr
}
}
但是spark中的distinct却是这样做的:
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
// Create an instance of external append only map which ignores values.
val map = new ExternalAppendOnlyMap[T, Null, Null](
createCombiner = _ => null,
mergeValue = (a, b) => a,
mergeCombiners = (a, b) => a)
map.insertAll(partition.map(_ -> null))
map.iterator.map(_._1)
}
// 在这里匹配可选的分区
partitioner match {
case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
// 来到这里兜底,将数据变成二元组形式,根据key聚合之后,只保留第一个,然后每个结果取元组第一个元素完成去重操作
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
}
10、coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
根据数据量缩减分区**,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
object Transform_coalesce {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),4)
// coalesce缩减分区
val value: RDD[Int] = rdd.coalesce(2)
value.saveAsTextFile("datas/output")
context.stop()
}
}
分区后两个分区分别是123,456
如果是三个分区呢?
object Transform_coalesce {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),3)
// coalesce缩减分区
val value: RDD[Int] = rdd.coalesce(2)
value.saveAsTextFile("datas/output")
context.stop()
}
}
数据变成了12,3456???
因为coalesce默认是不会将数据打乱重新组合的。这样子缩减分区可能出现数据倾斜。可以使用shuffle处理。
object Transform_coalesce {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),3)
// coalesce缩减分区
// 进行shuffle洗牌
val value: RDD[Int] = rdd.coalesce(2,true)
value.saveAsTextFile("datas/output")
context.stop()
}
}
数据变成了145,236,顺序完全乱了,但是数据不倾斜了。
思考:我想扩大分区怎么办?
扩大分区需要将coalesce调大并且一定要进行shuffle操作才可以打乱分区。虽然数据打乱,但是扩大了分区。
11、repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
是一种简化的扩大分区,底层调用就是coalesce
object Transform_repartition {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 2, 3, 4, 5, 6),2)
// coalesce缩减分区
// 进行shuffle洗牌
val value: RDD[Int] = rdd.repartition(3)
value.saveAsTextFile("datas/output")
context.stop()
}
}
12、sortBy
def sortByK => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
object Transform_sortBy {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(1, 3, 6, 4, 5, 2), 2)
// 按照自身升序排列,分区数量不会改变,但是会进行shuffle打乱操作
// rdd.sortBy(k => k).saveAsTextFile("datas/output")
// 第二个参数控制降序排列
rdd.sortBy(k => k,false).saveAsTextFile("datas/output")
context.stop()
}
}
- 双Value类型
13、intersection
对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
14、union
对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
15、subtract
以一个 RDD 元素为主,去除两个 RDD 重复元素,将其他元素保留下来。求差集D
思考:如果两个RDD数据类型不一致怎么办
16、zip
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
思考:如果两个RDD数据类型不一致怎么办 思考:如果两个RDD数据分区不一致怎么办 思考:如果两个RDD分区数据量不一致怎么办
object Transform_doubleValue {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd1 = context.makeRDD(List(1, 2, 3, 4))
val rdd2 = context.makeRDD(List(3, 4, 5, 6))
// 交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
println(rdd3.collect.mkString(","))
// 并集
val rdd4: RDD[Int] = rdd1.union(rdd2)
println(rdd4.collect.mkString(","))
// 差集
val rdd5: RDD[Int] = rdd1.subtract(rdd2)
println(rdd5.collect.mkString(","))
// 拉链
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect.mkString(","))
context.stop()
}
}
数据类型问题:
交集、并集、差集的数据源必须要求数据类型一致,方法签名规定了返回泛型T
def intersection(other: RDD[T]): RDD[T]
但是zip拉链可以返回不同数据类型。
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
分区数量问题:
val rdd1 = context.makeRDD(List(1, 2, 3, 4),2)
val rdd2 = context.makeRDD(List(3, 4, 5, 6),3)
// Can't zip RDDs with unequal numbers of partitions: List(2, 3)
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect.mkString(","))
运行时出错,不相等的数据分区,所以zip要求分区数量一致
分区数据量问题:
val rdd1 = context.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val rdd2 = context.makeRDD(List(3, 4, 5, 6), 3)
// Can only zip RDDs with same number of elements in each partition
val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
println(rdd6.collect.mkString(","))
只能够对分区数据量相同的分区进行拉链
- key-value类型
17、partitionBy
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging with Serializable {
这个方法时PairRDDFunctions中的,并不是RDD的方法,并且只对键值类型的RDD才可以使用。
我们没有看到它和RDD的继承关系,为什么它也是RDD呢?
在RDD类中,有伴生对象RDD,定义了隐式转换函数,将键值类型的RDD转换成PairRDDFunctions
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
这个partitionBy方法需要传入一个Partitioner分区器,我们以它的实现类HashPartitioner来传递
object Transform_partitionBy {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4, 5, 6))
val mappedRdd: RDD[(Int, Int)] = rdd.map((_, 1))
// RDD => PairRDDFunctions
// 不是RDD的方法,但是PairRddFunctions
val value: RDD[(Int, Int)] = mappedRdd.partitionBy(new HashPartitioner(2)) // 需要传入分区器,这里以Hash分区
value.collect.foreach(println)
context.stop()
}
}
来看一下这个HashPartitioner,对数据进行分区的方法
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
// 其实就是取模运算
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
思考:如果重分区的分区器和当前RDD的分区器一样怎么办?
首先第一次调用partitionBy的时候传入HashPartitioner(2),方法中会跟自身partitioner进行比较,如果相同返回self本身,不同则洗牌。
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
// 这里的 == 比较在scala中就是调用equals方法进行比较
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
在Partitioner中的equals方法如下:
override def equals(other: Any): Boolean = other match {
// 如果是HashPartitioner,比较两次分区数量是否一致
case h: HashPartitioner =>
h.numPartitions == numPartitions
// 其他返回false,可以重分区
case _ =>
false
}
所以两次如果传入相同的HashPartitioner和相同的分区数量,是不会重新分区的,而是将原先数据直接返回。
Spark还有其他分区器吗?
Partitioner是一个抽象类,有三个子类
RangePartitioner一般在排序的时候使用。我们在使用sortBy的时候就使用了
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
// 这里
.sortByKey(ascending, numPartitions)
.values
}
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
// RangePartitioner
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
PythonPartitioner是锁着的,只有在特定包权限下才可以使用
private[spark] class PythonPartitioner(
override val numPartitions: Int,
val pyPartitionFunctionId: Long)
extends Partitioner {
也可以自己继承Partitioner定制分区规则
18、reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
可以将数据按照相同的 Key 对 Value 进行聚合
object Transform_reduceByKey {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a",1),("a",2) ,("a",3),("b",4)))
// 根据相同key对数据进行两两聚合。
// scala中是两两聚合,spark是基于scala,所以也是两两聚合
// 如果相同的key只有一个,则不会参与聚合操作
val reducedRdd: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
println(s"x: $x , y: $y")
x + y
})
reducedRdd.collect.foreach(println)
context.stop()
}
}
19、groupByKey
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
将数据源的数据根据 key 对 value 进行分组
object Transform_groupByKey {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// 根据相同key对数据分组,形成一个对偶元组,value是一个确定类型的集合
val value: RDD[(String, Iterable[Int])] = rdd.groupByKey()
value.collect.foreach(println)
// 对比group,分组的规则不明确,可以是key或自定义规则,分组后的value,也是一个k,v形式的
val value1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
value1.collect.foreach(println)
context.stop()
}
}
groupByKey和reduceByKey的区别?
- 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
- 从功能的角度:reduceByKey 其实包含分组和聚合的功能。groupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
reduceByKey分区内和分区间的计算规则是相同的。
20、aggregateByKey
/**
* The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
第一个参数表示分区内的聚合规则,第二个参数表示分区间的计算规则
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
将数据根据不同的规则进行分区内计算和分区间计算
object Transform_aggregationByKey {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// 柯里化传值,第一个参数是分区内聚合规则
// 参数表示第一个值的初始值,比如第一个数,没有第二个数,如何比较它是大是小
// 第二个参数是分区间聚合规则
// 参数第一个表示序列的比较规则
// 参数第二个表示聚合的操作
val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(
// 分区内计算规则
(x, y) => math.max(x, y),
// 分区间计算规则
(x, y) => x + y
)
value.collect.foreach(println)
context.stop()
}
}
小练习:求相同key的平均值
**
object Transform_aggregationByKey_Test {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// 获取函数平均值
val value: RDD[(String, Int)] = rdd.aggregateByKey((0, 0))(
// 转换成元组
(t, v) => {
// 第一个表示为总和,第二个为该key出现的次数
(t._1 + v, t._2 + 1)
},
(t1, t2) => {
// 聚合
(t1._1 + t2._1, t1._2 + t2._2)
}
)
// 聚合之后根据key将value进行映射
.mapValues {
// 获取平均值
case (sum, count) => sum / count
}
value.collect.foreach(println)
context.stop()
}
}
21、foldByKey**
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey。
object Transform_foldByKey {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)),2)
// 聚合计算时,分区内和分区间计算规则相同,可以简化
val value: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
value.collect.foreach(println)
context.stop()
}
}
22、combineByKey
def combineByKeyC => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
aggregationByKey的数据还需要一个初始值,我们可不可以把第一个出现的值作为初始值呢?是不是就省去了一些操作
object Transform_combineByKey {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// 获取函数平均值
// 第一个参数表示第一个值的转换,
// 第二个参数表示分区内的计算规则
// 第三个参数表示分区间的计算规则
val value: RDD[(String, Int)] = rdd.combineByKey(v => (v,1),
// 需要加上类型声明
(t: (Int,Int), v) => {
(t._1 + v,t._2 + 1)
},
(t1: (Int,Int),t2: (Int,Int)) => {
(t1._1 + t2._1,t1._2 + t2._2)
}
)
// 聚合之后根据key将value进行映射
.mapValues {
// 获取平均值
case (sum, count) => sum / count
}
value.collect.foreach(println)
context.stop()
}
}
四个聚合算子的区别:
本质都是调用combineByKeyWithClassTag方法,区别在于计算规则和初始值的规则
object Transform_aggregations_distinction {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// 四种聚合算子的区别
/*
aggregateByKey:
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), // 初始值进行转换
cleanedSeqOp // 分区内计算规则
, combOp // 分区间计算规则
, partitioner) // 分区器
foldByKey:
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), // 初始值进行转换
cleanedFunc // 分区内计算规则
, cleanedFunc // 分区间计算规则
, partitioner) // 分区器
reduceByKey:
combineByKeyWithClassTag[V]((v: V) => v // 初始值进行转换
, func // 分区内计算规则
, func // 分区间计算规则
, partitioner) // 分区器
combineByKey:
combineByKeyWithClassTag(createCombiner // 第一个值作为初始值
, mergeValue // 分区内计算规则
, mergeCombiners // 分区间计算规则
, defaultPartitioner(self)) // 分区器
*/
rdd.aggregateByKey(0)(_+_,_+_)
rdd.foldByKey(0)(_+_)
rdd.reduceByKey(_+_)
rdd.combineByKey(v=>v,(a: Int,b) => a+b,(a: Int,b: Int) => a+b)
context.stop()
}
}
24、join
**
def joinW]): RDD[(K, (V, W))]
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD
object Transform_join {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a",1),("a",2),("c",3)))
val rdd2 = context.makeRDD(List(("a",4),("a",5),("f",6)))
// join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
// 如果两个数据源key没有匹配上,那么数据不会出现在结果中
// 如果两个数据源key有多个相同的,会依次进行匹配,可能出现笛卡尔乘积,数据量增加。
val value: RDD[(String, (Int, Int))] = rdd.join(rdd2)
value.collect.foreach(println)
context.stop()
}
}
25、leftOuterJoin
def leftOuterJoinW],
partitioner: Partitioner): RDD[(K, (V, Option[W]))]
类似于 SQL 语句的左外连接
26、rightOuterJoin
def rightOuterJoinW], partitioner: Partitioner)
: RDD[(K, (Option[V], W))]
类似于SQL语句的右外连接
object Transform_leftOuterJoin {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd2 = context.makeRDD(List(("a",4),("b",5),("c",6)))
// leftOuterJoin
val value: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(rdd2)
// rightOuterJoin
val value1: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(rdd2)
value.collect.foreach(println)
value1.collect.foreach(println)
context.stop()
}
}
27、cogroup
def cogroupW1, W2, W3], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable
object Transform_cogroup {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map"))
val rdd = context.makeRDD(List(("a",1),("b",2),("c",3),("c",6)))
val rdd2 = context.makeRDD(List(("a",4),("b",5)))
val rdd3 = context.makeRDD(List(("a",4),("b",5),("d",5)))
val rdd4 = context.makeRDD(List(("a",4),("b",5),("c",2)))
// cogroup: connect + group
//把相同key连接在一起,value是一个个组,如果没有就是()空的
// 如果一个数据源中多个相同key,连接放在一起
// 最多可以同时连接三个group
val value: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd.cogroup(rdd2,rdd3,rdd4)
value.collect().foreach(println)
context.stop()
}
}
2、案例实操
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
需求描述:统计出每一个省份每个广告被点击数量排行的 Top3
object Transform_case {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-case"))
// 1、获取原始数据: 时间戳,省份,城市,用户,广告
val rawRdd: RDD[String] = context.textFile("datas/agent.log")
// 2、将原始数据进行结构的转换。方便统计
// 时间戳,省份,城市,用户,广告 => (( 省份,广告), 1)
val transformedRdd: RDD[((String, String), Int)] = rawRdd.map(
line => {
val datas: Array[String] = line.split(" ")
((datas(1), datas(4)), 1)
}
)
// 3、将转换后的数据,分组聚合
// (( 省份,广告), 1) => (( 省份,广告), sum)
val reducedRdd: RDD[((String, String), Int)] = transformedRdd.reduceByKey(_ + _)
// 4、将聚合后的数据进行结构的转换
// (( 省份,广告), sum) => (省份,(广告, sum))
val convertedRdd: RDD[(String, (String, Int))] = reducedRdd.map {
case ((prv, ad), sum) => (prv, (ad, sum))
}
// 5、将数据根据省份分组
// (省份,【(广告A,sumA)】,【(广告B,sumB)】
val groupedRdd: RDD[(String, Iterable[(String, Int)])] = convertedRdd.groupByKey()
// 6、将分组后的数据组内降序排序,取前三
val value: RDD[(String, List[(String, Int)])] = groupedRdd.mapValues(
iter => iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
)
// 7、采集数据打印在控制台
value.collect.foreach(println)
context.stop()
}
}
3、行动算子
所谓的行动算子,其实就是触发作业(Job)执行的方法,底层代码调用的是环境对象的runJob方法,底层代码中会创建ActiveJob,并提交执行。
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
......
// 一系列的封装之后,来到DAGScheduler中的runJob
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
// 在这里提交Job
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.isEmpty) {
val clonedProperties = Utils.cloneProperties(properties)
if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
}
val time = clock.getTimeMillis()
listenerBus.post(
SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
listenerBus.post(
SparkListenerJobEnd(jobId, time, JobSucceeded))
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
// 在这里会触发一个Job被提交的事件,具体怎么处理后面再说
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
在这个eventProcessLoop中,也就是DAGScheduler中,有handleXxxx,处理各种事件的方法,这里处理Job被提交
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
(_: Int, value: Int) => value + 1)
logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
s"but only ${e.maxConcurrentTasks} are available. " +
s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
// 这里创建一个ActiveJob
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
1、reduce
def reduce(f: (T, T) => T): T
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
2、collect
def collect(): Array[T]
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
3、count
def count(): Long
返回 RDD 中元素的个数
4、first
def first(): T
返回 RDD 中的第一个元素
5、take
def take(num: Int): Array[T]
返回一个由 RDD 的前 n 个元素组成的数组
6、takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
返回一个由RDD的前n个元素组成的数组
object Action_Normal {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
// reduce : 对数据进行归约计算,直接出结果
val i: Int = rdd.reduce(_ + _)
println(i)
// collect : 收集数据,并且在Driver中转换成数组
val ints: Array[Int] = rdd.collect()
println(ints.mkString(", "))
// count : 统计数据的个数
val l: Long = rdd.count()
println(l)
// first : 取数据源的第一个数据
val i1: Int = rdd.first()
println(i1)
// take : 取数据源的前n个数据
val ints1: Array[Int] = rdd.take(3)
println(ints1.mkString(", "))
val rdd1 = context.makeRDD(List(4,2,1,3))
// takeOrdered : 先对数据源进行排序操作,再取前n个数据
val ints2: Array[Int] = rdd1.takeOrdered(3)(Ordering.Int.reverse)
println(ints2.mkString(", "))
context.stop()
}
}
7、aggregate
def aggregateU: ClassTag(seqOp: (U, T) => U, combOp: (U, U) => U): U
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
object Action_aggregate {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
// aggregate : 聚合
// 分区内:0 + 1 + 2 ,0 + 3 + 4, 分区间:3 + 7 = 10
// val res: Int = rdd.aggregate(0)(_ + _, _ + _)
// 如果没有指定分区,8个分区, 4个空,4个有数据。
// 每个分区内计算都是加10=80 + (1 + 2 + 3 + 4) + 10(分区间计算)
val res: Int = rdd.aggregate(10)(_ + _, _ + _)
/*
aggregateByKey : 聚合操作的初始值只参与分区内计算
aggregate : 聚合操作的初始值不仅参与分区内计算,而且参与分区间计算
*/
println(res)
context.stop()
}
}
8、fold
def fold(zeroValue: T)(op: (T, T) => T): T
折叠操作,aggregate 的简化版操作
如果分区内和分区间计算规则一致时,可以使用fold简化
9、countByKey
def countByKey(): Map[K, Long]
统计每种 key 的个数
10、countByValue
def countByValue()(implicit ord: Ordering[T]
统计每种value的个数
object Action_fold_countByKey_Value {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
// aggregate : 聚合
// 分区内:0 + 1 + 2 ,0 + 3 + 4, 分区间:3 + 7 = 10
// val res: Int = rdd.aggregate(0)(_ + _, _ + _)
// 如果没有指定分区,8个分区, 4个空,4个有数据。
// 每个分区内计算都是加10=80 + (1 + 2 + 3 + 4) + 10(分区间计算)
val res: Int = rdd.fold(10)(_ + _)
println(res)
val rdd1: RDD[(String, Int)] = context.makeRDD(List(("a", 1), ("b", 1), ("b", 1)))
// countByKey : 根据键值对类型,计算key的数量
val stringToLong: collection.Map[String, Long] = rdd1.countByKey()
println(stringToLong)
rdd.countByValue()
// countByValue : 任何类型,获取单值的数量
val valRes: collection.Map[Int, Long] = rdd.countByValue()
println(valRes)
context.stop()
}
}
11、save的相关算法
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
object Action_save {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.map((_,1)).saveAsSequenceFile("output2")
context.stop()
}
}
12、foreach
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
我们经常使用rdd.collect().foreach()遍历打印,谁又知道rdd.foreach()区别呢?
object Action_foreach {
def main(args: Array[String]): Unit = {
val context = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("operator-action"))
val rdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
// 这个其实是在Driver端内存集合中的循环遍历方法
rdd.collect().foreach(println)
// foreach其实是Executor端内存集合数据打印
rdd.foreach(println)
// RDD的方法和Scala集合对象中的方法不一样
// 集合对象的方法都是在同一个节点的内存中完成的
// RDD的方法可以将计算逻辑发送到Executor(分布式节点)执行
// RDD方法外部的操作都是在Driver端进行的,而方法内部的逻辑代码是在Executor端进行的。
context.stop()
}
}
4、wordCount的11种死法
1、groupBy
通过对相同数据的分组,进行聚合操作,实现功能。
/**
* 1、groupBy : 传统
*/
def wordCount1(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 分组
val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
2、groupByKey
/**
* 2、groupByKey : 简化一丢丢,但是效率上,由于是走shuffle的,会打乱
*/
def wordCount2(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
// 分组
val group: RDD[(String, Iterable[Int])] = word.groupByKey()
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
3、reduceByKey
/**
* 3、reduceByKey : 效率比groupByKey好点,不会走shuffle,直接聚合
*/
def wordCount3(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = word.reduceByKey(_ + _)
}
4、aggregateByKey
/**
* 4、aggregateByKey :
*/
def wordCount4(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = word.aggregateByKey(0)(_ + _, _ + _)
wordCount.collect().foreach(println)
}
5、foldByKey
/**
* 5、foldByKey :分区内和分区间规则相同时可简化
*/
def wordCount5(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = word.foldByKey(0)(_ + _)
wordCount.collect().foreach(println)
}
6、combineByKey
/**
* 6、combineByKey :三个参数,初始值进行操作
*/
def wordCount6(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
// 聚合(求大小)
val wordCount: RDD[(String, Int)] = word.combineByKey(v => v,
(x: Int, y) => x + y,
(x1: Int, x2: Int) => x1 + x2
)
wordCount.collect().foreach(println)
}
7、countByKey
/**
* 7、countByKey :直接对key的数量聚合
*/
def wordCount7(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val word: RDD[(String, Int)] = words.map((_, 1))
val wordCount: collection.Map[String, Long] = word.countByKey()
println(wordCount)
}
8、countByValue
/**
* 8、countByValue :直接对value的数量聚合
*/
def wordCount8(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 转换成元组形式
val wordCount: collection.Map[String, Long] = words.countByValue()
println(wordCount)
}
9、reduce
/**
* 9、reduce :直接聚合
*/
def wordCount9(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 分割
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 为了能够直接reduce聚合,考虑将每个单词转换成Map形式,次数为1
val maps: RDD[mutable.Map[String, Long]] = words.map(
word => {
// 1、创建一个单独的Map,将每个单词放入其中
mutable.Map[String, Long]((word, 1))
}
)
// 2、将所有Map计算
// reduce只能对相同类型进行聚合
val wordCount: mutable.Map[String, Long] = maps.reduce(
// 遍历所有map,不断往第一个map中更新数据
(map1, map2) => {
map2.foreach {
case (word, count) => {
// 取出所有word的count
val cnt: Long = map1.getOrElse(word, 0L)
// 给map1中更新值
map1.update(word, cnt + count)
}
}
// 将map1返回
map1
}
)
println(wordCount)
}
10、aggregate
/**
* 10、aggregate :跟aggregateByKey差不多
*/
def wordCount10(sc: SparkContext) = {
val rdd: RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
// 拆分成单词
val words: RDD[String] = rdd.flatMap(_.split(" "))
// 根据key进行分组
val wordCount: mutable.Map[String, Long] = words.aggregate(mutable.Map[String, Long]())(
(map: mutable.Map[String, Long], word) => {
// 分区内集合,一个分区中没有改数据,获取0放入,否则数量+1
val count = map.getOrElse(word, 0L)
map.put(word, count + 1L)
map
},
// 分区间聚合
(map1: mutable.Map[String, Long], map2: mutable.Map[String, Long]) => {
map2.foreach {
case (word, count) => {
// 取出所有word的count
val cnt: Long = map1.getOrElse(word, 0L)
// 给map1中更新值
map1.update(word, cnt + count)
}
}
// 将map1返回
map1
}
)
println(wordCount)
}
11、fold
待续
object wordCount8fold {
def main(args: Array[String]): Unit = {
//partitionBy
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("File - RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(("a", 1), ("b", 2), ("a", 1), ("b", 2), ("a", 1), ("b", 2)), 2
)
val strRDD: RDD[String] = rdd.map {
case (str, sum) => {
(str + " ") * sum
}
}
val flatMapRDD = strRDD.flatMap(_.split(" "))
flatMapRDD.map(s => mutable.Map(s -> 1)).fold(mutable.Map[String, Int]())(
(map1: mutable.Map[String, Int], map2: mutable.Map[String, Int]) => {
map1.foldLeft(map2)(
(innerMap, kv) => {
innerMap(kv._1) = innerMap.getOrElse(kv._1, 0) + kv._2
innerMap
}
)
}
).foreach(println)
// println (result.collect().mkString(","))
}
}
原文链接:https://blog.csdn.net/qq_34158117/article/details/106611349