参考:

  • Hadoop.The.Definitive.Guide.4th

在我们的例子里,要写一个挖掘气象数据的程序(分析每年的全球最高气温是多少)。分布在全球各地的很多气象传感器每隔一小时收集气象数据和收集大量日志数据,由于我们希望处理所有这些数据,而且这些数据是半结构化的且是按照记录方式存储的,因此非常适合用MapReduce来分析。

我们使用的数据来自美国国家气候数据中心 (National Climatic Data Center,简称 NCDC) 。

示例代码及数据下载:

数据格式

这些数据按行并以ASCII格式存储,其中每一行是一条记录。 该存储格式支持丰富的气象要素,其中许多要素可以选择性地列入收集范围或其数据所需的存储长度是可变的。为了简单起见,我们重点讨论一些基本要素(比如气温),这些要素始终都有而且长度都是固定的。

范例2-1显示了一行采样数据,其中重要字段加了注释。这一行数据被分成很多行以突出每个字段,但在实际文件中,这些字段合并成一行,没有任何分隔符。

范例2-1.国家气候数据中心数据记录的格式

  1. 0057
  2. 332130 # USAF气象站标识 USAF weather station identifier
  3. 99999 # WBAN气象站标识 WBAN weather station identifier
  4. 19500101 # 观测日期 observation date
  5. 0300 # 观测时间 observation time
  6. 4
  7. +51317 # 纬度 latitude (degrees x 1000)
  8. +028783 # 精度 lon8gitude (degrees x 1000)
  9. FM-12
  10. +0171 # 海拔 elevation (meters)
  11. 99999
  12. V020
  13. 320 # 风向 wind direction (degrees)
  14. 1 # quality code
  15. N
  16. 0072
  17. 1
  18. 00450 # sky ceiling height (meters)
  19. 1 # quality code
  20. C
  21. N
  22. 010000 # visibility distance (meters)
  23. 1 # quality code
  24. N
  25. 9
  26. -0128 # 10x 空气温度 air temperature (degrees Celsius x 10)
  27. 1 # quality code
  28. -0139 # 10x 露点温度 dew point temperature (degrees Celsius x 10)
  29. 1 # quality code
  30. 10268 # 10x 大气压强 atmospheric pressure (hectopascals x 10)
  31. 1 # quality code

数据文件按照日期和气象站进行组织。从1901年到2001年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件。例如,1990年对应文件夹下面就包含下面的记录:

  1. $ ls raw/1990 | head
  2. 010010-99999-1990.gz
  3. 010014-99999-1990.gz
  4. 010015-99999-1990.gz
  5. 010016-99999-1990.gz

气象台有成千上万个,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理(详情参见最后的参考部分),将每年的数据文件拼接成一个单独的文件。

使用Unix工具分析数据

