参考文档:http://spark.apache.org/docs/latest/sql-getting-started.html

Spark SQL

是 Spark 处理结构化数据的一个模块.与基础的 Spark RDD API 不同, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式可以跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.

SQL

Spark SQL 的功能之一是执行 SQL 查询. 当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回.也可以使用 命令行或者通过JDBC/ODBC与 SQL 接口交互.

Datasets and DataFrames

一个 Dataset 是一个分布式的数据集合 Dataset 是在 Spark 1.6 中被添加的新接口, 它提供了 RDD 的优点(强类型化, 能够使用强大的 lambda 函数)与Spark SQL执行引擎的优点.一个 Dataset 可以从 JVM 对象来 构造 并且使用转换功能(map, flatMap, filter, 等等). Dataset API 在Scala 和 Java是可用的.
一个 DataFrame 是一个 Dataset 组成的指定列.它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的, 但是有很多优化. DataFrames 可以从大量的 sources 中构造出来, 比如: 结构化的文本文件, Hive中的表, 外部数据库, 或者已经存在的 RDDs. DataFrame API 可以在 Scala, Java, Python, 和 R中实现. 在 Scala 和 Java中, DataFrame 由 DataSet 中的 RowS(多个 Row)来表示. 在 the Scala API中, DataFrame 仅仅是一个 Dataset[Row]类型的别名. 然而, 在 Java API中, 用户需要去使用 Dataset 去代表一个 DataFrame.

SparkSession

Spark SQL中所有功能的入口点是 SparkSession 类.

SparkConf sparkConf = new _SparkConf()
.setAppName(“JavaDecisionTreeClassificationExample”)
.setMaster(“local”)
.set(“spark.driver.host”, “localhost”).set(“spark.testing.memory”, “21474800000”);
// Create a SparkSession.
_JavaSparkContext jsc=_new _JavaSparkContext(sparkConf);

SparkSession spark = SparkSession
.builder()
.appName(“sparkSession”)
.sparkContext(jsc.sc())
.getOrCreate();

创建DataFrame

在一个SparkSession中, 应用程序可以从一个 已经存在的RDD, 从hive表, 或者从 Spark数据源中创建一个DataFrames.
举个例子, 下面就是基于一个JSON文件创建一个DataFrame:
Dataset<_Row_> sparkSql = spark.read().json(“src/main/java/com/kdy/spark/ml/data/people.json”);
sparkSql.show();

image.png

DataFrame操作

DataFrames为ScalaJavaPythonR中的结构化数据操作提供了一种特定于域的语言。。
在这里,我们包括一些使用数据集进行结构化数据处理的基本示例:

sparkSql.show();

_// +——+———-+
//| age| name|
//+——+———-+
//|null|Michael|
//| 30| Andy|
//| 19| Justin|
//+——+———-+

  1. // Print the schema in a tree format<br /> _sparkSql.printSchema();<br />_// root<br />// |-- age: long (nullable = true)<br />// |-- name: string (nullable = true)

// Select only the “name” column
sparkSql.select(“name”).show();
// +———-+
// | name|
// +———-+
// |Michael|
// | Andy|
// | Justin|
// +———-+

// Select everybody, but increment the age by 1
sparkSql.select(_col(“name”), col(“age”).plus(1)).show();
_// +———-+————-+
// | name|(age + 1)|
// +———-+————-+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +———-+————-+

// Select people older than 21
sparkSql.filter(_col(“age”).gt(21)).show();
_// +—-+——+
// |age|name|
// +—-+——+
// | 30|Andy|
// +—-+——+

// Count people by age
sparkSql.groupBy(“age”).count().show();
// +——+——-+
// | age|count|
// +——+——-+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +——+——-+_

视图

  1. Spark SQL中的临时视图是会话作用域的,如果创建它的会话终止,它将消失。如果要在所有会话之间共享一个临时视图并保持活动状态,直到Spark应用程序终止,则可以创建一个全局临时视图。全局临时视图与系统保留的数据库相关联global_temp,我们必须使用限定名称来引用它,例如SELECT * FROM global_temp.view1

