执行计划Explain
查看执行计划:
-- EXTENDED 查看详细执行计划
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query;
有的SQL执行不需要走MR,例如:
explain select * from emp;
输出结果:
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
TableScan
alias: emp
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
ListSink
有的SQL需要走MR,例如:
explain select deptno, avg(sal) as avg_sal from emp group by deptno;
输出结果:
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: emp
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: sal (type: double), deptno (type: int)
outputColumnNames: sal, deptno
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: sum(sal), count(sal)
keys: deptno (type: int)
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: int)
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double), _col2 (type: bigint)
Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0), count(VALUE._col1)
keys: KEY._col0 (type: int)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: int), (_col1 / _col2) (type: double)
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Fetch抓取
Fetch
是指Hive中对某些情况的查询可以不必使用 MapReduce计算。
例如 select * from emp;
,Hive可以直接读取emp
对应的hdfs文件然后输出到控制台。
在hive-default.xml.template
文件中,hive.fetch.task.conversion
默认值是more
(老版本默认是minimal
),如果设置为more
,那么全局查找、字段查找、limit
等查找都不走 MapReduce。
本地模式
大多数的 Hadoop Job是需要Hadoop提供的完整的可扩展来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置 hive.exec.mode.local.auto
的值为true
,来让hive在适当的时候自动启动这个优化。
-- 开启本地MR。默认值为 false
set hive.exec.mode.local.auto=true;
-- 设置 local MR 的最大输入数据量。当数据量小于这个值时采用 local MR的方式,默认为134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=5000000;
-- 设置 local MR 的最大输入文件个数。当输入文件个数小于这个值时采用 local MR 的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
表的优化
小表大表Join(MapJoin)
将 key 相对分散,并且数据量小的表放在 join
的左边,可以使用 map join
让小的维度表先进内存,在 map 端完成 join。
新版的hive已经对小表大表的join
进行了优化,小表放在左边还是右边已经没有区别。
设置自动选择 MapJoin:
-- 开启自动选择MapJoin.(默认值为true)
set hive.auto.convert.join=true;
大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;
MapJoin机制:
其中:
Task A
:是一个 Local Task (在客户端本地执行的Task),负责扫描小表Small Table b
的数据,将其转换成一个HashTable
的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache
中Task B
:该任务是一个没有 Reduce 的MR。启动 MapTasks扫描大表Table a
,在 Map 阶段,根据表a的每一条记录去和DistributeCache
中 表b 对应的HashTable
关联,并直接输出结果- 由于 MapJoin 没有 Reduce,所以由Map直接输出结果文件。有多少个 MapTask,就有多少个结果文件
大表join大表
空值过滤:
有时候 join
超时是因为某些 key
对应的数据太多,而相同key
对应的数据都会发送到相同的reducer
上,从而导致内存不够。此时我们可以先对数据进行分析,在SQL中过滤掉这些空值等异常数据。
空key
转换:
有时候虽然某个key
为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join
结果中。此时我们可以将表中的key
为空的字段赋一个随机的值,使得数据随机均匀的分布到不同的reducer
上。
SMB(Sort Merge Bucket join):使用分桶表
Group By
默认情况下,Map阶段同一个key
数据分发给一个reducer
,当一个key
数据过大时就倾斜了。
并不是所有的聚合操作都需要在Reducer
完成,很多聚合操作都可以先在Mapper
进行部分聚合,最后在Reducer
得出最终结果。
开启Map端聚合:
set hive.map.aggr=true;
在Map段进行聚合操作的条目数目:
set hive.groupby.mapaggr.checkinterval=100000;
有数据倾斜的时候进行负载均衡(默认false
):
set hive.groupby.skewindata=true;
当选项设定为true
,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By
key
有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照 Group By
key
分布到Reduce中(这个过程可以保证相同的 Group By
key
被分布到同一个Reduce中),最后完成最终的聚合操作。
Count(Distinct) 去重统计
数据量大的情况下,由于count distinct
操作需要用一个 Reduce Task来完成,这一个 Reduce需要处理的数据量太大,就会导致整个 Job 很难完成,一般Count distinct
使用先 Group By
再 count
的方式替换,但是需要注意 group by
造成的数据倾斜问题。
笛卡尔积
需要尽量避免笛卡尔积。join
的时候如果不加on
条件或者无效的on
条件,Hive只能使用1个Reducer
来完成笛卡尔积。
行列过滤
列处理:在select
中,只拿需要的列,如果有分区,尽量使用分区过滤,少用 select *
。
行处理:在分区裁剪中,当使用外关联时,如果将附表的过滤条件写在where
后,那么就会先全表关联,之后再过滤。
合理设置Map和Reduce数量
通常情况下,作业会通过input
的目录产生一个或多个map任务。
主要的决定因素有:input
的文件总个数,input
的文件大小,集群设置的文件块大小。
map的数量不是越多越好。
如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会被当成一个块,用一个map来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
每个map处理接近128M的文件块也不一定是最优的。
比如有一个127M的文件,正常会用一个map去完成,但是如果这个文件只有一个或两个字段,行数有几千万行,而且map处理逻辑比较复杂的话,用一个map去做也会比较耗时。
复杂文件增加Map数
当input
的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
根据MapReduce的切片计算逻辑:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
调整maxSize
最大值,让maxSize
最大值低于blockSize
就可以增加map的个数。
例如:
set mapreduce.input.fileinputformat.split.maxsize=100;
小文件进行合并
在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat
具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat
没有对小文件合并功能。
-- 默认值就是CombineHiveInputFormat
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
在MapReduce的任务结束时合并小文件的设置:
在map-only
任务结束时合并小文件(默认true
):
set hive.merge.mapfiles=true;
在map-reduce
任务结束时合并小文件(默认false
):
set hive.merge.mapredfiles=true;
合并文件的大小(默认256M):
set hive.merge.size.per.task=268435456;
当输出文件的平均大小小于该值时,启动一个独立的 MapReduce 任务进行文件merge
:
set hive.merge.smallfiles.avgsize=16777216;
合理设置Reduce数量
调整Reduce个数的方法一:
每个Reduce处理的数据量默认是256M:
set hive.exec.reducers.bytes.per.reducer=256000000;
每个任务最大的Reduce数,默认1009:
set hive.exec.reducers.max=1009;
计算Reducer书的公式:
N = min(参数2, 总输入数据量/参数1)
调整Reducer个数方式二:
在Hadoop 的 mapred-site.xml
文件中进行修改。
设置每个job的Reducer个数:
set mapreduce.job.reduces=15;
Reducer个数不是越多越好:
- 过多的启动和初始化Reducer也会消耗时间和资源
- 有多少个Reducer,就会有多少个输出文件,如果生成了很多小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题
并行执行
Hive会将一个查询转化成一个或多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit阶段。或者 hive 执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。
通过设置hive.exec.parallel
值为true
,就可以开启并行。但是在共享集群中,需要注意,如果 job 中并行阶段增多,那么集群利用率就会增加:
-- 开启任务并行执行
set hive.exec.parallel=true;
-- 同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=16;
严格模式
Hive可以通过一些设置防止部分危险的操作。
分区表不使用分区过滤:
将 hive.strict.checks.no.partition.filter
设置为 true
时,对于分区表,除非where
语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是:通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
使用 order by
没有 limit
过滤:
将 hive.strict.checks.orderby.no.limit
设置为true
时,对于使用了order by
语句的查询,要求必须使用limit
语句。因为order by
为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加limit
语句可以防止Reducer额外执行很长一段时间。
笛卡尔积:
将hive.strict.checks.cartesian.product
设置为true
时,会限制笛卡尔积的查询。 对于关系型数据库,执行join
时不使用on
而是在where
中加入关联语句,那么关系型数据库的优化器可能会自动将where
转换到on
上。但是Hive不会执行这种优化,因此如果表数据很大,那么这个查询就会出现不可控的情况。
其他优化
可以在Hadoop端配置 JVM重用、压缩等优化。
可以对表进行分区、分桶进行优化