1、初始化 Spark

Spark 程序必须做的第一件事情是创建一个 SparkContext 对象,它会告诉 Spark 如何访问集群。要创建一个 SparkContext,首先需要构建一个包含应用程序的信息的 SparkConf 对象。
每一个 JVM 可能只能激活一个 SparkContext 对象。在创新一个新的对象之前,必须调用 stop() 该方法停止活跃的 SparkContext

  1. SparkConf sparkConf = new SparkConf()
  2. .setAppName("javaSparkPi")
  3. .setMaster("local")
  4. .set("spark.driver.host", "localhost").set("spark.testing.memory", "21474800000");
  5. JavaSparkContext jsc=new JavaSparkContext(sparkConf);

这个 appName 参数是一个在集群 UI 上展示应用程序的名称。master 是一个 spark集群的url,或者指定为在 local mode(本地模式)中运行的 “local” 字符串。

2、弹性分布式数据集(RDDs)

Spark 主要以一个 弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。有两种方法可以创建 RDD一种是针对_一个已存在的集合,另一种是在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。

2.1 并行集合

针对已存在的集合通过调用 SparkContext 的 parallelize 方法来创建并行集合。在创建后,该 distributed dataset(分布式数据集)(distData)可以并行的执行操作。

  1. //List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
  2. //JavaRDD<Integer> distData = jsc.parallelize(data);
  3. List<Integer> l = new ArrayList<>(n);
  4. for (int i = 0; i < n; i++) {
  5. l.add(i);
  6. }
  7. JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

并行集合中一个很重要参数是 partitions(分区)的数量,它可用来切割 dataset(数据集)。Spark 将在集群中的每一个分区上运行一个任务。通常您希望群集中的每一个 CPU 计算 2-4 个分区。一般情况下,Spark 会尝试根据您的群集情况来自动的设置的分区的数量。当然,您也可以将分区数作为第二个参数传递到 parallelize(例如,sc.parallelize(data, 10))方法中来手动的设置它。

2.2 外部 Datasets(数据集)

Spark 可以从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。
可以使用 SparkContext 的 textFile 方法来创建文本文件的 RDD。此方法需要一个文件的 URI(计算机上的本地路径(绝对路径和相对路径),hdfs://,s3n:// 等等的 URI),并且读取它们作为一个 lines(行)的集合。下面是一个调用示例:

  1. JavaRDD<String> lines = jsc.textFile("src/main/resources/demo/kdy.txt");

3、RDD操作

RDDs support 两种类型的操作:transformations(转换),它会在一个已存在的 dataset 上创建一个新的 dataset,和 actions(动作),将在 dataset 上运行的计算后返回到 driver 程序。
Spark 中所有的 transformations 都是 _lazy(懒加载的)
,因此它不会立刻计算出结果,默认情况下,每次你在 RDD 运行一个 action 的时,每个 transformed RDD 都会被重新计算。但是,您也可用 persist(或 cache)方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。

3.1 Transformations(转换)

下表列出了一些 Spark 常用的 transformations(转换)。

Transformation(转换) Meaning(含义)
map(func) 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。
filter(func) 返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。
flatMap(func) 与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item)。
mapPartitions(func) 与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator => Iterator 类型。
mapPartitionsWithIndex(func) 与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator) => Iterator 类型。
sample(withReplacement, fraction, seed) 样本数据,设置是否放回(withReplacement),采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。
union(otherDataset) 反回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。
intersection(otherDataset) 返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。
distinct([numTasks])) 返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。
groupByKey([numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable) .
Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好.
Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。
reduceByKey(func, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的,它必须是 type (V,V) => V 的类型。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 在 (K, V) pairs 的 dataset 上调用时,返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral “0” 值来进行聚合的。允许聚合值的类型与输入值的类型不一样,同时避免不必要的配置。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。
sortByKey([ascending], [numTasks]) 在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset,由 boolean 类型的 ascending 参数来指定。
join(otherDataset, [numTasks]) 在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 来实现。
cogroup(otherDataset, [numTasks]) 在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable, Iterable)) tuples 的 dataset。这个操作也调用了 groupWith。
cartesian(otherDataset) 在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。
pipe(command, [envVars]) 通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。
coalesce(numPartitions) Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。
repartition(numPartitions) Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。

3.2 Actions(动作)

下表列出了一些 Spark 常用的 actions 操作。

Action(动作) Meaning(含义)
reduce(func) 使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative)和关联(associative)的,这样才能保证它可以被并行地正确计算。
collect() 在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。
count() 返回 dataset 中元素的个数。
first() 返回 dataset 中的第一个元素(类似于 take(1)。
take(n) 将数据集中的前 n 个元素作为一个 array 数组返回。
takeSample(withReplacement, num, [seed]) 对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。
takeOrdered(n, [ordering]) 返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。
saveAsTextFile(path) 将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。
saveAsSequenceFile(path)
(Java and Scala) 将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int,Double,String 等等)。
saveAsObjectFile(path)
(Java and Scala) 使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。
countByKey() 仅适用于(K,V)类型的 RDD。返回具有每个 key 的计数的(K , Int)pairs 的 hashmap。
foreach(func) 对 dataset 中每个元素运行函数 func。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分。

