创建RDD的三种方式

RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD
这样就相当于设置了Spark应用程序的输入源数据
然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件

  • 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
  • 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
  • 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。

    集合(parallelize)

    通过SparkContext的parallelize()方法将集合转为RDD
    调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。
    Spark默认会根据集群的配置来设置partition的数量。可以通过parallelize()方法的二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

案例:集合所有元素求和

scala代码

  1. object CreateRddByArrayScala {
  2. def main(args: Array[String]): Unit = {
  3. //创建SparkContext
  4. val conf = new SparkConf()
  5. conf.setAppName("CreateRddByArrayScala")
  6. conf.setMaster("local")
  7. val sc = new SparkContext(conf)
  8. //创建集合
  9. val arr = Array(1, 2, 3, 4, 5)
  10. //基于集合创建RDD
  11. val rdd = sc.parallelize(arr)
  12. //对集合中的数据求和
  13. val sum = rdd.reduce(_ + _)
  14. //-->在driver执行
  15. println(sum)
  16. //停止SparkContext
  17. sc.stop()
  18. }
  19. }

运行结果:
image.png
java代码

public class CreateRddByArrayJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //创建集合
        List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(arr);
        Integer sum = rdd.reduce((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);

        System.out.println(sum);

        sc.stop();
    }
}

本地文件和HDFS文件(textFile)

通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容
textFile()方法支持针对目录、压缩文件以及通配符创建RDD
Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少

案例:求HDFS文本的总长度 hello you
hello me

scala代码

object CreateRddByFileScala {

  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")
    conf.setMaster("local")
    val sc = new SparkContext(conf)

    //读取文件数据,可以在textFile中指定生成的RDD的分区数量,默认等于block块数量
    val path = "hdfs://bigdata1:9000/test/hello.txt"
    val rdd = sc.textFile(path)

    //获取每一行数据的长度,计算文件内数据的总长度
    val length = rdd.map(_.length).reduce(_ + _)

    println(length)

    sc.stop()

  }

}

运行结果
image.png
java 代码

public class CreateRddByFileJava {
    public static void main(String[] args) {
        //创建JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("CreateRddByArrayJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        String path = "D:\\test\\hello.txt";
        path = "hdfs://bigdata1:9000/test/hello.txt";

        JavaRDD<String> rdd = sc.textFile(path, 2);

        //获取每一行数据的长度
        JavaRDD<Integer> lengthRDD = rdd.map((Function<String, Integer>) line -> line.length());

        //计算文件内数据的总长度
        Integer length = lengthRDD.reduce((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);

        System.out.println(length);

        sc.stop();
    }
}

Transformation和Action

Spark对RDD的操作可以整体分为两类:TransformationAction
不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子,例如:map算子、reduce算子

Transformation(转换,lazy)

可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等

Transformation的特性:lazy
如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。

也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。
Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。

Action(执行)

可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

Action的特性:执行Action操作才会触发一个Spark 任务的运行,从而触发这个Action之前所有的Transformation的执行

以我们的WordCount代码为例:

 //第一步:创建SparkContext     
    val conf = new SparkConf()     
    conf.setAppName("WordCountScala")//设置任务名称     
      //.setMaster("local")//local表示在本地执行     
    val sc = new SparkContext(conf)     

    //第二步:加载数据     
    var path = "D:\\hello.txt"     
    if(args.length==1){     
      path = args(0)     
    }     

//这里通过textFile()方法,针对外部文件创建了一个RDD,linesRDD,实际上,程序执行到这里为止,hello.txt文件的数据是不会被加载到内存中的。linesRDD只是代表了一个指向hello.txt文件的引用     
    val linesRDD = sc.textFile(path)     

    //第三步:对数据进行切割,把一行数据切分成一个一个的单词     
//这里通过flatMap算子对linesRDD进行了转换操作,把每一行数据中的单词切开,获取了一个转换后的wordsRDD,但是由于linesRDD目前是没有数据的,现在不会做任何操作,只是进行了逻辑上的定义而已,最终生成的wordsRDD也只是一个逻辑上的RDD,此时里面并没有任何数据     
    val wordsRDD = linesRDD.flatMap(_.split(" "))     

    //第四步:迭代words,将每个word转化为(word,1)这种形式     
//这个操作和前面分析的flatMap的操作是一样的,最终获取了一个逻辑上的pairRDD,此时里面也是没有任何数据的     
    val pairRDD = wordsRDD.map((_,1))     

    //第五步:根据key(其实就是word)进行分组聚合统计     
//这个操作也是和前面分析的flatMap操作是一样的,最终获取了一个逻辑上的wordCountRDD,此时里面也是没有任何数据的     
    val wordCountRDD = pairRDD.reduceByKey(_ + _)     

    //第六步:将结果打印到控制台     
//这行代码执行了一个action操作,foreach,此时会触发之前所有transformation算子的执行,Spark会将这些算子拆分成多个task发送到多个机器上并行执行,这个foreach算子是没有返回值的,所以不会向Driver进程返回数据,如果是reduce操作,则会向Driver进程返回最终的结果数据。     
//注意:只有当任务执行到这一行代码的时候任务才会真正开始执行计算,如果任务中没有这一行代码,前面的所有算子是不会执行的     
    wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))     

    //第七步:停止SparkContext     
    sc.stop()

常用Transformation算子

先来看一下官方文档,进入2.4.3的文档界面
image.png
image.png
这里面列出了Spark支持的所有的transformation算子
image.png

算子 介绍
map 将RDD中的每个元素进行处理,一进一出
filter 对RDD中每个元素进行判断,返回true则保留
flatMap 与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey 对每个相同key对应的value进行排序操作(全局排序)
join 对两个包含对的RDD进行join操作
distinct 对RDD中的元素进行全局去重

