I. Preparation

MapReduce on top of HDFS can already handle a wide range of computation tasks, but HDFS alone cannot give the business fast random reads and writes of its data; for that we rely on HBase. This section covers the groundwork the project depends on; once it is in place, we will demonstrate the usage through actual code.

1. Project Dependencies

To use the frameworks the project relies on, we need to declare the HBase and MapReduce related dependencies in the pom, as shown below.

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-minicluster</artifactId>
        <version>3.1.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.1.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>

As the dependencies above show, we use hadoop-minicluster to simulate the Hadoop environment, which simplifies development and spares us from deploying too many services. We still need an HBase instance, deployed via Docker, that the code can connect to. With these dependencies in place, the project setup is essentially complete.
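
Both examples that follow work against an HBase table named mytable with a column family mycf, so that table has to exist before anything can run. The sketch below shows one way to create it with the HBase 2.x client API; it is not part of the project code, and the class name CreateTestTable as well as the ZooKeeper address are assumptions that simply mirror the values used in the unit tests later on.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateTestTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed connection settings for the Docker-hosted HBase instance
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName name = TableName.valueOf("mytable");
            if (!admin.tableExists(name)) {
                // Create "mytable" with a single column family "mycf",
                // matching the table used by the MapReduce examples below
                admin.createTable(TableDescriptorBuilder.newBuilder(name)
                        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("mycf"))
                        .build());
            }
        }
    }
}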

II. Developing the Project

Below we walk through two examples: one reads data from HBase and writes it to HDFS, the other reads data from HDFS and writes it into HBase. With these two examples in hand, readers can combine the patterns flexibly for their own use.

1. From HBase to HDFS

The core driver code we wrote is shown below.

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadFromHBaseDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ReadFromHBaseDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Read from HBase");
        job.setJarByClass(getClass());
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        job.setMapperClass(ReadFromHBaseMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"));
        TableMapReduceUtil.initTableMapperJob("mytable", scan, ReadFromHBaseMapper.class, Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class ReadFromHBaseMapper extends TableMapper<Text, Text> {
        private Text resultKey = new Text();
        private Text resultVal = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            // Read every cell of the row and join the column values with "|"
            for (Cell cell : value.listCells()) {
                String val = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                        cell.getValueLength());
                sb.append(val).append("|");
            }
            // Emit the row key and the joined values; with zero reducers the pair goes straight to the HDFS output
            resultKey.set(Bytes.toString(value.getRow()));
            resultVal.set(sb.deleteCharAt(sb.length()-1).toString());
            context.write(resultKey, resultVal);
        }
    }
}

Unit test code:

@Test
public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.set("hbase.zookeeper.quorum", "127.0.0.1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    // args[0] is the job's output path (see FileOutputFormat.setOutputPath in the driver)
    Path output = new Path("g:/output");

    FileSystem fs = FileSystem.getLocal(conf);
    // Clean up output from a previous run, otherwise the job fails on an existing path
    fs.delete(output, true);

    ReadFromHBaseDriver driver = new ReadFromHBaseDriver();
    driver.setConf(conf);

    int exitCode = driver.run(new String[] { output.toString() });

    assertThat(exitCode, is(0));
}

By running and debugging the test we can observe exactly how the job behaves.
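
Note that the job only produces output when mytable actually contains rows with the mycf:hits column selected by the Scan. If the table is still empty, a few sample rows can be loaded first. The following is a minimal sketch, not part of the project code; the class name LoadSampleData and the row keys and hits values are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadSampleData {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {
            for (int i = 1; i <= 3; i++) {
                // Made-up sample rows row1..row3, each with a mycf:hits value
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("hits"),
                        Bytes.toBytes(String.valueOf(i * 10)));
                table.put(put);
            }
        }
    }
}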

2. From HDFS to HBase

Core driver code:

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WriteToHBaseDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WriteToHBaseDriver(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Write to HBase");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        TableMapReduceUtil.initTableReducerJob("mytable", IntSumReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit a constant key with a count of 1 for every input line;
            // the reducer sums these counts and writes the total to HBase.
            context.write(new LongWritable(1), new LongWritable(1));
        }
    }

    static class IntSumReducer extends TableReducer<LongWritable, LongWritable, Text> {
        private Text resultKey = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }

            // Use the reduce key (the mapper's constant key) as the HBase row key
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            // TableOutputFormat ignores the output key; the Put itself determines the target row
            context.write(resultKey, put);
        }
    }
}
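
The unit test that follows reads a plain text file from the local file system. Because the mapper above ignores the line contents and only counts lines, any small text file will do. Here is a minimal sketch for generating one; the helper class CreateSampleInput is hypothetical, and the path matches the g:/data.txt location assumed in the test, so adjust it to your environment.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;

public class CreateSampleInput {

    public static void main(String[] args) throws Exception {
        // Three throwaway lines; the mapper only counts lines, so the content is irrelevant
        Files.write(Paths.get("g:/data.txt"), Arrays.asList("line1", "line2", "line3"));
    }
}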

Unit test code:

@Test
public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.set("hbase.zookeeper.quorum", "127.0.0.1");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    // args[0] is the job's input path; the file must exist before the test runs
    Path input = new Path("g:/data.txt");

    FileSystem fs = FileSystem.getLocal(conf);
    // Make sure the input file is actually there before launching the job
    assertThat(fs.exists(input), is(true));

    WriteToHBaseDriver driver = new WriteToHBaseDriver();
    driver.setConf(conf);

    int exitCode = driver.run(new String[] { input.toString() });

    assertThat(exitCode, is(0));
}
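
Once the job has finished, the result written by the reducer can be checked directly with the HBase client instead of inspecting the MapReduce output. The sketch below is again not part of the project code and the class name CheckWrittenCount is made up; it reads back the mycf:count column, assuming the job has run so that the row exists. The row key is "1", the string form of the constant key emitted by the mapper.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckWrittenCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("mytable"))) {
            // The reducer used the mapper's constant key ("1") as the row key
            Result result = table.get(new Get(Bytes.toBytes("1")));
            byte[] count = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("count"));
            // The reducer stored the sum as a 4-byte int via Bytes.toBytes(int)
            System.out.println("count = " + Bytes.toInt(count));
        }
    }
}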