1. MapReduce

MapReduce is a programming framework for distributed computation. It combines the user's business-logic code with the framework's built-in default components into one complete distributed program that runs concurrently on a Hadoop cluster.

Advantage: by implementing just a few simple interfaces, you get a complete distributed program.

Disadvantage: the output of every MapReduce job is written to disk, which causes heavy disk I/O and very poor performance.


2. Hadoop's Native MapReduce Examples

The bundled example jars are stored in /opt/module/hadoop-3.1.3/share/hadoop/mapreduce.

  • yarn jar <path to the MapReduce example jar> wordcount <HDFS input path> <HDFS output path> — the output path is also on HDFS and must be a directory that does not exist yet, otherwise the job fails.

```bash
cd /opt/module/hadoop-3.1.3
yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /fiddler.md /output
```

Open part-r-xxxx inside the output directory on HDFS; it lists how many times each word occurs.
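Each output line is a word followed by its count, separated by a tab; for example (hypothetical content):

```
hello	3
world	2
```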

3. Common Data Serialization Types

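Hadoop ships Writable wrapper types that mirror the common Java types: boolean → BooleanWritable, byte → ByteWritable, int → IntWritable, long → LongWritable, float → FloatWritable, double → DoubleWritable, String → Text, null → NullWritable. A minimal sketch of how the wrappers are used (the class name and values here are illustrative, not from the original notes):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        // Wrap plain Java values in their Writable counterparts
        IntWritable count = new IntWritable(1);       // int
        LongWritable offset = new LongWritable(100L); // long
        Text word = new Text("hello");                // String
        NullWritable empty = NullWritable.get();      // placeholder when no value is needed

        // Unwrap them back into plain Java types
        int i = count.get();
        long l = offset.get();
        String s = word.toString();
        System.out.println(s + " " + i + " " + l + " " + empty);
    }
}
```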

4. Writing WordCount

Create the Hadoop project in IDEA as a Maven project.

WcDriver class

```java
package com.mywordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get a Job instance
        Job job = Job.getInstance(new Configuration());
        // 2. Set the jar
        job.setJarByClass(WcDriver.class);
        // Set the Mapper and Reducer
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);
        // Set the Map and Reduce output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

WcMapper

```java
package com.mywordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /**
     * The framework feeds the data in line by line; we turn each line into (word, 1) pairs.
     *
     * @param key     offset of the line
     * @param value   content of the line
     * @param context the task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // The line of data
        String line = value.toString();
        // Split it into words
        String[] words = line.split(" ");
        // Write each (word, 1) pair back to the framework
        for (String word : words) {
            this.word.set(word);
            context.write(this.word, this.one);
        }
    }
}
```
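For example, given the input line "hello world hello", this mapper emits (hello, 1), (world, 1), (hello, 1).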

WcReducer

```java
package com.mywordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    /**
     * The framework groups the pairs by word; we add up the counts for each word.
     *
     * @param key     the word
     * @param values  the counts (each is 1 here)
     * @param context the task context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Sum up the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
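Continuing the example above, the reducer receives (hello, [1, 1]) and (world, [1]) and writes out (hello, 2) and (world, 1).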


Package the project and upload mapreduce1-1.0 to the cluster. Before packaging, check the Java version: build with Java 1.8.

See https://zhuanlan.zhihu.com/p/348660719; the Java version also has to be set in pom.xml.
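A minimal sketch of that pom.xml setting, assuming the standard Maven compiler properties (adjust to your own project):

```xml
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
```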


  • yarn jar mapreduce1-1.0-SNAPSHOT.jar <fully qualified driver class name> <HDFS input path> <HDFS output path>

```bash
yarn jar mapreduce1-1.0-SNAPSHOT.jar com.mywordcount.WcDriver /fiddler.md /output2
```
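To inspect the result once the job finishes, something like the following should work (assuming the output path used above):

```bash
hadoop fs -cat /output2/part-r-*
```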

5. Hadoop Serialization

Serialization converts an in-memory object into a sequence of bytes so that it can be persisted.

Java's built-in serialization is a heavyweight framework: it attaches a lot of extra information (checksums, headers, the inheritance hierarchy, and so on).

Hadoop does not need all of that, so it has its own lightweight serialization mechanism: Writable.
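The Writable contract itself is tiny; as a sketch, it boils down to two methods (this mirrors org.apache.hadoop.io.Writable):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// write() serializes the object's fields; readFields() restores them,
// reading in exactly the same order they were written.
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
```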

5.1. Traffic Statistics Example

  1. FlowBean class

```java
package com.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    public void set(long upFlow, long downFlow) {
        this.downFlow = downFlow;
        this.upFlow = upFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Write the object's fields out to wherever the framework asks: serialization.
     *
     * @param dataOutput the output to write into
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    /**
     * Read data back from the framework to fill this object: deserialization.
     *
     * @param dataInput the input to read from
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // The read order must match the write order
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }
}
```

  2. FlowMapper

```java
package com.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text phone = new Text();
    private FlowBean flow = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Take one line of data
        String line = value.toString();
        // Split it; lines are tab-separated, field 1 is the phone number,
        // and the 3rd- and 2nd-to-last fields are the up and down traffic
        String[] split = line.split("\t");
        // Wrap the fields and write them out
        phone.set(split[1]);
        flow.set(
                Long.parseLong(split[split.length - 3]), // upFlow
                Long.parseLong(split[split.length - 2])  // downFlow
        );
        context.write(phone, flow);
    }
}
```
  3. FlowReducer class

```java
package com.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean flow = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Sum up the traffic for this phone number
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();
            sumDownFlow += value.getDownFlow();
        }
        // Wrap the totals in a FlowBean and write it out
        flow.set(sumUpFlow, sumDownFlow);
        context.write(key, flow);
    }
}
```

  4. FlowDriver

```java
package com.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
```
  5. Package the project into a jar, upload it to the Hadoop cluster, and run it (a hypothetical input/output example follows this list):

```bash
# Create the input directory and upload the data file
hadoop fs -mkdir /input
hadoop fs -put /home/atguigu/phone_data.txt /input
# Run the job
yarn jar mapreduce1-1.0-SNAPSHOT.jar com.flow.FlowDriver /input /output3
```
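For context, FlowMapper above expects tab-separated lines whose second field is the phone number and whose 3rd- and 2nd-to-last fields are the up and down traffic, and the reducer writes the phone number followed by FlowBean's toString(). The concrete values below are hypothetical, just to show the shape of a run:

```
# one line of phone_data.txt (hypothetical)
1	13736230513	192.196.100.1	www.example.com	2481	24681	200

# corresponding line in /output3/part-r-xxxxx
13736230513	FlowBean{upFlow=2481, downFlow=24681, sumFlow=27162}
```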