Running locally
val spark = SparkSession.builder().master("local[*]").appName("RealTime DataFlow") .config("hive.exec.dynamic.partition.mode", "nonstrict") .config("spark.streaming.kafka.consumer.cache.enabled", false) .config("spark.sql.streaming.metricsEnabled", "true") .getOrCreate()
Creating DataFrames in Spark
Converting an RDD to a DataFrame
import spark.implicits._

val df = spark.sparkContext.makeRDD(Seq(
  ("A", 20, "B"),
  ("B", 50, "F"),
  ("C", 18, "B"),
  ("D", 21, "G")
)).toDF("child", "age", "parent")
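If you want explicit column types instead of relying on toDF's inference over tuples, an RDD of Row objects can be paired with a StructType schema. A minimal sketch, using the same illustrative columns as above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// build Row objects and an explicit schema, then create the DataFrame from them
val rowRdd = spark.sparkContext.makeRDD(Seq(
  Row("A", 20, "B"),
  Row("B", 50, "F")
))
val schema = StructType(Seq(
  StructField("child", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("parent", StringType, nullable = false)
))
val dfWithSchema = spark.createDataFrame(rowRdd, schema)
dfWithSchema.printSchema()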
Converting HDFS files to a DataFrame
// read CSV
val csvDf = spark.read.option("header", "true").csv("/tmp/demo.csv")
// read JSON
val jsonDf = spark.read.json("/tmp/demo.json")
// read plain text (textFile returns a Dataset[String])
val textDs = spark.read.textFile("/tmp/demo.txt")
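Note that a header-only CSV read leaves every column typed as a string unless inferSchema is enabled. If stable types matter, a schema can be supplied up front; a minimal sketch, assuming /tmp/demo.csv has the same child/age/parent layout used earlier:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// declare the expected columns so Spark neither infers nor defaults everything to strings
val csvSchema = StructType(Seq(
  StructField("child", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("parent", StringType, nullable = true)
))
val typedCsvDf = spark.read
  .option("header", "true")
  .schema(csvSchema)
  .csv("/tmp/demo.csv")
typedCsvDf.printSchema()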
Converting a Hive table to a DataFrame
// read a Hive table
val df = spark.sql("select * from demo")
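This only resolves Hive tables when the SparkSession was built with Hive support enabled and can reach a Hive metastore; otherwise spark.sql sees only temporary views. A minimal sketch of such a session (the appName is illustrative):

// Hive-backed session: enableHiveSupport() wires in the Hive metastore and warehouse
val hiveSpark = SparkSession.builder()
  .master("local[*]")
  .appName("HiveRead")
  .enableHiveSupport()
  .getOrCreate()
val hiveDf = hiveSpark.sql("select * from demo")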
Converting a MySQL table to a DataFrame
Method 1:
// read from MySQL
val jdbcDF = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "demo")
  .option("user", "mysql username")
  .option("password", "mysql password")
  .load()
Method 2:
// read from MySQL, pushing a subquery down as the table
val jdbcDF = spark.read.format("jdbc")
  .option("url", url)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(select * from demo) as temp")
  .option("user", "mysql username")
  .option("password", "mysql password")
  .load()
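The url value referenced in both methods is not shown above; for MySQL it typically looks like the sketch below, where the host, port, database name, and credentials are placeholders to replace with your own:

// hypothetical connection string -- substitute your own host, port, and database
val url = "jdbc:mysql://127.0.0.1:3306/testdb?useUnicode=true&characterEncoding=utf8"

Method 2's subquery form is useful when you only need a filtered or projected subset, since the inner select is evaluated on the MySQL side rather than after loading the whole table.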
Common DataFrame operations in Spark
Base data
val spark = SparkSession.builder() .master("local[1]") .appName("DataFrameTest") .getOrCreate()import spark.implicits._val df = spark.sparkContext.makeRDD(Seq( ("A", 20, "B"), ("B", 50, "F"), ("C", 18, "B"), ("D", 21, "G"))).toDF("child", "age", "parent")
Adding a column
import org.apache.spark.sql.functions

// add a constant column
val df2 = df.withColumn("gender", functions.lit("male"))
df2.show()

// add a column via a user-defined function (UDF)
val code: (Int => String) = (age: Int) => { if (age < 30) "young" else "old" }
val myudf = functions.udf(code)
val df3 = df.withColumn("state", myudf(df.col("age")))
df3.show()

// Note: every argument passed to a UDF must be a Column; you cannot pass a plain constant value directly.
val code2: ((Int, Int) => String) = (age: Int, kage: Int) => { if (age < kage) "young" else "old" }
val myudf2 = functions.udf(code2)
// the second argument of myudf2 cannot be written as a bare 18 -- wrap the constant with lit() to get a Column
val df4 = df.withColumn("state", myudf2(df.col("age"), functions.lit(18)))
df4.show()

// if the column name already exists, withColumn replaces the old column
df.withColumn("parent", functions.lit("bob")).show()
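The same logic can also be registered under a name so it is callable from SQL expressions; a minimal sketch (the function name ageState and the view name people are illustrative):

// register the function by name, then call it from SQL or selectExpr
spark.udf.register("ageState", (age: Int) => if (age < 30) "young" else "old")
df.createOrReplaceTempView("people")
spark.sql("select child, age, ageState(age) as state from people").show()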
Renaming a column
// rename a column
df.withColumnRenamed("parent", "p").show()
Dropping columns
// drop one column
val df2 = df.drop("child")
df2.show()

// drop multiple columns
val df3 = df.drop("age", "parent")
df3.show()
Selecting a subset of columns
import org.apache.spark.sql.types.{IntegerType, StringType}

// select by column name
val df2 = df.select("child", "parent")
df2.show()

// select by Column, which also allows renaming
val df3 = df.select(df.col("child"), df.col("parent").alias("p"))
df3.show()

// select by Column, which also allows casting
val df4 = df.select(
  df.col("child").cast(StringType).alias("c"),
  df.col("parent").cast(StringType).alias("p"),
  df.col("age").cast(IntegerType).alias("age"))
df4.show()
DataFrame join operations in Spark
Join type | Description | Set shorthand |
Inner join (inner) | only rows that match on both sides | A ∩ B |
Left join (left) | all rows from the left; unmatched rows get null on the right | ^A ∪ (A ∩ B) |
Right join (right) | all rows from the right; unmatched rows get null on the left | (A ∩ B) ∪ ^B |
Outer join (outer) | all rows from both sides | ^A ∪ (A ∩ B) ∪ ^B |
Note: ^A denotes A - (A ∩ B), and ^B denotes B - (A ∩ B).
Data preparation
val spark = SparkSession.builder() .master("local[1]") .appName("DataFrameTest") .getOrCreate()import spark.implicits._//用户信息val userDf = spark.sparkContext.makeRDD(Seq( (1, "张三", 20, "male", 11, 2), (2, "李四", 50, "female", 22, 3), (3, "王五", 18, "male", 11, 3))).toDF("id", "name", "age", "gender", "deptId", "leader")//部分信息val deptDf = spark.sparkContext.makeRDD(Seq( (10, "IT技术部"), (11, "研发部"), (12, "行政部"))).toDF("id", "name")
Inner join
import spark.implicits._

// first approach: join on a column expression
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "inner")
df2.select(userDf("*"), deptDf("name").alias("deptName")).show()

// second approach: alias both DataFrames and refer to columns through the aliases
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "inner")
df3.selectExpr("first.*", "second.name as deptName").show()
Result:
Left join
import spark.implicits._

// first approach: join on a column expression
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "left")
df2.select(userDf("*"), deptDf("name").alias("deptName")).show()

// second approach: alias both DataFrames and refer to columns through the aliases
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "left")
df3.selectExpr("first.*", "second.name as deptName").show()
Result:
Right join
import spark.implicits._

// first approach: join on a column expression
val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "right")
df2.select(userDf("*"), deptDf("name").alias("deptName"), deptDf("id").alias("deptId")).show()

// second approach: alias both DataFrames and refer to columns through the aliases
val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "right")
df3.selectExpr("first.*", "second.name as deptName", "second.id as deptId").show()
Result:
Outer join
import spark.implicits._

val df2 = userDf.join(deptDf, userDf("deptId") === deptDf("id"), "outer")
df2.select(userDf("*"), deptDf("name").alias("deptName"), deptDf("id").alias("deptId")).show()

val df3 = userDf.alias("first").join(deptDf.alias("second"), $"first.deptId" === $"second.id", "outer")
df3.selectExpr("first.*", "second.name as deptName", "second.id as deptId").show()
Result:
DataFrame aggregation operations in Spark
Data preparation
A dataset of students' scores in elective courses:
val spark = SparkSession.builder() .master("local[1]") .appName("WordCount") .config("spark.streaming.kafka.consumer.cache.enabled", false) .getOrCreate()import spark.implicits._val df = spark.sparkContext.makeRDD(Seq( ("A", 20, "男", "scala", 90), ("B", 18, "女", "scala", 80), ("C", 22, "男", "scala", 100), ("A", 20, "男", "java", 60), ("C", 22, "男", "java", 50), ("A", 20, "男", "c", 50), ("B", 21, "女", "c", 50))).toDF("name", "age", "gender", "course","score")
Aggregation examples
Count the number of students enrolled in each course
val df1 = df.groupBy("course").count()df1.show(false)
Output:
+------+-----+
|course|count|
+------+-----+
|scala |3    |
|c     |2    |
|java  |2    |
+------+-----+
Average score per course, broken down by gender
val df2 = df.groupBy("course", "gender").avg("score")
df2.show(false)
Output:
+------+------+----------+
|course|gender|avg(score)|
+------+------+----------+
|c     |女    |50.0      |
|java  |男    |55.0      |
|c     |男    |50.0      |
|scala |女    |80.0      |
|scala |男    |95.0      |
+------+------+----------+
Count the students in each course and list their names
import org.apache.spark.sql.functions._

val df3 = df.groupBy("course").agg(count("name"), collect_list("name"))
df3.show(false)
Output:
+------+-----------+------------------+
|course|count(name)|collect_list(name)|
+------+-----------+------------------+
|scala |3          |[A, B, C]         |
|c     |2          |[A, B]            |
|java  |2          |[A, C]            |
+------+-----------+------------------+
Find the top-scoring student of each gender in each course
val df4 = df.groupByKey(row => (row.getAs[String]("course"), row.getAs[String]("gender")))
  .mapGroups { case ((course, gender), users) =>
    (course, gender, users.maxBy(r => r.getAs[Int]("score")).getAs[String]("name"))
  }
  .toDF("course", "gender", "name")
df4.show(false)
Output:
+------+------+----+
|course|gender|name|
+------+------+----+
|c     |女    |B   |
|java  |男    |A   |
|c     |男    |A   |
|scala |女    |B   |
|scala |男    |C   |
+------+------+----+
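The groupByKey/mapGroups approach above pulls each group onto a single task as Row objects. An alternative, not shown in the original, is to rank rows with a window function and keep the top row per (course, gender) partition; a minimal sketch, with the caveat that ties may return either name:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// rank rows within each (course, gender) partition by descending score, keep the best one
val w = Window.partitionBy("course", "gender").orderBy(df("score").desc)
val top = df.withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .select("course", "gender", "name")
top.show(false)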