执行计划Explain

查看执行计划:

  1. -- EXTENDED 查看详细执行计划
  2. EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query;

有的SQL执行不需要走MR,例如:

  1. explain select * from emp;

输出结果:

  1. STAGE DEPENDENCIES:
  2. Stage-0 is a root stage
  3. STAGE PLANS:
  4. Stage: Stage-0
  5. Fetch Operator
  6. limit: -1
  7. Processor Tree:
  8. TableScan
  9. alias: emp
  10. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  11. Select Operator
  12. expressions: empno (type: int), ename (type: string), job (type: string), mgr (type: int), hiredate (type: string), sal (type: double), comm (type: double), deptno (type: int)
  13. outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
  14. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  15. ListSink

有的SQL需要走MR,例如:

  1. explain select deptno, avg(sal) as avg_sal from emp group by deptno;

输出结果:

  1. STAGE DEPENDENCIES:
  2. Stage-1 is a root stage
  3. Stage-0 depends on stages: Stage-1
  4. STAGE PLANS:
  5. Stage: Stage-1
  6. Map Reduce
  7. Map Operator Tree:
  8. TableScan
  9. alias: emp
  10. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  11. Select Operator
  12. expressions: sal (type: double), deptno (type: int)
  13. outputColumnNames: sal, deptno
  14. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  15. Group By Operator
  16. aggregations: sum(sal), count(sal)
  17. keys: deptno (type: int)
  18. mode: hash
  19. outputColumnNames: _col0, _col1, _col2
  20. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  21. Reduce Output Operator
  22. key expressions: _col0 (type: int)
  23. sort order: +
  24. Map-reduce partition columns: _col0 (type: int)
  25. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  26. value expressions: _col1 (type: double), _col2 (type: bigint)
  27. Execution mode: vectorized
  28. Reduce Operator Tree:
  29. Group By Operator
  30. aggregations: sum(VALUE._col0), count(VALUE._col1)
  31. keys: KEY._col0 (type: int)
  32. mode: mergepartial
  33. outputColumnNames: _col0, _col1, _col2
  34. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  35. Select Operator
  36. expressions: _col0 (type: int), (_col1 / _col2) (type: double)
  37. outputColumnNames: _col0, _col1
  38. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  39. File Output Operator
  40. compressed: false
  41. Statistics: Num rows: 1 Data size: 6190 Basic stats: COMPLETE Column stats: NONE
  42. table:
  43. input format: org.apache.hadoop.mapred.SequenceFileInputFormat
  44. output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
  45. serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  46. Stage: Stage-0
  47. Fetch Operator
  48. limit: -1
  49. Processor Tree:
  50. ListSink

Fetch抓取

Fetch是指Hive中对某些情况的查询可以不必使用 MapReduce计算。

例如 select * from emp;,Hive可以直接读取emp对应的hdfs文件然后输出到控制台。

hive-default.xml.template文件中,hive.fetch.task.conversion默认值是more(老版本默认是minimal),如果设置为more,那么全局查找、字段查找、limit等查找都不走 MapReduce。

本地模式

大多数的 Hadoop Job是需要Hadoop提供的完整的可扩展来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

用户可以通过设置 hive.exec.mode.local.auto的值为true,来让hive在适当的时候自动启动这个优化。

  1. -- 开启本地MR。默认值为 false
  2. set hive.exec.mode.local.auto=true;
  3. -- 设置 local MR 的最大输入数据量。当数据量小于这个值时采用 local MR的方式,默认为134217728,即 128M
  4. set hive.exec.mode.local.auto.inputbytes.max=5000000;
  5. -- 设置 local MR 的最大输入文件个数。当输入文件个数小于这个值时采用 local MR 的方式,默认为4
  6. set hive.exec.mode.local.auto.input.files.max=10;

表的优化

小表大表Join(MapJoin)

将 key 相对分散,并且数据量小的表放在 join的左边,可以使用 map join 让小的维度表先进内存,在 map 端完成 join。

新版的hive已经对小表大表的join进行了优化,小表放在左边还是右边已经没有区别。

设置自动选择 MapJoin:

  1. -- 开启自动选择MapJoin.(默认值为true)
  2. set hive.auto.convert.join=true;

大表小表的阈值设置(默认 25M 以下认为是小表):

  1. set hive.mapjoin.smalltable.filesize=25000000;

MapJoin机制:

mapjoin.png

其中:

  • Task A:是一个 Local Task (在客户端本地执行的Task),负责扫描小表 Small Table b 的数据,将其转换成一个HashTable 的数据结构,并写入本地的文件中,之后将该文件加载到 DistributeCache
  • Task B:该任务是一个没有 Reduce 的MR。启动 MapTasks扫描大表 Table a,在 Map 阶段,根据表a的每一条记录去和DistributeCache中 表b 对应的 HashTable关联,并直接输出结果
  • 由于 MapJoin 没有 Reduce,所以由Map直接输出结果文件。有多少个 MapTask,就有多少个结果文件

大表join大表

空值过滤:

有时候 join 超时是因为某些 key 对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们可以先对数据进行分析,在SQL中过滤掉这些空值等异常数据。

