Step-by-step breakdown
1. Add the required jar dependencies
<!-- hbase-mapreduce already depends on hadoop-common transitively -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.0</version>
</dependency>
<!-- pull in the hadoop-mapreduce-client-core jar -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.2.0</version>
</dependency>
<!-- this jar may not be strictly necessary, but I needed it for local debugging -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.2.0</version>
</dependency>
<!-- required for reading from and writing to HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.0</version>
</dependency>
<!-- without this a ClassNotFound error is thrown; try commenting it out to see whether the job still runs -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
2. Configure the HBase connection, mainly the ZooKeeper quorum information
//1. Configure the HBase connection
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
config.set("hbase.zookeeper.property.clientPort", "2181");
3. Configure the query, which mainly means building the Scan object
//2. Configure the Scan object for the query
Scan scan = new Scan();
scan.setCaching(200);
//disable block caching for this scan (recommended for MapReduce scans)
scan.setCacheBlocks(false);
//set startRow and stopRow; this example does a full table scan, so they are commented out
// scan.withStartRow(Bytes.toBytes("00_13980337439_2017-03-30 23:04:15_15897468949_1_1669"));
// scan.withStopRow(Bytes.toBytes("00_15422018558_2017-08-11 17:48:12_19902496992_1_0553"));
//setting this attribute is probably unnecessary, because TableMapReduceUtil.initTableMapperJob below sets it as well
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("bigdata:twx_telephone"));
4. Build the MR Job and configure the Reducer
Only the Reducer needs to be configured here; the Mapper is set up in step 5 below:
Job job = Job.getInstance(config,"ExampleReadWrite");
job.setJarByClass(HBaseMapReduce.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
//setOutputFormatClass(NullOutputFormat.class) discards all output and is useful only for testing the Mapper input
//if NullOutputFormat.class is set, comment out the Reducer-related code above
// job.setOutputFormatClass(NullOutputFormat.class);
//set the output path for the results
FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));
5. Set up the Mapper
TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone",scan,
MyMapper.class, //mapper class
Text.class, //mapper output key class
LongWritable.class,//mapper output value class
job);
Complete code
mapper-reduce
package com.soyuan.mapredexample.tabelmapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Iterator;
/**
* @author tangwx@soyuan.com.cn
* @date 2019-08-19 11:36
*/
public class HBaseMapReduce {
static class MyMapper extends TableMapper<Text, LongWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// get the value of a single column (family f1, qualifier call1)
byte[] caller = value.getValue(Bytes.toBytes("f1"), Bytes.toBytes("call1"));
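// note: getValue returns null if the row has no f1:call1 cell; the data here is assumed
// to always contain it, otherwise new Text(caller) below would throw a NullPointerException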
// System.out.println("caller: "+Bytes.toString(caller));
context.write(new Text(caller),new LongWritable(1));
}
}
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum=0;
Iterator<LongWritable> iterator = values.iterator();
while (iterator.hasNext()) {
LongWritable next = iterator.next();
sum += next.get();
}
context.write(key,new LongWritable(sum));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1. Configure the HBase connection
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
config.set("hbase.zookeeper.property.clientPort", "2181");
Job job = Job.getInstance(config,"ExampleReadWrite");
job.setJarByClass(HBaseMapReduce.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
// job.setOutputFormatClass(NullOutputFormat.class);
FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));
// job.addFileToClassPath(new Path("hdfs://hd03:8020/wangxin/jar/*"));
//2. Configure the Scan object for the query
Scan scan = new Scan();
scan.setCaching(200);
scan.setCacheBlocks(false);
scan.setLimit(10);
// scan.withStartRow(Bytes.toBytes("00_13980337439_2017-03-30 23:04:15_15897468949_1_1669"));
// scan.withStopRow(Bytes.toBytes("00_15422018558_2017-08-11 17:48:12_19902496992_1_0553"));
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("bigdata:twx_telephone"));
TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone",scan,
MyMapper.class,
Text.class,
LongWritable.class,
job);
System.exit(job.waitForCompletion(true) ? 0 : -1);
}
}
To debug locally, you can change the path in FileOutputFormat.setOutputPath(job,new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));
from the HDFS path it currently uses to a local path, e.g. FileOutputFormat.setOutputPath(job,new Path("mapred-example/output"));
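If you prefer not to edit the source for each environment, one option is to take the output path from the command line. This is only a sketch; the argument handling below is not part of the original code:
// read the output directory from args[0]; fall back to the HDFS path when no argument is given
String outputPath = args.length > 0 ? args[0] : "hdfs://hd03:8020/tangwx/mapred-example/output";
FileOutputFormat.setOutputPath(job, new Path(outputPath));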
pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>telecom-service</artifactId>
<groupId>com.soyuan</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>mapred-example</artifactId>
<dependencies>
<!-- hbase-mapreduce already depends on hadoop-common transitively -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.0</version>
</dependency>
<!-- pull in the hadoop-mapreduce-client-core jar -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- package all dependencies into a single jar -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Deploy and run
Package with Maven: mvn clean package -DskipTests
Upload the jar: scp target/mapred-example-1.0-SNAPSHOT.jar admin@hd06:~
Run: yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce
This fails with a ClassNotFound error for org.apache.hadoop.hbase.HBaseConfiguration. The reason is that mapred-example-1.0-SNAPSHOT.jar is built with minimal dependencies, and the HBase libraries are not on the system classpath.
So how do we add the HBase libraries to HADOOP_CLASSPATH?
Add the HBase jars to HADOOP_CLASSPATH:
HADOOP_CLASSPATH=`/opt/cloudera/parcels/CDH/bin/hbase classpath`:`/opt/cloudera/parcels/CDH/bin/hadoop classpath`
export HADOOP_CLASSPATH
Even after doing this, running yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce still fails. The reason is that we depend on third-party jars that are present on neither the HBase classpath nor the Hadoop classpath, so they have to be added at run time.
There are two approaches commonly suggested online:
1. Pass the -libjars 3rd-jar-path argument when running the command.
2. Add this line in code: job.addFileToClassPath(new Path("hdfs://hd03:8020/wangxin/jar/*"));
(I tried both approaches and neither seemed to work for me; something probably went wrong somewhere. See the sketch below for how -libjars is normally wired up.)
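For reference, -libjars is only honored when the driver lets Hadoop parse the generic options, typically by implementing Tool and launching through ToolRunner; a main() that builds the Job directly from its own Configuration, as in the code above, silently ignores the flag, which may explain why the first approach had no effect. Likewise, as far as I know, job.addFileToClassPath expects a concrete file path, and a wildcard such as .../jar/* is not expanded. Below is a minimal sketch of the ToolRunner wiring; HBaseMapReduceDriver is an illustrative name, not part of the original code, and it reuses the MyMapper/MyReducer classes defined above:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseMapReduceDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already carries whatever ToolRunner parsed, including jars passed via -libjars
        Configuration config = HBaseConfiguration.create(getConf());
        config.set("hbase.zookeeper.quorum", "hd02,hd04,hd06,hd07,hd08");
        config.set("hbase.zookeeper.property.clientPort", "2181");
        Job job = Job.getInstance(config, "ExampleReadWrite");
        job.setJarByClass(HBaseMapReduceDriver.class);
        // same Scan setup as in the original code
        Scan scan = new Scan();
        scan.setCaching(200);
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob("bigdata:twx_telephone", scan,
                HBaseMapReduce.MyMapper.class, Text.class, LongWritable.class, job);
        job.setReducerClass(HBaseMapReduce.MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hd03:8020/tangwx/mapred-example/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HBaseMapReduceDriver(), args));
    }
}
With that in place the command would, in principle, look like: yarn jar mapred-example-1.0-SNAPSHOT.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduceDriver -libjars /path/to/third-party-1.jar,/path/to/third-party-2.jar (I have not verified this on my cluster).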
Because I could no longer keep track of which jars were third-party and which ship with HBase/Hadoop, I went with the simplest option: package all of the MapReduce job's dependencies into a single jar, using the plugin you saw in the pom.xml:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
The full jar name is: mapred-example-1.0-SNAPSHOT-jar-with-dependencies.jar
After uploading it to the server, run: yarn jar mapred-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.soyuan.mapredexample.tabelmapper.HBaseMapReduce
Wait for the job to finish, and you're done!