04 HBase与MapReduce整合

HBase与MapReduce整合时,有三种情形:

  • HBase作为MapReduce的数据流向;
  • HBase作为MapReduce的数据来源;
  • HBase同时作为MapReduce的数据来源和数据流向。

步骤

1. 准备

查看进程

  1. 6417 Jps
  2. 3780 ResourceManager
  3. 4086 NodeManager
  4. 3272 DataNode
  5. 6264 HRegionServer
  6. 3481 SecondaryNameNode
  7. 6044 HQuorumPeer
  8. 3133 NameNode
  9. 6111 HMaster

2. 导入数据

从HDFS导入数据到HBase时会将数据先暂存于hdfs://hadoop0:9000/user/root/tmp。如果此文件夹存在,则先删除它。

  1. # 如果此文件夹存在,则先删除它。
  2. hadoop fs -rm -r hdfs://hadoop0:9000/user/root/tmp
  3. # 在HDFS上创建/input2/music2用于存放上传的音乐播放数据文件music1.txt
  4. hadoop fs -mkdir hdfs://hadoop0:9000/input2/music2
  5. # 将音乐播放数据music1.txt复制到linux的/root目录下,再上传到HDFS上的/input2/music2目录
  6. hadoop fs -put /root/music11.txt hdfs://hadoop0:9000/input2/music2
  7. # 调用HBase提供的importtsv工具在HBase上创建表music,并指定列族和列(music是表名)
  8. hadoop jar /usr/local/hbase/lib/hbase-server-1.4.10.jar importtsv -Dimporttsv.bulk.output=tmp -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:singer,info:gender,info:ryghme,info:terminal music /input2/music2
  9. # 此时进入hbase shell可以看到music表,但是并没有任何内容
  10. # 调用HBase提供的completebulkload工具从暂存文件夹hdfs://hadoop0:9000/user/root/tmp加载数据到music表中
  11. hadoop jar /usr/local/hbase/lib/hbase-server-1.4.10.jar completebulkload tmp music
  12. # 此时进入hbase shell可以scan 'music',此时music表中已经有数据了
  13. # 进入HBase Shell,创建namelist表,拥有一个列族details,用于存放统计结果
  14. create 'namelist','details'
  15. scan 'music'
  16. # 准备就绪,可以调用MapReduce进行统计分析了

3. MapReduce处理

创建MyMapper类,继承自TableMapper,取出播放记录中的歌名,记为1次播放。

取出每行中的所有单元,实际上只扫描了一列(info:name,即音乐名称),因为在驱动中使用Scan来设置了过滤条件。

同时,将音乐名称作为key,播放次数(每次为1)作为value,传给Reducer来进一步统计分析。

  1. package com.hbaseapi;
  2. import java.io.IOException;
  3. import java.util.List;
  4. import org.apache.hadoop.hbase.Cell;
  5. import org.apache.hadoop.hbase.CellUtil;
  6. import org.apache.hadoop.hbase.client.Result;
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  8. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  9. import org.apache.hadoop.hbase.util.Bytes;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. /**
  14. * 取出播放记录中的歌名,记为一次播放
  15. * @author root
  16. *
  17. */
  18. public class MyMapper extends TableMapper<Text,IntWritable>{
  19. @Override
  20. protected void map(ImmutableBytesWritable key, Result value,
  21. Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
  22. throws IOException, InterruptedException {
  23. //取出每行中所有单元,实际上只扫描了一列(info:name),因为在驱动中使用Scan设置了过滤条件
  24. List<Cell> cells = value.listCells();
  25. //将音乐名称作为key,播放次数(每次为1)作为value
  26. for (Cell cell : cells) {
  27. Text text = new Text(Bytes.toString(CellUtil.cloneValue(cell)));
  28. IntWritable iw = new IntWritable(1);
  29. context.write(text, iw);
  30. }
  31. }
  32. }

创建MyReducer,继承自TableReducer,将相同歌名的播放次数求和,与前面学习过的MapReduce入门示例WordCount的处理类似。统计完成后将结果另外输出到一个HBase表namelist中。

package com.hbaseapi;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
/**
 *  将相同歌名的播放次数求和,统计完成后,将结果另外输出到一个HBase表的namelist中
 * @author root
 *
 */
public class MyReducer extends TableReducer<Text,IntWritable,Text>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, Mutation>.Context context) throws IOException, InterruptedException {
            int playCount = 0;
            for (IntWritable intWritable : values) {
                playCount += intWritable.get();
            } 
            //为put操作指定行键
            Put put = new Put(Bytes.toBytes(key.toString()));
            //为put操作指定列和值
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(playCount));
            context.write(key, put);
        }
}

编写驱动程序,为music表设置过滤条件,只保留了歌名name一个字段,同时用TableMapReduceUtil工具设置music表为MapReduce的输入表,设置namelist表为MapReduce的输出表。

package com.hbaseapi;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;


/**
 * 编写驱动程序,为music表设置过滤条件,只保留歌名name一个子段
 * 同时用TableMapReduceUtil工具
 * 设置music表为MapReduce的输入表
 * 设置namelist表为MapReduce的输出表
 * @author root
 *
 */
public class Music {

    /**
     *  取出播放记录中的歌名,记为一次播放
     * @author root
     *
     */
    static class MyMapper extends TableMapper<Text,IntWritable>{

        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            //取出每行中所有单元,实际上只扫描了一列(info:name),因为在驱动中使用Scan设置了过滤条件
            List<Cell> cells = value.listCells();
            //将音乐名称作为key,播放次数(每次为1)作为value
            for (Cell cell : cells) {
                Text text = new Text(Bytes.toString(CellUtil.cloneValue(cell)));
                IntWritable iw = new IntWritable(1);
                context.write(text, iw);
            }
        }
    }

    /**
     *  将相同歌名的播放次数求和,统计完成后,将结果另外输出到一个HBase表的namelist中
     * @author root
     *
     */
    static class MyReducer extends TableReducer<Text,IntWritable,Text>{
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, Mutation>.Context context) throws IOException, InterruptedException {
                int playCount = 0;
                for (IntWritable intWritable : values) {
                    playCount += intWritable.get();
                } 
                //为put操作指定行键
                Put put = new Put(Bytes.toBytes(key.toString()));
                //为put操作指定列和值
                put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("rank"), Bytes.toBytes(playCount));
                context.write(key, put);
            }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop0:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop0");
        Job job = Job.getInstance(conf,"top-music");
        //MapReduce程序作业基本配置
        job.setJarByClass(Music.class);
        job.setNumReduceTasks(1);
        //为music表设置过滤条件,只保留歌名name
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        //使用HBase提供的工具类来设置job,这里导包注意import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
        TableMapReduceUtil.initTableMapperJob("music", scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("namelist", MyReducer.class, job);
        job.waitForCompletion(true);
        System.out.println("执行成功,统计结果存于namelist中");
    }
}

4. 运行

导入/usr/local/hbase/lib目录下的全部jar包

04 HBase与MapReduce整合 - 图1