package com.sql
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object Spark_SQL_Basic {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
//Set up the Spark environment
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
val sc = SparkSession.builder().config(conf).getOrCreate()
//TODO: main logic
//Implicit conversions; note that sc here is not a package but the SparkSession instance
import sc.implicits._
//DataFrame
println("-----------DataFrame常规展示----------------------")
//读取JSON数据
val df: DataFrame = sc.read.json("data/user.json")
df.show()
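//(Sketch) Inspect the schema Spark inferred from the JSON; by default
//spark.read.json expects JSON Lines, i.e. one JSON object per line.
df.printSchema()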
println("-------------DataFrame SQL展示 --------------------")
//DataFrame => SQL
df.createTempView("user")
sc.sql("select * from user").show()
println("------------DataFrame DSL展示--------------------")
//DataFrame => DSL
df.select("username","age").show()
//Column expressions require the implicit conversions imported above (import sc.implicits._)
df.select($"age" + 1 as "age").show()
df.select('age + 2 as "age").show()
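//(Sketch) Other common DSL operations, assuming the age column used above:
df.filter($"age" > 18).show()
df.groupBy("age").count().show()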
//Dataset
//A DataFrame is actually a Dataset with a special type parameter: DataFrame = Dataset[Row]
println("---------Dataset demo---------")
val seq = Seq(1,2,3,4)
val ds = seq.toDS()
ds.show()
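//(Sketch) Datasets also support typed transformations; ds is a Dataset[Int] here:
ds.map(_ * 2).show()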
//RDD <=> DataFrame
println("---------以下为RDD <=> DataFrame展示---------")
val list = List((1, "huanhuan",18), (2, "haoge",19))
val rdd = sc.sparkContext.makeRDD(list)
val df1: DataFrame = rdd.toDF("id", "name", "age")
df1.show()
val rowRDD = df1.rdd
rowRDD.collect().foreach(println)
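//(Sketch) df1.rdd yields an RDD[Row]; fields are accessed by position,
//assuming the (id, name, age) schema built above:
rowRDD.map(row => s"${row.getInt(0)}: ${row.getString(1)}").collect().foreach(println)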
//DataSet <=> DataFrame
println("---------DataSet <=> DataFrame展示---------")
val ds1 = df1.as[User]
ds1.show()
val df2 = ds1.toDF()
df2.show(1) //show only one row
//RDD <=> DataSet
println("---------DataSet <=> RDD展示---------")
//RDD转换DataSet 尽量用模式匹配。
val ds2 = rdd.map {
case (id, name, age) => User(id, name, age)
}.toDS()
ds2.show()
val rdd1 = ds2.rdd
rdd1.collect().foreach(println)
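//(Sketch) rdd1 is an RDD[User] (a typed Dataset keeps its element type when
//converted back to an RDD), so fields are accessed in a type-safe way:
rdd1.map(u => s"${u.name} is ${u.age} years old").collect().foreach(println)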
//Stop Spark
sc.stop()
}
case class User(id: Int, name: String, age: Int)
}