1. Hive表设计优化

1.1 分区表

优化前:普通表查询
image.png
优化后:分区表查询
image.png
总结,分区表结构的设计思想是:根据查询的需求,将数据按照查询的条件【一般都以时间】进行划分分区存储,将不同分区的数据单独使用一个HDFS目录来进行存储,当底层实现计算时,根据查询的条件,只读取对应分区的数据作为输入,减少不必要的数据加载,提高程序的性能

1.2 分桶表

1.2.1 Hive中join的问题

Hive Join的底层是通过MapReduce来实现的,但是Hive实现Join时面临一个问题:如果有两张非常大的表要进行Join,两张表的数据量都很大,Hive底层通过MapReduce实现时,无法使用MapJoin提高Join的性能,只能走默认的ReduceJoin,而ReduceJoin必须经过Shuffle过程,相对性能比较差,而且容易产生数据倾斜,如何解决这个问题?

1.2.2 分桶表设计思想

分桶表的设计有别于分区表的设计,分区表是将数据划分不同的目录进行存储,而分桶表是将数据划分不同的文件进行存储。分桶表的设计是按照一定的规则(hash)将数据划分到不同的文件中进行存储,构建分桶表。
如果有两张表按照相同的划分规则【按照Join的关联字段】将各自的数据进行划分,在Join时,就可以实现Bucket与Bucket的Join,避免不必要的比较。
例如:当前有两张表,订单表有1000万条,用户表有10万条,两张表的关联字段是userid,现在要实现两张表的Join。
我们将订单表和用户表都按照userid划分为3个桶。在Join时,只需要将两张表的Bucket0与Bucket0进行Join,Bucket1与Bucket1进行Join,Bucket2与Bucket2进行Join即可,不用让所有的数据挨个比较,降低了比较次数,提高了Join的性能。
image.png

1.3 索引

索引功能支持是从Hive0.7版本开始,到Hive3.0不再支持。
Hive中索引的基本原理:当为某张表的某个字段创建索引时,Hive中会自动创建一张索引表,该表记录了该字段的每个值与数据实际物理位置之间的关系,例如数据所在的HDFS文件地址,以及所在文件中偏移量offset等信息。

  • Hive索引问题

Hive构建索引的过程是通过一个MapReduce程序来实现的,这就导致了Hive的一个问题,每次Hive中原始数据表的数据发生更新时,索引表不会自动更新,必须手动执行一个Alter index命令来实现通过MapReduce更新索引表,导致整体性能较差,维护相对繁琐
实际工作场景中,一般不推荐使用Hive Index,推荐使用ORC文件格式中的索引来代替Hive Index提高查询性能。

2. Hive表数据优化

2.1 文件格式

Hive数据存储的本质还是HDFS,所有的数据读写都基于HDFS的文件来实现,为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式:TextFile、SequenceFile、RCFile、ORC、Parquet等。不同的文件存储格式具有不同的存储特点,有的可以降低存储空间,有的可以提高查询性能等,可以用来实现不同场景下的数据存储,以提高对于数据文件的读写效率。

2.1.1 TextFile

