Guiding questions:
1. How is the parsing of the user-behavior startup log designed?
2. How is the get_json_object function used?
3. How is the DWD-layer user-behavior event table parsed?
4. How do you write a custom UDF?

1. Building the Data Warehouse: the DWD Layer
- 1) Parse the user-behavior data
- 2) Filter out records whose core fields are null
- 3) Remodel the business data with a dimensional model, i.e. dimension degeneration
1.1 DWD Layer (parsing the startup-log data)
1.1.1 Create the startup table
1) DDL statement
drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `open_ad_type` string,
    `action` string,
    `loading_time` string,
    `detail` string,
    `extend1` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');
Note: the data is stored as Parquet, which is splittable on its own, so there is no need to build an additional index on it.
1.1.2 Using the get_json_object function
1) Input data xjson
xjson=[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]
2) Extract the first JSON object
SELECT get_json_object(xjson,"$.[0]") FROM person;
Result: {"name":"大郎","sex":"男","age":"25"}
3) Extract the age field of the first JSON object
SELECT get_json_object(xjson,"$.[0].age") FROM person;
Result: 25
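For readers who want to check the path semantics outside Hive, here is a minimal Python sketch (plain `json`, not Hive itself) of what the two queries above extract:

```python
import json

# The same sample value as the xjson column above.
xjson = '[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]'

data = json.loads(xjson)

# Equivalent of get_json_object(xjson, '$.[0]'): the first object in the array.
first = data[0]
print(json.dumps(first, ensure_ascii=False))

# Equivalent of get_json_object(xjson, '$.[0].age'): one field of that object.
print(first["age"])  # -> 25
```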
1.1.3 Load data into the startup table
insert overwrite table dwd_start_log
PARTITION (dt='2020-03-10')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2020-03-10';

2) Test
select * from dwd_start_log where dt='2020-03-10' limit 2;
1.1.4 DWD-layer startup table load script
1) vim ods_to_dwd_log.sh
Put the following in the script:
#!/bin/bash
# Variables defined once for easy modification
APP=gmall
hive=/opt/modules/hive/bin/hive
# If a date argument was given, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"
$hive -e "$sql"

2) Make the script executable
chmod 770 ods_to_dwd_log.sh
3) Run it
ods_to_dwd_log.sh 2020-03-11
4) Check the result
select * from dwd_start_log where dt='2020-03-11' limit 2;
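The only non-obvious part of the script is the date handling. The sketch below isolates it as a hypothetical `pick_date` helper (GNU `date` assumed, as in the script): echo the argument if one was given, otherwise yesterday's date.

```shell
#!/bin/bash
# pick_date: echo the first argument if non-empty,
# otherwise yesterday's date in YYYY-MM-DD form (GNU date).
pick_date() {
  if [ -n "$1" ]; then
    echo "$1"
  else
    date -d "-1 day" +%F
  fi
}

pick_date 2020-03-11   # prints the given date
pick_date              # prints yesterday
```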
1.2 DWD Layer (parsing the event-log data)
1.2.1 Create the base detail table
The base detail table stores the detail records converted from the ODS-layer raw table.
1) Create the event-log base detail table
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `event_name` string,
    `event_json` string,
    `server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Note: event_name and event_json hold, respectively, the event's name and the event's full payload. A raw log line carries a one-to-many structure (one line contains many events), and this table splits that apart. To flatten the raw log we need a custom UDF and UDTF.
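Before writing the Java UDF/UDTF, the flattening can be previewed in a few lines of Python (an illustration only; the field names `et` and `en` follow the sample log format used later in this section):

```python
import json

def flatten_events(line: str):
    """Split a raw 'server_time|json' log line into (event_name, event_json) rows."""
    server_time, payload = line.split("|", 1)
    events = json.loads(payload).get("et", [])
    # One input line fans out into one output row per event in the array.
    return [(e["en"], json.dumps(e)) for e in events]

# A shortened sample line with two events.
line = ('1583776132686|{"cm":{"ln":"-42.8"},"ap":"app",'
        '"et":[{"ett":"1","en":"display","kv":{"action":"2"}},'
        '{"ett":"2","en":"ad","kv":{"action":"3"}}]}')

for name, ev in flatten_events(line):
    print(name)  # display, then ad
```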
1.2.2 Custom UDF (parsing the common fields)
A UDF takes one row in and produces one row out ("one in, one out").
1) Create a Maven project: hivefunction
2) Create the package: com.zsy.udf
3) Add the following to pom.xml

<properties>
    <hive.version>2.3.0</hive.version>
</properties>

<repositories>
    <repository>
        <id>spring-plugin</id>
        <url>https://repo.spring.io/plugins-release/</url>
    </repository>
</repositories>

<dependencies>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Note 1: if the Hive jars fail to download, add the following options to IDEA's Maven VM options (they make Maven skip SSL certificate validation):
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
-Dmaven.wagon.http.ssl.ignore.validity.dates=true
For details, see the blog post "ignoring SSL certificate validation when Maven downloads dependencies".
Note 2: if packaging fails with the error below, the build thread ran out of stack space:
Exception in thread "main" java.lang.StackOverflowError
Fix: add -Xss4m to the build's VM options (the original article shows the IDEA settings dialog here as a screenshot).
4) The UDF that parses the common fields

package com.zsy.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String key) {
        // 1. Split the line into "server_time|json"
        String[] log = line.split("\\|");

        String result = "";

        // 2. Validate
        if (log.length != 2 || StringUtils.isBlank(log[1])) {
            return result;
        }

        // 3. Parse the JSON payload
        JSONObject json = new JSONObject(log[1].trim());

        // 4. Look up the value for the requested key
        if ("st".equals(key)) {
            // server time: the part before the '|'
            result = log[0].trim();
        } else if ("et".equals(key)) {
            // the event array, returned re-serialized
            if (json.has("et")) {
                result = json.getString("et");
            }
        } else {
            // any other key is a common field under "cm"
            JSONObject cm = json.getJSONObject("cm");
            if (cm.has(key)) {
                result = cm.getString(key);
            }
        }
        return result;
    }

    /**
     * Local test. Note: to uncomment and compile this, the inner double
     * quotes of the sample line must first be escaped as \".
     */
//    public static void main(String[] args) {
//        String line = "1583776132686|{"cm":{"ln":"-42.8","sv":"V2.3.9","os":"8.1.7","g":"X470IP70@gmail.com","mid":"0","nw":"4G","l":"en","vc":"13","hw":"1080*1920","ar":"MX","uid":"0","t":"1583758268106","la":"-31.3","md":"sumsung-18","vn":"1.1.1","ba":"Sumsung","sr":"M"},"ap":"app","et":[{"ett":"1583685512624","en":"display","kv":{"goodsid":"0","action":"2","extend1":"2","place":"1","category":"17"}},{"ett":"1583769686402","en":"newsdetail","kv":{"entry":"3","goodsid":"1","news_staytime":"16","loading_time":"0","action":"4","showtype":"5","category":"97","type1":""}},{"ett":"1583709065211","en":"ad","kv":{"activityId":"1","displayMills":"58537","entry":"1","action":"3","contentType":"0"}},{"ett":"1583693966746","en":"active_background","kv":{"active_source":"3"}},{"ett":"1583734521683","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1583755388633","en":"praise","kv":{"target_id":0,"id":1,"type":3,"add_time":"1583713812739","userid":4}}]}";
//        String result = new BaseFieldUDF().evaluate(line, "st");
//        System.out.println(result);
//    }
}
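As a cross-check, the key dispatch in `evaluate` can be mirrored in Python (an illustration of the logic, not the Hive UDF itself):

```python
import json

# Mirrors BaseFieldUDF.evaluate's dispatch on a "server_time|json" line:
# 'st' -> the timestamp prefix, 'et' -> the event array, anything else -> a "cm" field.
def evaluate(line: str, key: str) -> str:
    parts = line.split("|", 1)
    if len(parts) != 2 or not parts[1].strip():
        return ""
    body = json.loads(parts[1].strip())
    if key == "st":
        return parts[0].strip()
    if key == "et":
        return json.dumps(body["et"]) if "et" in body else ""
    cm = body.get("cm", {})
    return str(cm.get(key, ""))

line = '1583776132686|{"cm":{"mid":"0","vc":"13"},"et":[]}'
print(evaluate(line, "st"))   # -> 1583776132686
print(evaluate(line, "mid"))  # -> 0
```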
1.2.3 Custom UDTF (exploding the event field)
A UDTF takes one row in and can emit many rows out ("one in, many out").
1) Create the package: com.zsy.udtf
2) In com.zsy.udtf, create the class EventJsonUDTF
3) It explodes the event array into one row per event

package com.zsy.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;
import java.util.List;

public class EventJsonUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // Declare the UDTF's output column names and types
        List<String> fieldName = new ArrayList<>();
        List<ObjectInspector> fieldType = new ArrayList<>();
        fieldName.add("event_name");
        fieldName.add("event_json");
        fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // 1. The input is the JSON array that the UDF returns for the 'et' key
        String input = objects[0].toString();

        // 2. Validate
        if (StringUtils.isBlank(input)) {
            return;
        } else {
            JSONArray ja = new JSONArray(input);
            if (ja == null) {
                return;
            }
            // Walk the array and wrap each element as (event_name, event_json)
            for (int i = 0; i < ja.length(); i++) {
                String[] result = new String[2];
                try {
                    result[0] = ja.getJSONObject(i).getString("en");
                    result[1] = ja.getString(i);
                } catch (JSONException ex) {
                    continue;
                }
                // Forward one output row
                forward(result);
            }
        }
    }

    @Override
    public void close() throws HiveException {
    }
}

4) Package the jar and upload it to /user/hive/jars on HDFS
hdfs dfs -mkdir /user/hive/jars
hdfs dfs -put ./hivefunction-1.0-SNAPSHOT.jar /user/hive/jars
Note: what if you change a custom function and rebuild the jar? Just replace the old jar at the HDFS path and restart the Hive client.
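The queries in the next section call these functions as `base_analizer` and `flat_analizer`. The registration step is not shown in this excerpt; with the jar at the path above, it would look roughly like the following (function and class names are taken from this document, and the syntax is standard Hive `CREATE FUNCTION ... USING JAR`; adjust the HDFS URI to your cluster):

```sql
create function base_analizer as 'com.zsy.udf.BaseFieldUDF'
    using jar 'hdfs:///user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';

create function flat_analizer as 'com.zsy.udtf.EventJsonUDTF'
    using jar 'hdfs:///user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
```

Run these in the target database (e.g. after `use gmall;`), since Hive registers permanent functions per database.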
1.2.4 Populate the event-log base detail table
1) Parse the event log into the base detail table
insert overwrite table dwd_base_event_log partition(dt='2020-03-10')
select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as
    event_name,event_json
where dt='2020-03-10' and base_analizer(line,'et')<>'';

2) Test
select * from dwd_base_event_log where dt='2020-03-10' limit 2;
1.2.5 DWD-layer data parsing script
1) vim ods_to_dwd_base_log.sh
Put the following in the script:
#!/bin/bash
# Variables defined once for easy modification
APP=gmall
hive=/opt/modules/hive/bin/hive
# If a date argument was given, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi
sql="
use gmall;
insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
from "$APP".ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tem_flat as
    event_name,event_json
where dt='$do_date' and base_analizer(line,'et')<>''; "
$hive -e "$sql"

Note: when using custom functions, switch to the database where they are registered before calling them, e.g. use gmall;
2) Make the script executable
chmod 770 ods_to_dwd_base_log.sh
3) Run it
ods_to_dwd_base_log.sh 2020-03-11
4) Check the result
select * from dwd_base_event_log where dt='2020-03-11' limit 2;
1.3 DWD Layer (deriving the per-event tables)
1.3.1 Product click table
1) DDL statement
drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `goodsid` string,
    `place` string,
    `extend1` string,
    `category` string,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_display_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.place') place,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='display';

3) Test
select * from dwd_display_log where dt='2020-03-10' limit 2;
1.3.2 Product detail page table
1) DDL statement
drop table if exists dwd_newsdetail_log;
CREATE EXTERNAL TABLE dwd_newsdetail_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `goodsid` string,
    `showtype` string,
    `news_staytime` string,
    `loading_time` string,
    `type1` string,
    `category` string,
    `server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_newsdetail_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime') news_staytime,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='newsdetail';

3) Test
select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;
1.3.3 Product list page table
1) DDL statement
drop table if exists dwd_loading_log;
CREATE EXTERNAL TABLE dwd_loading_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `loading_time` string,
    `loading_way` string,
    `extend1` string,
    `extend2` string,
    `type` string,
    `type1` string,
    `server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_loading_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_loading_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='loading';

3) Test
select * from dwd_loading_log where dt='2020-03-10' limit 2;
1.3.4 Ad table
1) DDL statement
drop table if exists dwd_ad_log;
CREATE EXTERNAL TABLE dwd_ad_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `contentType` string,
    `displayMills` string,
    `itemId` string,
    `activityId` string,
    `server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_ad_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_ad_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.contentType') contentType,
    get_json_object(event_json,'$.kv.displayMills') displayMills,
    get_json_object(event_json,'$.kv.itemId') itemId,
    get_json_object(event_json,'$.kv.activityId') activityId,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='ad';

3) Test
select * from dwd_ad_log where dt='2020-03-10' limit 2;
1.3.5 Notification table
1) DDL statement
drop table if exists dwd_notification_log;
CREATE EXTERNAL TABLE dwd_notification_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `noti_type` string,
    `ap_time` string,
    `content` string,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_notification_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_notification_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.noti_type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='notification';

3) Test
select * from dwd_notification_log where dt='2020-03-10' limit 2;
1.3.6 Background-active user table
1) DDL statement
drop table if exists dwd_active_background_log;
CREATE EXTERNAL TABLE dwd_active_background_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `active_source` string,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_background_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_active_background_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.active_source') active_source,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='active_background';

3) Test
select * from dwd_active_background_log where dt='2020-03-10' limit 2;
1.3.7 Comment table
1) DDL statement
drop table if exists dwd_comment_log;
CREATE EXTERNAL TABLE dwd_comment_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `comment_id` int,
    `userid` int,
    `p_comment_id` int,
    `content` string,
    `addtime` string,
    `other_id` int,
    `praise_count` int,
    `reply_count` int,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_comment_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_comment_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count') praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='comment';

3) Test
select * from dwd_comment_log where dt='2020-03-10' limit 2;
1.3.8 Favorites table
1) DDL statement
drop table if exists dwd_favorites_log;
CREATE EXTERNAL TABLE dwd_favorites_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` int,
    `course_id` int,
    `userid` int,
    `add_time` string,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_favorites_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_favorites_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='favorites';

3) Test
select * from dwd_favorites_log where dt='2020-03-10' limit 2;
1.3.9 Like (praise) table
1) DDL statement
drop table if exists dwd_praise_log;
CREATE EXTERNAL TABLE dwd_praise_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` string,
    `userid` string,
    `target_id` string,
    `type` string,
    `add_time` string,
    `server_time` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_praise_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_praise_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='praise';

3) Test
select * from dwd_praise_log where dt='2020-03-10' limit 2;
1.3.10 Error log table
1) DDL statement
drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `errorBrief` string,
    `errorDetail` string,
    `server_time` string)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_error_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2) Load the data
insert overwrite table dwd_error_log PARTITION (dt='2020-03-10')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
from dwd_base_event_log
where dt='2020-03-10' and event_name='error';

3) Test
select * from dwd_error_log where dt='2020-03-10' limit 2;
1.3.11 DWD 层事件表加载数据脚本
1) vim ods_to_dwd_event_log.sh
在脚本中编写如下内容
1. #!/bin/bash
2. # 定义变量方便修改
3. APP=gmall
4. hive=/opt/modules/hive/bin/hive
5. # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
6. if [ -n "$1" ] ;then
7. do_date=$1
8. else
9. do_date=`date -d "-1 day" +%F`
10. fi
11. sql="
12. insert overwrite table "$APP".dwd_display_log
13. PARTITION (dt='$do_date')
14. select
15. mid_id,
16. user_id,
17. version_code,
18. version_name,
19. lang,
20. source,
21. os,
22. area,
23. model,
24. brand,
25. sdk_version,
26. gmail,
27. height_width,
28. app_time,
29. network,
30. lng,
31. lat,
32. get_json_object(event_json,'$.kv.action') action,
33. get_json_object(event_json,'$.kv.goodsid') goodsid,
34. get_json_object(event_json,'$.kv.place') place,
35. get_json_object(event_json,'$.kv.extend1') extend1,
36. get_json_object(event_json,'$.kv.category') category,
37. server_time
38. from "$APP".dwd_base_event_log
39. where dt='$do_date' and event_name='display';

insert overwrite table "$APP".dwd_newsdetail_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime') news_staytime,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='newsdetail';

insert overwrite table "$APP".dwd_loading_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='loading';

insert overwrite table "$APP".dwd_ad_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.contentType') contentType,
    get_json_object(event_json,'$.kv.displayMills') displayMills,
    get_json_object(event_json,'$.kv.itemId') itemId,
    get_json_object(event_json,'$.kv.activityId') activityId,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='ad';

insert overwrite table "$APP".dwd_notification_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.noti_type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='notification';

insert overwrite table "$APP".dwd_active_background_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.active_source') active_source,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='active_background';

insert overwrite table "$APP".dwd_comment_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count') praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='comment';

insert overwrite table "$APP".dwd_favorites_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='favorites';

insert overwrite table "$APP".dwd_praise_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='praise';

insert overwrite table "$APP".dwd_error_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='error';
"

$hive -e "$sql"
2) Grant execute permission on the script
chmod 770 ods_to_dwd_event_log.sh
3) Run the script
ods_to_dwd_event_log.sh 2020-03-11
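The script loads one day per run. After replaying several days of ODS data you would call it once per date; a small Python sketch of such a backfill wrapper (the script name is the one above; the commands are printed rather than executed, so pipe the output into `sh` or swap `print` for `subprocess.run` as needed):

```python
from datetime import date, timedelta

def backfill_commands(start, end):
    """Build the per-day invocations of the daily DWD event load
    for an inclusive date range."""
    cmds = []
    d = start
    while d <= end:
        cmds.append(f"ods_to_dwd_event_log.sh {d.isoformat()}")
        d += timedelta(days=1)
    return cmds

for cmd in backfill_commands(date(2020, 3, 11), date(2020, 3, 13)):
    print(cmd)  # prints one ods_to_dwd_event_log.sh call per day
```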
4) Check the imported data
select * from dwd_comment_log where dt='2020-03-11' limit 2;
Closing remarks
This chapter parsed the ODS-layer user behavior event data, built the DWD-layer event tables, and loaded the data into them. The next chapter will parse the ODS-layer business data and load it into the DWD layer as well.