笔记内容选自慕课网《大数据开发工程师》体系课
5.1 Yarn概述
5.1.1 Yarn的由来
从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期作为一个公共的资源管理平台, 任何满足规则的计算引擎都可以在它上面执行
- 实现Hadoop集群的资源共享
- YARN不仅仅支持MapReduce,还支持Spark、Flink等计算引擎
5.1.2 YARN架构分析
- YARN主要负责集群资源的管理和调度,支持主从架构,主节点最多可以有2个,从节点可以有多个
- ResourceManager:主节点主要负责集群资源的分配和管理
- NodeManager:从节点主要负责当前机器资源管理
5.1.3 YARN资源管理模型
- YARN主要管理内存和CPU这两种资源类型
- NodeManager启动时会向ResourceManager注册,注册信息中包含该节点可分配的CPU和内存总量提交注册上去,提交告诉给ResourceManager
当所有的NodeManager注册完成后,ResourceManager它就斯道冒前集群韵资源总量
5.1.4 Yarn的资源信息
虚标的内存和CPU
- yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024,即8G
- yarn.nodemanager.resource.cpu-vcores:单节点可分配的虚拟CPU个数,默认是8
- 可以通过修改 yarn-site.xml 里面的配置
5.2 Yarn调度器
如果你提交了一个很占资源的任务,这一个任务就把集群中90%的资源都占用了,后面别人再提交任务, 剩下的资源就不够用了,这个时候怎么办?
YARN中支持三种调度器,来解决资源调度的问题
5.2.1 FIFO Scheduler
- 先进先出(first in, first out)调度策略
- 大家都是排队的,如果你的任务申请不到足够的资源,那就只能等着
5.2.2 Capacity Scheduler
- FIFO Scheduler的多队列版本「Hadoop2.x开始默认调度器」
- 可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务, queueB中运行优先级比较高的任务
- 这两个队列的资源是相互对立的,但是注意一点,队列内部还是按照先进先出的规则
5.2.3 FairScheduler
- 多队列,多用户共享资源
- 假设向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源
- 当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用
5.2.4 查看队列
访问:http://bigdata01:8088/cluster/scheduler
- Capacity,这个是集群的调度器类型
- root,是根的意思,它下面目前只有一个队列
- default,我们之前提交的任务都会进入到这个队列中
5.2.5 YARN多资源队列配置和使用
- 增加online队列和offline队列
- online队列:负责实时任务
- offline队列:负责离线任务
- 向offline队列提交任务
1、停止集群,修改文件
# 停止集群cd /data/soft/hadoop-3.2.0/sbin/stop-all.sh# 修改文件cd etc/hadoop/vi capacity-scheduler.xml
2、修改内容
<property><name>yarn.scheduler.capacity.root.queues</name><value>default,online,offline</value><description>队列列表,多个队列之间使用逗号分割</description></property><property><name>yarn.scheduler.capacity.root.default.capacity</name><value>70</value><description>default队列70%</description></property><property><name>yarn.scheduler.capacity.root.online.capacity</name><value>10</value><description>online队列10%</description></property><property><name>yarn.scheduler.capacity.root.offline.capacity</name><value>20</value><description>offline队列20%</description></property><property><name>yarn.scheduler.capacity.root.default.maximum-capacity</name><value>70</value><description>Default队列可使用的资源上限.</description></property><property><name>yarn.scheduler.capacity.root.online.maximum-capacity</name><value>10</value><description>online队列可使用的资源上限.</description></property><property><name>yarn.scheduler.capacity.root.offline.maximum-capacity</name><value>20</value><description>offline队列可使用的资源上限.</description></property>
3、把配置传给其他两台子节点
# 传输配置scp -rq capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/scp -rq capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/# 启动集群cd /data/soft/hadoop-3.2.0/sbin/start-all.sh
4、Java代码指定队列,打包上传
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;/*** 指定队列名称*/public class WordCountJobQueue {/*** Map阶段*/public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{Logger logger = LoggerFactory.getLogger(MyMapper.class);/*** 需要实现map函数* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>* @param k1* @param v1* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable k1, Text v1, Context context)throws IOException, InterruptedException {//输出k1,v1的值//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容//对获取到的每一行数据进行切割,把单词切割出来String[] words = v1.toString().split(" ");//迭代切割出来的单词数据for (String word : words) {//把迭代出来的单词封装成<k2,v2>的形式Text k2 = new Text(word);LongWritable v2 = new LongWritable(1L);//把<k2,v2>写出去context.write(k2,v2);}}}/*** Reduce阶段*/public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{Logger logger = LoggerFactory.getLogger(MyReducer.class);/*** 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去* @param k2* @param v2s* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)throws IOException, InterruptedException {//创建一个sum变量,保存v2s的和long sum = 0L;//对v2s中的数据进行累加求和for(LongWritable v2: v2s){//输出k2,v2的值//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");sum += v2.get();}//组装k3,v3Text k3 = k2;LongWritable v3 = new LongWritable(sum);//输出k3,v3的值//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");// 把结果写出去context.write(k3,v3);}}/*** 组装Job=Map+Reduce,设置队列参数*/public static void main(String[] args) {try{//指定Job需要的配置参数Configuration conf = new Configuration();//解析命令行中通过-D传递过来的队列参数,添加到conf中String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();//创建一个JobJob job = Job.getInstance(conf);//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的job.setJarByClass(WordCountJobQueue.class);//指定输入路径(可以是文件,也可以是目录)FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));//指定输出路径(只能指定一个不存在的目录)FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));//指定map相关的代码job.setMapperClass(MyMapper.class);//指定k2的类型job.setMapOutputKeyClass(Text.class);//指定v2的类型job.setMapOutputValueClass(LongWritable.class);//指定reduce相关的代码job.setReducerClass(MyReducer.class);//指定k3的类型job.setOutputKeyClass(Text.class);//指定v3的类型job.setOutputValueClass(LongWritable.class);//提交jobjob.waitForCompletion(true);}catch(Exception e){e.printStackTrace();}}}
5、执行任务
# 使用-D指定队列,则可以进入该队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue -D mapreduce.job.queuename=offline /test/hello.txt /outqueue

# 不使用-D指定队列参数,则进入默认队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue /test/hello.txt /outdefault



