回顾 :
shell-api
DDL
DML list_regions locate_region split
java-api
Connection
Admin
Table
* 预分region表
tools split flush
一、java-api
1 添加数据
1.1 put数据
- 插入一个单元格
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* put 'tb_user' , 'rk' , 'cf:name' , 'zss'
* Put对象 行键 列族 属性 值
* Put对象中可以添加多个Cell , 一次插入一行数据
* 1 插入一个单元格
* 2 插入多个单元格
* 3 插入多个列族下的多个单元格
*/
public class Hbase07_AddData01 {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
// 获取操作的表对象
Table tb_user = conn.getTable(TableName.valueOf("tb_user"));
// 创建Put对象
Put put = new Put("rk001".getBytes());
put.addColumn("cf".getBytes() , "name".getBytes() , "zss".getBytes()) ;
put.addColumn("cf".getBytes() , "gender".getBytes() , "M".getBytes()) ;
// 添加数据
tb_user.put(put);
//释放资源
tb_user.close();
conn.close();
}
}
- 插入多个单元格
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* put 'tb_user' , 'rk' , 'cf:name' , 'zss'
* Put对象 行键 列族 属性 值
* Put对象中可以添加多个Cell , 一次插入一行数据
* 1 插入一个单元格
* 2 插入多个单元格
* 3 插入多个列族下的多个单元格
*/
public class Hbase07_AddData02 {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
// 获取操作的表对象
Table tb_a1 = conn.getTable(TableName.valueOf("tb_a1"));
// 创建Put对象
Put put = new Put("rk001".getBytes());
put.addColumn("cf1".getBytes() , "name".getBytes() , "zss".getBytes()) ;
put.addColumn("cf1".getBytes() , "gender".getBytes() , "M".getBytes()) ;
put.addColumn("cf2".getBytes() , "job".getBytes() , "coder".getBytes()) ;
// 添加数据
tb_a1.put(put);
//释放资源
tb_a1.close();
conn.close();
}
}
- 插入多行数据
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* put 'tb_user' , 'rk' , 'cf:name' , 'zss'
* Put对象 行键 列族 属性 值
* 同时参入多行数据
*/
public class Hbase07_AddData03 {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
// 获取操作的表对象
Table tb_a1 = conn.getTable(TableName.valueOf("tb_a1"));
// 创建Put对象
// rk002
Put put2 = new Put("rk002".getBytes());
put2.addColumn("cf1".getBytes() , "name".getBytes() , "lss".getBytes()) ;
put2.addColumn("cf1".getBytes() , "gender".getBytes() , "F".getBytes()) ;
put2.addColumn("cf2".getBytes() , "job".getBytes() , "coder".getBytes()) ;
// rk003
Put put3 = new Put("rk003".getBytes());
put3.addColumn("cf1".getBytes() , "name".getBytes() ,Bytes.toBytes( "王五")) ;
put3.addColumn("cf2".getBytes() , "sal".getBytes() , Bytes.toBytes(22222.22)) ;
List<Put> ls = new ArrayList<>() ;
ls.add(put2) ;
ls.add(put3);
// 添加数据
tb_a1.put(ls);
//释放资源
tb_a1.close();
conn.close();
}
}
1.2 缓存,批次处理数据
BufferedMutator bm = conn.getBufferedMutator(TableName.valueOf("tb_user"));
// 周期性刷写数据到hbase中 , 并不是一行写一次
bm.setWriteBufferPeriodicFlush(2000);
bm.mutate(ls);
bm.flush ;
案例:
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* put 'tb_user' , 'rk' , 'cf:name' , 'zss'
* BufferedMutator 批次的写入数据
* 1 时间间隔
* 2 缓存大小
* 任何一个条件满足都会执行一次RPC请求批次写
*/
public class Hbase07_AddData04 {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
// 获取一个操作数据的对象 , 包含表信息
BufferedMutator bm = conn.getBufferedMutator(TableName.valueOf("tb_user"));
Put put1 = new Put(Bytes.toBytes("rk004"));
put1.addColumn(Bytes.toBytes("cf") ,Bytes.toBytes("name"),Bytes.toBytes("ww3")) ;
Put put2 = new Put(Bytes.toBytes("rk005"));
put2.addColumn(Bytes.toBytes("cf") ,Bytes.toBytes("name"),Bytes.toBytes("ww4")) ;
List<Put> ls = new ArrayList<>() ;
ls.add(put1);
ls.add(put2);
// 周期性刷写数据到hbase中 , 并不是一行写一次
bm.setWriteBufferPeriodicFlush(2000);
// 大小达到阈值 请求写数据
long size = bm.getWriteBufferSize();
System.out.println(size); // 2M 2048K
// 执行插入
bm.mutate(ls);
// 手动强制刷写
// bm.flush();
bm.close();
conn.close();
}
}
1.3 bulkload方式(MR)
将静态数据转换成hfile , 将hfile导入到指定的表中
shell脚本
MR编程
1 数据
1,zss,23,BJ
2,lss,13,NJ
3,wbb,33,SH
4,lyy,12,BJ
2 创建hbase的表 create ‘tb_teacher’ , ‘cf’
3 操作的数据在HDFS上
注意参数不要写错:列族名、hdfs的文件路径等。
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=, -Dimporttsv.columns='HBASE_ROW_KEY,cf:name,cf:age,cf:city' -Dimporttsv.bulk.output=/teacher/output tb_teacher /csv/teacher.csv
4 将生成的hfile文件导入到hbase 的指定表中
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /teacher/output tb_teacher
2 查看表信息
2.1 获取数据(scan一个表)
package com._51doit.hbase.day02;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Iterator;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
*/
public class Hbase08_ReadData {
public static void main(String[] args) throws Exception {
Connection connection = HbaseUtils.getConnection();
// 获取表对象
Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
//所有的数据
ResultScanner results = tb_a1.getScanner(new Scan());
// 获取每行
for (Result result : results) {
// 遍历单元格
while (result.advance()){
// 获取单元格
Cell cell = result.current();
// 获取行键 列族 属性 值
// 工具类 解析CEll单元格
// rk001cf1gender zi�M
byte[] cloneRow = CellUtil.cloneRow(cell);
byte[] cloneFamily = CellUtil.cloneFamily(cell);
byte[] cloneQualifier = CellUtil.cloneQualifier(cell);
byte[] cloneValue = CellUtil.cloneValue(cell);
System.out.println(
new String(cloneRow)+"-"+
new String(cloneFamily)+"-"+
new String(cloneQualifier)+"-"+
(new String(cloneQualifier).equals("sal") ? Bytes.toDouble(cloneValue):new String(cloneValue))
);
}
}
connection.close();
}
}
2.2 获取某行数据(get)
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:获取第一行的所有数据
*/
public class Hbase08_ReadData2 {
public static void main(String[] args) throws Exception {
Connection connection = HbaseUtils.getConnection();
// 获取表对象
Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
Get get = new Get(Bytes.toBytes("rk001"));
// 行数据
Result result = tb_a1.get(get);
HbaseUtils.showData(result);
connection.close();
}
}
2.3 获取某行的某个列族的数据
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* 扫描表的一个列族的数据
* 总结
* 1 get一行数据
* 2 List<Get></> 多行
* 3 扫描整个表的所有数据
* 4 扫描整个表的某个列族的所有数据
*/
public class Hbase08_ReadData3 {
public static void main(String[] args) throws Exception {
Connection connection = HbaseUtils.getConnection();
// 获取表对象
Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
ResultScanner scanner = tb_a1.getScanner("cf2".getBytes());
for (Result result : scanner) {
HbaseUtils.showData(result);
}
connection.close();
}
}
2.4 过滤查看数据
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
* 扫描表的一个列族的数据
* 总结:
* 1 get一行数据
* 2 List<Get></> 多行
* 3 扫描整个表的所有数据
* 4 扫描整个表的某个列族的所有数据
*/
public class Hbase08_ReadData4 {
public static void main(String[] args) throws Exception {
Connection connection = HbaseUtils.getConnection();
// 获取表对象
Table tb_a1 = connection.getTable(TableName.valueOf("tb_a1"));
Scan scan = new Scan();
// 设置过滤器 , 过滤符合条件的结果
// scan.setFilter()
// 设置扫描范围
scan.withStartRow("rk002".getBytes()) ;
// scan.withStopRow()
/* Get get = new Get("".getBytes());
get.setFilter()*/
//描整个表的所有数据
ResultScanner scanner = tb_a1.getScanner(scan);
for (Result result : scanner) {
HbaseUtils.showData(result);
}
connection.close();
}
}
3 删除数据
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
/**
* Author: Hang.Z
* Date: 21/07/03
* Put put数据
* new Put("rk01101") ;
* put.addColumn("cf" , "name","zss")
* put.addColumn("cf" , "age","zss")
* put.addColumn("cf" , "gender","zss")
* put.addColumn("cf" , "addr","zss")
* put.addColumn("cf2" , "job","zss")
* Description:
* 删除数据
* 1 删除1行
* 2 删除一个属性
* 3 删除多个属性
*/
public class Hbase09_DeleteData {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
Table tb_teacher = conn.getTable(TableName.valueOf("tb_teacher"));
Delete delete = new Delete("2".getBytes());
// 设置 列族下的属性
delete.addColumn("cf".getBytes(),"name".getBytes()) ;
delete.addColumn("cf".getBytes(),"city".getBytes()) ;
tb_teacher.delete(delete);
conn.close();
}
}
4 创建名称空间
package doit.day03;
import doit.day02.HbaseUtils;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
/**
* Author: Hang.Z
* Date: 21/07/03
* Description:
*/
public class Hbase11_CreateNS {
public static void main(String[] args) throws Exception {
Connection conn = HbaseUtils.getConnection();
Admin admin = conn.getAdmin();
NamespaceDescriptor.Builder builder = NamespaceDescriptor.create("ns24");
NamespaceDescriptor namespaceDescriptor = builder.build();
admin.createNamespace(namespaceDescriptor);
admin.close();
conn.close();
}
}
二、快照
Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot
snapshot 'tb_teacher' , 'teacher_20210703' -- 创建快照
restore_snapshot 'teacher_20210703' -- 恢复快照
clone_snapshot 'teacher_20210703' , 'ns24:new_tb_teacher' -- 备份数据
list_snapshots -- 查看当前所有的快照
三、写数据流程
- 客户端请求写数据
- zookeeper收到请求,根据要写入的行号,返回该数据要请求的regionserver
- regionserver收到请求返回(META)元数据给客户端,到客户端的缓存中
- 客户端解析定位到将数据插入到哪个region,请求向它写数据
- 每个列族对应一个cf-store类来写入,cfstore在memorystore中不断生成StoreFile对象,同时将数据进行了排序,同时将写入的操作写入日志中。
- 当添加数据到一定程度,storefile对象将会上传(刷新)到hdfs中生成hfile文件,每个对象生成一个hfile。
flush的条件(常问)
- 内存数据达到128M
- 手动强制刷新
- 操作次数/操作时间达到阈值
- 当前节点整体的内存达到阈值
什么时候才算更新了一次数据?(面试题)
假如对同一个数据put,delete先后两个操作,会在hdfs中写入两个hfile文件,当更新或者删除数据的时候,会将这俩个文件合并成一个hfile文件,此时才是真正的跟新了一次数据。
四、读数据的流程
- 客户端请求get rowkey的值,向zookeeper请求
- zookeeper返回给客户端数据所在的机器
- 客户端向该机器请求下载META数据
- regionserver返回给客户端META数据,客户端写到缓存中
- 客户端解析META数据,定位到具体的region
- 开始读数据,查看cfstore的memorystore中查看是否有该数据,如果没有同时从本机的缓存和hdfs中查找该数据。
- hdfs查看中查找数据利用了布隆过滤器,可以快速筛选出可能有该数据的Hfile文件。布隆过滤器的基本原理是,在生成hfile时,使用很小的空间,存入一个字节数组,根据行号取哈希地址,此地址对应的值设置为1,其他没有值得设置为0。在用户查看当前hfile中是否有想要的rowkey的时候,根据行号计算出哈希值,可以直接判断出该文件中是否有该行,如果没有此行号,那么这个文件一定没有这个值,如果有进一步查看hfile的内容
- 进一步查看hfile中rowkey对应的值,hfile文件生成的时候,在对数据块排序后,对他们加上了索引,存储了一个rowkey-index的哈希表,过滤器得到rowkey后,直接根据这个哈希表得到值得索引,进而返回rowkey对应的数据。
- 由于可能对同一行的数据做多个操作,就会生成多个hfile文件,同一个数据的不同操作也就对应有多个数据块,此时,读取数据的时候,会将region缓存中的数据块和hdfs中的数据块合并,然后在新的hfile中返回数据
hfile文件的生成过程
- memorystore刷新操作,生成hfile文件,发送到hdfs上。
- 生成过程:
- 首先对数据排序:rowkey、列族、属性、值
- 封装成数据块
- 生成rowkey-index的索引表
- 生成布隆过滤器:rowkey-标记数据
五、数据的合并
由于每次对数据操作一次,都会生成一个hfile文件,hdfs端会在空闲时间,把多个hfile文件进行合并。
最后,如果合并之后,多个region中的数据分配的不合理,会自动的合并region。
合并过程
1. 在HDFS底层数据以regionname为文件夹
2. 原来的region文件夹不存在,生成新的文件夹
3. 内部hfile的移动
shell
#合并region
merge_region '28eea66d36680849e4abdbfb36d25f7a','04980472336896c55bd96bae4271a909'
# 移动region
move '04980472336896c55bd96bae4271a909' , 'linux02,16020,1625467987938'
# 自动负载均衡
balancer