笔记内容选自慕课网《大数据开发工程师》体系课

5.1 Yarn概述

5.1.1 Yarn的由来

从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期作为一个公共的资源管理平台, 任何满足规则的计算引擎都可以在它上面执行

  • 实现Hadoop集群的资源共享
  • YARN不仅仅支持MapReduce,还支持SparkFlink等计算引擎

5.1.2 YARN架构分析

  • YARN主要负责集群资源的管理和调度,支持主从架构,主节点最多可以有2个,从节点可以有多个
  • ResourceManager:主节点主要负责集群资源的分配和管理
  • NodeManager:从节点主要负责当前机器资源管理

5.1.3 YARN资源管理模型

  • YARN主要管理内存CPU这两种资源类型
  • NodeManager启动时会向ResourceManager注册,注册信息中包含该节点可分配的CPU和内存总量提交注册上去,提交告诉给ResourceManager
  • 当所有的NodeManager注册完成后,ResourceManager它就斯道冒前集群韵资源总量

    5.1.4 Yarn的资源信息

    访问:http://bigdata01:8088/cluster
    image.png
    image.png
    **

  • 虚标的内存和CPU

    • yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024,即8G
    • yarn.nodemanager.resource.cpu-vcores:单节点可分配的虚拟CPU个数,默认是8
  • 可以通过修改 yarn-site.xml 里面的配置

5.2 Yarn调度器

如果你提交了一个很占资源的任务,这一个任务就把集群中90%的资源都占用了,后面别人再提交任务, 剩下的资源就不够用了,这个时候怎么办?
YARN中支持三种调度器,来解决资源调度的问题
image.png

5.2.1 FIFO Scheduler

  • 先进先出(first in, first out)调度策略
    • 大家都是排队的,如果你的任务申请不到足够的资源,那就只能等着

5.2.2 Capacity Scheduler

  • FIFO Scheduler的多队列版本「Hadoop2.x开始默认调度器」
    • 可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务, queueB中运行优先级比较高的任务
    • 这两个队列的资源是相互对立的,但是注意一点,队列内部还是按照先进先出的规则

5.2.3 FairScheduler

  • 多队列,多用户共享资源
    • 假设向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源
    • 当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用

5.2.4 查看队列

访问:http://bigdata01:8088/cluster/scheduler
image.png

  • Capacity,这个是集群的调度器类型
  • root,是根的意思,它下面目前只有一个队列
  • default,我们之前提交的任务都会进入到这个队列中

5.2.5 YARN多资源队列配置和使用

  • 增加online队列和offline队列
    • online队列:负责实时任务
    • offline队列:负责离线任务
  • 向offline队列提交任务

1、停止集群,修改文件

  1. # 停止集群
  2. cd /data/soft/hadoop-3.2.0/
  3. sbin/stop-all.sh
  4. # 修改文件
  5. cd etc/hadoop/
  6. vi capacity-scheduler.xml

2、修改内容

  1. <property>
  2. <name>yarn.scheduler.capacity.root.queues</name>
  3. <value>default,online,offline</value>
  4. <description>队列列表,多个队列之间使用逗号分割</description>
  5. </property>
  6. <property>
  7. <name>yarn.scheduler.capacity.root.default.capacity</name>
  8. <value>70</value>
  9. <description>default队列70%</description>
  10. </property>
  11. <property>
  12. <name>yarn.scheduler.capacity.root.online.capacity</name>
  13. <value>10</value>
  14. <description>online队列10%</description>
  15. </property>
  16. <property>
  17. <name>yarn.scheduler.capacity.root.offline.capacity</name>
  18. <value>20</value>
  19. <description>offline队列20%</description>
  20. </property>
  21. <property>
  22. <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
  23. <value>70</value>
  24. <description>Default队列可使用的资源上限.</description>
  25. </property>
  26. <property>
  27. <name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
  28. <value>10</value>
  29. <description>online队列可使用的资源上限.</description>
  30. </property>
  31. <property>
  32. <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
  33. <value>20</value>
  34. <description>offline队列可使用的资源上限.</description>
  35. </property>

