package com.sql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object Spark_SQL_Basic {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    // Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = SparkSession.builder().config(conf).getOrCreate()
    // TODO: core logic
    // Implicit conversions: note that sc here is not a package but the SparkSession instance
    import sc.implicits._
    // DataFrame
    println("-----------DataFrame: basic display----------------------")
    // Read JSON data
    val df: DataFrame = sc.read.json("data/user.json")
    df.show()
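    // Note: read.json expects line-delimited JSON (one object per line) by
    // default; a file holding a single multi-line JSON document needs the
    // multiLine option instead:
    //   sc.read.option("multiLine", "true").json("data/user.json")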
    println("-------------DataFrame: SQL----------------------")
    // DataFrame => SQL
    df.createTempView("user")
    sc.sql("select * from user").show()
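    // createTempView throws an AnalysisException if a view named "user"
    // already exists in this session; createOrReplaceTempView("user") is the
    // idempotent alternative.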
    println("-------------DataFrame: DSL----------------------")
    // DataFrame => DSL
    df.select("username", "age").show()
    // Column arithmetic requires the implicit conversions imported above
    df.select($"age" + 1 as "age").show()
    df.select('age + 2 as "age").show()
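    // The DSL also composes into larger queries; a small sketch on the same
    // "age" column (the > 18 threshold is purely illustrative):
    df.filter($"age" > 18).groupBy($"age").count().show()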
    // DataSet
    // A DataFrame is really just a DataSet with Row as its type parameter (Dataset[Row])
    println("---------DataSet----------")
    val seq = Seq(1, 2, 3, 4)
    val ds = seq.toDS()
    ds.show()
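    // The same product encoder covers case classes, so a typed DataSet can be
    // built directly from the User case class defined at the bottom:
    Seq(User(1, "huanhuan", 18)).toDS().show()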
    // RDD <=> DataFrame
    println("---------RDD <=> DataFrame----------")
    val list = List((1, "huanhuan", 18), (2, "haoge", 19))
    val rdd = sc.sparkContext.makeRDD(list)
    val df1: DataFrame = rdd.toDF("id", "name", "age")
    df1.show()
    val rowRDD = df1.rdd
    rowRDD.collect().foreach(println)
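    // df1.rdd is an untyped RDD[Row], so fields are read positionally:
    rowRDD.map(row => (row.getInt(0), row.getString(1))).collect().foreach(println)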
    // DataSet <=> DataFrame
    println("---------DataSet <=> DataFrame----------")
    val ds1 = df1.as[User]
    ds1.show()
    val df2 = ds1.toDF()
    df2.show(1) // show only the first row
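    // as[User] binds DataFrame columns to case-class fields by name, so the
    // names passed to toDF ("id", "name", "age") must match User's fields.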
    // RDD <=> DataSet
    println("---------RDD <=> DataSet----------")
    // When converting an RDD to a DataSet, prefer pattern matching on the tuple
    val ds2 = rdd.map {
      case (id, name, age) => User(id, name, age)
    }.toDS()
    ds2.show()
    val rdd1 = ds2.rdd
    rdd1.collect().foreach(println)
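    // Unlike df1.rdd above (an RDD[Row]), ds2.rdd is typed as RDD[User], so
    // each element prints as User(1,huanhuan,18) rather than [1,huanhuan,18].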
    // Stop Spark
    sc.stop()
  }

  case class User(id: Int, name: String, age: Int)
}