07 MapReduce分区与自定义数据

  1. package com.etc;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.io.WritableComparable;
  15. import org.apache.hadoop.io.compress.CompressionCodec;
  16. import org.apache.hadoop.io.compress.GzipCodec;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.Partitioner;
  20. import org.apache.hadoop.mapreduce.Reducer;
  21. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  23. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  24. public class RectangleSort {
  25. static final String INPUT_PATH = "hdfs://hadoop0:9000/data/Rectangle.txt";
  26. static final String OUTPUT_PATH = "hdfs://hadoop0:9000/output/ide";
  27. public static void main(String[] args) throws Throwable, URISyntaxException {
  28. Configuration conf = new Configuration();
  29. // conf.setBoolean("mapred.compress.map.output", true);
  30. // conf.setClass("mapred.map.output. compression.codec",
  31. // GzipCodec.class,CompressionCodec.class);
  32. FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
  33. Path outpath = new Path(OUTPUT_PATH);
  34. if (fileSystem.exists(outpath)) {
  35. fileSystem.delete(outpath, true);
  36. }
  37. Job job = new Job(conf, "RectangleSort");
  38. job.setJarByClass(RectangleWritable.class);
  39. org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, INPUT_PATH);
  40. job.setInputFormatClass(TextInputFormat.class);
  41. job.setMapperClass(MyMapper.class);
  42. job.setMapOutputKeyClass(RectangleWritable.class);
  43. job.setMapOutputValueClass(NullWritable.class);
  44. job.setReducerClass(MyReducer.class);
  45. job.setOutputKeyClass(IntWritable.class);
  46. job.setOutputValueClass(IntWritable.class);
  47. FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
  48. // FileOutputFormat.setCompressOutput(job, true); // job使用压缩
  49. // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // 设置压缩格式
  50. job.setOutputFormatClass(TextOutputFormat.class);
  51. job.setPartitionerClass(MyPatitioner.class);
  52. job.setNumReduceTasks(2);
  53. job.waitForCompletion(true);
  54. }
  55. /**
  56. * Map函数
  57. * @author root
  58. *
  59. */
  60. static class MyMapper extends Mapper<LongWritable, Text, RectangleWritable, NullWritable> {
  61. protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
  62. String[] splites = v1.toString().split(" ");
  63. RectangleWritable k2 = new RectangleWritable(Integer.parseInt(splites[0]), Integer.parseInt(splites[1]));
  64. context.write(k2, NullWritable.get());
  65. };
  66. }
  67. /**
  68. * Reduce函数
  69. * @author root
  70. *
  71. */
  72. static class MyReducer extends Reducer<RectangleWritable, NullWritable, IntWritable, IntWritable> {
  73. protected void reduce(RectangleWritable k2, Iterable<NullWritable> v2s, Context context)
  74. throws IOException, InterruptedException {
  75. context.write(new IntWritable(k2.getLength()), new IntWritable(k2.getWidth()));
  76. };
  77. }
  78. }
  79. class RectangleWritable implements WritableComparable {
  80. // 矩形长和宽
  81. int length, width;
  82. public int getLength() {
  83. return length;
  84. }
  85. public void setLength(int length) {
  86. this.length = length;
  87. }
  88. public int getWidth() {
  89. return width;
  90. }
  91. public void setWidth(int width) {
  92. this.width = width;
  93. }
  94. // 无参构造方法
  95. public RectangleWritable() {
  96. super();
  97. }
  98. // 有参构造
  99. public RectangleWritable(int length, int width) {
  100. super();
  101. this.length = length;
  102. this.width = width;
  103. }
  104. // 序列化写
  105. public void write(DataOutput out) throws IOException {
  106. out.writeInt(length);
  107. out.writeInt(width);
  108. }
  109. // 序列化读
  110. public void readFields(DataInput in) throws IOException {
  111. this.length = in.readInt();
  112. this.width = in.readInt();
  113. }
  114. // 比较方法,实现按面积大小排序
  115. public int compareTo(Object o) {
  116. RectangleWritable to = (RectangleWritable) o;
  117. if (this.getLength() * this.getWidth() > to.getLength() * to.getWidth())
  118. return 1;
  119. if (this.getLength() * this.getWidth() < to.getLength() * to.getWidth())
  120. return -1;
  121. return 0;
  122. }
  123. }
  124. /**
  125. * 判断一个矩形是否三正方形,并交给特定的Reduce人物处理的分区规则
  126. *
  127. * @author root
  128. *
  129. */
  130. class MyPatitioner extends Partitioner<RectangleWritable, NullWritable> {
  131. @Override
  132. public int getPartition(RectangleWritable k2, NullWritable v2, int numReduceTasks) {
  133. if (k2.getLength() == k2.getWidth()) {
  134. // 正方形在任务0中汇总
  135. return 0;
  136. } else
  137. // 长方形在任务1中汇总
  138. return 1;
  139. }
  140. }