TextFIle是Hive中默认的文件格式,存储形式为按行存储。建表时不指定存储格式即为textfile,导入数据时把数据文件拷贝至hdfs不进行处理。

  • TextFile的优点
    • 最简单的数据格式,不需要经过处理,可以直接cat查看
    • 可以使用任意的分隔符进行分割
    • 便于和其他工具(Pig, grep, sed, awk)共享数据
    • 可以搭配Gzip、Bzip2、Snappy等压缩一起使用
  • TextFile的缺点
    • 耗费存储空间,I/O性能较低
    • 结合压缩时Hive不进行数据切分合并,不能进行并行操作,查询效率低
    • 按行存储,读取列的性能差
  • TextFile的应用场景

    • 适合于小量数据的存储查询
    • 一般用于做第一层数据加载和测试使用

      2.1.2 SequenceFile

      SequenceFile是Hadoop里用来存储序列化的键值对即二进制的一种文件格式。SequenceFile文件也可以作为MapReduce作业的输入和输出,hive也支持这种格式。
  • SequenceFIle的优点

    • 以二进制的KV形式存储数据,与底层交互更加友好,性能更快
    • 可压缩、可分割,优化磁盘利用率和I/O
    • 可并行操作数据,查询效率高
    • SequenceFile也可以用于存储多个小文件
  • SequenceFIle的缺点
    • 存储空间消耗最大
    • 与非Hadoop生态系统之外的工具不兼容
    • 构建SequenceFile需要通过TextFile文件转化加载
  • SequenceFIle的应用

    • 适合于小量数据,但是查询列比较多的场景
      1. create table tb_sogou_seq(
      2. stime string,
      3. userid string,
      4. keyword string,
      5. clickorder string,
      6. url string
      7. )
      8. row format delimited fields terminated by '\t'
      9. stored as sequencefile;

      2.1.3 Parquet

      Parquet是一种支持嵌套结构的列式存储文件格式,最早是由Twitter和Cloudera合作开发,2015年5月从Apache孵化器里毕业成为Apache顶级项目。是一种支持嵌套数据模型对的列式存储系统,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式。通过数据编码和压缩,以及映射下推和谓词下推功能,Parquet的性能也较之其它文件格式有所提升。
      Parquet 是与语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与 Parquet 适配的查询引擎包括 Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL等,计算框架包括 MapReduce, Spark, Cascading, Crunch, Scalding, Kite 等
      Parquet是Hadoop生态圈中主流的列式存储格式,并且行业内流行这样一句话流传:如果说HDFS是大数据时代文件系统的事实标准,Parquet 就是大数据时代存储格式的事实标准。Hive中也同样支持使用Parquet格式来实现数据的存储,并且是工作中主要使用的存储格式之一。
  • Parquet的优点

    • 更高效的压缩和编码
    • 可用于多种数据处理框架
  • Parquet的缺点
    • 不支持update, insert, delete, ACID
  • Parquet的应用

    • 适用于字段数非常多,无更新,只取部分列的查询。
      1. create table tb_sogou_parquet(
      2. stime string,
      3. userid string,
      4. keyword string,
      5. clickorder string,
      6. url string
      7. )
      8. row format delimited fields terminated by '\t'
      9. stored as parquet;

      2.1.4 ORC

      ORC(OptimizedRC File)文件格式也是一种Hadoop生态圈中的列式存储格式,源自于RC(RecordColumnar File),它的产生早在2013年初,最初产生自Apache Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,目前也被Hive、Spark SQL、Presto等查询引擎支持。2015年ORC项目被Apache项目基金会提升为Apache顶级项目。
      ORC文件也是以二进制方式存储的,所以是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。其中涉及到如下的概念:
  • ORC文件:保存在文件系统上的普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。

  • 文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。

stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。
stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置。
row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。
stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,具体需要哪几种是由该列类型和编码方式决定。
ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的,他们都可以用来根据Search ARGuments(谓词下推条件)判断是否可以跳过某些数据,在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。

  • ORC的优点
    • 列式存储,存储效率非常高
    • 可压缩,高效的列存取
    • 查询效率较高,支持索引
    • 支持矢量化查询
  • ORC的缺点
    • 加载时性能消耗较大
    • 需要通过text文件转化生成
    • 读取全量数据时性能较差
  • ORC的应用

    • 适用于Hive中大型的存储、查询
      1. create table tb_sogou_orc(
      2. stime string,
      3. userid string,
      4. keyword string,
      5. clickorder string,
      6. url string
      7. )
      8. row format delimited fields terminated by '\t'
      9. stored as orc;

      2.2 数据压缩

      Hive底层转换HQL运行MapReduce程序时,磁盘I/O操作、网络数据传输、shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。如果磁盘I/O和网络带宽影响了MapReduce作业性能,在任意MapReduce阶段启用压缩都可以改善端到端处理时间并减少I/O和网络流量。
  • 压缩的优点

    • 减小文件存储所占空间
    • 加快文件传输效率,从而提高系统的处理速度
    • 降低IO读写的次数
  • 压缩的缺点
    • 使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长
  • Hadoop中各种压缩算法对比

image.png

2.2.1 Hive 压缩配置

Hive中的压缩就是使用了Hadoop中的压缩实现的,所以Hadoop中支持的压缩在Hive中都可以直接使用。
Hadoop中支持的压缩算法
image.png
要想在Hive中使用压缩,需要对MapReduce和Hive进行相应的配置。

  • 临时配置

配置MapReduce开启输出压缩及配置压缩类型

  1. --开启输出压缩
  2. set mapreduce.output.fileoutputformat.compress=true;
  3. --配置压缩类型为Snappy
  4. set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

配置Hive开启中间结果压缩和输出压缩及配置压缩类型

  1. -- 中间结果压缩
  2. set hive.exec.compress.intermediate=true;
  3. set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
  4. -- 输出结果压缩
  5. set hive.exec.compress.output=true;
  • 永久配置

