笔记内容选自慕课网《大数据开发工程师》体系课
5.1 Yarn概述
5.1.1 Yarn的由来
从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期作为一个公共的资源管理平台, 任何满足规则的计算引擎都可以在它上面执行
- 实现Hadoop集群的资源共享
- YARN不仅仅支持MapReduce,还支持Spark、Flink等计算引擎
5.1.2 YARN架构分析
- YARN主要负责集群资源的管理和调度,支持主从架构,主节点最多可以有2个,从节点可以有多个
- ResourceManager:主节点主要负责集群资源的分配和管理
- NodeManager:从节点主要负责当前机器资源管理
5.1.3 YARN资源管理模型
- YARN主要管理内存和CPU这两种资源类型
- NodeManager启动时会向ResourceManager注册,注册信息中包含该节点可分配的CPU和内存总量提交注册上去,提交告诉给ResourceManager
当所有的NodeManager注册完成后,ResourceManager它就斯道冒前集群韵资源总量
5.1.4 Yarn的资源信息
虚标的内存和CPU
- yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024,即8G
- yarn.nodemanager.resource.cpu-vcores:单节点可分配的虚拟CPU个数,默认是8
- 可以通过修改 yarn-site.xml 里面的配置
5.2 Yarn调度器
如果你提交了一个很占资源的任务,这一个任务就把集群中90%的资源都占用了,后面别人再提交任务, 剩下的资源就不够用了,这个时候怎么办?
YARN中支持三种调度器,来解决资源调度的问题
5.2.1 FIFO Scheduler
- 先进先出(first in, first out)调度策略
- 大家都是排队的,如果你的任务申请不到足够的资源,那就只能等着
5.2.2 Capacity Scheduler
- FIFO Scheduler的多队列版本「Hadoop2.x开始默认调度器」
- 可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务, queueB中运行优先级比较高的任务
- 这两个队列的资源是相互对立的,但是注意一点,队列内部还是按照先进先出的规则
5.2.3 FairScheduler
- 多队列,多用户共享资源
- 假设向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源
- 当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用
5.2.4 查看队列
访问:http://bigdata01:8088/cluster/scheduler
- Capacity,这个是集群的调度器类型
- root,是根的意思,它下面目前只有一个队列
- default,我们之前提交的任务都会进入到这个队列中
5.2.5 YARN多资源队列配置和使用
- 增加online队列和offline队列
- online队列:负责实时任务
- offline队列:负责离线任务
- 向offline队列提交任务
1、停止集群,修改文件
# 停止集群
cd /data/soft/hadoop-3.2.0/
sbin/stop-all.sh
# 修改文件
cd etc/hadoop/
vi capacity-scheduler.xml
2、修改内容
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,online,offline</value>
<description>队列列表,多个队列之间使用逗号分割</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>default队列70%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.capacity</name>
<value>10</value>
<description>online队列10%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.capacity</name>
<value>20</value>
<description>offline队列20%</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>70</value>
<description>Default队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.online.maximum-capacity</name>
<value>10</value>
<description>online队列可使用的资源上限.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.offline.maximum-capacity</name>
<value>20</value>
<description>offline队列可使用的资源上限.</description>
</property>
3、把配置传给其他两台子节点
# 传输配置
scp -rq capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
scp -rq capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/
# 启动集群
cd /data/soft/hadoop-3.2.0/
sbin/start-all.sh
4、Java代码指定队列,打包上传
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 指定队列名称
*/
public class WordCountJobQueue {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words) {
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
}
/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
//对v2s中的数据进行累加求和
for(LongWritable v2: v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
//logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
}
//组装k3,v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
// 把结果写出去
context.write(k3,v3);
}
}
/**
* 组装Job=Map+Reduce,设置队列参数
*/
public static void main(String[] args) {
try{
//指定Job需要的配置参数
Configuration conf = new Configuration();
//解析命令行中通过-D传递过来的队列参数,添加到conf中
String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(WordCountJobQueue.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
5、执行任务
# 使用-D指定队列,则可以进入该队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue -D mapreduce.job.queuename=offline /test/hello.txt /outqueue
# 不使用-D指定队列参数,则进入默认队列执行任务
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.chaunceyi.mr.WordCountJobQueue /test/hello.txt /outdefault