1. Yarn

2. windows向yarn提交源码任务

  1. 在Configuration配置文件添加yarn的配置属性
  2. 用Maven 构建jar
  3. 修改job加载驱动类为 打包后的jar包
  4. 12. Yarn - 图1

驱动类编码

  1. package com.mywordcount;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. public class WcDriver {
  11. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  12. // 1 获取配置信息以及封装任务
  13. Configuration configuration = new Configuration();
  14. configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");
  15. configuration.set("mapreduce.framework.name", "yarn");
  16. configuration.set("mapreduce.app-submission.cross-platform", "true");
  17. configuration.set("yarn.resourcemanager.hostname", "hadoop103");
  18. Job job = Job.getInstance(configuration);
  19. // 2 设置jar加载路径
  20. // job.setJarByClass(WcDriver.class);
  21. job.setJar("D:\\code\\mapreduce1\\target\\mapreduce1-1.0-SNAPSHOT.jar");
  22. // 3 设置map和reduce类
  23. job.setMapperClass(WcMapper.class);
  24. job.setReducerClass(WcReducer.class);
  25. // 4 设置map输出
  26. job.setMapOutputKeyClass(Text.class);
  27. job.setMapOutputValueClass(IntWritable.class);
  28. // 5 设置最终输出kv类型
  29. job.setOutputKeyClass(Text.class);
  30. job.setOutputValueClass(IntWritable.class);
  31. // 6 设置输入和输出路径
  32. FileInputFormat.setInputPaths(job, new Path(args[0]));
  33. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  34. // 7 提交
  35. boolean result = job.waitForCompletion(true);
  36. System.exit(result ? 0 : 1);
  37. }
  38. }

3. 数据压缩

采用压缩技术减少了磁盘IO 但同时增加了CPU运算负担 所以压缩特性运用得当能提高性能 但运用不当也可能降低性能

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

常用Snappy压缩 因为较高 其次是LZO

不同阶段开启压缩:

  1. 如果输入阶段时为压缩包 则直接传递即可无需更改 Hadoop自动解压缩并处理

  2. shuffle阶段 在驱动类设置开启压缩 并指定压缩格式

    1. //开启压缩模式
    2. configuration.setBoolean("mapreduce.map.output.compress", true);
    3. //压缩格式为
    4. configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,
    5. CompressionCodec.class);
  1. reduce阶段 输出压缩
    1. //reduce阶段压缩
    2. configuration.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    3. //指定压缩格式
    4. configuration.setClass("mapreduce.output.fileoutputformat.compress.codec", SnappyCodec.class,
    5. CompressionCodec.class);

3.1. Hadoop压缩和解压

  1. package com.compression;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FSDataInputStream;
  4. import org.apache.hadoop.fs.FSDataOutputStream;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IOUtils;
  8. import org.apache.hadoop.io.compress.*;
  9. import org.apache.hadoop.util.ReflectionUtils;
  10. import java.io.IOException;
  11. public class TestCompression {
  12. public static void main(String[] args) throws IOException {
  13. compress("d:/phone_data.txt", BZip2Codec.class);
  14. decompress("d:/phone_data.txt.bz2");
  15. }
  16. //解压
  17. private static void decompress(String file) throws IOException {
  18. Configuration configuration = new Configuration();
  19. //生成压缩格式工厂对象
  20. CompressionCodecFactory codecFactory = new CompressionCodecFactory(configuration);
  21. //根据压缩格式工厂获取压缩对象
  22. CompressionCodec codec = codecFactory.getCodec(new Path(file));
  23. //输入流
  24. FileSystem fileSystem = FileSystem.get(configuration);
  25. FSDataInputStream fsDataInputStream = fileSystem.open(new Path(file));
  26. CompressionInputStream cis = codec.createInputStream(fsDataInputStream);
  27. //输出流
  28. String outputFile = file.substring(0, file.length() - codec.getDefaultExtension().length()); //获取文件名
  29. FSDataOutputStream fos = fileSystem.create(new Path(outputFile));
  30. IOUtils.copyBytes(cis, fos, 1024);//复制流 缓存为1024字节
  31. //关闭流
  32. IOUtils.closeStream(cis);
  33. IOUtils.closeStream(fos);
  34. }
  35. //压缩
  36. private static void compress(String file, Class<? extends CompressionCodec> codecClass) throws IOException {
  37. Configuration configuration = new Configuration();
  38. FileSystem fileSystem = FileSystem.get(configuration);
  39. //生成压缩格式对象
  40. CompressionCodec codec = ReflectionUtils.newInstance(codecClass, configuration);
  41. //开输入流
  42. FSDataInputStream fis = fileSystem.open(new Path(file));
  43. //输出流
  44. FSDataOutputStream fos = fileSystem.create(new Path(file + codec.getDefaultExtension()));
  45. //用压缩格式包装输出流
  46. CompressionOutputStream cos = codec.createOutputStream(fos);
  47. IOUtils.copyBytes(fis, cos, 1024);
  48. IOUtils.closeStream(fis);
  49. IOUtils.closeStream(cos);
  50. }
  51. }