在这个数据集中,每年全球气温的最高记录是多少?我们先不使用Hadoop来解决这个问题,因为只有提供了性能基准和结果检查工具,才能和Hadoop进行有效的对比。
传统处理按行存储数据的工具是awk。范例2-2是一个程序脚本,用于计算每年的最高气温。
范例2-2. 该程序从NCDC气象记录中找出每年最高气温。

  1. #!/usr/bin/env bash
  2. # 循环遍历,按年,压缩的数据文件
  3. for year in all/*
  4. do
  5. # 显示年份
  6. # -n 不输出结尾的换行符
  7. # -e 启用反斜线转义的解释
  8. echo -ne `basename $year .gz`"\t"
  9. # 解压缩,内容输出到标准输出
  10. gunzip -c $year | \
  11. # 按行处理,提取气温和质量代码
  12. # temp=气温,+0转换为整数
  13. # q=质量代码
  14. # if判断,值是否有效,最高气温
  15. # NCDC数据集中缺失的值会用9999替代
  16. # 质量代码,是01459之一,通过检测
  17. # 数据读取正确,且temp>max,temp为最高气温,赋值给max
  18. # END块,输出最高气温
  19. awk '{ temp = substr($0, 88, 5) + 0;
  20. q = substr($0, 93, 1);
  21. if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
  22. END { print max }'
  23. done

执行结果(其实部分):

  1. $ ./max_temperature.sh
  2. 1901 317
  3. 1902 244
  4. 1903 289
  5. 1904 256
  6. 1905 283
  7. ...

由于源文件中的气温值被放大10倍,所以1901年的最高气温是31.7C (20世纪初记录的气温数据比较少,所以这个结果也是可能的)。使用亚马逊的EC2 High-CPU Extra Large Instance 运行这个程序,只需要42分钟就可以处理完一个世纪的气象数据,找出最高气温。
image.png

(并行处理)提高分析速度及数据量

为了加快处理速度,我们需要并行处理程序来进行数据分析。从理论上讲,这很简单:我们可以使用计算机上所有可用的硬件线程(hardware thread)来处理,每个线程负责处理不同年份的数据。但这样做仍然存在一些问题。

首先,将任务划分成大小相同的作业通常并不是一件容易的事情。在我们这个例子中,不同年份数据文件的大小差异很大,所以有一部分线程会比其他线程更早结束运行。即使可以再为它们分配下一个作业,但总的运行时间仍然取决于处理最长文件所需要的时间。另一种更好的方法是将输入数据分成固定大小的块(chunk),然后每块分到各个进程去执行,这样一来,即使有一些进程可以处理更多数据,我们也可以为它们分配更多的数据。

其次,合并各个独立进程的运行结果,可能还需要额外进行处理。在我们的例子中,每年的结果独立于其他年份,所以可能需要把所有结果拼接起来,然后再按年份进行排序。如果使用固定块大小的方法,则需要一种精巧的方法来合并结果。在这个例子中,某年的数据通常被分割成几个块,每个块独立处理。我们最终获得每个块的最高气温,所以最后一步找出最大值作为该年的最高气温,其他年份的数据都像这样处理。

最后,还是得受限于单台计算机的处理能力。即便开足马力,用上所有处理器,至少也得花20分钟,系统无法更快了。另外,某些数据集的增长可能会超出单台计算机的处理能力。一旦开始使用多台计算机,整个大环境中的其他因素就会互相影响,主要归类为协调性可靠性两个方面。哪个进程负责运行整个作业?我们如何处理失败的进程?

因此,虽然并行处理也是可行的,但实际上也很麻烦。可以借助于Hadoop类似框架来解决这些问题。

MapReduce分析数据的逻辑流程

为了充分利用 Hadoop 提供的并行处理优势,我们需要将查询表示成 MapReduce 作业。完成某种本地端的小规模测试之后,就可以把作业部署到集群上运行。

MapReduce任务 过程分为两个处理阶段:map阶段和reduce阶段。
每阶段都以键-值对作为输入和输出,其类型由程序员来选择。
程序员还需要写两个函数: map函数和reduce函数。

map阶段

map阶段的输入是NCDC原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。

我们的map函数很简单。由于我们只对年份和气温属性感兴趣,所以只需要取出这两个字段数据。
在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reduce函数能够继续对它进行处理:即找出每年的最高气温。
map函数还是一个比较适合去除已损记录(ETL)的地方:此处,我们筛掉缺失的、可疑的或错误的气温数据。

输入数据 的示例数据(部分内容用省略号表示):

  1. 0067011990999991950051507004...9999999N9+00001+99999999999...
  2. 0043011990999991950051512004...9999999N9+00221+99999999999...
  3. 0043011990999991950051518004...9999999N9-00111+99999999999...
  4. 0043012650999991949032412004...0500001N9+01111+99999999999...
  5. 0043012650999991949032418004...0500001N9+00781+99999999999...

map函数的输入,这些行以键-值对的方式输入:

(0, 0067011990999991950051507004…9999999N9+00001+99999999999…) (106, 0043011990999991950051512004…9999999N9+00221+99999999999…) (212, 0043011990999991950051518004…9999999N9-00111+99999999999…) (318, 0043012650999991949032412004…0500001N9+01111+99999999999…) (424, 0043012650999991949032418004…0500001N9+00781+99999999999…)

键(key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):

(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78)

reduce阶段

map函数的输出经由MapReduce框架处理后,最后发送到reduce函数。
这个处理过程基于键来对键-值对进行排序和分组。
因此,在这一示例中,reduce 函数看到的是如下输入:

(1949, [111, 78]) (1950, [0, 22, -11])

每一年份后紧跟着一系列气温数据。reduce函数现在要做的是遍历整个列表并从中找出最大的读数:

(1949, 111) (1950, 22)

执行结果和逻辑流程

reduce阶段的执行结果,就是最终的输出结果,即每一年的全球最高气温记录。

(1949, 111) (1950, 22)

整个数据流如下图所示。在图的底部是Unix管线,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时涉及。

MapReduce logical data flow
image.png

使用Java MapReduce分析数据

代码实现:一个map函数、一个reduce函数和一些运行作业的代码。

Mapper

map函数由一个Mapper类来实现,其中声明了一个map()方法。
Mapper for the maximum temperature example

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
* Mapper接口(泛型类型),含有4个形式参数类型:
*   输入键:偏移量,长整数
*   输入值:一行文本
*   输出键:年份,文本
*   输出值:气温,整数
*
* Hadoop规定了自己的一套基本类型,针对网络序列化传输进行了优化,没有直接使用内置的Java类型。
* 这些可以在 `org.apache.hadoop.io` 包中找到。这里使用的是:
*   LongWritable类型(相当于Java的Long类型)
*   Text类型(相当于Java的String类型)
*   IntWritable类型(相当于Java的Integer类型)
*/
public class MaxTemperatureMapper
   extends Mapper<LongWritable, Text, Text, IntWritable> {

   private static final int MISSING = 9999;

    /**
    * map()方法的输入是一个键和一个值。我们将包含一行输入的 Text 值转换成
    * Java的String类型,然后利用其 `subString()` 方法提取我们感兴趣的列。
    *
    * map()方法还提供 `Content` 实例用于来写入输出的内容。
    * 将年份数据按 `Text` 对象进行读/写(年份作为键),用 `IntWritable` 
    * 类型封装气温值。只有气温数据不缺并且所对应质量代码显示为正确的气温
    * 读数时,这些数据才会被写入输出记录中。
    */
    @Override
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {

        String line = value.toString();
        String year = line.subString(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            // parseInt dosen't like leading plus signs
            airTemperaTure = Integer.parseInt(line.subString(88, 92));
        } else {
            airTemperaTure = Integer.parseInt(line.subString(87, 92));
        }

        String quality = line.subString(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

Reducer

以类似方法用Reducer来定义reduce函数。
Reducer for the maximum temperature example

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
* Reducer类(泛型类型),含有4个形式参数类型,指定reduce函数的 输入键、输入值、
* 输出键、输出值 的类型。
*/
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
    * reduce函数的输入类型必须匹配map函数的输出类型:`Text` 和 `IntWritable`。
    * reduce函数的输出类型为 `Text` 和 `IntWritable`,分别是年份和最高气温。
    * 这个最高气温是通过循环比较每个气温与当前所知最高气温所得到的。
    */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}

Runner

编写MapReduce任务运行代码。
Application to find the maximum temperature in the weather dataset

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // Job对象指定作业执行规范,控制整个作业的运行。
        // 我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布
        // 这个文件)。不必明确指定JAR文件的名称,在Job对象的 `setJarByClass()` 方法中传递
        // 一个类即可,Hadoop利用这个类来查找包含它的JAR文件,进而找到相关的JAR文件。
        Job job = new Job();
        // 指定运行作业的class类
        job.setJarByClass(MaxTemperature.class);
        // 指定作业名称
        job.setJobName("Max temperature");
        // 指定输入数据的路径
        // 路径可以是单个文件,一个目录(此时,目录下所有文件作为输入),或匹配pattern的文件
        // 由函数名可知,可以多次调用 `addInputPath()` 实现多路径输入
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 指定输出数据的路径
        // 只能有一个输出路径,指定reduce函数输出文件的写入目录
        // 在作业运行前,该目录不应该存在,否则,Hadoop会报错并拒绝运行作业。这种预防措施
        // 的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 指定要使用的 map 和 reduce 类型
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        // 输入的类型通过输入格式来控制,这里没有设置,使用默认的格式
        // 设置默认的输入格式
        // job.setInputFormatClass(TextInputFormat.class);
        // 设置默认的输出格式
        // job.setOutputFormatClass(TextOutputFormat.class);

        // 指定Mapper和Reducer的输出类型
        // `setOutputKeyClass()` 和 `setOutputValueClass()` 方法控制 `reduce` 函数的输出类型,
        // 并且必须和Reduce类产生的相匹配。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // `map`函数的输出类型默认和 `reduce` 函数相同,所以,如果mapper产生的类型与reducer相同
        // ,它们就不需要设置。但是,如果不同,则必须通过 `setMapOutputKeyClass()` 和
        // `setMapOutputValueClass()` 方法来设置 map 函数的输出类型。
        // job.setMapOutputKeyClass(Text.class)
        // job.setMapOutputValueClass(IntWritable.class)

        // 在设置定义了 map 和 reduce 函数的类之后,可以开始运行作业。
        // `waitForCompletion()` 方法提交作业并等待作业完成。
        // 该方法唯一的参数是一个标识,指示是否生成详细输出。当标识为 true 时,作业会把进度
        // 信息打印到控制台上。该方法返回一个布尔值,表示执行成功(true)或失败(false)。这个
        // 布尔值被转换成程序的退出代码0或1。
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

该部分使用的Java MapReduce API,被为“新API”,取代了功能上等价的旧版本API。(参考底部新旧Java MapReduce APIs)

本地运行测试

写好MapReduce作业之后,通常要拿一个小型数据集(sample.txt)进行测试以排除代码问题。首先,以独立(本机)模式安装Hadoop。在这种模式下,Hadoop在本地文件系统上运行作业程序,即输入文件和输出目录都在本地。执行测试:

# 独立(本地)模式安装的Hadoop目录
$ export HADOOP_HOME=~/Downloads/hadoop-2.5.0-cdh5.3.6/

# 查找编译时需要用到的class文件,添加到CLASSPATH
$ export CLASSPATH=$CLASSPATH:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar
$ export CLASSPATH=$CLASSPATH:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.5.0-cdh5.3.6.jar

# 编译class文件, MaxTemperature.java, MaxTemperatureMapper.java, MaxTemperatureReducer.java 
$ javac MaxTemperature.java 

# 打成 jar 包
$ jar -cvf hadoop-examples.jar MaxTemperature.class MaxTemperatureMapper.class MaxTemperatureReducer.class 
added manifest
adding: MaxTemperature.class(in = 1418) (out= 800)(deflated 43%)
adding: MaxTemperatureMapper.class(in = 1876) (out= 805)(deflated 57%)
adding: MaxTemperatureReducer.class(in = 1664) (out= 708)(deflated 57%)

$ ls
hadoop-examples.jar  MaxTemperature.class  MaxTemperatureMapper.class  MaxTemperatureReducer.class
input                MaxTemperature.java   MaxTemperatureMapper.java   MaxTemperatureReducer.java  
$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

# 将该jar包添加到Hadoop的CLASSPATH路径中
$ export HADOOP_CLASSPATH=hadoop-examples.jar
# 执行,查找MaxTemperaturec.class文件,执行main方法
$ $HADOOP_HOME/bin/hadoop MaxTemperature input/ncdc/sample.txt output
20/04/30 00:08:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/04/30 00:08:35 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
20/04/30 00:08:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
20/04/30 00:08:36 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/04/30 00:08:36 INFO input.FileInputFormat: Total input paths to process : 1
20/04/30 00:08:36 INFO mapreduce.JobSubmitter: number of splits:1
20/04/30 00:08:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local694727581_0001
20/04/30 00:08:37 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
20/04/30 00:08:37 INFO mapreduce.Job: Running job: job_local694727581_0001
20/04/30 00:08:37 INFO mapred.LocalJobRunner: OutputCommitter set in config null
20/04/30 00:08:37 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
20/04/30 00:08:37 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
20/04/30 00:08:37 INFO mapred.LocalJobRunner: Waiting for map tasks
20/04/30 00:08:37 INFO mapred.LocalJobRunner: Starting task: attempt_local694727581_0001_m_000000_0
20/04/30 00:08:37 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
20/04/30 00:08:37 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
20/04/30 00:08:37 INFO mapred.MapTask: Processing split: file:/home/jack/Templates/input/ncdc/sample.txt:0+529
20/04/30 00:08:38 INFO mapreduce.Job: Job job_local694727581_0001 running in uber mode : false
20/04/30 00:08:38 INFO mapreduce.Job:  map 0% reduce 0%
20/04/30 00:08:41 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
20/04/30 00:08:41 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
20/04/30 00:08:41 INFO mapred.MapTask: soft limit at 83886080
20/04/30 00:08:41 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
20/04/30 00:08:41 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
20/04/30 00:08:41 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
20/04/30 00:08:41 INFO mapred.LocalJobRunner: 
20/04/30 00:08:41 INFO mapred.MapTask: Starting flush of map output
20/04/30 00:08:41 INFO mapred.MapTask: Spilling map output
20/04/30 00:08:41 INFO mapred.MapTask: bufstart = 0; bufend = 45; bufvoid = 104857600
20/04/30 00:08:41 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
20/04/30 00:08:41 INFO mapred.MapTask: Finished spill 0
20/04/30 00:08:41 INFO mapred.Task: Task:attempt_local694727581_0001_m_000000_0 is done. And is in the process of committing
20/04/30 00:08:41 INFO mapred.LocalJobRunner: map
20/04/30 00:08:41 INFO mapred.Task: Task 'attempt_local694727581_0001_m_000000_0' done.
20/04/30 00:08:41 INFO mapred.LocalJobRunner: Finishing task: attempt_local694727581_0001_m_000000_0
20/04/30 00:08:41 INFO mapred.LocalJobRunner: map task executor complete.
20/04/30 00:08:41 INFO mapred.LocalJobRunner: Waiting for reduce tasks
20/04/30 00:08:41 INFO mapred.LocalJobRunner: Starting task: attempt_local694727581_0001_r_000000_0
20/04/30 00:08:41 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
20/04/30 00:08:41 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
20/04/30 00:08:41 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@7d8a06d6
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=333971456, maxSingleShuffleLimit=83492864, mergeThreshold=220421168, ioSortFactor=10, memToMemMergeOutputsThreshold=10
20/04/30 00:08:41 INFO reduce.EventFetcher: attempt_local694727581_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
20/04/30 00:08:41 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local694727581_0001_m_000000_0 decomp: 57 len: 61 to MEMORY
20/04/30 00:08:41 INFO reduce.InMemoryMapOutput: Read 57 bytes from map-output for attempt_local694727581_0001_m_000000_0
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 57, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->57
20/04/30 00:08:41 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
20/04/30 00:08:41 INFO mapred.LocalJobRunner: 1 / 1 copied.
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
20/04/30 00:08:41 INFO mapred.Merger: Merging 1 sorted segments
20/04/30 00:08:41 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: Merged 1 segments, 57 bytes to disk to satisfy reduce memory limit
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: Merging 1 files, 61 bytes from disk
20/04/30 00:08:41 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
20/04/30 00:08:41 INFO mapred.Merger: Merging 1 sorted segments
20/04/30 00:08:41 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50 bytes
20/04/30 00:08:41 INFO mapred.LocalJobRunner: 1 / 1 copied.
20/04/30 00:08:41 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
20/04/30 00:08:41 INFO mapred.Task: Task:attempt_local694727581_0001_r_000000_0 is done. And is in the process of committing
20/04/30 00:08:41 INFO mapred.LocalJobRunner: 1 / 1 copied.
20/04/30 00:08:41 INFO mapred.Task: Task attempt_local694727581_0001_r_000000_0 is allowed to commit now
20/04/30 00:08:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_local694727581_0001_r_000000_0' to file:/home/jack/Templates/output/_temporary/0/task_local694727581_0001_r_000000
20/04/30 00:08:41 INFO mapred.LocalJobRunner: reduce > reduce
20/04/30 00:08:41 INFO mapred.Task: Task 'attempt_local694727581_0001_r_000000_0' done.
20/04/30 00:08:41 INFO mapred.LocalJobRunner: Finishing task: attempt_local694727581_0001_r_000000_0
20/04/30 00:08:41 INFO mapred.LocalJobRunner: reduce task executor complete.
20/04/30 00:08:42 INFO mapreduce.Job:  map 100% reduce 100%
20/04/30 00:08:42 INFO mapreduce.Job: Job job_local694727581_0001 completed successfully
20/04/30 00:08:42 INFO mapreduce.Job: Counters: 33
    File System Counters
        FILE: Number of bytes read=7700
        FILE: Number of bytes written=494600
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=5
        Map output records=5
        Map output bytes=45
        Map output materialized bytes=61
        Input split bytes=112
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=61
        Reduce input records=5
        Reduce output records=2
        Spilled Records=10
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=522190848
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=529
    File Output Format Counters 
        Bytes Written=29

# 查看结果
$ ls output/
part-r-00000  _SUCCESS
$ cat ~/Templates/output/part-r-00000 
1949    111
1950    22

如果调用 hadoop 命令的第一个参数是类名,Hadoop 就会启动一个JVM(Java虚拟机)来运行这个类。该Hadoop命令将Hadoop库(及其依赖关系)添加到类路径中,同时也能获得Hadoop配置信息。为了将应用类添加到类路径中,我们定义了一个 HADOOP_ CLASSPATH 环境变量,然后由Hadoop脚本来执行相关操作。

运行作业所得到的输出提供了一些有用的信息。例如,我们可以看到,作业标识 为 job_local694727581_0001,并且执行了一个map任务和一个reduce 任务,任务标识分别为 attempt_local694727581_0001_m_000000_0和
attempt_local694727581_0001_r_000000_0。在调试MapReduce作业时,知道作业ID和任务ID是非常有用的。

输出的最后一部分,以 Counters 为标题,显示 Hadoop 上运行的每 个作业的一些统计信息。这些信息对检查数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知:5个map输入记录产生5个map输出记录(由于mapper为每个合法的输入记录产生一个输出记录),随后,分为两组的5个reduce输入记录(一组对应一个唯一的键)产生两个reduce输出记录。

输出数据写入 output 目录,其中每个 reducer 都有一个输出文件。我们的例子中作业只有一个 reducer,所以只能好到一个名为“part-r-00000”的文件。
继续上面的环境,测试比较大的数据集,执行测试:

$ ls input/ncdc/all/
1901.gz  1902.gz
$ gunzip input/ncdc/all/190*
$ rm -rf output
$ $HADOOP_HOME/bin/hadoop MaxTemperature input/ncdc/all/ output
··· ···
$ cat output/part-r-00000 
1901    317
1902    244

数据流(分布式并行分析 - 横向扩展)

前面介绍了MapReduce针对少量输入数据是如何工作的,现在我们开始鸟瞰整个系统以及有大量输入时的数据流。为了简单起见,到目前为止,我们的例子都只是用了本地文件系统中的文件。然而,为了实现横向扩展(scaling out), 我们需要把数据存储在分布式文件系统中(典型的为HDFS)。通过使用Hadoop资源管理系统YARN,Hadoop 可以将MapReduce计算转移到存储有部分数据的各台机器上。下面我们看看具体过程。

作业调度

首先定义一些术语。
MapReduce 作业(job)是客户端需要执行的一个工作单元,它包括:

  • 输入数据
  • MapReduce程序
  • _配置信息

Hadoop 将作业分成若干个任务(task)来执行,其中包括两类任务:

  • map任务
  • reduce 任务

这些任务运行在集群的节点上,并通过YARN进行调度。如果一个任务失败,它将在另一个不同的节点上自动重新调度运行。

map任务调度(分片)

Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop 为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。

拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,失败的进程或其他并发运行的作业能够实现满意的负载平衡,并且随着分片被切分得更细,负载平衡的质量会更高。

另一方面,如果分片切分得太小,那么管理分片的总时间和构建map任务的总时间将决定作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是128 MB,不过可以针对集群调整这个默认值(对所有新建的文件),或在每个文件创建时指定。

a)Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。这就是所谓的“数据本地优化”(data locality optimization)。
b)但是,有时对于一个map任务的输入分片来说,存储该分片的HDFS数据块复本的所有节点可能正在运行其他map任务,此时作业调度需要从某一数据块所在的机架中的另一个节点上寻找一个空闲的map槽(slot)来运行该map任务分片。
c)仅仅在非常偶然的情况下(该情况基本上不会发生),会使用其他机架中的节点运行该map任务,这将导致机架与机架之间的网络传输。下图显示了这三种可能性。
Data-local (a), rack-local (b), and off-rack (c) map tasks
image.png
现在我们应该清楚为什么最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务运行的节点。与使用本地数据运行整个map任务相比,这种方法显然效率更低。
map任务会将其输出写入本地硬盘,而非HDFS。这是为什么?
因为map的输出是中间结果:该中间结果由 reduce 任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果就可以删除。因此,如果把它存储在HDFS中并实现备份,难免有些小题大做。如果运行map任务的节点在将map中间结果传送给reduce任务之前失败,Hadoop 将在另一个节点上重新运行这个map任务以再次构建map中间结果。

