[toc]
Requirements
- Top 10 videos by view count
- Top 10 categories by popularity
- Categories of the 20 most-viewed videos, and how many Top 20 videos each of those categories contains
- Category ranking of the videos related to the Top 50 most-viewed videos
- Top 10 most popular videos in each category (Music as the example)
- Top 10 videos by view count in each category
- Top 10 users by number of uploaded videos, and the 20 most-viewed videos among their uploads
Data structure
Video table
| Field | Description | Details |
|---|---|---|
| videoId | Unique video id (String) | 11-character string |
| uploader | Uploader (String) | Username of the user who uploaded the video |
| age | Video age (Int) | Integer number of days the video has been on the platform |
| category | Video categories (Array<String>) | Categories assigned to the video at upload |
| length | Video length (Int) | Video length as an integer |
| views | View count (Int) | Number of times the video has been viewed |
| rate | Video rating (Double) | Out of 5 points |
| ratings | Number of ratings (Int) | Integer count of ratings the video received |
| comments | Comment count (Int) | Integer number of comments on the video |
| relatedId | Related video ids (Array<String>) | Ids of related videos, at most 20 |
User table
| Field | Description | Type |
|---|---|---|
| uploader | Uploader username | string |
| videos | Number of uploaded videos | int |
| friends | Number of friends | int |
ETL
video :
LKh7zAJ4nwo TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU DH56yrIO5nI W1Uo5DQTtzc E-3zXq_r4w0 1TCeoRPg5dE yAr26YhuYNY 2ZgXx72XmoE -7ClGo-YgZ0 vmdPOOd6cxI KRHfMQqSHpk pIMpORZthYw 1tUDzOp10pk heqocRij5P0 _XIuvoH6rUg LGVU5DsezE0 uO2kj6_D8B4 xiDqywcDQRM uX81lMev6_o
user :
barelypolitical 151 5106
Looking at the raw data: a video can belong to several categories, separated by "&" with a space on each side, and it can also have several related videos, separated by "\t".
To make the data easier to analyze, first reorganize and clean it:
- Keep "&" as the separator between categories, but strip the surrounding spaces
- Join the multiple related video ids with "&"
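Applied to the sample video record above, the cleaning keeps the first 9 tab-separated fields and joins the related ids with "&", producing a single line like (tabs shown as spaces, as in the raw sample):
LKh7zAJ4nwo TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ&NxTDlnOuybo&c-8VuICzXtU&DH56yrIO5nI&W1Uo5DQTtzc&E-3zXq_r4w0&1TCeoRPg5dE&yAr26YhuYNY&2ZgXx72XmoE&-7ClGo-YgZ0&vmdPOOd6cxI&KRHfMQqSHpk&pIMpORZthYw&1tUDzOp10pk&heqocRij5P0&_XIuvoH6rUg&LGVU5DsezE0&uO2kj6_D8B4&xiDqywcDQRM&uX81lMev6_o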
Utility class
/**
 * Cleaning rules:
 * 1. Drop records with fewer than 9 fields
 * 2. Remove the spaces inside the video category field, e.g. "People & Blogs" -> "People&Blogs"
 * 3. Join the related video ids with "&"
 */
public class ETLUtil {
    /**
     * Data cleaning method
     */
    public static String etlData(String srcData){
        StringBuffer resultData = new StringBuffer();
        // 1. Split the record on "\t"
        String[] datas = srcData.split("\t");
        // 2. Drop the record if it has fewer than 9 fields
        if(datas.length < 9){
            return null;
        }
        // 3. Remove the spaces in the category field
        datas[3] = datas[3].replaceAll(" ", "");
        // 4. Join the related video ids with "&"
        for (int i = 0; i < datas.length; i++) {
            if(i < 9){
                // 4.1 the first 9 fields stay tab-separated
                if(i == datas.length - 1){
                    resultData.append(datas[i]);
                }else{
                    resultData.append(datas[i]).append("\t");
                }
            }else{
                // 4.2 related video ids get joined with "&"
                if(i == datas.length - 1){
                    resultData.append(datas[i]);
                }else{
                    resultData.append(datas[i]).append("&");
                }
            }
        }
        return resultData.toString();
    }
}
Mapper
/**
 * Cleans the raw 谷粒影音 data
 */
public class EtlMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Clean it
        String resultData = ETLUtil.etlData(line);
        if(resultData != null) {
            // Write out the cleaned record
            k.set(resultData);
            context.write(k, NullWritable.get());
        }
    }
}
Driver
package com.cpucode.video.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EtlDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(EtlDriver.class);
job.setMapperClass(EtlMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Package the ETL program as etl.jar and upload it to /opt/module/hive/datas on the Linux host.
Upload the raw data to HDFS
hadoop fs -mkdir -p /video/video
hadoop fs -mkdir -p /video/user
hadoop fs -put video/user/user.txt /video/user
hadoop fs -put video/video/*.txt /video/video
Run the ETL job
hadoop jar etl.jar com.cpucode.video.etl.EtlDriver /video/video /video/video/output
Create tables
Create the raw-data tables
video_ori
create table video_ori(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>
)
row format delimited fields terminated by "\t"
collection items terminated by "&"
stored as textfile;
video_user_ori :
create table video_user_ori(
uploader string,
videos int,
friends int
)
row format delimited fields terminated by "\t"
stored as textfile;
Create ORC tables with Snappy compression
video_orc :
create table video_orc(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>
)
stored as orc
tblproperties("orc.comparess" = "SNAPPY");
video_user_orc
create table video_user_orc(
uploader string,
videos int,
friends int
)
row format delimited
fields terminated by "\t"
stored as orc
tblproperties("orc.compress" = "SNAPPY");
Load data into the ori tables
load data inpath "/video/video/output" into table video_ori;
load data inpath "/video/user" into table video_user_ori;
Insert data into the ORC tables
insert into table video_orc select * from video_ori;
insert into table video_user_orc select * from video_user_ori;
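As an optional sanity check (the exact numbers depend on your copy of the dataset), a couple of simple queries confirm the cleaned data landed correctly:
-- category and relatedId should come back as arrays
select videoId, category, relatedId from video_orc limit 3;
-- row counts should match between the ori and orc tables
select count(*) from video_ori;
select count(*) from video_orc;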
Solutions
Top 10 videos by view count
Approach:
- Use order by on the views column for a global sort
- Limit the output to the first 10 rows
select
videoId,
views
from
video_orc
order by
views DESC
limit 10;
Top 10 categories by popularity
Count how many videos each category contains, and show the 10 categories with the most videos.
Approach:
- In the current schema one video maps to n categories, so first explode the category array (column-to-row):
select
videoId,
category_name
from
video_orc
lateral view explode(category) video_orc_tmp AS category_name
- Group by category and count the videoIds in each group:
select
t1.category_name ,
count(t1.videoId) hot
from
t1
group by
t1.category_name
- Finally sort by popularity and show the first 10 rows:
select
t1.category_name ,
COUNT(t1.videoId) hot
from(
select
videoId,
category_name
from
video_orc
lateral view explode(category) video_orc_tmp AS category_name
) t1
group by
t1.category_name
order by
hot desc
limit 10;
Categories of the 20 most-viewed videos, and how many Top 20 videos each category contains
Approach:
- First fetch the full records of the 20 most-viewed videos, sorted descending:
select
videoId,
views,
category
from
video_orc
order by
views desc
limit 20
- Explode the category field of those 20 rows (column-to-row):
select
t1.videoId,
category_name
from
t1
lateral view explode(t1.category) t1_tmp as category_name
- Finally list each category name and how many Top 20 videos fall under it:
select
t2.category_name,
count(t2.videoId) video_sum
from(
select
t1.videoId,
category_name
from(
select
videoId,
views,
category
from
video_orc
order by
views desc
limit 20
) t1
lateral view explode(t1.category) t1_tmp as category_name
) t2
group by
t2.category_name
Category ranking of the videos related to the Top 50 most-viewed videos
Approach:
- Find the related videos of the Top 50 most-viewed videos:
select
videoId,
`views`,
relatedId
from
video_orc
order by
`views` desc
limit 50
- Explode the related video ids:
select
relatedId_video
from
t1
lateral view explode(t1.relatedId) t1_tmp as relatedId_video
- Join back to the original table to get each related video's categories:
select
t2.relatedId_video,
t3.category
from
t2
join video_orc t3
on
t2.relatedId_video = t3.videoId
- Explode the categories:
select
t4.relatedId_video,
category_name
from
t4
lateral view explode(t4.category) t4_tmp as category_name
- Group by category and count:
select
t5.category_name,
count(t5.relatedId_video) video_num
from
t5
group by
t5.category_name
- Rank the result:
select
t6.category_name,
t6.video_num,
rank() over(order by t6.video_num desc ) rk
from(
select
t5.category_name,
count(t5.relatedId_video) video_num
from(
select
t4.relatedId_video,
category_name
from(
select
t2.relatedId_video,
t3.category
from(
select
relatedId_video
from(
select
videoId,
`views`,
relatedId
from
video_orc
order by
`views` desc
limit 50
) t1
lateral view explode(t1.relatedId) t1_tmp as relatedId_video
) t2
join video_orc t3
on
t2.relatedId_video = t3.videoId
) t4
lateral view explode(t4.category) t4_tmp as category_name
) t5
group by
t5.category_name
) t6
Top 10 most popular videos in each category (Music as the example)
Approach:
- Explode the categories:
select
videoId,
category_name,
`views`
from
video_orc
lateral view explode(category) video_orc_tmp as category_name
- Top 10 within the Music category:
select
t1.videoId,
t1.category_name,
t1.views
from(
select
videoId,
category_name,
`views`
from
video_orc
lateral view explode(category) video_orc_tmp as category_name
) t1
where
t1.category_name = 'Music'
order by
t1.views desc
limit 10;
Top 10 videos by view count in each category
- Explode the categories:
select
videoId,
category_name,
views
from
video_orc
lateral view explode(category) video_orc_tmp as category_name
- Open a window partitioned by category, ordered by views, and rank:
select
t1.videoId,
t1.category_name,
t1.views,
rank() over(partition by t1.category_name order by t1.views desc) rk
from
t1
- Take the top 10:
select
t2.videoId,
t2.category_name,
t2.views,
t2.rk
from(
select
t1.videoId,
t1.category_name,
t1.views,
rank() over(partition by t1.category_name order by t1.views desc) rk
from(
select
videoId,
category_name,
views
from
video_orc
lateral view explode(category) video_orc_tmp as category_name
) t1
) t2
where
t2.rk <= 10;
Top 10 users by number of uploaded videos, and the 20 most-viewed videos among their uploads
Approach:
- Find the 10 users who uploaded the most videos:
select
uploader,
videos
from
video_user_orc
order by
videos desc
limit 10
- Join with video_orc to get all videos uploaded by these 10 users, then take the 20 most viewed:
select
t1.uploader,
t2.videoId,
t2.views
from(
select
uploader,
videos
from
video_user_orc
order by
videos desc
limit 10
) t1
join video_orc t2
on
t1.uploader = t2.uploader
order by
t2.views desc
limit 20;
Cumulative visit count per user
User visit data:
u01 2017/1/21 5
u02 2017/1/23 6
u03 2017/1/22 8
u04 2017/1/20 3
u01 2017/1/23 6
u01 2017/2/21 8
u02 2017/1/23 6
u01 2017/2/22 4
Create the table
create table action(
userId string,
visitDate string,
visitCount int
)
row format delimited fields terminated by "\t";
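The sample rows above are assumed to be saved as a local tab-separated file; the path below is only illustrative:
load data local inpath "/opt/module/hive/datas/action.txt" into table action;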
- Normalize the date format, group by user and month, and sum the visit counts (Hive does not accept a select alias in group by, so the expression is repeated):
select
userid,
date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM') yy_mm_date,
sum(visitCount) sum1
from
`action`
group by
userId,
date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM')
- Partition by user, order by month, open a window from the first row to the current row, and compute each user's running total:
select
t1.*,
sum(sum1) over(partition by userid order by yy_mm_date rows between unbounded preceding and current row)
from(
select
userid,
date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM') yy_mm_date,
sum(visitCount) sum1
from
`action`
group by
userId,
date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM')
) t1
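Worked by hand on the sample data above, the expected result is (user, month, monthly sum, running total):
u01 2017-01 11 11
u01 2017-02 12 23
u02 2017-01 12 12
u03 2017-01 8 8
u04 2017-01 3 3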
UV (unique visitors) per shop
Every time a customer views any product in any shop, one visit log record is produced.
The table is named visit
vim visit
- user_id : id of the visiting user
- shop : name of the visited shop
u1 a
u2 b
u1 b
u1 a
u3 c
u4 b
u1 a
u2 c
u5 b
u4 b
u6 c
u2 c
u1 b
u2 a
u2 a
u3 a
u5 a
u5 a
u5 a
Create the table
create table visit(
user_id string,
shop string
)
row format delimited fields terminated by "\t";
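The sample rows above can then be loaded; the local path is only illustrative:
load data local inpath "/opt/module/hive/datas/visit.txt" into table visit;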
- Count the distinct visitors per shop (UV means unique users, so a plain count of rows would overcount repeat visits):
select
shop,
count(distinct user_id) uv
from
visit
group by
shop
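Counted by hand from the sample data, the expected UV per shop is:
a 4 (u1, u2, u3, u5)
b 4 (u1, u2, u4, u5)
c 3 (u2, u3, u6)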
Top 3 visitors by visit count for each shop
- Count each user's visits to each shop:
select
shop,
user_id,
count(user_id) cou
from
visit
group by
shop, user_id
- Partition by shop and rank by visit count:
select
*,
rank() over(partition by shop order by cou desc) rk
from
t1
- Keep the top 3 by rank:
select
shop,
user_id,
cou
from(
select
*,
rank() over(partition by shop order by cou desc) rk
from(
select
shop,
user_id,
count(user_id) cou
from
visit
group by
shop, user_id
) t1
) t2
where
rk <= 3;
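Hand-checked for shop a: u1 and u5 each visited 3 times (rank 1), u2 twice (rank 3) and u3 once (rank 4), so the top 3 rows for a are u1, u5 and u2.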
Ant Forest plant redemption statistics
user_low_carbon:
- user id
- date
- carbon reduction (g)
u_001 2017/1/1 10
u_001 2017/1/2 150
u_001 2017/1/2 110
u_001 2017/1/2 10
u_001 2017/1/4 50
u_001 2017/1/4 10
u_001 2017/1/6 45
u_001 2017/1/6 90
u_002 2017/1/1 10
u_002 2017/1/2 150
u_002 2017/1/2 70
u_002 2017/1/3 30
u_002 2017/1/3 80
u_002 2017/1/4 150
u_002 2017/1/5 101
u_002 2017/1/6 68
u_003 2017/1/1 20
u_003 2017/1/2 10
u_003 2017/1/2 150
u_003 2017/1/3 160
u_003 2017/1/4 20
u_003 2017/1/5 120
u_003 2017/1/6 20
u_003 2017/1/7 10
plant_carbon:
- plant id
- plant name
- carbon required to redeem the plant
p001 梭梭树 17
p002 沙柳 19
p003 樟子树 146
p004 胡杨 215
Create the tables
create table user_low_carbon(
user_id string,
data_dt string,
low_carbon int
)
row format delimited fields terminated by "\t";
create table plant_carbon(
plant_id string,
plant_name string,
low_carbon int
)
row format delimited fields terminated by '\t';
Load the data
load data local inpath "/opt/module/hive-3.1.2/datas/user_low_carbon.txt" into table user_low_carbon;
load data local inpath "/opt/module/hive-3.1.2/datas/plant_carbon.txt" into table plant_carbon;
Enable local mode
set hive.exec.mode.local.auto = true;
Assumptions:
- Low-carbon data (user_low_carbon) has been recorded since 2017-01-01
- Before 2017-10-01, every user who met the redemption condition has already claimed one "p004-胡杨"
- All remaining energy goes toward "p002-沙柳"
Requirement: as of October 1, find the 10 users with the most cumulative "p002-沙柳" redemptions, and how many more 沙柳 each claimed than the next-ranked user.
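As a worked example of the redemption arithmetic (hand-computed from the sample data): u_001 accumulates 10+150+110+10+50+10+45+90 = 475 g before October 1, so after one 胡杨 (215 g) the 沙柳 count is floor((475 - 215) / 19) = 13.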
- Normalize the date format and exclude October 1 and later. The filter must repeat the reformatting expression: the raw data_dt uses "/" separators, so comparing it directly against '2017-10-01' would match nothing:
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
low_carbon
from
user_low_carbon
where
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') < '2017-10-01'
- Group by user, sum each user's carbon, sort descending, and keep 11 rows (one extra so the 10th user still has a next-ranked neighbor to compare against):
select
user_id,
sum(t1.low_carbon) `sum`
from
t1
group by
user_id
order by
`sum` desc
limit 11
- Look up the 胡杨 and 沙柳 costs and compute each user's 沙柳 count (redeem one 胡杨 first, then divide the remainder by the 沙柳 cost):
select
user_id,
floor((t2.`sum` - t4.low_carbon) / t3.low_carbon) sum1
from
t2,
(
select
low_carbon
from
plant_carbon
where
plant_id = 'p002'
) t3,
(
select
low_carbon
from
plant_carbon
where
plant_id = 'p004'
) t4
- Use lead() to pull the next-ranked user's 沙柳 count onto the current row, sort by 沙柳 count, and keep 10 rows:
select
user_id,
sum1,
lead(t5.sum1, 1) over(order by sum1 desc) next_sum1
from
t5
limit
10
- Compute how many more 沙柳 each user claimed than the next-ranked user, sorted by 沙柳 count:
select
t6.user_id,
t6.sum1,
(t6.sum1 - t6.next_sum1)
from(
select
user_id,
sum1,
lead(t5.sum1, 1) over(order by sum1 desc) next_sum1
from(
select
user_id,
floor((t2.`sum` - t4.low_carbon) / t3.low_carbon) sum1
from (
select
user_id,
sum(t1.low_carbon) `sum`
from(
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
low_carbon
from
user_low_carbon
where
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') < '2017-10-01'
) t1
group by
user_id
order by
`sum` desc
limit 11
) t2,
(
select
low_carbon
from
plant_carbon
where
plant_id = 'p002'
) t3,
(
select
low_carbon
from
plant_carbon
where
plant_id = 'p004'
) t4
) t5
limit
10
) t6
order by
t6.sum1 desc;
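Hand-checked on the sample data (which contains only three users), the expected output is:
u_002 23 8
u_003 15 2
u_001 13 NULL
since the totals before October 1 are u_002 = 659 g, u_003 = 510 g and u_001 = 475 g, giving floor((659-215)/19) = 23, floor((510-215)/19) = 15 and floor((475-215)/19) = 13 沙柳 respectively.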
Ant Forest low-carbon user ranking analysis
Requirement:
- Find users who, at some point in 2017, on three or more consecutive days
- reduced carbon emissions (low_carbon) by more than 100 g each day
- and return those users' matching flow records from user_low_carbon
- Normalize the date format:
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
low_carbon
from
user_low_carbon
- Group by user and date, and drop days with 100 g or less:
select
t1.user_id,
t1.data_dt,
sum(t1.low_carbon) day_sum
from
t1
group by
user_id, data_dt
having
day_sum > 100
- For each qualifying day jt, use lag()/lead() to fetch the dates two rows back (qt), one back (zt), one ahead (mt) and two ahead (ht), compute datediff() against jt, and keep rows that sit anywhere inside a run of 3 consecutive days (as the last, middle, or first day); finally join back to the flow table for the raw records:
select
t5.user_id,
t6.data_dt,
t6.low_carbon
from (
select
t4.user_id, t4.day_sum, t4.jt
from (
select
t3.user_id, t3.day_sum, t3.jt,
datediff(t3.jt,t3.qt) jt_qt_diff,
datediff(t3.jt,t3.zt) jt_zt_diff,
datediff(t3.jt,t3.mt) jt_mt_diff,
datediff(t3.jt,t3.ht) jt_ht_diff
from (
select
t2.user_id,
t2.day_sum,
t2.data_dt jt,
lag(t2.data_dt, 2, '1970-01-01') over(partition by t2.user_id order by t2.data_dt) qt,
lag(t2.data_dt, 1, '1970-01-01') over(partition by t2.user_id order by t2.data_dt) zt,
lead(t2.data_dt, 1, '9999-99-99') over(partition by t2.user_id order by t2.data_dt) mt,
lead(t2.data_dt, 2, '9999-99-99') over(partition by t2.user_id order by t2.data_dt) ht
from (
select
t1.user_id, t1.data_dt,
sum(t1.low_carbon) day_sum
from (
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
low_carbon
from
user_low_carbon
) t1
group by
user_id,data_dt
having
day_sum > 100
) t2
)t3
)t4
where
jt_qt_diff = 2 and jt_zt_diff = 1
or jt_zt_diff = 1 and jt_mt_diff = -1
or jt_mt_diff = -1 and jt_ht_diff = -2
)t5
join (
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,low_carbon
from user_low_carbon
) t6
on
t5.user_id=t6.user_id and t5.jt=t6.data_dt;
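Hand-checked against the sample data, only u_002 qualifies: its daily sums above 100 g fall on 2017-01-02 (220), 2017-01-03 (110), 2017-01-04 (150) and 2017-01-05 (101), four consecutive days, so the query returns u_002's flow records for those four dates.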
Method 2:
- Normalize the date format:
select
user_id,
date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
low_carbon
from
user_low_carbon
- Group by user and date, and drop days with 100 g or less:
select
t1.user_id,
t1.data_dt,
sum(t1.low_carbon) day_sum
from
t1
group by
user_id, data_dt
having
day_sum > 100
- Number each user's qualifying days with row_number(), then compute date_sub(data_dt, rn): days in a consecutive run all collapse to the same anchor date, so counting rows per (user_id, anchor date) gives each run's length; keep runs of 3 or more and join back to the flow table:
select
t6.user_id,t6.data_dt,t7.low_carbon,t6.lx_day
from (
select
t5.user_id,t5.data_dt,t5.lx_day
from(
select
t4.user_id,t4.data_dt,
count(t4.lx_data) over(partition by t4.user_id,t4.lx_data) lx_day
from (
select
t3.user_id,
t3.data_dt,date_sub(t3.data_dt,t3.rn) lx_data
from (
select
t2.user_id,
t2.data_dt,row_number() over(partition by t2.user_id order by t2.data_dt) rn
from (
select
t1.user_id,t1.data_dt,
sum(t1.low_carbon) day_sum
from (
select user_id,
date_format(regexp_replace(data_dt,'/','-'),'yyyy-MM-dd') data_dt,low_carbon
from user_low_carbon
) t1
group by user_id,data_dt
having day_sum > 100
) t2
)t3
)t4
)t5
where t5.lx_day >=3
)t6
join (
select user_id,
date_format(regexp_replace(data_dt,'/','-'),'yyyy-MM-dd') data_dt,low_carbon
from user_low_carbon
) t7
on
t6.user_id = t7.user_id and t6.data_dt = t7.data_dt;
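A quick hand-check of the date_sub trick on u_002: the qualifying days 2017-01-02 through 2017-01-05 get row_number() values 1 through 4, and date_sub(data_dt, rn) maps all four to the same anchor date 2017-01-01, so the count over (user_id, lx_data) is 4 >= 3 and u_002 is kept, matching method 1.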