(1)将以上MapReduce的配置写入mapred-site.xml中,重启Hadoop
(2)将以上Hive的配置写入hive-site.xml中,重启Hive

2.3 存储优化

2.3.1 避免生成小文件

Hive的存储本质还是HDFS,HDFS是不利于小文件存储的,因为每个小文件会产生一条元数据信息,并且不利用MapReduce的处理,MapReduce中每个小文件会启动一个MapTask计算处理,导致资源的浪费,所以在使用Hive进行处理分析时,要尽量避免小文件的生成。
那么在使用Hive时,如何能避免小文件的生成呢?当我们使用多个Reduce进行聚合计算时,我们并不清楚每个Reduce最终会生成的结果的数据大小,无法控制用几个Reduce来处理。Hive中为我们提供了一个特殊的机制,可以自动的判断是否是小文件,如果是小文件可以自动将小文件进行合并。

  • 配置

    1. -- 如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
    2. set hive.merge.mapfiles=true;
    3. -- 如果hive的程序,有MapReduceTask,将ReduceTask产生的所有小文件进行合并
    4. set hive.merge.mapredfiles=true;
    5. -- 每一个合并的文件的大小
    6. set hive.merge.size.per.task=256000000;
    7. -- 平均每个文件的大小,如果小于这个值就会进行合并
    8. set hive.merge.smallfiles.avgsize=16000000;

    2.3.2 读取小文件

    尽管我们通过配置避免了多个小文件的同时产生,但是我们总会遇到数据处理的中间结果是小文件的情况,例如每个小时的分区数据中,大多数小时的数据都比较多,但是个别几个小时,如凌晨的2点~6点等等,数据量比较小,下一步进行处理时就必须对多个小文件进行处理,那么这种场景下怎么解决呢?
    类似于MapReduce中的解决方案,Hive中也提供一种输入类CombineHiveInputFormat,用于将小文件合并以后,再进行处理。

  • 配置

    1. -- 设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
    2. set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    2.3.3 ORC文件索引

    在使用ORC文件时,为了加快读取ORC文件中的数据内容,ORC提供了两种索引机制:Row Group Index和 Bloom Filter Index可以帮助提高查询ORC文件的性能,当用户写入数据时,可以指定构建索引,当用户查询数据时,可以根据索引提前对数据进行过滤,避免不必要的数据扫描。

  • Row Group Index

一个ORC文件包含一个或多个stripes(groups of row data),每个stripe中包含了每个column的min/max值的索引数据,当查询中有<,>,=的操作时,会根据min/max值,跳过扫描不包含的stripes。而其中为每个stripe建立的包含min/max值的索引,就称为Row Group Index行组索引,也叫min-max Index大小对比索引,或者Storage Index。
在建立ORC格式表时,指定表参数’orc.create.index’=’true’之后,便会建立Row Group Index,需要注意的是,为了使Row Group Index有效利用,向表中加载数据时,必须对需要使用索引的字段进行排序,否则,min/max会失去意义。另外,这种索引主要用于数值型字段的范围查询过滤优化上
image.png

  • 开启索引配置
    1. set hive.optimize.index.filter=true;
    永久生效,请配置在hive-site.xml中
    使用: ```sql — 创建表,并指定构建索引 create table tb_sogou_orc_index stored as orc tblproperties (“orc.create.index”=”true”) as select * from tb_sogou_source distribute by stime sort by stime;

— 当进行范围或者等值查询(<,>,=)时就可以基于构建的索引进行查询 select count(*) from tb_sogou_orc_index where stime > ‘12:00:00’ and stime < ‘18:00:00’;

  1. - **Bloom Filter Index**
  2. 建表时候,通过表参数”orc.bloom.filter.columns”=”columnName……”来指定为哪些字段建立BloomFilter索引,这样,在生成数据的时候,会在每个stripe中,为该字段建立BloomFilter的数据结构,当**查询条件中包含对该字段的=号过滤时候**,先从BloomFilter中获取以下是否包含该值,如果不包含,则跳过该stripe。<br />使用:
  3. ```sql
  4. -- 创建表,并指定构建索引
  5. create table tb_sogou_orc_bloom
  6. stored as orc tblproperties ("orc.create.index"="true",orc.bloom.filter.columns"="stime,userid")
  7. as select * from tb_sogou_source distribute by stime sort by stime;
  8. -- stime的范围过滤可以走row group index,userid的过滤可以走bloom filter index
  9. select
  10. count(*)
  11. from tb_sogou_orc_index
  12. where stime > '12:00:00' and stime < '18:00:00'
  13. and userid = '3933365481995287' ;

2.3.4 ORC矢量化查询

Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。
注意:要使用矢量化查询执行,就必须以ORC格式存储数据。

  • 配置

    1. set hive.vectorized.execution.enabled = true;
    2. set hive.vectorized.execution.reduce.enabled = true;

    3. 计算Job执行优化

    3.1 Explain

    explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以帮助更好的了解一条HQL语句在底层是如何实现数据的查询及处理的过程。

  • 语法 | EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|] query | | —- |

    • FORMATTED:对执行计划进行格式化,返回JSON格式的执行计划
    • EXTENDED:提供一些额外的信息,比如文件的路径信息
    • DEPENDENCY:以JSON格式返回查询所依赖的表和分区的列表
    • AUTHORIZATION:列出需要被授权的条目,包括输入与输出
  • 组成

