Hadoop生态圈技术栈 Hive

第一部分 Hive概述

HDFS => 海量数据的存储

MapReduce => 海量数据的分析和处理

YARN => 集群资源的管理和作业调度

第 1 节 Hive产生背景

第 2 节 Hive和RDBMS对比

第 3 节 Hive的优缺点

第 4 节 Hive架构

第二部分 Hive安装与配置

第三部分 数据类型与文件格式

Hive支持关系型数据库的绝大多数基本数据类型,同时也支持4种集合数据类型

1 节 基本数据类型及转换

Hive类似和java语言中一样,会支持多种不同长度的整型和浮点类型数据,同时也

支持布尔类型、字符串类型,时间戳数据类型以及二进制数组数据类型等。详细信

息见下表:

大类 类型
Integers(整型) TINYINT — 1字节的有符号整数
SAMLINT — 2字节的有符号整数
INT — 4字节的有符号整数
BIGINT — 8字节的有符号整数
Floating point numbers(浮点数) FLOAT — 单精度浮点数
DOUBLE — 双精度浮点数
Fixed point numbers(定点数) DECIMAL—用户自定义精度定点数,如
DECIMAL(10,3)
String types(字符串) STRTIMESTAMP — 时间戳
TIMESTAMP WITH LOCAL
TIME ZONE — 时间戳,纳秒精度
DATE — 日期类型
Boolean(布尔类型) BOOLEAN — TRUE / FALSE
Binary types(二进制类型) BINARY — 字节序列

这些类型名称都是 Hive 中保留字。这些基本的数据类型都是 java 中的接口进行实

现的,因此与 java 中数据类型是基本一致的:

Hive - 图1

数据类型的隐式转换

Hive的数据类型是可以进行隐式转换的,类似于Java的类型转换。如用户在查询中将

一种浮点类型和另一种浮点类型的值做对比,Hive会将类型转换成两个浮点类型中值

较大的那个类型,即:将FLOAT类型转换成DOUBLE类型;当然如果需要的话,任意

整型会转化成DOUBLE类型。 Hive 中基本数据类型遵循以下层次结构,按照这个层

次结构,子类型到祖先类型允许隐式转换。

Hive - 图2

总的来说数据转换遵循以下规律:

  1. hive> select '1.0'+2;
  2. OK
  3. 3.0
  4. hive> select '1111' > 10;
  5. hive> select 1 > 0.8;

数据类型的显示转换

使用cast函数进行强制类型转换;如果强制类型转换失败,返回NULL

  1. hive> select cast('1111s' as int);
  2. OK
  3. NULL
  4. hive> select cast('1111' as int);
  5. OK
  6. 1111

2 节 集合数据类型

Hive支持集合数据类型,包括array、map、struct、union

Hive - 图3

和基本数据类型一样,这些类型的名称同样是保留字;

ARRAY 和 MAP 与 Java 中的 Array 和 Map 类似;

STRUCT 与 C 语言中的 Struct 类似,它封装了一个命名字段集合,复杂数据类型允

许任意层次的嵌套;

  1. hive> select array( 1 , 2 , 3 );
  2. OK
  3. [ 1 , 2 , 3 ]
  4. -- 使用 [] 访问数组元素
  5. hive> select arr[ 0 ] from (select array( 1 , 2 , 3 ) arr) tmp;
  6. hive> select map('a', 1 , 'b', 2 , 'c', 3 );
  7. OK
  8. {"a":1,"b":2,"c":3}
  9. -- 使用 [] 访问map元素
  10. hive> select mymap["a"] from (select map('a', 1 , 'b', 2 , 'c', 3 )
  11. as mymap) tmp;
  12. -- 使用 [] 访问map元素。 key 不存在返回 NULL
  13. hive> select mymap["x"] from (select map('a', 1 , 'b', 2 , 'c', 3 )
  14. as mymap) tmp;
  15. NULL
  16. hive> select struct('username1', 7 , 1288.68);
  17. OK
  18. {"col1":"username1","col2":7,"col3":1288. 68 }
  19. -- struct 中的字段命名
  20. hive> select named_struct("name", "username1", "id", 7,"salary", 12880.68);
  21. OK
  22. {"name":"username1","id":7,"salary":12880.68}
  23. -- 使用 列名.字段名 访问具体信息
  24. hive> select userinfo.id from (select named_struct("name", "username1", "id",7, "salary", 12880.68) userinfo) tmp;
  25. -- union 数据类型
  26. hive> select create_union(0, "zhansan", 19, 8000.88) uinfo;