reduce任务调度(合并、分区、混洗)

reduce任务并不具备数据本地化的优势,单个reduce 任务的输入通常来自于所有mapper的输出。在本例中,我们仅有一个reduce 任务,其输入是所有map任务的输出。因此,排过序的map输出需通过网络传输发送到运行reduce 任务的节点。数据在reduce端合并,然后由用户定义的reduce函数处理。reduce 的输出通常存储在HDFS中以实现可靠存储。如第3章所述,对于reduce输出的每个HDFS块,第一个复本存储在本地节点上,其他复本出于可靠性考虑存储在其他机架的节点中。因此,将 reduce 的输出写入HDFS确实需要占用网络带宽,但这与正常的HDFS管线写入的消耗一样。

一个reduce 任务的完整数据流如下图所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示不同节点之间的数据传输。
MapReduce data flow with a single reduce task
image.png
reduce任务的数量并非由输入数据的大小决定,相反是独立指定的。8.1.1节将介绍如何为指定的作业选择reduce任务的数量。
如果有好多个reduce 任务,每个map任务就会针对输出进行分区(partition),即为每个 reduce 任务建一个分区。每个分区有许多键(及其对应的值),但每个键对应的键-值对记录都在同一分区中。分区可由用户定义的分区函数控制,但通常用默认的prtitioner通过哈希函数来分区,很高效。

