Running locally

  1. val spark = SparkSession.builder()
       .master("local[*]")
       .appName("RealTime DataFlow")
       .config("hive.exec.dynamic.partition.mode", "nonstrict")
       .config("spark.streaming.kafka.consumer.cache.enabled", false)
       .config("spark.sql.streaming.metricsEnabled", "true")
       .getOrCreate()

Spark: creating DataFrames

Converting an RDD to a DataFrame

import spark.implicits._

val df = spark.sparkContext.makeRDD(Seq(
  ("A", 20, "B"),
  ("B", 50, "F"),
  ("C", 18, "B"),
  ("D", 21, "G")
)).toDF("child", "age", "parent")

Converting HDFS files to a DataFrame

// Read CSV
val df = spark.read.option("header", "true").csv("/tmp/demo.csv")

// Read JSON
val df = spark.read.json("/tmp/demo.json")

// Read plain text (returns a Dataset[String])
val df = spark.read.textFile("/tmp/demo.txt")

Converting a Hive table to a DataFrame

// Read a Hive table
val df = spark.sql("select * from demo")
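Querying Hive tables like this assumes the SparkSession was built with Hive support enabled; a minimal sketch of such a builder (the appName is arbitrary, and a working Hive metastore is assumed):

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("HiveRead")      // placeholder name
  .enableHiveSupport()      // without this, spark.sql only sees temp views and the in-memory catalog
  .getOrCreate()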

Converting MySQL tables to a DataFrame

Method 1:

// Read from MySQL
val jdbcDF = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "demo")
  .option("user", "mysql username")
  .option("password", "mysql password")
  .load()

Method 2:

// Read from MySQL, pushing a query down as the table
val jdbcDF = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(select * from demo) as temp")
  .option("user", "mysql username")
  .option("password", "mysql password")
  .load()
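Both snippets above assume a url value holding a standard JDBC connection string; as a rough illustration (host, port, and database name are placeholders, not values from this article):

// Hypothetical connection string; adjust host, port, and database to your environment
val url = "jdbc:mysql://localhost:3306/testdb?useUnicode=true&characterEncoding=utf8"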

Spark: common DataFrame operations

Base data

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("DataFrameTest")
  .getOrCreate()

import spark.implicits._

val df = spark.sparkContext.makeRDD(Seq(
  ("A", 20, "B"),
  ("B", 50, "F"),
  ("C", 18, "B"),
  ("D", 21, "G")
)).toDF("child", "age", "parent")

Adding columns

import org.apache.spark.sql.functions

// Add a constant column
val df2 = df.withColumn("gender", functions.lit("male"))
df2.show()

// Add a column via a user-defined function
val code: Int => String = (age: Int) => { if (age < 30) "young" else "old" }
val myudf = functions.udf(code)
val df3 = df.withColumn("state", myudf(df.col("age")))
df3.show()
// Note: a UDF cannot take a plain constant as an argument; every argument must be a Column.

val code2: (Int, Int) => String = (age: Int, kage: Int) => { if (age < kage) "young" else "old" }
val myudf2 = functions.udf(code2)
// The second argument of myudf2 cannot be written as a bare 18; it must be a Column, hence functions.lit(18)
val df4 = df.withColumn("state", myudf2(df.col("age"), functions.lit(18)))
df4.show()

// If the column name already exists, withColumn replaces the old column
df.withColumn("parent", functions.lit("bob")).show()
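For a simple threshold like this, the built-in when/otherwise expression gives the same result without a UDF; a small sketch reusing the df and functions import above:

// Same young/old classification, expressed without a UDF
val df5 = df.withColumn("state",
  functions.when(df.col("age") < 30, "young").otherwise("old"))
df5.show()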

Renaming columns

// Rename a column
df.withColumnRenamed("parent", "p").show()

Dropping columns

// Drop a single column
val df2 = df.drop("child")
df2.show()

// Drop multiple columns
val df3 = df.drop("age", "parent")
df3.show()

Selecting a subset of columns

import org.apache.spark.sql.types.{IntegerType, StringType}

// Select by column name
val df2 = df.select("child", "parent")
df2.show()

// Select by Column, which allows renaming fields
val df3 = df.select(df.col("child"), df.col("parent").alias("p"))
df3.show()

// Select by Column, which also allows casting field types
val df4 = df.select(
  df.col("child").cast(StringType).alias("c"),
  df.col("parent").cast(StringType).alias("p"),
  df.col("age").cast(IntegerType).alias("age"))
df4.show()
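The same renaming and casting can also be written as SQL fragments via selectExpr; a brief sketch:

// SQL-expression equivalent of the select above
df.selectExpr("cast(child as string) as c", "cast(parent as string) as p", "cast(age as int) as age").show()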

Spark: DataFrame joins

Join type     Description                                                 Mnemonic (set notation)
inner         only rows that match on both sides                          A ∩ B
left join     all rows from the left; unmatched right columns are null    ^A ∪ (A ∩ B)
right join    all rows from the right; unmatched left columns are null    (A ∩ B) ∪ ^B
outer         all rows from both sides                                    ^A ∪ (A ∩ B) ∪ ^B

Note: ^A denotes A - (A ∩ B), and ^B denotes B - (A ∩ B).

Preparing the data

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("DataFrameTest")
  .getOrCreate()

import spark.implicits._

// User info
val userDf = spark.sparkContext.makeRDD(Seq(
  (1, "张三", 20, "male", 11, 2),
  (2, "李四", 50, "female", 22, 3),
  (3, "王五", 18, "male", 11, 3)
)).toDF("id", "name", "age", "gender", "deptId", "leader")

// Department info
val deptDf = spark.sparkContext.makeRDD(Seq(
  (10, "IT技术部"),
  (11, "研发部"),
  (12, "行政部")
)).toDF("id", "name")

Inner join

import spark.implicits._

// Method 1
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "inner")
df2.select(userDf("*"), deptDf("name").alias("deptName")).show()

// Method 2
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "inner")
df3.selectExpr("first.*", "second.name as deptName").show()

Results
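For reference, the same inner join can be written in SQL after registering the DataFrames as temporary views; a minimal sketch (the view names users and depts are made up here):

userDf.createOrReplaceTempView("users")
deptDf.createOrReplaceTempView("depts")
// Inner join in SQL form; switch "join" to "left join" / "right join" / "full outer join" for the other variants
spark.sql("select u.*, d.name as deptName from users u join depts d on u.deptId = d.id").show()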

Left join

import spark.implicits._

// Method 1
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "left")
df2.select(userDf("*"), deptDf("name").alias("deptName")).show()

// Method 2
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "left")
df3.selectExpr("first.*", "second.name as deptName").show()

Results

Right join

import spark.implicits._

// Method 1
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "right")
df2.select(userDf("*"), deptDf("name").alias("deptName"), deptDf("id").alias("deptId")).show()

// Method 2
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "right")
df3.selectExpr("first.*", "second.name as deptName", "second.id as deptId").show()

Results

Full outer join

import spark.implicits._

val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "outer")
df2.select(userDf("*"), deptDf("name").alias("deptName"), deptDf("id").alias("deptId")).show()

val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "outer")
df3.selectExpr("first.*", "second.name as deptName", "second.id as deptId").show()

Results:

Spark: DataFrame aggregations

Preparing the data

A dataset of students' course selections and scores.

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("WordCount")
  .config("spark.streaming.kafka.consumer.cache.enabled", false)
  .getOrCreate()

import spark.implicits._

val df = spark.sparkContext.makeRDD(Seq(
  ("A", 20, "男", "scala", 90),
  ("B", 18, "女", "scala", 80),
  ("C", 22, "男", "scala", 100),
  ("A", 20, "男", "java", 60),
  ("C", 22, "男", "java", 50),
  ("A", 20, "男", "c", 50),
  ("B", 21, "女", "c", 50)
)).toDF("name", "age", "gender", "course", "score")

Aggregation examples

Count the number of students enrolled in each course

val df1 = df.groupBy("course").count()
df1.show(false)

Output

+------+-----+
|course|count|
+------+-----+
|scala |3    |
|c     |2    |
|java  |2    |
+------+-----+

Compute the average score per course, broken down by gender

val df2 = df.groupBy("course", "gender").avg("score")
df2.show(false)

Output

+------+------+----------+
|course|gender|avg(score)|
+------+------+----------+
|c     |女    |50.0      |
|java  |男    |55.0      |
|c     |男    |50.0      |
|scala |女    |80.0      |
|scala |男    |95.0      |
+------+------+----------+
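Several aggregates can be computed in a single agg call and given readable names with alias; a brief sketch over the same grouping:

import org.apache.spark.sql.functions._

// Average and maximum score per (course, gender) group, with aliased columns
df.groupBy("course", "gender")
  .agg(avg("score").alias("avg_score"), max("score").alias("max_score"))
  .show(false)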

Count the enrollments per course and list the names of the enrolled students

import org.apache.spark.sql.functions._

val df3 = df.groupBy("course").agg(count("name"), collect_list("name"))
df3.show(false)

Output

+------+-----------+------------------+
|course|count(name)|collect_list(name)|
+------+-----------+------------------+
|scala |3          |[A, B, C]         |
|c     |2          |[A, B]            |
|java  |2          |[A, C]            |
+------+-----------+------------------+

Find the name of the top-scoring student per course and gender

val df4 = df.groupByKey(row => (row.getAs[String]("course"), row.getAs[String]("gender")))
  .mapGroups({
    case ((course, gender), users) =>
      (course, gender, users.maxBy(r => r.getAs[Integer]("score")).getAs[String]("name"))
  }).toDF("course", "gender", "name")
df4.show(false)

Output

+------+------+----+
|course|gender|name|
+------+------+----+
|c     |女    |B   |
|java  |男    |A   |
|c     |男    |A   |
|scala |女    |B   |
|scala |男    |C   |
+------+------+----+
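The same per-group top scorer can usually be computed with a window function instead of groupByKey/mapGroups, which keeps the query in the untyped DataFrame API; a minimal sketch (ties are broken arbitrarily, just like maxBy above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank rows within each (course, gender) group by score, highest first, and keep the top row
val w = Window.partitionBy("course", "gender").orderBy(desc("score"))
val top = df.withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .select("course", "gender", "name")
top.show(false)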