一、 准备工作
MapReduce依赖HDFS已经能够完成诸多计算任务,但是仅仅依靠HDFS无法满足业务对于数据高速随机读写的需求,
为此我们需要借助HBase来满足这一需求。本节将主要介绍项目依赖等基础工作,在完成这些基础工作后将通过实际代码
来演示如何使用。
1. 项目依赖
为了使用上述框架,我们需要在pom中添加HBase以及MapReduce的相关依赖项,具体依赖如下所示。
<dependencies>
<!-- Hadoop client artifact, version 3.1.1 (default compile scope). -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Hadoop mini-cluster, test scope only: used to simulate the Hadoop environment in unit tests. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>3.1.1</version>
<scope>test</scope>
</dependency>
<!-- HBase MapReduce integration artifact, version 2.1.0. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.0</version>
</dependency>
<!-- HBase client artifact, version 2.1.0; keep in step with the other HBase artifacts. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<!-- HBase server artifact with "provided" scope: available at compile time, not packaged with the job. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<!-- JUnit 4, test scope only. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
从以上依赖中我们可以看到,为了简化开发工作、避免部署过多的服务,这里采用了 hadoop-minicluster 来模拟相关环境,
但是我们依然需要通过Docker部署HBase环境,以便连接到该服务。完成以上依赖配置后,就基本完成了项目的搭建。
二、 开发项目
下面我们将列举两个例子,其中一个例子为从HBase读取数据然后写入HDFS,另一个则为从HDFS读取数据写入到HBase中。
通过这两个例子,读者就可以灵活组合使用了。
1. 从HBase写入HDFS
下面为我们编写的核心服务代码。
/**
 * Map-only MapReduce driver that scans the {@code mycf:hits} column of the HBase table
 * {@code mytable} and writes one {@code rowKey -> value1|value2|...} record per row to the
 * output path given as the first command-line argument.
 */
public class ReadFromHBaseDriver extends Configured implements Tool {

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the output path
     * @throws Exception if the job cannot be configured or run
     */
    public static void main(String[] args) throws Exception {
        // Launch THIS driver; the previous code started WriteToHBaseDriver by mistake.
        int exitCode = ToolRunner.run(new ReadFromHBaseDriver(), args);
        System.exit(exitCode);
    }

    /**
     * Configures and runs the HBase-to-file job.
     *
     * @param args {@code args[0]} is the output path
     * @return 0 on success, 1 if the job failed, -1 if the output path argument is missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 1) {
            System.err.println("Usage: ReadFromHBaseDriver <output path>");
            return -1;
        }

        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        job.setMapperClass(ReadFromHBaseMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Restrict the scan to the single column the mapper needs.
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"));
        TableMapReduceUtil.initTableMapperJob(
                "mytable", scan, ReadFromHBaseMapper.class, Text.class, Text.class, job);

        // Map-only job: mapper output goes straight to the output format.
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Emits, for every scanned row, the row key and the row's cell values joined with {@code |}.
     */
    public static class ReadFromHBaseMapper extends TableMapper<Text, Text> {
        // Reused across map() calls to avoid allocating a Text pair per row.
        private final Text resultKey = new Text();
        private final Text resultVal = new Text();

