1. Statistics on apps installed on devices
The code below has been trimmed, but the overall structure of the original program is preserved (as a result, some column references no longer line up exactly).
package com.wsy.learn

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel

object InstallAppCount extends Logging {
  def main(args: Array[String]): Unit = {
    if (args.length > 0) {
      logInfo(s"------ args=${args(0)} ----------")
    } else {
      logError(s"------ Please pass a single comma-separated argument containing the query start date, end date and output path, e.g. 20200615,20200616,/your/path ----------")
      System.exit(1)
    }
    val startDate = args(0).split(",")(0).trim
    val endDate = args(0).split(",")(1).trim
    val output = args(0).split(",")(2).trim
    logInfo(s" ------- startDate=${startDate}, endDate=${endDate}, output=${output} ----------")

    val spark = SparkSession.builder()
      .master("yarn")
      .appName("install_app_count")
      .config("spark.sql.strict.mode", false)
      .config("spark.default.parallelism", 200)
      .config("spark.sql.shuffle.partitions", 200)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    //spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    //spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    // This path may already exist
    spark.sparkContext.setCheckpointDir("/home/wsy/dir")

    import spark.implicits._

    //spark.sql("set spark.sql.strict.mode = false ")
    spark.sql("use your_database")

    // your_table is a partitioned table, so the select must restrict the partition column in the where clause
    val df1 = spark.sql(s"select module, x from your_table where (modulename = 'a' or modulename = 'b') and day >= '${startDate}' and day < '${endDate}' ")

    val df2 = df1.select(
      $"module",
      get_json_object($"x", "$.a").as("a"),
      get_json_object($"x", "$.b").as("b"),
      when($"module" === "a", get_json_object($"x", "$.a"))
        .otherwise(get_json_object($"x", "$.b")).as("device"))

    val df3 = df2.filter($"x" === "3" || $"x" === "4")
      .filter($"device".isNotNull)
      .repartition(200)

    val df4 = df3.withColumn("deviceid",
      when($"x" === "3", get_json_object($"device", "$.a"))
        .otherwise(get_json_object($"device", "$.b")))

    val df5 = df4.select($"module", $"a", $"b", $"c", $"deviceid")

    // Reuse optimization: one month of raw data is about 3.5 TB and df5 is roughly 220 GB.
    // Shuffling the 3.5 TB takes about 1 hour with 250 cores and about 30 minutes with 500 cores.
    // persist (or checkpoint) cut the total runtime from 3 hours 40 minutes to 40 minutes; the
    // original program reuses df5 five times (the code here is trimmed).
    // After caching, computation that used to be repeated five times runs only once, which is a big optimization.
    df5.persist(StorageLevel.MEMORY_AND_DISK_SER)
    //val df5 = df51.checkpoint()
    df5.count()

    val df_app = df5.filter($"source" === "ios" || $"source" === "android")
      .select($"source", $"deviceid").distinct().repartition(100)
    df_app.cache()
    val result_app = df_app.count()

    val df_accountsystem = df5.filter($"tixiid" === "1")
      .select($"tixiid", $"deviceid", $"source").distinct().repartition(100)
    df_accountsystem.cache()
    val result_accountsystem = df_accountsystem.count()

    val result_x: Seq[String] = Seq(
      s"Number of devices with the APP installed, ${result_app}",
      s"Number of devices with account-system apps installed, ${result_accountsystem}")

    val df_x = df5.filter($"a" === "ios" || $"a" === "android")
      .select($"a", $"deviceid").distinct().repartition(100)
    df_x.cache()
    val result_11: Long = df_x.count()

    // Result statistics
    val df_accountsystem_1 = df_accountsystem
      .filter($"a" =!= "ios" && $"a" =!= "android")
      .select($"x", $"deviceid")
    val result_12: Long = df_x.join(df_app, "deviceid").count()
    val result_13: Long = df_x.join(df_accountsystem_1, "deviceid").count()
    val result_1: Seq[String] = Seq(
      s"Number of devices with app x installed, ${result_11}",
      s"Number of devices with both app x and app y installed, ${result_12}",
      s"Number of devices with app x and other account-system apps installed, ${result_13}")

    // If the output path already exists, delete it to avoid a "directory already exists" exception
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    val output_path = new Path(output)
    if (fs.exists(output_path)) {
      fs.delete(output_path, true)
    }

    val result: Seq[String] = result_x ++ result_1
    logInfo(s"------ result=${result} ---------------")
    spark.sparkContext.parallelize(result, 1).saveAsTextFile(output)

    spark.stop()
  }
}
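The commented-out checkpoint() call above points at an alternative to persist. The sketch below is an illustration only, not the author's original code: df4 and the column names follow the trimmed program above, setCheckpointDir is assumed to have been called as shown, and df5Checkpointed / appInstalls are hypothetical names.

    // Alternative to persist: a reliable checkpoint.
    // checkpoint(eager = true) materializes the data immediately, writes it to the
    // checkpoint directory, and truncates the lineage, so downstream actions read the
    // checkpointed files instead of recomputing everything from df1.
    val df5Checkpointed = df4
      .select($"module", $"a", $"b", $"c", $"deviceid")
      .checkpoint(eager = true)

    // Downstream reuse then works the same way as with the persisted df5:
    val appInstalls = df5Checkpointed
      .filter($"source" === "ios" || $"source" === "android")
      .select($"source", $"deviceid")
      .distinct()
      .count()

Compared with MEMORY_AND_DISK_SER persistence, a checkpoint costs an extra HDFS write, but the data survives executor loss and the query plan stays short, which can matter when a DataFrame this large is reused many times.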
