1. Device app-install statistics

The code below has been trimmed, but the overall structure of the original is preserved.

    package com.wsy.learn

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.internal.Logging
    import org.apache.spark.storage.StorageLevel

    object InstallAppCount extends Logging {
      def main(args: Array[String]): Unit = {
        if (args.length > 0) {
          logInfo(s"------ args=${args(0)} ----------")
        } else {
          logError("------ Please pass a single comma-separated argument: query start date, end date, output path, e.g. 20200615,20200616,/your/path ----------")
          System.exit(1)
        }
        val parts = args(0).split(",")
        val startDate = parts(0).trim
        val endDate = parts(1).trim
        val output = parts(2).trim
        logInfo(s"------ startDate=${startDate}, endDate=${endDate}, output=${output} ----------")

        val spark = SparkSession.builder()
          .master("yarn")
          .appName("install_app_count")
          .config("spark.sql.strict.mode", false)
          .config("spark.default.parallelism", 200)
          .config("spark.sql.shuffle.partitions", 200)
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .enableHiveSupport()
          .getOrCreate()
        // spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
        // spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

        // The checkpoint directory may already exist
        spark.sparkContext.setCheckpointDir("/home/wsy/dir")
        import spark.implicits._
        // spark.sql("set spark.sql.strict.mode = false")
        spark.sql("use your_database")

        // your_table is a partitioned table, so the SELECT must restrict the partition column in the WHERE clause
        val df1 = spark.sql(
          "select module, x from your_table " +
          s"where (modulename = 'a' or modulename = 'b') and day >= '${startDate}' and day < '${endDate}'")
        val df2 = df1.select(
          $"module",
          get_json_object($"x", "$.a").as("a"),
          get_json_object($"x", "$.b").as("b"),
          when($"module" === "a", get_json_object($"x", "$.a"))
            .otherwise(get_json_object($"x", "$.b"))
            .as("device")
        )
        val df3 = df2.filter($"x" === "3" || $"x" === "4").filter($"device".isNotNull).repartition(200)
        val df4 = df3.withColumn("deviceid",
          when($"x" === "3", get_json_object($"device", "$.a"))
            .otherwise(get_json_object($"device", "$.b"))
        )
        val df5 = df4.select($"module", $"a", $"b", $"c", $"deviceid")
        // Reuse optimization: one month of raw data is about 3.5 TB, while df5 is roughly 220 GB.
        // Shuffling the full 3.5 TB takes about 1 hour on 250 cores, and about 30 minutes on 500 cores.
        // persist or checkpoint cut the total runtime from 3h40m down to about 40 minutes: the original
        // program reuses df5 five times (that code is trimmed here), so after caching, the computation
        // that used to run five times runs only once, which is a big optimization.
        df5.persist(StorageLevel.MEMORY_AND_DISK_SER)
        // val df5 = df51.checkpoint()
        df5.count()
        val df_app = df5.filter($"source" === "ios" || $"source" === "android")
          .select($"source", $"deviceid")
          .distinct()
          .repartition(100)
        df_app.cache()
        val result_app = df_app.count()

        val df_accountsystem = df5.filter($"tixiid" === "1")
          .select($"tixiid", $"deviceid", $"source")
          .distinct()
          .repartition(100)
        df_accountsystem.cache()
        val result_accountsystem = df_accountsystem.count()

        val result_x: Seq[String] = Seq(
          s"Number of devices with the APP installed, ${result_app}",
          s"Number of devices with account-system APPs installed, ${result_accountsystem}")
        val df_x = df5.filter($"a" === "ios" || $"a" === "android")
          .select($"a", $"deviceid")
          .distinct()
          .repartition(100)
        df_x.cache()
        val result_11: Long = df_x.count()

        // Cross-install statistics
        val df_accountsystem_1 = df_accountsystem.filter($"a" =!= "ios" && $"a" =!= "android")
          .select($"x", $"deviceid")
        val result_12: Long = df_x.join(df_app, "deviceid").count()
        val result_13: Long = df_x.join(df_accountsystem_1, "deviceid").count()
        val result_1: Seq[String] = Seq(
          s"Number of devices with APP x installed, ${result_11}",
          s"Number of devices with both APP x and APP y installed, ${result_12}",
          s"Number of devices with APP x and other account-system APPs installed, ${result_13}")
        // If the output path already exists, delete it to avoid the "directory already exists" exception
        val conf = new Configuration()
        val fs = FileSystem.get(conf)
        val output_path = new Path(output)
        if (fs.exists(output_path)) {
          fs.delete(output_path, true)
        }
        val result: Seq[String] = result_x ++ result_1
        logInfo(s"------ result=${result} ---------------")
        spark.sparkContext.parallelize(result, 1).saveAsTextFile(output)
        spark.stop()
      }
    }
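
The persist/count pair (and the commented-out checkpoint alternative) is the key optimization in this job: df5 is reused five times downstream, and materializing it once means the 3.5 TB scan and shuffle run a single time instead of five. Below is a minimal sketch of the two reuse strategies; the names ReuseStrategies, expensive and reuseWith* are illustrative and not taken from the original code.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.storage.StorageLevel

    object ReuseStrategies {

      // Strategy 1: keep the lineage; rows are stored serialized in memory and spilled to disk.
      // An action such as count() materializes the cache before the downstream reuses.
      def reuseWithPersist(expensive: DataFrame): DataFrame = {
        val cached = expensive.persist(StorageLevel.MEMORY_AND_DISK_SER)
        cached.count()
        cached
      }

      // Strategy 2: truncate the lineage by writing the rows to the checkpoint directory
      // (requires sparkContext.setCheckpointDir(...) to have been called, as in the job above).
      // Dataset.checkpoint() is eager by default, so no extra count() is needed.
      def reuseWithCheckpoint(expensive: DataFrame): DataFrame =
        expensive.checkpoint()
    }

Persist is the cheaper option when the executors have enough memory and local disk and the job is stable; checkpoint is more robust against executor loss because the materialized data lives on HDFS rather than in the block manager.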
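
For readers less familiar with get_json_object, here is a small standalone illustration of the extraction pattern used to build df2's device column, i.e. picking one JSON field or another with when/otherwise. The sample rows, the JsonExtractDemo name, and the local[1] master are made up purely for the example.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object JsonExtractDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("json_extract_demo").getOrCreate()
        import spark.implicits._

        // Two toy rows: a module name plus a JSON payload, standing in for the real table.
        val df = Seq(
          ("a", """{"a":"device-1","b":"device-2"}"""),
          ("b", """{"a":"device-3","b":"device-4"}""")
        ).toDF("module", "x")

        // Take JSON field "a" when module == "a", otherwise field "b";
        // get_json_object returns null when the path is missing, hence the isNotNull filter in the job.
        df.select(
          $"module",
          when($"module" === "a", get_json_object($"x", "$.a"))
            .otherwise(get_json_object($"x", "$.b"))
            .as("device")
        ).show(false)

        spark.stop()
      }
    }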