复合键值对的使用

复合键完成排序

【用复合键让系统完成排序,把小的键值对合并成大的键值对】

  • 核心思想:进行Partition处理时,系统自动按照map的输出键进行排序。
  • 适用情况:value中某些值需要进行排序时,加入key中形成复合键。
  • Note:参考前面倒排索引实现,需要实现一个新的Partitioner,保证同一个key的键值对被分到同一个节点上。

    小键值对合并成大键值对

    例:
    image.png

用户自定义数据类型

Hadoop内置数据类型

都实现了WritbaleComparable接口,以实现网络传输和文件存储,以及大小比较
例:
image.png

  • Write,readFields进行序列化与反序列化,进行网络传输。
    • write和read顺序一致,此处为管道模型

Notes:
关于Array,List类型的传输:

  1. 在readFields里面进行数组或列表初始化
  2. 对于数组中元素进行拷贝时,注意区分变量为 基本数据类型 or 引用数据类型,否则会出现深拷贝与浅拷贝方面错误

用户自定义输入输出格式

内置

image.png
image.png
RecordReader用于读取InputFormat
可能考点?[如果需要将行划分为key-val形式读入,使用哪种InputFormat? : KeyValueTextInputFormat]

例子:在倒排索引一节时,期望将offset与filename拼接作为Key:
解决方案:自定义一个RecordReader
代码:

  1. public class FileNameOffsetRecordReader extends RecordReader<Text, Text>
  2. {
  3. String fileName;
  4. LineRecordReader lrr = new LineRecordReader(); //默认的reader
  5. ……
  6. @Override
  7. //该函数的返回值,作为map的key的输入,此处将filename与offset进行拼接
  8. public Text getCurrentKey() throws IOException, InterruptedException
  9. { return new Text("(" + fileName + “#" + lrr.getCurrentKey() + ")"); }
  10. @Override
  11. //默认value返回值
  12. public Text getCurrentValue() throws IOException, InterruptedException
  13. { return lrr.getCurrentValue(); }
  14. @Override
  15. //initialize用于获取filename,与Mapper的setup函数类似,在创建时执行
  16. public void initialize(InputSplit arg0, TaskAttemptContext arg1)
  17. throws IOException, InterruptedException
  18. {
  19. lrr.initialize(arg0, arg1);
  20. fileName = ((FileSplit)arg0).getPath().getName();
  21. }
  22. }

job.setInputFormatClass(``FileNameOffsetInputFormat``.class); //在Job中进行指定
image.png

用户自定义Partitioner和Combiner

考点:为什么Combiner输出时不能改变其输入键值对的格式?答:Combiner的输出是Reducer的输入,添加Combiner的绝不能改变最终的计算结果。

定制Partitioner

  1. Class NewPartitioner extends HashPartitioner<K,V>
  2. { // override the method
  3. void getPartition(K key, V value, int numReduceTasks)
  4. {
  5. term = key.toString().split(“,”)[0]; //<term, docid>=>term
  6. super.getPartition(term, value, numReduceTasks); //调用默认partition函数,
  7. } //此处改变的仅是key值
  8. }

Job.setPartitionerClass(NewPartitioner)

定制Combiner

  • 作用:减少网络数据的传输量,提高系统效率

    1. public static class NewCombiner extends Reducer
    2. < Text, IntWritable, Text, IntWritable >
    3. {
    4. public void reduce(Text key, Iterable<IntWritable> values, Context context)
    5. throws IOException, InterruptedException
    6. { //具体实现
    7. }
    8. }

    组合式MapReduce的设计【PASS】

  • 基本问题:一些问题难以用一趟MapReduce处理来完成,需要分成多趟MapReduce的子任务组合。

    链式作业

    每个子任务提供独立的Jobconf,并按照前后子任务间输入输出关系设置路径。例如:
    image.png

    数据依赖关系MapReduce子任务

    比如:Z任务,需要依赖X,Y的执行结果。

    1. jobx = new Job(jobxconf, Jobx");
    2. ……
    3. joby = new Job(jobyconf, Joby");
    4. ……
    5. jobz = new Job(jobzconf, Jobz");
    6. jobz.addDependingJob(jobx); // jobz将等待jobx执行完毕
    7. jobz.addDependingJob(joby); // jobz将等待joby执行完毕
    8. JobControl JC = new JobControl(“XYZJob”);
    9. JC.addJob(jobx);
    10. JC.addJob(joby);
    11. JC.addJob(jobz);
    12. JC.run()

    addDepending实际形成了一个拓扑图,使得JobControl优先处理没有依赖关系的job

    预处理和后处理链式执行

    Hadoop提供 链式 ChainMapperChainReducer 完成处理。
    但现在Spark能够实现的更好。

多数据源的连接

  • 基本问题:MapReduce任务需要访问和处理两个甚至多个数据集。关系数据库中使用Join进行连接,但Hadoop系统没有Join,故需要基于MapReduce来实现Join功能。
  • 思想:
    • Map中将Key相同的数据发到同一个Reduce进行处理
    • Reduce对来自不同表的数据进行笛卡尔积运算
    • combine对笛卡尔积结果进行处理
  • 例子:

image.png
image.png
在此处,不同表间使用一个tag标签进行区分。
image.png

DataJoin类实现Reduce端的Join

课件P46-60,平时使用不多,且不灵活。
【不知道会不会考】

全局参数/数据文件的传递

全局作业参数

  • Configuration类提供用于保存和获取属性的方法
  • mapper/reducer类初始化方法 setup() 从confiuration对象中读出属性

    全局数据文件的传递

  • 使用DistributedCache文件传递机制

  • Job.addCacheFile(URI uri) //添加文件
  • Path[] cacheFiles = context.getLocalCacheFiles() //获取文件路径

其他处理技术

划分多个输出文件集合

  • 应用场景:作业需要输出多个文件集合。
  • 解决方案:MultipleOutputFormat类中的 generateFileNameForKeyValue``(K key, V value,String current_filename)

    根据键值对和当前数据文件名返回一个输出文件路径

输入输出到关系数据库

  • OLTP (online transaction processing)
    • 联机事务处理:主要是关系数据库应用系统中前台常规的各种事务处理
  • OLAP (online analytical processing)
    • 联机分析处理:主要是进行基于数据仓库的后台数据分析和挖掘,提供优化的客户服务和运营决策支持

Q1: OLAP端采用基于关系数据库仓库的解决方案,在数据量巨大的情况下,复杂数据分析和挖掘处理负载很大
解决方案:提供基于MapReduce大规模数据并行处理的OLAP

  • 输入输出简单记为普通格式前加个DB