1、产品概述

大数据设计的DMP标签体系致力于对创蓝所用于的号码用户群,进行标签体系构建,包括触达标签,偏好标签,私域流量标签,基础信息标签等,通过不同来源数据整合,清洗,挖掘构建一个公司层面的用户标签体系。当前整个DMP标签体系正在设计开发中。
origin_img_v2_6a749ba4-ab2f-4838-b0d1-8285de99710g.png

2、触达标签的大表格的设计

mobile(号码) telcom (运营商) base_province (号码省份) base_city (号码城市) appplatform(平台1:IOS 2:安卓) devicename (设备) imei(设备信息) oaid(设备信息) aim(aim方式) aim_device(aim设备) rcs_video(视频短信) rcs_5g(5g触达) rcs_tailsms(贴尾短信) rcs_ip(ip短信) rcs_number_porting(携号转网) rcs_status(号码状态)

3、表格构建代码

  1. CREATE TABLE `cl_nlp.nlp_contact_label_system`(
  2. `mobile` string COMMENT '手机号',
  3. `telcom` string COMMENT '运营商',
  4. `base_province` string COMMENT '手机号归属省份',
  5. `base_city` string COMMENT '手机号归属市',
  6. `applatform` string COMMENT '平台1:IOS 2:安卓',
  7. `devicename` string COMMENT '设备名称',
  8. `imei` string COMMENT '设别信息imei',
  9. `oaid` string COMMENT '设备信息oaid',
  10. `aim` string COMMENT 'aim方式1:aim触达,0:未知',
  11. `aim_device` string COMMENT 'aim设备',
  12. `rcs_video` string COMMENT '1:视频短信,0:未知',
  13. `rcs_5g` string COMMENT '1:5g触达,0:未知',
  14. `rcs_tailsms` string COMMENT '1:贴尾短信,0:未知',
  15. `rcs_ip` string COMMENT '1:ip短信,0:未知',
  16. `rcs_number_porting` string COMMENT '1:携号转网,0:未知',
  17. `rcs_status` string COMMENT '号码状态'
  18. )
  19. PARTITIONED BY (
  20. `ptt_day` string)
  21. ROW FORMAT SERDE
  22. 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  23. STORED AS INPUTFORMAT
  24. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
  25. OUTPUTFORMAT
  26. 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  27. TBLPROPERTIES("parquet.compression"="SNAPPY");

注:生产环境中,这张表还没有建立,因为后续可能融入偏好和私域的标签,表格可能需要扩列。

4、触达标签数据流

origin_img_v2_569af1c5-14f9-4432-ac3d-291465d9278g.png

4.1 创蓝号码池

  • cl_nlp.nlp_mobile_di

