前世今生

Spark SQL是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业。
Spark SQL一直到2.0版本才算是真正稳定,发展经历如下几个阶段:
image.png
Spark SQL2.0数据结构演变:
image.png
Spark SQL从Hive发展演变而来,功能远大于Hive;Spark SQL提供SQLDSL两种方式分析数据,并且Hive框架拥有的功能Spark SQL都有。

SparkSession

加载读取外部数据源的数据,并将要处理的数据封装至数据结构(Dataset/DataFrame)。主要分为2种读取方式:

  • 静态数据:spark.read
  • 流式数据:spark.readStream ```scala //采用构造者设计模式 val spark: SparkSession = SparkSession
    1. .builder()
    2. .appName(this.getClass.getSimpleName.stripSuffix("$"))
    3. .master("local[4]")
    4. .config("spark.sql.shuffle.partitions", "4")
    5. .getOrCreate()

spark.close()

<a name="hr3p1"></a>
# 数据结构
Dataset  =  RDD[CaseClass] + Schema<br />DataFrame = Dataset[Row]   =   RDD[Row] + Schema<br />**Dataset/DataFrame **是基于RDD之上的分布式数据集**。**<br />**解释:**

- ROW:弱类型,相当于元组 <br />row.getAs[type](name)<br />Row(value1,value2,...)
- Schema:结构化类型 StructType(封装StructField,包括name、type、nullable、metadata)
<a name="tmSDe"></a>
## DataFrame特点
数据集,来源于Python和Pandas和R语言数据结构dataFrame

- 分布式数据集,并且以列的方式组合,相当于具有schema的RDD
- 相当于关系型数据库中的表,但是底层有优化
- 提供一些抽象操作,如select、filter、aggregation、plot
- 由R语言或Pandas语言处理小数据集经验应用到处理分布式大数据集上
- 在1.3版本之前,叫SchemaRDD
<a name="ibXef"></a>
## RDD转换DataFrame

- 反射类型  RDD[caseClass]
- 自定义Schema   RDD[Row]<br />1、RDD中数据类型为Row<br />2、针对Row中数据定义为Schema<br />3、使用SparkSession中方法将定义的Schema应用到RDD[Row]上
<a name="wgyJj"></a>
## toDF指定列名称
RDD或者Seq中类型必须是元组
```scala
 val userDF: DataFrame = usersRDD
      .map { line =>
        val Array(id, age, sex, name, userId) = line.split("\\|")
        (id.toInt, age.toInt, sex, name, userId)
      }
      .toDF("id", "age", "sex", "name", "userId")

Dataset

结合RDD与DataFrame优势的一个新型数据抽象,建立在RDD之上的分布式数据集。三者之间相互转换如下:
image.png

数据分析

  • SQL编程,来源于Hive
  • DSL编程,调用Dataset API,主要包括两大类函数:类似RDD转换函数、类似SQL关键字函数

    数据源

    外部数据源

    从Spark1.4开始提供一套完善外部数据源接口,以便从任意数据源加载Load和保存Save数据

  • 加载读取数据:spark.read.format().option().load()

  • 保存写入数据:dataset/dataframe.write.mode().format().option().load()

    内置数据源

  • 文本文件:text、json、csv

  • 列式存储:parquet、orc
  • 数据库:jdbc、hive

    自定义UDF函数

  • 在SQL中使用:spark.udf.register(“函数名称”,(匿名函数)=>{ } )

  • 在DSL中使用:val name_udf = udf( ()=>{ } )

    Spark on Hive

    Spark on Hive与Hive on Spark

    image.png

SparkSQL读取Hive MetaStore元数据,只需要Hive MetaStore服务启动。
image.png

启动hive服务

  • hive —service metastore
    (必须带上后面参数,否则无法连接Failed to connect to the MetaStore)
  • nohup hive —service metastore > metastore.log 2>&1 &

    spark配置

    touch /opt/bigdata/spark/conf/hive-site.xml
    vim hive-site.xml ```xml <?xml version=”1.0” encoding=”UTF-8” standalone=”no”?> <?xml-stylesheet type=”text/xsl” href=”configuration.xsl”?>

hive.metastore.uris thrift://master01:9083

spark-shell --master local[2]
```scala
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show

hc.sql("use test_hive")
hc.sql("select * from test_hive.employee").show

或者
val empDF = spark.read.table("test_hive.employee")
 empDF.show(20)

或者
spark.sql("select * from test_hive.employee").show(20)

其结果如下:
image.png

IDEA集成Spark on hive

  1. 引入依赖(scala.binary.version = 2.12 和 spark.version =3.1.2)

    <!-- spark SQL 集成Hive -->
    <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-hive_${scala.binary.version}</artifactId>
     <version>${spark.version}</version>
    </dependency>
    
  2. 引入配置文件(resources)

/opt/bigdata/hadoop/etc/hadoop/core-site.xml
/opt/bigdata/hadoop/etc/hadoop/hdfs-site.xml
/opt/bigdata/spark/conf/hive-site.xml

  1. Scala类编写

    object SparkOnHive {
    
    def main(args: Array[String]): Unit = {
     //采用构造者设计模式
     val spark: SparkSession = SparkSession
       .builder()
       .appName(this.getClass.getSimpleName.stripSuffix("$"))
       .master("local[4]")
       //指定hive元数据在hdfs上的位置
       .config("spark.sql.warehouse.dir", "hdfs://master01:8020/user/hive/warehouse")
       // 导入hive集成
       .config("hive.metastore.uris", "thrift://master01:9083")
       .enableHiveSupport()
       .config("spark.sql.shuffle.partitions", "4")
       .getOrCreate()
     // 导入隐式转换
     spark.read.table("test_hive.employee")
       .show(10, truncate = false)
    
     spark.sql("select * from test_hive.employee")
       .show(10, truncate = false)
    
     spark.close()
    }
    }
    

    注意:spark.config(“hive.metastore.uris”,”thrift://master01:9083”).enableHiveSupport()

    分布式SQL引擎

  • Spark SQL CLI(不推荐)
  • ThriftServer JDBC/ODBC Server(推荐)

    Hive远程用户名和密码配置

    vim /opt/bigdata/hive/conf/hive-site.xml
    <!--自定义远程连接用户名和密码-->
    <property>
      <name>hive.server2.authentication</name>
      <value>root</value>
    </property>
    <!--设置用户名和密码,如果有多个用户和密码,可以多写几个property-->
    <property>
         <name>hive.jdbc_passwd.auth.yonghu</name>
         <value>123456</value>
    </property>
    

    ThriftServer Server配置

    启动metastore服务

  1. nohup hive —service metastore > metastore.log 2>&1 &
  2. 启动thriftserver服务:start-thriftserver.sh —master local[2]
  3. beeline -u jdbc:hive2://master01:10000 -n root -p 123456
  4. 验证:http://master01:4040/jobs/

    不启动metastore服务

  5. 删除/opt/bigdata/spark/conf/ hive-site.xml,否则无法启动thriftserver

  6. 启动thriftserver服务(启动一个SparkSubmit进程)
    start-thriftserver.sh —hiveconf hive.server2.thrift.port=10000 —hiveconf hive.server2.thrift.bind.host=master01 —master local[2]
  7. beeline
  8. !connect jdbc:hive2://master01:10000
  9. 输入用户和密码
  10. 验证:http://master01:4040/jobs/

注意:如果hive配置mysql元数据库,还需要cp mysql-connector-java-8.0.22.jar /opt/bigdata/spark/jars/