        /**
         * @param key     row key as provided by the table input format
         * @param value   scan result for the row
         * @param context task context used to emit the output record
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // listCells() returns null for an empty Result; skip such rows instead of failing.
            if (value.isEmpty()) {
                return;
            }

            StringBuilder joined = new StringBuilder();
            for (Cell cell : value.listCells()) {
                // Decode directly from the backing array; no intermediate copy is needed.
                joined.append(Bytes.toString(
                        cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                joined.append('|');
            }
            // At least one cell was appended, so a trailing separator is always present.
            joined.setLength(joined.length() - 1);

            resultKey.set(Bytes.toString(value.getRow()));
            resultVal.set(joined.toString());
            context.write(resultKey, resultVal);
        }
    }
}
单元测试代码
/**
 * Runs {@code ReadFromHBaseDriver} with the local job runner against the HBase instance
 * reachable through ZooKeeper at 127.0.0.1:2181 and expects a zero exit code.
 */
@Test
public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.set("hbase.zookeeper.quorum", "127.0.0.1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    // This is the job's OUTPUT location (the driver passes args[0] to FileOutputFormat).
    Path output = new Path("g://output.txt");
    FileSystem fs = FileSystem.getLocal(conf);
    // Remove output from a previous run; the job fails if the output path already exists.
    fs.delete(output, true);

    ReadFromHBaseDriver driver = new ReadFromHBaseDriver();
    driver.setConf(conf);
    int exitCode = driver.run(new String[] {
        output.toString()
    });
    assertThat(exitCode, is(0));
}
我们通过实际调试就可以观察其具体表现。
2. 从HDFS写入HBase
核心服务代码
/**
 * MapReduce driver that reads text input from the path given as the first command-line
 * argument, counts the input records, and stores the count in the HBase table
 * {@code mytable} under column {@code mycf:count}.
 */
public class WriteToHBaseDriver extends Configured implements Tool {

    /**
     * Command-line entry point.
     *
     * @param args {@code args[0]} is the input path
     * @throws Exception if the job cannot be configured or run
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WriteToHBaseDriver(), args);
        System.exit(exitCode);
    }

    /**
     * Configures and runs the file-to-HBase job.
     *
     * @param args {@code args[0]} is the input path
     * @return 0 on success, 1 if the job failed, -1 if the input path argument is missing
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 1) {
            System.err.println("Usage: WriteToHBaseDriver <input path>");
            return -1;
        }

        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Wires the reducer's Put output to the target HBase table.
        TableMapReduceUtil.initTableReducerJob("mytable", IntSumReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Emits the pair (1, 1) for every input record so the reducer can count records.
     */
    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        // Reused for every record; the content of the input line is not inspected.
        private final LongWritable outKey = new LongWritable(1);
        private final LongWritable outValue = new LongWritable(1);

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   input record (unused)
         * @param context task context used to emit the (1, 1) pair
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The previous version also built a Put here that was never emitted; it was dead code.
            context.write(outKey, outValue);
        }
    }

    /**
     * Sums the values of each key and writes the total as a 4-byte int to {@code mycf:count}
     * in the row named after the key's decimal string.
     */
    static class IntSumReducer extends TableReducer<LongWritable, LongWritable, Text> {
        // Left empty on purpose; NOTE(review): the table output format is expected to use only the Put.
        private final Text resultKey = new Text();

        /**
         * @param key     grouping key; its string form becomes the HBase row key
         * @param values  counts to add up
         * @param context task context used to emit the Put
         * @throws IOException          if the Put cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         * @throws ArithmeticException  if the total does not fit in an int
         */
        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate in long: adding long values into an int narrows silently.
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }

            // Bytes.toBytes(String) always encodes as UTF-8, unlike String.getBytes().
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Keep the stored value a 4-byte int, but fail loudly rather than wrap on overflow.
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("count"),
                    Bytes.toBytes(Math.toIntExact(sum)));
            context.write(resultKey, put);
        }
    }
}
单元测试代码
/**
 * Runs {@code WriteToHBaseDriver} with the local job runner, reading the local input file
 * and writing to the HBase instance reachable through ZooKeeper at 127.0.0.1:2181, and
 * expects a zero exit code.
 */
@Test
public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.set("hbase.zookeeper.quorum", "127.0.0.1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    // Input file for the job; the unused local FileSystem handle was removed.
    Path input = new Path("g://data.txt");

    WriteToHBaseDriver driver = new WriteToHBaseDriver();
    driver.setConf(conf);
    int exitCode = driver.run(new String[] {
        input.toString()
    });
    assertThat(exitCode, is(0));
}