To control the output path and format of a job's final files, you can define a custom OutputFormat.
    For example, a MapReduce program may need to split its results into two categories and write them to different directories depending on the data; this kind of flexible output requirement can be implemented with a custom class.
    Steps:

    1. Define a class that extends FileOutputFormat
    2. Provide a custom RecordWriter; the key method to override is write(), which writes the output data
    3. In the driver, set the job's output format class to the custom class (a minimal skeleton follows this list)
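
    A minimal skeleton of these three steps might look like the following. The class names (SkeletonOutputFormat, SkeletonRecordWriter) are illustrative only; the complete working version appears in the case study below.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;

    // Step 1: extend FileOutputFormat
    class SkeletonOutputFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext ctx)
                throws IOException, InterruptedException {
            return new SkeletonRecordWriter();
        }
    }

    // Step 2: a RecordWriter whose write() decides where each record goes
    class SkeletonRecordWriter extends RecordWriter<Text, NullWritable> {
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // open streams to the desired paths and route each record here
        }
        @Override
        public void close(TaskAttemptContext ctx) throws IOException, InterruptedException {
            // close any streams opened above
        }
    }

    // Step 3: in the driver
    // job.setOutputFormatClass(SkeletonOutputFormat.class);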

    Case study:
    (Requirement) Filter an input log file: records whose URL contains "atguigu" are written to /log/atguigu.log, and records that do not contain "atguigu" are written to /log/other.log.
    (1) Data
    log.txt (one log record per line)
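
    The contents of log.txt are not reproduced here. Purely for illustration (these lines are hypothetical, not the original dataset), it might hold one visited URL per line:

    http://www.atguigu.com
    http://www.baidu.com
    http://cn.bing.com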

    package com.BigData.MapReduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;

    public class MapReduceDemo {

        public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Pass each line through unchanged; the trailing "\n" is kept
                // because the custom RecordWriter writes raw bytes
                String line = value.toString();
                context.write(new Text(line + "\n"), NullWritable.get());
            }
        }

        public static void main(String[] args) throws Exception {
            // Job configuration
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.142.20:9000");

            // Remove the output directory if it already exists
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
            if (fs.exists(new Path("/out")))
                fs.delete(new Path("/out"), true);

            // Create the job
            Path input = new Path("/data/log.data"); // the input file (called log.txt in the text above)
            Path output = new Path("/out");
            Job job = Job.getInstance(conf, MapReduceDemo.class.getSimpleName());
            // Specify the jar
            job.setJarByClass(MapReduceDemo.class);
            // Input path on HDFS
            FileInputFormat.addInputPath(job, input);
            // Mapper class and its output key/value types
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            // Map-only job: no reducers
            job.setNumReduceTasks(0);
            // Use the custom output format; an output path is still required
            job.setOutputFormatClass(MyFileOutPutFormat.class);
            FileOutputFormat.setOutputPath(job, output);
            // Submit the job; "true" prints progress information
            job.waitForCompletion(true);
        }
    }

    class MyFileOutPutFormat extends FileOutputFormat<Text, NullWritable> {
        @Override
        public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
                throws IOException, InterruptedException {
            try {
                return new MyRecordWriter(taskAttemptContext);
            } catch (URISyntaxException e) {
                // Rethrow instead of returning null, which would cause a
                // NullPointerException later in the framework
                throw new IOException(e);
            }
        }
    }

    class MyRecordWriter extends RecordWriter<Text, NullWritable> {
        private Path output1;
        private Path output2;
        private FSDataOutputStream fos1;
        private FSDataOutputStream fos2;
        private FileSystem fs;

        public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, URISyntaxException {
            output1 = new Path("/log/atguigu.log");
            output2 = new Path("/log/other.log");
            // Assign the field; the original shadowed it with a local
            // variable, leaving the field null when close() ran
            Configuration conf = taskAttemptContext.getConfiguration();
            fs = FileSystem.get(new URI("hdfs://192.168.142.20:9000"), conf);
            fos1 = fs.create(output1);
            fos2 = fs.create(output2);
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // Route each record by content; write raw UTF-8 bytes rather than
            // writeUTF(), which prepends a two-byte length to every record
            String data = key.toString();
            if (data.contains("atguigu"))
                fos1.write(data.getBytes(StandardCharsets.UTF_8));
            else
                fos2.write(data.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (fos1 != null)
                fos1.close();
            if (fos2 != null)
                fos2.close();
            if (fs != null)
                fs.close();
        }
    }
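
    Two design points in this example are worth noting. First, job.setNumReduceTasks(0) makes it a map-only job, so each map task's output is handed directly to the custom OutputFormat with no shuffle or reduce phase. Second, FileOutputFormat.setOutputPath(job, output) is still required even though the records themselves go under /log/: the framework uses that directory for job bookkeeping (e.g. the _SUCCESS marker). Finally, because the RecordWriter opens fixed paths, multiple map tasks would all try to create the same two files; the example implicitly assumes a single input split.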

    (2) Results
    [screenshot of the run results omitted]
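
    After a successful run, the two output files can be inspected from the HDFS shell, for example with hdfs dfs -cat /log/atguigu.log and hdfs dfs -cat /log/other.log (assuming the same HDFS address as in the code); lines containing "atguigu" should appear only in the first file.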