The problem
A work task: count the number of distinct users by the account field (roughly twenty million of them) in a Hive table partitioned by day, with tens of billions of rows per day. The requirement later changed to counting the distinct users separately for each value of the id field.
I immediately reached for count(distinct account), wrote the one-line query, waited four hours, and then watched the map progress run backwards.
I should have known the task wouldn't be that easy.
Then I remembered that Hive SQL runs on MapReduce, a parallel computation framework, and tens of billions of rows are nothing like the few hundred rows in the MySQL instance I use for everyday testing.
My first guess was that the map and reduce tasks were short on memory:
```sql
set mapreduce.map.memory.mb=48192;
set mapreduce.reduce.memory.mb=48192;
```
Then I ran the query again:

```sql
select count(distinct account) from … where …;
```

The MapReduce job ran for three more hours and failed with error in shuffle in fetcher#3 — the shuffle phase broke again.
Digging around, I noticed there was only one reducer. How is that supposed to parallelize anything? So I set:

```sql
set mapred.reduce.tasks=1000;
```

I ran the query again — still one reducer. Why?
It turns out that with distinct, the map phase cannot use a combiner to deduplicate; the rows are emitted as (key, value) pairs and all deduplication happens in the reduce phase.
The key point: when Hive evaluates a full aggregate such as COUNT, it ignores the user-specified number of reduce tasks and forces it to 1.
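A minimal sketch of the contrast, using the same hypothetical table and column names as in this post (tablename, account, id):

```sql
-- Full aggregate: the reducer count is fixed to 1 at compile time,
-- regardless of what the user sets.
set mapred.reduce.tasks=1000;
select count(distinct account) from tablename;   -- still runs with 1 reducer

-- Grouped aggregate: the reducer setting does take effect,
-- because rows can be partitioned across reducers by the group key.
select id, count(*) from tablename group by id;
```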
The solution:
Rewrite it as a subquery, turning one MapReduce job into two: first select the distinct values, then count() them. This way the deduplication is distributed across many reducers; the count still runs on a single reducer, but that reducer only has to count.

```sql
select count(*) from (select distinct account from tablename where …) t;
```

This returns a result in roughly half an hour.
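An equivalent rewrite replaces DISTINCT with GROUP BY in the inner query; it compiles to the same two-job shape. A sketch over the same hypothetical table:

```sql
-- Stage 1: group by account spreads deduplication over many reducers.
-- Stage 2: a single reducer counts the (now small) distinct set.
select count(*) from (
  select account from tablename where … group by account
) t;
```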
The requirement then changed again: for the same table, count the distinct accounts per account type (a field named id).
Following the approach above, each type would need its own query with where id = … added, and each query takes half an hour — very inefficient.
Optimization: group by (id, account) and store the result in a temporary table; all the statistics can then be computed from that much smaller table.

```sql
insert overwrite table temp
select id, account, count(1) as num
from tablename
group by id, account;
```

The rows in temp are already deduplicated (and come out sorted ascending by id), so a count(*) filtered by id gives the distinct count per type, and sum(num) recovers the total record count.
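The follow-up queries against temp then reduce to the following (a sketch; the alias distinct_accounts is illustrative):

```sql
-- Distinct accounts per id, all types in one pass:
select id, count(*) as distinct_accounts
from temp
group by id;

-- Total (non-deduplicated) record count, recovered from the stored counts:
select sum(num) from temp;
```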
* * *
How count(distinct id) works

```sql
SELECT COUNT(DISTINCT id) FROM TABLE_NAME WHERE …;
```

From the execution plan, count(distinct id) gets exactly one reducer task (even if you set the reducer count to 100, it has no effect). Every id is funneled into that single reducer for deduplication and then aggregation, which very easily causes data skew.
```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: emp_ct
            Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: dept_num (type: int)
              outputColumnNames: _col0
              Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: _col0 (type: int)
                sort order: +
                Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col0:0._col0)
          mode: complete
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
A sample run — note that the configured number of reducer tasks does not take effect:
```
hive> set mapreduce.job.reduces=5;
hive> select count(distinct dept_num)
    > from emp_ct;
Query ID = mart_fro_20200320233947_4f60c190-4967-4da6-bf3e-97db786fbc6c
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Start submit job !
Start GetSplits
GetSplits finish, it costs : 32 milliseconds
Submit job success : job_1584341089622_358496
Starting Job = job_1584341089622_358496, Tracking URL = http://BJHTYD-Hope-25-11.hadoop.jd.local:50320/proxy/application_1584341089622_358496/
Kill Command = /data0/hadoop/hadoop_2.100.31_2019090518/bin/hadoop job -kill job_1584341089622_358496
Hadoop job(job_1584341089622_358496) information for Stage-1: number of mappers: 2; number of reducers: 1
2020-03-20 23:39:58,215 Stage-1(job_1584341089622_358496) map = 0%, reduce = 0%
2020-03-20 23:40:09,628 Stage-1(job_1584341089622_358496) map = 50%, reduce = 0%, Cumulative CPU 2.74 sec
2020-03-20 23:40:16,849 Stage-1(job_1584341089622_358496) map = 100%, reduce = 0%, Cumulative CPU 7.43 sec
2020-03-20 23:40:29,220 Stage-1(job_1584341089622_358496) map = 100%, reduce = 100%, Cumulative CPU 10.64 sec
MapReduce Total cumulative CPU time: 10 seconds 640 msec
Stage-1 Elapsed : 40533 ms job_1584341089622_358496
Ended Job = job_1584341089622_358496
MapReduce Jobs Launched:
Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 10.64 sec HDFS Read: 0.000 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 40s533ms job_1584341089622_358496
Total MapReduce CPU Time Spent: 10s640ms
Total Map: 2 Total Reduce: 1
Total HDFS Read: 0.000 GB Written: 0.000 GB
OK
3
Time taken: 43.025 seconds, Fetched: 1 row(s)
```
The fix for count(distinct id)
How do we solve this? The fix is actually quite elegant:
We take advantage of Hive's support for nested queries and turn the single MapReduce job into two. The first stage selects all the distinct ids; the second stage counts those already-deduplicated ids. In the first stage we can raise the reducer parallelism and process the map output concurrently. In the second stage, since the ids are already deduplicated, COUNT() does not need to ship the raw id values out of the map phase — only a merged partial count. So even though Hive still forces a single reduce task for the second stage, the tiny amount of map output cannot make that single reducer a bottleneck. The improved SQL:
```sql
SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME WHERE …) t;
```
Let's check the execution plan:
```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: emp_ct
            Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: dept_num (type: int)
              outputColumnNames: dept_num
              Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: dept_num (type: int)
                sort order: +
                Map-reduce partition columns: dept_num (type: int)
                Statistics: Num rows: 42 Data size: 171 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Group By Operator
          keys: KEY._col0 (type: int)
          mode: complete
          outputColumnNames: _col0
          Statistics: Num rows: 21 Data size: 85 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 21 Data size: 85 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: int)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: complete
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
And the actual run — watch the reducer counts: the first job uses 5 reducers, the second uses 1.
```
hive> set mapreduce.job.reduces=5;
hive> select count(dept_num)
    > from (
    >   select distinct dept_num
    >   from emp_ct
    > ) t1;
Query ID = mart_fro_20200320234453_68ad3780-c3e5-44bc-94df-58a8f2b01f59
Total jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Defaulting to jobconf value of: 5
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Start submit job !
Start GetSplits
GetSplits finish, it costs : 13 milliseconds
Submit job success : job_1584341089622_358684
Starting Job = job_1584341089622_358684, Tracking URL = http://BJHTYD-Hope-25-11.hadoop.jd.local:50320/proxy/application_1584341089622_358684/
Kill Command = /data0/hadoop/hadoop_2.100.31_2019090518/bin/hadoop job -kill job_1584341089622_358684
Hadoop job(job_1584341089622_358684) information for Stage-1: number of mappers: 2; number of reducers: 5
2020-03-20 23:45:02,920 Stage-1(job_1584341089622_358684) map = 0%, reduce = 0%
2020-03-20 23:45:23,533 Stage-1(job_1584341089622_358684) map = 50%, reduce = 0%, Cumulative CPU 3.48 sec
2020-03-20 23:45:25,596 Stage-1(job_1584341089622_358684) map = 100%, reduce = 0%, Cumulative CPU 7.08 sec
2020-03-20 23:45:32,804 Stage-1(job_1584341089622_358684) map = 100%, reduce = 20%, Cumulative CPU 9.43 sec
2020-03-20 23:45:34,861 Stage-1(job_1584341089622_358684) map = 100%, reduce = 40%, Cumulative CPU 12.39 sec
2020-03-20 23:45:36,923 Stage-1(job_1584341089622_358684) map = 100%, reduce = 80%, Cumulative CPU 18.47 sec
2020-03-20 23:45:40,011 Stage-1(job_1584341089622_358684) map = 100%, reduce = 100%, Cumulative CPU 23.23 sec
MapReduce Total cumulative CPU time: 23 seconds 230 msec
Stage-1 Elapsed : 46404 ms job_1584341089622_358684
Ended Job = job_1584341089622_358684
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Start submit job !
Start GetSplits
GetSplits finish, it costs : 47 milliseconds
Submit job success : job_1584341089622_358729
Starting Job = job_1584341089622_358729, Tracking URL = http://BJHTYD-Hope-25-11.hadoop.jd.local:50320/proxy/application_1584341089622_358729/
Kill Command = /data0/hadoop/hadoop_2.100.31_2019090518/bin/hadoop job -kill job_1584341089622_358729
Hadoop job(job_1584341089622_358729) information for Stage-2: number of mappers: 5; number of reducers: 1
2020-03-20 23:45:48,353 Stage-2(job_1584341089622_358729) map = 0%, reduce = 0%
2020-03-20 23:46:05,846 Stage-2(job_1584341089622_358729) map = 20%, reduce = 0%, Cumulative CPU 2.62 sec
2020-03-20 23:46:06,873 Stage-2(job_1584341089622_358729) map = 60%, reduce = 0%, Cumulative CPU 8.49 sec
2020-03-20 23:46:08,931 Stage-2(job_1584341089622_358729) map = 80%, reduce = 0%, Cumulative CPU 11.53 sec
2020-03-20 23:46:09,960 Stage-2(job_1584341089622_358729) map = 100%, reduce = 0%, Cumulative CPU 15.23 sec
2020-03-20 23:46:35,639 Stage-2(job_1584341089622_358729) map = 100%, reduce = 100%, Cumulative CPU 20.37 sec
MapReduce Total cumulative CPU time: 20 seconds 370 msec
Stage-2 Elapsed : 54552 ms job_1584341089622_358729
Ended Job = job_1584341089622_358729
MapReduce Jobs Launched:
Stage-1: Map: 2 Reduce: 5 Cumulative CPU: 23.23 sec HDFS Read: 0.000 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 46s404ms job_1584341089622_358684
Stage-2: Map: 5 Reduce: 1 Cumulative CPU: 20.37 sec HDFS Read: 0.000 GB HDFS Write: 0.000 GB SUCCESS Elapsed : 54s552ms job_1584341089622_358729
Total MapReduce CPU Time Spent: 43s600ms
Total Map: 7 Total Reduce: 6
Total HDFS Read: 0.000 GB Written: 0.000 GB
OK
3
Time taken: 103.692 seconds, Fetched: 1 row(s)
```
This rewrite is similar in spirit to what the hive.groupby.skewindata parameter does!
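For comparison, hive.groupby.skewindata=true asks Hive itself to split a skewed GROUP BY into two jobs: the first distributes rows randomly across reducers for partial aggregation, the second merges the partial results by the group key. A sketch, using the same hypothetical table names as above:

```sql
set hive.groupby.skewindata=true;
-- Hive now compiles this into two MapReduce jobs, much like the
-- manual subquery rewrite in this post.
select id, count(distinct account)
from tablename
group by id;
```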
A quick hands-on comparison:
```sql
select count(distinct dept_num) from emp_ct;

select count(*) from (
  select distinct dept_num from emp_ct
) t;
```