Analysis
Why use MapReduce to read HBase data?
It speeds up analysis and scales out analytical capacity: the table input format creates one map task per region, so the whole table is scanned in parallel instead of by a single client.
MapReduce over HBase is strictly an offline (batch) analysis pattern; it is not suited to low-latency online queries.
Code
Reading data from HBase, analyzing it, and writing the result to HDFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HbaseReader {

    public static String t_user_info = "t_user_info";

    // The two type parameters of TableMapper are the mapper's OUTPUT key/value
    // types; the input types are fixed to ImmutableBytesWritable and Result.
    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {

        // key is the row key, value is the Result holding that row's cells
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // convert the row key bytes to a String
            byte[] bytes = key.copyBytes();
            String rowkey = new String(bytes);
            // read the base_info:username cell from this row;
            // getValue returns null if the column is absent, so guard against
            // that instead of letting the task die with a NullPointerException
            byte[] usernameBytes = value.getValue("base_info".getBytes(), "username".getBytes());
            if (usernameBytes == null) {
                return; // skip rows that have no username column
            }
            String username = new String(usernameBytes);
            context.write(new Text(rowkey + "\t" + username), NullWritable.get());
        }
    }

    // the reducer simply passes the map output through to HDFS
    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseReader.class);

        Scan scan = new Scan();
        // initTableMapperJob wires up the input format, table, scan and mapper,
        // so there is no separate job.setMapperClass call
        TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HdfsSinkReducer.class);

        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/output"));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
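Since the mapper only needs the base_info:username cell, it is worth narrowing the Scan before the job is initialized: unwanted cells are then filtered server-side and never shipped to the mappers. A minimal sketch of an alternative Scan setup for main() above (the table and column names are the ones already used there; the caching value of 500 is just an example):

// requires: import org.apache.hadoop.hbase.util.Bytes;
Scan scan = new Scan();
// fetch only the base_info:username cell instead of whole rows
scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"));
// batch more rows per RPC; the block cache is of little use to a full scan
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);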
Reading data from HDFS and writing into HBase
Test data
13902070000 www.baidu.com beijing
13902070006 www.google.com.hk beijing
13902070012 www.google.com.hk shanghai
13902070018 www.baidu.com shanghai
13902070024 www.baidu.com guanzhou
13902070030 www.baidu.com tianjin
Create the table (the column family must match the one the job writes to; note that HbaseSinker below also drops and recreates this table itself)
create 'flow_fields_import', 'f1'
Bean code
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private Text phone;
    private Text url;

    // the no-arg constructor is required: Hadoop instantiates the bean via
    // reflection before calling readFields
    public FlowBean() {
    }

    public FlowBean(Text phone, Text url) {
        super();
        this.phone = phone;
        this.url = url;
    }

    public FlowBean(String phone, String url) {
        super();
        this.phone = new Text(phone);
        this.url = new Text(url);
    }

    // serialization: write must emit fields in exactly the order that
    // readFields reads them back
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone.toString());
        out.writeUTF(url.toString());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phone = new Text(in.readUTF());
        url = new Text(in.readUTF());
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"phone\":").append(phone);
        sb.append(",\"url\":").append(url);
        sb.append('}');
        return sb.toString();
    }

    public Text getPhone() {
        return phone;
    }

    public void setPhone(Text phone) {
        this.phone = phone;
    }

    public Text getUrl() {
        return url;
    }

    public void setUrl(Text url) {
        this.url = url;
    }

    // map output is sorted and grouped by phone number
    @Override
    public int compareTo(FlowBean o) {
        return this.phone.toString().compareTo(o.getPhone().toString());
    }
}
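As a quick sanity check of the Writable contract (write and readFields must mirror each other), the bean can be round-tripped through an in-memory buffer. A minimal sketch; the harness class FlowBeanRoundTrip is hypothetical, only FlowBean comes from the code above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean in = new FlowBean("13902070000", "www.baidu.com");

        // serialize with write()
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buf));

        // deserialize with readFields() into a fresh instance
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // both lines print {"phone":13902070000,"url":www.baidu.com}
        // if the round trip is lossless
        System.out.println(in);
        System.out.println(out);
    }
}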
MapReduce code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * User: jdxia
 * Date: 2018/7/25
 * Time: 15:40
 */
public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";

    public static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line: phone url city, separated by single spaces
            String line = value.toString();
            String[] fields = line.split(" ");
            String phone = fields[0];
            String url = fields[1];

            FlowBean bean = new FlowBean(phone, url);
            context.write(bean, NullWritable.get());
        }
    }

    public static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // use copyBytes() rather than getBytes(): Text.getBytes() returns
            // the internal buffer, which may be longer than the actual content
            byte[] rowkey = key.getPhone().copyBytes();
            Put put = new Put(rowkey);
            // write the url into column f1:url (in HBase 1.0+ this method
            // is named addColumn)
            put.add("f1".getBytes(), "url".getBytes(), key.getUrl().copyBytes());

            context.write(new ImmutableBytesWritable(rowkey), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");

        // drop and recreate the target table so each run starts clean
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if (tableExists) {
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes());
        desc.addFamily(hColumnDescriptor);
        hBaseAdmin.createTable(desc);
        hBaseAdmin.close();

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseSinker.class);

        job.setMapperClass(HbaseSinkMrMapper.class);
        // initTableReducerJob wires up the TableOutputFormat and the reducer
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/data.txt"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);
    }
}
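After the job finishes, the import can be spot-checked with a plain client read. A minimal sketch, assuming the same pre-1.0 client API used above (HTable is deprecated in newer releases in favor of Connection/Table); the harness class FlowImportCheck is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class FlowImportCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");

        // look up one of the imported rows by its phone-number row key
        HTable table = new HTable(conf, "flow_fields_import");
        Result result = table.get(new Get(Bytes.toBytes("13902070000")));
        byte[] url = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("url"));
        // expected output for the test data above: www.baidu.com
        System.out.println(url == null ? "row not found" : Bytes.toString(url));
        table.close();
    }
}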