案例1:WordCount

  • 理解行列转换函数explode

$ hadoop fs -put sample.txt /hive/wordcount/input
hive>create table words(line string);
hive>load data inpath ‘/hive/wordcount/input’ overwrite into table words;
hive>create table wordcount as select word, count(1) as count from (select explode(split(line,’ ‘))as word from words) w group by word order by word;

案例2:去重

dintinct 去重

  • 适合整行去重

:::info hive> create table test1(userid int,courseid int);
hive> insert into test1(userid,courseid )values(101,666),(102,666),(102,666);
hive> select distinct userid,courseid from test1; :::

  • with as 用法: :::info hive> with test1 as
    (select 101 as userid,666 as courseid
    union all
    select 102 as userid,666 as courseid
    union all
    select 102 as userid,666 as courseid)
    select
    distinct userid,courseid
    from test1; :::

    group by 去重

    :::info hive> select userid,courseid from test1 group by userid,courseid; :::

row_number 去重

创建表:用户ID,课程名字,考试日期,分数 :::info hive> create table test2(userid int,courseid string,ctime string,score int);
hive> insert into test2(userid,courseid ,ctime,score)values
(101,’redis’,’05-01’,60),
(101,’redis’,’05-02’,70),
(101,’mysql’,’05-04’,65),
(101,’mysql’,’05-04’,76),
(102,’redis’,’05-01’,70),
(102,’redis’,’05-03’,80),
(102,’mysql’,’05-07’,75),
(102,’mysql’,’05-08’,86),
(103,’redis’,’05-15’,91),
(103,’mysql’,’05-20’,92); :::

问题
查询每个人每门课程的最终分数,
分析
distinct只是简单的去重,解决不了问题;group by也是简单的分组去重,也解决不了问题;order by只是简单的排序,也解决不了问题。那这个时候row_number()就派上用场了,分组完再排序

体验row_number()用法 :::info hive> select userid,courseid,ctime,score,
row_number() over(partition by userid,courseid order by ctime desc) as rank
from test2; ::: 解决问题: :::info hive> select userid,courseid,ctime,score
from
(select userid,courseid,ctime,score,
row_number() over(partition by userid,courseid order by ctime desc) as rank
from test2) t1
where t1.rank=1; :::

对比总结

distinct去重,所有数据都在一个reduce里面,很浪费资源,效率又很低,会有内存溢出的风险。
group by去重不会将所有数据都到一个reduce中,但是无法实现复杂业务。
简单数据去重建议用group by替代distinct的方式。
row_number可以实现复杂业务,比如top10等、最大、最小值所在列的其他列属性

作业

1、造数并证明distinct去重时所有数据都会集中到一个reduce中,group by 方式不会

通过yarn查看作业图,并截图

2、查询每个班级每门课程成绩前三的学生姓名、性别、年龄、班级

表结构为:学生姓名、科目、成绩、班级、年龄、性别

案例3:无窗口函数时获取最大值所在记录

问题:
使用max,group by只能获取group by的分组与最大值,不能获取最大值所在列的其他信息

id name age class
1 qiu 22 1
2 liu 42 1
3 zheng 20 2
4 qian 20 2
5 wang 11 3
6 li 33 3

:::info create table test(id int primary key,name varchar(20),age int,class int);
replace into test values(1,’qiu’,22,1);
replace into test values(2,’liu’,42,1);
replace into test values(3,’zheng’,20,2);
replace into test values(4,’qian’,20,2);
replace into test values(5,’wang’,11,3);
replace into test values(6,’li’,33,3); :::

解决方案一

:::info select from (select from test order by age desc) as b group by class; ::: 与之匹配的是group by分组后的第一条记录的基本信息
在mysql执行没有问题,性能也比较快,最大值有多个时只会返回第一个

解决方案二

:::info select * from test t where t.age = (select max(age) from test where t.class = class); ::: mysql测试,8000条,InnoDB情况下方法2会慢很多,几乎无法忍受,MyIsam情况下第一次很慢,后面会比较快
mysql测试,8000条,加索引,第一次很慢16s,后面比较快0s
mysql测试,上百万条的话会非常慢,10几分钟尚无结果
在phoenix测试:表行数39w+,被选中记录几万级别,最终结果几千的情况下,使用时间为几秒左右(<3~5s)
如果有多个最大值则结果中会包含多个
总结: mysql用方法1,phoenix用方法2

案例4:排序

order by:

order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间。 :::info select * from test order by name; :::

sort by

sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。
sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。
使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。 :::info select * from test order by name; :::

distribute by

distribute by是控制在map端如何拆分数据给reduce端的。
hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。
sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。

  • Distribute by和sort by的使用场景

1.Map输出的文件大小不均。
2.Reduce输出文件大小不均。
3.小文件过多。
4.文件超大。

cluster by

cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。

ROW_NUMBER()

