一、作业要求
- 在会员分析中计算最近七天连续三天活跃会员数。
- 项目的数据采集过程中,有哪些地方能够优化,如何实现?
二、实现步骤
1. 在会员分析中计算最近七天连续三天活跃会员数
1.1 采集数据
本需求只需要启动日志,不需要业务日志;不过为了练习编写 Flume 的拦截器和 conf 文件,这里把业务(event)日志的采集也加了进来,后续的数仓分层没有使用业务数据
/** Custom Flume interceptor. */
package com.lagou.zz.flume;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;

/**
 * Extracts the epoch-millis "time" field from each log line's JSON payload and
 * stores it in the event header as {@code logTime} ("yyyy-MM-dd"), so the HDFS
 * sink can bucket files by day via %{logTime}. Lines that cannot be parsed are
 * tagged {@code logTime=unknown} instead of crashing the agent.
 */
public class LogInterceptor implements Interceptor {

    // DateTimeFormatter is immutable and thread-safe: build once, not per event.
    private static final DateTimeFormatter DAY_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    @Override
    public void initialize() {
    }

    /**
     * Parses one event. The body is whitespace-tokenized and the JSON payload is
     * expected as the 7th token (index 6) of the log line.
     *
     * @param event incoming Flume event; its headers are mutated in place
     * @return the same event, with a logTime header added
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headersMap = event.getHeaders();
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            String logTimestamp = "";
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            String logType = headersMap.getOrDefault("logType", "");
            if (logType.equals("start")) {
                // Start log: timestamp lives at app_active.time.
                // Guard against a missing app_active object (the original code
                // would have thrown an uncaught NPE here).
                JSONObject appActive = jsonObj.getJSONObject("app_active");
                if (appActive != null) {
                    logTimestamp = appActive.getString("time");
                }
            } else if (logType.equals("event")) {
                // Event log: take the timestamp of the first entry in lagou_event.
                JSONArray jsonArr = jsonObj.getJSONArray("lagou_event");
                if (jsonArr != null && jsonArr.size() > 0) {
                    logTimestamp = jsonArr.getJSONObject(0).getString("time");
                }
            }
            long timestamp = Long.parseLong(logTimestamp);
            Instant instant = Instant.ofEpochMilli(timestamp);
            // Convert using the cluster's local time zone.
            LocalDateTime localDateTime =
                    LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            headersMap.put("logTime", DAY_FORMAT.format(localDateTime));
            event.setHeaders(headersMap);
        } catch (JSONException | NumberFormatException | ArrayIndexOutOfBoundsException e) {
            // Bug fix: the original caught only JSONException, so a short line
            // (bodyArr[6] out of bounds) or an empty/invalid timestamp
            // (NumberFormatException) would have killed the agent. All malformed
            // records are now routed to the "unknown" partition instead.
            headersMap.put("logTime", "unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> eventList = new ArrayList<>();
        for (Event event : list) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                // Bug fix: collect the intercepted event, not the raw input.
                eventList.add(intercepted);
            }
        }
        return eventList;
    }

    @Override
    public void close() {
    }

    /** Required by Flume to instantiate the interceptor from the conf file. */
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    /** Sanity check: a start log should yield a logTime header of 2020-07-21. */
    @Test
    public void startTest() {
        SimpleEvent event = new SimpleEvent();
        HashMap<String, String> map = new HashMap<>();
        map.put("logType", "start");
        event.setHeaders(map);
        String s = "2020-08-20 11:56:08.221 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"1\",\"error_code\":\"0\"},\"time\":1595334358233},\"attr\":{\"area\":\"江门\",\"uid\":\"2F10092A10\",\"app_v\":\"1.1.5\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A10010\",\"os_type\":\"0.60\",\"channel\":\"ZD\",\"language\":\"chinese\",\"brand\":\"Huawei-9\"}}";
        event.setBody(s.getBytes(Charsets.UTF_8));
        LogInterceptor logInterceptor = new LogInterceptor();
        Event event1 = logInterceptor.intercept(event);
        System.out.println(JSON.toJSONString(event1.getHeaders()));
    }

