1、产品概述
大数据设计的DMP标签体系致力于对创蓝所用于的号码用户群,进行标签体系构建,包括触达标签,偏好标签,私域流量标签,基础信息标签等,通过不同来源数据整合,清洗,挖掘构建一个公司层面的用户标签体系。当前整个DMP标签体系正在设计开发中。
2、触达标签的大表格的设计
mobile(号码) | telcom (运营商) | base_province (号码省份) | base_city (号码城市) | appplatform(平台1:IOS 2:安卓) | devicename (设备) | imei(设备信息) | oaid(设备信息) | aim(aim方式) | aim_device(aim设备) | rcs_video(视频短信) | rcs_5g(5g触达) | rcs_tailsms(贴尾短信) | rcs_ip(ip短信) | rcs_number_porting(携号转网) | rcs_status(号码状态) | … |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
3、表格构建代码
CREATE TABLE `cl_nlp.nlp_contact_label_system`(
`mobile` string COMMENT '手机号',
`telcom` string COMMENT '运营商',
`base_province` string COMMENT '手机号归属省份',
`base_city` string COMMENT '手机号归属市',
`applatform` string COMMENT '平台1:IOS 2:安卓',
`devicename` string COMMENT '设备名称',
`imei` string COMMENT '设别信息imei',
`oaid` string COMMENT '设备信息oaid',
`aim` string COMMENT 'aim方式1:aim触达,0:未知',
`aim_device` string COMMENT 'aim设备',
`rcs_video` string COMMENT '1:视频短信,0:未知',
`rcs_5g` string COMMENT '1:5g触达,0:未知',
`rcs_tailsms` string COMMENT '1:贴尾短信,0:未知',
`rcs_ip` string COMMENT '1:ip短信,0:未知',
`rcs_number_porting` string COMMENT '1:携号转网,0:未知',
`rcs_status` string COMMENT '号码状态'
)
PARTITIONED BY (
`ptt_day` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
TBLPROPERTIES("parquet.compression"="SNAPPY");
注:生产环境中,这张表还没有建立,因为后续可能融入偏好和私域的标签,表格可能需要扩列。
4、触达标签数据流
4.1 创蓝号码池
- cl_nlp.nlp_mobile_di
由于数据量较大,需要规范一定的范围,缩小号码量,作为dmp标签的号码池,该号码池可以每月(或其他)更新一次。
spark.sql(
"""
|select
| mobile
|from
| cl_nlp.nlp_mobile_di
|where
| max_date>'$time'
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")
4.2 aim方式
当前可实现aim的有三种类型机型的表格:
- cl_nlp.nlp_oppo_aim_df:7690
- cl_nlp.nlp_device_xiaomi_aim_df:40122683
- cl_nlp.nlp_aliyun_aim_df
这三张表中分别包含mobile(号码)和device(设备)字段
这三张表的数据量较少,先将三张表union all
spark.sql(
"""
|select
| substr(mobile,4) mobile,
| device
|from
| cl_nlp.nlp_oppo_aim_df
|union all
|select
| mobile,
| device
|from
| cl_nlp.nlp_device_xiaomi_aim_df
|group by
| mobile,
| device
|union all
|select
| mobile,
| device
|from
| cl_nlp.nlp_aliyun_aim_df
|group by
| mobile
| device
|""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")
4.3 ip短信
- cl_nlp.dwd_vivo_device_result_agg
该表格是以日全量量进行更新,所以获取最新日期的数据作为有效的号码
spark.sql(
s"""
|select
| mobile
|from
| cl_nlp.dwd_vivo_device_result_ag
|where
| ptt_day='$ptt_day'
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")
4.4 视频短信
- cl_ods.ods_rcs_m_batch_detail_di
该表格中号码进行加密,获取有效号码时需要先解密
spark.sql(
s"""
|select
| concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,
|from
| cl_ods.ods_rcs_m_batch_detail_di
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp2_video")
4.5 SDK设备信息
- cl_cdm.dwd_fls_sdk_mobile_detail_org_di
从SDK表格数据中可以获取运营商,手机归属省份相关信息,平台,设备名,imei设备信息,oaid设备信息等信息,由于该表是依据用户点击时间记录,所以使用用户最新一条数据代表用户信息。
spark.sql(
s"""
|select
| *
|from
| (select
| mobile,
| telcom,
| base_province,
| base_city,
| appplatform,
| devicename,
| imei,
| oaid,
| row_number()over(partition BY mobile ORDER BY m_time DESC) rn
| from
| cl_cdm.dwd_fls_sdk_mobile_detail_org_di
| where
ptt_mon<="$ptt_mon"
| ) t
|where
| t.rn = 1
|""".stripMargin).cache().createOrReplaceTempView("tmp3_sdk")
4.6 贴尾触达
4.7 5G触达
4.8 普通短信
- 运营商
- 携号转网
- 号码状态
这部分的表格还在处理中
5、整体撞库拼接代码
package com.bigdata.Temporary.tmp
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.Date
object DMP_CD {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("DMP_CD_Day")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.warehouse.subdir.inherit.perms", "false")
.enableHiveSupport()
.getOrCreate()
// 定义变量,日更的时间,月更的时间
val sf1 = new SimpleDateFormat("yyyyMMdd")
//当前的时间多一天的时间
val date1 = new Date(System.currentTimeMillis - 1000 * 60 * 60 * 24)
val ptt_day = sf1.format(date1)
// 构建有效号码池
spark.sql(
"""
|select
| mobile
|from
| cl_nlp.nlp_mobile_di
|where
| max_date>'20220101'
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")
// 将aim方式三张数据表拼接到一起
// 按照日更新的数据为:cl_nlp.nlp_oppo_aim_df,cl_nlp.nlp_xiaomi_aim_df,cl_nlp.nlp_aliyun_aim_df
spark.sql(
"""
|select
| substr(mobile,4) mobile,
| device
|from
| cl_nlp.nlp_oppo_aim_df
|union all
|select
| mobile,
| device
|from
| cl_nlp.nlp_device_xiaomi_aim_df
|group by
| mobile,
| device
|union all
|select
| mobile,
| device
|from
| cl_nlp.nlp_aliyun_aim_df
|group by
| mobile
| device
|""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")
//ip短信:cl_nlp.dwd_vivo_device_result_agg,天全量表
spark.sql(
s"""
|select
| mobile
|from
| cl_nlp.dwd_vivo_device_result_agg
|where
| ptt_day='$ptt_day'
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")
// 数据量较大:去重后亿级别
// 视频短信:cl_ods.ods_rcs_m_batch_detail_di
spark.sql(
s"""
|select
| concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,
|from
| cl_ods.ods_rcs_m_batch_detail_di
|group by
| mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp4_video")
// 数据量较大,去重后亿级别
//SDK数据:cl_cdm.dwd_fls_sdk_mobile_detail_org_di
spark.sql(
s"""
|select
| *
|from
| (select
| mobile,
| telcom,
| base_province,
| base_city,
| appplatform,
| devicename,
| imei,
| oaid,
| row_number()over(partition BY mobile ORDER BY m_time DESC) rn
| from
| cl_cdm.dwd_fls_sdk_mobile_detail_org_di
| ) t
|where
| t.rn = 1
|""".stripMargin).cache().createOrReplaceTempView("tmp5_sdk")
// 表格与号码池之间拼接
//aim数据与号码库拼接
spark.sql(
"""
|select
| tmp1_mobile.mobile
| tmp2_aim.device
| if(tmp2.device is NULL,0,1) as aim
|from
| tmp1_mobile
|left join
| tmp2_aim
|on
| tmp1_mobile.mobile=tmp2_aim.mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp6_aim_mobile")
// ip号码与号码库拼接
spark.sql(
"""
|select
| tmp6_aim_mobil.mobile,
| tmp6_aim_mobi.device
| tmp6_aim_mobi.aim
| if(tmp3_ip is null,0,1) as rcs_ip
|from
| tmp6_aim_mobile
|left join
| tmp3_ip
|on
| tmp6_aim_mobile.mobile = tmp3_ip.mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp7_aim_ip_mobile")
//视频短信号码与号码库拼接
spark.sql(
"""
|select
| tmp7_aim_ip_mobile.mobile,
| tmp7_aim_ip_mobile.device,
| tmp7_aim_ip_mobile.aim,
| tmp7_aim_ip_mobile.rcs_ip
| if(tmp4_video. is NULL,0,1) as rcs_video
|from
| tmp7_aim_ip_mobile
|left join
| tmp4_video
|on
| tmp7_aim_ip_mobile.mobile=tmp4_video.mobile
|""".stripMargin).cache().createOrReplaceTempView("tmp8_aim_ip_video_mobile")
//SDK数据与号码库拼库
spark.sql(
"""
|insert overwrite into cl_nlp.nlp_contact_label_system
|select
| tmp8_aim_ip_mobile.mobile
| tmp5_sdk.telcom,
| tmp5_sdk.base_province,
| tmp5_sdk.base_city,
| tmp5_sdk.appplatform,
| tmp5_sdk.devicename,
| tmp5_sdk.imei,
| tmp5_sdk.oaid,
| tmp8_aim_ip_mobile.aim,
| tmp8_aim_ip_mobile.device,
| tmp8_aim_ip_mobile.rcs_ip.
| tmp8_aim_ip_mobile.rcs_video
|from
| tmp8_aim_ip_video_mobile
|left join
| tmp5_sdk
|on
| tmp8_aim_ip_video_mobile.mobile = tmp5_sdk.mobileimei
|""".stripMargin)
spark.table("tmp1_mobile").unpersist()
spark.table("tmp2_aim").unpersist()
spark.table("tmp3_ip").unpersist()
spark.table("tmp4_video").unpersist()
spark.table("tmp5_sdk").unpersist()
spark.table("tmp6_aim_mobile").unpersist()
spark.table("tmp7_aim_ip_mobile").unpersist()
spark.table("tmp8_aim_ip_video_mobile").unpersist()
spark.stop()
}
}
注:该代码暂未在生产环境中实践,由于数据量巨大这样拼接方式可能耗费较大资源,后续会继续优化
