概要

MapReduce概述

MaxCompute提供三个版本的MapReduce编程接口:

  • MaxCompute MapReduce:MaxCompute的原生接口,执行速度更快、开发更便捷、不暴露文件系统。
  • MaxCompute 扩展MapReduce(MR2):对MaxCompute MapReduce的扩展,支持更复杂的作业调度逻辑。MapReduce的实现方式与MaxCompute原生接口一致。
  • Hadoop兼容版本MapReduce:高度兼容Hadoop MapReduce ,与MaxCompute MapReduce MR2不兼容。

以上三个版本在基本概念、作业提交、输入与输出、资源使用等方面基本一致,仅各版本的Java SDK有所不同。本文仅对MapReduce的基本原理做简单介绍,更多详情请参见Hadoop Map/Reduce教程

应用场景

MapReduce支持下列场景:

  • 搜索:网页爬取、倒排索引、PageRank。
  • Web访问日志分析:
    • 分析和挖掘用户在Web上的访问、购物行为特征,实现个性化推荐。
    • 分析用户访问行为。
  • 文本统计分析:
    • 热门小说的字数统计(WordCount)、词频TFIDF分析。
    • 学术论文、专利文献的引用分析和统计。
    • 维基百科数据分析等。
  • 海量数据挖掘:非结构化数据、时空数据、图像数据的挖掘。
  • 机器学习:监督学习、无监督学习、分类算法如决策树、SVM等。
  • 自然语言处理:
    • 基于大数据的训练和预测。
    • 基于语料库构建单词同现矩阵,频繁项集数据挖掘、重复文档检测等。
  • 广告推荐:用户点击(CTR)和购买行为(CVR)预测。

处理流程

MapReduce处理数据过程主要分成Map和Reduce两个阶段。首先执行Map阶段,再执行Reduce阶段。Map和Reduce的处理逻辑由用户自定义实现,但要符合MapReduce框架的约定。处理流程如下所示:

  1. 在正式执行Map前,需要将输入数据进行分片。所谓分片,就是将输入数据切分为大小相等的数据块,每一块作为单个Map Worker的输入被处理,以便于多个Map Worker同时工作。
  2. 分片完毕后,多个Map Worker便可同时工作。每个Map Worker在读入各自的数据后,进行计算处理,最终输出给Reduce。Map Worker在输出数据时,需要为每一条输出数据指定一个Key,这个Key值决定了这条数据将会被发送给哪一个Reduce Worker。Key值和Reduce Worker是多对一的关系**,具有相同Key的数据会被发送给同一个Reduce Worker**,单个Reduce Worker有可能会接收到多个Key值的数据。
  3. 进入Reduce阶段之前,MapReduce框架会对数据按照Key值排序,使得具有相同Key的数据彼此相邻。如果您指定了合并操作(Combiner),框架会调用Combiner,将具有相同Key的数据进行聚合。Combiner的逻辑可以由您自定义实现。与经典的MapReduce框架协议不同,在MaxCompute中,Combiner的输入、输出的参数必须与Reduce保持一致,这部分的处理通常也叫做洗牌(Shuffle)
  4. 接下来进入Reduce阶段。相同Key的数据会到达同一个Reduce Worker。同一个Reduce Worker会接收来自多个Map Worker的数据。每个Reduce Worker会对Key相同的多个数据进行Reduce操作。最后,一个Key的多条数据经过Reduce的作用后,将变成一个值

下文将以WordCount为例
假设存在一个文本a.txt,文本内每行是一个数字,您要统计每个数字出现的次数。文本内的数字称为Word,数字出现的次数称为Count。如果MaxCompute MapReduce完成这一功能,需要经历以下流程,如下图所示。MaxCompute学习笔记 Part 9 MapReduce - 图1
操作步骤

  1. 输入数据:对文本进行分片,将每片内的数据作为单个Map Worker的输入。
  2. Map阶段:Map处理输入,每获取一个数字,将数字的Count 设置为1,并将此对输出,此时以Word作为输出数据的Key。
  3. Shuffle>合并排序:在Shuffle阶段前期,首先对每个Map Worker的输出,按照Key值(即Word值)进行排序。排序后进行Combiner操作,即将Key值(Word值)相同的Count累加,构成一个新的对。此过程被称为合并排序。
  4. Shuffle>分配Reduce:在Shuffle阶段后期,数据被发送到Reduce端。Reduce Worker收到数据后依赖Key值再次对数据排序。
  5. Reduce阶段:每个Reduce Worker对数据进行处理时,采用与Combiner相同的逻辑,将Key值(Word 值)相同的Count累加,得到输出结果。
  6. 输出结果数据。

