Overview

Data Warehouse Layers

Figure 1: User behavior data warehouse (ODS and DWD)

数据库和表梳理


Task Script Overview


ODS Layer (Raw Data)

ODS Layer Development

User behavior data tables

Table Name          Description
ods_user_active     user activity table, APP open events (act=1)
ods_click_good      product click table (act=2)
ods_good_item       product detail page table (act=3)
ods_good_list       product list page table (act=4)
ods_app_close       APP close event table (act=5)

Note:
1. When creating or referencing a table, prefix it with the database name: db_name.table_name.
2. To allow SQL statements to be rerun safely, add if not exists to them.
3. Among string, date, and timestamp, use string so that date formats stay unified.
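A quick way to see why yyyyMMdd string dates are convenient (a minimal sketch, not part of the pipeline):

```shell
#!/bin/bash
# yyyyMMdd date strings sort lexicographically in chronological order,
# so plain string comparison doubles as date comparison in Hive filters
# such as dt >= '20220211'.
d1=20220211
d2=20220212
if [[ "$d1" < "$d2" ]]; then
  echo "$d1 precedes $d2"
fi
# Prints: 20220211 precedes 20220212
```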


create database ods_mall;

ods_user_active (APP open)

create external table if not exists ods_mall.ods_user_active(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
-- The partition location uses a relative path; the hdfs://bigdata1:9000/data/ods/user_action/ prefix is omitted
alter table ods_mall.ods_user_active add if not exists partition(dt='20220211') location '20220211/1';


ods_click_good (product click)

create external table if not exists ods_mall.ods_click_good(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
alter table ods_mall.ods_click_good add if not exists partition(dt='20220211') location '20220211/2';


ods_good_item (product detail page)

create external table if not exists ods_mall.ods_good_item(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
alter table ods_mall.ods_good_item add if not exists partition(dt='20220211') location '20220211/3';

ods_good_list (product list page)

create external table if not exists ods_mall.ods_good_list(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
alter table ods_mall.ods_good_list add if not exists partition(dt='20220211') location '20220211/4';

ods_app_close (APP close)

create external table if not exists ods_mall.ods_app_close(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
alter table ods_mall.ods_app_close add if not exists partition(dt='20220211') location '20220211/5';

ODS Layer Script Extraction

Extract the ODS-layer work into scripts:

ods_mall_init_table.sh

Table initialization script (run once at setup)

#!/bin/bash
# ODS layer database and table initialization script; run only once
hive -e "
create database if not exists ods_mall;
create external table if not exists ods_mall.ods_user_active(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
create external table if not exists ods_mall.ods_click_good(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
create external table if not exists ods_mall.ods_good_item(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
create external table if not exists ods_mall.ods_good_list(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
create external table if not exists ods_mall.ods_app_close(
log string
)partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/ods/user_action/';
"

add_partition.sh

Partition-adding script (run once per day)
This script is generic; it can be used anywhere a partition needs to be added.

#!/bin/bash
# Add a partition to an external partitioned table
# Accepts three arguments:
# 1: table name
# 2: value of the partition field dt, format: 20260101
# 3: partition path (relative or absolute)
if [ $# != 3 ]
then
echo "Invalid arguments: add_partition.sh <table_name> <dt> <path>"
exit 100
fi
table_name=$1
dt=$2
path=$3
hive -e "
alter table ${table_name} add if not exists partition(dt='${dt}') location '${path}';
"
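The argument guard at the top of add_partition.sh can be exercised on its own; this sketch reproduces just that check (table names and paths are illustrative):

```shell
#!/bin/bash
# Reproduction of the argument-count guard from add_partition.sh.
check_args() {
  if [ $# != 3 ]; then
    echo "Invalid arguments: add_partition.sh <table_name> <dt> <path>"
    return 100
  fi
  echo "table=$1 dt=$2 path=$3"
}

check_args ods_mall.ods_user_active 20220211 20220211/1   # three args: accepted
check_args ods_mall.ods_user_active || echo "guard tripped, status $?"  # too few args
```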

ods_mall_add_partition.sh

Adds partitions to the ODS tables; run once per day.

#!/bin/bash
# Add partitions to the ODS tables; this script runs once per day
# Each morning, add yesterday's partitions, then run the downstream computation scripts
# Defaults to yesterday's date; a specific date can also be passed as an argument
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
#alter table ods_mall.ods_user_active add if not exists partition(dt='20260101') location '20260101/1';
#alter table ods_mall.ods_click_good add if not exists partition(dt='20260101') location '20260101/2';
#alter table ods_mall.ods_good_item add if not exists partition(dt='20260101') location '20260101/3';
#alter table ods_mall.ods_good_list add if not exists partition(dt='20260101') location '20260101/4';
#alter table ods_mall.ods_app_close add if not exists partition(dt='20260101') location '20260101/5';
sh add_partition.sh ods_mall.ods_user_active ${dt} ${dt}/1
sh add_partition.sh ods_mall.ods_click_good ${dt} ${dt}/2
sh add_partition.sh ods_mall.ods_good_item ${dt} ${dt}/3
sh add_partition.sh ods_mall.ods_good_list ${dt} ${dt}/4
sh add_partition.sh ods_mall.ods_app_close ${dt} ${dt}/5
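The date-defaulting logic at the top of this script can be tested in isolation (GNU date's --date option is assumed, as in the script itself):

```shell
#!/bin/bash
# Mirror of the dt-defaulting logic: use $1 if given, else yesterday's date.
get_dt() {
  if [ "z$1" = "z" ]; then
    date +%Y%m%d --date="1 days ago"
  else
    echo "$1"
  fi
}

get_dt 20220211   # prints the argument unchanged: 20220211
get_dt            # prints yesterday's date in yyyyMMdd format
```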

Create the directory warehouse_shell_user_action and upload the scripts:

cd /data/soft
mkdir warehouse_shell_user_action

Test:
1. Drop the ods_mall database and its tables

drop table ods_app_close;
drop table ods_click_good;
drop table ods_good_item;
drop table ods_good_list;
drop table ods_user_active;
drop database ods_mall;

2. Run the scripts

sh ods_mall_init_table.sh
sh ods_mall_add_partition.sh 20220211


DWD Layer (Detail Data)

DWD Layer Development

create database dwd_mall;

dwd_user_active

create external table if not exists dwd_mall.dwd_user_active(
user_id bigint,
xaid string,
platform tinyint,
ver string,
vercode string,
net bigint,
brand string,
model string,
display string,
osver string,
acttime bigint,
ad_status tinyint,
loading_time bigint
)partitioned by(dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/dwd/user_active/';

insert overwrite table dwd_mall.dwd_user_active partition(dt='20220211') select
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.ad_status') as ad_status,
get_json_object(log,'$.loading_time') as loading_time
from
(
select log from ods_mall.ods_user_active where dt = '20220211' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';

Use insert overwrite rather than insert into, so that reruns overwrite the old data. Use group by log to deduplicate logs collected more than once; it is more efficient than distinct. The filter where get_json_object(log,'$.xaid') != '' drops PC-side records, so only APP-side data is counted.
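The effect of group by log is to collapse identical raw log lines into one; a rough shell analogue of that deduplication step:

```shell
#!/bin/bash
# Three raw log lines, one duplicated; sort -u keeps one copy of each,
# analogous to 'group by log' deduplicating identical log strings.
# The xaid values here are illustrative.
printf '{"xaid":"a1"}\n{"xaid":"a1"}\n{"xaid":"b2"}\n' | sort -u
# Prints two distinct lines instead of three.
```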


dwd_click_good

create external table if not exists dwd_mall.dwd_click_good(
user_id bigint,
xaid string,
platform tinyint,
ver string,
vercode string,
net bigint,
brand string,
model string,
display string,
osver string,
acttime bigint,
goods_id bigint,
location tinyint
)partitioned by(dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/dwd/click_good/';

insert overwrite table dwd_mall.dwd_click_good partition(dt='20220211') select
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.goods_id') as goods_id,
get_json_object(log,'$.location') as location
from
(
select log from ods_mall.ods_click_good where dt = '20220211' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';

dwd_good_item

create external table if not exists dwd_mall.dwd_good_item(
user_id bigint,
xaid string,
platform tinyint,
ver string,
vercode string,
net bigint,
brand string,
model string,
display string,
osver string,
acttime bigint,
goods_id bigint,
stay_time bigint,
loading_time bigint
)partitioned by(dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/dwd/good_item/';

insert overwrite table dwd_mall.dwd_good_item partition(dt='20220211') select
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.goods_id') as goods_id,
get_json_object(log,'$.stay_time') as stay_time,
get_json_object(log,'$.loading_time') as loading_time
from
(
select log from ods_mall.ods_good_item where dt = '20220211' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';

dwd_good_list

create external table if not exists dwd_mall.dwd_good_list(
user_id bigint,
xaid string,
platform tinyint,
ver string,
vercode string,
net bigint,
brand string,
model string,
display string,
osver string,
acttime bigint,
loading_time bigint,
loading_type tinyint,
goods_num tinyint
)partitioned by(dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://bigdata1:9000/data/dwd/good_list/';

insert overwrite table dwd_mall.dwd_good_list partition(dt='20220211') select
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.loading_time') as loading_time,
get_json_object(log,'$.loading_type') as loading_type,
get_json_object(log,'$.goods_num') as goods_num
from
(
select log from ods_mall.ods_good_list where dt = '20220211' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';

dwd_app_close

create external table if not exists dwd_mall.dwd_app_close(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/app_close/';
insert overwrite table dwd_mall.dwd_app_close partition(dt='20220211') select 
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime
from
(
select log from ods_mall.ods_app_close where dt = '20220211' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';

DWD Layer Script Extraction

Extract the DWD-layer work into scripts:

dwd_mall_init_table.sh

Table initialization script (run once at setup)

#!/bin/bash
# DWD layer database and table initialization script; run only once

hive -e "
create database if not exists dwd_mall;

create external table if not exists dwd_mall.dwd_user_active(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint,
    ad_status    tinyint,
    loading_time    bigint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/user_active/';




create external table if not exists dwd_mall.dwd_click_good(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint,
    goods_id    bigint,
    location    tinyint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/click_good/';



create external table if not exists dwd_mall.dwd_good_item(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint,
    goods_id    bigint,
    stay_time    bigint,
    loading_time    bigint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/good_item/';



create external table if not exists dwd_mall.dwd_good_list(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint,
    loading_time    bigint,
    loading_type    tinyint,
    goods_num    tinyint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/good_list/';


create external table if not exists dwd_mall.dwd_app_close(
    user_id    bigint,
    xaid    string,
    platform    tinyint,
    ver    string,
    vercode    string,
    net    bigint,
    brand    string,
    model    string,
    display    string,
    osver    string,
    acttime    bigint
)partitioned by(dt string) 
 row format delimited  
 fields terminated by '\t'
 location 'hdfs://bigdata1:9000/data/dwd/app_close/';
"

dwd_mall_add_partition.sh

Partition data-loading script (run once per day)

#!/bin/bash
# Clean the ODS tables and load the cleaned data into the corresponding DWD table partitions
# Runs once per day, each morning

# Defaults to yesterday's date; a specific date can also be passed as an argument
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi

hive -e "
insert overwrite table dwd_mall.dwd_user_active partition(dt='${dt}')  select
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.ad_status') as ad_status,
get_json_object(log,'$.loading_time') as loading_time
from 
(
select log from ods_mall.ods_user_active where dt = '${dt}' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';



insert overwrite table dwd_mall.dwd_click_good partition(dt='${dt}')  select 
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.goods_id') as goods_id,
get_json_object(log,'$.location') as location
from
(
select log from ods_mall.ods_click_good where dt = '${dt}' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';


insert overwrite table dwd_mall.dwd_good_item partition(dt='${dt}') select 
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.goods_id') as goods_id,
get_json_object(log,'$.stay_time') as stay_time,
get_json_object(log,'$.loading_time') as loading_time
from
(
select log from ods_mall.ods_good_item where dt = '${dt}' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';



insert overwrite table dwd_mall.dwd_good_list partition(dt='${dt}') select 
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime,
get_json_object(log,'$.loading_time') as loading_time,
get_json_object(log,'$.loading_type') as loading_type,
get_json_object(log,'$.goods_num') as goods_num
from
(
select log from ods_mall.ods_good_list where dt = '${dt}' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';


insert overwrite table dwd_mall.dwd_app_close partition(dt='${dt}') select 
get_json_object(log,'$.uid') as user_id,
get_json_object(log,'$.xaid') as xaid,
get_json_object(log,'$.platform') as platform,
get_json_object(log,'$.ver') as ver,
get_json_object(log,'$.vercode') as vercode,
get_json_object(log,'$.net') as net,
get_json_object(log,'$.brand') as brand,
get_json_object(log,'$.model') as model,
get_json_object(log,'$.display') as display,
get_json_object(log,'$.osver') as osver,
get_json_object(log,'$.acttime') as acttime
from
(
select log from ods_mall.ods_app_close where dt = '${dt}' group by log
) as tmp
where get_json_object(log,'$.xaid') !='';
"

Test:
1. Drop the tables and database

drop table dwd_mall.dwd_user_active;
drop table dwd_mall.dwd_click_good;
drop table dwd_mall.dwd_good_item;
drop table dwd_mall.dwd_good_list;
drop table dwd_mall.dwd_app_close;

drop schema dwd_mall;

2. Run the scripts

sh dwd_mall_init_table.sh

sh dwd_mall_add_partition.sh 20220211