一般情况下,多个reduce 任务的数据流如下图所示。该图清楚地表明了为什么map任务和reduce任务之间的数据流称为shuffle(混洗),因为每个reduce任务的输入都来自许多map任务。shuffle 一般比图中所示的更复杂,而且调整混洗参数对作业总执行时间的影响非常大,详情参见7.3节。
MapReduce data flow with multiple reduce tasks
image.png

无reduce任务调度

可能会出现无 reduce 任务的情况,数据处理可以完全并行(即无需混洗)。在这种情况下,唯一的非本地节点数据传输是map任务将结果写人HDFS。
MapReduce data flow with no reduce tasks
image.png

combiner函数(Map与Reduce间的数据传输优化)

集群上的可用带宽限制了MapReduce作业的数量,因此尽量避免map和reduce任务之间的数据传输是有利的。Hadoop 允许用户针对map任务的输出指定一个combiner (就像mapper和reducer一样), combiner 函数的输出作为reduce 函数的输入。由于combiner属于优化方案,所以Hadoop无法确定要对一个指定的map任务输出记录调用多少次 combiner (如果需要)。换而言之,不管调用combiner多少次,0次、1次或多次,reducer 的输出结果都是一样的。

combiner的工作机制

combiner的规则制约着可用的函数类型。这里最好用一个例子来说明。还是假设以前计算最高气温的例子,1950 年的读数由两个map任务处理(因为它们在不同的分片中)。
假设第一个map的输出如下:

