1. Statistics of app installs by device
The code has been trimmed (table and column names are placeholders), but the overall structure is preserved.
package com.wsy.learn
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
object InstallAppCount extends Logging {
def main(args: Array[String]): Unit = {
if (args.length > 0) {
logInfo(s"------ args=${args(0)}----------")
} else {
logError("------ Please pass one comma-separated argument containing the query start date, end date, and result output path, e.g. 20200615,20200616,/your/path ----------")
System.exit(1)
}
val parts = args(0).split(",")
val startDate = parts(0).trim
val endDate = parts(1).trim
val output = parts(2).trim
logInfo(s" ------- startDate=${startDate}, endDate=${endDate}, output=${output} ----------")
val spark = SparkSession.builder()
.master("yarn")
.appName("install_app_count")
.config("spark.sql.strict.mode", false)
.config("spark.default.parallelism", 200)
.config("spark.sql.shuffle.partitions", 200)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.enableHiveSupport()
.getOrCreate()
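//spark.default.parallelism and spark.sql.shuffle.partitions are both set to 200, matching the repartition(200) calls below
//Kryo is registered as the serializer; it is generally more compact and faster than the default Java serialization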
//spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
//spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
//this checkpoint path may already exist
spark.sparkContext.setCheckpointDir("/home/wsy/dir")
import spark.implicits._
//spark.sql("set spark.sql.strict.mode = false ")
spark.sql("use your_database")
//your_table is a partitioned table; the select must restrict the partition column (day) in the where clause so only the needed partitions are scanned
val df1 = spark.sql(s"select module, x from your_table where (modulename = 'a' or modulename = 'b' ) and day >= '${startDate}' and day < '${endDate}' ")
val df2 = df1.select(
$"module",
get_json_object($"x", "$.a").as("a"),
get_json_object($"x", "$.b").as("b"),
when($"module" === "a", get_json_object($"x", "$.a"))
.otherwise(get_json_object($"x", "$.b"))
.as("device")
)
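//get_json_object extracts a field from a JSON string column via a JSONPath expression and returns null
//when the path is absent - which is why the derived "device" column is filtered with isNotNull below.
//A tiny illustration (hypothetical JSON payload, not the real schema):
//  get_json_object(lit("""{"a":"dev-123","b":"dev-456"}"""), "$.a")  // yields "dev-123"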
val df3 = df2.filter($"x" === "3" || $"x" === "4").filter($"device".isNotNull).repartition(200)
val df4 = df3.withColumn("deviceid",
when($"x" === "3", get_json_object($"device", "$.a"))
.otherwise(get_json_object($"device", "$.b"))
)
val df5 = df4.select($"module", $"a", $"b", $"c", $"deviceid")
//Reuse optimization: the raw data is about 3.5 TB per month and df5 is only about 220 GB; shuffling the full 3.5 TB takes roughly 1 hour on 250 cores and about 30 minutes on 500 cores.
//Adding persist (or checkpoint) here cut the total runtime from 3 hours 40 minutes to about 40 minutes; the original program reuses df5 five times (some of that code is trimmed here).
//After caching, the work that used to be repeated five times runs only once - a big win. A standalone sketch of this pattern follows the listing below.
df5.persist(StorageLevel.MEMORY_AND_DISK_SER)
//alternative: materialize with checkpoint() instead of persist, using the checkpoint dir set above
//val df5=df51.checkpoint()
df5.count() //count() is an action: it computes df5 once and fills the cache, so the reuses below read from it
val df_app = df5.filter($"source" === "ios" || $"source" === "android")
.select($"source", $"deviceid")
.distinct()
.repartition(100)
df_app.cache()
val result_app = df_app.count()
val df_accountsystem = df5.filter($"tixiid" === "1")
.select($"tixiid", $"deviceid", $"source")
.distinct()
.repartition(100)
df_accountsystem.cache()
val result_accountsystem = df_accountsystem.count()
val result_x: Seq[String] = Seq(s"count of APP installs, ${result_app}", s"count of account-system installs, ${result_accountsystem}")
val df_x = df5.filter($"a" === "ios" || $"a" === "android")
.select($"a", $"deviceid")
.distinct()
.repartition(100)
df_x.cache()
val result_11: Long = df_x.count()
//statistics results
val df_accountsystem_1 = df_accountsystem.filter($"a" =!= "ios" && $"a" =!= "android")
.select($"x", $"deviceid")
val result_12: Long = df_x.join(df_app, "deviceid").count()
val result_13: Long = df_x.join(df_accountsystem_1, "deviceid").count()
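//an inner join on deviceid keeps only rows whose device appears in both sets, so the counts above measure the overlap between the x installs and the other install sets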
val result_1: Seq[String] = Seq(s"count of x APP installs, ${result_11}", s"count of devices with both the x and y APPs installed, ${result_12}", s"count of devices with the x APP and other account-system APPs installed, ${result_13}")
//if the output path already exists, delete it (recursively) to avoid the "directory already exists" exception on save
val conf = new Configuration()
val fs = FileSystem.get(conf)
val output_path = new Path(output)
if (fs.exists(output_path)) {
fs.delete(output_path, true)
}
val result: Seq[String] = result_x ++ result_1
logInfo(s"------result=${result}---------------")
spark.sparkContext.parallelize(result, 1).saveAsTextFile(output)
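//parallelize(result, 1) puts the few result lines into a single partition, so saveAsTextFile writes them as one part file under the output path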
spark.stop()
}
}
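The persist-before-reuse pattern above is worth isolating. Below is a minimal, self-contained sketch of it, assuming a hypothetical parquet input path and hypothetical source/deviceid columns rather than the production schema: persist the derived DataFrame once, trigger it with a single action, and let every later count read from the cache instead of rescanning the source.
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
object PersistReuseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("persist_reuse_sketch").getOrCreate()
    import spark.implicits._
    //stand-in for the expensive scan and transformations that produce df5 (path and columns are hypothetical)
    val derived = spark.read.parquet("/hypothetical/raw/path")
      .select($"source", $"deviceid")
      .repartition(200)
    derived.persist(StorageLevel.MEMORY_AND_DISK_SER)
    derived.count() //action: computes the lineage once and fills the cache
    //each later action reads the cached data instead of recomputing the full lineage
    val iosCount = derived.filter($"source" === "ios").select($"deviceid").distinct().count()
    val androidCount = derived.filter($"source" === "android").select($"deviceid").distinct().count()
    println(s"ios=${iosCount}, android=${androidCount}")
    derived.unpersist()
    spark.stop()
  }
}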