一、合并
由于每次对数据操作一次,都会生成一个hfile文件,hdfs端会在空闲时间,把多个hfile文件进行合并。
最后,如果合并之后,多个region中的数据分配的不合理,会自动的合并region。由于合并region销号很多资源,所以一般手动合并region。
1.1region的拆分
- 建表的时候预分region,根据rowkey合理设计
create ‘a’ ,’cf’,SPLITS=>[‘’,’’,’’]
//java
createTable(table,参数二二维字节数组)
- 自动进行大小拆分
1 没有预分region的表初始1个region
2 随着数据的插入越来越多,region管理的数据行数越来越多
3 256M的时候会有1个region拆分成2个
4 再次拆分,2G的时候再拆分
5 再次拆分,6.75的时候再拆分
6 再次拆分,10G的时候再拆分
7 以后都是10G
- 拆分策略
1. 默认大小拆分
2. 根据rowkey前缀拆分
3. 切割符
4. 手动强制拆分
1.2 region的合并
hfile文件的合并
- 由于每次对文件操作都会生成一个hfile文件,所以hdfs会在空闲时间依次合并这些小的hfile文件。
- 注意在memorystore中,如果有对数据有多个操作,如果有墓被标记,delete就不会读前面的数据,直接删除
region的合并
region合并,当大量的删除,大量的数据过期 , region管理的数据行数大大减少的时候就会合并
二、rowkey的设计
rowkey的作用
数据的排序维度
行的唯一标识
索引创建
布隆过滤器创建
hbaseKV的数据库 rowkey
如何合理的设计rowkey
- 根据数据特点和业务特点设计
- 优先避免热点问题
- 优先考虑频次最高的作为查询维度
- rowkey的长度不能太长,因为有冗余
还可以多维度查询
案例
在已经创建好movie为rowkey的情况下,查询某个UID对应的信息
思路:创建两个表。
三、电影案例
- 创建tb_movie表,列族为info
- 写MR程序,加载json文件,编写MR程序,生成静态hfile并写入到tb_movie中
package com._51doit.hbase.day04;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* Author: Hang.Z
* Date: 21/07/05
* Description:把电影的json数据加载到HBASE的'tb_movie'表中
map
生成rk
write(rk,mb)
reduce
定义put类
将mb的各个属性写入到表的单元格中
*/
public class LoadData2Hbase {
// mapper
static class LoadData2HbaseMapper extends Mapper<LongWritable, Text,Text,MovieWritable> {
Gson gson = new Gson();
Text k = new Text() ;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
try {
String line = value.toString();
//将每行内容转换成Java类
MovieWritable mb = gson.fromJson(line, MovieWritable.class);
String mid = mb.getMovie(); //47 --> 00047
String timeStamp = mb.getTimeStamp();
//设计好rowkey , rowkey作为Key输出
String padMid = StringUtils.leftPad(mid, 5, '0');
String padTime = StringUtils.leftPad(timeStamp, 10, '0');
// 固定长度 唯一
String rk = padMid+"_"+padTime;
k.set(rk);
context.write(k,mb);
}catch (Exception e){
}
}
}
//reduce
static class LoadData2HbaseReducer extends TableReducer<Text,MovieWritable,ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<MovieWritable> values, Context context) throws IOException, InterruptedException {
// key就是行键 唯一
String rowkey = key.toString();
//创建put添加单元格
Put put = new Put(rowkey.getBytes());
// 当前的电影记录
MovieWritable movie = values.iterator().next(); // 四个属性 , 四个单元格
// 添加单元格
put.addColumn("info".getBytes() , "movie".getBytes() , Bytes.toBytes(movie.getMovie())) ;
put.addColumn("info".getBytes() , "rate".getBytes() , Bytes.toBytes(movie.getRate())) ;
put.addColumn("info".getBytes() , "timeStamp".getBytes() , Bytes.toBytes(movie.getTimeStamp())) ;
put.addColumn("info".getBytes() , "uid".getBytes() , Bytes.toBytes(movie.getUid())) ;
//输出数据
context.write(null,put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
// zookeeper
conf.set("hbase.zookeeper.quorum","linux01:2181,linux02:2181,linux03:2181");
Job job = Job.getInstance(conf, "loadData");
job.setMapperClass(LoadData2HbaseMapper.class) ;
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MovieWritable.class);
// 数据的位置
FileInputFormat.setInputPaths(job,new Path("D:\\mrdata\\movie\\input"));
//表格reduce的设置
TableMapReduceUtil.initTableReducerJob("tb_movie",LoadData2HbaseReducer.class ,job);
job.waitForCompletion(true) ;
}
}
movebean类
package com._51doit.hbase.day04;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
/**
* Author: Hang.Z
* Date: 21/06/23
* Description:
* {"movie":"2288","rate":"4","timeStamp":"978160616","uid":"17"}
* 这个类要放在Map端的VALUE的位置输出 序列化
* 序列化本质
* 内存对象--->二进制 写
* 二进制--->内存对象 读
*/
public class MovieWritable implements Writable {
private String movie ;
private double rate ;
private String timeStamp ;
private String uid ;
public String getMovie() {
return movie;
}
public void setMovie(String movie) {
this.movie = movie;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
public String getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(String timeStamp) {
this.timeStamp = timeStamp;
}
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
@Override
public String toString() {
return "MovieWritable{" +
"movie='" + movie + '\'' +
", rate=" + rate +
", timeStamp='" + timeStamp + '\'' +
", uid='" + uid + '\'' +
'}';
}
// 序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(movie);
dataOutput.writeDouble(rate);
dataOutput.writeUTF(timeStamp);
dataOutput.writeUTF(uid);
}
// 反序列化
public void readFields(DataInput dataInput) throws IOException {
movie = dataInput.readUTF();
rate = dataInput.readDouble() ;
timeStamp = dataInput.readUTF() ;
uid = dataInput.readUTF() ;
}
}
四、过滤器
package com._51doit.hbase.day04.filter;
import com._51doit.hbase.day02.HbaseUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
/**
* Author: Hang.Z
* Date: 21/07/05
* Description:
* 获取数据的时候 可以使用过滤器过滤数据
* Get
* Scan
*/
public class Filter01_Demo1 {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
Table tb_movie = conn.getTable(TableName.valueOf("tb_movie"));
Scan scan = new Scan();
// 显示10行
scan.setLimit(10) ;
SubstringComparator substringComparator = new SubstringComparator("00002");
// 行键
/**
* 参数一 判断的符合依据 比较运算符 = >= > < <= !=
* 参数二 比较形式 substringComparator
*/
RowFilter filter = new RowFilter(CompareOperator.EQUAL, substringComparator);
// 设置数据过滤器 过滤级别为行过滤
scan.setFilter(filter);
ResultScanner results = tb_movie.getScanner(scan);
for (Result result : results) {
HbaseUtils.showData(result);
}
conn.close();
}
}