依赖的maven
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>1.4.3</version>
<type>pom</type>
</dependency>
CURD代码
注意:
全表扫描不是对某个时间点表的快照的扫描.如果扫描已经开始,但是在行R被扫描器对象读出之前,行R被改变了,那么扫描器读出行R更新后的版本.但是扫描器读出的数据是一致的,得到R更新后的完整行
前置操作
package com.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws IOException {
//构建个配置
conf = HBaseConfiguration.create();
//对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了
//因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");
conn = ConnectionFactory.createConnection(conf);
}
}
添加测试数据
//添加数据
@Test
public void testPut() throws IOException {
Table table = conn.getTable(TableName.valueOf("t_user_info"));
ArrayList<Put> puts = new ArrayList<Put>();
//构建一个put对象(kv),指定行键
Put put01 = new Put(Bytes.toBytes("user001"));
put01.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhangsan"));
Put put02 = new Put("user001".getBytes());
put02.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("password"), Bytes.toBytes("123456"));
Put put03 = new Put("user002".getBytes());
put03.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("lisi"));
put03.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
Put put04 = new Put("zhang_sh_01".getBytes());
put04.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang01"));
put04.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
Put put05 = new Put("zhang_sh_02".getBytes());
put05.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang02"));
put05.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
Put put06 = new Put("liu_sh_01".getBytes());
put06.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("liu01"));
put06.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
Put put07 = new Put("zhang_bj_01".getBytes());
put07.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang03"));
put07.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
Put put08 = new Put("zhang_bj_01".getBytes());
put08.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang04"));
put08.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false"));
puts.add(put01);
puts.add(put02);
puts.add(put03);
puts.add(put04);
puts.add(put05);
puts.add(put06);
puts.add(put07);
puts.add(put08);
table.put(puts);
table.close();
conn.close();
}
表是否存在
@Test
public boolean testExists(String tableName) throws IOException {
//老API
//HBaseAdmin admin = new HBaseAdmin(conf);
//新API
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
return admin.tableExists(TableName.valueOf(tableName));
}
创建表
这是不需要命名空间的
//建表
@Test
public void testCreate() throws IOException {
//获取一个表管理器
Admin admin = conn.getAdmin();
//构造一个表描述器,并指定表名
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t_user_info"));
//构造一个列族描述器,并指定列族名
HColumnDescriptor hcd1 = new HColumnDescriptor("base_info");
//为该列族设定一个布隆过滤器类型参数/版本数量
hcd1.setBloomFilterType(BloomType.ROW).setVersions(1, 3);
//构造第二个列族描述器,并指定列族名
HColumnDescriptor hcd2 = new HColumnDescriptor("extra_info");
//为该列族设定一个布隆过滤器类型参数/版本数量
hcd2.setBloomFilterType(BloomType.ROW).setVersions(1, 3);
//将列族描述器添加到表描述器中
htd.addFamily(hcd1).addFamily(hcd2);
admin.createTable(htd);
admin.close();
conn.close();
}
命名空间管理
命名空间可以被创建、移除、修改。
表和命名空间的隶属关系在在创建表时决定,通过以下格式指定:
<namespace>:<table>
Example:hbase shell中创建命名空间、创建命名空间中的表、移除命名空间、修改命名空间
#Create a namespace
create_namespace 'my_ns'
#create my_table in my_ns namespace
create 'my_ns:my_table', 'fam'
#drop namespace
drop_namespace 'my_ns'
#alter namespace
alter_namespace 'my_ns', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'}
预定义的命名空间
有两个系统内置的预定义命名空间:
- hbase:系统命名空间,用于包含hbase的内部表
- default:所有未指定命名空间的表都自动进入该命名空间
Example:指定命名空间和默认命名空间
#namespace=foo and table qualifier=bar
create 'foo:bar', 'fam'
#namespace=default and table qualifier=bar
create 'bar', 'fam'
代码
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
//create namespace named "my_ns"
admin.createNamespace(NamespaceDescriptor.create("my_ns").build());
//create tableDesc, with namespace name "my_ns" and table name "mytable"
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("my_ns:mytable"));
tableDesc.setDurability(Durability.SYNC_WAL);
//add a column family "mycf"
HColumnDescriptor hcd = new HColumnDescriptor("mycf");
tableDesc.addFamily(hcd);
admin.createTable(tableDesc);
admin.close();
关键知识点
- 必须将HBase集群的hbase-site.xml文件添加进工程的classpath中,或者通过Configuration对象设置相关属性,否则程序获取不到集群相关信息,也就无法找到集群,运行程序时会报错;
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(“my_ns:mytable”))代码是描述表mytable,并将mytable放到了my_ns命名空间中,前提是该命名空间已存在,如果指定的是不存在命名空间,则会报错org.apache.hadoop.hbase.NamespaceNotFoundException;
- 命名空间一般在建模阶段通过命令行创建,在java代码中通过admin.createNamespace(NamespaceDescriptor.create(“my_ns”).build())创建的机会不多;
- 创建HBaseAdmin对象时就已经建立了客户端程序与HBase集群的connection,所以在程序执行完成后,务必通过admin.close()关闭connection;
- 可以通过HTableDescriptor对象设置表的特性,比如:通过tableDesc.setMaxFileSize(512)设置一个region中的store文件的最大size,当一个region中的最大store文件达到这个size时,region就开始分裂;通过tableDesc.setMemStoreFlushSize(512)设置region内存中的memstore的最大值,当memstore达到这个值时,开始往磁盘中刷数据。更多特性请自行查阅官网API;
- 可以通过HColumnDescriptor对象设置列族的特性,比如:通过hcd.setTimeToLive(5184000)设置数据保存的最长时间;通过hcd.setInMemory(true)设置数据保存在内存中以提高响应速度;通过 hcd.setMaxVersions(10)设置数据保存的最大版本数;通过hcd.setMinVersions(5)设置数据保存的最小版本数(配合TimeToLive使用)。更多特性请自行查阅官网API;
- 数据的版本数只能通过HColumnDescriptor对象设置,不能通过HTableDescriptor对象设置;
- 由于HBase的数据是先写入内存,数据累计达到内存阀值时才往磁盘中flush数据,所以,如果在数据还没有flush进硬盘时,regionserver down掉了,内存中的数据将丢失。要想解决这个场景的问题就需要用到WAL(Write-Ahead-Log),tableDesc.setDurability(Durability.SYNC_WAL)就是设置写WAL日志的级别,示例中设置的是同步写WAL,该方式安全性较高,但无疑会一定程度影响性能,请根据具体场景选择使用;
- setDurability(Durability d)方法可以在相关的三个对象中使用,分别是:HTableDescriptor,Delete,Put(其中Delete和Put的该方法都是继承自父类org.apache.hadoop.hbase.client.Mutation)。分别针对表、插入操作、删除操作设定WAL日志写入级别。需要注意的是,Delete和Put并不会继承Table的Durability级别(已实测验证)。Durability是一个枚举变量,可选值参见4.2节。如果不通过该方法指定WAL日志级别,则为默认USE_DEFAULT级别。
删除表
删除表没创建表那么多学问,直接上代码
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
String tablename = "my_ns:mytable";
if(admin.tableExists(tablename)) {
try {
if (! admin.isTableDisabled(TableName.valueOf(tableName))) {
admin.disableTable(tablename);
}
admin.deleteTable(tablename);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
admin.close();
删除表前必须先disable表
删除表中的数据
Delete类用于删除表中的一行数据,通过HTable.delete来执行该动作。
在执行Delete操作时,HBase并不会立即删除数据,而是对需要删除的数据打上一个“墓碑”标记,直到当Storefile合并时,再清除这些被标记上“墓碑”的数据。
如果希望删除整行,用行键来初始化一个Delete对象即可。如果希望进一步定义删除的具体内容,可以使用以下这些Delete对象的方法:
- 为了删除指定的列族,可以使用deleteFamily
- 为了删除指定列的多个版本,可以使用deleteColumns
- 为了删除指定列的指定版本,可以使用deleteColumn,这样的话就只会删除版本号(时间戳)与指定版本相同的列。如果不指定时间戳,默认只删除最新的版本
构造函数
- 指定要删除的行键
Delete(byte[] row)
删除行键指定行的数据。
如果没有进一步的操作,使用该构造函数将删除行键指定的行中所有列族中所有列的所有版本!
- 指定要删除的行键和时间戳
Delete(byte[] row, long timestamp)
删除行键和时间戳共同确定行的数据。
如果没有进一步的操作,使用该构造函数将删除行键指定的行中,所有列族中所有列的时间戳小于等于指定时间戳的数据版本。
注意:该时间戳仅仅和删除行有关,如果需要进一步指定列族或者列,你必须分别为它们指定时间戳。
- 给定一个字符串,目标行键的偏移,截取的长度
Delete(byte[] rowArray, int rowOffset, int rowLength)
- 给定一个字符串,目标行键的偏移,截取的长度,时间戳
Delete(byte[] rowArray, int rowOffset, int rowLength, long ts)
常用方法
Delete deleteColumn(byte[] family, byte[] qualifier)
删除指定列的最新版本的数据。Delete deleteColumns(byte[] family, byte[] qualifier)
删除指定列的所有版本的数据。Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp)
删除指定列的指定版本的数据。Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)
删除指定列的,时间戳小于等于给定时间戳的所有版本的数据。Delete deleteFamily(byte[] family)
删除指定列族的所有列的所有版本数据。Delete deleteFamily(byte[] family, long timestamp)
删除指定列族的所有列中时间戳小于等于指定时间戳的所有数据。Delete deleteFamilyVersion(byte[] family, long timestamp)
删除指定列族中所有列的时间戳等于指定时间戳的版本数据。void setTimestamp(long timestamp)
为Delete对象设置时间戳。
实例代码
删除整行的所有列族、所有行、所有版本
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Delete delete = new Delete(Bytes.toBytes("000"));
table.delete(delete);
table.close();
删除指定列的最新版本
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Delete delete = new Delete(Bytes.toBytes("100003"));
delete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("address"));
table.delete(delete);
table.close();
删除指定列的所有版本
接以上场景,执行以下代码:
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Delete delete = new Delete(Bytes.toBytes("100003"));
delete.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("address"));
table.delete(delete);
table.close();
删除指定列族中所有列的时间戳等于指定时间戳的版本数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Delete delete = new Delete(Bytes.toBytes("100003"));
delete.deleteFamilyVersion(Bytes.toBytes("info"), 1405390959464L);
table.delete(delete);
table.close();
修改表
修改现有列族的属性
@Test
public void testModify() throws IOException {
Admin admin = conn.getAdmin();
// admin.disableTable(TableName.valueOf("t_user_info"));
// 修改已有的ColumnFamily
HTableDescriptor table = admin.getTableDescriptor(TableName.valueOf("t_user_info"));
HColumnDescriptor f2 = table.getFamily("extra_info".getBytes());
//设置布隆过滤器
f2.setBloomFilterType(BloomType.ROWCOL);
//设置版本
f2.setVersions(1, 5);
// 添加新的ColumnFamily
table.addFamily(new HColumnDescriptor("other_info"));
//将修改后的描述对象应用到目标表
admin.modifyTable(TableName.valueOf("t_user_info"), table);
admin.close();
conn.close();
}
修改表,删除三个列族,新增一个列族
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
String tablename = "rd_ns:itable";
if(admin.tableExists(tablename)) {
try {
admin.disableTable(tablename);
//get the TableDescriptor of target table
HTableDescriptor newtd = admin.getTableDescriptor(Bytes.toBytes("rd_ns:itable"));
//remove 3 useless column families
newtd.removeFamily(Bytes.toBytes("note"));
newtd.removeFamily(Bytes.toBytes("newcf"));
newtd.removeFamily(Bytes.toBytes("sysinfo"));
//create HColumnDescriptor for new column family
HColumnDescriptor newhcd = new HColumnDescriptor("action_log");
newhcd.setMaxVersions(10);
newhcd.setKeepDeletedCells(true);
//add the new column family(HColumnDescriptor) to HTableDescriptor
newtd.addFamily(newhcd);
//modify target table struture
admin.modifyTable(Bytes.toBytes("rd_ns:itable"),newtd);
admin.enableTable(tablename);
} catch (Exception e) {
e.printStackTrace();
}
}
admin.close();
逻辑很简单:
- 通过admin.getTableDescriptor(Bytes.toBytes(“rd_ns:itable”))取得目标表的描述对象,应该就是取得指向该对象的指针了;
- 修改目标表描述对象;
- 通过admin.modifyTable(Bytes.toBytes(“rd_ns:itable”),newtd)将修改后的描述对象应用到目标表。
添加数据
新增、更新数据Put
常用构造函数
- 指定行键
public Put(byte[] row)
参数:row 行键
- 指定行键和时间戳
public Put(byte[] row, long ts)
参数:row 行键,ts 时间戳
- 从目标字符串中提取子串,作为行键
Put(byte[] rowArray, int rowOffset, int rowLength)
- 从目标字符串中提取子串,作为行键,并加上时间戳
Put(byte[] rowArray, int rowOffset, int rowLength, long ts)
常用方法
- 指定列族、限定符,添加值
add(byte[] family, byte[] qualifier, byte[] value)
- 指定列族、限定符、时间戳,添加值
add(byte[] family, byte[] qualifier, long ts, byte[] value)
- 设置写WAL(Write-Ahead-Log)的级别
public void setDurability(Durability d)
参数是一个枚举值,可以有以下几种选择:
- ASYNC_WAL : 当数据变动时,异步写WAL日志
- SYNC_WAL : 当数据变动时,同步写WAL日志
- FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘
- SKIP_WAL : 不写WAL日志
- USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即SYNC_WAL
实例代码
插入行
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Put put = new Put(Bytes.toBytes("100001"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lion"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("shangdi"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30"));
put.setDurability(Durability.SYNC_WAL);
table.put(put);
table.close();
更新行
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Put put = new Put(Bytes.toBytes("100001"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lee"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("longze"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("31"));
put.setDurability(Durability.SYNC_WAL);
table.put(put);
table.close();
注意:
- Put的构造函数都需要指定行键,如果是全新的行键,则新增一行;如果是已有的行键,则更新现有行
- 创建Put对象及put.add过程都是在构建一行的数据,创建Put对象时相当于创建了行对象,add的过程就是往目标行里添加cell,直到table.put才将数据插入表格;
- 以上代码创建Put对象用的是构造函数1,也可用构造函数2,第二个参数是时间戳;
- Put还有别的构造函数,请查阅官网API。
从目标字符串中提取子串,作为行键,构建Put
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Put put = new Put(Bytes.toBytes("100001_100002"),7,6);
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("show"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("caofang"));
put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30"));
table.put(put);
table.close();
注意,关于:Put put = new Put(Bytes.toBytes(“100001_100002”),7,6)
第二个参数是偏移量,也就是行键从第一个参数的第几个字符开始截取;
第三个参数是截取长度;
这个代码实际是从 100001_100002 中截取了100002子串作为目标行的行键
读取数据
读取,get一次读取一行
@Test
public void testGet() throws IOException {
Table table = conn.getTable(TableName.valueOf("t_user_info"));
//构造一个get查询对象.指定要get的是那一行
Get get = new Get("user001".getBytes());
Result result = table.get(get);
CellScanner cellScanner = result.cellScanner();
//迭代
while (cellScanner.advance()) {
Cell current = cellScanner.current();
//列族名
byte[] familyArray = current.getFamilyArray();
//列标识符的名称
byte[] qualifierArray = current.getQualifierArray();
//具体的值
byte[] valueArray = current.getValueArray();
//获取有用字符
System.out.printf(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
System.out.printf(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
System.out.printf(" " + new String(valueArray, current.getValueOffset(), current.getValueLength()));
System.out.println();
}
table.close();
conn.close();
}
批量查询数据
@Test
public void testScan() throws IOException {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
//表是liu_sh_01,row key是zhang_bj_01
//数据(字典排序,从liu_sh_01到zhang_bj_01之间的row key全部遍历)("\000"不加这个是包头不包尾,加了是全部包,原因是这个字段排序是排在zhang_bj_01后面),因为永远不知道下一个rowkey是什么,就加个\000来表示下一个rowkey
Scan scan = new Scan(Bytes.toBytes("liu_sh_01"), Bytes.toBytes("zhang_bj_01" + "\000"));
ResultScanner scanner = t_user_info.getScanner(scan);
//迭代器
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
//获取一行记录
Result result = iter.next();
//获取到每一个cell
CellScanner cellScanner = result.cellScanner();
//遍历cell
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] valueArray = current.getValueArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] rowArray = current.getRowArray();
System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength())+" ");
System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength()));
System.out.println();
}
System.out.println("-----------------------------");
}
}
读取指定的列,多版本
@Test
public void testGetColumn() throws IOException {
Table table = conn.getTable(TableName.valueOf("t_user_info"));
//构造一个get查询对象.指定要get的是那一行
Get get = new Get("zhang_sh_02".getBytes());
//设置一次性取多少个版本的数据
get.setMaxVersions(4);
// 获取指定列族和列修饰符对应的列
get.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"));
Result result = table.get(get);
for (KeyValue kv : result.list()) {
System.out.println("family:" + Bytes.toString(kv.getFamily()));
System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
System.out.println("value:" + Bytes.toString(kv.getValue()));
System.out.println("Timestamp:" + kv.getTimestamp());
System.out.println("-------------------------------------------");
}
table.close();
conn.close();
}
Get获取单行
如果希望获取整行数据,用行键初始化一个Get对象就可以,如果希望进一步缩小获取的数据范围,可以使用Get对象的以下方法:
- 如果希望取得指定列族的所有列数据,使用addFamily添加所有的目标列族即可;
- 如果希望取得指定列的数据,使用addColumn添加所有的目标列即可;
- 如果希望取得目标列的指定时间戳范围的数据版本,使用setTimeRange;
- 如果仅希望获取目标列的指定时间戳版本,则使用setTimestamp;
- 如果希望限制每个列返回的版本数,使用setMaxVersions;
- 如果希望添加过滤器,使用setFilte
上述讲述了如何使用Get从HBase中获取数据,并将数据进行展示,其实Get对象中的很多属性可以控制在进行查询时的细节控制,从而控制数据从HBase服务器返回时的数据量,从而可以进行数据优化
Get(byte[] row) / Get(byte[] row, RowLock lock)
初始化函数。在初始化函数时必须要指定Get将要获取的行键,第二个函数则是允许用户自己对Get上一个行锁,但是系统并不赞成用户这么使用。因为在多个客户端进行操作,且都上了自定义的行锁以后,可能会出现因为彼此的行锁需要对方的资源而死锁现象。但是两个客户端的长时间等待与系统连接资源的占用。
addFamily(byte[] family) / addColumn(byte[] family, byte[] qualifier)
添加列簇 / 添加列函数。通过该函数Get在数据获取时,获取的数据范围:两个函数都不设定时获取正行的所有数据。 使用 addFamily时获取制定列簇的所有列的数据。 addColumn则获取制定列的数据
setTimeStamp(long timestamp)
设置获取数据的时间戳
setTimeRange(long minTime,long maxTime)
设置获取数据的时间戳范围
setMaxVersion(int version) / setMaxVersion()
在默认情况下,Get方法之获取一列的最新的版本。但是有时需要的话则会一次获取多个版本的数据。 第一个函数可以指定确切的返回的版本数量。第二个函数则相当于setMaxVersion(Integer.MAX_VALUE)。即获取列中所有版本的 数据。
setCacheBlock(boolean open)
是否打开服务器端快缓存。设置该Get获取的数据是否缓存在内存中
在HBase中,整个表以region分块的方式被分布式的存在不同的region服务器中。每一个region服务器将会维护多个region。而在每一个region中都会存在快缓存区域。当每次去读某一个KeyValue数据块时,则会将整个数据加载到缓存区中。又因为加载的数据远大于一个KeyValue所含的数据大小。所以一般情况下缓存区域内都会存放当前KeyValue对象的连续的数据。但是如果在随机读写的程序中,这种数据加载进入缓存区并没有任何的作用,反而会因为在家时间而使得数据获取时间增长。因此我们要根据实际情况去选择是否开启region上的缓存区。连续读写时,开始缓存区可以增加搜索速度。在随机读写时,关闭缓存区可以缩小读取时间。
setFilter(Filter f)
添加过滤器。因为HBase并没有原声的SQL指定环境,因此在SQL语句中的where条件语句就需要通过特定的借口去实现,而Filter则就是顶替了where 语句的作用。能够实现在在数据查询中的一些精细的控制。
- 设置获取数据的版本
Get setMaxVersions(int maxVersions)
设定获取数据的版本数
Get setMaxVersions()
设定获取数据的所有版本
代码
获取行键指定行的所有列族、所有列的最新版本数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Get get = new Get(Bytes.toBytes("100003"));
Result r = table.get(get);
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))
);
}
table.close();
获取行键指定行中,指定列的最新版本数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Get get = new Get(Bytes.toBytes("100003"));
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
Result r = table.get(get);
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))
);
}
table.close();
获取行键指定的行中,指定时间戳的数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:leetable");
Get get = new Get(Bytes.toBytes("100003"));
get.setTimeStamp(1405407854374L);
Result r = table.get(get);
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))
);
}
table.close();
获取行键指定的行中,所有版本的数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:itable");
Get get = new Get(Bytes.toBytes("100003"));
get.setMaxVersions();
Result r = table.get(get);
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))+
" Time : "+cell.getTimestamp()
);
}
table.close();
注意:
能输出多版本数据的前提是当前列族能保存多版本数据,列族可以保存的数据版本数通过HColumnDescriptor的setMaxVersions(Int)方法设置
scan获取多行
Scan对象可以返回满足给定条件的多行数据。如果希望获取所有的行,直接初始化一个Scan对象即可。如果希望限制扫描的行范围,可以使用以下方法:
- 如果希望获取指定列族的所有列,可使用addFamily方法来添加所有希望获取的列族
- 如果希望获取指定列,使用addColumn方法来添加所有列
- 通过setTimeRange方法设定获取列的时间范围
- 通过setTimestamp方法指定具体的时间戳,只返回该时间戳的数据
- 通过setMaxVersions方法设定最大返回的版本数
- 通过setBatch方法设定返回数据的最大行数
- 通过setFilter方法为Scan对象添加过滤器,过滤器详解请参见:http://blog.csdn.net/u010967382/article/details/37653177
- Scan的结果数据是可以缓存在内存中的,可以通过getCaching()方法来查看当前设定的缓存条数,也可以通过setCaching(int caching)来设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存。此外,通过setCacheBlocks方法设置是否缓存Scan的结果数据块,默认为true
- 我们可以通过setMaxResultSize(long)方法来设定Scan返回的结果行数
常用构造函数
- 创建扫描所有行的Scan
Scan()
- 创建Scan,从指定行开始扫描
Scan(byte[] startRow)
参数:startRow行键
注意:如果指定行不存在,从下一个最近的行开始
- 创建Scan,指定起止行
Scan(byte[] startRow, byte[] stopRow)
参数:startRow起始行,stopRow终止行
注意:startRow <= 结果集 < stopRow
- 创建Scan,指定起始行和过滤器
Scan(byte[] startRow, Filter filter)
参数:startRow起始行,filter过滤器
注意:过滤器的功能和构造参见http://blog.csdn.net/u010967382/article/details/37653177
常用方法
Scan setStartRow(byte[] startRow)
设置Scan的开始行,默认结果集包含该行。如果希望结果集不包含该行,可以在行键末尾加上0。Scan setStopRow(byte[] stopRow)
设置Scan的结束行,默认结果集不包含该行。如果希望结果集包含该行,可以在行键末尾加上0。Scan setTimeRange(long minStamp, long maxStamp)
扫描指定时间范围的数据Scan setTimeStamp(long timestamp)
扫描指定时间的数据Scan addColumn(byte[] family, byte[] qualifier)
指定扫描的列Scan addFamily(byte[] family)
指定扫描的列族Scan setFilter(Filter filter)
为Scan设置过滤器Scan setReversed(boolean reversed)
设置Scan的扫描顺序,默认是正向扫描(false),可以设置为逆向扫描(true)。注意:该方法0.98版本以后才可用!!Scan setMaxVersions()
获取所有版本的数据Scan setMaxVersions(int maxVersions)
设置获取的最大版本数void setCaching(int caching)
设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存void setRaw(boolean raw)
激活或者禁用raw模式。如果raw模式被激活,Scan将返回所有已经被打上删除标记但尚未被真正删除的数据。该功能仅用于激活了KEEP_DELETED_ROWS的列族,即列族开启了hcd.setKeepDeletedCells(true)。Scan激活raw模式后,就不能指定任意的列,否则会报错
代码
扫描表中的所有行的最新版本数据
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:itable");
Scan s = new Scan();
ResultScanner rs = table.getScanner(s);
for (Result r : rs) {
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))+
" Time : "+cell.getTimestamp()
);
}
}
table.close();
扫描指定行键范围,通过末尾加0,使得结果集包含StopRow
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:itable");
Scan s = new Scan();
s.setStartRow(Bytes.toBytes("100001"));
s.setStopRow(Bytes.toBytes("1000020"));
ResultScanner rs = table.getScanner(s);
for (Result r : rs) {
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))+
" Time : "+cell.getTimestamp()
);
}
}
table.close();
返回所有已经被打上删除标记但尚未被真正删除的数据
然而,使用Scan强大的s.setRaw(true)方法,可以获得所有已经被打上删除标记但尚未被真正删除的数据。
代码如下:
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:itable");
Scan s = new Scan();
s.setStartRow(Bytes.toBytes("100003"));
s.setRaw(true);
s.setMaxVersions();
ResultScanner rs = table.getScanner(s);
for (Result r : rs) {
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))+
" Time : "+cell.getTimestamp()
);
}
}
table.close();
结合过滤器,获取所有age在25到30之间的行
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "rd_ns:itable");
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
SingleColumnValueFilter filter1 = new SingleColumnValueFilter(
Bytes.toBytes("info"),
Bytes.toBytes("age"),
CompareOp.GREATER_OR_EQUAL,
Bytes.toBytes("25")
);
SingleColumnValueFilter filter2 = new SingleColumnValueFilter(
Bytes.toBytes("info"),
Bytes.toBytes("age"),
CompareOp.LESS_OR_EQUAL,
Bytes.toBytes("30")
);
filterList.addFilter(filter1);
filterList.addFilter(filter2);
Scan scan = new Scan();
scan.setFilter(filterList);
ResultScanner rs = table.getScanner(scan);
for (Result r : rs) {
for (Cell cell : r.rawCells()) {
System.out.println(
"Rowkey : "+Bytes.toString(r.getRow())+
" Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+
" Value : "+Bytes.toString(CellUtil.cloneValue(cell))+
" Time : "+cell.getTimestamp()
);
}
}
table.close();
计数器
计数器可以-1也可以是0
在mapreduce中要注意,mapreduce任务失败可能会重试,而导致如果用这个可能会不准.因为在mapreduce中可能不是幂等运算
单计数器
Table table = conn.getTable(TableName.valueOf("t_user_info"));
//记住这个值初始的时候不要用put去设置,会导致后面的错误 原因是'1'会转换成Bytes.toBytes()
long rel = table.incrementColumnValue(Bytes.toBytes("user001"), Bytes.toBytes("base_info"),
Bytes.toBytes("hit"), 2L);
//返回这一列的结果
System.out.println(rel);
//存储成功会变成
//column=base_info:hit, timestamp=1532337393697, value=\x00\x00\x00\x00\x00\x00\x00\x01
table.close();
复合计数器
Table table = connection.getTable(TableName.valueOf("counters"));
Increment increment1 = new Increment(Bytes.toBytes("20160101"));
increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"),1);
increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"),1);
increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"),10);
increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),10);
Result result = table.increment(increment1);
for(Cell cell:result.rawCells()){
System.out.println("Cell: " + cell +
" Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(),cell.getValueLength()));
}
Increment increment2 = new Increment(Bytes.toBytes("20160101"));
increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"), 5);
increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"), 1);
increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"), 0);
increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"), -5);
Result result2 = table.increment(increment2);
for (Cell cell : result2.rawCells()) {
System.out.println("Cell: " + cell +
" Value: " + Bytes.toLong(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
table.close();
connection.close();
获取计数器的值
@Test
public void testGet()throws Exception{
HTable table = new HTable(conf,"wc");
Get get =new Get("apple01".getBytes());
get.addColumn("cf".getBytes(),"hits".getBytes());
Result result = table.get(get);
for (KeyValue kv : result.list()) {
System.out.println("family:" + Bytes.toString(kv.getFamily()));
System.out .println("qualifier:" + Bytes.toString(kv.getQualifier()));
System.out.println("value:" + Bytes.toLong(kv.getValue()));
计数器的值获取
}
table.close();
}
扫描器缓存
在Hbase的设置里扫描每次RPC调用得到一批数据.这可以在扫描对象上使用setCaching(int)在每个扫描器(scanner)层次上设置,也可以在hbase-site.xml配置文件里使用HBase.client.scanner.caching属性来设置.
如果缓存值设置为n,每次RPC调用扫描器返回n行,然后这些数据缓存在客户端.这个设置的默认值是1,这意味着客户端对HBase的每次RPC调用在扫描整张表后仅仅返回一行.这个数字很保守,可以调整它以获得更好的性能.
但是该值设置过高意味着客户端和hbase的交互会出现较长的暂停,这会导致hbase端的超时.
ResultScanner接口也有一个next(int)调用,你可以用来要求返回扫描的下面n行.这是在API层面提供的遍历,与为了获取那n行数据客户端对HBase的RPC调用次数无关.
在内部机制中,ResultScanner使用了多次RPC调用来满足这个请求,每次RPC调用返回的行数只取决于为扫描器设置的缓存值