窗口函数用法参考前面。

案例5:行列转换

输入表1:

id name course score
1 stu1 mysql 90
2 stu1 mongodb 100
3 stu2 mysql 80
4 stu2 mongodb 70
5 stu2 redis 90
6 stu3 mysql 100
7 stu3 redis 60
8 stu4 mysql 90
9 stu4 redis 170

输出表2:

name mysql mongodb redis
stu3 100 0 60
stu1 90 100 0
stu4 90 0 70
stu2 80 70 70

数据准备 :::info CREATE TABLE temp_tb (
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(10) NOT NULL,
course varchar(10) DEFAULT NULL,
score int(11) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

insert into temp_tb(name,course,score)values
(‘stu1’,’mysql’,90),(‘stu1’,’mongodb’,100),(‘stu2’,’mysql’,80),
(‘stu2’,’mongodb’,70),(‘stu2’,’redis’,70),(‘stu3’,’mysql’,100),
(‘stu3’,’redis’,60),(‘stu4’,’mysql’,90),(‘stu4’,’redis’,70); :::

CASE WHEN实现:

:::info SELECT name,
SUM(CASE course WHEN ‘mysql’ THEN score ELSE 0 END) ‘mysql’,
SUM(CASE course WHEN ‘mongodb’ THEN score ELSE 0 END) ‘mongodb’,
SUM(CASE course WHEN ‘redis’ THEN score ELSE 0 END) ‘redis’
FROM temp_tb
GROUP BY name
ORDER BY mysql DESC; :::

IF实现:

:::info SELECT name,
SUM(CASE course WHEN ‘mysql’ THEN score ELSE 0 END) ‘mysql’,
SUM(CASE course WHEN ‘mongodb’ THEN score ELSE 0 END) ‘mongodb’,
SUM(CASE course WHEN ‘redis’ THEN score ELSE 0 END) ‘redis’
FROM temp_tb
GROUP BY name
ORDER BY mysql DESC; :::

explode

行专列,参考WordCount。

案例6:小表驱动大表

把重复关联键小的表放到join前面可以提高join效率

问题

统计供应商星际(如一星、二星、三星等)的销售情况

销售明细表:分片键,订单号,供应商id :::info sales_detail(partition_value,order_id,seller_id) ::: 供应商表:分片建,供应商id,供应商星级 :::info dim_seller(partition_value,seller_id, seller_star) :::

错误写法:性能很低

:::info select seller_star, count(order_id) as order_cnt
from
(select order_id,seller_id from sales_detail where partition_value=’20181010’ ) a
left outer join
(select seller_id, seller_star from dim_seller where partition_value =’’20181010’ ) b
on a.seller_id = b.seller_id
group by b.seller_star; :::

分析

  • 通常各个地区的供应商销售是不均衡的,比如发达地区销售可能占了8成,这样导致数据的倾斜的,如果不加以优化,此SQL将会耗费很长时间,甚至运行不出结果。
  • 通常来说,供应商是有限的,数量不会很多,而销售明细表比较大,这就是典型的大表join小表的问题
  • 解决方案

可以通过mapjoin的方式来优化,只需要添加mapjoin hint即可: :::info select /+mapjoin(b)/
seller_star, count(order_id) as order_cnt
from
(select order_id,seller_id from sales_detail where partition_value=’20181010’) a
left outer join
(select seller_id, seller_star from dim_seller where partition_value =’’20181010’ ) b
on a.seller_id = b.seller_id
group by b.seller_star; ::: /+mapjoin(b)/ 即是mapjoin hint,如果需要多个mapjoin多个表,则格式为:/+mapjoin(b,c,d)/。
Hive对于mapjoin是默认开启的,设置参数为: :::info set hive.auto.convert.join = true; :::

  • mapjoin优化是在Map阶段进行join,而不是通常那样在Reduce阶段按照join列进行分发后在每个Reduce节点上进行join,不需要分发也就没有倾斜的问题。
  • Hive会将小表(本例中的dim_seller表)全量复制到每个Map任务节点(只复制用到的列),然后每个Map任务节点查找小标即可。
  • 小表不能太大,否则全量复制分发得不偿失
  • Hive根据参数hive.mapjoin.smalltable.size(0.11.0版本后是hive.auto.convert.join.nonconditionaltask.size) 来确定小表的大小是否满⾜条件(默认25MB)
  • 此参数允许的最大值可以修改,一般超过1GB(太大的话Map任务所在的节点内存会撑爆,Hive会报错。
  • 另外需要注意的是,HDFS显示的文件大小是压缩后的大小,当实际加载到内存的时候,容量会增加很多,很多场景下会膨胀10倍。

    案例7:数据倾斜

  • 如何判断数据倾斜,如何避免

  • 容易导致数据倾斜的操作:group by,count distinct,join
  • 使用COUNT DISTINCT和GROUP BY造成的数据倾斜:

    在大量空值或NULL,或者某一个值的记录特别多,可以先把该值过滤掉,在最后单独处理:

  • 多重COUNT DISTINCT导致数据倾斜

    通常使用UNION ALL + ROW_NUMBER() + SUM + GROUP BY来变通实现。

  • 使用JOIN引起的数据倾斜:

    a) 关联键存在大量空值或者某一特殊值,如NULL 空值单独处理,不参与关联; 空值或特殊值加随机数作为关联键; b) 不同数据类型的字段关联 转换为同一数据类型之后再做关联