(1950, 0) (1950, 20) (1950, 10)

第二个map的输出:

(1950, 25) (1950, 15)

reduce函数在调用时被传入以下数字:

(1950, [0, 20, 10, 25, 15])

因为25是输入值中的最大数,所以输出为

(1950, 25)

使用combiner函数,像reduce函数那样,为每个map输出找到最高气温。 reduce函数被调用时将传入如下数值:

(1950, [20, 25])

reduce输出的结果和以前一样。 类似于如下调用

max(0, 20, 10, 25, 15) -> max(max(0, 20, 10), max(25, 15)) -> max(20, 25) -> 25

注意,并非所有的函数都有此属性。例如,计算平均气温时,便不能用mean作为combiner。

mean(0, 20, 10, 25, 15) = 14 mean(mean(0, 20, 10), mean(25, 15)) = 15

combiner函数不能取代reduce函数。为什么呢?我们仍然需要reduce函数来处理不同map输出中具有相同键的记录。但combiner函数能帮助减少mapper和reducer之间的数据传输量,因此,单纯就这点而言,在MapReduce作业中是否使用combiner函数还是值得斟酌的。

指定一个combiner

回到Java MapReduce编程,combiner 函数是使用 Reducer 类定义的,且对于这个应用程序,它的实现与MaxTemperatureReducer 中的reduce函数实现相同。我们需要做的唯一更改是在 Job 上设置 combiner 类。

