Analysis

Why use MapReduce to access data in HBase?
To speed up analysis and to scale it out: the scan is split per region and processed by many map tasks in parallel, instead of pulling everything through a single client.
Using MapReduce to analyze HBase data is strictly an offline (batch) analysis pattern; it is not meant for online, low-latency queries.

Code

Reading data from HBase, analyzing it, and writing the result to HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HbaseReader {

    public static String t_user_info = "t_user_info";

    // The generic parameters of TableMapper are only the OUTPUT key/value types;
    // the input types are fixed as ImmutableBytesWritable (row key) and Result (the row).
    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {

        // key is the row key, value is the whole row returned by the scan
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] bytes = key.copyBytes();
            // turn the row key into a String
            String rowkey = new String(bytes);
            // read one cell out of this row
            // note: the base_info:username column must exist in every row, otherwise getValue returns null and a NullPointerException is thrown
            byte[] usernameBytes = value.getValue("base_info".getBytes(), "username".getBytes());
            String username = new String(usernameBytes);
            context.write(new Text(rowkey + "\t" + username), NullWritable.get());
        }
    }

    // the reducer simply passes the keys it receives from the mappers through to HDFS
    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseReader.class);
        // job.setMapperClass(HdfsSinkMapper.class); // not needed: initTableMapperJob below registers the mapper

        Scan scan = new Scan();
        // wires up TableInputFormat, registers HdfsSinkMapper and declares its output types
        TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);

        job.setReducerClass(HdfsSinkReducer.class);
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/output"));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
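Before running the job, the source table has to exist and every row needs the base_info:username column that the mapper reads; otherwise getValue() returns null and the NullPointerException mentioned in the comment is thrown. A minimal hbase shell sketch for preparing such a table (the row keys and usernames are made-up sample values):

create 't_user_info', 'base_info'
put 't_user_info', 'user0001', 'base_info:username', 'zhangsan'
put 't_user_info', 'user0002', 'base_info:username', 'lisi'

After the job completes, the HDFS output directory contains one line per row in the form rowkey<TAB>username.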

Reading data from HDFS and writing it into HBase

Test data

13902070000 www.baidu.com beijing
13902070006 www.google.com.hk beijing
13902070012 www.google.com.hk shanghai
13902070018 www.baidu.com shanghai
13902070024 www.baidu.com guanzhou
13902070030 www.baidu.com tianjin

Create the table

create 'flow_fields_import', 'info'

Note: main() in the driver below drops this table if it already exists and recreates it with a single column family named f1, so the f1 family used by the reducer is there regardless of what this shell command created.

Bean code

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private Text phone;
    private Text url;

    // Hadoop needs a no-arg constructor so it can instantiate the bean before calling readFields()
    public FlowBean() {
    }

    public FlowBean(Text phone, Text url) {
        super();
        this.phone = phone;
        this.url = url;
    }

    public FlowBean(String phone, String url) {
        super();
        this.phone = new Text(phone);
        this.url = new Text(url);
    }

    // serialization: fields must be written and read back in exactly the same order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone.toString());
        out.writeUTF(url.toString());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phone = new Text(in.readUTF());
        url = new Text(in.readUTF());
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"phone\":").append(phone);
        sb.append(",\"url\":").append(url);
        sb.append('}');
        return sb.toString();
    }

    public Text getPhone() {
        return phone;
    }

    public void setPhone(Text phone) {
        this.phone = phone;
    }

    public Text getUrl() {
        return url;
    }

    public void setUrl(Text url) {
        this.url = url;
    }

    // FlowBean is used as a map output key, so it must be comparable; sort by phone number
    @Override
    public int compareTo(FlowBean o) {
        return this.phone.toString().compareTo(o.getPhone().toString());
    }
}
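Because FlowBean is used as the map output key, the framework serializes it with write() and rebuilds it through the no-arg constructor plus readFields(). Below is a small, self-contained round-trip check that can be run locally to confirm the two methods mirror each other; it is not part of the original program, and the class name FlowBeanRoundTrip is just for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13902070000", "www.baidu.com");

        // serialize the bean the same way the MapReduce framework would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize into a fresh instance via the no-arg constructor + readFields
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // both print {"phone":13902070000,"url":www.baidu.com}
        System.out.println(original);
        System.out.println(copy);
        // 0 means the copy sorts identically to the original
        System.out.println("compareTo: " + original.compareTo(copy));
    }
}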

MapReduce code

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * User: jdxia
 * Date: 2018/7/25
 * Time: 15:40
 */
public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";

    public static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // the sample data is space-separated: phone, url, city (the city field is not used)
            String[] fields = line.split(" ");
            String phone = fields[0];
            String url = fields[1];

            FlowBean bean = new FlowBean(phone, url);
            context.write(bean, NullWritable.get());
        }
    }

    // TableReducer fixes the output value type to Mutation (Put/Delete); only the output key type is declared here
    public static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // use Bytes.toBytes(...) rather than Text.getBytes(): the latter returns the backing array, which can be longer than the actual content
            byte[] rowKey = Bytes.toBytes(key.getPhone().toString());
            Put put = new Put(rowKey);
            // on HBase 1.x+ clients, addColumn can be used instead of the deprecated add
            put.add("f1".getBytes(), "url".getBytes(), Bytes.toBytes(key.getUrl().toString()));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");

        // drop the target table if it exists and recreate it with a single column family f1
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if (tableExists) {
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes());
        desc.addFamily(hColumnDescriptor);
        hBaseAdmin.createTable(desc);
        hBaseAdmin.close();

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseSinker.class);
        job.setMapperClass(HbaseSinkMrMapper.class);

        // wires up TableOutputFormat and registers HbaseSinkMrReducer as the reducer
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/data.txt"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);
    }
}
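Once the job finishes, the result can be spot-checked from the hbase shell (a verification step, not part of the program):

scan 'flow_fields_import'

Given the test data above, each of the six phone numbers should appear as a row key, with the visited site stored in that row's f1:url column.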