过滤器查询
过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
注意,可以用reset()重置过滤器
hbase过滤器的比较运算符:
LESS(<)、LESS_OR_EQUAL(<=)、EQUAL(=)、NOT_EQUAL(<>)、GREATER_OR_EQUAL(>=)、GREATER(>)、NO_OP(排除所有)
Hbase过滤器的比较器(指定比较机制):
BinaryComparator:按字节字典顺序比较指定的字节数组,内部采用 Bytes.compareTo(byte[]);BinaryPrefixComparator:与 BinaryComparator 相同,只是只比较左端(前缀)的数据是否相同;NullComparator:判断给定的值是否为空;BitComparator:按位比较;RegexStringComparator:提供一个正则的比较器,仅支持 EQUAL 和 NOT_EQUAL;SubstringComparator:判断提供的子串是否出现在 value 中
Hbase的过滤器分类
比较过滤器
行键过滤器RowFilter
筛选出匹配的所有的行
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));scan.setFilter(filter1);
列族过滤器FamilyFilter
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));scan.setFilter(filter1);
列过滤器QualifierFilter
filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));scan.setFilter(filter1);
值过滤器 ValueFilter
// Keep only the cells whose value contains the substring ".4".
Filter filter = new ValueFilter(
        CompareFilter.CompareOp.EQUAL,
        new SubstringComparator(".4"));
scan.setFilter(filter);
专用过滤器
单列值过滤器 SingleColumnValueFilter
——会返回满足条件的整行
// Test the value of colfam1:col-5 and return the whole row when it does NOT contain "val-5".
SingleColumnValueFilter filter = new SingleColumnValueFilter(
        Bytes.toBytes("colfam1"),
        Bytes.toBytes("col-5"),
        CompareFilter.CompareOp.NOT_EQUAL,
        new SubstringComparator("val-5"));
// If this is not set to true, rows that do not contain the tested column are returned as well.
filter.setFilterIfMissing(true);
scan.setFilter(filter);
SingleColumnValueExcludeFilter
与 SingleColumnValueFilter 的判断逻辑相同,区别在于返回的结果中不包含用来做判断的那一列
前缀过滤器 PrefixFilter——针对行键
筛选出具有特定前缀的行键的数据
// Keep only the rows whose row key starts with "row1".
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter);
列前缀过滤器 ColumnPrefixFilter
// Keep only the columns whose qualifier starts with "qual2".
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter);
仅仅是行键过滤器 KeyOnlyFilter
这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用:
Filter kof = new KeyOnlyFilter(); // OK 返回所有的行,但值全是空
随机行过滤器 RandomRowFilter
从名字上就可以看出其大概的用法,本过滤器的作用就是按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个RandomRowFilter会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器:
Filter rrf = new RandomRowFilter((float) 0.8); // OK 随机选出一部分的行
包含终止行的过滤器 InclusiveStopFilter
扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行,如果我们想要同时包含起始行和终止行,那么我们可以使用此过滤器:
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // OK 包含了扫描的上限在结果之内
时间戳过滤器 TimestampsFilter
需要在扫描结果中对版本进行细粒度控制。一个版本是指一个列在一个特定时间的值。
filter = TimestampsFilter (1435747469212, 1435738500459);
// Collect the exact timestamps (versions) to keep; requires java.util.List and java.util.ArrayList.
List<Long> timestamps = new ArrayList<>();
timestamps.add(1479788961691L);
timestamps.add(1479788676517L);
timestamps.add(1479788812565L);
Filter filter = new TimestampsFilter(timestamps);
返回的结果只包含时间戳等于上述列表中某个值的数据
过滤器代码
package com.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;

/**
 * Demonstrates the common HBase scan filters against the {@code t_user_info} table.
 *
 * <p>Each example builds a filter and hands it to {@link #testScan(Filter)}, which runs a
 * full-table scan with that filter and prints every returned cell.
 */
public class HbaseDemo {

    private Configuration conf = null;
    private Connection conn = null;

