Hadoop 2.7.2 - 图1

1. HDFS

Hadoop 2.7.2 - 图2

1.1. NameNode 与 SecondaryNameNode

  • NameNode中存放edits_inprogress 和 fsimage 的位置:

{$HADOOP_HOME}/data/tmp/dfs/name/current

NameNode的安全模式:
该阶段不允许对HDFS进行访问外的操作,NameNode是只读的;
检查DN上面的块是否损坏
该阶段可以将fsimage和edits载入内存,进行合并。

  • 为什么需要SecondaryNameNode?
  1. 辅助合并edits和fsimage

    1. edits文件记录每次操作,fsimage相当于内存状态的镜像。
    2. 当NN挂掉了,需要将fsimage和edits合并,恢复挂之前的内存状态;
    3. 如果edits太大了,则恢复需要很长时间;
    4. 如果在NN中对fsimage和edits进行定期合并,会占用大量内存和CPU;
    5. 所以借助SecondaryNameNode,在运行过程中辅助NameNode合并edits和fsimage,控制edits大小。
  2. 辅助恢复NameNode

    1. 由于fsimage都是由2NN合并edits产生的,所以2NN中的fsimage一定会比NN中的fsimage新(或相同);
    2. 当NN挂了以后,将2NN中的fsimage拷贝到NN中;
    3. 重启NN,将拷贝的fsimage与edits进行合并,恢复之前的内存状态。
  • CheckPoint设置:查看hadoop-hdfs-2.7.2.jar中的 hdfs-default.xml,有如下几个设置(标红可改):
  1. SecondaryNameNode每隔一个小时执行一次checkpoint。
  2. SecondaryNameNode每分钟检查一次事务操作次数,如果操作次数达到100万次,则执行checkpoint。 ```xml 执行checkpoint的时间间隔为3600秒,即1一个小时执行一次 dfs.namenode.checkpoint.period 3600 The number of seconds between two periodic checkpoints.

每分钟检查一次操作次数,如果操作次数达到100万次,则执行checkpoint

dfs.namenode.checkpoint.txns 1000000 The Secondary NameNode or CheckpointNode will create a checkpoint of the namespace every ‘dfs.namenode.checkpoint.txns’ transactions, regardless of whether ‘dfs.namenode.checkpoint.period’ has expired.

每隔多少秒检查一次操作次数,这里默认是一分钟。

dfs.namenode.checkpoint.check.period 60 The SecondaryNameNode and CheckpointNode will poll the NameNode every ‘dfs.namenode.checkpoint.check.period’ seconds to query the number of uncheckpointed transactions.

  1. <a name="pfxnE"></a>
  2. ## 1.2. HDFS常用命令
  3. > hadoop fs -mkdir /tez
  4. > hadoop fs -put /opt/module/tez-0.9.1 /tez
  5. > hadoop fs -ls /tez/tez-0.9.1
  6. <a name="6OLN3"></a>
  7. ## 1.3. 将文件从本地上传到HDFS的方法
  8. 1. 命令:hadoop fs -put /localfile /hdfsfile
  9. 1. 使用Flume上传
  10. 1. 使用Hive load local inpath上传
  11. <a name="ndNxy"></a>
  12. # 2. MapReduce
  13. MapReduce运行时有三类实例进程:
  14. 1. **MRAppMaster**:负责整个程序的过程调度及状态协调
  15. 1. **MapTask**:负责Map阶段的整个数据处理流程
  16. 1. **ReduceTask**:负责Reduce阶段的整个数据流程处理
  17. 一个MapReduce任务对应一个Job,Job在执行的不同阶段启动若干了Task
  18. 其中MapTask分为 mapPhase 和 sortPhase<br />ReduceTask分为 copyPhase 、 sortPhase 和 reducePhase
  19. <a name="EWEbf"></a>
  20. ## 2.0. MapReduce流程
  21. [语雀内容](https://www.yuque.com/lashuishulaoda/ez5etc/xxf4rs?view=doc_embed)
  22. 1. 客户端调用**InputFormat**对文件进行切片
  23. 1. 执行**MapTask**
  24. 1. **MapTask**类中重写了**run()**方法
  25. 1. 划分阶段:
  26. 1. 如果没有reduce任务,则Map阶段只有map阶段
  27. 1. 如果有reduce任务,则将Map阶段分为map阶段和sort阶段,其中mapPhase占66.7%,sortPhase占33.3%。**------见 code1**
  28. 2. 判断新旧API
  29. 2. 初始化:outputFormat、outputPath、committer等
  30. 2. 根据API启动相应的**Mapper**:
  31. 旧API:org.apache.hadoop.mapred.Mapper——**runNewMapper()**<br />新API:org.apache.hadoop.mapreduce.Mapper——**runOldMapper()**
  32. 1. 启动对应的Mapper **------见 code2**
  33. 1. 创建当前MapTask的**Mapper**对象
  34. 1. 创建当前MapTask的输入格式**InputFormat**对象
  35. 1. 重建当前MapTask的**切片信息**对象split
  36. 1. 创建当前MapTask的**RecordReader**输入对象input
  37. 1. 创建输出收集器NewOutputCollector对象collector
  38. 1. 创建当前MapTask的**MapOutputCollector**输出对象collector,根据**numReduceTasks**选择类型:
  39. 1. **numReduceTasks>0 **选择**MapOutputBuffer**,(map结束后进入到**Shuffle**环节)
  40. 1. 对collector进行初始化(缓冲区的初始化)**------见 code3**
  41. 1. 获取溢写的百分比(默认0.8)
  42. 1. 设置缓冲区的初始大小(默认100m)
  43. 1. 设置排序方式(默认使用快排)
  44. 1. 确定key的比较器
  45. 1. 获取mapper输出的kv类型
  46. 1. 根据key的类型指定序列化器 **------见 code4**
  47. 1. 设置mapper的输出端使用压缩
  48. 1. 设置combiner
  49. 1. 新开一条SpillThread线程
  50. 2. **numReduceTasks<=0 **选择 **DirectMapOutputCollector**,(map结束后跳过**shuffle**和**reduce**,直接输出)
  51. 2. 根据numReduceTasks创建分区器partitioner(默认采用HashPartitioner)
  52. 6. 创建当前MapTask的context对象
  53. 6. 切片文件初始化
  54. 6. 运行mapper.run(),执行业务逻辑。**------见 code5**
  55. 1. 调用setup()方法
  56. 1. 循环调用map()方法
  57. 1. 最终调用cleanup()方法
  58. 9. 完成mapPhase
  59. 9. 设置Phase为SORT
  60. 9. 设置状态为umbilical(中央的,脐带的) statusUpdate(umbilical)
  61. 9. 刷出缓冲区中的数据
  62. 9. mergeParts()
  63. 1. 判断是否进行combiner
  64. code1:阶段划分
  65. ```java
  66. if (isMapTask()) {
  67. // If there are no reducers then there won't be any sort. Hence the map
  68. // phase will govern the entire attempt's progress.
  69. if (conf.getNumReduceTasks() == 0) {
  70. mapPhase = getProgress().addPhase("map", 1.0f);
  71. } else {
  72. // If there are reducers then the entire attempt's progress will be
  73. // split between the map phase (67%) and the sort phase (33%).
  74. mapPhase = getProgress().addPhase("map", 0.667f);
  75. sortPhase = getProgress().addPhase("sort", 0.333f);
  76. }
  77. }