案例8:UDF自定义函数

案例9:分区分桶

案例10 合理使用Union All

对同一张表的union all 要比多重insert快的多,
原因是hive本身对这种union all做过优化,即只扫描一次源表;
而多重insert也只扫描一次,但应为要insert到多个分区,所以做了很多其他的事情,导致消耗的时间非常长;

lxw_test3 12亿左右记录数
Union all : 耗时7分钟左右

  1. select type,popt_id,login_date
  2. from (
  3. select 'm3_login' as type,popt_id,login_date
  4. from lxw_test3
  5. where login_date>='2012-02-01' and login_date<'2012-05-01'
  6. union all
  7. select 'mn_login' as type,popt_id,login_date
  8. from lxw_test3
  9. where login_date>='2012-05-01' and login_date<='2012-05-09'
  10. union all
  11. select 'm3_g_login' as type,popt_id,login_date
  12. from lxw_test3
  13. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='1'
  14. union all
  15. select 'm3_l_login' as type,popt_id,login_date
  16. from lxw_test3
  17. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='2'
  18. union all
  19. select 'm3_s_login' as type,popt_id,login_date
  20. from lxw_test3
  21. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='3'
  22. union all
  23. select 'm3_o_login' as type,popt_id,login_date
  24. from lxw_test3
  25. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='4'
  26. union all
  27. select 'mn_g_login' as type,popt_id,login_date
  28. from lxw_test3
  29. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='1'
  30. union all
  31. select 'mn_l_login' as type,popt_id,login_date
  32. from lxw_test3
  33. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='2'
  34. union all
  35. select 'mn_s_login' as type,popt_id,login_date
  36. from lxw_test3
  37. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='3'
  38. union all
  39. select 'mn_o_login' as type,popt_id,login_date
  40. from lxw_test3
  41. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='4'
  42. ) x

多重insert耗时25分钟左右:

  1. FROM lxw_test3
  2. insert overwrite table lxw_test6 partition (flag = '1')
  3. select 'm3_login' as type,popt_id,login_date
  4. where login_date>='2012-02-01' and login_date<'2012-05-01'
  5. insert overwrite table lxw_test6 partition (flag = '2')
  6. select 'mn_login' as type,popt_id,login_date
  7. where login_date>='2012-05-01' and login_date<='2012-05-09'
  8. insert overwrite table lxw_test6 partition (flag = '3')
  9. select 'm3_g_login' as type,popt_id,login_date
  10. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='1'
  11. insert overwrite table lxw_test6 partition (flag = '4')
  12. select 'm3_l_login' as type,popt_id,login_date
  13. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='2'
  14. insert overwrite table lxw_test6 partition (flag = '5')
  15. select 'm3_s_login' as type,popt_id,login_date
  16. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='3'
  17. insert overwrite table lxw_test6 partition (flag = '6')
  18. select 'm3_o_login' as type,popt_id,login_date
  19. where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='4'
  20. insert overwrite table lxw_test6 partition (flag = '7')
  21. select 'mn_g_login' as type,popt_id,login_date
  22. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='1'
  23. insert overwrite table lxw_test6 partition (flag = '8')
  24. select 'mn_l_login' as type,popt_id,login_date
  25. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='2'
  26. insert overwrite table lxw_test6 partition (flag = '9')
  27. select 'mn_s_login' as type,popt_id,login_date
  28. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='3'
  29. insert overwrite table lxw_test6 partition (flag = '10')
  30. select 'mn_o_login' as type,popt_id,login_date
  31. where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='4'

性能优化

  • 使用分区剪裁、列剪裁:尽量使用分区过滤,少用SELECT *
  • 是否存在多对多的关联:避免笛卡尔积
  • 合理使用Union All:对同一张表的union all 要比multi insert快的多。
  • 复杂业务使用临时表,而不是单表操作;
  • 使用本地MR:如果在hive中运行的sql本身数据量很小,那么使用本地mr的效率要比提交到Hadoop集群中运行快很多
  • 合理使用动态分区;
  • 中间结果压缩;
  • HQL优化其实也是MapReduce的优化,作为分布式计算模型,
  • 其最核心的地方就是要确保每个节点上分布的数据均匀,才能最大程度发挥它的威力,否则,某一个不均匀的节点就会拖后腿。