    /**
     * Builds the client configuration and opens the connection used by every test.
     *
     * @throws IOException if the connection cannot be created
     */
    @Before
    public void init() throws IOException {
        conf = HBaseConfiguration.create();
        // An HBase client only needs to know the ZooKeeper quorum used by the cluster;
        // reads and writes do not go through the HMaster.
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
        conn = ConnectionFactory.createConnection(conf);
    }

    /**
     * Closes the connection opened in {@link #init()} so that it is not leaked between tests.
     *
     * @throws IOException if closing the connection fails
     */
    @After
    public void close() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Runs one scan per filter type and prints the results of each.
     *
     * @throws IOException if any scan fails
     */
    @Test
    public void testFilter() throws IOException {
        // Row-key prefix filter: rows whose key starts with "liu".
        Filter pf = new PrefixFilter(Bytes.toBytes("liu"));
        testScan(pf);

        // Row filters.
        // LESS + BinaryComparator: row keys that sort byte-wise before "user002".
        RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS,
                new BinaryComparator(Bytes.toBytes("user002")));
        // EQUAL + SubstringComparator: row keys that contain "00".
        RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new SubstringComparator("00"));
        testScan(rf1);
        System.out.println("*****************");
        testScan(rf2);

        // Filter on the value of one specific column; matching rows are returned in full.
        // Column family base_info, qualifier password, value 123456.
        // Bytes.toBytes is used instead of String.getBytes() so the encoding is always UTF-8.
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(
                Bytes.toBytes("base_info"),
                Bytes.toBytes("password"),
                CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes("123456"));
        // Also drop rows in which the tested column is missing.
        scvf.setFilterIfMissing(true);
        testScan(scvf);
        System.out.println("************");

        // Filter on the value of one specific column using a comparator.
        // Regex comparator: values that start with "zhang". Not scanned below; pass it
        // instead of comparator2 to try it.
        RegexStringComparator comparator1 = new RegexStringComparator("^zhang");
        // Substring comparator: values that contain "si".
        ByteArrayComparable comparator2 = new SubstringComparator("si");
        SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(
                Bytes.toBytes("base_info"),
                Bytes.toBytes("username"),
                CompareFilter.CompareOp.EQUAL,
                comparator2);
        testScan(scvf1);

        // Column-family filters: the result only contains data from matching families.
        // Family name exactly "info". Not scanned below; kept as an alternative to ff2.
        FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("info")));
        // Family name starting with "base".
        FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryPrefixComparator(Bytes.toBytes("base")));
        testScan(ff2);

        // Qualifier filters: the result only contains data from matching columns.
        // Qualifier exactly "password". Not scanned below; kept as an alternative to qf2.
        QualifierFilter qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryComparator(Bytes.toBytes("password")));
        // Qualifier starting with "us".
        QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryPrefixComparator(Bytes.toBytes("us")));
        testScan(qf2);

        // Column-prefix filter: unlike SingleColumnValueFilter, only the matching column
        // is returned, not the whole row.
        ColumnPrefixFilter cf = new ColumnPrefixFilter(Bytes.toBytes("passw"));
        testScan(cf);

        // Several column prefixes at once; the prefixes are OR-ed together.
        byte[][] prefixes = {Bytes.toBytes("username"), Bytes.toBytes("password")};
        MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes);
        testScan(mcf);

        // Combining filters.
        FamilyFilter ff20 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                new BinaryPrefixComparator(Bytes.toBytes("base")));
        ColumnPrefixFilter cf1 = new ColumnPrefixFilter(Bytes.toBytes("passw"));
        // MUST_PASS_ALL: every filter has to match.
        // Use FilterList.Operator.MUST_PASS_ONE when a single match is enough.
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(ff20);
        filterList.addFilter(cf1);
        testScan(filterList);
    }

    /**
     * Scans the whole {@code t_user_info} table with the given filter and prints every cell as
     * {@code row family:qualifier value}, followed by a separator line after each row.
     *
     * @param filter the filter to apply to the scan
     * @throws IOException if the table cannot be opened or the scan fails
     */
    public void testScan(Filter filter) throws IOException {
        Scan scan = new Scan();
        scan.setFilter(filter);

        // Table and ResultScanner hold client/server resources; close both even when the
        // scan throws.
        try (Table userInfoTable = conn.getTable(TableName.valueOf("t_user_info"));
             ResultScanner scanner = userInfoTable.getScanner(scan)) {
            for (Result row : scanner) {
                CellScanner cells = row.cellScanner();
                while (cells.advance()) {
                    printCell(cells.current());
                }
                System.out.println("-----------------------------");
            }
        }
    }

    /** Prints one cell as {@code row family:qualifier value}, decoding every part as UTF-8. */
    private static void printCell(Cell cell) {
        // The backing arrays are shared buffers, so each part must be read with its own
        // offset and length.
        String row = Bytes.toString(
                cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        String family = Bytes.toString(
                cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
        String qualifier = Bytes.toString(
                cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        String value = Bytes.toString(
                cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

        System.out.print(row + " ");
        System.out.print(family);
        System.out.print(":" + qualifier);
        System.out.print(" " + value);
        System.out.println();
    }
}
分页过滤器 PageFilter
package com.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;

/**
 * Demonstrates paging through the {@code t_user_info} table with a {@link PageFilter}.
 */
public class HbaseDemo {

    /** Number of rows requested per page. */
    private static final long PAGE_SIZE = 3;

    /**
     * A single zero byte. Appending it to a row key gives the smallest key that sorts
     * strictly after that row key.
     */
    private static final byte[] POSTFIX = {0x00};

    private Configuration conf = null;
    private Connection conn = null;

    /**
     * Builds the client configuration and opens the connection used by the test.
     *
     * @throws IOException if the connection cannot be created
     */
    @Before
    public void init() throws IOException {
        conf = HBaseConfiguration.create();
        // An HBase client only needs to know the ZooKeeper quorum used by the cluster;
        // reads and writes do not go through the HMaster.
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
        conn = ConnectionFactory.createConnection(conf);
    }

    /**
     * Closes the connection opened in {@link #init()} so that it is not leaked.
     *
     * @throws IOException if closing the connection fails
     */
    @After
    public void close() throws IOException {
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Scans the table page by page, printing each row with its per-page index, and finally
     * prints the total number of rows seen.
     *
     * <p>NOTE(review): per the HBase documentation, PageFilter is evaluated on each region
     * server separately, so a page may contain more than {@code PAGE_SIZE} rows when the
     * table spans several regions - confirm against the HBase version in use.
     *
     * @throws IOException if the table cannot be opened or a scan fails
     * @throws InterruptedException kept for signature compatibility
     */
    @Test
    public void pageScan() throws IOException, InterruptedException {
        // Close the table even if a scan fails part-way through.
        try (Table table = conn.getTable(TableName.valueOf("t_user_info"))) {
            PageFilter filter = new PageFilter(PAGE_SIZE);
            // Row key of the last row returned so far; null until the first page is read.
            byte[] lastRow = null;
            int totalRows = 0;

            while (true) {
                Scan scan = new Scan();
                scan.setFilter(filter);
                if (lastRow != null) {
                    // The start row is inclusive, so start at lastRow + 0x00 to resume
                    // immediately after the last row already printed.
                    byte[] startRow = Bytes.add(lastRow, POSTFIX);
                    scan.setStartRow(startRow);
                }

                // Rows seen in the current page.
                int localRows = 0;
                try (ResultScanner scanner = table.getScanner(scan)) {
                    Result result;
                    while ((result = scanner.next()) != null) {
                        // Printing the Result relies on its toString().
                        System.out.println(++localRows + ":" + result);
                        totalRows++;
                        lastRow = result.getRow();
                    }
                }

                // An empty page means the previous page was the last one.
                if (localRows == 0) {
                    break;
                }
            }

            System.out.println("total rows:" + totalRows);
        }
    }
}