4、RDD实例介绍

4.1、计数 count、countByKey、countByValue

  1. System.out.println(lines.count());
  2. System.out.println(counts.countByValue());
  3. System.out.println(counts.countByKey());

4.2、一对多转换 flatMap map

flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据

map通常是一对一,即输入一个,对应输出一个。但是输出的结果可以是一个元组,一个元组则可能包含多个数据,但是一个元组是一个整体,因此算是一个元素。

  1. //Spark中的RDD就是一个不可变的分布式对象集合
  2. //按空格切割每一条数据返回一个新的rdd new FlatMapFunction<加载后的格式,处理后的格式>()
  3. JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  4. @Override
  5. public Iterator<String> call(String line) {
  6. List<String> strings = Arrays.asList(line.split("\\s+"));
  7. return strings.iterator();
  8. }
  9. });
  10. 123 234 123 => [123,234,123]

4.3、T转map mapToPair

表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)

  1. //将rdd转化成二元的rdd,拼接参数new PairFunction<加载后的key,处理后的key,拼接的value>()
  2. JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String,Integer>() {
  3. @Override
  4. public Tuple2<String, Integer> call(String word) throws Exception {//拼接的value
  5. return new Tuple2<>(word, 1);
  6. }
  7. });
  8. [123,234,123] => [(123,1),(234,1),(123,1)]

4.4、聚合 reduceByKey reduce

要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算

reduce(fun) 算子:

  • 每次传入两个参数通过fun 的到一个返回值,该返回值继续与后面的值进行调用fun,
  • 直到所有的数据计算完成,最后返回一个计算结果
  1. //聚合相同key的rdd并统计次数
  2. JavaPairRDD<String,Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
  3. @Override
  4. public Integer call(Integer v1, Integer v2) throws Exception {
  5. return v1+v2;
  6. }
  7. });
  8. //求总和
  9. List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
  10. JavaRDD<Integer> rdd01 = javaSparkContext.parallelize(data);
  11. List<Integer> result2 = Lists.newArrayList();
  12. result2.add(rdd01.reduce((Integer v1, Integer v2) -> {
  13. return v1 + v2;
  14. }));

4.5、缓存 cache persist

cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。

cache只有一个默认的缓存级别MEMORY_ONLY ,

而persist可以根据情况设置其它的缓存级别。 硬盘 内存 堆外内存 12种缓存级别

  1. //缓存
  2. counts.cache();
  3. counts.persist(StorageLevel.MEMORY_AND_DISK());

4.6、收集 collect() Action算子

Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组,同时会从远程集群是拉取数据到driver端。

  1. JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
  2. List<Tuple2<String, Integer>> output = counts.collect();
  3. for (Tuple2<?,?> tuple : output) {
  4. System.out.println("输出:"+tuple._1() + ": " + tuple._2());
  5. }

4.7、 取前面的数据 take top

take函数用于获取RDD中从0到num-1下标的元素(不排序)。

top函数用户从RDD中按照默认降序或者指定的排序规则返回前N个元素。该函数带隐式函数Ordering[T],既可以指定排序规则。

  1. sorted.take(10);
  2. List<Tuple2<String, Integer>> output = sorted.top(k,new MyComparator());
  3. for (Tuple2<String, Integer> tuple : output) {
  4. result.put(tuple._1(), tuple._2());
  5. }
  6. //实现序列化
  7. public class MyComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
  8. @Override
  9. public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
  10. return o1._2()> o2._2()?1:0;
  11. }
  12. }

4.8、排序 sortByKey sortBy

sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD

true升序,false降序

  1. temp.sortByKey(false)

sortBy是对标准的RDD进行排序

4.9、 过滤 filter

filter 算子使用

通过函数筛选出需要的数据元素,返回true表示保留,返回false表示抛弃

  1. JavaRDD<Integer> rdd01 = rdd01.filter(new Function<Integer, Boolean>() {
  2. @Override
  3. public Boolean call(Integer integer) throws Exception {
  4. return integer<6;
  5. }
  6. });

4.10、union

union 算子:

  • 取两个RDD的并集,不去重,会增加partition的数量,同时并行度也会增加
  1. JavaRDD<Integer> unionRdd = javaSparkContext.parallelize(data);
  2. rdd01 = rdd01.union(unionRdd);

4.11、遍历forech result2.forEach(System.out::println);

4.12 分组 groupBy

groupBy()方法是根据用户自定义的情况进行分组

groupByKey()则是根据key值进行分组的

  1. JavaPairRDD<String, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
  2. log.info("======grouby========:{}", x);
  3. if (x > 3) return "大于三";
  4. else return "小于等于三";
  5. });

4.13、 快照 checkpoint

通过将计算代价较大的 RDD checkpoint 一下,当下游 RDD 计算出错时,可以直接从 checkpoint 过的 RDD 那里读取数据继续算。

  1. javaSparkContext.setCheckpointDir("d:/checkpoint");
  2. //TODO
  3. counts.checkpoint();