I. Assignment Requirements

  1. In the member analysis, compute the number of members active for three consecutive days within the last seven days.
  2. In the project's data-collection process, what can be optimized, and how?

II. Implementation Steps

1. In the member analysis, compute the number of members active for three consecutive days within the last seven days

1.1 Collecting the data

This requirement only needs the app-start logs, not the business (event) logs. The event logs are included here anyway as practice for writing the Flume interceptor and conf file; the warehouse layers below process only the start data.

  /**
   * Custom interceptor: extract the event timestamp from the log body and
   * put a logTime header (yyyy-MM-dd) on the event, so the HDFS sink can
   * build day-partitioned paths via %{logTime}.
   */
  package com.lagou.zz.flume;

  import com.alibaba.fastjson.JSON;
  import com.alibaba.fastjson.JSONArray;
  import com.alibaba.fastjson.JSONObject;
  import com.google.common.base.Charsets;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.event.SimpleEvent;
  import org.apache.flume.interceptor.Interceptor;
  import org.junit.Test;
  import java.time.Instant;
  import java.time.LocalDateTime;
  import java.time.ZoneId;
  import java.time.format.DateTimeFormatter;
  import java.util.*;

  public class LogInterceptor implements Interceptor {
      @Override
      public void initialize() {
      }

      @Override
      public Event intercept(Event event) {
          // Get the headers (the TAILDIR source has already set logType = start|event)
          Map<String, String> headersMap = event.getHeaders();
          // Get the body
          String eventBody = new String(event.getBody(), Charsets.UTF_8);
          // Split the log line on whitespace
          String[] bodyArr = eventBody.split("\\s+");
          try {
              // The seventh token is the JSON payload
              String jsonStr = bodyArr[6];
              String logTimestamp = "";
              JSONObject jsonObj = JSON.parseObject(jsonStr);
              if (headersMap.getOrDefault("logType", "").equals("start")) {
                  // Start logs carry the timestamp in the time field of app_active
                  logTimestamp = jsonObj.getJSONObject("app_active").getString("time");
              } else if (headersMap.getOrDefault("logType", "").equals("event")) {
                  // Event logs have a different layout: take the time of the first lagou_event entry
                  JSONArray jsonArr = jsonObj.getJSONArray("lagou_event");
                  if (jsonArr.size() > 0) {
                      logTimestamp = jsonArr.getJSONObject(0).getString("time");
                  }
              }
              // Convert the epoch-millisecond string to "yyyy-MM-dd"
              long timestamp = Long.parseLong(logTimestamp);
              Instant instant = Instant.ofEpochMilli(timestamp);
              // Interpret the instant in the cluster's local time zone
              LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
              DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
              String date = formatter.format(localDateTime);
              // Put the formatted date into the headers
              headersMap.put("logTime", date);
              event.setHeaders(headersMap);
          } catch (Exception e) {
              // Malformed lines (bad JSON, missing token, unparsable timestamp)
              // get logTime = unknown instead of killing the source
              headersMap.put("logTime", "unknown");
              event.setHeaders(headersMap);
          }
          return event;
      }

      @Override
      public List<Event> intercept(List<Event> list) {
          List<Event> eventList = new ArrayList<>();
          for (Event event : list) {
              Event intercepted = intercept(event);
              if (intercepted != null) {
                  eventList.add(intercepted);
              }
          }
          return eventList;
      }

      @Override
      public void close() {
      }

      public static class Builder implements Interceptor.Builder {
          @Override
          public Interceptor build() {
              return new LogInterceptor();
          }

          @Override
          public void configure(Context context) {
          }
      }

      @Test
      public void startTest() {
          SimpleEvent event = new SimpleEvent();
          HashMap<String, String> map = new HashMap<>();
          map.put("logType", "start");
          event.setHeaders(map);
          String s = "2020-08-20 11:56:08.221 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"1\",\"error_code\":\"0\"},\"time\":1595334358233},\"attr\":{\"area\":\"江门\",\"uid\":\"2F10092A10\",\"app_v\":\"1.1.5\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A10010\",\"os_type\":\"0.60\",\"channel\":\"ZD\",\"language\":\"chinese\",\"brand\":\"Huawei-9\"}}";
          event.setBody(s.getBytes(Charsets.UTF_8));
          LogInterceptor logInterceptor = new LogInterceptor();
          Event event1 = logInterceptor.intercept(event);
          System.out.println(JSON.toJSONString(event1.getHeaders()));
      }

      @Test
      public void eventTest() {
          SimpleEvent event = new SimpleEvent();
          HashMap<String, String> map = new HashMap<>();
          map.put("logType", "event");
          event.setHeaders(map);
          String s = "2020-08-20 12:00:58.405 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"3\",\"goodsid\":\"0\",\"loading_time\":\"51\",\"action\":\"3\",\"staytime\":\"29\",\"showtype\":\"0\"},\"time\":1595302590721},{\"name\":\"loading\",\"json\":{\"loading_time\":\"0\",\"action\":\"1\",\"loading_type\":\"1\",\"type\":\"1\"},\"time\":1595278548405},{\"name\":\"notification\",\"json\":{\"action\":\"4\",\"type\":\"4\"},\"time\":1595306410242},{\"name\":\"ad\",\"json\":{\"duration\":\"12\",\"ad_action\":\"0\",\"shop_id\":\"0\",\"event_type\":\"ad\",\"ad_type\":\"2\",\"show_style\":\"1\",\"product_id\":\"47\",\"place\":\"placecampaign3_left\",\"sort\":\"7\"},\"time\":1595289701252},{\"name\":\"favorites\",\"json\":{\"course_id\":7,\"id\":0,\"userid\":0},\"time\":1595324348119}],\"attr\":{\"area\":\"海口\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"0.5.4\",\"channel\":\"BA\",\"language\":\"chinese\",\"brand\":\"iphone-6\"}}";
          event.setBody(s.getBytes(Charsets.UTF_8));
          LogInterceptor logInterceptor = new LogInterceptor();
          Event event1 = logInterceptor.intercept(event);
          System.out.println(JSON.toJSONString(event1.getHeaders()));
      }
  }
  # Define the source, channel, and sink names
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1
  # Configure the source
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.positionFile = /opt/lagou/data/taildir_position.json
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/lagou/data/start/.*log
  a1.sources.r1.headers.f1.logType = start
  a1.sources.r1.filegroups.f2 = /opt/lagou/data/event/.*log
  a1.sources.r1.headers.f2.logType = event
  # Register the custom interceptor
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = com.lagou.zz.flume.LogInterceptor$Builder
  # Configure the channel
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 100000
  a1.channels.c1.transactionCapacity = 2000
  # Configure the sink
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /user/data/logs/%{logType}/%{logTime}
  a1.sinks.k1.hdfs.filePrefix = %{logType}
  a1.sinks.k1.hdfs.fileType = DataStream
  # File rolling policy (roll by size only)
  a1.sinks.k1.hdfs.rollSize = 33554432
  a1.sinks.k1.hdfs.rollCount = 0
  a1.sinks.k1.hdfs.rollInterval = 0
  a1.sinks.k1.hdfs.idleTimeout = 0
  a1.sinks.k1.hdfs.minBlockReplicas = 1
  # Number of events flushed to HDFS per batch
  a1.sinks.k1.hdfs.batchSize = 1000
  # Wire the three components together
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  # Start the Flume agent; since this runs locally, keep it in the foreground to watch the logs
  ./bin/flume-ng agent \
  --conf /opt/lagou/servers/apache-flume-1.9.0-bin/conf/ \
  --conf-file /opt/lagou/servers/apache-flume-1.9.0-bin/conf/flume-log2hdfs.conf \
  -name a1 \
  -Dflume.root.logger=INFO,console
  # Once the agent is up, copy files into the monitored directories
  cp eventlog0721.small.log ../event/
  cp start0721.small.log ../start/
  cp start0722.small.log ../start/
  cp start0723.small.log ../start/

1.2 Loading data into the ods layer

  -- Create the databases for the warehouse layers
  create database ods;
  create database dwd;
  create database dws;
  create database ads;
  use ods;
  -- ods-layer start-log table; each raw JSON log line is kept in a single string column
  create external table ods.ods_start_log(
  `str` string
  ) comment 'user app-start logs'
  partitioned by (`dt` string)
  location '/user/data/logs/start';
  #!/bin/bash
  APP=ods
  hive=/opt/lagou/servers/hive-2.3.7
  # Add the day's partition to the ods table. Flume writes to
  # /user/data/logs/start/<yyyy-MM-dd> (no dt= prefix in the directory name),
  # so the partition location has to be given explicitly.
  if [ -n "$1" ]
  then
    do_date=$1
  else
    do_date=`date -d "-1 day" +%F`
  fi
  sql="alter table $APP.ods_start_log add partition(dt='$do_date') location '/user/data/logs/start/$do_date';"
  $hive/bin/hive -e "$sql"
  # Run the script for each day of data
  sh loadstart2ods.sh 2020-07-21
  sh loadstart2ods.sh 2020-07-22
  sh loadstart2ods.sh 2020-07-23
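
As a quick sanity check that each partition actually picked up data, a simple count per day can be run (a sketch, using the table and dates above):

  select dt, count(*)
  from ods.ods_start_log
  where dt between '2020-07-21' and '2020-07-23'
  group by dt;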

1.3 Loading data into the dwd layer

  # Start-log line format
  2020-08-20 11:56:08.211 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"0","error_code":"0"},"time":1595288248066},"attr":{"area":"三门峡","uid":"2F10092A1","app_v":"1.1.0","event_type":"common","device_id":"1FB872-9A1001","os_type":"0.97","channel":"WM","language":"chinese","brand":"xiaomi-3"}}
  # Fields to parse out of the start log
  device_id
  area
  uid
  app_v
  event_type
  os_type
  channel
  language
  brand
  entry
  action
  error_code
  -- Create the dwd table with the fields required above
  use dwd;
  drop table if exists dwd.start_log;
  create table if not exists dwd.start_log(
  device_id string,
  area string,
  uid string,
  app_v string,
  event_type string,
  os_type string,
  channel string,
  language string,
  brand string,
  entry string,
  action string,
  error_code string
  ) comment 'start-log wide table'
  partitioned by (dt string)
  stored as parquet;

Script that parses the start logs into the dwd table:

  #!/bin/bash
  hive=/opt/lagou/servers/hive-2.3.7
  if [ -n "$1" ]
  then
    do_date=$1
  else
    do_date=`date -d "-1 day" +%F`
  fi
  # Pull the JSON token out of the raw line, then extract each field
  sql="
  with tmp as(
  select
  split(str,' ')[7] line,
  dt
  from ods.ods_start_log
  where dt='$do_date'
  )
  insert overwrite table dwd.start_log
  partition(dt='$do_date')
  select
  get_json_object(line,'$.attr.device_id') device_id,
  get_json_object(line,'$.attr.area') area,
  get_json_object(line,'$.attr.uid') uid,
  get_json_object(line,'$.attr.app_v') app_v,
  get_json_object(line,'$.attr.event_type') event_type,
  get_json_object(line,'$.attr.os_type') os_type,
  get_json_object(line,'$.attr.channel') channel,
  get_json_object(line,'$.attr.language') language,
  get_json_object(line,'$.attr.brand') brand,
  get_json_object(line,'$.app_active.json.entry') entry,
  get_json_object(line,'$.app_active.json.action') action,
  get_json_object(line,'$.app_active.json.error_code') error_code
  from tmp
  "
  $hive/bin/hive -e "$sql"
  sh start_ods2dwd.sh 2020-07-21
  sh start_ods2dwd.sh 2020-07-22
  sh start_ods2dwd.sh 2020-07-23
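
To spot-check the JSON parsing, a few rows of one new partition can be sampled (a sketch):

  select device_id, area, uid, entry, action, error_code
  from dwd.start_log
  where dt = '2020-07-21'
  limit 5;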

1.4 Loading data into the dws layer

Since the metric is the number of members active on three consecutive days within the last seven days, the dws layer only needs daily member-activity data.

  # Required fields
  device_id
  uid
  app_v
  os_type
  language
  channel
  area
  brand
  -- Create the daily member-activity table
  use dws;
  drop table if exists dws.dws_member_start_day;
  create table if not exists dws.dws_member_start_day(
  device_id string,
  uid string,
  app_v string,
  os_type string,
  language string,
  channel string,
  area string,
  brand string
  ) comment 'daily member activity'
  partitioned by (dt string)
  stored as parquet;

A member counts only once per day no matter how many log lines they produce, so the load groups by device_id and collapses the remaining fields with collect_set.

  #!/bin/bash
  hive=/opt/lagou/servers/hive-2.3.7
  if [ -n "$1" ]
  then
    do_date=$1
  else
    do_date=`date -d "-1 day" +%F`
  fi
  # collect_set deduplicates each field per device for the day
  sql="insert overwrite table dws.dws_member_start_day
  partition(dt='$do_date')
  select
  device_id,
  concat_ws('|', collect_set(uid)),
  concat_ws('|', collect_set(app_v)),
  concat_ws('|', collect_set(os_type)),
  concat_ws('|', collect_set(language)),
  concat_ws('|', collect_set(channel)),
  concat_ws('|', collect_set(area)),
  concat_ws('|', collect_set(brand))
  from dwd.start_log
  where dt='$do_date'
  group by device_id;"
  $hive/bin/hive -e "$sql"

1.5 Loading data into the ads layer

Create the table for the metric; only two fields are needed: the count itself and the dt partition.

  use ads;
  drop table if exists ads.ads_consecutive3_in_last7_active_member_counts;
  create table if not exists ads.ads_consecutive3_in_last7_active_member_counts(
  counts int
  ) comment 'members active on 3+ consecutive days within the last 7 days'
  partitioned by (dt string)
  row format delimited fields terminated by ',';
  #!/bin/bash
  hive=/opt/lagou/servers/hive-2.3.7
  if [ -n "$1" ]
  then
    do_date=$1
  else
    do_date=`date -d "-1 day" +%F`
  fi
  # between date_sub(d, 6) and d spans exactly seven days inclusive;
  # count(distinct device_id) keeps a device that has two separate
  # 3-day runs inside the window from being counted twice
  sql="with tmp as(
  select tbl.device_id
  from (
  select
  device_id,
  dt,
  date_sub(dt, row_number() over(partition by device_id order by dt)) gid
  from dws.dws_member_start_day
  where dt between date_sub('$do_date', 6) and '$do_date'
  ) tbl
  group by tbl.device_id, tbl.gid
  having count(*) >= 3
  )
  insert overwrite table ads.ads_consecutive3_in_last7_active_member_counts
  partition(dt='$do_date')
  select count(distinct device_id) from tmp;"
  $hive/bin/hive -e "$sql"
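
The core of the script is date_sub(dt, row_number() over (partition by device_id order by dt)): within one device's rows ordered by date, subtracting an increasing row number from consecutive dates always yields the same constant, so each run of consecutive active days collapses into one gid group. A minimal sketch on made-up inline rows (stack() here just generates sample data, not project data):

  -- d1 is active on 07-20/21/22 (a 3-day run) and again on 07-25.
  -- The three consecutive dates all map to gid = 2020-07-19, while
  -- 07-25 maps to 2020-07-21, so only the run of length 3 survives
  -- a later "having count(*) >= 3".
  with demo as (
    select stack(4,
      'd1', '2020-07-20',
      'd1', '2020-07-21',
      'd1', '2020-07-22',
      'd1', '2020-07-25'
    ) as (device_id, dt)
  )
  select device_id, dt,
         date_sub(dt, row_number() over (partition by device_id order by dt)) as gid
  from demo;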

Two things to note here:

  1. date_sub() returns the date N days before its input, so "seven days before a date" is written date_sub(current_date, 7), not date_sub(current_date, -7); a negative offset moves forward in time. (And note that between date_sub(d, 6) and d covers exactly seven days inclusive, which is why the script uses 6.) See the snippet just below.
  2. The requirement asks for the number of members active on at least three consecutive days in the last seven, not the members themselves, hence the single count column.
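
For example (dates chosen purely for illustration):

  select date_sub('2020-07-23', 7);   -- 2020-07-16, seven days earlier
  select date_sub('2020-07-23', -7);  -- 2020-07-30, i.e. forward in time
  select date_sub('2020-07-23', 6);   -- 2020-07-17, start of a 7-day window ending 07-23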
Run the script, then query ads.ads_consecutive3_in_last7_active_member_counts to get the result:

  sh start_ads_consecutive_member.sh 2020-07-23

(Figure 1: screenshot of the query result)

2. What can be optimized in the project's data-collection process, and how?

2.1 Parameters

  • source
    • batchSize

This parameter caps the number of lines read and sent to the channel in one batch. The default of 100 is usually fine; for larger files it can be raised. How: add a1.sources.r1.batchSize = 200 to the conf file.

  • channel
    • transactionCapacity

This parameter caps the number of events per transaction taken from the source or delivered to the sink; it should be at least as large as the batchSize of the source and sink, e.g. a1.channels.c1.transactionCapacity = 2000 as in the conf above.

  • Kafka Channel

Switching the channel type to Kafka combines the advantages of the File Channel (durability) and the Memory Channel (throughput). How: a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel; a fuller sketch follows below.
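
A minimal sketch of a Kafka channel definition; the broker list and topic name below are placeholders, not values from this project:

  # Buffer events in a Kafka topic instead of JVM memory
  a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  # placeholder broker list -- substitute the real cluster
  a1.channels.c1.kafka.bootstrap.servers = kafka1:9092,kafka2:9092
  # placeholder topic name
  a1.channels.c1.kafka.topic = flume-channel-logs
  # keep the default true so headers such as logType/logTime survive the channel
  a1.channels.c1.parseAsFlumeEvent = true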

  • sink
    • batchSize

Controls how many events are written before flushing to HDFS; raising it (the conf above uses a1.sinks.k1.hdfs.batchSize = 1000) trades a little latency for throughput.

  • rollInterval

Number of seconds to wait before rolling the current file; with small data volumes it can be turned down so files are closed sooner (the conf above disables time-based rolling with a1.sinks.k1.hdfs.rollInterval = 0 and rolls by size instead).

2.2 Data handling

When writing the custom interceptor, the event body can be rewritten to just the JSON string that downstream actually needs, which removes the split() step in Hive; see the sketch below.
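
For instance, if intercept() replaced the body with the JSON token via event.setBody(jsonStr.getBytes(Charsets.UTF_8)) (a sketch of the idea, not what the interceptor above currently does), the with tmp / split step in start_ods2dwd.sh would collapse to parsing the column directly:

  -- str now holds only the JSON payload, so no split(str, ' ') is needed
  select
    get_json_object(str, '$.attr.device_id') device_id,
    get_json_object(str, '$.attr.uid') uid,
    get_json_object(str, '$.app_active.json.entry') entry
  from ods.ods_start_log
  where dt = '2020-07-21';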