3、把配置传给其他两台子节点

  1. # 传输配置
  2. scp -rq capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
  3. scp -rq capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/
  4. # 启动集群
  5. cd /data/soft/hadoop-3.2.0/
  6. sbin/start-all.sh

4、Java代码指定队列,打包上传

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.util.GenericOptionsParser;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import java.io.IOException;
  14. /**
  15. * 指定队列名称
  16. */
  17. public class WordCountJobQueue {
  18. /**
  19. * Map阶段
  20. */
  21. public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
  22. Logger logger = LoggerFactory.getLogger(MyMapper.class);
  23. /**
  24. * 需要实现map函数
  25. * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
  26. * @param k1
  27. * @param v1
  28. * @param context
  29. * @throws IOException
  30. * @throws InterruptedException
  31. */
  32. @Override
  33. protected void map(LongWritable k1, Text v1, Context context)
  34. throws IOException, InterruptedException {
  35. //输出k1,v1的值
  36. //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
  37. //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
  38. //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
  39. //对获取到的每一行数据进行切割,把单词切割出来
  40. String[] words = v1.toString().split(" ");
  41. //迭代切割出来的单词数据
  42. for (String word : words) {
  43. //把迭代出来的单词封装成<k2,v2>的形式
  44. Text k2 = new Text(word);
  45. LongWritable v2 = new LongWritable(1L);
  46. //把<k2,v2>写出去
  47. context.write(k2,v2);
  48. }
  49. }
  50. }
  51. /**
  52. * Reduce阶段
  53. */
  54. public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
  55. Logger logger = LoggerFactory.getLogger(MyReducer.class);
  56. /**
  57. * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
  58. * @param k2
  59. * @param v2s
  60. * @param context
  61. * @throws IOException
  62. * @throws InterruptedException
  63. */
  64. @Override
  65. protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
  66. throws IOException, InterruptedException {
  67. //创建一个sum变量,保存v2s的和
  68. long sum = 0L;
  69. //对v2s中的数据进行累加求和
  70. for(LongWritable v2: v2s){
  71. //输出k2,v2的值
  72. //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
  73. //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
  74. sum += v2.get();
  75. }
  76. //组装k3,v3
  77. Text k3 = k2;
  78. LongWritable v3 = new LongWritable(sum);
  79. //输出k3,v3的值
  80. //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
  81. //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
  82. // 把结果写出去
  83. context.write(k3,v3);
  84. }
  85. }
  86. /**
  87. * 组装Job=Map+Reduce,设置队列参数
  88. */
  89. public static void main(String[] args) {
  90. try{
  91. //指定Job需要的配置参数
  92. Configuration conf = new Configuration();
  93. //解析命令行中通过-D传递过来的队列参数,添加到conf中
  94. String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  95. //创建一个Job
  96. Job job = Job.getInstance(conf);
  97. //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
  98. job.setJarByClass(WordCountJobQueue.class);
  99. //指定输入路径(可以是文件,也可以是目录)
  100. FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
  101. //指定输出路径(只能指定一个不存在的目录)
  102. FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
  103. //指定map相关的代码
  104. job.setMapperClass(MyMapper.class);
  105. //指定k2的类型
  106. job.setMapOutputKeyClass(Text.class);
  107. //指定v2的类型
  108. job.setMapOutputValueClass(LongWritable.class);
  109. //指定reduce相关的代码
  110. job.setReducerClass(MyReducer.class);
  111. //指定k3的类型
  112. job.setOutputKeyClass(Text.class);
  113. //指定v3的类型
  114. job.setOutputValueClass(LongWritable.class);
  115. //提交job
  116. job.waitForCompletion(true);
  117. }catch(Exception e){
  118. e.printStackTrace();
  119. }
  120. }
  121. }

5、执行任务

# 使用-D指定队列,则可以进入该队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue -D mapreduce.job.queuename=offline /test/hello.txt /outqueue

image.png

# 不使用-D指定队列参数,则进入默认队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue /test/hello.txt /outdefault

image.png