MapReduce是Hadoop的一个分布式计算框架,用于编写批处理应用程序。编写好的程序可以提交到Hadoop集群上用于并行处理大规模的数据集。MapReduce专门用于处理key,value键值对处理。它将作业视为一组key,value,并生成一组key,value作为输出

    MapReduce编程模型简述
    Hadoop之MapReduce - 图1

    1. input:读取文件
    2. spliting:将文件进行拆分。得到K1行数和V1对应的文本类容。
    3. mapping: 并行将每一行按照空格进行拆分,拆分得到List(k2,v2),其中k2代表每一个单词,由于是做词频统计,所以V2的值为1代表出现一次
    4. shuffling:由于Mapping操作可能是在不同的机器上并行处理的。所以需要通过shuffling将相同key值的数据分发到同一个节点上合并。这样才能统计出最终的结果,此时得到K2为每一个单词。List为可迭代集合,v2就是Mapping中的V2。
    5. Reducing:这里的案例是统计单词出来的总结数,所以reducing对List进行归约求和操作,最终输出。
      MapReduce编程模型中splitting和shuffing操作都是由框架实现的,`需要我们自己编程的只有mapping和reducing,这也就是MapReduce这个称呼的来源。

    InputFormat & RecordReaders
    InputFormat将输出文件拆分为多个InputSplit,并由RecordReaders将InputSplit转换为标准的Key,Value键值对,作为map的输出。这一步的意义在于只有先进行逻辑拆分并转为标准的键值对才能为map提供输入以便并行处理。

    Combiner
    combiner是map运算后的可选操作,它实际上是一个本地化的reduce操作,它主要是map计算出中间文件后做一个简单的合并重复key值的操作。
    map在遇到一个hadoop的单词时就会记录1.但是这边文章里可能会出现N次,那么map输出文件冗余就会很多。因此在reduce计算前相同的key做一个合并操作,那么需要传输的数据量就很减少。

    Partitioner
    partitioner可以理解成分类器,将map的输出按照key值的不同分别分给对应的reducer,支持自定义实现。

    wordCount项目案例
    创建maven项目 引入pom文件

    1. //因为采用的pom文件 有的jar冲突 所以我采用了maven helper插件移除了这些冲突的依赖项
    2. <dependency>
    3. <groupId>org.apache.hadoop</groupId>
    4. <artifactId>hadoop-client</artifactId>
    5. <version>2.7.7</version>
    6. <exclusions>
    7. <exclusion>
    8. <artifactId>jackson-core-asl</artifactId>
    9. <groupId>org.codehaus.jackson</groupId>
    10. </exclusion>
    11. <exclusion>
    12. <artifactId>jackson-mapper-asl</artifactId>
    13. <groupId>org.codehaus.jackson</groupId>
    14. </exclusion>
    15. <exclusion>
    16. <artifactId>commons-collections</artifactId>
    17. <groupId>commons-collections</groupId>
    18. </exclusion>
    19. <exclusion>
    20. <artifactId>commons-logging</artifactId>
    21. <groupId>commons-logging</groupId>
    22. </exclusion>
    23. <exclusion>
    24. <artifactId>commons-lang</artifactId>
    25. <groupId>commons-lang</groupId>
    26. </exclusion>
    27. <exclusion>
    28. <artifactId>guava</artifactId>
    29. <groupId>com.google.guava</groupId>
    30. </exclusion>
    31. <exclusion>
    32. <artifactId>log4j</artifactId>
    33. <groupId>log4j</groupId>
    34. </exclusion>
    35. <exclusion>
    36. <artifactId>netty</artifactId>
    37. <groupId>io.netty</groupId>
    38. </exclusion>
    39. <exclusion>
    40. <artifactId>gson</artifactId>
    41. <groupId>com.google.code.gson</groupId>
    42. </exclusion>
    43. </exclusions>
    44. </dependency>

    在hadoop中讲解mapreducer有提到过 我们只需要实现map 和reducer
    所以下面给出map和reducer的自定义实现

    1. import org.apache.hadoop.io.IntWritable;
    2. import org.apache.hadoop.io.LongWritable;
    3. import org.apache.hadoop.io.Text;
    4. import org.apache.hadoop.mapreduce.Mapper;
    5. import java.io.IOException;
    6. public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    7. @Override
    8. protected void map(LongWritable key, Text value, Context context) throws IOException,
    9. InterruptedException {
    10. String[] words = value.toString().split("\t");
    11. for (String word : words) {
    12. context.write(new Text(word), new IntWritable(1));
    13. }
    14. }
    15. }
    16. import org.apache.hadoop.io.IntWritable;
    17. import org.apache.hadoop.io.Text;
    18. import org.apache.hadoop.mapreduce.Reducer;
    19. import java.io.IOException;
    20. public class WordReducer extends Reducer <Text, IntWritable, Text, IntWritable> {
    21. @Override
    22. protected void reduce(Text key, Iterable <IntWritable> values, Context context) throws IOException,
    23. InterruptedException {
    24. int count = 0;
    25. for (IntWritable value : values) {
    26. count += value.get();
    27. }
    28. context.write( key, new IntWritable( count ) );
    29. }
    30. }
    31. import org.apache.hadoop.io.IntWritable;
    32. import org.apache.hadoop.io.Text;
    33. import org.apache.hadoop.mapreduce.Job;
    34. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    35. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    36. import java.io.IOException;
    37. import java.net.URI;
    38. import java.net.URISyntaxException;
    39. public class WordCount {
    40. private static final String HDFS_URL="hdfs://192.168.80.153:8020";
    41. private static final String HADOOP_USER_NAME="root";
    42. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
    43. if (args.length<2){
    44. System.out.println("Input and output paths are necessary!");
    45. return;
    46. }
    47. // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
    48. System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
    49. Configuration configuration = new Configuration();
    50. // 指明 HDFS 的地址
    51. configuration.set("fs.defaultFS", HDFS_URL);
    52. // 创建一个 Job
    53. Job job = Job.getInstance(configuration);
    54. job.setJarByClass( WordCount.class );
    55. // 设置 Mapper 和 Reducer
    56. job.setMapperClass( WordCountMapper.class);
    57. job.setReducerClass( WordReducer.class);
    58. // 设置 Mapper 输出 key 和 value 的类型
    59. job.setMapOutputKeyClass( Text.class);
    60. job.setMapOutputValueClass( IntWritable.class);
    61. // 设置 Reducer 输出 key 和 value 的类型
    62. job.setOutputKeyClass(Text.class);
    63. job.setOutputValueClass(IntWritable.class);
    64. // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
    65. FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
    66. Path outputPath = new Path(args[1]);
    67. if (fileSystem.exists(outputPath)) {
    68. fileSystem.delete(outputPath, true);
    69. }
    70. // 设置作业输入文件和输出文件的路径
    71. FileInputFormat.setInputPaths(job, new Path(args[0]));
    72. FileOutputFormat.setOutputPath(job, outputPath);
    73. // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
    74. boolean result = job.waitForCompletion(true);
    75. // 关闭之前创建的 fileSystem
    76. fileSystem.close();
    77. // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
    78. System.exit(result ? 0 : -1);
    79. }
    80. }

    编写完成代码 采用maven 的package 功能打包成jar 提交到服务器
    使用 命令进行运行

    hadoop jar /usr/local//hdfs-0.0.1-SNAPSHOT.jar \
     com.spring.hdfs.WordCount   \
     /wordcount/input.txt /wordcount/output/wordcount  --wordcount/input.txt 是读取的文件地址 --workcount/output/workcount 是项目运行完成之后保存在hdfs中的地址
    

    —关于HDFS一些操作指令

    1. 创建一个文件夹    hdfs dfs -mkdir /myTask
    2. 创建多个文件夹    hdfs dfs -mkdir -p /myTask1/input1
    3. 上传文件 hdfs dfs -put /opt/wordcount.txt /myTask/input
    4. 查看总目录下的文件和文件夹 hdfs dfs -ls /
    5. 查看myTask下的文件和文件夹 hdfs dfs -ls /myTask
    6. 查看myTask下的wordcount.txt的内容 hdfs dfs -cat /myTask/wordcount.txt
    7. 删除总目录下的myTask2文件夹以及里面的文件和文件夹 hdfs dfs -rmr /myTask2
    8. 删除myTask下的wordcount.txt hdfs dfs -rmr /myTask/wordcount.txt
    9. 下载hdfs中myTask/input/wordcount.txt到本地opt文件夹中 hdfs dfs -get /myTask/input/wordcount.txt /opt