map

将RDD中的每个元素进行处理,一进一出

对集合中每个元素乘以2

  def mapOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.map(_ * 2).foreach(println(_))
  }

image.png

filter

对RDD中每个元素进行判断,返回true则保留

过滤出集合中的偶数

  def filterOP(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.filter(_ % 2 == 0).foreach(println(_))
  }

image.png

flatMap

与map类似,但是每个元素都可以返回一个或多个新元素

将行拆分为单词

  def flatMapOP(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array("hello world", "hello scala"))
    rdd.flatMap(_.split(" ")).foreach(println(_))
  }

image.png

groupByKey

根据key进行分组,每个key对应一个Iterable

对每个大区的主播进行分组

  def groupByKeyOp(sc: SparkContext): Unit = {

    //使用Tuple类型存放主播的信息,格式:(主播id,地区),在这里可以把这个tuple认为是一个key-value
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))

    //需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作,map算子之后的格式数据格式是("US",150001)
    //注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装成tuple2这种形式
    rdd.map(tup => (tup._2, tup._1)).groupByKey().foreach(tup => {
      //获取大区信息
      val area = tup._1
      print(area + ":")

      val it = tup._2
      //获取同一个大区下面的所有用户id
      for (uid <- it) {
        print(uid + " ")
      }

      println()
    })

  }

image.png

如果tuple中的数据列数超过了2列怎么办?比如多了主播性别

  def groupByKeyOp2(sc: SparkContext): Unit = {

    val rdd = sc.parallelize(Array((150001, "US", "male"), (150002, "CN", "female"), (150003, "CN", "male"), (150004, "IN", "female")))

    //把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下,格式:("US",(150001,"male"))
    rdd.map(tup => (tup._2, (tup._1, tup._3))).groupByKey().foreach(tup => {
      val area = tup._1
      print(area + ":")

      val it = tup._2
      for (elem <- it) {
        //elem也是一个tuple,格式 :(150001,"male")
        print(elem + " ")
      }

      println()
    })

  }

image.png

reduceByKey

对每个相同key对应的value进行reduce操作

统计每个大区的主播数量

  def reduceByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))

    //由于这个需求只需要使用到大区信息,所以在map操作的时候只保留大区信息即可
    rdd.map(tup => (tup._2, 1)).reduceByKey(_ + _).foreach(tup => {
      println(tup._1 + ":" + tup._2)
    })

  }

image.png

sortByKey和sortBy

对每个相同key对应的value进行排序操作(全局排序)

对主播的音浪收入排序

  def sortByKey(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, 400), (150002, 200), (150003, 300), (150004, 100)))

    //由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置和互换
    //rdd.map(tup => (tup._2, tup._1)).sortByKey().foreach(println(_))

    //sortByKey默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false
    //rdd.map(tup => (tup._2, tup._1)).sortByKey(false).foreach(println(_))

    //sortBy的使用:可以指定排序字段,比较灵活
    rdd.sortBy(_._2, false).foreach(println(_))

  }

image.pngimage.png

join

对两个包含对的RDD进行join操作

打印每个主播的大区信息和音浪收入

  def joinOp(sc: SparkContext): Unit = {
    //主播地区信息
    val rdd1 = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))
    //主播收入信息
    val rdd2 = sc.parallelize(Array((150001, 400), (150002, 200), (150003, 300), (150004, 100)))

    val joinRdd = rdd1.join(rdd2)
    //joinRdd.foreach(println(_)) //join后rdd的格式:(150001,(US,400))
    joinRdd.foreach(tup => {
      val uid = tup._1
      val tupleData = tup._2
      val area = tupleData._1
      val money = tupleData._2
      println("uid=" + uid + " area=" + area + " money=" + money)
    })

  }

image.png

distinct

对RDD中的元素进行全局去重

统计当天开播的大区信息

  def distinctOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (150003, "CN"), (150004, "IN")))
    rdd.map(_._2).distinct().foreach(println(_))
  }

image.png

常用Action算子image.png

reduce

将RDD中的所有元素进行聚合操作

  def reduceOP(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    val sum = rdd.reduce(_ + _)
    println(sum)
  }

image.png

collect

将RDD中所有元素获取到本地客户端(Driver)

注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点 如果想要获取几条数据,查看一下数据格式,可以使用take(n)

  def collectOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    //collect返回的是一个Array数组
    rdd.collect().foreach(println(_))
  }

image.png

take(n)

获取RDD中前n个元素

  def takeOP(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    //从RDD中获取前3个元素
    rdd.take(3).foreach(println(_))
  }

image.png

count

获取RDD中元素总数

  def countOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    val res = rdd.count()
    println(res)
  }

image.png

saveAsTextFile

将RDD中元素保存到文件中,对每个元素调用toString

  def saveAsTextFileOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    //指定HDFS的路径信息即可,需要指定一个不存在的目录
    rdd.saveAsTextFile("hdfs://bigdata1:9000/out0203")

    //也可以存到本地文件
    //rdd.saveAsTextFile("D:\\test2")
  }

image.png
image.png

countByKey

对每个key对应的值进行count计数

统计相同的key出现多少次

  def countByKeyOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(("A", 1001), ("B", 1002), ("A", 1003), ("C", 1004)))
    //返回的是一个map类型的数据
    val res = rdd.countByKey()
    for ((k, v) <- res) {
      println(k + "," + v)
    }
  }

image.png

foreach

遍历RDD中的每个元素

注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的 实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach,在里面实现具体向外部输出数据的代码

  def foreachOp(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    rdd.foreach(println(_))
  }

image.png