_// $example on:run_sql$
// Register the DataFrame as a SQL temporary view
_df.createOrReplaceTempView(“people”);

  1. Dataset<_Row_> sqlDF = spark.sql("SELECT * FROM people");<br /> sqlDF.show();<br /> _// +----+-------+<br /> // | age| name|<br /> // +----+-------+<br /> // |null|Michael|<br /> // | 30| Andy|<br /> // | 19| Justin|<br /> // +----+-------+<br /> // $example off:run_sql$
  2. // $example on:global_temp_view$<br /> // Register the DataFrame as a global temporary view<br /> _df.createGlobalTempView("people");
  3. _// Global temporary view is tied to a system preserved database `global_temp`<br /> _spark.sql("SELECT * FROM global_temp.people").show();<br /> _// +----+-------+<br /> // | age| name|<br /> // +----+-------+<br /> // |null|Michael|<br /> // | 30| Andy|<br /> // | 19| Justin|<br /> // +----+-------+
  4. // Global temporary view is cross-session<br /> _spark.newSession().sql("SELECT * FROM global_temp.people").show();<br /> _// +----+-------+<br /> // | age| name|<br /> // +----+-------+<br /> // |null|Michael|<br /> // | 30| Andy|<br /> // | 19| Justin|<br /> // +----+-------+<br /> // $example off:global_temp_view$_

函数

  1. _//Aggregate functions<br />//计算平均年龄<br /> _df.agg(_avg_("age")).show();

_// +————+
//|avg(age)|
//+————+
//| 24.5|
//+————+

//Collection functions
Dataset<_Row> arrdf = df.withColumn(“info”, array(“age”, “name”));

  1. arrdf.show();<br />_// +----+-------+------------+<br />//| age| name| info|<br />//+----+-------+------------+<br />//|null|Michael| [, Michael]|<br />//| 30| Andy| [30, Andy]|<br />//| 19| Justin|[19, Justin]|<br />//+----+-------+------------+
  2. _arrdf.withColumn("explode",_explode_(_col_("info"))).show();

// +——+———-+——————+———-+
//| age| name| info|explode|
//+——+———-+——————+———-+
//|null|Michael| [, Michael]| null|
//|null|Michael| [, Michael]|Michael|
//| 30| Andy| [30, Andy]| 30|
//| 30| Andy| [30, Andy]| Andy|
//| 19| Justin|[19, Justin]| 19|
//| 19| Justin|[19, Justin]| Justin|
//+——+———-+——————+———-+
//计算数组长度
_arrdf.withColumn(“size”,_size
(col(“info”))).show();

// +——+———-+——————+——+
//| age| name| info|size|
//+——+———-+——————+——+
//|null|Michael| [, Michael]| 2|
//| 30| Andy| [30, Andy]| 2|
//| 19| Justin|[19, Justin]| 2|
//+——+———-+——————+——+
//排序
_df.orderBy(“age”).show();
//+——+———-+
//| age| name|
//+——+———-+
//|null|Michael|
//| 19| Justin|
//| 30| Andy|
//+——+———-+
//
// Sorting functions
//降序
df.orderBy(_desc(“age”)).show();
//+——+———-+
//| age| name|
//+——+———-+
//| 30| Andy|
//| 19| Justin|
//|null|Michael|
//+——+———-+
// Date time functions
//添加日期
_arrdf.withColumn(“date”, _current_date
()).show();
_// +——+———-+——————+—————+
//| age| name| info| date|
//+——+———-+——————+—————+
//|null|Michael| [, Michael]|2021-05-25|
//| 30| Andy| [30, Andy]|2021-05-25|
//| 19| Justin|[19, Justin]|2021-05-25|
//+——+———-+——————+—————+

_