/** Registers `df` as a temporary view named `table`, replacing any existing view of that name. */
def saveDf(df: DataFrame, table: String): Unit = {
  // s"${table}" in the original was a redundant interpolation of a plain String.
  df.createOrReplaceTempView(table)
}
/**
 * Runs `joinSql` (expected shape: "select t1.a, t2.b ... from <tbl1> t1 ... join <tbl2> t2 on ..."),
 * groups the join result by the t1-side columns, and collects the t2-side columns of each group
 * into a single JSON-array column named `jsonColumnName`.
 *
 * @param joinSql            lowercase select/join statement using aliases t1 and t2
 * @param jsonColumnName     name of the resulting JSON column
 * @param otherCollectFields comma-separated extra t2 columns to collect ("" for none)
 * @return grouped DataFrame with the collected t2 columns serialized via to_json
 */
def makeJsonDf(joinSql: String, jsonColumnName: String, otherCollectFields: String): DataFrame = {
  // Normalize all whitespace runs to single spaces. The original split on a single
  // space (so multi-line SQL made indexOf("t1") return -1 and karr(-2) threw) and
  // used replace("\n", ""), which glued adjacent tokens together.
  val realSql: String = joinSql.replaceAll("\\s+", " ").trim
  println(s"sql: ${realSql}")

  // The table aliased t1 (resp. t2) is the token immediately before the alias.
  val tokens = realSql.split(" ")
  val table1 = tokens(tokens.indexOf("t1") - 1)
  val table2 = tokens(tokens.indexOf("t2") - 1)
  val df1: DataFrame = spark.sql(s"select * from $table1")
  val df2: DataFrame = spark.sql(s"select * from $table2")
  val df: DataFrame = spark.sql(joinSql)

  println("join表schema:")
  df1.printSchema()
  println()

  // Select-list between "select " and " from ". NOTE(review): assumes lowercase
  // keywords and that " from " does not occur earlier in the statement — confirm.
  val mfield = realSql.slice("select ".length, realSql.indexOf(" from "))
  val selectItems = mfield.split(",").map(_.trim)

  // Columns selected from t1 become the group-by keys (alias prefix stripped).
  val t1Fields = selectItems.filter(_.contains("t1.")).map { kstr =>
    if (kstr.startsWith("distinct ")) kstr.substring("distinct t1.".length)
    else kstr.substring("t1.".length)
  }
  // Qualified columns NOT from t1 are collected into the JSON map; an "x as y"
  // item contributes its alias y.
  val t2Fields = selectItems.filter(_.contains(".")).filterNot(_.contains("t1.")).map { kstr =>
    if (kstr.contains(" as ")) kstr.split(" as ")(1)
    else kstr.substring("t2.".length)
  }

  println("group fields....")
  t1Fields.foreach(println)

  // "t1.*" / "t2.*" expand to every column of the respective source table.
  val groupByFields =
    if (t1Fields.contains("*")) t1Fields ++ df1.schema.fieldNames else t1Fields
  // Guard against "" producing a bogus empty field name (original appended it
  // unconditionally and relied on the existence filter below to drop it).
  val extraFields =
    if (otherCollectFields.trim.isEmpty) Array.empty[String] else otherCollectFields.split(",")
  val collectFields =
    (if (t2Fields.contains("*")) t2Fields ++ df2.schema.fieldNames else t2Fields) ++ extraFields

  // Keep only names that actually exist, qualified with the alias so the
  // joined DataFrame can resolve them unambiguously.
  val groupByColumns: Array[Column] =
    groupByFields.distinct.filter(df1.columns.contains(_)).map(one => df("t1." + one))
  df.columns.foreach(println)
  // map() expects interleaved key/value pairs: map(lit("a"), col("a"), lit("b"), ...).
  val collectColumns: Array[Column] =
    collectFields.distinct.filter(df.columns.contains(_)).flatMap(f => Array(lit(f), df("t2." + f)))

  println("group fields:")
  groupByColumns.foreach(println)
  println()
  println("collect fields:")
  collectColumns.foreach(println)
  println()

  // One row per t1-key group; the collected t2 maps become a JSON array string.
  val grouped: DataFrame =
    df.groupBy(groupByColumns: _*).agg(collect_list(map(collectColumns: _*)).as(jsonColumnName))
  grouped.withColumn(jsonColumnName, to_json(grouped(jsonColumnName)))
}
// Usage example
// Script-level driver: build the claim DataFrame, register it as a temp view,
// then enrich each claim with its matching regist rows collected as JSON.
// NOTE(review): `sql` and `claimDf` are vars reused across both steps; kept
// as-is in case later (unseen) code references them.
var sql = "select t1.registno,t1.accidentno,t1.policyno,t2.casetype as caseflag from ods_new.ods_car_lregist t1 left join ods_new.ods_car_lclaim t2 on t1.accidentno = t2.accidentno and t1.policyno = t2.policyno"
var claimDf = spark.sql(sql)
saveDf(claimDf, "tmp_claim")
// Claim join regist: group by the t1 (tmp_claim) columns and collect the
// t2 (regist) columns into the "incident" JSON column, then re-register the view.
sql = "select t1.*,t2.frameno,t2.licenseno,t2.reportormobile,t2.comcode,t2.accidentno,t2.policyno from tmp_claim t1 left join ods_new.ods_car_lregist t2 on t1.accidentno = t2.accidentno and t1.registno = t2.registno";
claimDf = makeJsonDf(sql, "incident", "")
saveDf(claimDf, "tmp_claim")