Chapter 1. SparkSQL Programming

1. Converting between RDD, DataFrame, and DataSet


package com.atguigu.spark.day08

import org.junit.Test

/**
 * Converting between RDD, DataFrame, and DataSet
 * RDD -> DataFrame: the toDF method
 * RDD -> DataSet: the toDS method
 * DataFrame -> RDD: df.rdd
 * DataSet -> RDD: ds.rdd
 * DataSet -> DataFrame: ds.toDF(..)
 * DataFrame -> DataSet: as[row type]
 * When converting a DataFrame to a DataSet, the row type is usually a tuple or a case class:
 *   tuple: the number of elements must equal the number of columns, and the types must match
 *   case class: it must not have more properties than there are columns,
 *               and the property names must match the column names
 *
 * Reading a value from a Row: row.getAs[column type](column name)
 */
class $01_RDDDataFrameDataSet {

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

  import spark.implicits._

  @Test
  def convert(): Unit = {
    val rdd = spark.sparkContext.parallelize(List(1 -> "zhangsan", 2 -> "lisi", 3 -> "wangwu"))

    // RDD -> DataFrame: the toDF method
    val df = rdd.toDF("id", "name")
    df.show()

    // RDD -> DataSet: the toDS method
    val ds = rdd.toDS()
    ds.show()

    // DataFrame -> RDD: df.rdd
    val rdd2 = df.rdd
    val rdd3 = rdd2.map(row => {
      // reading a value from a Row
      val name = row.getAs[String]("name")
      name
    })
    println(rdd3.collect().toList)

    // DataSet -> RDD: ds.rdd
    val rdd4 = ds.rdd
    println(rdd4.collect().toList)

    // DataSet -> DataFrame: ds.toDF(..)
    val df2 = ds.toDF("id", "name")
    df2.show()

    // DataFrame -> DataSet: as[row type]
    val ds3 = df.as[(Int, String)]
    ds3.show()

    val ds4 = df.as[AA]
    ds4.show()
  }
}

// property names match the column names defined by toDF above
case class AA(id: Int, name: String)
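
For reference, after rdd.toDF("id", "name") the call df.show() renders the three rows like this (Spark right-aligns values by default):

+---+--------+
| id|    name|
+---+--------+
|  1|zhangsan|
|  2|    lisi|
|  3|  wangwu|
+---+--------+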

2. Differences between DataFrame and DataSet

package com.atguigu.spark.day08

import com.atguigu.spark.day07.Person
import org.junit.Test

/**
 * Differences between DataFrame and DataSet:
 * 1. A DataFrame only tracks column information, not the row type: it is weakly typed.
 *    A DataSet tracks both the rows and the columns: it is strongly typed.
 * 2. A DataFrame is only checked at runtime, not at compile time.
 *    A DataSet is checked at both compile time and runtime.
 *
 * When to use DataFrame vs DataSet:
 * 1. When converting an RDD for SparkSQL processing:
 *    if the RDD elements are tuples, prefer toDF with explicit column names (a DataFrame);
 *    if the RDD elements are case classes, either conversion works.
 * 2. When you need to redefine column names, prefer toDF (a DataFrame).
 * 3. When you need strongly typed operators such as map or flatMap, prefer a DataSet.
 */
class $02_DataFrameDataSet {

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

  import spark.implicits._

  @Test
  def diff(): Unit = {
    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 30, "beijing"),
      Person(3, "lisi3", 23, "tianjin"),
      Person(4, "lisi4", 24, "shanghai"),
      Person(6, "lisi4", 35, "shenzhen"),
      Person(7, "lisi4", 29, "hangzhou"),
      Person(8, "lisi4", 30, "guangzhou")
    )
    val df = list.toDF()
    df.where("age>20").show()
    df.map(row => row.getAs[String]("name")).show()

    val ds = list.toDS()
    //ds.map(x => x.xx)
  }
}
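
The compile-time safety difference is easy to see with a deliberate typo. A minimal sketch, assuming the Person case class from day07 and the list/implicits above (the misspelled nmae is intentional):

val df = list.toDF()
val ds = list.toDS()

// DataFrame: the column name is just a string, so the typo compiles
// and only fails at runtime with an AnalysisException:
// df.select("nmae").show()

// DataSet: the same typo is rejected at compile time,
// because Person has no member named nmae:
// ds.map(p => p.nmae)

// the correct, strongly typed version:
ds.map(p => p.name).show()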

3. Custom UDF Functions

package com.atguigu.spark.day08

/**
 * UDF: one row in, one value out
 * UDAF: many rows in, one value out
 * UDTF: one row in, many rows out [Spark does not have these]
 *
 * Defining a UDF in Spark:
 * 1. Define a function.
 * 2. Register the function as a UDF.
 * 3. Use it.
 */
object $03_UDF {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    val df = spark.sparkContext.parallelize(List(
      ("10001", "zhangsan"),
      ("00102", "zhangsan"),
      ("111000", "zhangsan"),
      ("010", "zhangsan"),
      ("00560", "zhangsan")
    )).toDF("id", "name")

    // register the dataset as a temporary view
    df.createOrReplaceTempView("person")

    // register the UDF
    spark.udf.register("xxx", prefixed _)

    // requirement: if an employee id is shorter than 8 characters, left-pad it with zeros
    spark.sql(
      """
        |select xxx(id), name from person
        |""".stripMargin
    ).show()
  }

  def prefixed(id: String): String = {
    val currentLength = id.length
    "0" * (8 - currentLength) + id
  }
}
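
The same function can also be used directly in the DataFrame API, without registering it under a SQL name. A minimal sketch using org.apache.spark.sql.functions.udf, assuming the df and prefixed defined above (padded_id is just an illustrative alias):

import org.apache.spark.sql.functions.{col, udf}

val prefixedUdf = udf(prefixed _)   // wrap the Scala function as a column-level UDF
df.select(prefixedUdf(col("id")).as("padded_id"), col("name")).show()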

4. Custom UDAF Functions (weakly typed)

  • MyAvgWeakType.scala
package com.atguigu.spark.day08

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StructField, StructType}

/**
 * Spark 2.x style
 * custom UDAF (weakly typed)
 */
class MyAvgWeakType extends UserDefinedAggregateFunction {

  // the types of the UDAF's input parameters
  override def inputSchema: StructType = {
    /*
    first way:
    val fields = Array[StructField](
      StructField("input", IntegerType)
    )
    val schema = StructType(fields)
    schema */
    // second way
    new StructType().add("input", IntegerType)
  }

  // the types of the intermediate (buffer) values
  override def bufferSchema: StructType = {
    new StructType().add("sum", IntegerType).add("count", IntegerType)
  }

  // the type of the final result
  override def dataType: DataType = DoubleType

  // whether the function always returns the same output for the same input
  override def deterministic: Boolean = true

  // initialize the intermediate values
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // initialize sum
    buffer(0) = 0
    // initialize count
    buffer(1) = 0
  }

  /**
   * per-partition aggregation (combine phase): called once for each input row
   * @param buffer
   * @param input
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // for each incoming row, add the age to sum
    buffer(0) = buffer.getAs[Int](0) + input.getAs[Int](0)
    // for each incoming age, increment count by 1
    buffer(1) = buffer.getAs[Int](1) + 1
  }

  /**
   * global aggregation (reduce phase): merges the buffers of the partitions
   * @param buffer1
   * @param buffer2
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // merge the sums of the partitions
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
    // merge the counts of the partitions
    buffer1(1) = buffer1.getAs[Int](1) + buffer2.getAs[Int](1)
  }

  // compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0).toDouble / buffer.getAs[Int](1)
  }
}
  • UDAF.scala
package com.atguigu.spark.day08

import com.atguigu.spark.day07.Person

object $04_UDAF {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 30, "beijing"),
      Person(3, "lisi3", 23, "tianjin"),
      Person(4, "lisi4", 24, "shanghai"),
      Person(6, "lisi4", 35, "shenzhen"),
      Person(7, "lisi4", 29, "hangzhou"),
      Person(8, "lisi4", 30, "guangzhou")
    )
    val df = list.toDF()
    df.createOrReplaceTempView("person")

    // register the UDAF (weakly typed)
    spark.udf.register("myAvg", new MyAvgWeakType)

    spark.sql(
      """
        |select address, myAvg(age) avg_age from person group by address
        |""".stripMargin
    ).show()
  }
}

5. Custom UDAF Functions (strongly typed)

  • MyAvgStronglyType.scala
package com.atguigu.spark.day08

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

/**
 * Spark 3.x style
 * custom UDAF (strongly typed)
 */
case class AvgBuff(var sum: Int, var count: Int)

class MyAvgStronglyType extends Aggregator[Int, AvgBuff, Double] {

  // initialize the intermediate buffer
  override def zero: AvgBuff = {
    AvgBuff(0, 0)
  }

  // per-partition aggregation logic (combine phase)
  override def reduce(b: AvgBuff, a: Int): AvgBuff = {
    b.sum = b.sum + a
    b.count = b.count + 1
    b
  }

  // global aggregation logic (reduce phase)
  override def merge(b1: AvgBuff, b2: AvgBuff): AvgBuff = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  // compute the final result
  override def finish(reduction: AvgBuff): Double = {
    reduction.sum.toDouble / reduction.count
  }

  // encoder for the intermediate buffer type
  override def bufferEncoder: Encoder[AvgBuff] = Encoders.product[AvgBuff]

  // encoder for the result type
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
  • UDAF.scala
package com.atguigu.spark.day08

import com.atguigu.spark.day07.Person

object $04_UDAF {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
    import spark.implicits._

    val list = List(
      Person(1, "lisi1", 21, "shenzhen"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 22, "beijing"),
      Person(2, "lisi2", 30, "beijing"),
      Person(3, "lisi3", 23, "tianjin"),
      Person(4, "lisi4", 24, "shanghai"),
      Person(6, "lisi4", 35, "shenzhen"),
      Person(7, "lisi4", 29, "hangzhou"),
      Person(8, "lisi4", 30, "guangzhou")
    )
    val df = list.toDF()
    df.createOrReplaceTempView("person")

    /*
    register the UDAF (weakly typed)
    spark.udf.register("myAvg", new MyAvgWeakType)
    */

    // register the UDAF (strongly typed)
    import org.apache.spark.sql.functions._
    spark.udf.register("myAvg", udaf(new MyAvgStronglyType))

    spark.sql(
      """
        |select address, myAvg(age) avg_age from person group by address
        |""".stripMargin
    ).show()
  }
}
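
An Aggregator can also be used without any SQL registration, as a typed column on a DataSet. A minimal sketch, assuming the list of Person values and the implicits above (the aggregator consumes Int, so the Dataset[Person] is first mapped down to ages):

val ds = list.toDS()
val avgAge = new MyAvgStronglyType().toColumn.name("avg_age")
ds.map(_.age).select(avgAge).show()   // one global average over all ages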

Chapter 2. Loading and Saving Data in SparkSQL

1. Reading Files

package com.atguigu.spark.day08

import java.util.Properties
import org.junit.Test
import org.apache.spark.sql.SaveMode

class $05_Reader {

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

  import spark.implicits._

  /**
   * Ways to read files:
   * 1. spark.read
   *      .format(text/json/csv/jdbc/orc/parquet)  -- specify the file format
   *      .option(K, V)                            -- set read options
   *      .load(path)                              -- load the data
   * 2. spark.read.textFile/json/orc/csv/parquet
   */
  @Test
  def read(): Unit = {
    /*
    read text data
    spark.read.textFile("datas/wc.txt").show()
    spark.read.format("text").load("datas/wc.txt").show()
    */

    /*
    read json data
    spark.read.json("datas/pmt.json").show()
    spark.read.format("json").load("datas/pmt.json").show()
    */

    /**
     * read csv data
     * common options:
     *   sep: the field separator
     *   header: whether to use the first line of the file as the column names
     *   inferSchema: whether to infer the column types
     *
     * spark.read.option("header","true").option("inferSchema","true")
     *   .csv("datas/presidential_polls.csv").printSchema()
     * spark.read.format("csv").option("header","true")
     *   .option("inferSchema","true").load("datas/presidential_polls.csv").show()
     */

    /*
    save as parquet files
    spark.read.format("csv").option("header", "true")
      .option("inferSchema", "true")
      .load("datas/presidential_polls.csv")
      .write.mode(SaveMode.Overwrite).parquet("output/parquet")
    */

    /*
    read parquet files
    spark.read.load("output/parquet").show()
    spark.read.format("parquet").load("output/parquet").show
    */
  }
}
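
Note that inferSchema costs an extra pass over the file. For stable inputs you can supply the schema explicitly instead; a minimal sketch, with hypothetical column names id and name standing in for the real CSV header:

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

spark.read
  .option("header", "true")
  .schema(schema)   // no inference pass over the data
  .csv("datas/presidential_polls.csv")
  .printSchema()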

2. Reading via JDBC

/**
 * read data from MySQL
 */
@Test
def readJdbc(): Unit = {
  // first way
  spark.read.format("jdbc")
    .option("url", "jdbc:mysql://hadoop102:3306/gmall")
    .option("dbtable", "user_info")
    .option("user", "root")
    .option("password", "321074")
    .load()
    .show()

  // second way
  // this way of reading MySQL produces only one partition <only suitable for small data sets>
  val url = "jdbc:mysql://hadoop102:3306/gmall"
  val tableName = "user_info"
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "321074")
  val df = spark.read.jdbc(url, tableName, props)
  df.show()
  println(df.rdd.getNumPartitions)

  // with this variant, the number of partitions = the number of where conditions in the array <rarely used>
  val condition = Array("id<20", "id>=20 and id<50", "id>=50")
  val df2 = spark.read.jdbc(url, tableName, condition, props)
  println(df2.rdd.getNumPartitions)

  /*
  third way (commonly used)
  columnName must be a numeric, date, or timestamp column
  number of partitions = (upperBound - lowerBound) >= numPartitions ? numPartitions : upperBound - lowerBound
  */
  // fetch lowerBound and upperBound dynamically
  val minDF = spark.read.jdbc(url, "(select min(id) min_id from user_info) user_min_id", props)
  val minRdd = minDF.rdd
  val minid = minRdd.collect().head.getAs[Long]("min_id")
  val maxDF = spark.read.jdbc(url, "(select max(id) max_id from user_info) user_max_id", props)
  val maxRdd = maxDF.rdd
  val maxid = maxRdd.collect().head.getAs[Long]("max_id")
  println(minid, maxid)
  val df3 = spark.read.jdbc(url, tableName, "id", minid, maxid, 5, props)
  println(df3.rdd.getNumPartitions)
}
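
The same partitioned read can be expressed with options on format("jdbc") instead of the jdbc(...) overload; a minimal sketch with illustrative bounds (in practice use the dynamically fetched minid/maxid from above):

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop102:3306/gmall")
  .option("dbtable", "user_info")
  .option("user", "root")
  .option("password", "321074")
  .option("partitionColumn", "id")   // must be a numeric, date, or timestamp column
  .option("lowerBound", 1L)
  .option("upperBound", 100L)
  .option("numPartitions", 5)
  .load()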

3. How Spark Computes the Number of Partitions for a JDBC Read (source code)

def columnPartition(
    schema: StructType,
    resolver: Resolver,
    timeZoneId: String,
    jdbcOptions: JDBCOptions): Array[Partition] = {
  val partitioning = {
    import JDBCOptions._

    val partitionColumn = jdbcOptions.partitionColumn
    // partitionColumn = "id"
    val lowerBound = jdbcOptions.lowerBound
    // lowerBound = 1
    val upperBound = jdbcOptions.upperBound
    // upperBound = 100
    val numPartitions = jdbcOptions.numPartitions
    // numPartitions = 5

    // no partition column was specified
    if (partitionColumn.isEmpty) {
      assert(lowerBound.isEmpty && upperBound.isEmpty, "When 'partitionColumn' is not " +
        s"specified, '$JDBC_LOWER_BOUND' and '$JDBC_UPPER_BOUND' are expected to be empty")
      null
    } else {
      // a partition column was specified
      assert(lowerBound.nonEmpty && upperBound.nonEmpty && numPartitions.nonEmpty,
        s"When 'partitionColumn' is specified, '$JDBC_LOWER_BOUND', '$JDBC_UPPER_BOUND', and " +
        s"'$JDBC_NUM_PARTITIONS' are also required")

      // verify that the partition column is a numeric, date, or timestamp column; otherwise throw
      val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
        schema, partitionColumn.get, resolver, jdbcOptions)

      val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
      // lowerBoundValue = 1L
      val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
      // upperBoundValue = 100L
      JDBCPartitioningInfo(
        column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
    }
  }

  // if no partition column was specified, allocate a single partition
  if (partitioning == null || partitioning.numPartitions <= 1 ||
      partitioning.lowerBound == partitioning.upperBound) {
    return Array[Partition](JDBCPartition(null, 0))
  }

  val lowerBound = partitioning.lowerBound
  // lowerBound = 1
  val upperBound = partitioning.upperBound
  // upperBound = 100
  val boundValueToString: Long => String =
    toBoundValueInWhereClause(_, partitioning.columnType, timeZoneId)

  // number of partitions = (upperBound - lowerBound) >= partitioning.numPartitions
  //   ? partitioning.numPartitions : upperBound - lowerBound
  val numPartitions =
    if ((upperBound - lowerBound) >= partitioning.numPartitions || /* check for overflow */
        (upperBound - lowerBound) < 0) {
      partitioning.numPartitions
    } else {
      logWarning("The number of partitions is reduced because the specified number of " +
        "partitions is less than the difference between upper bound and lower bound. " +
        s"Updated number of partitions: ${upperBound - lowerBound}; Input number of " +
        s"partitions: ${partitioning.numPartitions}; " +
        s"Lower bound: ${boundValueToString(lowerBound)}; " +
        s"Upper bound: ${boundValueToString(upperBound)}.")
      upperBound - lowerBound
    }

  // stride of each partition = 100/5 - 1/5 = 20
  val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
  var i: Int = 0
  // column = "id"
  val column = partitioning.column
  var currentValue = lowerBound
  // currentValue = 1
  // a container that accumulates the partitions
  val ans = new ArrayBuffer[Partition]()
  while (i < numPartitions) {
    // 1st iteration: i = 0, numPartitions = 5, currentValue = 1
    //   lBoundValue = "1"
    //   lBound = null
    //   currentValue = currentValue + stride = 1 + 20 = 21
    //   uBoundValue = "21"
    //   uBound = "id < 21"
    //   whereClause = "id < 21 or id is null"
    // 2nd iteration: i = 1, numPartitions = 5, currentValue = 21
    //   lBoundValue = "21"
    //   lBound = "id >= 21"
    //   currentValue = currentValue + stride = 21 + 20 = 41
    //   uBoundValue = "41"
    //   uBound = "id < 41"
    //   whereClause = "id >= 21 AND id < 41"
    val lBoundValue = boundValueToString(currentValue)
    val lBound = if (i != 0) s"$column >= $lBoundValue" else null
    currentValue += stride
    val uBoundValue = boundValueToString(currentValue)
    val uBound = if (i != numPartitions - 1) s"$column < $uBoundValue" else null
    val whereClause =
      if (uBound == null) {
        lBound
      } else if (lBound == null) {
        s"$uBound or $column is null"
      } else {
        s"$lBound AND $uBound"
      }
    ans += JDBCPartition(whereClause, i)
    i = i + 1
  }
  val partitions = ans.toArray
  logInfo(s"Number of partitions: $numPartitions, WHERE clauses of these partitions: " +
    partitions.map(_.asInstanceOf[JDBCPartition].whereClause).mkString(", "))
  partitions
}
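
To make the loop's output concrete, here is a small self-contained sketch that mirrors the boundary logic above for lowerBound = 1, upperBound = 100, numPartitions = 5 (all names are local to the sketch, not Spark's):

object PartitionWhereClauses extends App {
  val column = "id"
  val lowerBound = 1L
  val upperBound = 100L
  val numPartitions = 5

  val stride = upperBound / numPartitions - lowerBound / numPartitions   // 20
  var currentValue = lowerBound
  for (i <- 0 until numPartitions) {
    val lBound = if (i != 0) s"$column >= $currentValue" else null
    currentValue += stride
    val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
    val whereClause =
      if (uBound == null) lBound
      else if (lBound == null) s"$uBound or $column is null"
      else s"$lBound AND $uBound"
    println(s"partition $i: $whereClause")
  }
  // prints:
  // partition 0: id < 21 or id is null
  // partition 1: id >= 21 AND id < 41
  // partition 2: id >= 41 AND id < 61
  // partition 3: id >= 61 AND id < 81
  // partition 4: id >= 81
}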

4. Saving Data

@Test
def write(): Unit = {
  val df = spark.read.json("datas/pmt.json")

  // save as text
  val ds = df.toJSON
  //ds.write.mode(SaveMode.Overwrite).text("output/text")
  //ds.write.mode(SaveMode.Overwrite).format("text").save("output/text1")

  // save as json
  //df.write.mode(SaveMode.Overwrite).format("json").save("output/json")
  //df.write.mode(SaveMode.Overwrite).json("output/text")

  // save as parquet
  //df.write.mode(SaveMode.Overwrite).format("parquet").save("output/parquet")
  //df.write.mode(SaveMode.Overwrite).parquet("output/parquet")

  // save as csv
  //df.write.mode(SaveMode.Overwrite).option("sep","#").option("header","true").format("csv").save("output/csv")
  //df.write.mode(SaveMode.Overwrite).option("sep","#").option("header","true").csv("output/csv")

  // save to MySQL
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "root123")
  //df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop102:3306/test","xx",props)

  // Writing straight to MySQL as above can hit primary-key conflicts. In that case use
  // foreachPartition and issue INSERT INTO xx VALUES (..) ON DUPLICATE KEY UPDATE (....) yourself.
  //df.rdd.foreachPartition(x=> //)
}
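
A minimal sketch of such an upsert with foreachPartition, assuming a hypothetical MySQL table xx(id BIGINT PRIMARY KEY, name VARCHAR(64)) and a DataFrame with matching id/name columns (credentials as above):

import java.sql.DriverManager

df.rdd.foreachPartition { rows =>
  // one connection per partition, not per row
  val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "root123")
  val stmt = conn.prepareStatement(
    "INSERT INTO xx(id, name) VALUES (?, ?) ON DUPLICATE KEY UPDATE name = VALUES(name)")
  try {
    rows.foreach { row =>
      stmt.setLong(1, row.getAs[Long]("id"))
      stmt.setString(2, row.getAs[String]("name"))
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}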

5. Integrating Spark with Hive

SparkSQL can use either the embedded Hive or an external Hive; in enterprise development, the external Hive is what is normally used.

I. Using the embedded Hive

Simply go into the spark-yarn directory and use spark-shell directly.


After running it, you will find a new metastore_db directory and a derby.log, which store the metadata, plus a spark-warehouse directory, which stores the database data.


In practice, however, almost nobody uses the built-in Hive, because its metadata is stored in a Derby database, which does not support multiple concurrent clients.

II. Using an external Hive

For Spark to take over an already deployed external Hive, the following steps are required:

  1. To show the difference between the embedded and the external Hive, delete the embedded Hive's metastore_db and spark-warehouse directories:

     rm -rf spark-warehouse/ metastore_db/

  2. Copy hive-site.xml into Spark's conf directory:

     cp /opt/module/hive-3.1.2/conf/hive-site.xml conf/

  3. Copy the MySQL driver jar into Spark's jars directory:

     cp /opt/software/mysql-connector-java-8.0.19.jar ./jars

  4. Start spark-sql:

     bin/spark-sql


III. Operating Hive from IDEA

1. Add the spark-hive dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

2. Copy hive-site.xml into the resources directory

3. Write the code

package com.atguigu.spark.day08

object $06_SparkHive {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("test")
      // enable Hive support
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    spark.sql("select * from student").show()
  }
}
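
With Hive support enabled, results can be written back to Hive as well; a short sketch, assuming a hypothetical student table with an age column (student_copy is an illustrative target table name):

import org.apache.spark.sql.SaveMode

val result = spark.sql("select * from student where age > 20")
result.write.mode(SaveMode.Overwrite).saveAsTable("student_copy")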