import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{collect_list, lit, map, to_json}

// Register a DataFrame as a temp view so later SQL can refer to it by name.
// "spark" below is the ambient SparkSession (e.g. in spark-shell).
def saveDf(df: DataFrame, table: String): Unit = {
  df.createOrReplaceTempView(table)
}

// Run a t1-join-t2 SQL, group by the t1 fields in the select list, and pack the
// t2 fields of each group into a JSON array column named jsonColumnName.
// otherCollectFields is a comma-separated list of extra fields to collect.
// Note: to_json over an array of maps requires Spark 2.4+.
def makeJsonDf(joinSql: String, jsonColumnName: String, otherCollectFields: String): DataFrame = {
  // Normalize whitespace so the token-based parsing below is not broken by newlines.
  val realSql: String = joinSql.replaceAll("\\s+", " ").trim()
  println(s"sql: ${realSql}")
  // Recover the t1 and t2 table names: each is the token right before its alias.
  val karr = realSql.split(" ")
  val table1 = karr(karr.indexOf("t1") - 1)
  val table2 = karr(karr.indexOf("t2") - 1)
  val df1: DataFrame = spark.sql("select * from " + table1)
  val df2: DataFrame = spark.sql("select * from " + table2)
  // Run the join first to get all fields.
  val df: DataFrame = spark.sql(realSql)
  println("t1 schema:")
  df1.printSchema()
  println()
  // The select list sits between "select " and " from ".
  val mfield = realSql.slice("select ".length(), realSql.indexOf(" from "))
  // By default, fields prefixed with t1. become the group-by keys...
  var groupByFields = mfield.split(",").filter(_.contains("t1.")).map(item => {
    val kstr = item.trim()
    if (kstr.contains("distinct ")) kstr.substring("distinct t1.".length)
    else kstr.substring("t1.".length)
  })
  // ...and the remaining qualified fields are collected into the JSON column.
  var fields = mfield.split(",").filter(_.contains(".")).filter(!_.contains("t1.")).map(item => {
    val kstr = item.trim()
    if (kstr.contains(" as ")) kstr.split(" as ")(1)
    else kstr.substring("t2.".length)
  })
  println("group fields....")
  groupByFields.foreach(println)
  // "t1.*" / "t2.*" expand to every column of the corresponding table.
  if (groupByFields.contains("*")) {
    df1.schema.fieldNames.foreach(one => groupByFields = groupByFields :+ one)
  }
  if (fields.contains("*")) {
    df2.schema.fieldNames.foreach(one => fields = fields :+ one)
  }
  otherCollectFields.split(",").map(_.trim).filter(_.nonEmpty).foreach(one => fields = fields :+ one)
  var groupByColumns: Array[Column] = Array[Column]()
  groupByFields.distinct.foreach(one => {
    if (df1.columns.contains(one)) groupByColumns = groupByColumns :+ df("t1." + one)
  })
  df.columns.foreach(println)
  // Build alternating key/value columns for map(): the literal field name is the
  // key, the t2 column is the value.
  var collectColumns: Array[Column] = Array[Column]()
  fields.distinct.foreach(field => {
    if (df.columns.contains(field)) {
      collectColumns = collectColumns :+ lit(field)
      collectColumns = collectColumns :+ df("t2." + field)
    }
  })
  println("group fields:")
  groupByColumns.foreach(println)
  println()
  println("collect fields:")
  collectColumns.foreach(println)
  println()
  // One map per row, one array of maps per group, then serialize the array to JSON.
  val df5: DataFrame = df.groupBy(groupByColumns: _*)
    .agg(collect_list(map(collectColumns: _*)).as(jsonColumnName))
  df5.withColumn(jsonColumnName, to_json(df5(jsonColumnName)))
}
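The heart of makeJsonDf is the collect_list(map(...)) plus to_json combination. Here is a minimal, self-contained sketch of just that step on made-up toy data (the table contents and column names are illustrative, not from the real ods tables):

import org.apache.spark.sql.functions.{col, collect_list, lit, map, to_json}

// Toy detail rows: several cases per report number.
val demo = spark.createDataFrame(Seq(
  ("R001", "C1", "fast-track"),
  ("R001", "C2", "normal"),
  ("R002", "C3", "normal")
)).toDF("registno", "caseno", "casetype")

// One string->string map per row, one array of maps per registno,
// then serialize the array into a single JSON string column.
val packed = demo.groupBy(demo("registno"))
  .agg(collect_list(map(
    lit("caseno"), demo("caseno"),
    lit("casetype"), demo("casetype"))).as("detail"))
  .withColumn("detail", to_json(col("detail")))

packed.show(false)
// R001 ends up with roughly:
// [{"caseno":"C1","casetype":"fast-track"},{"caseno":"C2","casetype":"normal"}]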
Usage example
var sql = "select t1.registno,t1.accidentno,t1.policyno,t2.casetype as caseflag from ods_new.ods_car_lregist t1 left join ods_new.ods_car_lclaim t2 on t1.accidentno = t2.accidentno and t1.policyno = t2.policyno"
var claimDf = spark.sql(sql)
saveDf(claimDf, "tmp_claim")

// Claim records joined back to the report table
sql = "select t1.*,t2.frameno,t2.licenseno,t2.reportormobile,t2.comcode,t2.accidentno,t2.policyno from tmp_claim t1 left join ods_new.ods_car_lregist t2 on t1.accidentno = t2.accidentno and t1.registno = t2.registno"
claimDf = makeJsonDf(sql, "incident", "")
saveDf(claimDf, "tmp_claim")
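After this runs, the incident column holds one JSON array per group. A hedged sketch of how the result could be inspected and parsed back; it assumes registno survives as a group-by column and that all collected values serialize as strings, which is what map() produces here, and it needs a Spark version (2.4+) whose from_json accepts an array-of-maps schema:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, MapType, StringType}

// Eyeball a few packed rows.
claimDf.select("registno", "incident").show(5, truncate = false)

// Parse the JSON array back into array<map<string,string>> for further processing.
val incidentSchema = ArrayType(MapType(StringType, StringType))
val parsed = claimDf.withColumn("incidentArr", from_json(col("incident"), incidentSchema))
parsed.printSchema()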
