一、分区表
1.1 概念
Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。
分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where
字句的中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能。
这里说明一下分区表并 Hive 独有的概念,实际上这个概念非常常见。比如在我们常用的 Oracle 数据库中,当表中的数据量不断增大,查询数据的速度就会下降,这时也可以对表进行分区。表进行分区后,逻辑上表仍然是一张完整的表,只是将表中的数据存放到多个表空间(物理文件上),这样查询数据时,就不必要每次都扫描整张表,从而提升查询性能。
1.2 使用场景
通常,在管理大规模数据集的时候都需要进行分区,比如将日志文件按天进行分区,从而保证数据细粒度的划分,使得查询性能得到提升。
1.3 创建分区表
在 Hive 中可以使用 PARTITIONED BY
子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';
1.4 加载数据到动态分区
要设置动态插入操作,必须设置动态分区相关参数
动态分区参数:
hive.exec.dynamic.partition 默认值:false
是否开启动态分区功能,默认false关闭。
使用动态分区时候,该参数必须设置成true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行MR的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用 默认值100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行MR的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个MR Job中,最大可以创建多少个HDFS文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于100000,可根据实际情况加以调整。
--是否开启动态分区功能,默认false关闭。使用动态分区时候,该参数必须设置成true;
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions = 1000000;
--参数指的是每个节点上能够生成的最大分区,这个在最坏情况下应该是跟最大分区一样的值
SET hive.exec.max.dynamic.partitions.pernode=100000;
INSERT OVERWRITE table bigdata_dev_dashuju.e_flow_query_record partition(p_date,p_operators) select * from dw_flow.flow_query_record where p_date='2017-10-12';
1.5 查看分区目录
这时候我们直接查看表目录,可以看到表目录下存在两个子目录,分别是 deptno=20
和 deptno=30
,这就是分区目录,分区目录下才是我们加载的数据文件。
# hadoop fs -ls hdfs://hadoop001:8020/hive/emp_partition/
这时候当你的查询语句的 where
包含 deptno=20
,则就去对应的分区目录下进行查找,而不用扫描全表。
二、分桶表
1.1 简介
分区提供了一个隔离数据和优化查询的可行方案,但是并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。同时 Hive 会限制动态分区可以创建的最大分区数,用来避免过多分区文件对文件系统产生负担。鉴于以上原因,Hive 还提供了一种更加细粒度的数据拆分方案:分桶表 (bucket Table)。
分桶表会将指定列的值进行哈希散列,并对 bucket(桶数量)取余,然后存储到对应的 bucket(桶)中。
1.2 理解分桶表
单从概念上理解分桶表可能会比较晦涩,其实和分区一样,分桶这个概念同样不是 Hive 独有的,对于 Java 开发人员而言,这可能是一个每天都会用到的概念,因为 Hive 中的分桶概念和 Java 数据结构中的 HashMap 的分桶概念是一致的。
当调用 HashMap 的 put() 方法存储数据时,程序会先对 key 值调用 hashCode() 方法计算出 hashcode,然后对数组长度取模计算出 index,最后将数据存储在数组 index 位置的链表上,链表达到一定阈值后会转换为红黑树 (JDK1.8+)。下图为 HashMap 的数据结构图:
图片引用自:HashMap vs. Hashtable
1.3 创建分桶表
在 Hive 中,我们可以通过 CLUSTERED BY
指定分桶列,并通过 SORTED BY
指定桶中数据的排序参考列。下面为分桶表建表语句示例:
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';
1.4 加载数据到分桶表
这里直接使用 Load
语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。
这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:
1. 设置强制分桶
set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步
在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true
,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。
2. CTAS导入数据
INSERT INTO TABLE emp_bucket SELECT * FROM emp; --这里的 emp 表就是一张普通的雇员表
可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:
1.5 查看分桶文件
三、分区表和分桶表结合使用
分区表和分桶表的本质都是将数据按照不同粒度进行拆分,从而使得在查询时候不必扫描全表,只需要扫描对应的分区或分桶,从而提升查询效率。两者可以结合起来使用,从而保证表数据在不同粒度上都能得到合理的拆分。下面是 Hive 官方给出的示例:
CREATE TABLE page_view_bucketed(
viewTime INT,
userid BIGINT,
page_url STRING,
referrer_url STRING,
ip STRING )
PARTITIONED BY(dt STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;
此时导入数据时需要指定分区:
INSERT OVERWRITE page_view_bucketed
PARTITION (dt='2009-02-25')
SELECT * FROM page_view WHERE dt='2009-02-25';
四、hive分区表增加字段会导致新增字段无法显示值的BUG处理
一、问题重现
1.建表
CREATE EXTERNAL TABLE dw_sms.`sms_status`(
`channel_msg_id` string,
`channel_id` bigint,
`sms_code` string,
`sms_status` string,
`notify_time` bigint,
`source` int)
PARTITIONED BY (
`p_date` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\u0001',
'serialization.format'='\u0001')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://nameservice/data/hive/dw_sms.db/sms_status'
2.建分区及上传数据
1.建分区目录
hdfs dfs -mkdir -p /data/hive/dw_sms.db/sms_status/p_date=2018-12-
2.上传数据文件
-347923467128781448315EXPIREDEXPIRED1546235549102A0923E4DB2501A43FA4DCCB9A830A38189482761802566334749
-348435262298555995715UNDELIVUNDELIV154621905410774532D6D452739FBF7A16AD7E8E3AA6173363227492562587468
3.分区修复
MSCK REPAIR TABLE dw_sms.`sms_status`;
3.新增字段
ALTER TABLE dw_sms.`sms_status` ADD COLUMNS (uid STRING,mobile STRING,msg_id STRING);
4.查询新增字段前的分区数据 新增字段全部为NULL
二、解决办法
方法一:
ALTER TABLE table_name
[PARTITION partition_spec] -- (Note: Hive 0.14.0 and later)
ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
[CASCADE|RESTRICT] -- (Note: Hive 1.1.0 and later)
eg:
ALTER TABLE dw_sms.`sms_status` ADD COLUMNS (uid STRING,mobile STRING,msg_id STRING) CASCADE;
注意:DDL语句最后添加CASCADE。 否则新增的列在旧分区中不可见,查询数据时为NULL,重新刷数据时仍为NULL
方法二:
1.查元数据库中对应表的CD_ID
SELECT s.CD_ID FROM DBS d, TBLS t,SDS s
WHERE d.NAME='dw_sms' AND t.TBL_NAME='sms_status' AND d.DB_ID=t.DB_ID
AND s.SD_ID=t.SD_ID
或者
select CD_ID from SDS where LOCATION='hdfs://nameservice/data/hive/dw_sms.db/sms_status' ;
2.查看表所有分区的CD_ID
SELECT * FROM SDS WHERE LOCATION LIKE 'hdfs://nameservice/data/hive/dw_sms.db/sms_status/p_date=%'
说明:经查询会发现,CD_ID新值为14784,历史分区CD_ID为14783
3.更新历史分区的CD_ID
UPDATE SDS SET CD_ID=14784 WHERE LOCATION LIKE 'hdfs://nameservice/data/hive/dw_sms.db/sms_status/p_date=%'
方法三:修改表
先drop 表在重新建表再添加分区 或者
对需要生效的分区也执行添加或者修改字段的操作,比如:alter table tablename partition(year=’2017’) add columns(name STRING );