- 1、初始化 Spark
- 弹性分布式数据集(RDDs)">2、弹性分布式数据集(RDDs)
- 2.1 并行集合
- 2.2 外部 Datasets(数据集)
- 3、RDD操作
- 4、RDD实例介绍
- 4.1、计数 count、countByKey、countByValue
- 4.2、一对多转换 flatMap map
- 4.3、T转map mapToPair
- 4.4、聚合 reduceByKey reduce
- 4.5、缓存 cache persist
- 4.6、收集 collect() Action算子
- 4.7、 取前面的数据 take top
- 4.8、排序 sortByKey sortBy
- sortBy是对标准的RDD进行排序
- 4.9、 过滤 filter
- 4.10、union
- 4.11、遍历forech result2.forEach(System.out::println);
- 4.12 分组 groupBy
- 4.13、 快照 checkpoint
1、初始化 Spark
Spark 程序必须做的第一件事情是创建一个 SparkContext 对象,它会告诉 Spark 如何访问集群。要创建一个 SparkContext,首先需要构建一个包含应用程序的信息的 SparkConf 对象。
每一个 JVM 可能只能激活一个 SparkContext 对象。在创新一个新的对象之前,必须调用 stop() 该方法停止活跃的 SparkContext。
SparkConf sparkConf = new SparkConf()
.setAppName("javaSparkPi")
.setMaster("local")
.set("spark.driver.host", "localhost").set("spark.testing.memory", "21474800000");
JavaSparkContext jsc=new JavaSparkContext(sparkConf);
这个 appName 参数是一个在集群 UI 上展示应用程序的名称。master 是一个 spark集群的url,或者指定为在 local mode(本地模式)中运行的 “local” 字符串。
2、弹性分布式数据集(RDDs)
Spark 主要以一个 弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。有两种方法可以创建 RDD:一种是针对_一个已存在的集合,另一种是在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。
2.1 并行集合
针对已存在的集合通过调用 SparkContext 的 parallelize 方法来创建并行集合。在创建后,该 distributed dataset(分布式数据集)(distData)可以并行的执行操作。
//List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
//JavaRDD<Integer> distData = jsc.parallelize(data);
List<Integer> l = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
并行集合中一个很重要参数是 partitions(分区)的数量,它可用来切割 dataset(数据集)。Spark 将在集群中的每一个分区上运行一个任务。通常您希望群集中的每一个 CPU 计算 2-4 个分区。一般情况下,Spark 会尝试根据您的群集情况来自动的设置的分区的数量。当然,您也可以将分区数作为第二个参数传递到 parallelize(例如,sc.parallelize(data, 10))方法中来手动的设置它。
2.2 外部 Datasets(数据集)
Spark 可以从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。
可以使用 SparkContext 的 textFile 方法来创建文本文件的 RDD。此方法需要一个文件的 URI(计算机上的本地路径(绝对路径和相对路径),hdfs://,s3n:// 等等的 URI),并且读取它们作为一个 lines(行)的集合。下面是一个调用示例:
JavaRDD<String> lines = jsc.textFile("src/main/resources/demo/kdy.txt");
3、RDD操作
RDDs support 两种类型的操作:transformations(转换),它会在一个已存在的 dataset 上创建一个新的 dataset,和 actions(动作),将在 dataset 上运行的计算后返回到 driver 程序。
Spark 中所有的 transformations 都是 _lazy(懒加载的),因此它不会立刻计算出结果,默认情况下,每次你在 RDD 运行一个 action 的时,每个 transformed RDD 都会被重新计算。但是,您也可用 persist(或 cache)方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。
3.1 Transformations(转换)
下表列出了一些 Spark 常用的 transformations(转换)。
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。 |
filter(func) | 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。 |
flatMap(func) | 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item)。 |
mapPartitions(func) | 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator |
sample(withReplacement, fraction, seed) | 样本数据,设置是否放回(withReplacement),采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。 |
union(otherDataset) | 反回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。 |
intersection(otherDataset) | 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。 |
distinct([numTasks])) | 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。 |
groupByKey([numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable |
Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好. | |
Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。 | |
reduceByKey(func, [numTasks]) | 在 (K, V) pairs 的 dataset 上调用时,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的,它必须是 type (V,V) => V 的类型。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 在 (K, V) pairs 的 dataset 上调用时,返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral “0” 值来进行聚合的。允许聚合值的类型与输入值的类型不一样,同时避免不必要的配置。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。 |
sortByKey([ascending], [numTasks]) | 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset,由 boolean 类型的 ascending 参数来指定。 |
join(otherDataset, [numTasks]) | 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 来实现。 |
cogroup(otherDataset, [numTasks]) | 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable |
cartesian(otherDataset) | 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。 |
pipe(command, [envVars]) | 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。 |
coalesce(numPartitions) | Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。 |
repartition(numPartitions) | Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。 |
3.2 Actions(动作)
下表列出了一些 Spark 常用的 actions 操作。
Action(动作) | Meaning(含义) |
---|---|
reduce(func) | 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative)和关联(associative)的,这样才能保证它可以被并行地正确计算。 |
collect() | 在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。 |
count() | 返回 dataset 中元素的个数。 |
first() | 返回 dataset 中的第一个元素(类似于 take(1)。 |
take(n) | 将数据集中的前 n 个元素作为一个 array 数组返回。 |
takeSample(withReplacement, num, [seed]) | 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。 |
takeOrdered(n, [ordering]) | 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。 |
saveAsTextFile(path) | 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。 |
saveAsSequenceFile(path) | |
(Java and Scala) | 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int,Double,String 等等)。 |
saveAsObjectFile(path) | |
(Java and Scala) | 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。 |
countByKey() | 仅适用于(K,V)类型的 RDD。返回具有每个 key 的计数的(K , Int)pairs 的 hashmap。 |
foreach(func) | 对 dataset 中每个元素运行函数 func。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分。 |
4、RDD实例介绍
4.1、计数 count、countByKey、countByValue
System.out.println(lines.count());
System.out.println(counts.countByValue());
System.out.println(counts.countByKey());
4.2、一对多转换 flatMap map
flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
map通常是一对一,即输入一个,对应输出一个。但是输出的结果可以是一个元组,一个元组则可能包含多个数据,但是一个元组是一个整体,因此算是一个元素。
//Spark中的RDD就是一个不可变的分布式对象集合
//按空格切割每一条数据返回一个新的rdd new FlatMapFunction<加载后的格式,处理后的格式>()
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) {
List<String> strings = Arrays.asList(line.split("\\s+"));
return strings.iterator();
}
});
123 234 123 => [123,234,123]
4.3、T转map mapToPair
表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)
//将rdd转化成二元的rdd,拼接参数new PairFunction<加载后的key,处理后的key,拼接的value>()
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {//拼接的value
return new Tuple2<>(word, 1);
}
});
[123,234,123] => [(123,1),(234,1),(123,1)]
4.4、聚合 reduceByKey reduce
要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
reduce(fun) 算子:
- 每次传入两个参数通过fun 的到一个返回值,该返回值继续与后面的值进行调用fun,
- 直到所有的数据计算完成,最后返回一个计算结果
//聚合相同key的rdd并统计次数
JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
//求总和
List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
JavaRDD<Integer> rdd01 = javaSparkContext.parallelize(data);
List<Integer> result2 = Lists.newArrayList();
result2.add(rdd01.reduce((Integer v1, Integer v2) -> {
return v1 + v2;
}));
4.5、缓存 cache persist
cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。
cache只有一个默认的缓存级别MEMORY_ONLY ,
而persist可以根据情况设置其它的缓存级别。 硬盘 内存 堆外内存 12种缓存级别
//缓存
counts.cache();
counts.persist(StorageLevel.MEMORY_AND_DISK());
4.6、收集 collect() Action算子
Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组,同时会从远程集群是拉取数据到driver端。
JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println("输出:"+tuple._1() + ": " + tuple._2());
}
4.7、 取前面的数据 take top
take函数用于获取RDD中从0到num-1下标的元素(不排序)。
top函数用户从RDD中按照默认降序或者指定的排序规则返回前N个元素。该函数带隐式函数Ordering[T],既可以指定排序规则。
sorted.take(10);
List<Tuple2<String, Integer>> output = sorted.top(k,new MyComparator());
for (Tuple2<String, Integer> tuple : output) {
result.put(tuple._1(), tuple._2());
}
//实现序列化
public class MyComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
@Override
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
return o1._2()> o2._2()?1:0;
}
}
4.8、排序 sortByKey sortBy
sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD
true升序,false降序
temp.sortByKey(false)
sortBy是对标准的RDD进行排序
4.9、 过滤 filter
filter 算子使用
通过函数筛选出需要的数据元素,返回true表示保留,返回false表示抛弃
JavaRDD<Integer> rdd01 = rdd01.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer<6;
}
});
4.10、union
union 算子:
- 取两个RDD的并集,不去重,会增加partition的数量,同时并行度也会增加
JavaRDD<Integer> unionRdd = javaSparkContext.parallelize(data);
rdd01 = rdd01.union(unionRdd);
4.11、遍历forech result2.forEach(System.out::println);
4.12 分组 groupBy
groupBy()方法是根据用户自定义的情况进行分组
groupByKey()则是根据key值进行分组的
JavaPairRDD<String, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
log.info("======grouby========:{}", x);
if (x > 3) return "大于三";
else return "小于等于三";
});
4.13、 快照 checkpoint
通过将计算代价较大的 RDD checkpoint 一下,当下游 RDD 计算出错时,可以直接从 checkpoint 过的 RDD 那里读取数据继续算。
javaSparkContext.setCheckpointDir("d:/checkpoint");
//TODO
counts.checkpoint();