package com.sql

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object Spark_SQL_Basic {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Spark environment
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
    val sc = SparkSession.builder().config(conf).getOrCreate()

    // Implicit conversions: note that `sc` here is not a package but the SparkSession instance
    import sc.implicits._

    // DataFrame
    println("-----------DataFrame: basic display----------------------")
    // Read JSON data
    val df: DataFrame = sc.read.json("data/user.json")
    df.show()

    println("-------------DataFrame: SQL syntax--------------------")
    // DataFrame => SQL
    df.createTempView("user")
    sc.sql("select * from user").show()

    println("------------DataFrame: DSL syntax--------------------")
    // DataFrame => DSL
    df.select("username", "age").show()
    // Column arithmetic requires the implicit conversions imported above
    df.select($"age" + 1 as "age").show()
    df.select('age + 2 as "age").show()

    // DataSet
    // A DataFrame is really just a Dataset with a specific type parameter: Dataset[Row]
    println("---------DataSet display---------")
    val seq = Seq(1, 2, 3, 4)
    val ds = seq.toDS()
    ds.show()

    // RDD <=> DataFrame
    println("---------RDD <=> DataFrame---------")
    val list = List((1, "huanhuan", 18), (2, "haoge", 19))
    val rdd = sc.sparkContext.makeRDD(list)
    val df1: DataFrame = rdd.toDF("id", "name", "age")
    df1.show()
    val rowRDD = df1.rdd
    rowRDD.collect().foreach(println)

    // DataSet <=> DataFrame
    println("---------DataSet <=> DataFrame---------")
    val ds1 = df1.as[User]
    ds1.show()
    val df2 = ds1.toDF()
    df2.show(1) // show only one row

    // RDD <=> DataSet
    println("---------DataSet <=> RDD---------")
    // When converting an RDD to a DataSet, prefer pattern matching on the tuple
    val df3 = rdd.map {
      case (id, name, age) => User(id, name, age)
    }.toDS()
    df3.show()
    val rdd1 = df3.rdd
    rdd1.collect().foreach(println)

    // Stop Spark
    sc.stop()
  }

  case class User(id: Int, name: String, age: Int)
}
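For reference, `sc.read.json("data/user.json")` expects one JSON object per line by default, and the DSL section selects the `username` and `age` columns, so the input file is assumed to look roughly like the following (field names come from the code above; the concrete values are illustrative only):

{"username": "huanhuan", "age": 18}
{"username": "haoge", "age": 19}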