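1. Integration approach
There are three ways to combine HBase with MapReduce: HBase as the data sink of a MapReduce job, HBase as the data source, or HBase as both source and sink. When HBase is the data source (for example, when analysing data already stored in HBase), the custom Mapper must extend TableMapper, which in effect reads its input through TableInputFormat. When HBase is the data sink (for example, when importing data from HDFS into HBase), the custom Reducer must extend TableReducer, which in effect writes its output through TableOutputFormat.
In addition, call the static method TableMapReduceUtil.initTableMapperJob to specify the HBase table that serves as the input source together with the custom Mapper class, and call the static method TableMapReduceUtil.initTableReducerJob to specify the HBase table that serves as the output sink together with the custom Reducer class.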
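2. Example
The following data is stored in the file music1.txt. Each line records one play of a song and holds a row key followed by the song name, singer, singer gender, rhythm and playback terminal. We need to count how many times each song was played.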
1_song1_2016-1-11 song1 singer1 man slow pc
2_song2_2016-1-11 song2 singer2 woman slow ios
3_song3_2016-1-11 song3 singer3 man quick andriod
4_song4_2016-1-11 song4 singer4 woman slow ios
5_song5_2016-1-11 song5 singer5 man quick pc
6_song6_2016-1-11 song6 singer6 woman quick ios
7_song7_2016-1-11 song7 singer7 man quick andriod
8_song8_2016-1-11 song8 singer8 woman slow pc
9_song9_2016-1-11 song9 singer9 woman slow ios
10_song4_2016-1-11 song4 singer4 woman slow ios
11_song6_2016-1-11 song6 singer6 woman quick ios
12_song6_2016-1-11 song6 singer6 woman quick ios
13_song3_2016-1-11 song3 singer3 man quick andriod
14_song2_2016-1-11 song2 singer2 woman slow ios
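As a sanity check on what the job should eventually produce, the counting itself can be sketched in plain Java, without HBase or Hadoop. This is only an illustration under stated assumptions: the class PlayCountSketch and its countPlays helper are hypothetical names, the sample lines are copied from the listing above, and the song name is taken to be the second whitespace-separated field.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch: counts plays per song from the sample records.
public class PlayCountSketch {

    // Sample records copied from music1.txt
    // (row key, name, singer, gender, rhythm, terminal).
    static final List<String> SAMPLE = Arrays.asList(
        "1_song1_2016-1-11 song1 singer1 man slow pc",
        "2_song2_2016-1-11 song2 singer2 woman slow ios",
        "3_song3_2016-1-11 song3 singer3 man quick andriod",
        "4_song4_2016-1-11 song4 singer4 woman slow ios",
        "5_song5_2016-1-11 song5 singer5 man quick pc",
        "6_song6_2016-1-11 song6 singer6 woman quick ios",
        "7_song7_2016-1-11 song7 singer7 man quick andriod",
        "8_song8_2016-1-11 song8 singer8 woman slow pc",
        "9_song9_2016-1-11 song9 singer9 woman slow ios",
        "10_song4_2016-1-11 song4 singer4 woman slow ios",
        "11_song6_2016-1-11 song6 singer6 woman quick ios",
        "12_song6_2016-1-11 song6 singer6 woman quick ios",
        "13_song3_2016-1-11 song3 singer3 man quick andriod",
        "14_song2_2016-1-11 song2 singer2 woman slow ios"
    );

    // Returns song name -> number of plays, sorted by song name.
    static Map<String, Integer> countPlays(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length < 2) {
                continue; // skip blank or malformed lines
            }
            counts.merge(fields[1], 1, Integer::sum); // fields[1] is the song name
        }
        return counts;
    }

    public static void main(String[] args) {
        countPlays(SAMPLE).forEach((song, plays) -> System.out.println(song + "\t" + plays));
    }
}
```

For the 14 sample records this gives 3 plays for song6, 2 plays each for song2, song3 and song4, and 1 play for each of the remaining songs, which is what the namelist table should contain once the MapReduce job has run.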
Here I use the HBase unit-test method built in the previous article to generate this table:
@Test
void createTable() throws IOException {
hBaseService.createTable("music","HBASE_ROW_KEY","info");
hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","name","song1");
hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","singer","singer1");
hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","gender","man");
hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","terminal","pc");
hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","name","song2");
hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","singer","singer2");
hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","name","song3");
hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","singer","singer3");
hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","gender","man");
hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","terminal","andriod");
hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","name","song4");
hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","singer","singer4");
hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","name","song5");
hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","singer","singer5");
hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","gender","man");
hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","terminal","pc");
hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","name","song6");
hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","singer","singer6");
hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","name","song7");
hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","singer","singer7");
hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","gender","man");
hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","terminal","andriod");
hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","name","song8");
hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","singer","singer8");
hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","terminal","pc");
hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","name","song9");
hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","singer","singer9");
hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","name","song4");
hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","singer","singer4");
hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","name","song6");
hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","singer","singer6");
hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","name","song6");
hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","singer","singer6");
hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","terminal","ios");
hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","name","song3");
hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","singer","singer3");
hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","gender","man");
hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","ryghme","quick");
hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","terminal","andriod");
hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","name","song2");
hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","singer","singer2");
hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","gender","woman");
hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","ryghme","slow");
hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","terminal","ios");
}
Open the hbase shell and create the namelist table:
create 'namelist','details'
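Now write the mapper, reducer and runner. It is advisable to put all the jars under /usr/local/hbase/lib on the classpath; in my case, on top of the project from the earlier Hadoop integration, I added the two dependencies below. Note that in HBase 2.x the classes TableMapper, TableReducer and TableMapReduceUtil ship in the hbase-mapreduce artifact, so if your build cannot resolve them you will probably also need org.apache.hbase:hbase-mapreduce in the same version.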
<!--HBase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-zookeeper</artifactId>
<version>2.4.12</version>
</dependency>
public class MusicMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
List<Cell> cells = value.listCells();
for (Cell cell : cells) {
context.write(new Text(Bytes.toString(CellUtil.cloneValue(cell))),new IntWritable(1));
}
}
}
public class MusicReducer extends TableReducer<Text, IntWritable,Text> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int playCount = 0;
for (IntWritable value : values) {
playCount += value.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("details"),Bytes.toBytes("rank"),Bytes.toBytes(String.valueOf(playCount)));
context.write(key,put);
}
}
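The reducer above stores the count with Bytes.toBytes(String.valueOf(playCount)), that is, as text rather than as a binary integer. HBase's Bytes.toBytes(String) encodes the string as UTF-8, while Bytes.toBytes(int) produces a 4-byte big-endian value. The sketch below imitates both encodings using only the JDK, to show why the text form reads directly in the shell; the class CountEncodingSketch and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch comparing two ways a play count could be stored in a cell.
public class CountEncodingSketch {

    // Text encoding, mirroring Bytes.toBytes(String.valueOf(count)) in the reducer.
    static byte[] asText(int count) {
        return String.valueOf(count).getBytes(StandardCharsets.UTF_8);
    }

    // Binary encoding: 4 bytes, big-endian, as Bytes.toBytes(int) would produce.
    static byte[] asBinary(int count) {
        return ByteBuffer.allocate(4).putInt(count).array();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(asText(3)));   // [51], the character '3'
        System.out.println(Arrays.toString(asBinary(3))); // [0, 0, 0, 3]
    }
}
```

With the text form the shell shows a readable digit, at the cost that any reader of the table must parse the string back into a number. The binary form is fixed-width, but the shell displays it as escaped bytes.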
public class MusicRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.rootdir","hdfs://hadoop0:9000/hbase");
configuration.set("hbase.zookeeper.quorum","hadoop0");
Job job = Job.getInstance(configuration,"top-music");
// Basic configuration of the MapReduce job
job.setJarByClass(MusicRunner.class);
job.setNumReduceTasks(1);
// Restrict the scan of the music table so that only the song name column (info:name) is read
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));
// Use the helper class provided by HBase to wire the mapper and reducer into the job
TableMapReduceUtil.initTableMapperJob("music",scan, MusicMapper.class, Text.class, IntWritable.class,job);
TableMapReduceUtil.initTableReducerJob("namelist", MusicReducer.class,job);
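// waitForCompletion returns false when the job fails, so check the result before reporting success
boolean success = job.waitForCompletion(true);
System.out.println(success ? "Job succeeded; the results are stored in the namelist table" : "Job failed");
System.exit(success ? 0 : 1);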
}
}
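MusicMapper emits the value of every cell it receives, so the scan.addColumn restriction to info:name is what makes the job count songs rather than every attribute value. The effect can be simulated in plain Java. This is a sketch under assumptions: the class ScanFilterSketch and its helpers are hypothetical, each row is modelled as a map from qualifier to value, and a null qualifier stands for a Scan with no column restriction.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: what the mapper would count with and without the column restriction.
public class ScanFilterSketch {

    // Builds one row as qualifier -> value, mirroring the info column family
    // (the qualifier "ryghme" is spelled as in the test code above).
    static Map<String, String> row(String name, String singer, String gender,
                                   String rhythm, String terminal) {
        Map<String, String> cells = new LinkedHashMap<>();
        cells.put("name", name);
        cells.put("singer", singer);
        cells.put("gender", gender);
        cells.put("ryghme", rhythm);
        cells.put("terminal", terminal);
        return cells;
    }

    // Three of the sample rows (row keys 1, 2 and 14).
    static final List<Map<String, String>> ROWS = Arrays.asList(
        row("song1", "singer1", "man", "slow", "pc"),
        row("song2", "singer2", "woman", "slow", "ios"),
        row("song2", "singer2", "woman", "slow", "ios")
    );

    // Emits (cell value, 1) for every cell that passes the qualifier filter and sums per key.
    static Map<String, Integer> mapAndCount(List<Map<String, String>> rows, String onlyQualifier) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, String> cells : rows) {
            for (Map.Entry<String, String> cell : cells.entrySet()) {
                if (onlyQualifier == null || onlyQualifier.equals(cell.getKey())) {
                    counts.merge(cell.getValue(), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("name only: " + mapAndCount(ROWS, "name"));
        System.out.println("all cells: " + mapAndCount(ROWS, null));
    }
}
```

Without the restriction, keys such as "slow" or "woman" would end up as rows in namelist alongside the song names, so the Scan and the mapper need to be changed together if the job is adapted to other columns.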
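Run the main method, then check the result in the hbase shell (for example with scan 'namelist').
That completes a simple example of integrating HBase with MapReduce; adapt it to your own business scenario as needed.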