4. Yarn架构

YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。

12. Yarn - 图2

12. Yarn - 图3

  1. MR程序提交到客户端所在的节点。
  2. YarnRunner向ResourceManager申请一个Application。
  3. RM将该应用程序的资源路径返回给YarnRunner。
  4. 该程序将运行所需资源提交到HDFS上。
  5. 程序资源提交完毕后,申请运行mrAppMaster。
  6. RM将用户的请求初始化成一个Task。
  7. 其中一个NodeManager领取到Task任务。
  8. 该NodeManager创建容器Container,并产生MRAppmaster。
  9. Container从HDFS上拷贝资源到本地。
  10. MRAppmaster向RM 申请运行MapTask资源。
  11. RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
  12. MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
  13. MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
  14. ReduceTask向MapTask获取相应分区的数据。
  15. 程序运行完毕后,MR会向RM申请注销自己。

5. 资源调度器

目前,Hadoop作业调度器主要有三种:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop3.1.3默认的资源调度器是Capacity Scheduler。

通过yarn-default.xml配置

  1. <property>
  2. <description>The class to use as the resource scheduler.</description>
  3. <name>yarn.resourcemanager.scheduler.class</name>
  4. <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
  5. </property>
  1. 先进先出调度器(FIFO)
    12. Yarn - 图4

  2. 容量调度器(Capacity Scheduler)
    12. Yarn - 图5

  3. 公平调度器(Fair Scheduler)
    12. Yarn - 图6

12. Yarn - 图7

6. 容器调度器多队列配置

容量调度器默认为1个队列 default 通过修改capacity-scheduler.xml文件来配置多队列

  1. vim /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml #建议用图像界面
  1. 修改yarn.scheduler.capacity.root.queues的value 添加新的队列
    1. <!-- 默认为default队列 可以设置多条队列-->
    2. <property>
    3. <name>yarn.scheduler.capacity.root.queues</name>
    4. <value>default,hive</value>
    5. <description>
    6. The queues at the this level (root is the root queue).
    7. </description>
    8. </property>
  1. 修改default队列占比为40
    1. <!-- default队列默认占比为100 改为百分之40 剩下交给hive -->
    2. <property>
    3. <name>yarn.scheduler.capacity.root.default.capacity</name>
    4. <value>40</value>
    5. <description>Default queue target capacity.</description>
    6. </property>
  1. 修改default队列允许的最大占比为60
    1. <!--default队列最大占比默认为100 改为60 -->
    2. <property>
    3. <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    4. <value>60</value>
    5. <description>
    6. The maximum capacity of the default queue.
    7. </description>
    8. </property>
  1. 把default队列的配置属性复制一份 修改为新增队列名hive 并删除其中的description标签

    1. <!--hive队列设置-->
    2. <property>
    3. <name>yarn.scheduler.capacity.root.hive.capacity</name>
    4. <value>60</value>
    5. </property>
    6. <property>
    7. <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    8. <value>1</value>
    9. </property>
    10. <property>
    11. <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    12. <value>80</value>
    13. </property>
    14. <property>
    15. <name>yarn.scheduler.capacity.root.hive.state</name>
    16. <value>RUNNING</value>
    17. </property>
    18. <property>
    19. <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    20. <value>*</value>
    21. </property>
    22. <property>
    23. <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    24. <value>*</value>
    25. </property>
    26. <property>
    27. <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    28. <value>*</value>
    29. </property>
    30. <property>
    31. <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime
    32. </name>
    33. <value>-1</value>
    34. </property>
    35. <property>
    36. <name>yarn.scheduler.capacity.root.hive.default-application-lifetime
    37. </name>
    38. <value>-1</value>
    39. </property>
    40. <!--hive队列设置结束-->
  1. 同步到其他集群中
    1. xsync /opt/module/hadoop-3.1.3/etc/hadoop/capacity-scheduler.xml
  1. 重启hadoop yarn
    1. stop-yarn.sh #103中
    2. start-yarn.sh

12. Yarn - 图8

6.1. 多队列提交任务

通过configuration设置 mapred.job.queue.name为指定队列名

  1. configuration.set("mapred.job.queue.name","hive");

7. 任务的推测执行

  1. 推测执行机制
    APPmstr 会监控任务的运行速度如果某个任务运行速度远慢于平均任务 则为拖后腿的任务启动一个备份任务同时运行 谁先运行完 则采取谁的结果

  2. 执行推测任务的前提

    1. 每个task只能有一个备份任务

    2. 当前job已经完成的task必须不小于 5%

    3. 开启了推测执行设置 默认为打开的 在mapred-site.xml设置

      1. <property>
      2. <name>mapreduce.map.speculative</name>
      3. <value>true</value>
      4. </property>
      5. <property>
      6. <name>mapreduce.reduce.speculative</name>
      7. <value>true</value>
      8. </property>
  1. 不能启用推测执行机制情况

    1. 任务间存在严重的负载倾斜
    2. 特殊任务 如任务向数据库中写数据

12. Yarn - 图9