解析后的执行计划一般由三个部分构成,分别是:
(1)The Abstract Syntax Tree for the query
抽象语法树:Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
(2)The dependencies between the different stages of the plan
Stage依赖关系:会列出运行查询所有的依赖以及stage的数量
(3)The description of each of the stages
Stage内容:包含了非常重要的信息,比如运行时的operator和sort orders等具体的信息

3.2 MapReduce属性优化

3.2.1 本地模式

使用Hive的过程中,有一些数据量不大的表也会转换为MapReduce处理,提交到集群时,需要申请资源,等待资源分配,启动JVM进程,再运行Task,一系列的过程比较繁琐,本身数据量并不大,提交到YARN运行返回会导致性能较差的问题。
Hive为了解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交给YARN,直接在本地运行,以便于提高小数据量程序的性能。

  • 配置

    1. -- 开启本地模式
    2. set hive.exec.mode.local.auto = true;

    限制条件
    Hive为了避免大数据量的计算也使用本地模式导致性能差的问题,所以对本地模式做了以下限制,如果以下任意一个条件不满足,那么即使开启了本地模式,将依旧会提交给YARN集群运行。

  • 处理的数据量不超过128M

  • MapTask的个数不超过4个
  • ReduceTask的个数不超过1个

    3.2.2 JVM重用

    JVM正常指代一个Java进程,Hadoop默认使用派生的JVM来执行map-reducer,如果一个MapReduce程序中有100个Map,10个Reduce,Hadoop默认会为每个Task启动一个JVM来运行,那么就会启动100个JVM来运行MapTask,在JVM启动时内存开销大,尤其是Job大数据量情况,如果单个Task数据量比较小,也会申请JVM资源,这就导致了资源紧张及浪费的情况。
    为了解决上述问题,MapReduce中提供了JVM重用机制来解决,JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放,N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。

  • 配置

    1. -- Hadoop3之前的配置,在mapred-site.xml中添加以下参数
    2. -- Hadoop3中已不再支持该选项
    3. mapreduce.job.jvm.numtasks=10

    3.2.3 并行执行

    Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。

  • 配置

    1. -- 开启Stage并行化,默认为false
    2. SET hive.exec.parallel=true;
    3. -- 指定并行化线程数,默认为8
    4. SET hive.exec.parallel.thread.number=16;

    注意:线程数越多,程序运行速度越快,但同样更消耗CPU资源

    3.3 Join优化

    表的Join是数据分析处理过程中必不可少的操作,Hive同样支持Join的语法,Hive Join的底层还是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join方案来实现。例如适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优化方案Bucket Join等。

    3.3.1 Map Join

  • 应用场景

适合于小表join大表或者小表Join小表

  • 原理

image.png
将小的那份数据给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与小数据的完整数据进行join。
底层不需要经过shuffle,需要占用内存空间存放小的数据文件。

  • 使用 | — 默认已经开启了Map Join
    hive.auto.convert.join=true | | —- |

  • Hive中判断哪张表是小表及限制

    • LEFT OUTER JOIN的左表必须是大表
    • RIGHT OUTER JOIN的右表必须是大表
    • INNER JOIN左表或右表均可以作为大表
    • FULL OUTER JOIN不能使用MAPJOIN
    • MAPJOIN支持小表为子查询
    • 使用MAPJOIN时需要引用小表或是子查询时,需要引用别名
    • 在MAPJOIN中,可以使用不等值连接或者使用OR连接多个条件
    • 在MAPJOIN中最多支持指定6张小表,否则报语法错误
  • Hive中小表的大小限制 | — 2.0版本之前的控制属性
    hive.mapjoin.smalltable.filesize=25M
    — 2.0版本开始由以下参数控制
    hive.auto.convert.join.noconditionaltask.size=512000000 | | —- |