code2:启动Mapper

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper 一个MapTask只会创建一个Mapper对象            
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format 创建输入格式对象
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split 重建当前MapTask的切片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

     //构建MapTask的输入对象,负责整个MapTask的输入工作,RecordReader由input负责进行调用读取数据
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);

    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;

    //构建MapTask的输出对象
    // get an output object
    if (job.getNumReduceTasks() == 0) {
        //如果没有reduce阶段,由Map收集输出的数据,直接输出
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // 创建记录收集器
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

          //构建Mapper中使用的context对象,代表MapTask的上下文(来龙去脉),
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
        // 会执行输入过程中所需要组件的一系列初始化
        // 调用RecordReader.initialize()
      input.initialize(split, mapperContext);
        // 调用自己编写的Mapper的run()
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
}

code3:收集器对象collector的初始化

public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      //从配置中读取溢写的百分比,默认读取mapreduce.map.sort.spill.percent
      //如果没有配置,使用0.8作为百分比
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      //缓冲区的初始大小,默认读取mapreduce.task.io.sort.mb,如果没有配置,默认使用100(m)
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }

      //排序默认使用快排
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
      int maxMemUsage = sortmb << 20;
      maxMemUsage -= maxMemUsage % METASIZE;
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      ...

      // k/v serialization
      //获取key的比较器
      comparator = job.getOutputKeyComparator();

      //获取mapper输出的key-value的类型
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      serializationFactory = new SerializationFactory(job);

      //根据key的类型返回序列化器
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      // compression
      //在mapper的输出阶段使用压缩
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      //设置combiner
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
          //启动spillThread线程
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

code4:获取key的比较器

