MapReduce Programming

Running a MapReduce program requires YARN.

Command: hadoop jar <path to jar (absolute or relative)> <main class> <args>

```
# Upload test files to HDFS
[root@hadoop01 hadoop3.2.1]# history | grep hadoop
  136  hadoop fs -mkdir /in
  137  hadoop fs -out LICENSE.txt /in/01.txt
  138  hadoop fs -put LICENSE.txt /in/01.txt
  139  hadoop fs -put LICENSE.txt /in/02.txt
  140  hadoop fs -put LICENSE.txt /in/03.txt
[root@hadoop01 mapreduce]# pwd
/usr/local/hadoop3.2.1/share/hadoop/mapreduce
[root@hadoop01 mapreduce]# hadoop jar hadoop-mapreduce-examples-3.2.1.jar wordcount /in /wcout
```

The job fails with:

```
[2019-11-05 20:54:32.193]Container exited with a non-zero exit code 127. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
/bin/bash: /bin/java: No such file or directory
```

YARN launches its containers via /bin/java, which does not exist on these nodes. Adding environment variables did not help, so just give it one (on every node):

```
ln -s /usr/local/java1.8/bin/java /bin/java
```


WordCount run log

```
[root@hadoop01 mapreduce]# hadoop jar hadoop-mapreduce-examples-3.2.1.jar wordcount /in /wcout
2019-11-05 20:57:27,211 INFO client.RMProxy: Connecting to ResourceManager at hadoop02/192.168.61.102:8032
2019-11-05 20:57:28,037 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1572958454724_0004
2019-11-05 20:57:28,194 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-11-05 20:57:28,436 INFO input.FileInputFormat: Total input files to process : 3
2019-11-05 20:57:28,500 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-11-05 20:57:28,548 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-11-05 20:57:28,971 INFO mapreduce.JobSubmitter: number of splits:3
2019-11-05 20:57:29,217 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2019-11-05 20:57:29,271 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1572958454724_0004
2019-11-05 20:57:29,271 INFO mapreduce.JobSubmitter: Executing with tokens: []
2019-11-05 20:57:29,546 INFO conf.Configuration: resource-types.xml not found
2019-11-05 20:57:29,547 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2019-11-05 20:57:29,632 INFO impl.YarnClientImpl: Submitted application application_1572958454724_0004
2019-11-05 20:57:29,671 INFO mapreduce.Job: The url to track the job: http://hadoop02:8088/proxy/application_1572958454724_0004/
# job id format: job_<cluster start timestamp>_<job number, starting at 0001>
2019-11-05 20:57:29,672 INFO mapreduce.Job: Running job: job_1572958454724_0004
2019-11-05 20:57:39,891 INFO mapreduce.Job: Job job_1572958454724_0004 running in uber mode : false
# map runs first, then reduce
2019-11-05 20:57:39,892 INFO mapreduce.Job: map 0% reduce 0%
2019-11-05 20:58:08,354 INFO mapreduce.Job: map 100% reduce 0%
2019-11-05 20:58:20,518 INFO mapreduce.Job: map 100% reduce 100%
# the job succeeded
2019-11-05 20:58:21,541 INFO mapreduce.Job: Job job_1572958454724_0004 completed successfully
# counters: overall statistics (how much data was read, how many bytes were written)
2019-11-05 20:58:24,245 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=140544
        FILE: Number of bytes written=1184301
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=451992
        HDFS: Number of bytes written=35843
        HDFS: Number of read operations=14
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
        HDFS: Number of bytes read erasure-coded=0
    Job Counters
        Launched map tasks=3
        Launched reduce tasks=1
        Data-local map tasks=3
        Total time spent by all maps in occupied slots (ms)=74817
        Total time spent by all reduces in occupied slots (ms)=9165
        Total time spent by all map tasks (ms)=74817
        Total time spent by all reduce tasks (ms)=9165
        Total vcore-milliseconds taken by all map tasks=74817
        Total vcore-milliseconds taken by all reduce tasks=9165
        Total megabyte-milliseconds taken by all map tasks=76612608
        Total megabyte-milliseconds taken by all reduce tasks=9384960
    Map-Reduce Framework
        Map input records=8442
        Map output records=65712
        Map output bytes=702105
        Map output materialized bytes=140556
        Input split bytes=285
        Combine input records=65712
        Combine output records=8943
        Reduce input groups=2981
        Reduce shuffle bytes=140556
        Reduce input records=8943
        Reduce output records=2981
        Spilled Records=17886
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=4536
        CPU time spent (ms)=6180
        Physical memory (bytes) snapshot=671293440
        Virtual memory (bytes) snapshot=10928197632
        Total committed heap usage (bytes)=381550592
        Peak Map Physical memory (bytes)=189513728
        Peak Map Virtual memory (bytes)=2733113344
        Peak Reduce Physical memory (bytes)=106672128
        Peak Reduce Virtual memory (bytes)=2735411200
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=451707
    File Output Format Counters
        Bytes Written=35843
```

The official WordCount program

To read its source, pull in the examples jar:

```
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-examples</artifactId>
  <version>${hadoop.version}</version>
</dependency>
```

The class sits inside the jar at:

/maven-repo/org/apache/hadoop/hadoop-mapreduce-examples/3.2.1/hadoop-mapreduce-examples-3.2.1.jar!/org/apache/hadoop/examples/WordCount.class

Inspecting the dependency

ExampleDriver

This is the main class used by `hadoop jar XXX.jar <main class>`, and the reason you can write just `wordcount` in the main-class position:

```
package org.apache.hadoop.examples;

import org.apache.hadoop.examples.dancing.DistributedPentomino;
import org.apache.hadoop.examples.dancing.Sudoku;
import org.apache.hadoop.examples.pi.DistBbp;
import org.apache.hadoop.examples.terasort.TeraGen;
import org.apache.hadoop.examples.terasort.TeraSort;
import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.util.ProgramDriver;

public class ExampleDriver {
    public ExampleDriver() {
    }

    public static void main(String[] argv) {
        // exit code
        int exitCode = -1;
        // program driver object -- essentially a registry of example programs
        ProgramDriver pgd = new ProgramDriver();
        try {
            // running "wordcount" actually runs WordCount.class
            pgd.addClass("wordcount", WordCount.class, "A map/reduce program that counts the words in the input files.");
            pgd.addClass("wordmean", WordMean.class, "A map/reduce program that counts the average length of the words in the input files.");
            pgd.addClass("wordmedian", WordMedian.class, "A map/reduce program that counts the median length of the words in the input files.");
            pgd.addClass("wordstandarddeviation", WordStandardDeviation.class, "A map/reduce program that counts the standard deviation of the length of the words in the input files.");
            pgd.addClass("aggregatewordcount", AggregateWordCount.class, "An Aggregate based map/reduce program that counts the words in the input files.");
            pgd.addClass("aggregatewordhist", AggregateWordHistogram.class, "An Aggregate based map/reduce program that computes the histogram of the words in the input files.");
            pgd.addClass("grep", Grep.class, "A map/reduce program that counts the matches of a regex in the input.");
            pgd.addClass("randomwriter", RandomWriter.class, "A map/reduce program that writes 10GB of random data per node.");
            pgd.addClass("randomtextwriter", RandomTextWriter.class, "A map/reduce program that writes 10GB of random textual data per node.");
            pgd.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
            pgd.addClass("pi", QuasiMonteCarlo.class, "A map/reduce program that estimates Pi using a quasi-Monte Carlo method.");
            pgd.addClass("bbp", BaileyBorweinPlouffe.class, "A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.");
            pgd.addClass("distbbp", DistBbp.class, "A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.");
            pgd.addClass("pentomino", DistributedPentomino.class, "A map/reduce tile laying program to find solutions to pentomino problems.");
            pgd.addClass("secondarysort", SecondarySort.class, "An example defining a secondary sort to the reduce.");
            pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
            pgd.addClass("join", Join.class, "A job that effects a join over sorted, equally partitioned datasets");
            pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts words from several files.");
            pgd.addClass("dbcount", DBCountPageView.class, "An example job that count the pageview counts from a database.");
            pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
            pgd.addClass("terasort", TeraSort.class, "Run the terasort");
            pgd.addClass("teravalidate", TeraValidate.class, "Checking results of terasort");
            exitCode = pgd.run(argv);
        } catch (Throwable var4) {
            var4.printStackTrace();
        }
        System.exit(exitCode);
    }
}
```

WordCount

```
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public IntSumReducer() {
        }

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;

            IntWritable val;
            for(Iterator var5 = values.iterator(); var5.hasNext(); sum += val.get()) {
                val = (IntWritable)var5.next();
            }

            this.result.set(sum);
            context.write(key, this.result);
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public TokenizerMapper() {
        }

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }
}
```

Program structure

```
public class WordCount {
    public WordCount() {
    }

    // main entry point
    public static void main(String[] args) throws Exception {
        // wire up map and reduce, then submit the job...
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // the reduce method...
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        // the map method...
    }
}
```
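The structure above can be exercised without a cluster. Below is a minimal plain-Java sketch (no Hadoop dependency; the class name is made up for illustration) of the same map -> group-by-key -> reduce flow that the skeleton wires together:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MiniWordCount {
    public static Map<String, Integer> run(List<String> lines) {
        // "map": emit a (word, 1) pair for every token in every line
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String w : line.split("\\s+")) {
                if (!w.isEmpty()) pairs.add(Map.entry(w, 1));
            }
        }
        // "shuffle": group the pairs by key, sorted, as the framework would
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        // "reduce": one call per group, summing the group's values
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            int sum = 0;
            for (int v : g.getValue()) sum += v;
            result.put(g.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello world", "hello ww")));
        // prints {hello=2, world=1, ww=1}
    }
}
```

The real framework does the grouping and sorting between the map and reduce phases; the skeleton's two inner classes only supply the first and last steps.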

Writing your own MapReduce

Driver.java: wires map and reduce together

```
package pers.sfl.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver class: wraps the mapper and reducer classes into one unit of work -- a Job.
 * @author Administrator
 */
public class Driver {
    public static void main(String[] args) {
        // load the configuration files
        Configuration conf = new Configuration();
        try {
            // create a Job object
            Job job = Job.getInstance(conf);

            // set the job's main entry class
            job.setJarByClass(Driver.class);
            // set the job's mapper class
            job.setMapperClass(WordCountMapper.class);
            // set the job's reducer class
            job.setReducerClass(WordCountReducer.class);

            // set the map output key/value types.
            // Why set them again when the generics already declare them?
            // Generics only exist at compile time -- they are erased at runtime.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // set the reduce output k/v types -- this is the job's final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // input path of the files to process (FileInputFormat is the file input class)
            Path inpath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inpath);

            // output directory -- it must NOT already exist, or the job fails.
            // Output is overwrite-style by default, so an existing directory
            // could mean losing the original data.
            Path outpath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outpath);

            // submit the job -- nothing above actually runs until this call;
            // everything before it just configures the job
            // job.submit();
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

WordCountMapper.java

```
package pers.sfl.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Word count.
 *
 * Mapper input and output types:
 * KEYIN:    input key type -- the starting byte offset of each line (long, e.g. 0, 12)
 * VALUEIN:  input value type -- the content of the line, paired with that offset (String)
 * The output types depend on the business logic:
 * KEYOUT:   output key type -- each word (String)
 * VALUEOUT: output value type -- the word's count (int)
 * @author Administrator
 *
 * These type parameters cannot be Java's native types.
 * Serialization: whenever data is persisted or sent over the network, it must be
 * serialized and deserialized
 *   ("zhangsan" -- serialize --> 010101110 -- deserialize --> "zhangsan").
 * Java's mechanism for this is Serializable.
 * Every type shipped around in a MapReduce program must be serializable and
 * deserializable. Hadoop dropped Java's native Serializable in favor of its own
 * Writable interface, which serializes only the data's value.
 * Reason: Java's serialization is too heavyweight and verbose.
 * For the common types, Hadoop already provides implementations:
 *   int     -- IntWritable
 *   long    -- LongWritable
 *   String  -- Text
 *   byte    -- ByteWritable
 *   double  -- DoubleWritable
 *   float   -- FloatWritable
 *   boolean -- BooleanWritable
 *   null    -- NullWritable
 * Custom types that need serialization implement the Writable interface themselves.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Override of the map function. Parameters:
     * (under the hood, Hadoop reads the input as bytes)
     * LongWritable key: the input key -- the line's byte offset; no real use, it just identifies the line
     * Text value:       the input value -- the content of one line
     * Context context:  context object that ships output to the reduce side
     * Call frequency: once per line --
     * if a file has 10 lines, map() is called 10 times.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // the framework has already opened a stream and read the line for us;
        // we only need to split its content
        // Text -- toString() --> String
        String line = value.toString();
        // split the line, e.g. "hello world hello ww"
        String[] words = line.split("\t");
        // loop over the words and send each one to the reduce side as a k-v pair
        for (String w : words) {
            // String --> Text
            Text mk = new Text(w);
            // int --> IntWritable
            IntWritable mv = new IntWritable(1);
            // (hello,1) (world,1) (hello,1) (ww,1)
            // each write() emits one k-v pair toward the reduce side
            context.write(mk, mv);
        }
    }
}
```
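One detail worth flagging: this mapper splits on "\t", while the official WordCount uses StringTokenizer, which breaks on any whitespace. Whether the two agree depends entirely on how your input separates words. A quick plain-Java comparison (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class SplitVsTokenize {
    // what the mapper above does: split on tab characters only
    static List<String> splitOnTab(String line) {
        return Arrays.asList(line.split("\t"));
    }

    // what the official WordCount mapper does: tokenize on any whitespace
    static List<String> tokenize(String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) out.add(itr.nextToken());
        return out;
    }

    public static void main(String[] args) {
        String line = "hello world\thello ww";
        System.out.println(splitOnTab(line)); // [hello world, hello ww]
        System.out.println(tokenize(line));   // [hello, world, hello, ww]
    }
}
```

On space-separated input like LICENSE.txt, splitting on "\t" would count whole lines as single "words", so pick the separator to match the data.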

WordCountReducer.java

```
package pers.sfl.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Word count: the reduce side's input comes from the map output.
 * @author Administrator
 * KEYIN:    input key type -- the map output key type (Text)
 * VALUEIN:  input value type -- the map output value type (IntWritable)
 *
 * Output:
 * KEYOUT:   output key -- the word (Text)
 * VALUEOUT: output value -- the word's total count (IntWritable)
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Override of the reduce method.
     * The data arriving at the reduce side is already grouped; by default the
     * grouping is by the map output key -- records with the same key form one group.
     * Text key:                     the key shared by the whole group
     * Iterable<IntWritable> values: all of the group's values, wrapped in an iterator
     * Context context:              context object used to write the results out to HDFS
     * Call frequency: once per group -- each call produces the final count for
     * one word. A call can only see its own group's data; there is no way to
     * share data with the previous or next group.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // loop over the values and sum them
        int sum = 0;
        for (IntWritable v : values) {
            // IntWritable --> int: get() converts the Hadoop type back to the Java type
            sum += v.get();
        }
        // write out the result
        IntWritable rv = new IntWritable(sum);
        context.write(key, rv);
    }
}
```

Uploading and running the jar

Package the finished program as a jar and upload it to any node of the cluster.

Running it

With the main class specified in the manifest and input/output paths hard-coded in the program (no runtime arguments): `hadoop jar wc01.jar`

A possible error:

```
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
```

This means the wrong Text class was imported; it must be org.apache.hadoop.io.Text.

A successful run produces 2 files:

  • _SUCCESS: size 0; a marker file that proves the job ran successfully
  • part-r-00000: the result file
    • r: the final output was written by a reducer
    • m: would mean the final output was written by a mapper (map-only job)
    • 00000: file number, starting at 0; with multiple output files the numbers increase sequentially
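The naming pattern itself is easy to sketch in plain Java (illustrative only, not Hadoop's actual code): the suffix is the task number zero-padded to five digits, and the middle letter records which task type wrote the file:

```java
public class PartFileNames {
    // mimics the part-r-00000 / part-m-00001 naming pattern for illustration
    static String partName(boolean fromReduce, int taskId) {
        return String.format("part-%s-%05d", fromReduce ? "r" : "m", taskId);
    }

    public static void main(String[] args) {
        System.out.println(partName(true, 0));  // part-r-00000
        System.out.println(partName(false, 1)); // part-m-00001
    }
}
```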

If the manifest does not specify a main class, pass it on the command line; it must be the fully qualified class name:

```
hadoop jar wc02_nomain.jar pers.sfl.mapreduce.Driver
```

No main class in the manifest, and arguments required:

```
hadoop jar wc03_nomain_pa.jar pers.sfl.mapreduce.Driver /wcin /wc_out02
```

Notes

File input (loading) class: FileInputFormat; the default implementation is TextInputFormat.

File reading: the default record reader is RecordReader; the default implementation is LineRecordReader.

Core methods of LineRecordReader:

```
// is there another k-v pair left to read in the current file?
public boolean nextKeyValue() throws IOException {
}

@Override
// the current line's starting byte offset -- LongWritable
public LongWritable getCurrentKey() {
    return key;
}

@Override
// the current line's content -- Text
public Text getCurrentValue() {
    return value;
}
```
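To see what those LongWritable keys actually contain, here is a plain-Java sketch (illustrative; assumes ASCII text with '\n' line endings) that computes each line's starting byte offset the way LineRecordReader would report it:

```java
import java.util.ArrayList;
import java.util.List;

public class LineOffsets {
    // byte offset of each line's start, assuming ASCII text and '\n' endings
    static List<Long> lineOffsets(String content) {
        List<Long> offsets = new ArrayList<>();
        long pos = 0;
        for (String line : content.split("\n", -1)) {
            offsets.add(pos);
            pos += line.length() + 1; // +1 for the '\n' the reader consumed
        }
        // drop the phantom entry created by a trailing newline
        if (content.endsWith("\n")) offsets.remove(offsets.size() - 1);
        return offsets;
    }

    public static void main(String[] args) {
        // "hello\n" starts at byte 0, "world\n" starts at byte 6
        System.out.println(lineOffsets("hello\nworld\n")); // [0, 6]
    }
}
```

This is why the map key for line two of such a file arrives as LongWritable(6): the first line plus its newline occupies bytes 0..5.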

过程

  1. 通过文件加载器和读取器,将文件内容发送到Map 端
  2. Map 端实现map方法(LongWritable key, Text value, Context context)
  3. 将内容进行切割得到(helllo,1),(helllo,1),(helllo,1),(world,1)
  4. shuffle 过程

    • 分区:partitioner
    • 排序: sort按照 key进行排序(字典顺序) writablecomparable
    • 分组:grop相同的 key 分为一组 writablecomparator
    • combiner 组件:优化
  5. reduce接收到的是key相同的一组数据
  6. FileInputFormat -> TextOutputFormat 文件输出类
  7. 使用文件写出器RecordWriter(输出流) -> LineRecordWriter 写到 hdfs
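For the partitioning step, Hadoop's default is HashPartitioner, whose core formula can be sketched in plain Java: mask off the hash's sign bit, then take it modulo the number of reduce tasks. Equal keys therefore always land in the same partition and reach the same reducer:

```java
public class HashPartitionDemo {
    // the same formula Hadoop's default HashPartitioner uses:
    // clear the sign bit, then mod by the number of reduce tasks
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        System.out.println(getPartition("hello", 3) == getPartition("hello", 3)); // true
    }
}
```

A custom Partitioner only needs to replace this function with its own routing rule.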

The MapReduce programming pattern

  • map side: usually just extracts the relevant fields from each record
  • reduce side: does the actual computation and aggregation

Max, min, and average across 3 files

  1. map task: each call sees only one line of input
  2. extract each number from the line and emit it
  3. map output: how to choose the key? Work backwards from how the reduce side needs to receive the data
  4. the map output key drives grouping: whichever field you want to group by becomes the map key
  5. reduce is called once per group
  6. key: a constant marker such as "data" (Text), so every value lands in the same group
  7. value: the number itself
  8. reduce task: one group -- all of the data arrives in a single group
  9. a single call can therefore see all of the values
  10. iterate over the values to compute the max, min, and average
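Since all values arrive in one group, a single reduce call can compute all three statistics in one pass. The core logic, sketched in plain Java (the real job would put this inside reduce() and emit the results via context.write; the class name is illustrative):

```java
import java.util.List;

public class MaxMinAvg {
    // one pass over the group's values, as a reduce() body would do
    static double[] maxMinAvg(List<Integer> values) {
        int max = Integer.MIN_VALUE, min = Integer.MAX_VALUE;
        long sum = 0;
        for (int v : values) {
            if (v > max) max = v;
            if (v < min) min = v;
            sum += v;
        }
        return new double[]{max, min, (double) sum / values.size()};
    }

    public static void main(String[] args) {
        double[] r = maxMinAvg(List.of(3, 7, 2, 8));
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 8.0 2.0 5.0
    }
}
```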