概述

官方地址 http://spark.apache.org/sql/
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。
Spark SQL可以很好地支持SQL查询,一方面,可以编写Spark应用程序使用SQL语句进行数据查询,另一方面,也可以使用标准的数据库连接器(比如JDBC或ODBC)连接Spark进行SQL查询。

DataFrame

SparkSQL使用的数据抽象是DataFrame ,DataFrame让Spark具备了处理大数据结构化数据的能力,它不仅比原来的RDD转换方式更加简单易用,而且获得了更高的计算能力。Spark 能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询。
DataFrame是一种以RDD为基础的分布式数据集,提供了详细的数据信息,就相当于关系数据库的一张表,每个RDD元素都是一个Java对象,即Person对象,但是无法知道Person对象的内部结构信息,而采用DataFrame时,Person对象内部机构信息一目了然。它包含Name,Age,并且知道每个字段的数据类型。
Spark-SQL简介 - 图1

DataFrame创建

从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

HelloWorld

在当前工程目录底下创建input目录 创建测试数据文件 people.json

  1. {"name":"Michael"}
  2. {"name":"Andy", "age":30}
  3. {"name":"Justin", "age":19}

maven 配置

https://mvnrepository.com/search?q=org.apache.spark
Spark-SQL简介 - 图2

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>cn.bx.spark</groupId>
  7. <artifactId>SparkNote</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <scala.version>2.11.12</scala.version>
  11. <spark.version>2.2.3</spark.version>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>org.scala-lang</groupId>
  16. <artifactId>scala-library</artifactId>
  17. <version>${scala.version}</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.spark</groupId>
  21. <artifactId>spark-core_2.11</artifactId>
  22. <version>${spark.version}</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.apache.spark</groupId>
  26. <artifactId>spark-sql_2.11</artifactId>
  27. <version>${spark.version}</version>
  28. </dependency>
  29. </dependencies>
  30. <build>
  31. <sourceDirectory>src/main/scala</sourceDirectory>
  32. <plugins>
  33. <plugin>
  34. <groupId>org.scala-tools</groupId>
  35. <artifactId>maven-scala-plugin</artifactId>
  36. <version>2.15.0</version>
  37. <executions>
  38. <execution>
  39. <goals>
  40. <goal>compile</goal>
  41. </goals>
  42. <configuration>
  43. <args>
  44. <arg>-dependencyfile</arg>
  45. <arg>${project.build.directory}/.scala_dependencies</arg>
  46. </args>
  47. </configuration>
  48. </execution>
  49. </executions>
  50. </plugin>
  51. </plugins>
  52. </build>
  53. </project>

SparkSQL.scala

  1. package cn.bx.spark
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. object SparkSQLNote {
  4. def main(args: Array[String]): Unit = {
  5. val sparkSession: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  6. val df: DataFrame = sparkSession.read.json("input/people.json")
  7. df.show()
  8. df.printSchema()
  9. df.select(df("name"),df("age")).show()
  10. df.filter(df("age") > 20 ).show()
  11. df.groupBy("name").count().show()
  12. df.sort(df("age").desc).show()
  13. df.sort(df("age").desc, df("name").asc).show()
  14. df.select(df("name").as("user_name"),df("age").as("user_age")).show()
  15. sparkSession.stop()
  16. }
  17. }