[toc]

需求描述

  • 统计视频观看数 Top10
  • 统计视频类别热度 Top10
  • 统计出视频观看数最高的 20 个视频的所属类别以及类别包含 Top20 视频的个数
  • 统计视频观看数 Top50 所关联视频的所属类别 Rank
  • 统计每个类别中的视频热度 Top10, 以 Music 为例
  • 统计每个类别视频观看数 Top10
  • 统计上传视频最多的用户 Top10 以及他们上传的视频观看次数在前20的视频

数据结构

视频表

字段 备注 详细描述
videoId 视频唯一id(String) 11 位字符串
uploader 视频上传者(String) 上传视频的用户名 String
age 视频年龄(int) 视频在平台上的整数天
category 视频类别(Array) 上传视频指定的视频分类
length 视频长度(Int) 整形数字标识的视频长度
views 观看次数(Int) 视频被浏览的次数
rate 视频评分(Double) 满分 5 分
Ratings 流量(Int) 视频的流量,整型数字
comments 评论数(Int) 一个视频的整数评论数
relatedId 相关视频id(Array) 相关视频的 id ,最多 20 个

用户表

字段 备注 字段类型
uploader 上传者用户名 string
videos 上传视频数 int
friends 朋友数量 int

ETL

video :

  1. LKh7zAJ4nwo TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU DH56yrIO5nI W1Uo5DQTtzc E-3zXq_r4w0 1TCeoRPg5dE yAr26YhuYNY 2ZgXx72XmoE -7ClGo-YgZ0 vmdPOOd6cxI KRHfMQqSHpk pIMpORZthYw 1tUDzOp10pk heqocRij5P0 _XIuvoH6rUg LGVU5DsezE0 uO2kj6_D8B4 xiDqywcDQRM uX81lMev6_o

user :

  1. barelypolitical 151 5106

观察原始数据形式,视频可以有多个所属分类,每个所属分类用 & 符号分割,且分割的两边有空格字符,同时相关视频也是可以有多个元素,多个相关视频又用 “ \t ” 进行分割

为了方便分析数据,先对数据重组清洗操作 :

  • 将所有的类别用 “ & ” 分割,同时去掉两边空格
  • 多个相关视频 id 使用 “ & ” 进行分割

封装工具类

  1. /**
  2. * 清洗规则
  3. * 1. 将数据长度小于9的清洗掉
  4. * 2. 将数据中的视频类别中间的空格去掉 People & Blogs
  5. * 3. 将数据中的关联视频id通过 & 符号拼接
  6. */
  7. public class ETLUtil {
  8. /**
  9. * 数据清洗方法
  10. */
  11. public static String etlData(String srcData){
  12. StringBuffer resultData = new StringBuffer();
  13. //1. 先将数据通过\t 切割
  14. String[] datas = srcData.split("\t");
  15. //2. 判断长度是否小于9
  16. if(datas.length < 9){
  17. return null ;
  18. }
  19. //3. 将数据中的视频类别的空格去掉
  20. datas[3] = datas[3].replaceAll(" ", "");
  21. //4. 将数据中的关联视频id通过 & 拼接
  22. for (int i = 0; i < datas.length; i++) {
  23. if(i < 9){
  24. //4.1 没有关联视频的情况
  25. if(i == datas.length - 1){
  26. resultData.append(datas[i]);
  27. }else{
  28. resultData.append(datas[i]).append("\t");
  29. }
  30. }else{
  31. //4.2 有关联视频的情况
  32. if(i == datas.length - 1){
  33. resultData.append(datas[i]);
  34. }else{
  35. resultData.append(datas[i]).append("&");
  36. }
  37. }
  38. }
  39. return resultData.toString();
  40. }
  41. }

Mapper

  1. /**
  2. * 清洗谷粒影音的原始数据
  3. */
  4. public class EtlMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
  5. private Text k = new Text();
  6. @Override
  7. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  8. //获取一行
  9. String line = value.toString();
  10. //清洗
  11. String resultData = ETLUtil.etlData(line);
  12. if(resultData != null) {
  13. //写出
  14. k.set(resultData);
  15. context.write(k, NullWritable.get());
  16. }
  17. }
  18. }