注意事项

  • 不能通过MapReduce读取外部表数据。
  • 由于MaxCompute的所有数据都被存放在表中,因此MaxCompute MapReduce的输入、输出只能是表,不允许您自定义输出格式,不提供类似文件系统的接口。

使用限制

详见MapReduce使用限制

扩展MapReduce

相比于传统的MapRudce,MaxCompute提供的扩展MapReduce模型(简称MR2)改变了底层的调度和IO模型,可避免作业时冗余的IO操作。

MR2模型产生背景

传统的MapReduce模型要求在经过每一轮MapReduce操作后,得到的数据结果必须存储到分布式文件系统中(例如,HDFS或MaxCompute数据表)。MapReduce模型通常由多个MapReduce作业组成,每个作业执行完成后都需要将数据写入磁盘,然而后续的Map任务很可能只需要读取一遍这些数据,为之后的Shuffle阶段做准备,这种情况就产生了冗余的磁盘IO操作。
MaxCompute的计算调度逻辑可以支持更复杂的编程模型, 针对上述情况,可以在Reduce后直接执行下一次的Reduce操作,而不需要中间插入一个Map操作。因此,MaxCompute提供了扩展的MapReduce模型,即可以支持Map后连接任意多个Reduce操作,例如Map>Reduce>Reduce。

与Hadoop Chain Mapper/Reducer对比

Hadoop Chain Mapper/Reducer也支持类似的串行化Map或Reduce操作,但和MaxCompute的扩展MapReduce(MR2)模型有本质的区别。
Chain Mapper/Reducer基于传统的MapReduce模型,仅可以在原有的Mapper或Reducer后增加一个或多个Mapper操作(不允许增加Reducer)。这样的优点是:您可以复用之前的Mapper业务逻辑,把一个Map或Reduce拆成多个Mapper阶段,同时本质上并没有改变底层的调度和I/O模型。

开源兼容MapReduce

Hadoop用户如果要将原来的Hadoop MapReduce作业迁移到MaxCompute的MapReduce中执行,需要重写MapReduce的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行,这个过程十分繁琐。
目前,MaxCompute平台提供了一个Hadoop MapReduce到MaxCompute MapReduce的适配工具,已经在一定程度上实现了Hadoop MapReduce作业的二进制级别的兼容,您可以在不改代码的情况下通过指定一些配置,便可将原来在Hadoop上运行的MapReduce Jar包直接运行在MaxCompute上。目前该工具处于测试阶段,暂时还不能支持自定义Comparator和自定义Key类型。

下文以WordCount程序为例,阐述HadoopMapReduce插件的基本使用方式

  1. 下载Hadoop MapReduce,包名为openmr_hadoop2openmr-1.0.jar,这个Jar包中已经包含hadoop-2.7.2版本的相关依赖,在作业的Jar包中请不要携带Hadoop的依赖,避免版本冲突(ps:是指打包的时候不要带吗?)。
  2. 编译导出WordCount的Jar包,这里命名为wordcount_test.jar。
  3. 执行如下语句创建输入表wc_in和输出表wc_out,并往wc_in表里导入数据。

    1. create table if not exists wc_in(line string);
    2. create table if not exists wc_out(key string, cnt bigint);
  4. 在配置文件wordcount-table-res.conf 中配置表与HDFS文件路径的映射关系。配置文件如下所示。

    {
    "file:/foo": {
     "resolver": {
       "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",
       "properties": {
           "text.resolver.columns.combine.enable": "true",
           "text.resolver.seperator": "\t"
       }
     },
     "tableInfos": [
       {
         "tblName": "wc_in",
         "partSpec": {},
         "label": "__default__"
       }
     ],
     "matchMode": "exact"
    },
    "file:/bar": {
     "resolver": {
       "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",
       "properties": {
           "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",
           "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"
       }
     },
     "tableInfos": [
       {
         "tblName": "wc_out",
         "partSpec": {},
         "label": "__default__"
       }
     ],
     "matchMode": "fuzzy"
    }
    }
    

