banner.webp

1.整合思路

  1. HBase与MapReduce整合时,有三种情形:HBase作为MapReduce的数据流向(输出目标);HBase作为MapReduce的数据来源(输入);HBase同时作为MapReduce的数据来源与数据流向。当HBase作为数据来源时,如分析HBase里的数据,自定义Mapper需继承TableMapper,实质上是使用TableInputFormat读取数据。当HBase作为数据流向时,如从HDFS向HBase导入数据,自定义Reducer需继承TableReducer,实质上是使用TableOutputFormat进行格式化输出。
  2. 同时,需要调用TableMapReduceUtil类的静态方法initTableMapperJob来指定作为数据输入来源的HBase表名和自定义Mapper类,调用其静态方法initTableReducerJob来指定作为数据输出流向的HBase表名和自定义Reducer类。

2.例子

有如下数据存于music1.txt文件中,每一条代表一首音乐的一次播放记录,我们需要统计每首音乐的播放次数

  1. 1_song1_2016-1-11 song1 singer1 man slow pc
  2. 2_song2_2016-1-11 song2 singer2 woman slow ios
  3. 3_song3_2016-1-11 song3 singer3 man quick andriod
  4. 4_song4_2016-1-11 song4 singer4 woman slow ios
  5. 5_song5_2016-1-11 song5 singer5 man quick pc
  6. 6_song6_2016-1-11 song6 singer6 woman quick ios
  7. 7_song7_2016-1-11 song7 singer7 man quick andriod
  8. 8_song8_2016-1-11 song8 singer8 woman slow pc
  9. 9_song9_2016-1-11 song9 singer9 woman slow ios
  10. 10_song4_2016-1-11 song4 singer4 woman slow ios
  11. 11_song6_2016-1-11 song6 singer6 woman quick ios
  12. 12_song6_2016-1-11 song6 singer6 woman quick ios
  13. 13_song3_2016-1-11 song3 singer3 man quick andriod
  14. 14_song2_2016-1-11 song2 singer2 woman slow ios

这里使用上一篇文章中完成的HBase单元测试方法,创建music表并写入上述数据

  1. @Test
  2. void createTable() throws IOException {
  3. hBaseService.createTable("music","HBASE_ROW_KEY","info");
  4. hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","name","song1");
  5. hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","singer","singer1");
  6. hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","gender","man");
  7. hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","ryghme","slow");
  8. hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","terminal","pc");
  9. hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","name","song2");
  10. hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","singer","singer2");
  11. hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","gender","woman");
  12. hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","ryghme","slow");
  13. hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","terminal","ios");
  14. hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","name","song3");
  15. hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","singer","singer3");
  16. hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","gender","man");
  17. hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","ryghme","quick");
  18. hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","terminal","andriod");
  19. hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","name","song4");
  20. hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","singer","singer4");
  21. hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","gender","woman");
  22. hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","ryghme","slow");
  23. hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","terminal","ios");
  24. hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","name","song5");
  25. hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","singer","singer5");
  26. hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","gender","man");
  27. hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","ryghme","quick");
  28. hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","terminal","pc");
  29. hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","name","song6");
  30. hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","singer","singer6");
  31. hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","gender","woman");
  32. hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","ryghme","quick");
  33. hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","terminal","ios");
  34. hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","name","song7");
  35. hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","singer","singer7");
  36. hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","gender","man");
  37. hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","ryghme","quick");
  38. hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","terminal","andriod");
  39. hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","name","song8");
  40. hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","singer","singer8");
  41. hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","gender","woman");
  42. hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","ryghme","slow");
  43. hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","terminal","pc");
  44. hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","name","song9");
  45. hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","singer","singer9");
  46. hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","gender","woman");
  47. hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","ryghme","slow");
  48. hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","terminal","ios");
  49. hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","name","song4");
  50. hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","singer","singer4");
  51. hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","gender","woman");
  52. hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","ryghme","slow");
  53. hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","terminal","ios");
  54. hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","name","song6");
  55. hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","singer","singer6");
  56. hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","gender","woman");
  57. hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","ryghme","quick");
  58. hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","terminal","ios");
  59. hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","name","song6");
  60. hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","singer","singer6");
  61. hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","gender","woman");
  62. hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","ryghme","quick");
  63. hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","terminal","ios");
  64. hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","name","song3");
  65. hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","singer","singer3");
  66. hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","gender","man");
  67. hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","ryghme","quick");
  68. hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","terminal","andriod");
  69. hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","name","song2");
  70. hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","singer","singer2");
  71. hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","gender","woman");
  72. hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","ryghme","slow");
  73. hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","terminal","ios");
  74. }

进入hbase shell,创建namelist表

  1. create 'namelist','details'

编写mapper、reducer、runner。建议导入/usr/local/hbase/lib中的所有jar包;我这里是在之前整合Hadoop的项目基础上,又新引入了如下两个依赖

  1. <!--HBase-->
  2. <dependency>
  3. <groupId>org.apache.hbase</groupId>
  4. <artifactId>hbase-client</artifactId>
  5. <version>2.4.12</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.hbase</groupId>
  9. <artifactId>hbase-zookeeper</artifactId>
  10. <version>2.4.12</version>
  11. </dependency>
  1. public class MusicMapper extends TableMapper<Text, IntWritable> {
  2. @Override
  3. protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
  4. List<Cell> cells = value.listCells();
  5. for (Cell cell : cells) {
  6. context.write(new Text(Bytes.toString(CellUtil.cloneValue(cell))),new IntWritable(1));
  7. }
  8. }
  9. }
  1. public class MusicReducer extends TableReducer<Text, IntWritable,Text> {
  2. @Override
  3. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  4. int playCount = 0;
  5. for (IntWritable value : values) {
  6. playCount += value.get();
  7. }
  8. Put put = new Put(Bytes.toBytes(key.toString()));
  9. put.addColumn(Bytes.toBytes("details"),Bytes.toBytes("rank"),Bytes.toBytes(String.valueOf(playCount)));
  10. context.write(key,put);
  11. }
  12. }
  1. public class MusicRunner {
  2. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  3. Configuration configuration = HBaseConfiguration.create();
  4. configuration.set("hbase.rootdir","hdfs://hadoop0:9000/hbase");
  5. configuration.set("hbase.zookeeper.quorum","hadoop0");
  6. Job job = Job.getInstance(configuration,"top-music");
  7. //MapReduce程序基本配置
  8. job.setJarByClass(MusicRunner.class);
  9. job.setNumReduceTasks(1);
  10. //为music表设置过滤条件,只保留歌名name
  11. Scan scan = new Scan();
  12. scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));
  13. //使用HBase提供的工具设置job
  14. TableMapReduceUtil.initTableMapperJob("music",scan, MusicMapper.class, Text.class, IntWritable.class,job);
  15. TableMapReduceUtil.initTableReducerJob("namelist", MusicReducer.class,job);
  16. job.waitForCompletion(true);
  17. System.out.println("执行成功,统计结果保存在namelist表中");
  18. }
  19. }

执行main方法后,在hbase shell中查看namelist表中的统计结果
1.png

这样一个简单的HBase整合MapReduce的例子就完成了,根据自己的业务场景进行修改即可~