Spark SQL and DataFrame引言

  1. Spark SQLSpark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。
  2. DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

SQLContext

  1. 要使用Spark SQL,首先就得创建一个创建一个SQLContext对象,或者是它的子类的对象,比如HiveContext的对象。
  2. Java版本:
  3. JavaSparkContext sc = ...;
  4. SQLContext sqlContext = new SQLContext(sc);
  5. Scala版本:
  6. val sc: SparkContext = ...
  7. val sqlContext = new SQLContext(sc)
  8. import sqlContext.implicits._

HiveContext

  1. 除了基本的SQLContext以外,还可以使用它的子类——HiveContextHiveContext的功能除了包含SQLContext提供的所有功能之外,还包括了额外的专门针对Hive的一些功能。这些额外功能包括:使用HiveQL语法来编写和执行SQL,使用Hive中的UDF函数,从Hive表中读取数据。
  2. 要使用HiveContext,就必须预先安装好HiveSQLContext支持的数据源,HiveContext也同样支持——而不只是支持Hive。对于Spark 1.3.x以上的版本,都推荐使用HiveContext,因为其功能更加丰富和完善。
  3. Spark SQL还支持用spark.sql.dialect参数设置SQL的方言。使用SQLContextsetConf()即可进行设置。对于SQLContext,它只支持“sql”一种方言。对于HiveContext,它默认的方言是“hiveql”。

创建DataFrame

  1. 使用SQLContext,可以从RDDHive表或者其他数据源,来创建一个DataFrame。以下是一个使用JSON文件创建DataFrame的例子:
  2. Java版本:
  3. JavaSparkContext sc = ...;
  4. SQLContext sqlContext = new SQLContext(sc);
  5. DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
  6. df.show();
  7. Scala版本:
  8. val sc: SparkContext = ...
  9. val sqlContext = new SQLContext(sc)
  10. val df = sqlContext.read.json("hdfs://spark1:9000/students.json")
  11. df.show()

DataFrame的常用操作

  1. Java版本
  2. DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
  3. df.show();
  4. df.printSchema();
  5. df.select("name").show();
  6. df.select(df.col("name"), df.col("age").plus(1)).show();
  7. df.filter(df.col("age").gt(21)).show();
  8. df.groupBy("age").count().show();
  9. Scala版本
  10. val df = sqlContext.read.json("hdfs://spark1:9000/students.json")
  11. df.show()
  12. df.printSchema()
  13. df.select("name").show()
  14. df.select(df("name"), df("age") + 1).show()
  15. df.filter(df("age") > 21).show()
  16. df.groupBy("age").count().show()

DataFrame Create

java版

  1. public class createdataframe {
  2. public static void main(String[] args) {
  3. SparkConf conf = new SparkConf().setAppName("dataframecreate").setMaster("local");
  4. JavaSparkContext sc = new JavaSparkContext(conf);
  5. SQLContext sqlContext = new SQLContext(sc);
  6. DataFrame df = sqlContext.read().json("/Users/gaozhen/tmp/students.json");
  7. df.show();
  8. }
  9. }

scala 版

  1. object createdataframe1 {
  2. def main(args: Array[String]): Unit = {
  3. val conf = new SparkConf().setAppName("createdataframe").setMaster("local");
  4. val sc = new SparkContext(conf)
  5. val sqlContext = new SQLContext(sc);
  6. val df = sqlContext.read.json("/Users/gaozhen/tmp/students.json");
  7. df.show();
  8. }
  9. }

DataFrame Operation

java版

  1. public class DataFrameOperation {
  2. public static void main(String[] args) {
  3. // 创建DataFrame
  4. SparkConf conf = new SparkConf()
  5. .setAppName("DataFrameCreate");
  6. JavaSparkContext sc = new JavaSparkContext(conf);
  7. SQLContext sqlContext = new SQLContext(sc);
  8. // 创建出来的DataFrame完全可以理解为一张表
  9. DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
  10. // 打印DataFrame中所有的数据(select * from ...)
  11. df.show();
  12. // 打印DataFrame的元数据(Schema)
  13. df.printSchema();
  14. // 查询某列所有的数据
  15. df.select("name").show();
  16. // 查询某几列所有的数据,并对列进行计算
  17. df.select(df.col("name"), df.col("age").plus(1)).show();
  18. // 根据某一列的值进行过滤
  19. df.filter(df.col("age").gt(18)).show();
  20. // 根据某一列进行分组,然后进行聚合
  21. df.groupBy(df.col("age")).count().show();
  22. }
  23. }

scala 版

  1. object DataFrameOperation {
  2. def main(args: Array[String]) {
  3. val conf = new SparkConf()
  4. .setAppName("DataFrameCreate")
  5. val sc = new SparkContext(conf)
  6. val sqlContext = new SQLContext(sc)
  7. val df = sqlContext.read.json("hdfs://spark1:9000/students.json")
  8. df.show()
  9. df.printSchema()
  10. df.select("name").show()
  11. df.select(df("name"), df("age") + 1).show()
  12. df.filter(df("age") > 18).show()
  13. df.groupBy("age").count().show()
  14. }
  15. }