I. Questions About the Data
Why is the date always 2022-05-09?
In a real deployment, new data would arrive every day from MySQL, from the event-tracking data, and from third-party logs.
The ODS layer would therefore gain a new partition every day. Since we only ran collection once, we keep reusing that single day's data.
That would mean repeating the previous days' work every day, so in practice you develop once and wrap everything into scripts that can simply be re-run.
Azkaban can schedule jobs to run at a fixed time each day. How many tasks do we have?
Task 1: collect data from the three sources into the ODS layer. Task 2: process ODS data into DWD. Task 3: DWD -> DWS.
Task 4: DWS -> ADS.
Writing the ODS-layer script:
Inside it, sqoop commands incrementally import each table's data (see day one's sqoopJob.sh); together they make up our ods.sh. A sketch follows.
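A minimal sketch of what one import inside ods.sh might look like (the sqoop path, connection details, table name, and date handling are assumptions; the real commands come from sqoopJob.sh):
#!/bin/bash
# ods.sh (sketch): import one MySQL table into its ODS directory; repeat per table
dt=${1:-$(date -d yesterday +%Y%m%d)}   # partition date, e.g. 20220509
/usr/local/sqoop/bin/sqoop import \
  --connect jdbc:mysql://bigdata01:3306/nshop \
  --username root --password 123456 \
  --table base_area \
  --target-dir /data/nshop/ods/ods_01_base_area/bdp_day=$dt \
  --fields-terminated-by ',' \
  -m 1
# for a true incremental import, add: --incremental append --check-column id --last-value <N>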
Writing dwd.sh:
#!/bin/bash
/usr/local/hive/bin/hive \
-f dwd.hql
dwd.hql is all of the DWD SQL we wrote earlier pasted into one file.
That is, the day-two SQL statements all go into dwd.hql; the file can also take parameters, as sketched below.
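For example, the date can be passed in with --hivevar (a sketch; the variable name dt is our own choice):
#!/bin/bash
# dwd.sh with a date parameter
dt=$1
/usr/local/hive/bin/hive --hivevar dt=$dt -f dwd.hql
# inside dwd.hql, reference the value as ${hivevar:dt}, e.g. where bdp_day='${hivevar:dt}'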
Writing dws.sh:
#!/bin/bash
/usr/local/hive/bin/hive \
-f dws.hql
Note: in a shell script, always spell out the full path of every command you call; do not assume that having environment variables configured is enough (a scheduler such as Azkaban may not load your shell profile).
dws.hql is all of the SQL we wrote on day three; the date can be parameterized the same way as in dwd.sh.
Writing ads.sh:
#!/bin/bash
/usr/local/hive/bin/hive \
-f ads.hql
ads.hql is the SQL written in yesterday's class, gathered into one file.
We now have four shell scripts: ods.sh, dwd.sh, dws.sh, and ads.sh.
Writing the Azkaban flow (saved as, say, nshop.flow):
nodes:
  - name: ads
    type: command
    config:
      command: sh ads.sh
    dependsOn:
      - dws
  - name: dws
    type: command
    config:
      command: sh dws.sh
    dependsOn:
      - dwd
  - name: dwd
    type: command
    config:
      command: sh dwd.sh
    dependsOn:
      - ods
  - name: ods
    type: command
    config:
      command: sh ods.sh
Next write the project file, zip everything, and upload it to Azkaban; then set the daily schedule (which hour to run) and configure email alerts. A packaging sketch follows.
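A sketch of the packaging step (the file names nshop.flow / nshop.project / nshop.zip are our own choices):
# a Flow 2.0 project needs a .project file next to the .flow file
echo "azkaban-flow-version: 2.0" > nshop.project
# bundle both files, then upload nshop.zip in the Azkaban web UI
zip nshop.zip nshop.flow nshop.project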
II. Superset Review
How to start/stop Superset:
First activate the superset environment:
conda activate superset
Then run this command inside the environment:
gunicorn -w 1 -t 120 -b bigdata01:8787 "superset.app:create_app()"
--workers: number of worker processes (here -w 1)
--timeout: worker timeout in seconds; a worker that times out is restarted automatically
--bind: the address to bind, i.e. the Superset access address
--daemon: run in the background
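Putting those options together, a daemonized start looks like this:
gunicorn --workers 1 --timeout 120 --bind bigdata01:8787 --daemon "superset.app:create_app()"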
How to stop Superset?
Kill the gunicorn processes (the awk filter finds the gunicorn PIDs while excluding the awk command itself):
ps -ef | awk '/gunicorn/ && !/awk/{print $2}' | xargs kill -9
Exit the superset environment: conda deactivate
III. DataX (similar to Sqoop)
1. Installation
1) Upload DataX to /home/soft, then extract it to /usr/local/:
tar -xvf datax.tar.gz -C /usr/local/
2) Copy the MySQL driver from Hive's lib into /usr/local/datax/lib:
cp /usr/local/hive/lib/mysql-connector-java-8.0.26.jar /usr/local/datax/lib/
3) Create the Hive table and pull the MySQL table data into it; this requires writing a JSON job file.
A common error:
配置信息错误,您提供的配置文件[/usr/local/datax/plugin/reader/._drdsreader/plugin.json]不存在. 请检查您的配置文件
(Configuration error: the plugin descriptor .../._drdsreader/plugin.json does not exist; check your configuration.)
Fix: delete the hidden ._* files left over from packaging (important):
rm -rf /usr/local/datax/plugin/*/._*
2. DataX Concepts
https://github.com/alibaba/DataX
Alongside GitHub, there is a Chinese code-hosting site called Gitee (码云):
https://gitee.com/
DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, an offline data synchronization tool/platform used widely inside Alibaba Group. DataX implements efficient data synchronization between heterogeneous data sources including MySQL, Oracle, OceanBase, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, and DRDS.
3. Usage
Importing MySQL data into Hive:
The area-code data (the base_area table) was previously loaded into MySQL; next we import it into ods_nshop (MySQL -> Hive).
First create the table in Hive:
create table if not exists ods_nshop.ods_01_base_area (
id int COMMENT 'id',
area_code string COMMENT 'province code',
province_name string COMMENT 'province name',
iso string COMMENT 'ISO code'
)row format delimited fields terminated by ','
stored as TextFile
location '/data/nshop/ods/ods_01_base_area/';
Use DataX to import the MySQL data into Hive:
This needs a DataX job script in JSON format. In DataX's job folder, create 01.json:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"area_code",
"province_name",
"iso"
],
"splitPk": "id",
"connection": [
{
"table": [
"base_area"
],
"jdbcUrl": [
"jdbc:mysql://192.168.32.100:3306/nshop"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://bigdata01:9820",
"fileType": "text",
"path": "/data/nshop/ods/ods_01_base_area/",
"fileName": "base_area_txt",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "area_code",
"type": "string"
},
{
"name": "province_name",
"type": "string"
},
{
"name": "iso",
"type": "string"
}
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
Because typing the path to datax.py on every run is tedious, configure DataX environment variables (e.g. in /etc/profile):
export DATAX_HOME=/usr/local/datax
export PATH=$PATH:$DATAX_HOME/bin
Remember to reload the environment:
source /etc/profile
Then run our job:
datax.py job/01.json
Make absolutely sure the database connection, username, and password are correct!
When the import finishes, check that the data is correct:
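A quick check (a sketch) that rows landed in HDFS and are visible to Hive:
hdfs dfs -ls /data/nshop/ods/ods_01_base_area/
/usr/local/hive/bin/hive -e "select * from ods_nshop.ods_01_base_area limit 5;"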
IV. DataX in Detail
1. Try the built-in example, job.json, which streams from the console to the console.
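It ships in DataX's job directory, so running it doubles as an installation check:
datax.py /usr/local/datax/job/job.json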
Each TaskGroup runs 5 tasks in parallel (the DataX default), so a DataX job executes with multiple concurrent threads and is fast.
DataX is a memory-based data transfer tool, whereas Sqoop is disk-based (MapReduce MapTasks), so DataX's execution efficiency is generally far higher than Sqoop's.
What kinds of data transfers can DataX handle?
Stream (console) --> Stream
mysql --> Hdfs
hdfs --> Mysql
hive --> mysql
mysql --> Hive
Tuning:
There are two knobs:
1. Increase the concurrency.
2. Adjust the JVM memory size.
There are two places to configure them. One is the core config file, core.json under DataX's conf directory; changing it applies globally, and we normally leave it alone.
Instead we usually set the concurrency in the setting block at the top of our own job JSON (the setting.speed.channel key seen in the job files in this document).
The JVM memory size can be given when running a job:
datax.py --jvm="-Xms3G -Xmx3G" ../job/test.json
V. Building Charts with Superset
1. Nationwide visits by user region:
create external table if not exists ads_nshop.ads_nshop_customer(
customer_gender string COMMENT 'gender: 1 = male, 0 = female',
os string comment 'phone OS',
customer_natives string COMMENT 'region ISO code',
user_view_count int comment 'page views per user'
) partitioned by (bdp_day string)
row format delimited fields terminated by ','
stored as TextFile
location '/data/nshop/ads/operation/ads_nshop_customer/';
Join the two earlier tables and insert the query results into the new table:
insert overwrite table ads_nshop.ads_nshop_customer partition(bdp_day='20220509')
select
b.customer_gender,
a.os,
c.iso,
a.view_count
from dws_nshop.dws_nshop_ulog_view a
join ods_nshop.ods_02_customer b
on a.user_id=b.customer_id
join ods_nshop.ods_01_base_area c
on b.customer_natives=c.area_code
where a.bdp_day='20220509';
Export the statistics to MySQL (with DataX).
First create a table in MySQL:
CREATE TABLE `ads_nshop_customer` (
`customer_gender` tinyint(4) DEFAULT NULL,
`os` varchar(255) DEFAULT NULL,
`iso` varchar(255) DEFAULT NULL,
`user_view_count` int(11) DEFAULT NULL
);
Next, write 02.json:
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_customer/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"customer_gender",
"os",
"iso",
"user_view_count"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_customer"
]
}
]
}
}
}
]
}
}
Run the job to export the data (-p passes -D style variables that substitute ${dt} inside the JSON):
datax.py job/02.json -p "-Ddt=20220509"
Then move on to the Superset visualization. The SQLAlchemy URI for the MySQL connection:
mysql://root:123456@bigdata01/nshop?charset=utf8
2. Platform traffic statistics
First create a table in MySQL:
CREATE TABLE `ads_nshop_flowpu_stat` (
`uv` int DEFAULT NULL,
`pv` int DEFAULT NULL,
`pv_avg` double DEFAULT NULL
);
Then export the ADS-layer statistics to MySQL (03.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_flow/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "Long"
},
{
"index": 1,
"type": "Long"
},
{
"index": 2,
"type": "Double"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"uv",
"pv",
"pv_avg"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_flowpu_stat"
]
}
]
}
}
}
]
}
}
Run the job:
datax.py 03.json -p "-Ddt=20220509"
3. Platform hot search keyword statistics
Create the MySQL table:
CREATE TABLE `ads_nshop_search_keys` (
`search_keys` varchar(255) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
`age_range` varchar(255) DEFAULT NULL,
`os` varchar(255) DEFAULT NULL,
`manufacturer` varchar(255) DEFAULT NULL,
`area_code` varchar(255) DEFAULT NULL,
`search_users` int DEFAULT NULL,
`search_records` int DEFAULT NULL,
`search_orders` varchar(255) DEFAULT NULL,
`search_targets` int DEFAULT NULL
);
Export the data from the ads_nshop Hive database to MySQL (04.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_search_keys/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "long"
},
{
"index": 7,
"type": "long"
},
{
"index": 8,
"type": "string"
},
{
"index": 9,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"search_keys",
"gender",
"age_range",
"os",
"manufacturer",
"area_code",
"search_users",
"search_records",
"search_orders",
"search_targets"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_search_keys"
]
}
]
}
}
}
]
}
}
datax.py 04.json -p "-Ddt=20220509"
We find that the hot-word statistics come out as numeric codes, which makes for a poor display, so the data needs extra handling.
Import the shop_code lookup table into MySQL:
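One way to load it (a sketch; the dump file name shop_code.sql is hypothetical):
mysql -uroot -p123456 nshop < shop_code.sql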
Once it is imported, create a new table:
CREATE TABLE `ads_nshop_search_keys_2` (
`search_keys` varchar(255) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
`age_range` varchar(255) DEFAULT NULL,
`os` varchar(255) DEFAULT NULL,
`manufacturer` varchar(255) DEFAULT NULL,
`area_code` varchar(255) DEFAULT NULL,
`search_users` int DEFAULT NULL,
`search_records` int DEFAULT NULL,
`search_orders` varchar(255) DEFAULT NULL,
`search_targets` int DEFAULT NULL
);
Join shop_code with ads_nshop_search_keys and insert the joined rows into the new table; Superset then charts the new table:
insert into ads_nshop_search_keys_2(search_keys, gender, age_range, os, manufacturer, area_code, search_users, search_records, search_orders, search_targets)
select name as search_keys, gender, age_range, os, manufacturer, area_code, search_users, search_records, search_orders, search_targets
from ads_nshop_search_keys join shop_code on search_keys = id;
4. Overall operations metrics
insert overwrite table ads_nshop.ads_nshop_oper_stat partition(bdp_day='20220509')
select
a.customer_gender,
a.customer_age_range,
e.province_name,
c.category_code,
count(distinct b.order_id) ,
count(distinct b.order_id) / sum(d.view_count),
sum(b.payment_money),
sum(b.district_money),
sum(b.shipping_money),
sum(b.payment_money) / count(distinct b.customer_id)
from ods_nshop.ods_02_customer a
join dwd_nshop.dwd_nshop_orders_details b
on a.customer_id=b.customer_id
join ods_nshop.dim_pub_product c
on b.supplier_code=c.supplier_code
join dws_nshop.dws_nshop_ulog_view d
on b.customer_id=d.user_id
join ods_nshop.ods_01_base_area e
on a.customer_natives=e.area_code
where d.bdp_day='20220509'
group by
a.customer_gender,
a.customer_age_range,
e.province_name,
c.category_code;
Why recompute yesterday's metrics? Because today we added ods_01_base_area, and joining it makes the results much nicer to display (province names instead of codes).
After the job finishes, export the ADS-layer operations metrics to MySQL.
CREATE TABLE `ads_nshop_oper_stat` (
`customer_gender` int DEFAULT NULL,
`age_range` varchar(255) DEFAULT NULL,
`customer_natives` varchar(255) DEFAULT NULL,
`product_type` int DEFAULT NULL,
`order_counts` int DEFAULT NULL,
`order_rate` double DEFAULT NULL,
`order_amounts` int DEFAULT NULL,
`order_discounts` int DEFAULT NULL,
`shipping_amounts` int DEFAULT NULL,
`per_customer_transaction` int DEFAULT NULL
);
Write the job (05.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_oper_stat/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "long"
},
{
"index": 4,
"type": "long"
},
{
"index": 5,
"type": "double"
},
{
"index": 6,
"type": "long"
},
{
"index": 7,
"type": "long"
},
{
"index": 8,
"type": "long"
},
{
"index": 9,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"customer_gender",
"age_range",
"customer_natives",
"product_type",
"order_counts",
"order_rate",
"order_amounts",
"order_discounts",
"shipping_amounts",
"per_customer_transaction"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_oper_stat"
]
}
]
}
}
}
]
}
}
Run:
datax.py 05.json -p "-Ddt=20220509"
5. Risk-control metrics
insert overwrite table ads_nshop.ads_nshop_risk_mgt partition(bdp_day='20220509')
select
a.customer_gender,
a.customer_age_range,
e.province_name,
c.category_code,
count(distinct case when b.order_status=6 then b.order_id end),
count(distinct case when b.order_status=6 then b.order_id end)/count(distinct b.order_id)*100
from ods_nshop.ods_02_customer a
join dwd_nshop.dwd_nshop_orders_details b
on a.customer_id=b.customer_id
join ods_nshop.dim_pub_product c
on b.supplier_code=c.supplier_code
join ods_nshop.ods_01_base_area e
on a.customer_natives=e.area_code
where b.bdp_day='20220509'
group by
a.customer_gender,
a.customer_age_range,
e.province_name,
c.category_code;
Create the MySQL table:
CREATE TABLE `ads_nshop_risk_mgt` (
`customer_gender` int DEFAULT NULL,
`age_range` varchar(255) DEFAULT NULL,
`customer_natives` varchar(255) DEFAULT NULL,
`product_type` varchar(255) DEFAULT NULL,
`start_complaint_counts` int DEFAULT NULL,
`complaint_rate` double DEFAULT NULL
);
Export the freshly computed Hive data to MySQL (06.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_risk_mgt/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "long"
},
{
"index": 5,
"type": "double"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"customer_gender",
"age_range",
"customer_natives",
"product_type",
"start_complaint_counts",
"complaint_rate"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_risk_mgt"
]
}
]
}
}
}
]
}
}
Run the job:
datax.py 06.json -p "-Ddt=20220509"
6. Top-N statistics
Re-run the metrics job, because region information was added:
insert overwrite table ads_nshop.ads_nshop_pay_stat_topn partition(bdp_day='20220509')
select
case when b.pay_type='10' then '网上银行' when b.pay_type='11' then '微信' when b.pay_type='12' then '支付宝' else '线下支付' end,
e.province_name,
count(distinct b.pay_id),
sum(b.pay_amount) as pay_sum
from ods_nshop.ods_02_customer a
join ods_nshop.ods_02_orders_pay_records b
on a.customer_id=b.customer_id
join ods_nshop.ods_01_base_area e
on a.customer_natives=e.area_code
group by
e.province_name,
b.pay_type order by pay_sum desc limit 500;
Create the table in MySQL:
CREATE TABLE `ads_nshop_pay_stat_topn` (
`pay_type` varchar(255) DEFAULT NULL,
`customer_area_code` varchar(255) DEFAULT NULL,
`pay_count` int DEFAULT NULL,
`pay_amounts` int DEFAULT NULL
);
Write the job (07.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_pay_stat_topn/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"pay_type",
"customer_area_code",
"pay_count",
"pay_amounts"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_pay_stat_topn"
]
}
]
}
}
}
]
}
}
Run the job:
datax.py 07.json -p "-Ddt=20220509"
Visualize the result in Superset, e.g. as a treemap chart.
7. Ad-placement metrics
Re-run the metrics:
insert overwrite table ads_nshop.ads_nshop_release_stat partition(bdp_day='20220509')
select
a.device_type,
a.os,
b.customer_gender,
b.customer_age_range,
e.province_name,
a.release_sources,
a.release_category,
count(distinct a.customer_id),
count(*)
from dwd_nshop.dwd_nshop_releasedatas a
join ods_nshop.ods_02_customer b
on a.customer_id=b.customer_id
join ods_nshop.ods_01_base_area e
on b.customer_natives=e.area_code
where a.bdp_day='20220509'
group by
a.device_type,
a.os,
b.customer_gender,
b.customer_age_range,
e.province_name,
a.release_sources,
a.release_category;
Create the MySQL table:
CREATE TABLE `ads_nshop_release_stat` (
`device_type` varchar(255) DEFAULT NULL,
`os` varchar(255) DEFAULT NULL,
`customer_gender` int DEFAULT NULL,
`age_range` varchar(255) DEFAULT NULL,
`customer_natives` varchar(255) DEFAULT NULL,
`release_sources` varchar(255) DEFAULT NULL,
`release_category` varchar(255) DEFAULT NULL,
`visit_total_customers` int DEFAULT NULL,
`visit_total_counts` int DEFAULT NULL
);
Write the export job (08.json):
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/data/nshop/ads/operation/ads_nshop_release_stat/bdp_day=${dt}/*",
"defaultFS": "hdfs://bigdata01:9820",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "long"
},
{
"index": 8,
"type": "long"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"device_type",
"os",
"customer_gender",
"age_range",
"customer_natives",
"release_sources",
"release_category",
"visit_total_customers",
"visit_total_counts"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://bigdata01:3306/nshop",
"table": [
"ads_nshop_release_stat"
]
}
]
}
}
}
]
}
}
Run the job to export the data:
datax.py 08.json -p "-Ddt=20220509"