3.3.2 Reduce Join

  • 应用场景

适合于大表Join大表

  • 原理

image.png
将两张表的数据在shuffle阶段利用shuffle的分组来将数据按照关联字段进行合并。
必须经过shuffle,利用Shuffle过程中的分组来实现关联。

  • 使用

Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join。

3.3.3 Bucket Join

  • 应用场景

适合于大表Join大表

  • 原理

image.png
将两张表按照相同的规则将数据划分,根据对应的规则的数据进行join,减少了比较次数,提高了性能

  • 使用 | — 开启分桶join
    set hive.optimize.bucketmapjoin = true; | | —- |

分桶语法见前面文章
要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数。

  • Sort Merge Bucket Join(SMB):基于有序的数据Join

    语法:clustered by colName sorted by (colName)

— 开启分桶SMB join
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

要求:分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数。

3.4 优化器

3.4.1 关联优化

在使用Hive的过程中经常会遇到一些特殊的问题,例如当一个程序中如果有一些操作彼此之间有关联性,是可以放在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个MapReduce来完成这两个操作。
例如:当我们执行以下SQL语句:select …… from table group by id order by id desc;
该SQL语句转换为MapReduce时,我们可以有两种方案来实现:

  1. 方案一

第一个MapReduce做group by,经过shuffle阶段对id做分组;
第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id进行排序。

  1. 方案二

因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序。
在这种场景下,Hive会默认选择用第一种方案来实现,这样会导致性能相对较差,我们可以在Hive中开启关联优化,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现。

  • 配置 | set hive.optimize.correlation=true; | | —- |

3.4.2 CBO优化器引擎

在使用MySQL或者Hive等工具时,我们经常会遇到一个问题,默认的优化器在底层解析一些聚合统计类的处理的时候,底层解析的方案有时候不是最佳的方案。
例如:当前有一张表【共1000条数据】,id构建了索引,id =100值有900条,我们现在的需求是查询所有id = 100的数据,所以SQL语句为:select from table where id = 100;
由于id这一列构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 100的值所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。有id=100的值有900条,占了总数据的90%,这时候是没有必要检索索引以后再检索数据的,可以直接检索数据返回,这样的效率会更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。
使用Hive时,*Hive中也支持RBO与CBO这两种引擎,默认使用的是RBO优化器引擎。

  • RBO
  1. rule basic optimise:基于规则的优化器
  2. 根据设定好的规则来对程序进行优化
  • CBO
  1. cost basic optimise:基于代价的优化器
  2. 根据不同场景所需要付出的代价来合适选择优化的方案
  3. 对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案

很明显CBO引擎更加智能,所以在使用Hive时,我们可以配置底层的优化器引擎为CBO引擎。

  • 配置 | set hive.cbo.enable=true;
    set hive.compute.query.using.stats=true;
    set hive.stats.fetch.column.stats=true;
    set hive.stats.fetch.partition.stats=true; | | —- |

  • 要求

  1. 要想使用CBO引擎,必须构建数据的元数据【表行数、列的信息、分区的信息……】
  2. 提前获取这些信息,CBO才能基于代价选择合适的处理计划
  3. 所以CBO引擎一般搭配analyze分析优化器一起使用

    3.4.3 Analyze分析优化器

    用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用。
  • 语法 ```sql — 构建分区信息元数据 ANALYZE TABLE tablename [PARTITION(partcol1[=val1], partcol2[=val2], …)] COMPUTE STATISTICS [noscan];

— 构建列的元数据 ANALYZE TABLE tablename [PARTITION(partcol1[=val1], partcol2[=val2], …)] COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2…) [noscan];

— 查看元数据 DESC FORMATTED [tablename] [columnname];

  1. 示例:
  2. ```sql
  3. -- 构建表中分区数据的元数据信息
  4. ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;
  5. -- 构建表中列的数据的元数据信息
  6. ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;
  7. -- 查看构建的列的元数据
  8. desc formatted tb_login_part userid;

3.5 谓词下推(PPD)

谓词下推 Predicate Pushdown(PPD)的思想:简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行(比如两表join,先where过滤减少数据量再join)。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,降低了Reduce端的数据负载,节约了集群的资源,也提升了任务的性能。

