1. Yarn
2. windows向yarn提交源码任务
- 在Configuration配置文件添加yarn的配置属性
- 用Maven 构建jar
- 修改job加载驱动类为 打包后的jar包
驱动类编码
package com.mywordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WcDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");
configuration.set("mapreduce.framework.name", "yarn");
configuration.set("mapreduce.app-submission.cross-platform", "true");
configuration.set("yarn.resourcemanager.hostname", "hadoop103");
Job job = Job.getInstance(configuration);
// 2 设置jar加载路径
// job.setJarByClass(WcDriver.class);
job.setJar("D:\\code\\mapreduce1\\target\\mapreduce1-1.0-SNAPSHOT.jar");
// 3 设置map和reduce类
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
3. 数据压缩
采用压缩技术减少了磁盘IO 但同时增加了CPU运算负担 所以压缩特性运用得当能提高性能 但运用不当也可能降低性能
压缩格式 | hadoop自带? | 算法 | 文件扩展名 | 是否可切分 | 换成压缩格式后,原来的程序是否需要修改 |
---|---|---|---|---|---|
DEFLATE | 是,直接使用 | DEFLATE | .deflate | 否 | 和文本处理一样,不需要修改 |
Gzip | 是,直接使用 | DEFLATE | .gz | 否 | 和文本处理一样,不需要修改 |
bzip2 | 是,直接使用 | bzip2 | .bz2 | 是 | 和文本处理一样,不需要修改 |
LZO | 否,需要安装 | LZO | .lzo | 是 | 需要建索引,还需要指定输入格式 |
Snappy | 否,需要安装 | Snappy | .snappy | 否 | 和文本处理一样,不需要修改 |
常用Snappy压缩 因为较高 其次是LZO
不同阶段开启压缩:
如果输入阶段时为压缩包 则直接传递即可无需更改 Hadoop自动解压缩并处理
shuffle阶段 在驱动类设置开启压缩 并指定压缩格式
//开启压缩模式
configuration.setBoolean("mapreduce.map.output.compress", true);
//压缩格式为
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,
CompressionCodec.class);
- reduce阶段 输出压缩
//reduce阶段压缩
configuration.setBoolean("mapreduce.output.fileoutputformat.compress", true);
//指定压缩格式
configuration.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class,
CompressionCodec.class);
3.1. Hadoop压缩和解压
package com.compression;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class TestCompression {
public static void main(String[] args) throws IOException {
compress("d:/phone_data.txt", BZip2Codec.class);
decompress("d:/phone_data.txt.bz2");
}
//解压
private static void decompress(String file) throws IOException {
Configuration configuration = new Configuration();
//生成压缩格式工厂对象
CompressionCodecFactory codecFactory = new CompressionCodecFactory(configuration);
//根据压缩格式工厂获取压缩对象
CompressionCodec codec = codecFactory.getCodec(new Path(file));
//输入流
FileSystem fileSystem = FileSystem.get(configuration);
FSDataInputStream fsDataInputStream = fileSystem.open(new Path(file));
CompressionInputStream cis = codec.createInputStream(fsDataInputStream);
//输出流
String outputFile = file.substring(0, file.length() - codec.getDefaultExtension().length()); //获取文件名
FSDataOutputStream fos = fileSystem.create(new Path(outputFile));
IOUtils.copyBytes(cis, fos, 1024);//复制流 缓存为1024字节
//关闭流
IOUtils.closeStream(cis);
IOUtils.closeStream(fos);
}
//压缩
private static void compress(String file, Class<? extends CompressionCodec> codecClass) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(configuration);
//生成压缩格式对象
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, configuration);
//开输入流
FSDataInputStream fis = fileSystem.open(new Path(file));
//输出流
FSDataOutputStream fos = fileSystem.create(new Path(file + codec.getDefaultExtension()));
//用压缩格式包装输出流
CompressionOutputStream cos = codec.createOutputStream(fos);
IOUtils.copyBytes(fis, cos, 1024);
IOUtils.closeStream(fis);
IOUtils.closeStream(cos);
}
}
4. Yarn架构
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。
- MR程序提交到客户端所在的节点。
- YarnRunner向ResourceManager申请一个Application。
- RM将该应用程序的资源路径返回给YarnRunner。
- 该程序将运行所需资源提交到HDFS上。
- 程序资源提交完毕后,申请运行mrAppMaster。
- RM将用户的请求初始化成一个Task。
- 其中一个NodeManager领取到Task任务。
- 该NodeManager创建容器Container,并产生MRAppmaster。
- Container从HDFS上拷贝资源到本地。
- MRAppmaster向RM 申请运行MapTask资源。
- RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
- MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
- MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
- ReduceTask向MapTask获取相应分区的数据。
- 程序运行完毕后,MR会向RM申请注销自己。
5. 资源调度器
目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop3.1.3默认的资源调度器是Capacity Scheduler。
通过yarn-default.xml配置
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
先进先出调度器(FIFO)
容量调度器(Capacity Scheduler)
公平调度器(Fair Scheduler)
6. 容器调度器多队列配置
容量调度器默认为1个队列 default 通过修改capacity-scheduler.xml文件来配置多队列
vim /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml #建议用图像界面
- 修改yarn.scheduler.capacity.root.queues的value 添加新的队列
<!-- 默认为default队列 可以设置多条队列-->
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,hive</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>
- 修改default队列占比为40
<!-- default队列默认占比为100 改为百分之40 剩下交给hive -->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>40</value>
<description>Default queue target capacity.</description>
</property>
- 修改default队列允许的最大占比为60
<!--default队列最大占比默认为100 改为60 -->
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>60</value>
<description>
The maximum capacity of the default queue.
</description>
</property>
把default队列的配置属性复制一份 修改为新增队列名hive 并删除其中的description标签
<!--hive队列设置-->
<property>
<name>yarn.scheduler.capacity.root.hive.capacity</name>
<value>60</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
<value>1</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
<value>80</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.state</name>
<value>RUNNING</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime
</name>
<value>-1</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.default-application-lifetime
</name>
<value>-1</value>
</property>
<!--hive队列设置结束-->
- 同步到其他集群中
xsync /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml
- 重启hadoop yarn
stop-yarn.sh #103中
start-yarn.sh
6.1. 多队列提交任务
通过configuration设置 mapred.job.queue.name为指定队列名
configuration.set("mapred.job.queue.name","hive");
7. 任务的推测执行
推测执行机制
APPmstr 会监控任务的运行速度如果某个任务运行速度远慢于平均任务 则为拖后腿的任务启动一个备份任务同时运行 谁先运行完 则采取谁的结果执行推测任务的前提
每个task只能有一个备份任务
当前job已经完成的task必须不小于 5%
开启了推测执行设置 默认为打开的 在mapred-site.xml设置
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>
不能启用推测执行机制情况
- 任务间存在严重的负载倾斜
- 特殊任务 如任务向数据库中写数据