由于数据量较大,需要规范一定的范围,缩小号码量,作为dmp标签的号码池,该号码池可以每月(或其他)更新一次。

  1. spark.sql(
  2. """
  3. |select
  4. | mobile
  5. |from
  6. | cl_nlp.nlp_mobile_di
  7. |where
  8. | max_date>'$time'
  9. |group by
  10. | mobile
  11. |""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")

4.2 aim方式

当前可实现aim的有三种类型机型的表格:

  • cl_nlp.nlp_oppo_aim_df:7690
  • cl_nlp.nlp_device_xiaomi_aim_df:40122683
  • cl_nlp.nlp_aliyun_aim_df

这三张表中分别包含mobile(号码)和device(设备)字段
这三张表的数据量较少,先将三张表union all

  1. spark.sql(
  2. """
  3. |select
  4. | substr(mobile,4) mobile,
  5. | device
  6. |from
  7. | cl_nlp.nlp_oppo_aim_df
  8. |union all
  9. |select
  10. | mobile,
  11. | device
  12. |from
  13. | cl_nlp.nlp_device_xiaomi_aim_df
  14. |group by
  15. | mobile,
  16. | device
  17. |union all
  18. |select
  19. | mobile,
  20. | device
  21. |from
  22. | cl_nlp.nlp_aliyun_aim_df
  23. |group by
  24. | mobile
  25. | device
  26. |""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")

4.3 ip短信

  • cl_nlp.dwd_vivo_device_result_agg

该表格是以日全量量进行更新,所以获取最新日期的数据作为有效的号码

  1. spark.sql(
  2. s"""
  3. |select
  4. | mobile
  5. |from
  6. | cl_nlp.dwd_vivo_device_result_ag
  7. |where
  8. | ptt_day='$ptt_day'
  9. |group by
  10. | mobile
  11. |""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")

4.4 视频短信

  • cl_ods.ods_rcs_m_batch_detail_di

该表格中号码进行加密,获取有效号码时需要先解密

  1. spark.sql(
  2. s"""
  3. |select
  4. | concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,
  5. |from
  6. | cl_ods.ods_rcs_m_batch_detail_di
  7. |group by
  8. | mobile
  9. |""".stripMargin).cache().createOrReplaceTempView("tmp2_video")

4.5 SDK设备信息

  • cl_cdm.dwd_fls_sdk_mobile_detail_org_di

从SDK表格数据中可以获取运营商,手机归属省份相关信息,平台,设备名,imei设备信息,oaid设备信息等信息,由于该表是依据用户点击时间记录,所以使用用户最新一条数据代表用户信息。

  1. spark.sql(
  2. s"""
  3. |select
  4. | *
  5. |from
  6. | (select
  7. | mobile,
  8. | telcom,
  9. | base_province,
  10. | base_city,
  11. | appplatform,
  12. | devicename,
  13. | imei,
  14. | oaid,
  15. | row_number()over(partition BY mobile ORDER BY m_time DESC) rn
  16. | from
  17. | cl_cdm.dwd_fls_sdk_mobile_detail_org_di
  18. | where
  19. ptt_mon<="$ptt_mon"
  20. | ) t
  21. |where
  22. | t.rn = 1
  23. |""".stripMargin).cache().createOrReplaceTempView("tmp3_sdk")

4.6 贴尾触达

注:好像是依据国内短信的批次号来识别(目前还不明确)

4.7 5G触达

当前逻辑关系还不明确

4.8 普通短信

  • 运营商
  • 携号转网
  • 号码状态

这部分的表格还在处理中

5、整体撞库拼接代码

  1. package com.bigdata.Temporary.tmp
  2. import org.apache.spark.sql.SparkSession
  3. import java.text.SimpleDateFormat
  4. import java.util.Date
  5. object DMP_CD {
  6. def main(args: Array[String]): Unit = {
  7. val spark = SparkSession.builder()
  8. .appName("DMP_CD_Day")
  9. .config("hive.exec.dynamic.partition", "true")
  10. .config("hive.exec.dynamic.partition.mode", "nonstrict")
  11. .config("hive.warehouse.subdir.inherit.perms", "false")
  12. .enableHiveSupport()
  13. .getOrCreate()
  14. // 定义变量,日更的时间,月更的时间
  15. val sf1 = new SimpleDateFormat("yyyyMMdd")
  16. //当前的时间多一天的时间
  17. val date1 = new Date(System.currentTimeMillis - 1000 * 60 * 60 * 24)
  18. val ptt_day = sf1.format(date1)
  19. // 构建有效号码池
  20. spark.sql(
  21. """
  22. |select
  23. | mobile
  24. |from
  25. | cl_nlp.nlp_mobile_di
  26. |where
  27. | max_date>'20220101'
  28. |group by
  29. | mobile
  30. |""".stripMargin).cache().createOrReplaceTempView("tmp1_mobile")
  31. // 将aim方式三张数据表拼接到一起
  32. // 按照日更新的数据为:cl_nlp.nlp_oppo_aim_df,cl_nlp.nlp_xiaomi_aim_df,cl_nlp.nlp_aliyun_aim_df
  33. spark.sql(
  34. """
  35. |select
  36. | substr(mobile,4) mobile,
  37. | device
  38. |from
  39. | cl_nlp.nlp_oppo_aim_df
  40. |union all
  41. |select
  42. | mobile,
  43. | device
  44. |from
  45. | cl_nlp.nlp_device_xiaomi_aim_df
  46. |group by
  47. | mobile,
  48. | device
  49. |union all
  50. |select
  51. | mobile,
  52. | device
  53. |from
  54. | cl_nlp.nlp_aliyun_aim_df
  55. |group by
  56. | mobile
  57. | device
  58. |""".stripMargin).cache().createOrReplaceTempView("tmp2_aim")
  59. //ip短信:cl_nlp.dwd_vivo_device_result_agg,天全量表
  60. spark.sql(
  61. s"""
  62. |select
  63. | mobile
  64. |from
  65. | cl_nlp.dwd_vivo_device_result_agg
  66. |where
  67. | ptt_day='$ptt_day'
  68. |group by
  69. | mobile
  70. |""".stripMargin).cache().createOrReplaceTempView("tmp3_ip")
  71. // 数据量较大:去重后亿级别
  72. // 视频短信:cl_ods.ods_rcs_m_batch_detail_di
  73. spark.sql(
  74. s"""
  75. |select
  76. | concat(cl_ods.decrypt(substr(phone,1,30)),substr(phone,31,34)) mobile,
  77. |from
  78. | cl_ods.ods_rcs_m_batch_detail_di
  79. |group by
  80. | mobile
  81. |""".stripMargin).cache().createOrReplaceTempView("tmp4_video")
  82. // 数据量较大,去重后亿级别
  83. //SDK数据:cl_cdm.dwd_fls_sdk_mobile_detail_org_di
  84. spark.sql(
  85. s"""
  86. |select
  87. | *
  88. |from
  89. | (select
  90. | mobile,
  91. | telcom,
  92. | base_province,
  93. | base_city,
  94. | appplatform,
  95. | devicename,
  96. | imei,
  97. | oaid,
  98. | row_number()over(partition BY mobile ORDER BY m_time DESC) rn
  99. | from
  100. | cl_cdm.dwd_fls_sdk_mobile_detail_org_di
  101. | ) t
  102. |where
  103. | t.rn = 1
  104. |""".stripMargin).cache().createOrReplaceTempView("tmp5_sdk")
  105. // 表格与号码池之间拼接
  106. //aim数据与号码库拼接
  107. spark.sql(
  108. """
  109. |select
  110. | tmp1_mobile.mobile
  111. | tmp2_aim.device
  112. | if(tmp2.device is NULL,0,1) as aim
  113. |from
  114. | tmp1_mobile
  115. |left join
  116. | tmp2_aim
  117. |on
  118. | tmp1_mobile.mobile=tmp2_aim.mobile
  119. |""".stripMargin).cache().createOrReplaceTempView("tmp6_aim_mobile")
  120. // ip号码与号码库拼接
  121. spark.sql(
  122. """
  123. |select
  124. | tmp6_aim_mobil.mobile,
  125. | tmp6_aim_mobi.device
  126. | tmp6_aim_mobi.aim
  127. | if(tmp3_ip is null,0,1) as rcs_ip
  128. |from
  129. | tmp6_aim_mobile
  130. |left join
  131. | tmp3_ip
  132. |on
  133. | tmp6_aim_mobile.mobile = tmp3_ip.mobile
  134. |""".stripMargin).cache().createOrReplaceTempView("tmp7_aim_ip_mobile")
  135. //视频短信号码与号码库拼接
  136. spark.sql(
  137. """
  138. |select
  139. | tmp7_aim_ip_mobile.mobile,
  140. | tmp7_aim_ip_mobile.device,
  141. | tmp7_aim_ip_mobile.aim,
  142. | tmp7_aim_ip_mobile.rcs_ip
  143. | if(tmp4_video. is NULL,0,1) as rcs_video
  144. |from
  145. | tmp7_aim_ip_mobile
  146. |left join
  147. | tmp4_video
  148. |on
  149. | tmp7_aim_ip_mobile.mobile=tmp4_video.mobile
  150. |""".stripMargin).cache().createOrReplaceTempView("tmp8_aim_ip_video_mobile")
  151. //SDK数据与号码库拼库
  152. spark.sql(
  153. """
  154. |insert overwrite into cl_nlp.nlp_contact_label_system
  155. |select
  156. | tmp8_aim_ip_mobile.mobile
  157. | tmp5_sdk.telcom,
  158. | tmp5_sdk.base_province,
  159. | tmp5_sdk.base_city,
  160. | tmp5_sdk.appplatform,
  161. | tmp5_sdk.devicename,
  162. | tmp5_sdk.imei,
  163. | tmp5_sdk.oaid,
  164. | tmp8_aim_ip_mobile.aim,
  165. | tmp8_aim_ip_mobile.device,
  166. | tmp8_aim_ip_mobile.rcs_ip.
  167. | tmp8_aim_ip_mobile.rcs_video
  168. |from
  169. | tmp8_aim_ip_video_mobile
  170. |left join
  171. | tmp5_sdk
  172. |on
  173. | tmp8_aim_ip_video_mobile.mobile = tmp5_sdk.mobileimei
  174. |""".stripMargin)
  175. spark.table("tmp1_mobile").unpersist()
  176. spark.table("tmp2_aim").unpersist()
  177. spark.table("tmp3_ip").unpersist()
  178. spark.table("tmp4_video").unpersist()
  179. spark.table("tmp5_sdk").unpersist()
  180. spark.table("tmp6_aim_mobile").unpersist()
  181. spark.table("tmp7_aim_ip_mobile").unpersist()
  182. spark.table("tmp8_aim_ip_video_mobile").unpersist()
  183. spark.stop()
  184. }
  185. }

注:该代码暂未在生产环境中实践,由于数据量巨大这样拼接方式可能耗费较大资源,后续会继续优化