1、产品概述
大数据设计的DMP标签体系致力于对创蓝所用于的号码用户群,进行标签体系构建,包括触达标签,偏好标签,私域流量标签,基础信息标签等,通过不同来源数据整合,清洗,挖掘构建一个公司层面的用户标签体系。当前整个DMP标签体系正在设计开发中。
2、触达标签的大表格的设计
| mobile(号码) | telcom (运营商) | base_province (号码省份) | base_city (号码城市) | appplatform(平台1:IOS 2:安卓) | devicename (设备) | imei(设备信息) | oaid(设备信息) | aim(aim方式) | aim_device(aim设备) | rcs_video(视频短信) | rcs_5g(5g触达) | rcs_tailsms(贴尾短信) | rcs_ip(ip短信) | rcs_number_porting(携号转网) | rcs_status(号码状态) | … |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
3、表格构建代码
CREATE TABLE `cl_nlp.nlp_contact_label_system`(`mobile` string COMMENT '手机号',`telcom` string COMMENT '运营商',`base_province` string COMMENT '手机号归属省份',`base_city` string COMMENT '手机号归属市',`applatform` string COMMENT '平台1:IOS 2:安卓',`devicename` string COMMENT '设备名称',`imei` string COMMENT '设别信息imei',`oaid` string COMMENT '设备信息oaid',`aim` string COMMENT 'aim方式1:aim触达,0:未知',`aim_device` string COMMENT 'aim设备',`rcs_video` string COMMENT '1:视频短信,0:未知',`rcs_5g` string COMMENT '1:5g触达,0:未知',`rcs_tailsms` string COMMENT '1:贴尾短信,0:未知',`rcs_ip` string COMMENT '1:ip短信,0:未知',`rcs_number_porting` string COMMENT '1:携号转网,0:未知',`rcs_status` string COMMENT '号码状态')PARTITIONED BY (`ptt_day` string)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'TBLPROPERTIES("parquet.compression"="SNAPPY");
注:生产环境中,这张表还没有建立,因为后续可能融入偏好和私域的标签,表格可能需要扩列。
4、触达标签数据流
4.1 创蓝号码池
- cl_nlp.nlp_mobile_di
由于数据量较大,需要规范一定的范围,缩小号码量,作为dmp标签的号码池,该号码池可以每月(或其他)更新一次。
spark.sql("""|select| mobile|from| cl_nlp.nlp_mobile_di|where| max_date>'$time'|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")
4.2 aim方式
当前可实现aim的有三种类型机型的表格:
- cl_nlp.nlp_oppo_aim_df:7690
- cl_nlp.nlp_device_xiaomi_aim_df:40122683
- cl_nlp.nlp_aliyun_aim_df
这三张表中分别包含mobile(号码)和device(设备)字段
这三张表的数据量较少,先将三张表union all
spark.sql("""|select| substr(mobile,4) mobile,| device|from| cl_nlp.nlp_oppo_aim_df|union all|select| mobile,| device|from| cl_nlp.nlp_device_xiaomi_aim_df|group by| mobile,| device|union all|select| mobile,| device|from| cl_nlp.nlp_aliyun_aim_df|group by| mobile| device|""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")
4.3 ip短信
- cl_nlp.dwd_vivo_device_result_agg
该表格是以日全量量进行更新,所以获取最新日期的数据作为有效的号码
spark.sql(s"""|select| mobile|from| cl_nlp.dwd_vivo_device_result_ag|where| ptt_day='$ptt_day'|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")
4.4 视频短信
- cl_ods.ods_rcs_m_batch_detail_di
该表格中号码进行加密,获取有效号码时需要先解密
spark.sql(s"""|select| concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,|from| cl_ods.ods_rcs_m_batch_detail_di|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp2_video")
4.5 SDK设备信息
- cl_cdm.dwd_fls_sdk_mobile_detail_org_di
从SDK表格数据中可以获取运营商,手机归属省份相关信息,平台,设备名,imei设备信息,oaid设备信息等信息,由于该表是依据用户点击时间记录,所以使用用户最新一条数据代表用户信息。
spark.sql(s"""|select| *|from| (select| mobile,| telcom,| base_province,| base_city,| appplatform,| devicename,| imei,| oaid,| row_number()over(partition BY mobile ORDER BY m_time DESC) rn| from| cl_cdm.dwd_fls_sdk_mobile_detail_org_di| whereptt_mon<="$ptt_mon"| ) t|where| t.rn = 1|""".stripMargin).cache().createOrReplaceTempView("tmp3_sdk")
4.6 贴尾触达
4.7 5G触达
4.8 普通短信
- 运营商
- 携号转网
- 号码状态
这部分的表格还在处理中
5、整体撞库拼接代码
package com.bigdata.Temporary.tmpimport org.apache.spark.sql.SparkSessionimport java.text.SimpleDateFormatimport java.util.Dateobject DMP_CD {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("DMP_CD_Day").config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").config("hive.warehouse.subdir.inherit.perms", "false").enableHiveSupport().getOrCreate()// 定义变量,日更的时间,月更的时间val sf1 = new SimpleDateFormat("yyyyMMdd")//当前的时间多一天的时间val date1 = new Date(System.currentTimeMillis - 1000 * 60 * 60 * 24)val ptt_day = sf1.format(date1)// 构建有效号码池spark.sql("""|select| mobile|from| cl_nlp.nlp_mobile_di|where| max_date>'20220101'|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")// 将aim方式三张数据表拼接到一起// 按照日更新的数据为:cl_nlp.nlp_oppo_aim_df,cl_nlp.nlp_xiaomi_aim_df,cl_nlp.nlp_aliyun_aim_dfspark.sql("""|select| substr(mobile,4) mobile,| device|from| cl_nlp.nlp_oppo_aim_df|union all|select| mobile,| device|from| cl_nlp.nlp_device_xiaomi_aim_df|group by| mobile,| device|union all|select| mobile,| device|from| cl_nlp.nlp_aliyun_aim_df|group by| mobile| device|""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")//ip短信:cl_nlp.dwd_vivo_device_result_agg,天全量表spark.sql(s"""|select| mobile|from| cl_nlp.dwd_vivo_device_result_agg|where| ptt_day='$ptt_day'|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")// 数据量较大:去重后亿级别// 视频短信:cl_ods.ods_rcs_m_batch_detail_dispark.sql(s"""|select| concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,|from| cl_ods.ods_rcs_m_batch_detail_di|group by| mobile|""".stripMargin).cache().createOrReplaceTempView("tmp4_video")// 数据量较大,去重后亿级别//SDK数据:cl_cdm.dwd_fls_sdk_mobile_detail_org_dispark.sql(s"""|select| *|from| (select| mobile,| telcom,| base_province,| base_city,| appplatform,| devicename,| imei,| oaid,| row_number()over(partition BY mobile ORDER BY m_time DESC) rn| from| cl_cdm.dwd_fls_sdk_mobile_detail_org_di| ) t|where| t.rn = 1|""".stripMargin).cache().createOrReplaceTempView("tmp5_sdk")// 表格与号码池之间拼接//aim数据与号码库拼接spark.sql("""|select| tmp1_mobile.mobile| tmp2_aim.device| if(tmp2.device is NULL,0,1) as aim|from| tmp1_mobile|left join| tmp2_aim|on| tmp1_mobile.mobile=tmp2_aim.mobile|""".stripMargin).cache().createOrReplaceTempView("tmp6_aim_mobile")// ip号码与号码库拼接spark.sql("""|select| tmp6_aim_mobil.mobile,| tmp6_aim_mobi.device| tmp6_aim_mobi.aim| if(tmp3_ip is null,0,1) as rcs_ip|from| tmp6_aim_mobile|left join| tmp3_ip|on| tmp6_aim_mobile.mobile = tmp3_ip.mobile|""".stripMargin).cache().createOrReplaceTempView("tmp7_aim_ip_mobile")//视频短信号码与号码库拼接spark.sql("""|select| tmp7_aim_ip_mobile.mobile,| tmp7_aim_ip_mobile.device,| tmp7_aim_ip_mobile.aim,| tmp7_aim_ip_mobile.rcs_ip| if(tmp4_video. is NULL,0,1) as rcs_video|from| tmp7_aim_ip_mobile|left join| tmp4_video|on| tmp7_aim_ip_mobile.mobile=tmp4_video.mobile|""".stripMargin).cache().createOrReplaceTempView("tmp8_aim_ip_video_mobile")//SDK数据与号码库拼库spark.sql("""|insert overwrite into cl_nlp.nlp_contact_label_system|select| tmp8_aim_ip_mobile.mobile| tmp5_sdk.telcom,| tmp5_sdk.base_province,| tmp5_sdk.base_city,| tmp5_sdk.appplatform,| tmp5_sdk.devicename,| tmp5_sdk.imei,| tmp5_sdk.oaid,| tmp8_aim_ip_mobile.aim,| tmp8_aim_ip_mobile.device,| tmp8_aim_ip_mobile.rcs_ip.| tmp8_aim_ip_mobile.rcs_video|from| tmp8_aim_ip_video_mobile|left join| tmp5_sdk|on| tmp8_aim_ip_video_mobile.mobile = tmp5_sdk.mobileimei|""".stripMargin)spark.table("tmp1_mobile").unpersist()spark.table("tmp2_aim").unpersist()spark.table("tmp3_ip").unpersist()spark.table("tmp4_video").unpersist()spark.table("tmp5_sdk").unpersist()spark.table("tmp6_aim_mobile").unpersist()spark.table("tmp7_aim_ip_mobile").unpersist()spark.table("tmp8_aim_ip_video_mobile").unpersist()spark.stop()}}
注:该代码暂未在生产环境中实践,由于数据量巨大这样拼接方式可能耗费较大资源,后续会继续优化
�
