RDD 编程模型

在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。经过一系列的transformations定义 RDD 之后,就可以调用 actions 触发 RDD 的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。
  在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker。Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。

RDD的创建

在Spark中创建RDD的方式可以分为三种:

  • 从集合中创建RDD;
  • 从外部存储创建RDD;
  • 从其他RDD创建。

    从集合中创建 RDD

  1. 使用parallelize函数创建
    scala> val arr = Array(10,20,30,40,50,60)
    arr: Array[Int] = Array(10, 20, 30, 40, 50, 60)

  2. 使用makeRDD函数创建
    val rdd1 = sc.makeRDD(Array(10,20,30,40,50,60))
    说明

  • 一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了.
  • parallelize和makeRDD还有一个重要的参数就是把数据集切分成的分区数.
  • Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数

从外部存储创建 RDD

 Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集.
 可以是本地文件系统, HDFS, Cassandra, HVase, Amazon S3 等等.
 Spark 支持 文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat.

scala> var distFile = sc.textFile(“words.txt”)
distFile.collect

从其他 RDD 转换得到新的 RDD

就是通过 RDD 的各种转换算子来得到新的 RDD.