    /** Sanity check: an event log should use the first lagou_event timestamp. */
    @Test
    public void eventTest() {
        SimpleEvent event = new SimpleEvent();
        HashMap<String, String> map = new HashMap<>();
        map.put("logType", "event");
        event.setHeaders(map);
        String s = "2020-08-20 12:00:58.405 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"3\",\"goodsid\":\"0\",\"loading_time\":\"51\",\"action\":\"3\",\"staytime\":\"29\",\"showtype\":\"0\"},\"time\":1595302590721},{\"name\":\"loading\",\"json\":{\"loading_time\":\"0\",\"action\":\"1\",\"loading_type\":\"1\",\"type\":\"1\"},\"time\":1595278548405},{\"name\":\"notification\",\"json\":{\"action\":\"4\",\"type\":\"4\"},\"time\":1595306410242},{\"name\":\"ad\",\"json\":{\"duration\":\"12\",\"ad_action\":\"0\",\"shop_id\":\"0\",\"event_type\":\"ad\",\"ad_type\":\"2\",\"show_style\":\"1\",\"product_id\":\"47\",\"place\":\"placecampaign3_left\",\"sort\":\"7\"},\"time\":1595289701252},{\"name\":\"favorites\",\"json\":{\"course_id\":7,\"id\":0,\"userid\":0},\"time\":1595324348119}],\"attr\":{\"area\":\"海口\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"0.5.4\",\"channel\":\"BA\",\"language\":\"chinese\",\"brand\":\"iphone-6\"}}";
        event.setBody(s.getBytes(Charsets.UTF_8));
        LogInterceptor logInterceptor = new LogInterceptor();
        Event event1 = logInterceptor.intercept(event);
        System.out.println(JSON.toJSONString(event1.getHeaders()));
    }
}
# Agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Taildir source: tail start/event logs, tagging each group with a logType header
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/lagou/data/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/lagou/data/start/.*log
a1.sources.r1.headers.f1.logType = start
a1.sources.r1.filegroups.f2 = /opt/lagou/data/event/.*log
a1.sources.r1.headers.f2.logType = event

# Custom interceptor: puts logTime (yyyy-MM-dd) into the event header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lagou.zz.flume.LogInterceptor$Builder

# Memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# HDFS sink: partition the path by logType and the interceptor-supplied logTime
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logType}/%{logTime}
a1.sinks.k1.hdfs.filePrefix = %{logType}
a1.sinks.k1.hdfs.fileType = DataStream

# Roll files by size only (32 MB); disable count/interval/idle-based rolling
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000

# Wire source -> channel -> sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Start the Flume sync job. This is a local run, so keep it in the foreground
# to watch the logs directly.
./bin/flume-ng agent \
  --conf /opt/lagou/servers/apache-flume-1.9.0-bin/conf/ \
  --conf-file /opt/lagou/servers/apache-flume-1.9.0-bin/conf/flume-log2hdfs.conf \
  -name a1 \
  -Dflume.root.logger=INFO,console
# Once the agent is up, copy sample files into the monitored directories
cp eventlog0721.small.log ../event/
cp start0721.small.log ../start/
cp start0722.small.log ../start/
cp start0723.small.log ../start/
1.2 ods 层数据加载
-- Create the warehouse layers, one database per layer
create database ods;
create database dwd;
create database dws;
create database ads;

use ods;

-- ODS start-log table: keep each raw JSON log line in a single string column,
-- partitioned by day and mounted on the Flume HDFS sink output path.
create external table ods.ods_start_log
(
    `str` String
)
comment '用户启动日志信息'
partitioned by (`dt` String)
location '/user/data/logs/start';
#!/bin/bash
# loadstart2ods.sh -- register one day's start-log HDFS directory as an ODS partition.
# Usage: sh loadstart2ods.sh [yyyy-mm-dd]; defaults to yesterday when no arg is given.
APP=ods
hive=/opt/lagou/servers/hive-2.3.7