public RawComparator getOutputKeyComparator() {
    //尝试获取参数中配置的mapreduce.job.output.key.comparator.class,作为比较器
    //如果没有定义,默认为null,定义的话必须是RawComparator类型
    Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
    //如果用户有配置,就实例化此类型的一个对象
    if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
    // 判断Mapper输出的key是否是writableComparable类型的子类,
    // 如果是,就默认由系统提供比较器,如果不是就抛出异常
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

code5:运行Mapper.run(),执行业务逻辑

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

2.1. 文件切片

  • Client对文件进行切片,并将切片信息提交给 Yarn ResourseManager。
  • 文件的切片是对文件进行的逻辑划分,真实的大文件是分块保存在HDFS中的。
  • 默认的切片机制是TextInputFormat,按照单个文件为一片;
  • TextInputFormat、CombineTextInputFormat、KeyValueTextInputFormat等都是FileInputFormat的实现类;
  • 自定义InputFormat,继承FileInputFormat类;(FileInputFormat继承InputFormat类)。

InputFormat类通过createRecordReader()方法创建一个RecordReader实例对象,将数据读取为KV形式;

2.1.1. FileInputFormat切片机制

  • 按照文件内容进行切片;
  • 切片大小默等于Block大小;
  • 切片时不考虑数据集整体,而是针对每个文件单独切片,如下图所示,如果切片时考虑数据集整体应该是切出3个100m的切片文件。

2.1.2. TextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件大小,都会是一个单独的切片,都会交给一个MapTask。
按行读取每条记录:

  • 键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。
  • 值是这行的内容,不包括任何终止符(换行符和回车符)。

如果有大量小文件,就会产生大量的MapTask;
如果有个别特别大的文件,就会产生数据倾斜,有几个MapTask特别慢,且占用内存特别高。

2.1.3. CombineTextInputFormat切片机制

应对小文件过多的情况。
虚拟存储过程:
将输入目录下所有文件的大小依次和设置的setMaxInputSplitSize值比较:

  • 如果不大于设置的最大值,逻辑上划分为一个块;
  • 如果输入文件大于设置的最大值的两倍,那么以最大值切一块;
  • 如果输入文件大于设置的最大值,但不大于其两倍,将该文件均分为两个虚拟存储块(对半分);
  • 当剩余数据超过设置的最大值且不大于最大值的两倍,

切片过程:
判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,

  • 大于等于则单独形成一个切片;
  • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

2.1.4. KeyValueTextInputFormat

每一行为一条记录,被分隔符分割为key,value;
默认分隔符为tab(\t)。
可以在驱动类中设置分隔符。

2.1.5. NLineInputFormat

按照行数划分切片。

2.2. 自定义类

自定义 继承/实现 功能
InputFormat extends FileInputFormat 定义输入文件的路径、读取方式(RecordReader)
RecordReader extends RecordReader 定义输入的KV,开流读取,实现读取的具体过程
Partitioner extends Partitioner 定义分区规则,分区数需和ReduceTask数量一致
对象 implements Writable 实现序列化,使对象可以提交给框架
key对象 implements WritableComparable 实现序列化与比较器,用于key的传输与比较
Comparator implements RawComparator 定义一个比较器,用于key的比较

2.3. 分区

  1. 在写入环形缓冲区的时候按照分区规则在kvmeta中写入PARTITION。
  2. 分区表示该数据去往哪个ReduceTask,分区数需要和ReduceTask数量一致
    1. 分区数不能大于ReduceTask的数量
    2. 分区数可以小于ReduceTask的数量,但会产生闲置的Rducer,浪费空间
  3. 在SpillThread中会进行sortAndSpill(),对索引进行排序,遍历每个分区依次写入spill文件。

2.4. 比较key

提供一个key的比较器实现RawComparator
或key对象实现WritableComparable

2.5. Combiner

一个可选的中间函数,发生在Map阶段

  1. sortAndSpill()方法中 Sorter.sort()之后循环遍历partitions,每次都判断是否需要进行combiner
  2. mergeParts()方法中 遍历spills文件,判断时都需要进行combiner

用来减少发送到Reducer的数据量,提高网络效率以及Reduce端处理效率。
计算平均值时不能用Combiner

参考文献

https://blog.csdn.net/elpsyco/article/details/100597959
MR全流程** https://blog.csdn.net/qq_39261894/article/details/104630148
环形缓冲区:https://blog.csdn.net/lw305080/article/details/56479170?utm_source=blogxgwz4
Map输出跟踪:https://blog.csdn.net/qq_39261894/article/details/104630148