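Filter Queries

HBase offers many filter types, but they fall into two broad groups: comparison filters and dedicated filters.
A filter is evaluated on the server side: it checks whether the data meets the condition and returns only the matching data to the client.

Note: a filter's state can be reset with reset().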

HBase filter comparison operators:

  1. LESS <
  2. LESS_OR_EQUAL <=
  3. EQUAL =
  4. NOT_EQUAL !=
  5. GREATER_OR_EQUAL >=
  6. GREATER >
  7. NO_OP excludes everything

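HBase filter comparators (they specify how values are compared):

  1. BinaryComparator compares the given byte array in lexicographic byte order, using Bytes.compareTo(byte[], byte[])
  2. BinaryPrefixComparator works the same way, but compares only the leading bytes, up to the length of the given prefix
  3. NullComparator checks whether the given value is null
  4. BitComparator compares bit by bit
  5. RegexStringComparator matches against a regular expression; it supports only EQUAL and NOT_EQUAL
  6. SubstringComparator checks whether the given substring occurs in the value; it also supports only EQUAL and NOT_EQUAL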
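
To make the comparator semantics above concrete, here is a minimal sketch in plain Java with no HBase dependency. The class ComparatorSketch and its methods are hypothetical names invented for illustration; they approximate the behaviour described in the list and are not HBase's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

// Hypothetical stand-alone model; not part of the HBase API.
class ComparatorSketch {

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Models BinaryComparator: unsigned lexicographic comparison of the whole arrays.
    static int binary(byte[] value, byte[] given) {
        return Arrays.compareUnsigned(value, given);
    }

    // Models BinaryPrefixComparator: only the first given.length bytes of value take part.
    static int binaryPrefix(byte[] value, byte[] given) {
        int len = Math.min(value.length, given.length);
        return Arrays.compareUnsigned(value, 0, len, given, 0, given.length);
    }

    // Models SubstringComparator: case-insensitive "contains" test.
    static boolean substring(String value, String given) {
        return value.toLowerCase(Locale.ROOT).contains(given.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Byte order is not numeric order: "row-100" sorts before "row-22".
        System.out.println(binary(bytes("row-100"), bytes("row-22")) < 0);
        System.out.println(binary(bytes("row-3"), bytes("row-22")) > 0);
        // A prefix comparison reports equality as soon as the prefix matches.
        System.out.println(binaryPrefix(bytes("base_info"), bytes("base")) == 0);
        System.out.println(substring("val-5.4", ".4"));
    }
}
```

The point worth remembering is that row keys are compared as bytes, so numeric suffixes only sort as expected when they are zero-padded to a fixed width.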

HBase filter categories

Comparison filters

Row key filter RowFilter
Selects every row whose row key matches the condition.

  Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
  scan.setFilter(filter1);

Column family filter FamilyFilter

  Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
  scan.setFilter(filter1);

Column qualifier filter QualifierFilter

  Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
  scan.setFilter(filter);

Value filter ValueFilter

  Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4"));
  scan.setFilter(filter);

Dedicated filters

Single column value filter SingleColumnValueFilter

Returns the entire row when the tested column satisfies the condition.

  SingleColumnValueFilter filter = new SingleColumnValueFilter(
      Bytes.toBytes("colfam1"),
      Bytes.toBytes("col-5"),
      CompareFilter.CompareOp.NOT_EQUAL,
      new SubstringComparator("val-5"));
  // Unless this is set to true, rows that do not contain the specified column are returned as well.
  filter.setFilterIfMissing(true);
  scan.setFilter(filter);
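
The effect of setFilterIfMissing is easy to get wrong, so here is a small plain-Java model of it. Everything in it (the class FilterIfMissingSketch, the method passingRows, and the sample rows) is hypothetical and only imitates the rule described above: a NOT_EQUAL substring test on one column, with rows that lack the column kept or dropped depending on the flag. It uses a case-sensitive contains for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the row-level decision; not HBase code.
class FilterIfMissingSketch {

    // Returns the keys of rows whose tested column does NOT contain `substring`.
    // Rows without the column pass only when filterIfMissing is false.
    static List<String> passingRows(Map<String, Map<String, String>> rows,
                                    String column, String substring,
                                    boolean filterIfMissing) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
            String value = row.getValue().get(column);
            if (value == null) {
                if (!filterIfMissing) {
                    kept.add(row.getKey());
                }
            } else if (!value.contains(substring)) {
                kept.add(row.getKey());
            }
        }
        return kept;
    }

    // Made-up sample data: row-9 has no colfam1:col-5 column.
    static Map<String, Map<String, String>> sampleRows() {
        Map<String, Map<String, String>> rows = new LinkedHashMap<>();
        rows.put("row-1", Map.of("colfam1:col-5", "val-1.5"));
        rows.put("row-5", Map.of("colfam1:col-5", "val-5.5"));
        rows.put("row-9", Map.of("colfam1:col-1", "val-9.1"));
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(passingRows(sampleRows(), "colfam1:col-5", "val-5", false));
        System.out.println(passingRows(sampleRows(), "colfam1:col-5", "val-5", true));
    }
}
```

With the sample data, the first call keeps row-1 and row-9, while the second keeps only row-1 because row-9 has no col-5 column.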

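SingleColumnValueExcludeFilter
Works like SingleColumnValueFilter, except that the column being tested is excluded from the returned row.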

Prefix filter PrefixFilter (applies to row keys)
Selects the rows whose row key starts with the given prefix.

  Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
  scan.setFilter(filter);

Column prefix filter ColumnPrefixFilter

  Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
  scan.setFilter(filter);

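Key-only filter KeyOnlyFilter

This filter's only job is to return just the key of each cell (row key, column family, qualifier, timestamp) with the value left empty. It suits applications that care only about row keys: dropping the values reduces the amount of data sent to the client, which gives a modest performance gain:

  Filter kof = new KeyOnlyFilter(); // returns every row, but all values are empty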

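Random row filter RandomRowFilter
As the name suggests, this filter returns a random subset of rows, each row being included with the given probability (a value <= 0 filters out every row, a value >= 1 includes every row). Running the same RandomRowFilter several times over the same data set returns different result sets, so it is useful when an application needs to sample part of the data at random:

  Filter rrf = new RandomRowFilter(0.8f); // randomly selects a portion of the rows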
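
The probability rule above can be sketched in plain Java as follows. This is an illustrative model only: the class RandomRowSketch and its helpers are hypothetical, and the per-row test (a uniform random float compared with the chance value) is an assumption about how such a filter can be implemented, not a copy of HBase's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of per-row random sampling; not HBase code.
class RandomRowSketch {

    // Keeps each row key independently with probability `chance`.
    static List<String> sample(List<String> rowKeys, float chance, Random random) {
        List<String> kept = new ArrayList<>();
        for (String key : rowKeys) {
            // nextFloat() lies in [0, 1), so chance <= 0 never passes and chance >= 1 always passes.
            if (random.nextFloat() < chance) {
                kept.add(key);
            }
        }
        return kept;
    }

    // Generates made-up row keys row-000, row-001, ...
    static List<String> rowKeys(int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            keys.add(String.format("row-%03d", i));
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> keys = rowKeys(100);
        System.out.println(sample(keys, 0.8f, new Random()).size() + " of " + keys.size() + " rows kept");
        System.out.println(sample(keys, 0.8f, new Random()).size() + " of " + keys.size() + " rows kept");
    }
}
```

Because every row is decided independently, the number of rows returned varies from run to run and is only close to 80% on average.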

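Inclusive stop-row filter InclusiveStopFilter
A scan can be given a start row key and a stop row key. By default the range is half-open: the start row is included and the stop row is excluded. To include both the start row and the stop row, use this filter:

  Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1")); // the upper bound of the scan is included in the result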
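
The difference between the default half-open range and an inclusive stop row can be modelled with a sorted set in plain Java. The class StopRowSketch and its sample keys are hypothetical; the sketch relies on the fact that String order equals byte order for ASCII keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model of scan ranges over sorted row keys; not HBase code.
class StopRowSketch {

    // Returns the row keys in [startRow, stopRow) or, when inclusiveStop is true, [startRow, stopRow].
    static List<String> scan(NavigableSet<String> rowKeys, String startRow,
                             String stopRow, boolean inclusiveStop) {
        return new ArrayList<>(rowKeys.subSet(startRow, true, stopRow, inclusiveStop));
    }

    // Made-up sample table with four rows.
    static NavigableSet<String> sampleKeys() {
        return new TreeSet<>(List.of("row1", "row2", "row3", "row4"));
    }

    public static void main(String[] args) {
        System.out.println(scan(sampleKeys(), "row1", "row3", false)); // default scan behaviour
        System.out.println(scan(sampleKeys(), "row1", "row3", true));  // what an inclusive stop row adds
    }
}
```

With the sample keys, the first call yields row1 and row2, and the second also includes row3.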

Timestamps filter TimestampsFilter

Use it when the scan result needs fine-grained control over versions. A version is the value of a column at a particular point in time.

  List<Long> timestamps = new ArrayList<>();
  timestamps.add(1479788961691L);
  timestamps.add(1479788676517L);
  timestamps.add(1479788812565L);
  Filter filter = new TimestampsFilter(timestamps);

The result contains only the cell versions whose timestamps appear in the list.
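
As a rough illustration of selecting versions by exact timestamp, the following plain-Java sketch keeps only the versions of one column whose timestamps are in a wanted list. The class TimestampsSketch, the version values, and the extra timestamp 1479788000000 are made up for the example; only the three timestamps from the snippet above come from the text.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of version selection by timestamp; not HBase code.
class TimestampsSketch {

    // Keeps only the versions whose timestamp is contained in `wanted`, newest first.
    static NavigableMap<Long, String> select(NavigableMap<Long, String> versions,
                                             Collection<Long> wanted) {
        NavigableMap<Long, String> kept = new TreeMap<>(Comparator.reverseOrder());
        for (var version : versions.entrySet()) {
            if (wanted.contains(version.getKey())) {
                kept.put(version.getKey(), version.getValue());
            }
        }
        return kept;
    }

    // Four made-up versions of a single column, keyed by timestamp.
    static NavigableMap<Long, String> sampleVersions() {
        NavigableMap<Long, String> versions = new TreeMap<>(Comparator.reverseOrder());
        versions.put(1479788961691L, "v4");
        versions.put(1479788812565L, "v3");
        versions.put(1479788676517L, "v2");
        versions.put(1479788000000L, "v1");
        return versions;
    }

    public static void main(String[] args) {
        System.out.println(select(sampleVersions(),
                java.util.List.of(1479788961691L, 1479788676517L)));
    }
}
```

Timestamps that match no stored version simply contribute nothing to the result.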

Filter code

  package com.hbase;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.CellScanner;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.filter.*;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.junit.Before;
  import org.junit.Test;

  import java.io.IOException;
  import java.util.Iterator;

  public class HbaseDemo {
      private Configuration conf = null;
      private Connection conn = null;

      @Before
      public void init() throws IOException {
          // Build the configuration.
          conf = HBaseConfiguration.create();
          // An HBase client only needs to know the ZooKeeper quorum that HBase uses,
          // because client reads and writes do not go through the HMaster.
          conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
          conn = ConnectionFactory.createConnection(conf);
      }

      @Test
      public void testFilter() throws IOException {
          // Prefix filter on the row key.
          Filter pf = new PrefixFilter(Bytes.toBytes("liu"));
          testScan(pf);

          // Row filters with a comparison operator.
          // LESS with BinaryComparator compares bytes lexicographically:
          // every row key that sorts before "user002" is returned.
          RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("user002")));
          // Matches any row key that contains "00".
          RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("00"));
          testScan(rf1);
          System.out.println("*****************");
          testScan(rf2);

          // Filter on the value of one specific column; the whole row is returned.
          // Column family base_info, qualifier password, value 123456.
          // Pay attention to the operator chosen here.
          SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("base_info"), Bytes.toBytes("password"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("123456"));
          // Also filter out rows in which the specified column is missing.
          scvf.setFilterIfMissing(true);
          testScan(scvf);
          System.out.println("************");

          // Filter on the value of one specific column, using a comparator.
          // Regex comparator: matches values that start with the string "zhang".
          RegexStringComparator comparator1 = new RegexStringComparator("^zhang");
          // Substring comparator: matches values that contain "si".
          ByteArrayComparable comparator2 = new SubstringComparator("si");
          // The comparator argument can be swapped (comparator1 or comparator2).
          SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(Bytes.toBytes("base_info"), Bytes.toBytes("username"), CompareFilter.CompareOp.EQUAL, comparator2);
          testScan(scvf1);

          // Filter on the column family name; the result contains only data from matching families.
          // Family name equal to "info".
          FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("info")));
          // Columns of every family whose name starts with "base".
          FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
          testScan(ff2);

          // Filter on the column qualifier; the result contains only data from matching columns.
          QualifierFilter qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password")));
          QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("us")));
          testScan(qf2);

          // Column prefix filter. Unlike SingleColumnValueFilter, only the matching columns are returned.
          ColumnPrefixFilter cf = new ColumnPrefixFilter(Bytes.toBytes("passw"));
          testScan(cf);

          // Several column prefixes at once; the conditions are combined with OR.
          byte[][] prefixes = {Bytes.toBytes("username"), Bytes.toBytes("password")};
          MultipleColumnPrefixFilter mcf = new MultipleColumnPrefixFilter(prefixes);
          testScan(mcf);

          // Combining several filters.
          // Family name compared with EQUAL and a prefix comparator.
          FamilyFilter ff20 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
          // Column prefix filter.
          ColumnPrefixFilter cf1 = new ColumnPrefixFilter(Bytes.toBytes("passw"));
          // MUST_PASS_ALL: every filter has to pass.
          // To require only one of them, use FilterList.Operator.MUST_PASS_ONE.
          FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
          filterList.addFilter(ff20);
          filterList.addFilter(cf1);
          testScan(filterList);
      }

      public void testScan(Filter filter) throws IOException {
          Scan scan = new Scan();
          scan.setFilter(filter);
          // Close the table and the scanner when the scan is finished.
          try (Table t_user_info = conn.getTable(TableName.valueOf("t_user_info"));
               ResultScanner scanner = t_user_info.getScanner(scan)) {
              Iterator<Result> iter = scanner.iterator();
              while (iter.hasNext()) {
                  // One row of the result.
                  Result result = iter.next();
                  // Iterate over every cell in the row.
                  CellScanner cellScanner = result.cellScanner();
                  while (cellScanner.advance()) {
                      Cell current = cellScanner.current();
                      byte[] familyArray = current.getFamilyArray();
                      byte[] valueArray = current.getValueArray();
                      byte[] qualifierArray = current.getQualifierArray();
                      byte[] rowArray = current.getRowArray();
                      System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength()) + " ");
                      System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength()));
                      System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength()));
                      System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength()));
                      System.out.println();
                  }
                  System.out.println("-----------------------------");
              }
          }
      }
  }
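
The listing above combines two filters with FilterList.Operator.MUST_PASS_ALL and mentions MUST_PASS_ONE as the alternative. The AND/OR semantics can be sketched with plain Java predicates. The class FilterListSketch, its local Operator enum, and the "family:qualifier" strings are hypothetical stand-ins, not the HBase types.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of combining filters; not HBase code.
class FilterListSketch {

    enum Operator { MUST_PASS_ALL, MUST_PASS_ONE }

    // MUST_PASS_ALL behaves like a logical AND, MUST_PASS_ONE like a logical OR.
    static <T> Predicate<T> combine(Operator op, List<Predicate<T>> filters) {
        return item -> op == Operator.MUST_PASS_ALL
                ? filters.stream().allMatch(f -> f.test(item))
                : filters.stream().anyMatch(f -> f.test(item));
    }

    // Two sample conditions on a "family:qualifier" string:
    // the family starts with "base", and the qualifier starts with "passw".
    static List<Predicate<String>> sampleFilters() {
        Predicate<String> familyPrefix = column -> column.startsWith("base");
        Predicate<String> qualifierPrefix =
                column -> column.substring(column.indexOf(':') + 1).startsWith("passw");
        return List.of(familyPrefix, qualifierPrefix);
    }

    public static void main(String[] args) {
        Predicate<String> all = combine(Operator.MUST_PASS_ALL, sampleFilters());
        Predicate<String> one = combine(Operator.MUST_PASS_ONE, sampleFilters());
        for (String column : List.of("base_info:password", "extra_info:password_hint", "extra_info:age")) {
            System.out.println(column + " all=" + all.test(column) + " one=" + one.test(column));
        }
    }
}
```

Here only base_info:password satisfies both conditions, extra_info:password_hint satisfies one of them, and extra_info:age satisfies neither.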

Paging filter PageFilter

  package com.hbase;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.filter.PageFilter;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.junit.Before;
  import org.junit.Test;

  import java.io.IOException;

  public class HbaseDemo {
      private Configuration conf = null;
      private Connection conn = null;

      @Before
      public void init() throws IOException {
          // Build the configuration.
          conf = HBaseConfiguration.create();
          // An HBase client only needs to know the ZooKeeper quorum that HBase uses,
          // because client reads and writes do not go through the HMaster.
          conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");
          conn = ConnectionFactory.createConnection(conf);
      }

      // Paged query.
      @Test
      public void pageScan() throws IOException {
          final byte[] POSTFIX = {0x00};
          // Get the table and close it when the test is done.
          try (Table table = conn.getTable(TableName.valueOf("t_user_info"))) {
              // Paging filter: the number of rows per page.
              PageFilter filter = new PageFilter(3);
              // Last row key of the previous page; null before the first page.
              byte[] lastRow = null;
              // Total number of rows read.
              int totalRows = 0;
              while (true) {
                  Scan scan = new Scan();
                  scan.setFilter(filter);
                  // If a previous page has been read, continue right after it.
                  if (lastRow != null) {
                      // Appending a zero byte to the last row key of the previous page
                      // gives the smallest key that sorts after it, so that row is not returned again.
                      byte[] startRow = Bytes.add(lastRow, POSTFIX);
                      // Use it as the start row of this scan.
                      scan.setStartRow(startRow);
                  }
                  // Run the scan for this page.
                  ResultScanner scanner = table.getScanner(scan);
                  // Row counter within the current page.
                  int localRows = 0;
                  Result result;
                  // Iterate over the rows of one page.
                  while ((result = scanner.next()) != null) {
                      // Print the row number within the page; result is printed via toString().
                      System.out.println(++localRows + ":" + result);
                      // Increase the global row count.
                      totalRows++;
                      // Remember the last row key seen; the next page starts after it.
                      lastRow = result.getRow();
                  }
                  scanner.close();
                  // An empty page means there is no more data.
                  if (localRows == 0) {
                      break;
                  }
              }
              // Print the total number of rows.
              System.out.println("total rows:" + totalRows);
          }
      }
  }
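
The paging loop above depends on one detail: appending a zero byte to the last row key gives the smallest key that sorts strictly after it, so the next page starts with the following row and repeats nothing. The plain-Java sketch below models that loop over a sorted set of string keys, with "\0" standing in for the POSTFIX byte. The class PagingSketch and the sample keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model of the paging loop; not HBase code.
class PagingSketch {

    // Splits the sorted row keys into pages of at most pageSize keys each.
    static List<List<String>> pages(NavigableSet<String> rowKeys, int pageSize) {
        List<List<String>> pages = new ArrayList<>();
        String lastRow = null;
        while (true) {
            // lastRow + "\0" is the smallest key strictly greater than lastRow,
            // so an inclusive start at that key skips the row already returned.
            NavigableSet<String> remaining =
                    (lastRow == null) ? rowKeys : rowKeys.tailSet(lastRow + "\0", true);
            List<String> page = new ArrayList<>();
            for (String key : remaining) {
                if (page.size() == pageSize) {
                    break;
                }
                page.add(key);
                lastRow = key;
            }
            if (page.isEmpty()) {
                break; // no more rows
            }
            pages.add(page);
        }
        return pages;
    }

    // Seven made-up row keys: user001 .. user007.
    static NavigableSet<String> sampleKeys() {
        NavigableSet<String> keys = new TreeSet<>();
        for (int i = 1; i <= 7; i++) {
            keys.add(String.format("user%03d", i));
        }
        return keys;
    }

    public static void main(String[] args) {
        for (List<String> page : pages(sampleKeys(), 3)) {
            System.out.println(page);
        }
    }
}
```

With seven keys and a page size of 3, the sketch produces three pages of 3, 3, and 1 keys, and no key appears twice.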