# Bug fix: the original test was [ -n "$1"] -- the missing space before "]"
# is a bash syntax error, so the date argument was never honored.
if [ -n "$1" ]
then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="alter table "$APP".ods_start_log add partition(dt='$do_date');"
hive -e "$sql"
# Run the script for each of the three test days
sh loadstart2ods.sh 2020-07-21
sh loadstart2ods.sh 2020-07-22
sh loadstart2ods.sh 2020-07-23
1.3 dwd 层数据加载
# 启动日志数据格式2020-08-20 11:56:08.211 [main] INFO com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"0","error_code":"0"},"time":1595288248066},"attr":{"area":"三门峡","uid":"2F10092A1","app_v":"1.1.0","event_type":"common","device_id":"1FB872-9A1001","os_type":"0.97","channel":"WM","language":"chinese","brand":"xiaomi-3"}}
启动日志需要解析以下字段:device_id、area、uid、app_v、event_type、os_type、channel、language、brand、entry、action、error_code
-- DWD wide table: only the fields we need, parsed out of the raw JSON
use dwd;
drop table if exists dwd.start_log;
create table if not exists dwd.start_log
(
    device_id  string,
    area       string,
    uid        string,
    app_v      string,
    event_type string,
    os_type    string,
    channel    string,
    language   string,
    brand      string,
    entry      string,
    action     string,
    error_code string
)
comment '启动日志宽表'
partitioned by (dt string)
stored as parquet;
编写解析启动日志的脚本文件
#!/bin/bash
# start_ods2dwd.sh -- parse one day's raw start-log JSON from ODS into the DWD wide table.
# Usage: sh start_ods2dwd.sh [yyyy-mm-dd]; defaults to yesterday.
hive=/opt/lagou/servers/hive-2.3.7

if [ -n "$1" ]
then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

# NOTE(review): split(str,' ')[7] takes the 8th single-space-separated token as
# the JSON payload, while the Flume interceptor uses index 6 with a \s+ split --
# presumably the raw line contains a double space (log-level padding); confirm
# against the actual log files.
sql="
with tmp as
(
    select
        split(str,' ')[7] line,
        dt
    from ods.ods_start_log
    where dt='$do_date'
)
insert overwrite table dwd.start_log
partition(dt='$do_date')
select
    get_json_object(line,'$.attr.device_id') device_id,
    get_json_object(line,'$.attr.area') area,
    get_json_object(line,'$.attr.uid') uid,
    get_json_object(line,'$.attr.app_v') app_v,
    get_json_object(line,'$.attr.event_type') event_type,
    get_json_object(line,'$.attr.os_type') os_type,
    get_json_object(line,'$.attr.channel') channel,
    get_json_object(line,'$.attr.language') language,
    get_json_object(line,'$.attr.brand') brand,
    get_json_object(line,'$.app_active.json.entry') entry,
    get_json_object(line,'$.app_active.json.action') action,
    get_json_object(line,'$.app_active.json.error_code') error_code
from tmp
"
hive -e "$sql"
# Load the three test days from ODS into DWD
sh start_ods2dwd.sh 2020-07-21
sh start_ods2dwd.sh 2020-07-22
sh start_ods2dwd.sh 2020-07-23
1.4 dws 层加载数据
因为需求是计算最近七天连续三天活跃会员数,所以 dws 层只需要每日会员活跃信息
# 需要的字段:device_id、uid、app_v、os_type、language、channel、area、brand
-- DWS table: one row per active member (device_id) per day
use dws;
drop table if exists dws.dws_member_start_day;
create table if not exists dws.dws_member_start_day
(
    device_id string,
    uid       string,
    app_v     string,
    os_type   string,
    language  string,
    channel   string,
    area      string,
    brand     string
)
COMMENT '每日会员活跃信息'
partitioned by (dt string)
stored as parquet;
一个会员无论一天有几条日志也只能算作一次,所以需要做分组
#!/bin/bash
# dwd -> dws: deduplicate the start log to one row per device_id per day,
# collapsing multi-valued attributes with collect_set.
# Usage: sh <script> [yyyy-mm-dd]; defaults to yesterday.
hive=/opt/lagou/servers/hive-2.3.7

