Step-by-step breakdown

1. Add the required jar dependencies

  <!-- hbase-mapreduce already depends on hadoop-common internally -->
  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.2.0</version>
  </dependency>
  <!-- pulls in the hadoop-mapreduce-client-core jar -->
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.2.0</version>
  </dependency>
  <!-- this jar may not be strictly necessary, but I needed it for local debugging -->
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>2.2.0</version>
  </dependency>
  <!-- required for reading from and writing to HBase -->
  <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.2.0</version>
  </dependency>
  <!-- without this you get a ClassNotFound error; try commenting it out to see whether the job still runs -->
  <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
  </dependency>

2. Configure the HBase connection, mainly the ZooKeeper quorum

//1. Configure the HBase connection
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
config.set("hbase.zookeeper.property.clientPort", "2181");

3. Configure the query, mainly by building a Scan object

//2. Configure the Scan object for the read
Scan scan = new Scan();
scan.setCaching(200);
//disable block caching for this scan
scan.setCacheBlocks(false);
//set startRow and stopRow; I am doing a full table scan here, so they are commented out
//        scan.withStartRow(Bytes.toBytes("00_13980337439_2017-03-30 23:04:15_15897468949_1_1669"));
//        scan.withStopRow(Bytes.toBytes("00_15422018558_2017-08-11 17:48:12_19902496992_1_0553"));

//this attribute probably does not have to be set here, because TableMapReduceUtil.initTableMapperJob below sets it as well
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("bigdata:twx_telephone"));
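
Since the Mapper below only reads the f1:call1 column, you can optionally narrow the scan so HBase only ships that column to the mappers. A small sketch (my own addition, not in the original code):

// Optional (not in the original code): only fetch the column the mapper actually uses,
// which reduces the amount of data transferred per row.
scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("call1"));
// or fetch the whole column family:
// scan.addFamily(Bytes.toBytes("f1"));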

4. Build the MapReduce Job and configure the Reducer

Only the Reducer is configured here; the Mapper is set up in the next step:

Job job = Job.getInstance(config,"ExampleReadWrite");
job.setJarByClass(HBaseMapReduce.class);


job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);

//setOutputFormatClass(NullOutputFormat.class) produces no output; it is only useful for testing the Mapper input
//if you enable NullOutputFormat.class, comment out the Reducer-related code above
//        job.setOutputFormatClass(NullOutputFormat.class);

//set the output path for the results
FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));
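
One thing to watch out for: FileOutputFormat fails the job if the output directory already exists. An optional guard (my own addition, not in the original code) is to delete any leftover output before submitting, replacing the single setOutputPath call above; it needs an import for org.apache.hadoop.fs.FileSystem:

// Optional guard (not in the original code): remove output left over from a previous run,
// otherwise the job fails with FileAlreadyExistsException.
Path output = new Path("hdfs://hd03:8020/tangwx/mapred-example/output");
FileSystem fs = output.getFileSystem(config);
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, output);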

5. Set up the Mapper

TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone",scan,
                                      MyMapper.class,  //mapper class
                                      Text.class, //mapper output key class
                                      LongWritable.class,//mapper output value class
                                      job);
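
The job above writes its counts to HDFS. If you wanted to write them back into an HBase table instead, a rough sketch would replace the Reducer/FileOutputFormat setup with a TableReducer plus TableMapReduceUtil.initTableReducerJob. The output table bigdata:twx_counts and the f1:count column below are made-up names, not part of the original project, and the sketch needs imports for Put and TableReducer:

// Hypothetical variant (not the original code): write the per-caller counts back to HBase.
static class MyTableReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        // row key = caller number, column f1:count = total (table/column names are made up)
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        context.write(null, put);
    }
}

// in main(), instead of setReducerClass/setOutputKeyClass/setOutputValueClass/FileOutputFormat:
// TableMapReduceUtil.initTableReducerJob("bigdata:twx_counts", MyTableReducer.class, job);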

Complete code

MapReduce code

package com.soyuan.mapredexample.tabelmapper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

/**
 * @author tangwx@soyuan.com.cn
 * @date 2019-08-19 11:36
 */
public class HBaseMapReduce {


    static class MyMapper extends TableMapper<Text, LongWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//            get the value of the f1:call1 column (null if the row has no such cell)
            byte[] caller = value.getValue(Bytes.toBytes("f1"), Bytes.toBytes("call1"));
//            System.out.println("caller: "+Bytes.toString(caller));
            context.write(new Text(caller),new LongWritable(1));
        }
    }

    static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum=0;
            Iterator<LongWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                LongWritable next = iterator.next();
                sum += next.get();
            }
            context.write(key,new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1. Configure the HBase connection
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
        config.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(config,"ExampleReadWrite");
        job.setJarByClass(HBaseMapReduce.class);


        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);

        //        job.setOutputFormatClass(NullOutputFormat.class);
        FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));

//        job.addFileToClassPath(new Path("hdfs://hd03:8020/wangxin/jar/*"));

        //2. Configure the Scan object for the read
        Scan scan = new Scan();
        scan.setCaching(200);
        scan.setCacheBlocks(false);
        scan.setLimit(10);
//        scan.withStartRow(Bytes.toBytes("00_13980337439_2017-03-30 23:04:15_15897468949_1_1669"));
//        scan.withStopRow(Bytes.toBytes("00_15422018558_2017-08-11 17:48:12_19902496992_1_0553"));
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("bigdata:twx_telephone"));

        TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone",scan,
                MyMapper.class,
                Text.class,
                LongWritable.class,
                job);

        System.exit(job.waitForCompletion(true) ? 0 : -1);

    }
}

If you want to debug locally, change the path in FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output")); to a local path (it currently points to HDFS), for example FileOutputFormat.setOutputPath(job,new Path("mapred-example/output"));
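
To avoid editing the source for every environment, one possible tweak (my own addition, not in the original code) is to take the output path from the command line and fall back to a local path for debugging:

// Possible tweak (not in the original code): read the output path from args[0],
// falling back to a local relative path for local debugging.
String outputDir = args.length > 0 ? args[0] : "mapred-example/output";
FileOutputFormat.setOutputPath(job, new Path(outputDir));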

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>telecom-service</artifactId>
        <groupId>com.soyuan</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>mapred-example</artifactId>

    <dependencies>
        <!-- hbase-mapreduce already depends on hadoop-common internally -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- pulls in the hadoop-mapreduce-client-core jar -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
            <version>3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- package all dependencies into a single jar -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Deployment / Running

Build with Maven: mvn clean package -DskipTests

Upload the jar: scp target/mapred-example-1.0-SNAPSHOT.jar admin@hd06:~

Run: yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce

This fails with a ClassNotFound error for org.apache.hadoop.hbase.HBaseConfiguration. The reason is that mapred-example-1.0-SNAPSHOT.jar contains only the minimal dependencies, and the HBase libraries are not on the system classpath.

So how do we add the HBase libraries to HADOOP_CLASSPATH?

Add the HBase jars to HADOOP_CLASSPATH:

HADOOP_CLASSPATH=`/opt/cloudera/parcels/CDH/bin/hbase classpath`:`/opt/cloudera/parcels/CDH/bin/hadoop classpath`
export HADOOP_CLASSPATH

Even after doing this, running yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce still fails. The reason is that we pulled in third-party jars that are present on neither the HBase nor the Hadoop classpath, so they have to be supplied at runtime.

Two approaches turn up in online searches:

1. Add the parameter -libjars 3rd-jar-path when running the command.
2. Add this line in the driver code: job.addFileToClassPath(new Path("hdfs://hd03:8020/wangxin/jar/*"));

(I tried both methods and neither seemed to work; something probably went wrong somewhere! A related option is sketched below.)
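
For what it is worth, -libjars is handled by Hadoop's GenericOptionsParser, so it only takes effect when the driver runs through ToolRunner, and HBase also provides TableMapReduceUtil.addDependencyJars(job), which ships the HBase and job dependency jars with the job via the distributed cache. Below is a minimal sketch of a ToolRunner-based driver (my own variant, not the version I actually ran); it reuses MyMapper/MyReducer from the code above and needs imports for Configured, Tool and ToolRunner:

// Sketch (not the version used in this post): a ToolRunner-based driver so that generic
// options such as -libjars are honored, combined with TableMapReduceUtil.addDependencyJars.
public class HBaseMapReduceTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create(getConf());
        config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
        config.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(config, "ExampleReadWrite");
        job.setJarByClass(HBaseMapReduceTool.class);

        Scan scan = new Scan();
        scan.setCaching(200);
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone", scan,
                HBaseMapReduce.MyMapper.class, Text.class, LongWritable.class, job);
        // ship the HBase (and other) dependency jars with the job instead of relying on
        // every node having them on its local classpath
        TableMapReduceUtil.addDependencyJars(job);

        job.setReducerClass(HBaseMapReduce.MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips generic options (-libjars, -D, ...) before they reach run()
        System.exit(ToolRunner.run(new HBaseMapReduceTool(), args));
    }
}

With a driver like this, the extra jars would be passed something like: yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduceTool -libjars /path/to/extra1.jar,/path/to/extra2.jar hdfs://hd03:8020/tangwx/mapred-example/output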


Since I could no longer tell which jars were third-party and which ship with HBase or Hadoop, I chose the simplest route: package all of the MapReduce job's dependencies into a single jar, using the plugin already shown in the pom.xml:

<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>

The full jar name is: mapred-example-1.0-SNAPSHOT-jar-with-dependencies.jar

After uploading it to the server, run: yarn jar mapred-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce , wait for the result, and it works!