过滤器查询
过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
注意,可以用reset()
重置过滤器
hbase过滤器的比较运算符:
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有
Hbase过滤器的比较器(指定比较机制):
BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出现在value中
Hbase的过滤器分类
比较过滤器
行键过滤器RowFilter
筛选出匹配的所有的行
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
列族过滤器FamilyFilter
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
scan.setFilter(filter1);
列过滤器QualifierFilter
filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
scan.setFilter(filter1);
值过滤器 ValueFilter
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") );
scan.setFilter(filter1);
专用过滤器
单列值过滤器 SingleColumnValueFilter
——会返回满足条件的整行
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); //如果不设置为true,则那些不包含指定column的行也会返回
scan.setFilter(filter1);
SingleColumnValueExcludeFilter
与上相反
前缀过滤器 PrefixFilter——针对行键
筛选出具有特定前缀的行键的数据
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter1);
列前缀过滤器 ColumnPrefixFilter
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter1);
仅仅是行键过滤器 KeyOnlyFilter
这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用:
Filter kof = new KeyOnlyFilter(); // OK 返回所有的行,但值全是空
随机行过滤器 RandomRowFilter
从名字上就可以看出其大概的用法,本过滤器的作用就是按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个RandomRowFilter会返回不通的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器:
Filter rrf = new RandomRowFilter((float) 0.8); // OK 随机选出一部分的行
包含起始行,但不包含终止行 InclusiveStopFilter
扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行,如果我们想要同时包含起始行和终止行,那么我们可以使用此过滤器:
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // OK 包含了扫描的上限在结果之内
时间戳过滤器 TimestampsFilter
需要在扫描结果中对版本进行细粒度控制。一个版本是指一个列在一个特定时间的值。
filter = TimestampsFilter (1435747469212, 1435738500459);
timestamps.add(1479788961691L);
timestamps.add(1479788676517L);
timestamps.add(1479788812565L);
Filter filter = new TimestampsFilter(timestamps);
返回的结果就是在这个时间戳的数据
过滤器代码
package com.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Iterator;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws IOException {
//构建个配置
conf = HBaseConfiguration.create();
//对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了
//因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
conn = ConnectionFactory.createConnection(conf);
}
@Test
public void testFilter() throws IOException {
//针对行键的前缀过滤器,row key,前缀过滤
Filter pf = new PrefixFilter(Bytes.toBytes("liu"));
testScan(pf);
//行过滤器
//比较运算符
//小于,BinaryComparator比较器按照字节字典, LESS排在user002他之前的row key都出来
RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("user002")));
//在row key中包含00就符合
RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("00"));
testScan(rf1);
System.out.println("*****************");
testScan(rf2);
//针对指定一个列的value来过滤,会显示一个完整的列
//列族名base_info,列标识符password,值是123456
//注意这边选择的运算符
SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "password".getBytes(), CompareFilter.CompareOp.EQUAL, "123456".getBytes());
//如果指定的列缺失,则也过滤掉
scvf.setFilterIfMissing(true);
testScan(scvf);
System.out.println("************");
//针对指定一个列的value的比较器来过滤
//正则比较器
//包含以zhang这个字符串开头的value值,符合这个要求的列
RegexStringComparator comparator1 = new RegexStringComparator("^zhang");
//子串包含si的值,符合这个条件的列
ByteArrayComparable comparator2 = new SubstringComparator("si");
//第三个参数可更换
SingleColumnValueFilter scvf1 = new SingleColumnValueFilter("base_info".getBytes(), "username".getBytes(), CompareFilter.CompareOp.EQUAL, comparator2);
testScan(scvf1);
//针对列族名的过滤器,返回结果中只会包含满足条件的列族中的数据
//等于,列族中名称info
FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("info")));
//包含这个base前置的列族的对应列
FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
testScan(ff2);
//针对列名的过滤器,返回结果中只会包含满足条件的列的数据
QualifierFilter qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password")));
QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("us")));
testScan(qf2);
//跟SingleColumnValueFilter结果不同,只返回符合条件的该column
//列名前缀过滤
ColumnPrefixFilter cf = new ColumnPrefixFilter("passw".getBytes());
testScan(cf);
//指定多个列条件,但是这些条件是或的关系
byte[][] prefixes = {Bytes.toBytes("username"), Bytes.toBytes("password")};
MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes);
testScan(mcf);
//多个过滤器
//等于,前置比较器
FamilyFilter ff20 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
//列前缀比较器
ColumnPrefixFilter cf1 = new ColumnPrefixFilter("passw".getBytes());
//多个过滤器都要满足
//如果是只想满足一个条件,FilterList.Operator.MUST_PASS_ONE
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(ff20);
filterList.addFilter(cf1);
testScan(filterList);
}
public void testScan(Filter filter) throws IOException {
Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = t_user_info.getScanner(scan);
//迭代器
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
//获取一行记录
Result result = iter.next();
//获取到每一个cell
CellScanner cellScanner = result.cellScanner();
//遍历cell
while (cellScanner.advance()) {
Cell current = cellScanner.current();
byte[] familyArray = current.getFamilyArray();
byte[] valueArray = current.getValueArray();
byte[] qualifierArray = current.getQualifierArray();
byte[] rowArray = current.getRowArray();
System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength()) + " ");
System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength()));
System.out.println();
}
System.out.println("-----------------------------");
}
}
}
分页过滤器 PageFilter
package com.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Iterator;
public class HbaseDemo {
private Configuration conf = null;
private Connection conn = null;
@Before
public void init() throws IOException {
//构建个配置
conf = HBaseConfiguration.create();
//对于hbase的客户端来说,只需要知道hbase所使用的zookeeper集群就可以了
//因为hbase的客户端找hbase读写数据完全不用经过hmaster
conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
conn = ConnectionFactory.createConnection(conf);
}
//分页查询
@Test
public void pageScan() throws IOException, InterruptedException {
final byte[] POSTFIX = {0x00};
//获取表
Table table = conn.getTable(TableName.valueOf("t_user_info"));
//分页过滤器,每页多少条数据
PageFilter filter = new PageFilter(3);
//起始行号,这边设为空
byte[] lastRow = null;
//总共的记录
int totalRows = 0;
while (true) {
Scan scan = new Scan();
scan.setFilter(filter);
//当上次起始行不为空
if (lastRow != null) {
//设置本次查询的起始行键
//上次起始行加上后置,加后置可以获取上次结束行作为本次的起始行
byte[] startRow = Bytes.add(lastRow, POSTFIX);
//设置为起始行
scan.setStartRow(startRow);
}
//获取整个扫描的结果
ResultScanner scanner = table.getScanner(scan);
//定义本地的行号
int localRows = 0;
//结果
Result result;
//遍历一页的结果
while ((result = scanner.next()) != null) {
//localRows显示本地行号每页中的行号,result会调用toString
System.out.println(++localRows + ":" + result);
//全局行号++
totalRows++;
//上次起始的行号设置为这次结束的行号
lastRow = result.getRow();
}
scanner.close();
if (localRows == 0) {
break;
}
}
//打印本次总行数
System.out.println("total rows:" + totalRows);
}
}