/** 使用combiner功能高效寻找最高气温的应用程序  */
public class MaxTemperatureWithCombiner {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCombiner <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperatureWithCombiner.class);
        job.setJobName("Max Temperatrue");

        FileInputFormat.addInputPath(new Path(args[0]));
        FileOutputFormat.addOutputPath(new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // 使用reducer
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true)? 0:1);
    }
}

使用Java MapReduce分布式并行分析数据

这个程序用不着修改便可以在一个完整的数据集上直接运行。这是MapReduce 的优势之一:它可以根据数据量的大小和硬件规模进行扩展
这里有一个运行结果:在一个10节点(High-CPU Extra Large Instance)EC2集群运行,程序执行时间只花了短短6分钟。(这比在单台机器上通过awk串行运行快7倍。性能没有得到线性增长的主要原因是输入数据并不是均匀分块的。为了方便起见,数据已经按照年份压缩(gzip),导致后续年份的文件比较大,因为这些年份的天气记录更多。)

使用Hadoop Streaming分析数据

Hadoop提供了一个API给MapReduce,它允许你用除了Java以外的语言来编写自己的map和reduce函数。
Hadoop Streaming 使用Unix标准流作为hadoop和程序之间的接口,所以可以使用任何语言,只要编写的MapReduce程序能够读取标准输入,并写入到标准输出。(对于C++程序员而言,Hadoop Pipes是Streaming的可替代品。它用套接字(socket)与运行着用 C++ 语言写的map或reduce函数的进程通信。)

Streaming天生适合用于文本处理。Map的输入数据通过标准输入传递给Map函数,Map函数逐行处理数据并将行写入标准输出。map输出的键/值对是以一个制表符分隔的行。reduce函数的输入具有相同的格式 — 通过制表符分隔的键/值对 — 通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,然后为保证结果的有序性,会根据键来排序,最后将结果写入标准输出。

Ruby版本

用于查找最高气温的 map函数(ruby版)

#!/usr/bin/env ruby

STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87, 5], val[92, 1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

该程序通过为STDIN(类型为IO的全局常量)中的每一行执行一个块来遍历来自标准输入的行。
该块从每个输入行提取相关字段,如果温度有效,则将用制表符“\t”分隔的年份和温度写入标准输出(使用puts)。

因为脚本只能在标准输入和输出上运行,所以最简单的方式是通过UNIX管道进行测试,而不使用Hadoop:

% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

用于查找最高气温的 reduce函数(ruby版)

#!/usr/bin/env ruby

last_key, max_val = null, -1000000
STDIN.each_line do |line|
  key, value = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [val.to_i, max_val].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key

同样,程序遍历标准输入中的行,但这一次我们必须在处理每个键组时存储一些状态。
在本例中,键是年份,我们存储最后看到的键和到目前为止看到的该键的最高温度。MapReduce框架确保键是有序的,因此我们知道,如果一个键与前一个键不同,我们就进入了一个新的键组(预示着处理完一组键相同的输入行)。 与Java API不同,在Java API中为每个键组提供一个迭代器,而在Streaming中,您必须在程序中找到键组边界。

对于每一行,我们提取键和值。然后,如果我们刚完成一组(last_key && last_key != key),在重新设置新键的最高温度之前,我们写入该组的键和最高温度,用制表符分隔。如果我们还没有完成一个组,我们只需要更新当前键的最高温度。程序的最后一行保证了输入的最后一组键会有一行输出。

现在可以用UNIX管道来模拟整个 MapReduce 的管线:

% cat input/ncdc/sample.txt | \
  ch02-mr-intro/src/main/ruby/max_temperature_map.rb | \
  sort | ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb
1949 111
1950 22

hadoop命令不支持streaming;相反,您需要使用 jar 选项来指定 Streaming JAR 文件。streaming 程序的选项指定输入和输出路径以及map和reduce脚本。如下所示:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

当在集群上运行大型数据集时,我们应该使用 -combiner 选项来设置combiner:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
  -input input/ncdc/all \
  -output output \
  -mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
  -combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
  -reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

还要注意 -files 选项的使用,我们在集群上运行 streaming 程序时,我们使用它将脚本发送到集群。

Python版本

streaming支持任何可以从标准输入中读和写到标准输出中的编程语言,所以对于更熟悉Python的读者来说,下面是同样的例子。
用于查找最高气温的 map 函数(python版本)

#!/usr/bin/env python

import re
import sys

for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match(r"[01459]", q)):
        print "%s\t%s" % (year, temp)