3 节 文本文件数据编码

Hive表中的数据在存储在文件系统上,Hive定义了默认的存储格式,也支持用户自

定义文件存储格式。

Hive默认使用几个很少出现在字段值中的控制字符,来表示替换默认分隔符的字符。

Hive默认分隔符

待整理

第四部分 HQL操作之 -DDL命令

参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

DDL(data definition language): 主要的命令有CREATE、ALTER、DROP等。

DDL主要是用在定义、修改数据库对象的结构 或 数据类型。

Hive - 图4

1 节 数据库操作

Hive有一个默认的数据库default,在操作HQL时,如果不明确的指定要使用哪个

库,则使用默认数据库;

Hive的数据库名、表名均不区分大小写;

名字不能使用数字开头;

不能使用关键字,尽量不使用特殊符号;

创建数据库语法

  1. CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
  2. [COMMENT database_comment]
  3. [LOCATION hdfs_path]
  4. [MANAGEDLOCATION hdfs_path]
  5. [WITH DBPROPERTIES (property_name=property_value, ...)];

— 创建数据库,在HDFS上存储路径为 /user/hive/warehouse/*.db

  1. hive (default)> create database lee;
  2. hive (default)> dfs -ls /user/hive/warehouse;
  1. hive> dfs -ls /user/hive/warehouse;
  2. Found 4 items
  3. drwxrwxrwt - root hive 0 2021-07-21 14:47 /user/hive/warehouse/bigdata.db
  4. drwxrwxrwt - root hive 0 2021-08-01 19:49 /user/hive/warehouse/lee.db
  5. drwxrwxrwt - admin hive 0 2021-07-15 17:51 /user/hive/warehouse/test333.db
  6. drwxrwxrwt - root hive 0 2021-07-31 11:21 /user/hive/warehouse/udf.db

也可以使用 hdfs 命令查看

  1. hadoop dfs -ls /user/hive/warehouse;
  1. [root@cdh-112 ~]# hadoop dfs -ls /user/hive/warehouse;
  2. WARNING: Use of this script to execute dfs is deprecated.
  3. WARNING: Attempting to execute replacement "hdfs dfs" instead.
  4. Found 4 items
  5. drwxrwxrwt - root hive 0 2021-07-21 14:47 /user/hive/warehouse/bigdata.db
  6. drwxrwxrwt - root hive 0 2021-08-01 19:49 /user/hive/warehouse/lee.db
  7. drwxrwxrwt - admin hive 0 2021-07-15 17:51 /user/hive/warehouse/test333.db
  8. drwxrwxrwt - root hive 0 2021-07-31 11:21 /user/hive/warehouse/udf.db

— 避免数据库已经存在时报错,使用 if not exists 进行判断【标准写法】

  1. hive (default)> create database if not exists mydb;

— 创建数据库。添加备注,指定数据库在存放位置

  1. hive (default)> create database if not exists mydb2 comment 'this is mydb2' location '/user/hive/mydb2.db';

查看数据库

  1. -- 查看所有数据库
  2. show database;
  3. -- 查看数据库信息
  4. desc database mydb2;
  5. desc database extended mydb2;
  6. describe database extended mydb2;

使用数据库

  1. use mydb;

删除数据库

  1. -- 删除一个空数据库
  2. drop database databasename;
  3. -- 如果数据库不为空,使用 cascade 强制删除
  4. drop database databasename cascade;

2 节 建表语法

  1. create [external] table [IF NOT EXISTS] table_name
  2. [(colName colType [comment 'comment'], ...)]
  3. [comment table_comment]
  4. [partition by (colName colType [comment col_comment], ...)]
  5. [clustered BY (colName, colName, ...)
  6. [sorted by (col_name [ASC|DESC], ...)] into num_buckets buckets]
  7. [row format row_format]
  8. [stored as file_format]
  9. [LOCATION hdfs_path]
  10. [TBLPROPERTIES (property_name=property_value, ...)]
  11. [AS select_statement];
  1. CREATE[TEMPORARY][EXTERNAL]TABLE[IFNOTEXISTS][db_name.]table_name LIKEexisting_table_or_view_name
  2. [LOCATIONhdfs_path];
  1. CREATE TABLE。按给定名称创建表,如果表已经存在则抛出异常。可使用 if not exists 规避。
  2. EXTERNAL关键字。创建外部表,否则创建的是内部表(管理表)。

    删除内部表时,数据和表的定义同时被删除;

    删除外部表时,仅仅删除了表的定义,数据保留;在生产环境中,多使用外部表;

  3. comment。表的注释

  4. partition by。对表中数据进行分区,指定表的分区字段
  5. clustered by。创建分桶表,指定分桶字段
  6. sorted by。对桶中的一个或多个列排序,较少使用
  7. 存储子句。
  1. ROW FORMAT DELIMITED
  2. [FIELDS TERMINATED BY char]
  3. [COLLECTION ITEMS TERMINATED BY char]
  4. [MAP KEYS TERMINATED BY char]
  5. [LINES TERMINATED BY char] | SERDE serde_name
  6. [WITH SERDEPROPERTIES (property_name=property_value,property_name=property_value, ...)]

建表时可指定 SerDe 。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用默认的 SerDe。建表时还需要为表指定列,在指定列的同时也会指定自定义的 SerDe。Hive通过 SerDe 确定表的具体的列的数据。

SerDe 是Serialize/Deserilize 的简称, hive使用Serde进行行对象的序列与反

序列化。

  1. stored as SEQUENCEFILE|TEXTFILE|RCFILE。如果文件数据是纯文本,可以使用 STORED AS TEXTFILE(缺省);如果数据需要压缩,使用 STORED ASSEQUENCEFILE(二进制序列文件)。
  2. LOCATION。表在HDFS上的存放位置
  3. TBLPROPERTIES。定义表的属性
  4. AS。后面可以接查询语句,表示根据后面的查询结果创建表
  5. LIKE。like 表名,允许用户复制现有的表结构,但是不复制数据

查看表的信息

show tblproperties Your_table_name

totalSize 字节

#

第五部分 HQL 操作之 数据操作

#

第六部分 HQL操作之 DQL 命令

第1节 基本查询

  1. -- 省略from子句的查询
  2. select 8*888 ;
  3. select current_date ;
  4. -- 使用列别名 select 8*888 product;
  5. select current_date as currdate;
  6. -- 全表查询
  7. select * from emp;
  8. -- 选择特定列查询
  9. select ename, sal, comm from emp;
  10. -- 使用函数
  11. select count(*) from emp; -- count(colname)
  12. 按字段进行count,不统计NULL
  13. select sum(sal) from emp;
  14. select max(sal) from emp;
  15. select min(sal) from emp;
  16. select avg(sal) from emp;
  17. -- 使用limit子句限制返回的行数
  18. select * from emp limit 3;

第2节 where 子语句

第3节 group by 子语句

第4节 表连接

第5节 排序子句

第七部分 函数

第1节 系统内置函数

第 2节 窗口函数

第3节 自定义函数

第八部分 HQL 操作之 DML命令

第1节 HIve 事务

第2节 Hive 事务操作实例

第九部分 元数据管理与存储

第十部分 Hive 调优策略

Hive 调优,要深入理解 MapReduce 过程,便于合理的调整参数

Map 阶段:将复杂的任务分解为若干个“简单的任务”来进行并行处理

Reduce阶段:对 map 阶段的结果进行全局汇总

MapReduce 原理分析

MapTask流程

Hive - 图5

详细步骤:

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回。Key表示每行首字符偏移值,value表示这一行文本内容。
  3. 读取split返回,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。
  4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
  1. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。
6. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认行为!
如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
  1. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

官方地址 https://hadoop.apache.org/docs/r3.0.0/

MapTask的并行度

MapTask的并行度决定 Map 阶段的任务处理并发度,从而影响到整个 Job 的处理速度。

数据块:Block 是 HDFS 物理上把数据分成一块一块。

Hive - 图6

ReduceTask工作机制

Hive - 图7

Reduce大致分为 copysortreduce 三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤

  1. Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
  2. Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
  3. 合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
  4. 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

ReduceTask并行度

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4 
job.setNumReduceTasks(4);

注意事项:

  1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;
  2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;
  3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;

shuffle 机制

map阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle。

Hive - 图8

第1节 架构优化

执行引擎

Hive支持多种执行引擎,分别是 MapReduce、Tez、Spark、Flink。可以通过hive-site.xml 文件中的hive.execution.engine属性控制。

MapReduce

默认执行引擎

Tez

Tez是一个构建于YARN之上的支持复杂的DAG(有向无环图)任务的数据处理框架。由Hontonworks开源,将MapReduce的过程拆分成若干个子过程,同时可以把多个mapreduce任务组合成一个较大的DAG任务,减少了MapReduce之间的文件存储,同时合理组合其子过程从而大幅提升MR作业的性能。

Hive - 图9.jpg#crop=0&crop=0&crop=1&crop=1&id=KdLUq&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)

Spark

Hive on Spark 是由 Cloudera 发起,由 Intel、MapR 等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为 Spark 的任务提交到 Spark 集群上进行计算。通过该项目,可以提高 Hive 查询的性能,同时为已经部署了 Hive 或者 Spark 的用户提供了更加灵活的选择,从而进一步提高 Hive 和 Spark 的普及率。

官方数据认为 spark 会被传统 mapreduce 快10-100倍

set hive.execution.engine=spark;

优化器

与关系型数据库类似,Hive会在真正执行计算之前,生成和优化逻辑执行计划与物理执行计划。Hive有两种优化器:

Vectorize(向量化优化器)

Cost-BasedOptimization (CBO 成本优化器)。

矢量化查询执行

矢量化查询(要求执行引擎为Tez)执行通过一次批量执行1024行而不是每行一行来提
高扫描,聚合,过滤器和连接等操作的性能,这个功能一显着缩短查询执行时间。

set hive.vectorized.execution.enabled = true; -
- 默认 false
set hive.vectorized.execution.reduce.enabled = true; -
- 默认 false

备注:要使用矢量化查询执行,必须用ORC格式存储数据

成本优化器

Hive 的 CBO 是基于a pache Calcite 的,Hive的CBO通过查询成本(有analyze收集的统计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,使用CBO的配置如下:

SET hive.cbo.enable=true; --从 v0.14.0默认true
SET hive.compute.query.using.stats=true; -- 默认false
SET hive.stats.fetch.column.stats=true; -- 默认false
SET hive.stats.fetch.partition.stats=true; -- 默认true

定期执行表(analyze)的分析,分析后的数据放在元数据库中。

分区表

对于一张比较大的表,将其设计成分区表可以提升查询的性能,对于一个特定分区的查询,只会加载对应分区路径的文件数据,所以执行速度会比较快。

分区字段的选择是影响查询性能的重要因素,尽量避免层级较深的分区,这样会造
成太多的子文件夹。一些常见的分区字段可以是:

  1. 日期或时间。如year、month、day或者hour,当表中存在时间或者日期字段
  2. 地理位置。如国家、省份、城市等
  3. 业务逻辑。如部门、销售区域、客户等等

分通表

与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。

分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可以确保某个key对应的数据在一个特定的桶内(文件),巧妙地选择分桶字段可以大幅度提升join的性能。

通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。

文件格式

在 HiveQL 的 create tabl e语句中,可以使用 stored as … 指定表的存储格式,不指定默认 TextFile
Hive表支持的存储格式有 TextFileSequenceFileRCFileORCParquet 等。

存储格式一般需要根据业务进行选择,在我们的实操中,绝大多数表都采用TextFile与Parquet两种存储格式之一。 TextFile是最简单的存储格式,它是纯文本记录,也是Hive的默认格式。虽然它的磁盘开销比较大,查询效率也低,但它更多地是作为跳板来使用。RCFile、ORC、Parquet等格式的表都不能由文件直接导入数据,必须由TextFile来做中转。 Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。创建表时,特别是宽表,尽量使用ORC、ParquetFile这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive查询时会只遍历需要列数据,大大减少处理的数据量。

TextFile

1、存储方式:行存储。默认格式,如果建表时不指定默认为此格式。 
2、每一行都是一条记录,每行都以换行符"\n"结尾。数据不做压缩时,磁盘会开销比较大,数据解析开销也 比较大。
3、可结合Gzip、Bzip2等压缩方式一起使用(系统会自动检查,查询时会自动解压),推荐选用可切分的压 缩算法。

Sequence File

Parquet和ORC都是Apache旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量OLAP查询,并且也支持更好的压缩和编码。选择Parquet的原因主要是它支持Impala查询引擎,并且对update、delete和事务性操作需求很低。

1、一种Hadoop API提供的二进制文件,使用方便、可分割、个压缩的特点。 2、支持三种压缩选择:NONE、RECORD、BLOCK。RECORD压缩率低,一般建议使用BLOCK压缩。

RC File

1、存储方式:数据按行分块,每块按照列存储 。 A、首先,将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。 B、其次,块数据列式存储,有利于数据压缩和快速的列存取。 
2、相对来说,RCFile对于提升任务执行性能提升不大,但是能节省一些存储空间。可以使用升级版的ORC格 式。

ORC File

1、存储方式:数据按行分块,每块按照列存储 
2、Hive提供的新格式,属于RCFile的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩快,快速 列存取。 
3、ORC File会基于列创建索引,当查询的时候会很快。

Parquet File

1、存储方式:列式存储。 
2、Parquet对于大型查询的类型是高效的。对于扫描特定表格中的特定列查询,Parquet特别有用。 Parquet一般使用Snappy、Gzip压缩。默认Snappy。 
3、Parquet支持Impala 查询引擎。 
                                                                                        4、表的文件存储格式尽量采用Parquet或ORC,不仅降低存储量,还优化了查询,压缩,表关联等性能。

数据压缩

Hive 语句最终是转化为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在与 网络IO磁盘IO,要解决性能瓶颈,最主要的是 减少数据量,对数据进行压缩是个好方式。压缩虽然是减少了数据量,但是压缩过程要消耗 CPU,但是在 Hadoop 中,往往性能瓶颈不在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU。

压缩技术可以减少map与reduce之间的数据传输,从而可以提升查询性能,关于压缩的配置可以在hive的命令行中或者hive-site.xml文件中进行配置。

SET hive.exec.compress.intermediate=true

开启压缩之后,可以选择下面的压缩格式:

压缩格式 是否可拆分 是否自带 压缩率 速度 是否hadoop自带
gzip 很高 比较快
lzo 比较高 很快 否,要安装
snappy 比较高 很快 否,要安装
bzip2 最高

对应的类名

压缩格式 codec 拓展名 支持分割
Deflate org.apache.hadoop.io.compress.DefaultCodec .deflate N
Gzip org.apache.hadoop.io.compress.GzipCodec .gz N
Bzip2 org.apache.hadoop.io.compress.BZip2Codec .gz Y
LZO com.apache.compression.Izo.LzopCodec .lzo N
LZ4 org.apache.hadoop.io.compress.Lz4Codec .lz4 N
Snappy org.apache.hadoop.io.compress.SnappyCodec .snappy N

如何选择压缩方式

1、压缩比率 
2、压缩解压速度 
3、是否支持split

支持分割的文件可以并行的有多个 mapper 程序处理大数据文件,大多数文件不支持可分割是因为这些文件只能从头开始读。

是否压缩

1、计算密集型,不压缩,否则进一步增加了CPU的负担 
2、网络密集型,推荐压缩,减小网络数据传输

压缩使用

Job 输出文件按照 Block 以 GZip 的方式进行压缩:

## 默认值是false 
set mapreduce.output.fileoutputformat.compress=true; 

## 默认值是Record 
set mapreduce.output.fileoutputformat.compress.type=BLOCK 

## 默认值是 org.apache.hadoop.io.compress.DefaultCodec 
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.G zipCodec

Map 输出结果也以 Gzip 进行压缩:

## 启用map端输出压缩 
set mapred.map.output.compress=true 

## 默认值是org.apache.hadoop.io.compress.DefaultCodec 
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec

对 Hive 输出结果和中间都进行压缩:

set hive.exec.compress.output=true ## 默认值是false,不压缩 
set hive.exec.compress.intermediate=true ## 默认值是false,为true时MR设置的压缩才 启用

第2节 参数优化

本地模式

当Hive处理的数据量较小时,启动分布式去处理数据会有点浪费,因为可能启动的时间比数据处理的时间还要长。Hive支持将作业动态地转为本地模式,需要使用下面的配置:

SET hive.exec.mode.local.auto=true; -- 默认 false
SET hive.exec.mode.local.auto.inputbytes.max=50000000;
SET hive.exec.mode.local.auto.input.files.max=5; -- 默认 4

一个作业只要满足下面的条件,会启用本地模式

  1. 输入文件的大小小于 hive.exec.mode.local.auto.inputbytes.max 配置的大小
  2. map任务的数量小于 hive.exec.mode.local.auto.input.files.max 配置的大小
  3. reduce任务的数量是1或者0

严格模式

所谓严格模式,就是强制不允许用户执行3种有风险的HiveQL语句,一旦执行会直接失败。这3种语句是:

  1. 查询分区表时不限定分区列的语句;
  2. 两表join产生了笛卡尔积的语句;
  3. 用order by来排序,但没有指定limit的语句。

要开启严格模式,需要将参数hive.mapred.mode 设为strict(缺省值)。

该参数可以不在参数文件中定义,在执行SQL之前设置(set hive.mapred.mode=nostrict )

JVM 重用

默认情况下,Hadoop会为为一个map或者reduce启动一个JVM,这样可以并行执行map和reduce。

当map或者reduce是那种仅运行几秒钟的轻量级作业时,JVM启动进程所耗费的时间会比作业执行的时间还要长。Hadoop可以重用JVM,通过共享JVM以串行而非并行的方式运行map或者reduce。

JVM的重用适用于同一个作业的map和reduce,对于不同作业的task不能够共享JVM。如果要开启JVM重用,需要配置一个作业最大task数量,默认值为1,如果设置为-1,则表示不限制:

# 代表同一个MR job中顺序执行的5个task重复使用一个JVM,减少启动和关闭的开销
SET mapreduce.job.jvm.numtasks=5;

这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

并行执行

Hive的查询通常会被转换成一系列的stage,这些stage之间并不是一直相互依赖的,可以并行执行这些stage,通过下面的方式进行配置:

SET hive.exec.parallel=true; -- 默认false
SET hive.exec.parallel.thread.number=16; -- 默认8

并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。

推测执行

在分布式集群环境下,因为程序Bug、负载不均衡、资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。

为了避免这种情况发生,Hadoop采用了推测执行机制,它根据一定的规则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。

set mapreduce.map.speculative=true
set mapreduce.reduce.speculative=true
set hive.mapred.reduce.tasks.speculative.execution=true

合并小文件

在map执行前合并小文件,减少map数

# 缺省参数
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

在Map-Reduce的任务结束时合并小文件

# 在 map-only 任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;

# 在 map-reduce 任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;

# 合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;

# 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;

Fetch模式

Fetch模式是指Hive中对某些情况的查询可以不必使用MapReduce计算。select col1, col2 from tab ;

可以简单地读取表对应的存储目录下的文件,然后输出查询结果到控制台。在开启fetch模式之后,在全局查找、字段查找、limit查找等都不启动 MapReduce 。

# Default Value: minimal in Hive 0.10.0 through 0.13.1, more in
Hive 0.14.0 and later
hive.fetch.task.conversion=more

Hive 参数说明的官方文档:

https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

第3节 SQL优化

查看 Hive 执行计划

Hive 的 SQL 语句在执行之前需要将 SQL 语句转换成 MapReduce 任务,因此需要了解具体的转换过程,可以在 SQL 语句中输入如下命令查看具体的执行计划。

## 查看执行计划,添加extended关键字可以查看更加详细的执行计划 
explain [extended] query

列裁剪和分区裁剪

列裁剪是在查询时只读取需要的列;

分区裁剪就是只读取需要的分区。

谓词下推

将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是 PredicatePushDown。

set hive.optimize.ppd=true; ## 默认是true
select a.*, b.* from a join b on a.id = b.id where b.age > 20;
select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;

sort by 代替 order by

HiveQL中的order by与其他关系数据库SQL中的功能一样,是将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。

如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by 一同使用。如果不加 distribute by 的话,map端数据就会随机分配到reducer。

group by 代替 count(distinct)

当要统计某一列的去重数时,如果数据量很大,count(distinct) 会非常慢。原因与order by类似,count(distinct)逻辑只会有很少的reducer来处理。此时可以用group by 来改写:

参考

-- 原始SQL
select count(distinct uid) from tab;

-- 优化后的SQL
select count(1) from 
(select uid from tab group by uid) tmp;

这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启动job的overhead远小于计算耗时,才考虑这种方法。当数据集很小或者key的倾斜比较明显时,group by还可能会比distinct慢。

group by 配置调整

map端预聚合

group by时,如果先起一个combiner在map端做部分预聚合,可以有效减少 shuffle 数据量。

-- 默认为true
set hive.map.aggr = true

Map端进行聚合操作的条目数

set hive.groupby.mapaggr.checkinterval = 100000

通过 hive.groupby.mapaggr.checkinterval 参数也可以设置map端预聚合的行数阈值,超过该值就会分拆job,默认值10W。

倾斜均衡配置项

group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项hive.groupby.skewindata ,默认值false。

其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。

但是,配置项毕竟是死的,单纯靠它有时不能根本上解决问题,建议了解数据倾斜
的细节,并优化查询语句。

Join 基础优化

Hive join 的三种方式

1.common join

普通连接,在SQL中不特殊指定连接方式使用的都是这种普通连接。

缺点:性能较差(要将数据分区,有shuffle)

优点:操作简单,普适性强

2.map join

map端连接,与普通连接的区别是这个连接中不会有reduce阶段存在,连接在map端完成

适用场景:大表与小表连接,小表数据量应该能够完全加载到内存,否则不适用

优点:在大小表连接时性能提升明显

备注:Hive 0.6 的时候默认认为写在select 后面的是大表,前面的是小表, 或者使用 /+mapjoin(map_table) / 提示进行设定。select a., b. from a join b on a.id =b.id【要求小表在前,大表之后】

hive 0.7 的时候这个计算是自动化的,它首先会自动判断哪个是小表,哪个是大表,这个参数由(hive.auto.convert.join=true)来控制,然后控制小表的大小由(hive.smalltable.filesize=25000000)参数控制(默认是25M),当小表超过这个大小,hive 会默认转化成common join。

Hive 0.8.1,hive.smalltable.filesize => hive.mapjoin.smalltable.filesize

3.bucket map join

分桶连接:Hive 建表的时候支持hash 分区通过指定clustered by (col_name,xxx )into number_buckets buckets 关键字.当连接的两个表的join key 就是bucket column 的时候,就可以通过设置hive.optimize.bucketmapjoin= true 来执行优化。

原理:通过两个表分桶在执行连接时会将小表的每个分桶映射成hash表,每个task节点都需要这个小表的所有hash表,但是在执行时只需要加载该task所持有大表分桶对应的小表部分的hash表就可以,所以对内存的要求是能够加载小表中最大的hash块即可。

备注:小表与大表的分桶数量需要是倍数关系,这个是因为分桶策略决定的,分桶时会根据分桶字段对桶数取余后决定哪个桶的,所以要保证成倍数关系。

优点:比map join对内存的要求降低,能在逐行对比时减少数据计算量(不用比对小表全量)
缺点:只适用于分桶表

利用 map join 特性

map join特别适合大小表join的情况。Hive会将build table和probe table在map端直接完成join过程,消灭了reduce,效率很高。

select a.event_type, b.upload_time
from calendar_event_code a
inner join (
select event_type, upload_time from calendar_record_log
where pt_date = 20190225
) b on a.event_type = b.event_type;

map join的配置项是hive.auto.convert.join ,默认值true。

当build table大小小于 hive.mapjoin.smalltable.filesize 会启用map join,默认值25000000(约25MB)。还有hive.mapjoin.cache.numrows ,表示缓存build table的多少行数据到内存,默认值25000。

分桶表map join

map join对分桶表还有特别的优化。由于分桶表是基于一列进行hash存储的,因此非常适合抽样(按桶或按块抽样)。它对应的配置项是hive.optimize.bucketmapjoin 。

倾斜均衡配置项

这个配置与 group by 的倾斜均衡配置项异曲同工,通过 hive.optimize.skewjoin 来配置,默认false。

如果开启了,在join过程中Hive会将计数超过阈值 hive.skewjoin.key (默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个job的mapper数量,默认10000。

处理空值或无意义值

日志类数据中往往会有一些项没有记录到,其值为null,或者空字符串、-1等。如果缺失的项很多,在做join时这些空值就会非常集中,拖累进度【备注:这个字段是连接字段】。

若不需要空值数据,就提前写 where 语句过滤掉。需要保留的话,将空值key用随机方式打散,例如将用户ID为null的记录随机改为负值:

SELECT a.uid, a.event_type, b.nickname, b.age
FROM (
    SELECT CASE 
            WHEN uid IS NULL THEN CAST(rand() * -10240 AS int)
            ELSE uid
        END AS uid, event_type
    FROM calendar_record_log
    WHERE pt_date >= 20190201
) a
    LEFT JOIN (
        SELECT uid, nickname, age
        FROM user_info
        WHERE status = 4
    ) b
    ON a.uid = b.uid;

单独处理倾斜key

如果倾斜的 key 有实际的意义,一般来讲倾斜的key都很少,此时可以将它们单独抽取出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),最后再进行聚合。

不要一个Select语句中,写太多的Join。一定要了解业务,了解数据。(A0-A9)分成多条语句,分步执行;(A0-A4; A5-A9);先执行大表与小表的关联;

调整 Map 数

查看表基本信息

DESCRIBE formatted table_name;

通常情况下,作业会通过输入数据的目录产生一个或者多个map任务。主要因素包括:

  1. 输入文件总数
  2. 输入文件大小
  3. HDFS文件块大小

map越多越好吗。当然不是,合适的才是最好的。

如果一个任务有很多小文件(<< 128M),每个小文件也会被当做一个数据块,用一个 Map Task 来完成。

一个 Map Task 启动和初始化时间 >> 处理时间,会造成资源浪费,而且系统中同时可用的map数是有限的。

对于小文件采用的策略是合并

每个map处理接近128M的文件块,会有其他问题吗。也不一定。
有一个125M的文件,一般情况下会用一个Map Task完成。假设这个文件字段很少,但记录数却非常多。如果Map处理的逻辑比较复杂,用一个map任务去做,性能也不好。

对于复杂文件采用的策略是增加 Map 数。

computeSliteSize(max(minSize, min(maxSize, blocksize))) =blocksize
minSize : mapred.min.split.size (默认值1)
maxSize : mapred.max.split.size (默认值256M)
调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
建议用set的方式,针对SQL语句进行调整。

调整 Reduce 数

reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks 可以直接设定reducer数量。如果未设置该参数,Hive会进行自行推测,逻辑如下:

  1. 参数hive.exec.reducers.bytes.per.reducer 用来设定每个reducer能够处理的最大数据量,默认值256M
  2. 参数hive.exec.reducers.max 用来设定每个job的最大reducer数量,默认值999(1.2版本之前)或1009(1.2版本之后)
  3. 得出reducer数: reducer_num = MIN(total_input_size /reducers.bytes.per.reducer, reducers.max)
    即: min(输入总数据量 / 256M, 1009)

reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。


删除临时文件

https://www.cnblogs.com/TurboWay/p/12838557.html


第4节 优化小结

深入理解 Hadoop 的核心能力,对Hive优化很有帮助。Hadoop/Hive 处理数据过程,有几个显著特征:

  1. 不怕数据多,就怕数据倾斜
  2. 对 job 数比较多的作业运行效率相对比较低,比如即使有几百行的表,多次关联多次汇总,产生十几个jobs,执行也需要较长的时间。MapReduce 作业初始化的时间是比较长的
  3. 对sum、count等聚合操作而言,不存在数据倾斜问题
  4. count(distinct) 效率较低,数据量大容易出问题

从大的方面来说,优化可以从这几个方面着手:

  1. 好的模型设计,事半功倍
  2. 解决数据倾斜问题。仅仅依靠参数解决数据倾斜,是通用的优化手段,收获有限。开发人员应该熟悉业务,了解数据规律,通过业务逻辑解决数据倾斜往往更可靠
  3. 减少 job 数
  4. 设置合理的map、reduce task数
  5. 对小文件进行合并,是行之有效的提高Hive效率的方法
  6. 优化把握整体,单一作业的优化不如整体最优