配置项说明:
整个配置是一个JSON文件,描述HDFS上的文件与MaxCompute上的表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置、tableInfos配置以及matchMode配置。/foo是HDFS的输入文件路径,这里映射到了MaxCompute的wc_in表,/bar同理,由于这个插件的目标是在不改动代码的基础上,对MaxCompute的表数据进行相同的MapReduce处理,而代码传入的又是HDFS路径,因此需要做这么一个映射
(ps:我看懵了一开始还,愣没想明白为啥要做映射,想半天才想起来哦原来之前是Hadoop QAQ,用odps用多了后遗症,特地把这块记下来避免再次懵逼XD)。

  • resolver:用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些特性指导它正确的进行数据解析。
    • TextFileResolver:对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator。当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按照text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key、value。
    • BinaryFileResolver:可以处理二进制的数据,自动将数据转换为MaxCompute可以支持的数据类型,如:Bigint、Bool、Double等。当作为输出resolver配置时,需要配置的特性有binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。
  • tableInfos:HDFS对应的MaxCompute表,目前只支持配置表的名称tblName,而partSpec和label请保持和示例一致。
  • matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径。
  1. 运行如下命令提交作业
    jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar
    

功能介绍

基本概念

Map/Reduce

Map和Reduce分别支持对应的map/reduce方法、setup及cleanup方法。setup方法在map/reduce方法之前调用,每个Worker调用且仅调用一次。cleanup方法在map/reduce方法之后调用,每个Worker调用且仅调用一次。

排序

支持将Map输出的key record中的某几列作为排序(Sort)列,不支持您自定义的比较器(comparator)。您可以在排序列中选择某几列作为Group列,不支持您自定义的Group比较器。Sort列一般用来对您的数据进行排序,而Group列一般用来进行二次排序。

哈希

支持设置哈希(partition)列及用户自定义哈希函数(partitioner)。哈希列的使用优先级高于自定义哈希函数。
哈希函数用于将Map端的输出数据按照哈希逻辑分配到不同的Reduce Worker上。

归并

归并(Combiner)函数将Shuffle阶段相邻的Record进行归并。您可以根据不同的业务逻辑选择是否使用归并函数。
归并函数是MapReduce计算框架的一种优化,通常情况下,Combiner的逻辑与Reduce相同。当Map输出数据后,框架会在Map端对相同key值的数据进行本地的归并操作。

作业提交

Jar命令用于运行MapReduce作业,具体语法如下所示:

jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS];
        -conf <configuration_file>         Specify an application configuration file
        -resources <resource_name_list>    file\table resources used in mapper or reducer, seperate by comma
        -classpath <local_file_list>       classpaths used to run mainClass
        -D <name>=<value>                  Property value pair, which will be used to run mainClass
        -l                                 Run job in local mode

详见MapReduce 作业提交

输入与输出

  • MaxCompute MapReduce的输入、输出,支持MaxCompute内置类型的Bigint、Double、String、Datetime和Boolean类型,不支持自定义类型。
  • 接受多表输入,且输入表的Schema可以不同。在map函数中,您可以获取当前Record对应的Table信息。
  • 输入可以为空,不支持视图(View)作为输入。
  • Reduce接受多路输出,可以输出到不同表,或者同一张表的不同分区。不同输出的Schema可以不同。不同输出间通过label进行区分,默认输出不必加label,但目前不接受没有输出的情况。

资源使用

可以在map/reduce中读取MaxCompute资源,map/reduce的任意Worker都会将资源加载到内存中,以供您的代码使用。

本地运行

详见MapReduce本地运行(ps:因为没有机会本地测试XD,怎么测都得上集群)

示例程序

这里不会详解代码,只会结合SDK文档记录关键点,用于在实际应用的时候拿来即用进行二次开发
(每天不定时更新一份源码解析XD直到更新完成)

WordCount示例

