案例1:WordCount
- 理解行列转换函数explode
$ hadoop fs -put sample.txt /hive/wordcount/input
hive>create table words(line string);
hive>load data inpath ‘/hive/wordcount/input’ overwrite into table words;
hive>create table wordcount as select word, count(1) as count from (select explode(split(line,’ ‘))as word from words) w group by word order by word;
案例2:去重
dintinct 去重
- 适合整行去重
:::info
hive> create table test1(userid int,courseid int);
hive> insert into test1(userid,courseid )values(101,666),(102,666),(102,666);
hive> select distinct userid,courseid from test1;
:::
- with as 用法:
:::info
hive> with test1 as
(select 101 as userid,666 as courseid
union all
select 102 as userid,666 as courseid
union all
select 102 as userid,666 as courseid)
select
distinct userid,courseid
from test1; :::group by 去重
:::info hive> select userid,courseid from test1 group by userid,courseid; :::
row_number 去重
创建表:用户ID,课程名字,考试日期,分数
:::info
hive> create table test2(userid int,courseid string,ctime string,score int);
hive> insert into test2(userid,courseid ,ctime,score)values
(101,’redis’,’05-01’,60),
(101,’redis’,’05-02’,70),
(101,’mysql’,’05-04’,65),
(101,’mysql’,’05-04’,76),
(102,’redis’,’05-01’,70),
(102,’redis’,’05-03’,80),
(102,’mysql’,’05-07’,75),
(102,’mysql’,’05-08’,86),
(103,’redis’,’05-15’,91),
(103,’mysql’,’05-20’,92);
:::
问题
查询每个人每门课程的最终分数,
分析
distinct只是简单的去重,解决不了问题;group by也是简单的分组去重,也解决不了问题;order by只是简单的排序,也解决不了问题。那这个时候row_number()就派上用场了,分组完再排序
体验row_number()用法
:::info
hive> select userid,courseid,ctime,score,
row_number() over(partition by userid,courseid order by ctime desc) as rank
from test2;
:::
解决问题:
:::info
hive> select userid,courseid,ctime,score
from
(select userid,courseid,ctime,score,
row_number() over(partition by userid,courseid order by ctime desc) as rank
from test2) t1
where t1.rank=1;
:::
对比总结
distinct去重,所有数据都在一个reduce里面,很浪费资源,效率又很低,会有内存溢出的风险。
group by去重不会将所有数据都到一个reduce中,但是无法实现复杂业务。
简单数据去重建议用group by替代distinct的方式。
row_number可以实现复杂业务,比如top10等、最大、最小值所在列的其他列属性
作业
1、造数并证明distinct去重时所有数据都会集中到一个reduce中,group by 方式不会
通过yarn查看作业图,并截图
2、查询每个班级每门课程成绩前三的学生姓名、性别、年龄、班级
表结构为:学生姓名、科目、成绩、班级、年龄、性别
案例3:无窗口函数时获取最大值所在记录
问题:
使用max,group by只能获取group by的分组与最大值,不能获取最大值所在列的其他信息
id | name | age | class |
---|---|---|---|
1 | qiu | 22 | 1 |
2 | liu | 42 | 1 |
3 | zheng | 20 | 2 |
4 | qian | 20 | 2 |
5 | wang | 11 | 3 |
6 | li | 33 | 3 |
:::info
create table test(id int primary key,name varchar(20),age int,class int);
replace into test values(1,’qiu’,22,1);
replace into test values(2,’liu’,42,1);
replace into test values(3,’zheng’,20,2);
replace into test values(4,’qian’,20,2);
replace into test values(5,’wang’,11,3);
replace into test values(6,’li’,33,3);
:::
解决方案一
:::info
select from (select from test order by age desc) as b group by class;
:::
与之匹配的是group by分组后的第一条记录的基本信息
在mysql执行没有问题,性能也比较快,最大值有多个时只会返回第一个
解决方案二
:::info
select * from test t where t.age = (select max(age) from test where t.class = class);
:::
mysql测试,8000条,InnoDB情况下方法2会慢很多,几乎无法忍受,MyIsam情况下第一次很慢,后面会比较快
mysql测试,8000条,加索引,第一次很慢16s,后面比较快0s
mysql测试,上百万条的话会非常慢,10几分钟尚无结果
在phoenix测试:表行数39w+,被选中记录几万级别,最终结果几千的情况下,使用时间为几秒左右(<3~5s)
如果有多个最大值则结果中会包含多个
总结: mysql用方法1,phoenix用方法2
案例4:排序
order by:
order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间。 :::info select * from test order by name; :::
sort by
sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。
sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。
使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。
:::info
select * from test order by name;
:::
distribute by
distribute by是控制在map端如何拆分数据给reduce端的。
hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。
sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用。
- Distribute by和sort by的使用场景
1.Map输出的文件大小不均。
2.Reduce输出文件大小不均。
3.小文件过多。
4.文件超大。
cluster by
cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。
ROW_NUMBER()
案例5:行列转换
输入表1:
id | name | course | score |
---|---|---|---|
1 | stu1 | mysql | 90 |
2 | stu1 | mongodb | 100 |
3 | stu2 | mysql | 80 |
4 | stu2 | mongodb | 70 |
5 | stu2 | redis | 90 |
6 | stu3 | mysql | 100 |
7 | stu3 | redis | 60 |
8 | stu4 | mysql | 90 |
9 | stu4 | redis | 170 |
输出表2:
name | mysql | mongodb | redis |
---|---|---|---|
stu3 | 100 | 0 | 60 |
stu1 | 90 | 100 | 0 |
stu4 | 90 | 0 | 70 |
stu2 | 80 | 70 | 70 |
数据准备
:::info
CREATE TABLE temp_tb
(
id
int(11) NOT NULL AUTO_INCREMENT,
name
varchar(10) NOT NULL,
course
varchar(10) DEFAULT NULL,
score
int(11) DEFAULT NULL,
PRIMARY KEY (id
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
insert into temp_tb(name,course,score)values
(‘stu1’,’mysql’,90),(‘stu1’,’mongodb’,100),(‘stu2’,’mysql’,80),
(‘stu2’,’mongodb’,70),(‘stu2’,’redis’,70),(‘stu3’,’mysql’,100),
(‘stu3’,’redis’,60),(‘stu4’,’mysql’,90),(‘stu4’,’redis’,70);
:::
CASE WHEN实现:
:::info
SELECT name,
SUM(CASE course WHEN ‘mysql’ THEN score ELSE 0 END) ‘mysql’,
SUM(CASE course WHEN ‘mongodb’ THEN score ELSE 0 END) ‘mongodb’,
SUM(CASE course WHEN ‘redis’ THEN score ELSE 0 END) ‘redis’
FROM temp_tb
GROUP BY name
ORDER BY mysql DESC;
:::
IF实现:
:::info
SELECT name,
SUM(CASE course WHEN ‘mysql’ THEN score ELSE 0 END) ‘mysql’,
SUM(CASE course WHEN ‘mongodb’ THEN score ELSE 0 END) ‘mongodb’,
SUM(CASE course WHEN ‘redis’ THEN score ELSE 0 END) ‘redis’
FROM temp_tb
GROUP BY name
ORDER BY mysql DESC;
:::
explode
案例6:小表驱动大表
把重复关联键小的表放到join前面可以提高join效率
问题
统计供应商星际(如一星、二星、三星等)的销售情况
销售明细表:分片键,订单号,供应商id :::info sales_detail(partition_value,order_id,seller_id) ::: 供应商表:分片建,供应商id,供应商星级 :::info dim_seller(partition_value,seller_id, seller_star) :::
错误写法:性能很低
:::info
select seller_star, count(order_id) as order_cnt
from
(select order_id,seller_id from sales_detail where partition_value=’20181010’ ) a
left outer join
(select seller_id, seller_star from dim_seller where partition_value =’’20181010’ ) b
on a.seller_id = b.seller_id
group by b.seller_star;
:::
分析
- 通常各个地区的供应商销售是不均衡的,比如发达地区销售可能占了8成,这样导致数据的倾斜的,如果不加以优化,此SQL将会耗费很长时间,甚至运行不出结果。
- 通常来说,供应商是有限的,数量不会很多,而销售明细表比较大,这就是典型的大表join小表的问题
- 解决方案
可以通过mapjoin的方式来优化,只需要添加mapjoin hint即可:
:::info
select /+mapjoin(b)/
seller_star, count(order_id) as order_cnt
from
(select order_id,seller_id from sales_detail where partition_value=’20181010’) a
left outer join
(select seller_id, seller_star from dim_seller where partition_value =’’20181010’ ) b
on a.seller_id = b.seller_id
group by b.seller_star;
:::
/+mapjoin(b)/ 即是mapjoin hint,如果需要多个mapjoin多个表,则格式为:/+mapjoin(b,c,d)/。
Hive对于mapjoin是默认开启的,设置参数为:
:::info
set hive.auto.convert.join = true;
:::
- mapjoin优化是在Map阶段进行join,而不是通常那样在Reduce阶段按照join列进行分发后在每个Reduce节点上进行join,不需要分发也就没有倾斜的问题。
- Hive会将小表(本例中的dim_seller表)全量复制到每个Map任务节点(只复制用到的列),然后每个Map任务节点查找小标即可。
- 小表不能太大,否则全量复制分发得不偿失
- Hive根据参数hive.mapjoin.smalltable.size(0.11.0版本后是hive.auto.convert.join.nonconditionaltask.size) 来确定小表的大小是否满⾜条件(默认25MB)
- 此参数允许的最大值可以修改,一般超过1GB(太大的话Map任务所在的节点内存会撑爆,Hive会报错。
另外需要注意的是,HDFS显示的文件大小是压缩后的大小,当实际加载到内存的时候,容量会增加很多,很多场景下会膨胀10倍。
案例7:数据倾斜
如何判断数据倾斜,如何避免
- 容易导致数据倾斜的操作:group by,count distinct,join
使用COUNT DISTINCT和GROUP BY造成的数据倾斜:
在大量空值或NULL,或者某一个值的记录特别多,可以先把该值过滤掉,在最后单独处理:
多重COUNT DISTINCT导致数据倾斜
通常使用UNION ALL + ROW_NUMBER() + SUM + GROUP BY来变通实现。
使用JOIN引起的数据倾斜:
a) 关联键存在大量空值或者某一特殊值,如NULL 空值单独处理,不参与关联; 空值或特殊值加随机数作为关联键; b) 不同数据类型的字段关联 转换为同一数据类型之后再做关联
案例8:UDF自定义函数
案例9:分区分桶
案例10 合理使用Union All
对同一张表的union all 要比多重insert快的多,
原因是hive本身对这种union all做过优化,即只扫描一次源表;
而多重insert也只扫描一次,但应为要insert到多个分区,所以做了很多其他的事情,导致消耗的时间非常长;
lxw_test3 12亿左右记录数
Union all : 耗时7分钟左右
select type,popt_id,login_date
from (
select 'm3_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-02-01' and login_date<'2012-05-01'
union all
select 'mn_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-05-01' and login_date<='2012-05-09'
union all
select 'm3_g_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='1'
union all
select 'm3_l_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='2'
union all
select 'm3_s_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='3'
union all
select 'm3_o_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='4'
union all
select 'mn_g_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='1'
union all
select 'mn_l_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='2'
union all
select 'mn_s_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='3'
union all
select 'mn_o_login' as type,popt_id,login_date
from lxw_test3
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='4'
) x
多重insert耗时25分钟左右:
FROM lxw_test3
insert overwrite table lxw_test6 partition (flag = '1')
select 'm3_login' as type,popt_id,login_date
where login_date>='2012-02-01' and login_date<'2012-05-01'
insert overwrite table lxw_test6 partition (flag = '2')
select 'mn_login' as type,popt_id,login_date
where login_date>='2012-05-01' and login_date<='2012-05-09'
insert overwrite table lxw_test6 partition (flag = '3')
select 'm3_g_login' as type,popt_id,login_date
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='1'
insert overwrite table lxw_test6 partition (flag = '4')
select 'm3_l_login' as type,popt_id,login_date
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='2'
insert overwrite table lxw_test6 partition (flag = '5')
select 'm3_s_login' as type,popt_id,login_date
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='3'
insert overwrite table lxw_test6 partition (flag = '6')
select 'm3_o_login' as type,popt_id,login_date
where login_date>='2012-02-01' and login_date<'2012-05-01' and apptypeid='4'
insert overwrite table lxw_test6 partition (flag = '7')
select 'mn_g_login' as type,popt_id,login_date
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='1'
insert overwrite table lxw_test6 partition (flag = '8')
select 'mn_l_login' as type,popt_id,login_date
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='2'
insert overwrite table lxw_test6 partition (flag = '9')
select 'mn_s_login' as type,popt_id,login_date
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='3'
insert overwrite table lxw_test6 partition (flag = '10')
select 'mn_o_login' as type,popt_id,login_date
where login_date>='2012-05-01' and login_date<='2012-05-09' and apptypeid='4'
性能优化
- 使用分区剪裁、列剪裁:尽量使用分区过滤,少用SELECT *
- 是否存在多对多的关联:避免笛卡尔积
- 合理使用Union All:对同一张表的union all 要比multi insert快的多。
- 复杂业务使用临时表,而不是单表操作;
- 使用本地MR:如果在hive中运行的sql本身数据量很小,那么使用本地mr的效率要比提交到Hadoop集群中运行快很多
- 合理使用动态分区;
- 中间结果压缩;
- HQL优化其实也是MapReduce的优化,作为分布式计算模型,
- 其最核心的地方就是要确保每个节点上分布的数据均匀,才能最大程度发挥它的威力,否则,某一个不均匀的节点就会拖后腿。