SparkSQL是Spark用于处理结构化数据的模块,前身为Shark。
特点:
- 易整合
- 无缝的整合了SQL查询和Spark编程
- 统一的数据访问
- 相同方式连接不同数据源
- 兼容Hive
- 支持SQL或者HiveQL
- 标准数据连接
- 通过JDBC或ODBC连接
数据模型
1、DataFrame
在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于,前者带有 schema 元信息,即 DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL 得以洞察更多的结构信息,从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观 RDD,由于无从得知所存数据元素的具体内部结构,Spark Core 只能在 stage 层面进行简单、通用的流水线优化。
同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从 API易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。
- 通过JDBC或ODBC连接
和RDD的区别:
左侧的 RDD[Person]虽然以 Person 为类型参数,但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么
DataFrame 也是懒执行的,但性能上比 RDD 要高,主要原因:优化的执行计划,即查询计划通过 Spark catalyst optimiser 进行优化。
2、DataSet
DataSet 是分布式数据集合。DataSet 是 Spark 1.6 中添加的一个新抽象,是 DataFrame的一个扩展。它提供了 RDD 的优势(强类型,使用强大的 lambda 函数的能力)以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作 map,flatMap,filter等等)
- DataSet 是 DataFrame API 的一个扩展,是 SparkSQL 最新的数据抽象
- 用户友好的 API 风格,既具有类型安全检查也具有 DataFrame 的查询优化特性;
- 用样例类来对 DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet 中的字段名称;
- DataSet 是强类型的。比如可以有 DataSet[Car],DataSet[Person]。
- DataFrame 是 DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过 as 方法将
- DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
SparkSQL编程
在Spark-Core里面,环境对象为上下文对象:SparkContext,而在SparkSQL里面老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫 SQLContext,用于 Spark自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。
SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext的组合,所以在 SQLContex 和HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。
启动spark-shell,会准备好SparkSession对象spark
有一个read命令,可以读取文件,现在在bin/data/创建一个json文件,叫user.json,写入如下数据:
{"username":"zhangsan","age":30}
{"username":"lisi","age":20}
{"username":"wangwu","age":40}
使用spark.read.json()读取文件,会得到一个DataFrame对象,结构会按照字典顺序排序,使用show来展示该对象
使用DataFrame对象创建临时视图,可以使用sql语句进行查询了。
df.createTempView()
spark.sql(sql语句).show
1、DataFrame
在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:
- 通过 Spark 的数据源进行创建;
- 从一个存在的 RDD 进行转换;
- 还可以从 Hive Table 进行查询返回。
1、SQL语法
1、读取文件创建DataFrame
spark.read.xxx格式(文件名)
2、创建一个临时表
df.createOrReplaceTempView(表名)
可以使用df.createOrReplaceGlobalTempView()创建全局临时表
注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,加上前缀global_temp
如:global_temp.people
spark.newSession.xxx,新建一个session进行操作。
2、DSL语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
查看用法:df.printSchema
可以写的所有函数:
1、只查看“username”列:
select()选择某列
2、查看username列并且将所有age+1
可以使用$"字段"
引用该字段,但是如果有一个引用的字段,所有字段都需要使用$引用
提供了一种简便的方式:使用单引号字段名
即可引用该字段。
3、过滤字段
filter()过滤条件
4、分组并计数
分组使用groupBy()
计数使用count()
3、与RDD之间的转换
- RDD转换为DataFrame
- 使用rdd.toDF(字段名1,字段名2)
- DataFrame转换为RDD
- df.rdd()
注意:此时得到的 RDD 存储类型为 Row
2、DataSet
DataSet 是具有强类型的数据集合,需要提供对应的类型信息。
1、创建DataSet
使用样例类创建DataSet
case class Person(name: String, age: Long)
val ds = Seq(Person(“zhangsan”,2)).toDS()
ds.show
使用基本序列创建DataSet
注意:在实际使用的时候,很少用到把序列转换成 DataSet,更多的是通过RDD来得到 DataSet
**
2、与DataFrame的转换
- DataSet to DataFrame
- ds.toDF
- DataFrame to DataSet
- 定义一个case class,内部属性和DataSet一致
- df.as[case class]
3、与RDD的直接转换
前提:一个RDD有其类型信息,可以是一个样例类
样例类rdd.toDS
ds.rdd
3、RDD、DataFrame、DataSet三者的关系
- Spark1.0 => Rdd
- Spark1.3 => DataFrame
- Spark1.6 => DataSet
1、共性
- RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利;
- 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;
- 三者有许多共同的函数,如 filter,排序等;
- 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:
import spark.implicits._
(在创建好 SparkSession 对象后尽量直接导入) - 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
- 三者都有 partition 的概念
- DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型
2、不同
- RDD
- RDD 一般和 spark mllib 同时使用
- RDD 不支持 sparksql 操作
- DataFrame
- 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
- DataFrame 与 DataSet 一般不与 spark mllib 同时使用
- DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
- DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
- DataSet
- Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
- DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息
4、Idea开发
1、创建SparkSQL的运行环境
SparkSession由于构造器是private私有的,所以不可以直接new出来,可以考虑通过伴生对象或者工厂方法等产生。
添加spark-sql的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
class SparkSession private(
private[sql] def this(sc: SparkContext) {
this(sc, None, None,
SparkSession.applyExtensions(
sc.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
new SparkSessionExtensions))
}
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
spark.close()
}
2、DataFrame基本操作
唯一需要注意的是,要对数据进行转换时,需要导入sparkSession对象的隐式转换规则。import spark.implicits._
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
// 逻辑
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()
df.createTempView("user")
df.select("username").show()
spark.sql("select * from user").show()
// 需要引入隐式转换。是sparkSession对象里面的!!!
import spark.implicits._
df.select($"age"+1).show()
df.filter('age > 20).show()
// 关闭
spark.close()
}
3、DataSet基本操作
DataFrame其实就是特定泛型的DataSet。
type DataFrame = Dataset[Row]
// DataSet
val seq = Seq(1,2,3,4)
val ds: Dataset[Int] = seq.toDS()
ds.show()
4、DataFrame和DataSet转换
// RDD => DataFrame
val originRdd: RDD[(Int, String)] = spark.sparkContext.makeRDD(List((1, "zhangsan"), (2, "lisi"), (3, "wang5")))
val rddDf: DataFrame = originRdd.toDF("id", "name")
rddDf.show()
// DataFrame => RDD
val dfRdd: RDD[Row] = rddDf.rdd
dfRdd.collect.foreach(println)
// RDD => DataSet
val userRdd: RDD[User] = spark.sparkContext.makeRDD(List(User(1, "zhangsan"), User(2, "lisi"), User(3, "wang5")))
val rddDs: Dataset[User] = userRdd.toDS()
rddDs.show()
// DataSet => RDD
val dsRdd: RDD[User] = rddDs.rdd
dsRdd.collect.foreach(println)
// DataFrame => DataSet
val dff: DataFrame = userRdd.toDF("id", "name")
val dfDs: Dataset[User] = dff.as[User]
dfDs.show()
// DataSet => DataFrame
val dsDf: DataFrame = dfDs.toDF()
dsDf.show()
// 关闭
spark.close()
}
case class User(id: Int, name: String)
5、UDF函数
用户自定义函数(User Defined Function),可以用来在spark.sql()中执行自定义函数
// 注册函数
// 注册的函数名;自定义函数逻辑
spark.udf.register("prefix",(username:String) => {
"Name: " + username
})
// 自定义函数UDF,实现用户自定义逻辑
spark.sql("select prefix(username) from user").show()
6、UDAF函数
强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator
1、弱类型函数实现
计算年龄平均值
object TestUDFA {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
// 逻辑
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()
df.createTempView("user")
df.select("username").show()
spark.sql("select * from user").show()
// 注册函数
// 注册的函数名;自定义函数逻辑
spark.udf.register("avgAge",new MyAvgUDFA)
// 自定义函数UDF,实现用户自定义逻辑
spark.sql("select avgAge(age) from user").show()
// 关闭
spark.close()
}
/**
* 自定义输入函数
*/
class MyAvgUDFA extends UserDefinedAggregateFunction{
// 输入数据的结构
override def inputSchema: StructType = {
// case class StructType(fields: Array[StructField])
StructType(
Array(
//case class StructField(
// name: String,
// dataType: DataType,
// nullable: Boolean = true,
// metadata: Metadata = Metadata.empty)
StructField("age",LongType)
)
)
}
// 缓冲区结构
override def bufferSchema: StructType = {
StructType(Array(
StructField("total",LongType), // 计算总和
StructField("count",LongType) // 所有个数
))
}
// 输出的数据类型
override def dataType: DataType = LongType
// 函数的稳定性
override def deterministic: Boolean = true
// 缓冲区初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
// 初始化总和和个数为0
buffer(0) = 0L // buffer(0) = Row.apply(0) = buffer.get(0)
buffer(1) = 0L
// 或者可以使用update
// buffer.update(0,0L)
// buffer.update(1,0L)
}
// 更新缓冲区的值
override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={
// 每来一条数据,更新一次记录
buffer.update(0,buffer.getLong(0) + input.getLong(0))
buffer.update(1,buffer.getLong(1) + 1)
}
// 合并缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
}
// 计算
override def evaluate(buffer: Row): Any = {
// 我们求平均值,缓冲区第一个是总和,第二个是总量,相除即可获取平均值
buffer.getLong(0) / buffer.getLong(1)
}
}
}
2、强类型函数实现
继承Aggregator,定义泛型,序列化的时候使用Encoders.product序列化缓冲区,使用Encoders.scalaxxx序列化基本数据类型
在注册自定义函数的时候,使用functions.udaf()包裹自定义函数类
object TestUDFA_StrongType {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
// 逻辑
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()
df.createTempView("user")
df.select("username").show()
spark.sql("select * from user").show()
// 注册函数
// 注册的函数名;自定义函数逻辑
// 使用functions.udaf(new MyAvgUDFA) 包裹
spark.udf.register("avgAge",functions.udaf(new MyAvgUDFA))
// 自定义函数UDF,实现用户自定义逻辑
spark.sql("select avgAge(age) from user").show()
// 关闭
spark.close()
}
/**
* 自定义输入函数
*/
// abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
case class Buff(var total:Long,var count:Long)
class MyAvgUDFA extends Aggregator[Long,Buff,Long]{
// 初始值
override def zero: Buff = {
Buff(0L,0L)
}
// 计算
override def reduce(b: Buff, a: Long): Buff = {
b.total += a
b.count += 1
b
}
// 合并
override def merge(b1: Buff, b2: Buff): Buff = {
b1.total += b2.total
b1.count += b2.count
b1
}
// 完成结果
override def finish(reduction: Buff): Long = {
reduction.total / reduction.count
}
// 编码
// 如果是缓冲区,使用Encoders.produce
// 如果是基本数据类型,使用Encoders.scalaxxxx
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
object TestUDFA_StrongType {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
// 逻辑
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()
df.createTempView("user")
df.select("username").show()
spark.sql("select * from user").show()
// 注册函数
// 注册的函数名;自定义函数逻辑
// 使用functions.udaf(new MyAvgUDFA) 包裹
spark.udf.register("avgAge",functions.udaf(new MyAvgUDFA))
// 自定义函数UDF,实现用户自定义逻辑
spark.sql("select avgAge(age) from user").show()
// 关闭
spark.close()
}
/**
* 自定义输入函数
*/
// abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
case class Buff(var total:Long,var count:Long)
class MyAvgUDFA extends Aggregator[Long,Buff,Long]{
// 初始值
override def zero: Buff = {
Buff(0L,0L)
}
// 计算
override def reduce(b: Buff, a: Long): Buff = {
b.total += a
b.count += 1
b
}
// 合并
override def merge(b1: Buff, b2: Buff): Buff = {
b1.total += b2.total
b1.count += b2.count
b1
}
// 完成结果
override def finish(reduction: Buff): Long = {
reduction.total / reduction.count
}
// 编码
// 如果是缓冲区,使用Encoders.produce
// 如果是基本数据类型,使用Encoders.scalaxxxx
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
3、早期强类型聚合函数实现
Aggregator是Spark3.0的时候才出现的,早期如何在弱类型的sql中使用强类型的聚合函数呢?
1、需要一个ds对象确定类型,自然要有一个样例类
2、将df转为ds,并执行查询操作
3、将UDAF对象转换为查询的列对象:toColumn()
4、ds.select(column)
object TestUDFA_Far_StrongType {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
// 逻辑
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
df.show()
df.createTempView("user")
import spark.implicits._
val ds: Dataset[User] = df.as[User]
// 将UDAF转成列对象
val column: TypedColumn[User, Long] = new MyAvgUDFA().toColumn
// 使用ds查询该列
ds.select(column).show()
// 关闭
spark.close()
}
case class User(username:String,age:Long)
/**
* 自定义输入函数
*/
// abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
case class Buff(var total:Long,var count:Long)
class MyAvgUDFA extends Aggregator[User,Buff,Long]{
// 初始值
override def zero: Buff = {
Buff(0L,0L)
}
// 计算
override def reduce(b: Buff, a: User): Buff = {
b.total += a.age
b.count += 1
b
}
// 合并
override def merge(b1: Buff, b2: Buff): Buff = {
b1.total += b2.total
b1.count += b2.count
b1
}
// 完成结果
override def finish(reduction: Buff): Long = {
reduction.total / reduction.count
}
// 编码
// 如果是缓冲区,使用Encoders.produce
// 如果是基本数据类型,使用Encoders.scalaxxxx
override def bufferEncoder: Encoder[Buff] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
}
5、数据的加载和保存
1、通用方法
加载数据:
spark.read.load
默认加载和保存的都是parquet文件,
保存数据:
spark.write.save
保存数据还涉及到了一个保存模式,SaveMode
spark.write.format(“json”).mode(xxx).save
- error(default):如果文件存在,直接抛异常
- append:追加
- overwrite:覆盖
- ignore:忽略本次操作
如果我们就想要读取json文件,该如何?
spark.read.format(“json”).load(xxx)
spark.write.format(“json”).save(xx)
spark封装了json,spark.read.json
2、操作json和csv
如果我们想要进行查询操作的时候,都是先要创建一张表,然后才能spark.sql(x)查询。
其实我们也可以直接在文件上查询,这其实还是创建了一张表,不过是由spark给我们做了。
spark.sql(“select * from json.user.json
“).show()
使用````将文件引起来,前面加上要读取的格式
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default
,可修改默认数据源格式。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。
读取csv文件:
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列
spark.read.format("csv")
.option("sep",";") # 以分号为分隔符
.option("inferSchema",true) # 内部结构
.option("header",true) # 是否将第一行作为表头
.load("data/user.csv")
3、操作MySQL
1、导入mysql驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2、使用jdbc方式读取数据
object TestMySQL {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-sql")
// 私有的构造器,使用伴生对象的工厂方法创建
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
// 读
val df = spark.read.format("jdbc")
.option("url","jdbc:mysql://localhost:3306/guliedu")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","root")
.option("dbtable","acl_permission")
.load()
df.show()
// 写
df.write.format("jdbc").option("url","jdbc:mysql://localhost:3306/guliedu")
.option("driver","com.mysql.jdbc.Driver")
.option("user","root")
.option("password","root")
.option("dbtable","acl_permission")
.mode(SaveMode.Overwrite)
.save
}
}
4、操作Hive
若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好 Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db
。此外,如果你尝试使用 HiveQL 中的CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse
目录中(如果你的 classpath 中有配好的hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。spark-shell 默认是 Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。
1、操作内置Hive
查询数据表:
spark.sql(“show tables”).show
创建数据表:
spark.sql(“create table aa(id int)”)
向表加载本地数据:
spark.sql(“load data local inpath ‘input/ids.txt’ into table aa”)
spark.sql(“select * from aa”)
在实际使用中, 几乎没有任何人会使用内置的 Hive
**
2、操作外置Hive
- Spark接管hive,将hive-site.xml拷贝到conf目录下
- 把MySQL的驱动拷贝到spark的jars中
- 如果访问不到hdfs,需要把core-site.xml和hdfs-site.xml拷贝到conf下
- 重启spark-shell
代码中操作Hive:
1、添加hive-site.xml到类路径下
2、启用Hive支持 :enableHiveSupport
3、添加依赖
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
碰到jar包在中央仓库找不到,下载下来安装到本地仓库
jar包:
<!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm -->
<dependency>
<groupId>org.pentaho</groupId>
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
<version>5.1.5-jhyde</version>
<scope>test</scope>
</dependency>
使用命令安装到本地仓库
mvn install:install-file -DgroupId=org.pentaho -DartifactId=pentaho-aggdesigner-algorithm -Dversion=5.1.5-jhyde -Dpackaging=jar -Dfile=pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar
配置hive的地址、目录等
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
如果遇到文件权限问题,可以在系统配置中添加当前用户为操作hadoop集群的用户
System.setProperty("HADOOP_USER_NAME",root);
3、beeline操作Hive
如果想连接 Thrift Server,需要通过以下几个步骤:
- Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下
- 把 Mysql 的驱动 copy 到 jars/目录下
- 如果访问不到 hdfs,则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下
- 启动 Thrift Server
bin/beeline \
-u jdbc:hive2://linux1:10000 \ # 连接到的hive地址
-n root #以什么用户运行