官网链接https://help.aliyun.com/document_detail/27886.html?spm=a2c4g.11174283.6.751.17b1590elzgSB7
关键点

  1. WordCount是MapReduce最经典的案例,主要分为三部分,Mapper、Reducer、Driver。在官方文档的例子中还添加了Combiner,逻辑与Reducer一致,在Map端将每个mapper的数据排序,将key相同的数据做归并。
  2. setup方法中,先创建Map输出Key和Value的记录对象,由于Map的输出是要为,因此用set方法为变量one赋值1,同时调用上下文对象的getTaskID()方法打印出Task ID。
  3. map方法中,是对每一行记录做的处理逻辑,也就是操作Record对象,外层的循环通过Record.getColumnCount()方法获取字段数,逐个字段进行处理。map方法传入的参数有三个:recordNum是行号,也就是record的偏移量;record是记录行对象,context是任务的上下文对象。

    但是这里有个坑,MaxCompute默认分隔符为’,’逗号,即使输入的表只有一个字段,也就是当warehouse元数据中的schema为一个字段,且这个字段的值包含逗号的时候,getColumnCount()方法也会按逗号进行切分从而判断字段数。由于getColumnCount()方法获取的字段数与schema的字段数不一致,就会报如下错误,这个问题笔者还没有解>,<。

Exception in thread "main" java.lang.RuntimeException: com.aliyun.odps.OdpsException: java.lang.RuntimeException: java.io.IOException: column num of table 'example_project.wc_in1' not match, expect: 1, but: 4
    at com.aliyun.odps.mapred.LocalJobRunner.submit(LocalJobRunner.java:163)
    at com.aliyun.odps.mapred.JobClient.submitJob(JobClient.java:180)
    at com.aliyun.odps.mapred.JobClient.runJob(JobClient.java:80)
    at wordcount.WordCountDriver.main(WordCountDriver.java:34)
  1. 在官方文档的示例中,是通过Record对象的get方法,通过下标获取字段的值,再使用word的set方法将zi’d值作为key;但是在Haddop版本的WordCount中,是对行数据根据分隔符进行切分再逐个值做处理,因此在表字段数为1的情况下,可以将map方法改写如下:

    @Override
     public void map(long recordNum, Record record, TaskContext context) throws IOException {
         for (int i = 0; i < record.getColumnCount(); i++) {
             String[] words = record.get(i).toString().split(" ");
             for (String w:words) {
                 word.set(new Object[]{w});
                 context.write(word,one);
             }
         }
     }
    

    先将该字段按分隔符切分,并将切分的结果放到一个数组里,再用foreach将每个值作为word对象的set方法的传入参数,每次传入后调用上下文对象的context的write方法将其作为map的一个输出。

  2. 接下来是Combiner,Combiner实现的接口和Reducer一样,可以立即在Mapper本地执行的一个Reduce运算,作用是减少Mapper的输出量。由于是在Map端执行的运算,因此在setup方法中需要创建Map输出Value的记录对象count,作为Mapper的最终输出值之一。

在WordCount中,Reduce的计算逻辑是将相同key的所有value值相加,因此Combiner的逻辑也是一样的,实现的reduce方法传入参数有三个:key,即是从Mapper处理完成后,产生的key;values,是从Mapper处理完成之后,相同key的values的数据集合,而且这是一个迭代器;context,任务的上下文对象。由于迭代器中的类型为Record类型,因此在获取真正的值时需要通过get方法传入index值,即值0;在完成了value值的归并相加后,使用count对象的set方法,将相加后的值放入index=0中。

  1. Reducer的逻辑与Combiner一致,唯一不同的就是输出的对象要使用context.createOutputRecord()方法实例化。
  2. 最后是Driver,也就是main方法入口,需要设置Mapper、Combiner、Reducer的类名,设置Mapper中间结果的key和value的Schema,设置输入和输出的表信息,最后runJob。

(ps:能不能在案例里加注释,啊❓)

MapOnly示例

多路输入输出示例

多任务示例

二次排序示例

使用资源示例

使用Counter示例

Grep示例

Join示例

Sleep示例

Unique示例

Sort示例

分区表输入示例

Pipeline示例

Java SDK

原生SDK概述

详见MapReduce核心接口

兼容版本SDK概述

详见MaxCompute与Hadoop的MapReduce兼容性

Java沙箱

MaxCompute MapReduce及UDF程序在分布式环境中运行时,受到Java沙箱的限制(MapReduce作业的主程序,例如MR Main则不受此限制)。
详见Java沙箱限制

MapReduce使用限制

详见MapReduce使用限制