— 默认自动开启谓词下推
hive.optimize.ppd=true;

3.6 数据倾斜

现象:某个任务,大多数Task都执行完毕,只有一个Task执行缓慢。
原因:数据分配不均衡。
思考:为什么会出现多个Task数据分配不均衡的情况呢?
从两方面考虑,第一:数据本身就是倾斜的,数据中某种数据出现的次数过多。第二:分区规则导致这些相同的数据都分配给了同一个Task,导致这个Task拿到了大量的数据,而其他Task拿到的数据比较少,所以运行起来相比较于其他Task就比较慢一些。
综上所述,产生数据倾斜的根本原因在于分区规则。

3.6.1 group By的数据倾斜

当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。根本原因是因为分区规则导致的,所以我们可以通过以下几种方案来解决group by导致的数据倾斜的问题。

  • 方案一:开启Map端聚合 | — 开启Map端聚合:Combiner
    hive.map.aggr=true; | | —- |

通过减少Reduce的输入量,避免每个Task数据差异过大导致数据倾斜。

  • 方案二:实现随机分区 | — SQL中避免数据倾斜,构建随机分区
    select * from table distribute by rand(); | | —- |

distribute by用于指定底层的MapReduce按照哪个字段作为Key实现分区、分组等。
默认由Hive自己选择,我们可以通过distribute by自己指定,通过rank函数随机值实现随机分区,避免数据倾斜。

  • 方案三:自动构建随机分区并自动聚合 | — 开启随机分区,走两个MapReduce
    hive.groupby.skewindata=true; | | —- |

开启该参数以后,当前程序会自动通过两个MapReduce来运行:第一个MapReduce自动进行随机分区,然后实现聚合;第二个MapReduce将聚合的结果再按照业务进行处理,得到结果。

3.6.2 Join的数据倾斜

实际业务需求中往往需要构建两张表的Join实现,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,我们核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现,但往往很多的Join场景不满足Map Join的需求,那么我们可以以下几种方案来解决Join产生的数据倾斜问题:

  • 方案一:提前过滤,将大数据变成小数据,实现Map Join

实现两张表的Join时,我们要尽量考虑是否可以使用Map Join来实现Join过程有些场景下看起来是大表Join大表,但是我们可以通过转换将大表Join大表变成大表Join小表,来实现Map Join。
例如:现在有两张表订单表A与用户表B,需要实现查询今天所有订单的用户信息,关联字段为userid。
A表:今天的订单,1000万条,字段:orderId,userId,produceId,price等
B表:用户信息表,100万条,字段:userid,username,age,phone等
需求:两张表关联得到今天每个订单的用户信息。
实现1:直接关联,实现大表Join大表

select a.,b. from A join B on a.userid = b.userid;

由于两张表比较大,无法走Map Join,只能走Reduce Join,容易产生数据倾斜。
实现2:将下了订单的用户的数据过滤出来,再Join

  1. select a.*,d.*
  2. from (
  3. -- 获取所有下订单的用户信息
  4. select
  5. b.*
  6. from
  7. -- 获取所有下订单的userid
  8. ( select distinct a.userid from A a ) c join B b on c.userid = b.userid ) d
  9. join
  10. A a on d.userid = a.userid;
  1. 100万个用户中,在今天下订单的人数可能只有一小部分,大量数据是不会Join成功的
  2. 可以提前将订单表中的userid去重,获取所有下订单的用户id
  3. 再使用所有下订单的用户id关联用户表,得到所有下订单的用户的信息
  4. 最后再使用下订单的用户信息关联订单表
  5. 通过多次Map Join来代替Reduce Join,性能更好也可以避免数据倾斜
    • 方案二:使用Bucket Join

如果使用方案一来避免Reduce Join ,有些场景下依旧无法满足,例如过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join。这种场景下,我们可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜。

  • 方案三:使用Skew Join

Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程,这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题,最终将Map Join的结果和Reduce Join的结果进行Union合并。
配置:

  1. -- 开启运行过程中skewjoin
  2. set hive.optimize.skewjoin=true;
  3. -- 如果这个key的出现的次数超过这个范围
  4. set hive.skewjoin.key=100000;
  5. -- 在编译时判断是否会产生数据倾斜
  6. set hive.optimize.skewjoin.compiletime=true;
  7. -- 不合并,提升性能
  8. set hive.optimize.union.remove=true;
  9. -- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
  10. set mapreduce.input.fileinputformat.input.dir.recursive=true;

示例图:
image.png