# Bug fix: the original test was [ -n "$1"] -- missing space before "]" is a
# bash syntax error, so the date argument was never honored.
if [ -n "$1" ]
then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

# Bug fix: the uid column used concat_ws("|", ...) -- the inner double quotes
# terminate the double-quoted sql= string and break the script. Use single
# quotes, consistent with the other columns.
sql="
insert overwrite table dws.dws_member_start_day
partition(dt='$do_date')
select
    device_id,
    concat_ws('|', collect_set(uid)),
    concat_ws('|', collect_set(app_v)),
    concat_ws('|', collect_set(os_type)),
    concat_ws('|', collect_set(language)),
    concat_ws('|', collect_set(channel)),
    concat_ws('|', collect_set(area)),
    concat_ws('|', collect_set(brand))
from dwd.start_log
where dt='$do_date'
group by device_id;
"
hive -e "$sql"
1.5 ads 层加载数据
根据需求创建对应的表,只需要两个字段
-- ADS result table: a single count per day, as required by the assignment
use ads;
drop table if exists ads.ads_consecutive3_in_last7_active_member_counts;
create table if not exists ads.ads_consecutive3_in_last7_active_member_counts
(
    counts int
)
comment '最近七天活跃超过连续三天会员数'
partitioned by (dt string)
row format delimited fields terminated by ',';
#!/bin/bash
# dws -> ads: count members active on >= 3 consecutive days within the last 7 days.
# Usage: sh start_ads_consecutive_member.sh [yyyy-mm-dd]; defaults to yesterday.
hive=/opt/lagou/servers/hive-2.3.7

# Bug fix: the original test was [ -n "$1"] -- missing space before "]" is a
# bash syntax error, so the date argument was never honored.
if [ -n "$1" ]
then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

# Consecutive-days trick: date_sub(dt, row_number()) maps every run of
# consecutive dates for a device to the same anchor date (gid); any group of
# size >= 3 is a streak of at least three consecutive active days.
sql="
with tmp as
(
    select tbl.device_id
    from (
        select
            device_id,
            dt,
            date_sub(dt,row_number() over(partition by device_id order by dt)) gid
        from dws.dws_member_start_day
        where dt between date_sub('$do_date',7) and '$do_date'
    ) tbl
    group by tbl.device_id, tbl.gid
    having count(*) >= 3
)
insert overwrite table ads.ads_consecutive3_in_last7_active_member_counts
partition(dt='$do_date')
select count(*) from tmp;
"
hive -e "$sql"
以上需要注意两个问题
- date_sub() 函数就是求输入日期的前几天,所以要求某一日期的前七天,写法应该是 date_sub(current_date,7),而不是 -7,这个需要注意
- 注意需求是 最近七天活跃超过连续三天的用户数 而不是用户
运行脚本后查询 ads.ads_consecutive3_in_last7_active_member_counts 拿到结果sh start_ads_consecutive_member.sh 2020-07-23

2. 项目的数据采集过程中,有哪些地方能够优化,如何实现?
2.1 参数方面
- source
- batchSize
该参数可以控制一次读取和发送到 channel 的最大行数,一般情况下用默认的 100 就很好,如果文件比较大可以适当调整。实现方式:在 conf 文件中添加 a1.source.r1.batchSize = 200
- channel
- transactionCapacity
该参数可以控制从 source 接收或给 sink 的每个事务的最大事件数
- Kafka Channel
将 channel 类型更换为 Kafka,兼具File Channel和Memory Channel的优点。实现方式:a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
- sink
- batchSize
控制在文件刷新到HDFS之前写入文件的事件数,可以适当调整
- rollInterval
控制在滚动当前文件之前等待的秒数,数据量小的时候可以调小
2.2 数据方面
写自定义拦截器的时候,可以直接将 event 的 body 输出成需要的 json 字符串,可以减少 hive 中 split 那一步