key转换:

有时候虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join结果中。此时我们可以将表中的key为空的字段赋一个随机的值,使得数据随机均匀的分布到不同的reducer上。

SMB(Sort Merge Bucket join):使用分桶表

Group By

默认情况下,Map阶段同一个key数据分发给一个reducer,当一个key数据过大时就倾斜了。

并不是所有的聚合操作都需要在Reducer完成,很多聚合操作都可以先在Mapper进行部分聚合,最后在Reducer得出最终结果。

开启Map端聚合:

  1. set hive.map.aggr=true;

在Map段进行聚合操作的条目数目:

  1. set hive.groupby.mapaggr.checkinterval=100000;

有数据倾斜的时候进行负载均衡(默认false):

  1. set hive.groupby.skewindata=true;

当选项设定为true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照 Group By key分布到Reduce中(这个过程可以保证相同的 Group By key被分布到同一个Reduce中),最后完成最终的聚合操作。

Count(Distinct) 去重统计

数据量大的情况下,由于count distinct操作需要用一个 Reduce Task来完成,这一个 Reduce需要处理的数据量太大,就会导致整个 Job 很难完成,一般Count distinct 使用先 Group Bycount的方式替换,但是需要注意 group by造成的数据倾斜问题。

笛卡尔积

需要尽量避免笛卡尔积。join的时候如果不加on条件或者无效的on条件,Hive只能使用1个Reducer来完成笛卡尔积。

行列过滤

列处理:在select中,只拿需要的列,如果有分区,尽量使用分区过滤,少用 select *

行处理:在分区裁剪中,当使用外关联时,如果将附表的过滤条件写在where后,那么就会先全表关联,之后再过滤。

合理设置Map和Reduce数量

通常情况下,作业会通过input的目录产生一个或多个map任务。

主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。

map的数量不是越多越好。

如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会被当成一个块,用一个map来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。

每个map处理接近128M的文件块也不一定是最优的。

比如有一个127M的文件,正常会用一个map去完成,但是如果这个文件只有一个或两个字段,行数有几千万行,而且map处理逻辑比较复杂的话,用一个map去做也会比较耗时。

复杂文件增加Map数

input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。

根据MapReduce的切片计算逻辑:

  1. long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

调整maxSize最大值,让maxSize最大值低于blockSize就可以增加map的个数。

例如:

  1. set mapreduce.input.fileinputformat.split.maxsize=100;

小文件进行合并

在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。

  1. -- 默认值就是CombineHiveInputFormat
  2. set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

在MapReduce的任务结束时合并小文件的设置:

map-only任务结束时合并小文件(默认true):

  1. set hive.merge.mapfiles=true;

map-reduce任务结束时合并小文件(默认false):

  1. set hive.merge.mapredfiles=true;

合并文件的大小(默认256M):

  1. set hive.merge.size.per.task=268435456;

当输出文件的平均大小小于该值时,启动一个独立的 MapReduce 任务进行文件merge

  1. set hive.merge.smallfiles.avgsize=16777216;

合理设置Reduce数量

调整Reduce个数的方法一:

每个Reduce处理的数据量默认是256M:

  1. set hive.exec.reducers.bytes.per.reducer=256000000;

每个任务最大的Reduce数,默认1009:

  1. set hive.exec.reducers.max=1009;

计算Reducer书的公式:

  1. N = min(参数2, 总输入数据量/参数1)

调整Reducer个数方式二:
在Hadoop 的 mapred-site.xml文件中进行修改。

设置每个job的Reducer个数:

  1. set mapreduce.job.reduces=15;

Reducer个数不是越多越好:

  • 过多的启动和初始化Reducer也会消耗时间和资源
  • 有多少个Reducer,就会有多少个输出文件,如果生成了很多小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题

并行执行

Hive会将一个查询转化成一个或多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit阶段。或者 hive 执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。

通过设置hive.exec.parallel值为true,就可以开启并行。但是在共享集群中,需要注意,如果 job 中并行阶段增多,那么集群利用率就会增加:

  1. -- 开启任务并行执行
  2. set hive.exec.parallel=true;
  3. -- 同一个sql允许最大并行度,默认为8
  4. set hive.exec.parallel.thread.number=16;

严格模式

Hive可以通过一些设置防止部分危险的操作。

分区表不使用分区过滤:

hive.strict.checks.no.partition.filter 设置为 true时,对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是:通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。

使用 order by 没有 limit 过滤:

hive.strict.checks.orderby.no.limit设置为true时,对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加limit语句可以防止Reducer额外执行很长一段时间。

笛卡尔积:

hive.strict.checks.cartesian.product设置为true时,会限制笛卡尔积的查询。 对于关系型数据库,执行join时不使用on而是在where中加入关联语句,那么关系型数据库的优化器可能会自动将where转换到on上。但是Hive不会执行这种优化,因此如果表数据很大,那么这个查询就会出现不可控的情况。

其他优化

可以在Hadoop端配置 JVM重用、压缩等优化。

可以对表进行分区、分桶进行优化