Driver

package com.cpucode.video.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EtlDriver {
    /**
     * Driver for the map-only ETL job.
     * args[0] = input path (raw video data), args[1] = output path (must not exist yet).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(EtlDriver.class);

        job.setMapperClass(EtlMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Map-only job: mapper output is written directly, no shuffle/reduce.
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result as the process exit code so shell callers
        // (e.g. the "hadoop jar" invocation below) can detect a failed ETL run.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

将 ETL 程序打包为 etl.jar 并上传到 Linux 的 /opt/module/hive/datas 目录下

上传原始数据到 HDFS

hadoop fs -mkdir -p /video/video
hadoop fs -mkdir -p /video/user
hadoop fs -put video/user/user.txt  /video/user
hadoop fs -put video/video/*.txt  /video/video

ETL数据

hadoop jar etl.jar com.cpucode.video.etl.EtlDriver /video/video /video/video/output

创建表

创建原始数据表

video_ori

-- Raw video table: one tab-separated record per video; the category and
-- relatedId arrays use "&" as the element separator (produced by the ETL job).
CREATE TABLE video_ori (
    videoId   string,
    uploader  string,
    age       int,
    category  array<string>,
    length    int,
    views     int,
    rate      float,
    ratings   int,
    comments  int,
    relatedId array<string>
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
COLLECTION ITEMS TERMINATED BY "&"
STORED AS TEXTFILE;

video_user_ori :

-- Raw user table: uploader name, number of uploaded videos, number of friends.
CREATE TABLE video_user_ori (
    uploader string,
    videos   int,
    friends  int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
STORED AS TEXTFILE;

创建 orc 存储格式 snappy 压缩的表

video_orc :

-- ORC-backed video analysis table (same schema as video_ori),
-- stored as ORC with snappy compression.
create table video_orc(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>
)
stored as orc
-- fixed: the property key was misspelled "orc.comparess"; Hive silently
-- ignores unknown table properties, so the table would not have been
-- snappy-compressed at all
tblproperties("orc.compress" = "SNAPPY");

video_user_orc

-- ORC-backed user analysis table, snappy-compressed.
-- NOTE(review): "row format delimited ... terminated by" has no effect on an
-- ORC table (ORC defines its own serialization) -- harmless but redundant.
create table video_user_orc(
uploader string,
videos int,
friends int
)
row format delimited
fields terminated by "\t"
stored as orc
tblproperties("orc.compress" = "SNAPPY");

向 ori 表插入数据

-- Load the ETL output and the raw user file into the *_ori tables.
-- NOTE: "load data inpath" (without LOCAL) MOVES the files out of their
-- HDFS source directories into the table locations.
load data inpath "/video/video/output" into table video_ori;
load data inpath "/video/user" into table video_user_ori;

向 orc 表插入数据

-- Copy the raw rows into the ORC tables (rewritten as ORC + snappy on insert).
insert into table video_orc select * from video_ori;
insert into table video_user_orc select * from video_user_ori;

需求解决

视频观看数 Top10

思路:

  1. 使用 order by 按照 views 字段做一个全局排序
  2. 设置只显示前 10 条
-- Top 10 most-viewed videos: global sort on views, keep the first 10 rows.
SELECT
    videoId,
    views
FROM video_orc
ORDER BY views DESC
LIMIT 10;

视频类别热度 Top10

统计每个类别有多少个视频,显示出包含视频最多的前10个类别

思路:

  1. 当前表结构为:一个视频对应 n 个类别。先将类别进行列转行(展开)
select
    videoId, 
    category_name
from
    video_orc
    lateral view explode(category) video_orc_tmp AS category_name
  1. 按照类别 group by 聚合,然后 count 组内的 videoId 个数
select
    t1.category_name , 
    count(t1.videoId) hot
from
    t1
group by
    t1.category_name
  1. 最后按照热度排序,显示前 10 条
-- Top 10 hottest categories: explode the category array into one row per
-- (video, category) pair, count videos per category, keep the 10 largest.
SELECT
    exploded.category_name,
    COUNT(exploded.videoId) AS hot
FROM (
    SELECT
        videoId,
        category_name
    FROM video_orc
    LATERAL VIEW explode(category) video_orc_tmp AS category_name
) exploded
GROUP BY exploded.category_name
ORDER BY hot DESC
LIMIT 10;

出视频观看数最高的 20 个视频的所属类别以及类别包含 Top20 视频的个数

思路:

  1. 先找到观看数最高的 20 个视频所属条目的所有信息,降序排列
select
    videoId, 
    views,
    category
from
    video_orc
order by
    views desc
limit 20
  1. 把这 20 条信息中的 category 分裂出来( 列转行 )
select
    t1.videoId,
    category_name
from
    t1
    lateral view explode(t1.category) t1_tmp as category_name
  1. 最后查询视频分类名称和该分类下有多少个 Top20 的视频
-- Categories of the 20 most-viewed videos, and how many of those Top20
-- videos each category contains.
SELECT
    t2.category_name,
    COUNT(t2.videoId) AS video_sum
FROM (
    -- explode the category array of the Top20 videos
    SELECT
        t1.videoId,
        category_name
    FROM (
        -- the 20 most-viewed videos with their category arrays
        SELECT
            videoId,
            views,
            category
        FROM video_orc
        ORDER BY views DESC
        LIMIT 20
    ) t1
    LATERAL VIEW explode(t1.category) t1_tmp AS category_name
) t2
GROUP BY t2.category_name;

视频观看数 Top50 所关联视频的所属类别排序

思路 :

  1. 统计视频观看数 Top50 所关联视频
select
    videoId,
    `views`,
    relatedId
from
    video_orc
order by
    `views` desc
limit 50
  1. 炸开关联视频
select
    relatedId_video
from
    t1
lateral view explode(t1.relatedId) t1_tmp as relatedId_video
  1. 关联原表, 求每个关联视频的类别
select
    t2.relatedId_video,
    t3.category
from
    t2
    join video_orc t3
    on
        t2.relatedId_video = t3.videoId
  1. 炸开类别
select
    t4.relatedId_video,
    category_name
from
    t4
    lateral view explode(t4.category) t4_tmp as category_name
  1. 按照类别分组,求统计
select
    t5.category_name,
    count(t5.relatedId_video) video_num
from
    t5
group by
    t5.category_name
  1. 求排名
-- Category ranking of the videos related to the Top50 most-viewed videos:
--   t1: the Top50 videos with their relatedId arrays
--   t2: one row per related video id (explode relatedId)
--   t3: join back to video_orc to fetch each related video's category array
--   t4: related video id + its category array
--   t5: explode the categories and count related videos per category
--   t6: rank the categories by that count (rank() leaves gaps on ties)
select
    t6.category_name,
    t6.video_num,
    rank() over(order by t6.video_num desc ) rk
from(
    select
        t5.category_name,
        count(t5.relatedId_video) video_num
    from(
        select
        t4.relatedId_video,
        category_name
        from(
            select
                t2.relatedId_video,
                t3.category
            from(
                select
                    relatedId_video
                from(
                    select
                        videoId,
                        `views`,
                        relatedId
                    from
                        video_orc
                    order by
                        `views` desc
                    limit 50
                ) t1
                lateral view explode(t1.relatedId) t1_tmp as relatedId_video
            ) t2
                join video_orc t3
                on
                    t2.relatedId_video = t3.videoId
            ) t4
                lateral view explode(t4.category) t4_tmp as category_name
        ) t5
    group by
        t5.category_name
) t6

每个类别中的视频热度 Top10

思路:

  1. 炸开类别
select
    videoId,
    category_name,
    `views`
from
    video_orc
    lateral view explode(category) video_orc_tmp as category_name
  1. Music 类别下的 top10
-- Top 10 hottest videos in the Music category: explode categories first,
-- then filter to 'Music' and take the 10 highest view counts.
SELECT
    t1.videoId,
    t1.category_name,
    t1.views
FROM (
    SELECT
        videoId,
        category_name,
        `views`
    FROM video_orc
    LATERAL VIEW explode(category) video_orc_tmp AS category_name
) t1
WHERE t1.category_name = 'Music'
ORDER BY t1.views DESC
LIMIT 10;

每个类别视频观看数 Top10

  1. 炸开类别
-- Step 1: explode the category array (one row per video/category pair).
-- Fixed: the UDTF is spelled "explode"; "expload" fails with an
-- "Invalid function" error in Hive.
select
    videoId,
    category_name,
    views
from
    video_orc
lateral view explode(category) video_orc_tmp as category_name
  1. 开窗 , 按照类别分区 , 观看数排序 ,求排名
select
    t1.videoId,
    t1.category_name,
    t1.views,
      rank() over(partition by t1.category_name order by t1.views desc) rk
from
    t1
  1. 求 top10
-- Top 10 most-viewed videos within every category:
-- explode categories, rank views per category, keep rank <= 10.
select
    t2.videoId,
    t2.category_name,
    t2.views,
    t2.rk
from(
    select
        t1.videoId,
        t1.category_name,
        t1.views,
        rank() over(partition by t1.category_name order by t1.views desc) rk
    from(
        select
            videoId,
            category_name,
            views
        from
            video_orc
        -- fixed: "explode" was misspelled "expload", which fails with an
        -- "Invalid function" error in Hive
        lateral view explode(category) video_orc_tmp as category_name
    ) t1
) t2
where
    t2.rk <= 10;

上传视频最多的用户 Top10 以及他们上传的视频观看次数在前 20 的视频

思路:

  1. 求出上传视频最多的 10 个用户
select
    uploader,
    videos
from
    video_user_orc
order by
    videos desc
limit 10
  1. 关联 video_orc 表,求出这 10 个用户上传的所有的视频,按照观看数取前 20
-- Top 20 most-viewed videos uploaded by the 10 users with the most uploads.
select
    t1.uploader,
    t2.videoId,
    t2.views
from(
    -- the 10 users who uploaded the most videos
    select
        uploader,
        videos
    from
        video_user_orc
    order by
        videos desc
    limit 10
) t1
    -- fixed: the table was referenced as "gulivideo_orc", which does not
    -- exist in this document; every other query uses "video_orc"
    join video_orc t2
    on
        t1.uploader = t2.uploader
order by 
    t2.views desc
limit 20;

每个用户的累积访问次数

用户访问数据 :

u01     2017/1/21       5
u02     2017/1/23       6
u03     2017/1/22       8
u04     2017/1/20       3
u01     2017/1/23       6
u01     2017/2/21       8
u02     2017/1/23       6
u01     2017/2/22       4

创建表

-- User-visit fact table: one row per recorded visit.
-- visitDate is stored as a slash-separated string (e.g. 2017/1/21) and is
-- normalized with regexp_replace/date_format in the queries below.
create table action(
userId string,
visitDate string,
visitCount int
)
row format delimited fields terminated by "\t";
  1. 把日期格式化 , 并对用户和日期进行分组, 求次数和
select
    userid,
    date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM') yy_mm_date,
    sum(visitCount) sum1
from
    `action`
group by
    userId,
    yy_mm_date
  1. 对用户进行分区 , 从第一到当前行开窗 , 求该用户累积和
-- Cumulative visit count per user, month by month.
select
    t1.*,
    -- fixed: a running total needs ORDER BY inside the window; without it the
    -- frame "rows between unbounded preceding and current row" is applied to
    -- an undefined row order, so the cumulative sum per month is meaningless
    sum(sum1) over(partition by userid order by yy_mm_date
                   rows between unbounded preceding and current row) total_visit
from(
    -- per-user, per-month visit totals (date reformatted to yyyy-MM)
    select
        userid,
        date_format(regexp_replace(visitDate, '/', '-'), 'yyyy-MM') yy_mm_date,
        sum(visitCount) sum1
    from
        `action`
    group by
        userId,
        yy_mm_date
) t1

每个店铺的UV(访客数)

每个顾客访问任何一个店铺的任何一个商品时都会产生一条访问日志

表名为 visit

vim visit
  • 访客的用户id : user_id
  • 被访问的店铺名称 : shop
u1    a
u2    b
u1    b
u1    a
u3    c
u4    b
u1    a
u2    c
u5    b
u4    b
u6    c
u2    c
u1    b
u2    a
u2    a
u3    a
u5    a
u5    a
u5    a

创建表

-- Shop-visit log: one row per (user_id, shop) visit.
create table visit(
user_id string,
shop string
)
-- fixed: the field terminator was the broken literal "\" (an unclosed
-- string); the sample data above is tab-separated, so it must be "\t"
row format delimited fields terminated by "\t";
-- UV (unique visitors) per shop.
select
    shop,
    -- fixed: UV counts DISTINCT visitors; a plain count(user_id) counts
    -- repeat visits by the same user (e.g. u1 visits shop a three times)
    count(distinct user_id) cou
from
    visit
group by
    shop

每个店铺访问次数 top3 的访客信息

  1. 求每个店铺每个用户访问次数
select
    shop,
    user_id,
    count(user_id) cou
from
    visit
group by
    shop, user_id
  1. 对商店分区 , 根据访问次数排名
select
    *,
    rank() over(partition by shop order by cou desc) rk
from
    t1
  1. 根据排名选择 top3
-- Top 3 visitors per shop by visit count.
select
    shop,
    user_id,
    cou
from(
    -- rank visitors within each shop by visit count (rank() leaves gaps on ties)
    select
        *,
        rank() over(partition by shop order by cou desc) rk
    from(
        -- visit count per (shop, user) pair
        select
            shop,
            user_id,
            count(user_id) cou
        from
            visit
        group by
            shop, user_id
    ) t1
) t2
where
    -- fixed: "rk >= 3" returns everyone ranked 3rd or WORSE; top-3 needs rk <= 3
    rk <= 3;

蚂蚁森林植物申领统计

user_low_carbon:

  • 用户
  • 日期
  • 减少碳排放(g)
u_001    2017/1/1    10
u_001    2017/1/2    150
u_001    2017/1/2    110
u_001    2017/1/2    10
u_001    2017/1/4    50
u_001    2017/1/4    10
u_001    2017/1/6    45
u_001    2017/1/6    90
u_002    2017/1/1    10
u_002    2017/1/2    150
u_002    2017/1/2    70
u_002    2017/1/3    30
u_002    2017/1/3    80
u_002    2017/1/4    150
u_002    2017/1/5    101
u_002    2017/1/6    68
u_003    2017/1/1    20
u_003    2017/1/2    10
u_003    2017/1/2    150
u_003    2017/1/3    160
u_003    2017/1/4    20
u_003    2017/1/5    120
u_003    2017/1/6    20
u_003    2017/1/7    10

plant_carbon:

  • 植物编号
  • 植物名
  • 换购植物所需要的碳
p001 梭梭树  17
p002 沙柳 19
p003 樟子树  146
p004 胡杨 215

创建表

-- Per-user low-carbon ledger: user id, date (slash-separated string, e.g.
-- 2017/1/1), and grams of carbon saved by that record.
create table user_low_carbon(
    user_id string,
    data_dt string,
    low_carbon int
)
row format delimited fields terminated by "\t";
-- Plant catalogue: plant id, plant name, grams of carbon required to redeem one.
create table plant_carbon(
    plant_id string,
    plant_name string,
    low_carbon int
)
row format delimited fields terminated by '\t';

加载数据

load data local inpath "/opt/module/hive-3.1.2/datas/user_low_carbon.txt" into table user_low_carbon;
load data local inpath "/opt/module/hive-3.1.2/datas/plant_carbon.txt" into table plant_carbon;

设置本地模式

set hive.exec.mode.local.auto = true;

假设 :

  • 2017年1月1日开始记录低碳数据(user_low_carbon)
  • 2017年10月1日之前满足申领条件的用户都申领了一颗 p004-胡杨
  • 剩余的能量全部用来领取 “ p002-沙柳 ”

需求 : 统计在10月1日累计申领 “ p002-沙柳 ” 排名前 10 的用户信息;以及他比后一名多领了几颗沙柳

  1. 调整日期格式 , 对 10月1日之后 进行排除
select
    user_id,
    date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
    low_carbon 
from
    user_low_carbon
where 
    data_dt < '2017-10-01'
  1. 对用户进行分组 , 求每用户的碳和 , 并对其排序 , 查出 11 条
select
    user_id,
    sum(t1.low_carbon) `sum`
from
    t1
group by
    user_id
order by
    `sum` desc 
limit 11
  1. 找出 胡杨 , 沙柳 求出沙柳数
select
    user_id,
    floor((sum - t4.low_carbon) / t3.low_carbon) sum1
from 
    t2,
    (
        select
            low_carbon
        from
            plant_carbon
        where
            plant_id = 'p002'
    ) t3,
    (
        select
            low_carbon
        from
            plant_carbon
        where
            plant_id = 'p004'
    ) t4
  1. 把下条沙柳数放在当前行 , 并根据沙柳数排序 , 查出 10 条
select
    user_id,
    sum1, 
    lead(t5.sum1, 1) over(order by sum1 desc) next_sum1
from
    t5
limit
    10
  1. 求出沙柳数比下名多多少 , 并按照沙柳数排序
-- Top 10 users by number of p002 (shaliu) plants redeemed, plus how many more
-- each redeemed than the next-ranked user:
--   t1: records before 2017-10-01, date normalized to yyyy-MM-dd
--   t2: top-11 users by total carbon (11 so the 10th still has a "next" row)
--   t3/t4: single-row subqueries with the p002 / p004 redemption costs,
--          comma-cross-joined onto every t2 row on purpose
--   t5: shaliu count = floor((total - p004 cost) / p002 cost)
--   t6: lead() pulls the next-ranked user's count onto the current row
-- NOTE(review): the WHERE below compares the RAW slash-formatted data_dt
-- against '2017-10-01' (Hive WHERE cannot see the select alias); as strings,
-- '2017/...' sorts AFTER '2017-10-01', so this filter may exclude every row --
-- verify it against the formatted column instead.
select
    t6.user_id,
    t6.sum1,
    (t6.sum1 - t6.next_sum1) 
from(
    select
        user_id,
        sum1, 
        lead(t5.sum1, 1) over(order by sum1 desc) next_sum1
    from(
        select
            user_id,
            floor((sum - t4.low_carbon) / t3.low_carbon) sum1
        from (
            select
                user_id,
                sum(t1.low_carbon) `sum`
            from(
                select
                    user_id,
                    date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
                    low_carbon 
                from
                    user_low_carbon
                where 
                    data_dt < '2017-10-01' 
            ) t1
            group by
                user_id
            order by
                `sum` desc 
            limit 11
            ) t2,
            (
                select
                    low_carbon
                from
                    plant_carbon
                where
                    plant_id = 'p002'
            ) t3,
            (
                select
                    low_carbon
                from
                    plant_carbon
                where
                    plant_id = 'p004'
            ) t4
    ) t5
    limit
        10
) t6
order by
    t6.sum1 desc;

蚂蚁森林低碳用户排名分析

需求 :

  • 用户在2017年,连续三天(或以上)的天数里
  • 每天减少碳排放(low_carbon)都超过100g的用户低碳流水
  • 需要查询返回满足以上条件的 user_low_carbon 表中的记录流水
  1. 对日期进行格式化
select
    user_id,
    date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
    low_carbon
from
    user_low_carbon
  1. 对用户和日期进行分组 , 并把低于 100 排除
select
    t1.user_id, 
    t1.data_dt,
    sum(t1.low_carbon) day_sum 
from
    t1
group by
    user_id, data_dt
having
    day_sum > 100

-- Method 1 (lag/lead neighbours): flow records of users with 3+ consecutive
-- days each saving more than 100g:
--   t2: per-user per-day totals over 100g
--   t3: each qualifying day (jt, "today") with its two previous (qt, zt) and
--       two following (mt, ht) qualifying days; the defaults '1970-01-01' /
--       '9999-99-99' are sentinels for missing neighbours
--       (NOTE(review): '9999-99-99' is not a valid date -- datediff against it
--       presumably yields NULL, which the WHERE treats as false; confirm)
--   t4: date gaps between jt and each neighbour
--   t5: keep days that are the last, middle, or first day of a 3-day run
--       (AND binds tighter than OR, so each line is one complete case)
--   t6 join: pull the original flow records back for those days
select 
    t5.user_id,
    t6.data_dt,
    t6.low_carbon 
from (
    select 
        t4.user_id, t4.day_sum, t4.jt 
    from (
        select 
            t3.user_id, t3.day_sum, t3.jt,
            datediff(t3.jt,t3.qt) jt_qt_diff,
            datediff(t3.jt,t3.zt) jt_zt_diff,
            datediff(t3.jt,t3.mt) jt_mt_diff,
            datediff(t3.jt,t3.ht) jt_ht_diff
        from (
            select 
                t2.user_id,
                t2.day_sum,
                t2.data_dt jt,
                lag(t2.data_dt, 2, '1970-01-01') over(partition by t2.user_id order by t2.data_dt) qt,
                lag(t2.data_dt, 1, '1970-01-01') over(partition by t2.user_id order by t2.data_dt) zt,
                lead(t2.data_dt, 1, '9999-99-99') over(partition by t2.user_id order by t2.data_dt) mt,
                lead(t2.data_dt, 2, '9999-99-99') over(partition by t2.user_id order by t2.data_dt) ht
            from (
                select 
                    t1.user_id, t1.data_dt,
                    sum(t1.low_carbon) day_sum 
                from (
                    select 
                        user_id,
                        date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt, 
                        low_carbon 
                    from 
                        user_low_carbon
                ) t1  
                group by 
                    user_id,data_dt 
                having 
                    day_sum > 100
            ) t2
        )t3
    )t4
    where 
        jt_qt_diff = 2 and jt_zt_diff = 1 
        or jt_zt_diff = 1 and jt_mt_diff = -1 
        or jt_mt_diff = -1 and jt_ht_diff = -1
)t5 
    join (
        select 
            user_id,
            date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,low_carbon 
        from user_low_carbon
    ) t6 
    on 
        t5.user_id=t6.user_id and t5.jt=t6.data_dt;

方法二 :

  1. 对日期进行格式化
select
    user_id,
    date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt,
    low_carbon
from
    user_low_carbon
  1. 对用户和日期进行分组 , 并把低于 100 排除
select
    t1.user_id, 
    t1.data_dt,
    sum(t1.low_carbon) day_sum 
from
    t1
group by
    user_id, data_dt
having
    day_sum > 100

-- Method 2 (date minus row_number grouping): within a run of consecutive days,
-- data_dt - row_number is constant, so counting rows per (user, anchor date)
-- gives the run length; keep runs of 3+ days and join back for the flow records.
select 
    t6.user_id, t6.data_dt, t7.low_carbon, t6.lx_day 
from (
    select 
        -- fixed: a stray "+" preceded t5.user_id here (typo; unary plus on a
        -- string column is at best an accidental numeric cast)
        t5.user_id, t5.data_dt, t5.lx_day 
    from(
        -- run length = number of qualifying days sharing the same anchor date
        select 
            t4.user_id, t4.data_dt,
            count(t4.lx_data) over(partition by t4.user_id, t4.lx_data) lx_day 
        from (
            -- anchor date: qualifying day minus its per-user sequence number
            select 
                t3.user_id,
                t3.data_dt, date_sub(t3.data_dt, t3.rn) lx_data 
            from (
                select 
                    t2.user_id,
                    t2.data_dt, row_number() over(partition by t2.user_id order by t2.data_dt) rn 
                from (
                    -- per-user per-day totals exceeding 100g
                    select 
                        t1.user_id, t1.data_dt,
                        sum(t1.low_carbon) day_sum 
                    from (
                        select user_id,
                            date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt, low_carbon 
                        from user_low_carbon
                    ) t1  
                    group by user_id, data_dt 
                    -- fixed: the requirement is STRICTLY more than 100g per day,
                    -- and method 1 uses "> 100"; this branch had ">= 100"
                    having day_sum > 100
                ) t2
            ) t3
        ) t4
    ) t5 
    where t5.lx_day >= 3
) t6 
    join (
        select user_id,
            date_format(regexp_replace(data_dt, '/', '-'), 'yyyy-MM-dd') data_dt, low_carbon 
        from user_low_carbon
    ) t7  
    on 
        t6.user_id = t7.user_id and t6.data_dt = t7.data_dt;