用于查找最高气温的 reduce 函数(python版本)

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        print "%s\t%s" % (last_key, max_val)
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
    print "%s\t%s" % (last_key, max_val)

作为 streaming 的替代方案,Python程序员应该考虑 Dumbo,它可以生成Streaming MapReduce接口更加python化和易于使用。

我们可以像测试Ruby程序来运行作业。例如,像下面这样运行:

% cat input/ncdc/sample.txt | \
  ch02-mr-intro/src/main/python/max_temperature_map.py | \
  sort | ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22

参考

NCDC数据预处理

这里首先简要介绍如何准备原始气象数据文件,以便我们能用Hadoop对它们进行分析。如果打算得到一份数据副本供Hadoop 处理,可按照本书配套网站(网址为 http://www.hadoopbook.com )给出的指导进行操作。接下来,首先说明如何处理原始的气象文件。

原始数据实际是一组经过bzip2压缩的tar文件。每个年份有价值的数据单独放在一个文件中。部分文件列举如下:

1901.tar.bz2
1902.tar.bz2
1903.tar.bz2
...
2000.tar.bz2

各个tar文件包含一个gzip压缩文件,描述某一年度所有气象站的天气记录。(事实上,由于在存档中的各个文件已经预先压缩过,因此再利用bzip2对存档压缩就稍显多余了)。示例如下:

$ tar jxf 1901.tar.bz2
$ ls 1901 | head
029070-99999-1901.gz
029500-99999-1901.gz
029600-99999-1901.gz
029720-99999-1901.gz
029810-99999-1901.gz
227070-99999-1901.gz

由于气象站数以万计,所以整个数据集实际上是由大量小文件构成的。鉴于Hadoop对少量的大文件的处理更容易、更高效,所以在本例中,我们将每个年度的数据解压缩到一个文件中,并以年份命名。上述操作可由一个
MapReduce 程序来完成,以充分利用其并行处理能力的优势。下面具体看看这个程序。

该程序只有一个map函数无reduce函数,因为map函数可并行处理所有文件操作,无需整合步骤。这项处理任务能够用一个Unix脚本进行处理,因而在这里使用面向MapReduce的Streaming接口比较恰当。请看范例C-1。

范例C-1. **Bash脚本处理原始的NCDC数据文件并将它们存储在HDFS中**

#!/usr/bin/env bash

# NLineInputFormat gives a single line: key is offset, value is S3 URI
# 输入格式 NLineInputFormat:键值对 - 偏移量/文件链接
read offset s3file                                        # ncdc_files.txt

# Retrieve file from S3 to local disk
# 从AWS S3检索文件,保存到本地
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_HOME/bin/hadoop fs -get $s3file .

# Un-bzip and un-tar the local file
# 解压 bz2 文件,解压 tar 文件,到本地
# echo `basename s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2 .tar.bz2` -> isd-1901
target=`basename $s3file .tar.bz2`                        # isd-1901   获取文件名
mkdir -p $target                                          # isd-1901/  创建文件夹
echo "reporter:status:Un-tarring $s3file to $target" >&2  # 输出到 标准错误
# -j:支持bzip2解压文件;
# -x或--extract或--get:从备份文件中还原文件;
# -f<备份文件>或--file=<备份文件>:指定备份文件;    # isd-1901.tar.bz2
tar jxf `basename $s3file` -C $target               # isd-1901/isd-1901/029070-99999-1901.gz

# Un-gzip each station file and concat into one file
# 解压 gz 文件,每个站,合并到一个文件
echo "reporter:status:Un-gzipping $target" >&2      # 
for file in $target/*/*                             # isd-1901/029070-99999-1901
do                                                  # 029070-99999-1901.gz 文件
  # -c或--stdout或--to-stdout:把解压后的文件输出到标准输出设备;
  gunzip -c $file >> $target.all                    # 解压写入 isd-1901.all
  echo "reporter:status:Processed $file" >&2
done

# Put gzipped version into HDFS
# 压缩,上传 
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
# -c或--stdout或--to-stdout:把解压后的文件输出到标准输出设备;
# `hadoop fs -put -` 从标准流中获得数据
gzip -c $target.all | $HADOOP_HOME/bin/hadoop fs -put - gz/$target.gz

输入是一个小的文本文件(ncdc_files.txt),列出了所有待处理文件(这些文件放在S3文件系统中,因此能够以Hadoop所认可的S3 URI的方式被引用。旧的“s3n”(s3 native)实现,schema已更换成“s3a”)。示例如下:

s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1902.tar.bz2
...
s3n://hadoopbook/ncdc/raw/isd-2000.tar.bz2

由于输入格式被指定为NLineInputFormat,每个mapper接受一行输入(包含必须处理的文件)。处理过程在脚本中解释,但简单说来,它会解压缩bzip2文件,然后将该年份所有文件整合为一个文件。最后,该文件以gzip进行压缩并复制至HDFS之中。注意,可以使用指令hadoop fs -put - 从标准输入中获得数据。

状态消息输出到“标准错误”并以 reporter:status为前缀,这样可以解释为MapReduce状态更新。这告诉Hadoop该脚本正在运行,并未挂起。

运行Streaming作业的脚本如下:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -D mapred.reduce.tasks=0 \
  -D mapred.map.tasks.speculative.execution=false \
  -D mapred.task.timeout=12000000 \
  -input ncdc_files.txt \
  -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
  -output output \
  -mapper load_ncdc_map.sh \
  -file load_ncdc_map.sh

由此易知,这是一个“只有map” 的作业,因为 reduce 任务数为0。以上脚本还关闭了推测执行(speculative execution), 因此重复的任务不会写相同的文件。任务超时参数被设置为一个比较大的值,使得Hadoop不会杀掉那些运行时间较长的任务,例如,在解档文件或将文件复制到HDFS时,或者当进展状态未被报告时。最后,调用distcp将文件从HDFS中复制出来,再存档到S3文件系统中。

新旧Java MapReduce APIs

使用的JavaMapReduceAPI被称为“新API”,它替代了功能上等同的旧API。尽管Hadoop同时提供MapReduce API 的新旧版本,但它们彼此并不兼容。如果愿意的话可以使用旧API,因为对于本书中所有 MapReduce 范例,相应的使用旧API的范例代码都可以到本书的配套网站下载(在 oldapi 包中)。

新版旧版API有下面几个值得注意的差异。

  • 存储位置不同
    新API在 org.apache.hadoop.mapreduce 包(子包)中。旧API仍然在 org.apache.hadoop.mapred 中。
  • API实现不同
    新API偏好在接口上定义抽象类,因为这样更利于演进。该特性意味着可以在一个抽象类中添加一个方法(带有默认的实现),而无需破坏该类中旧的实现。例如,旧API中的Mapper和Reducer接口在新API中都变成了抽象类。(从技术角度而言,如果类中原有的方法和新增的方法签名相同,那么这种改变几乎必然会破坏原有方法的实现,正如Jim des Rivieres 在“Evolving Java-based APIs” 一文(网址为http://bit.ly/adding_api_method)中解释的那样。出于实用目的,我们将这种改变视为可兼容的。)
  • Context对象代替JobConf、OutputCollector、Reporter
    新API大量使用了 context object,使得用户代码能够与MapReduce系统通信。例如,新的 Context 本质上统一了旧API 中的以下三种角色JobConfOutputCollectorReporter
  • 执行流的控制方式不同
    在新版和旧版API中,键值对都是被推送给 mapper 和 reducer 。但是,新API允许 mapper 和 reducer 通过重写 run() 方法来控制执行流。例如,可以以批为单位处理记录,或者在所有记录都被处理前终止本次执行。在旧API中,mapper可以通过写一个MapRunnable类来达到同样的目的,但对于reducer来说没有等价的手段。
  • 新API通过 Job 类来完成作业控制,旧API中对应的是JobClient,在新API中已经删除该类。
  • 作业配置方式不同
    新API对配置功能进行了整合。旧API专门有一个用于作业配置的JobConf对象,是Hadoop的平常(vanilla)对象Configuration (用于配置守护进程)的一个扩展。新API 中,作业配置通过 Configuration 完成,可能会用到Job 类的一些辅助类(helper)方法。
  • 输出文件的命名略有不同:旧API中,map 和 reduce 输出命名格式为 part-nnnnn,而在新API中,map 输出命名为 part-m-nnnnn,reduce 输出命名为 part-r-nnnnn (nnnnn是个整数,表示该部分的编号,从00000开始)。
  • 自定义异常中断处理
    新API中,用户可重写的方法声明抛出的异常是 java.lang.InterruptedException。意味着用户可以自己写代码处理中断,这样,如果需要,系统框架可以友好地取消长时间运行的操作。(详情可以参见Brian Goetz的文章“Java theory and practice: Dealing with InterruptedException”,网址为http://bit.ly/interruptedexception。)
  • 新API中,reduce()方法用 java.lang.Iterable 传递值,而旧API使用 java.lang.Iterator 传递值。这种改变使得使用Java 的 for-each 循环结构实现值的迭代更加容易。
    for (VALUEIN value : values) { ... }

使用新API且在 Hadoop1 上编译的程序,如果要在 Hadoop2 上运行,则需重新编译。这是因为新Mapreduce API中的一些类在 Hadoop1 和 Hadoop2 两个版本之间改变了接口。下面就是一个运行时的错误的症状:

java.lang.IncompatibleClassChangeError: Found interface 
         org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected

范例D-1给出了一个MaxTemperature应用,该程序来自上面的Java MapReduce API示例,这里使用旧API对其进行了重写。

把 Mapper 和 Reducer 类转换为新API时,不要忘记将 map() 方法和 reduce() 方法的签名(signatures)修改成新的形式。仅仅修改原有的类去继承新的 Mapper 或 Reducer 类将不会产生一个编译错误或警告,因为这些类分别提供了 map() 方法和 reduce() 方法的标识形式。但是,你的 mapper 或 reducer 代码将不会被调用,这会导致一些难以诊断的错误。
使用 @override 注释map()和reduce()方法,这样一来,Java编译器就会捕捉到这些错误。

范例D-1. 这个应用程序在气象数据集中找出最高气温(使用旧的MapReduceAPI)

public class OldMaxTemperature {

    static class OldMaxTemperatureMapper 
        // diff
        extends MapReduceBase
        // diff
        implements Mapper<LongWritable, Text, Text, IntWritable> {

        private static final int MISSING = 9999;

        @Override
        public void map(LongWritable key, Text value,
            // diff
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {

            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                output.collect(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    static class OldMaxTemperatureReducer 
        // diff
        extends MapReduceBase
        // diff
        implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
            // diff
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
            int maxValue = Integer.MIN_VALUE;
            while (values.hasNext()) {
                maxValue = Math.max(maxValue, values.next().get());
            }
            output.collect(key, new IntWritable(maxValue));
        }
    }
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: OldMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        // diff: conf
        JobConf conf = new JobConf(OldMaxTemperature.class);
        conf.setJobName("Max temperature");
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(OldMaxTemperatureMapper.class);
        conf.setReducerClass(OldMaxTemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
   }
}