一: HDFS and YARN

HDFS and YARN are two separate things: HDFS is the distributed file storage layer, while YARN is the cluster resource management and job scheduling layer.

二: Setup

1: Planning

            NN   NN   JN   ZKFC   ZK   DN   RM   NM
    node01  *         *    *
    node02       *    *    *      *    *         *
    node03            *            *   *    *    *
    node04                         *   *    *    *

2: Configuration

1): mapred-site.xml -> run MapReduce on YARN

  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>

2):yarn-site.xml

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>node02:2181,node03:2181,node04:2181</value>
  </property>
  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>mashibing</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>node03</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>node04</value>
  </property>
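
After the ResourceManagers are started in step 3 below, their HA state can be checked against the rm-ids configured here; a minimal verification sketch:

  yarn rmadmin -getServiceState rm1    # expected: active or standby
  yarn rmadmin -getServiceState rm2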

3: Operations as root

node01

  cd $HADOOP_HOME/etc/hadoop
  cp mapred-site.xml.template mapred-site.xml
  vi mapred-site.xml
  vi yarn-site.xml
  scp mapred-site.xml yarn-site.xml node02:`pwd`
  scp mapred-site.xml yarn-site.xml node03:`pwd`
  scp mapred-site.xml yarn-site.xml node04:`pwd`
  vi slaves      // no changes needed here; it was already edited when HDFS was set up
  start-yarn.sh

node03~04

  yarn-daemon.sh start resourcemanager
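
To confirm the daemons came up, jps can be run on each node; given the planning table above, roughly this is expected (a sketch; the HDFS daemons that are already running will also show up):

  jps
  # node03, node04   ->  ResourceManager
  # node02 ~ node04  ->  NodeManager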

Access the web UIs

http://node03:8088
http://node04:8088
This is standby RM. Redirecting to the current active RM: http://node03:8088/
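
The active/standby roles can also be checked without a browser via the ResourceManager REST API (a sketch; the haState field in the returned JSON shows each RM's role):

  curl http://node03:8088/ws/v1/cluster/info
  curl http://node04:8088/ws/v1/cluster/info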


4: Running the official wordcount (wc) example

Hands-on: running MR on YARN (a data.txt file is assumed in the current directory; see the sketch after this block for one way to generate it):

hdfs dfs -mkdir -p /data/wc/input
hdfs dfs -D dfs.blocksize=1048576 -put data.txt /data/wc/input
cd $HADOOP_HOME
cd share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /data/wc/input /data/wc/output

1) Web UI: watch the job's progress in the ResourceManager UI
2) CLI: check the result from the command line

hdfs dfs -ls /data/wc/output
-rw-r--r-- 2 root supergroup 0 2019-06-22 11:37 /data/wc/output/_SUCCESS             // marker file: the job succeeded
-rw-r--r-- 2 root supergroup 788922 2019-06-22 11:37 /data/wc/output/part-r-00000    // the data file

Files written by reducers are named part-r-XXXXX; files written directly by mappers (map-only jobs) are named part-m-XXXXX (r = reduce, m = map).

hdfs dfs -cat /data/wc/output/part-r-00000
hdfs dfs -get /data/wc/output/part-r-00000 ./
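
The notes do not include data.txt itself; a hypothetical way to generate a test file whose lines look like the "hello hadoop N" samples referenced in the Mapper comments below:

  for i in `seq 1 100000`; do echo "hello hadoop $i" >> data.txt; done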

5: Developing a simple MR program

1:yarn-site.xml (typically copied onto the client classpath, e.g. src/main/resources, so the client can reach the cluster)

2:mapred-site.xml (same as above)

3:pom.xml

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
  </dependency>

4:MyWordCount

  package com.hadoop.fastdfs.mapreduce.wc;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  import org.apache.hadoop.util.GenericOptionsParser;

  public class MyWordCount {
      // bin/hadoop command [genericOptions] [commandOptions]
      // e.g. hadoop jar ooxx.jar ooxx -D ooxx=ooxx inpath outpath
      // args holds two kinds of parameters: genericOptions and commandOptions.
      // If you need more flexibility, parse the args array yourself.
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration(true);

          // GenericOptionsParser sets the -D style generic options directly onto conf
          // and leaves only the commandOptions behind:
          // GenericOptionsParser parser = new GenericOptionsParser(conf, args);
          // String[] othargs = parser.getRemainingArgs();

          // Tell the framework the client runs on Windows (a heterogeneous platform):
          // conf.set("mapreduce.app-submission.cross-platform","true");
          // conf.set("mapreduce.framework.name","local");
          // System.out.println(conf.get("mapreduce.framework.name"));

          Job job = Job.getInstance(conf);
          // FileInputFormat.setMinInputSplitSize(job,2222);
          // job.setInputFormatClass(ooxx.class);
          // job.setJar("C:\\Users\\admin\\IdeaProjects\\msbhadoop\\target\\hadoop-hdfs-1.0-0.1.jar");

          // This must be set:
          job.setJarByClass(MyWordCount.class);
          job.setJobName("mashibing");

          // Path infile = new Path(othargs[0]);
          Path infile = new Path("/data/wc/input");
          TextInputFormat.addInputPath(job, infile);

          // Path outfile = new Path(othargs[1]);
          Path outfile = new Path("/data/wc/output1");
          // The output directory must not exist; delete it if it does.
          if (outfile.getFileSystem(conf).exists(outfile)) outfile.getFileSystem(conf).delete(outfile, true);
          TextOutputFormat.setOutputPath(job, outfile);

          job.setMapperClass(MyMapper.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
          job.setReducerClass(MyReducer.class);
          // job.setNumReduceTasks(2);

          // Submit the job, then poll for progress until the job is complete.
          job.waitForCompletion(true);
      }
  }

5:MyMapper

  package com.hadoop.fastdfs.mapreduce.wc;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  import java.io.IOException;
  import java.util.StringTokenizer;

  public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
      // Hadoop is a distributed framework, so data must be serialized and deserialized.
      // Hadoop ships its own serialization types (Writable); a custom type must
      // implement the serialization/deserialization interface and the comparator
      // interface (sorting needs comparison, and e.g. 8 vs 11 orders differently
      // in dictionary order than in numeric order).
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      // Sample input lines:
      //   hello hadoop 1
      //   hello hadoop 2
      // With TextInputFormat, the key is the byte offset of the first byte of each
      // line within the source file, and the value is the line itself.
      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
              word.set(itr.nextToken());
              context.write(word, one);
          }
      }
  }

6:MyReducer

  package com.hadoop.fastdfs.mapreduce.wc;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  import java.io.IOException;

  public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private IntWritable result = new IntWritable();

      // Records with the same key form one group; reduce() is called once per group.
      // Sample group for the key "hello":
      //   hello 1
      //   hello 1
      //   hello 1
      //   hello 1
      public void reduce(Text key, Iterable<IntWritable> values,
                         Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
              sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
      }
  }

7: maven -> package
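
A minimal packaging command for this step (assuming a standard Maven project layout):

  mvn clean package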

8: Upload the jar to node01

9: Run

hadoop jar hadoop-hdfs-1.0-0.1.jar com.hadoop.fastdfs.mapreduce.wc.MyWordCount    // the jar built in step 7 (named after the target path in the setJar example), with the fully qualified driver class

10: Check the output

hdfs dfs -ls /data/wc/output1
hdfs dfs -cat /data/wc/output1/part-r-00000

6: The different ways to submit an MR job

1: Upload the jar to a server

Develop -> build a jar -> upload it to one of the cluster nodes -> hadoop jar ooxx.jar ooxx in out

2: Embedded submission from a client [Linux or Windows] (not via hadoop jar), running on YARN in the cluster

The map and reduce tasks run in the cluster:
client -> RM -> AppMaster
// run on the cluster
mapreduce.framework.name -> yarn
// heterogeneous platform (client submitting from Windows)
conf.set("mapreduce.app-submission.cross-platform","true");
// after packaging the jar, point the job at the local jar path
job.setJar("C:\\Users\\Administrator\\IdeaProjects\\msbhadoop\\target\\hadoop-hdfs-1.0-0.1.jar");

3: Local (single-machine) self-test

  mapreduce.framework.name -> local
  conf.set("mapreduce.app-submission.cross-platform","true");   // required on Windows
  1. Deploy Hadoop on the Windows machine:
     C:\usr\hadoop-2.6.5\hadoop-2.6.5
  2. Overwrite the bin directory of that deployment with the files from \hadoop-install\soft\bin in the course materials,
     and also copy hadoop.dll to C:\Windows\System32\
  3. Set the environment variable: HADOOP_HOME = C:\usr\hadoop-2.6.5\hadoop-2.6.5

  IDE -> integrated development:
  Hadoop's best-supported platform is Linux; on Windows you must have the deployed Hadoop home and the patched bin directory described above.

4: Customizing parameters via generic options

  GenericOptionsParser parser = new GenericOptionsParser(conf, args);  // this utility sets the -D style generic options onto conf and keeps only the commandOptions
  String[] othargs = parser.getRemainingArgs();
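
For example, with the parser enabled and the input/output paths taken from othargs[0] and othargs[1] (the commented-out lines in MyWordCount above), the job could be submitted like this; a sketch only, where the jar name follows the target path used earlier and mapreduce.job.reduces is just one example of a -D generic option:

  hadoop jar hadoop-hdfs-1.0-0.1.jar com.hadoop.fastdfs.mapreduce.wc.MyWordCount \
      -D mapreduce.job.reduces=2 \
      /data/wc/input /data/wc/output1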