一、项目基础

1. 如何开发应用

首先需要将hadoop 3.1.1解压到对应文件夹下,然后下载winutils将对应版本的文件复制到bin目录下,完成之后我们还需要调整环境变量,主要设置如下:

  1. HADOOP_HOME=hadoop路径
  2. PATH=%HADOOP%\bin

同时为了还需要将hadoop.dll复制到C:\Windows\System32下保证其访问有效性,接着只要将项目生成对应的
jar文件后通过如下方式即可运行:

  1. export HADOOP_CLASSPATH=mapreducedemo.jar
  2. hadoop MaxTempreatureDriver input/sample.txt output

当然为了更便于测试本项目使用了hadoop-minicluster依赖更佳便于整体的测试,其中单元测试中就可以完全在
本地模拟一个环境进行执行测试。

二、学习知识

fs.default.name设置Hadoop的默认文件系统,而由dfs.replication设置块副本数量,其默认为3,测试
开发环境需要设置为1副本。

1. hadoop fs指令集

  • copyFromLocal: 将本地文件复制到HDFS
  • copyToLocal:将HDFS文件复制到本地
  • mkdir:创建文件夹
  • ls:查询文件

2. FileSystem

如果需要读取HDFS中的文件,我们可以通过其提供的FileSystem类,并通过其中提供的各类get方法获取
到实例。读取文件则需要通过open方法打开指定的路径后通过FSDataInputStream对象进行操作。该方法
继承了DataInputStream对象,实现了SeekablePositionedReadable接口以实现随机访问功能。具
体读取字节等通过IOUtils工具类。下面通过代码示例:

  1. FileSystem fs = FileSystem.get(uri, conf);
  2. FSDataInputStream in = null;
  3. try {
  4. in = fs.open(new Path(uri));
  5. IOUtils.copyBytes(in, System.out, 4096, false);
  6. in.seek(0);
  7. IOUtils.copyBytes(in, System.out, 4096, false);
  8. } finally {
  9. IOUtils.closeStream(in);
  10. }

如果我们需要写入文件则可以通过createappend创建或者追加一个文件,通过其返回的FSDataOutputStream
兑现实现数据的写入,因为HDFS的特性,我们还可以通过Progressable接收异步的进度。下面我们通过一个简单的
例子来说如何使用。

  1. FileSystem fs = FileSystem.get(uri, conf);
  2. InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
  3. OutputStream out = fs.create(new Path(uri), new Progressable() {
  4. public void progress() {
  5. System.out.print(".");
  6. }
  7. });
  8. IOUtils.copyBytes(in, out, 4096, true);

除了以上几个常用的写入写出以外还支持以下对HDFS的操作方法:

  • mkdirs: 创建文件夹
  • exists: 判断文件是否存在
  • getFileStatus: 查询文件元数据,通过返回FileStatus对象
  • listStatus: 列出目录下所有文件的元数据
  • globalStatus: 支持通配符查询文件元数据
  • delete: 删除数据

3. OutputCommitter

如果希望在一个任务完成后自行处理相关的后续清理或其他工作可以自行实现OutputCommitter接口,并
通过JobConfsetCommitter配置进去。而默认采用FileOutputCommitter实现。

4. Partitioner

用于指定分区键,默认的实现为HashPartitioner类,其具体的方法签名如下:

  1. public abstract class Partitioner<KEY, VALUE> {
  2. public abstract int getPartition(KEY key, VALUE value, int numPartitions);
  3. }

5. 多个输入

考虑到实际的业务场景可能需要一个mapreduce处理两个文件,那么我们就需要使用如下的方式进行添加。

  1. MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
  2. MultipleInputs.addInputPath(job, metofficeInputPath, TextInputFormat.class, MetofficMaxTemperatureMapper.class);

6. 多个输出

为了能够自定义输出的文件目录接口,可以通过MultipleOutputs实现,其需要在任务的reducer
使用,并通过其将记录输出即可。

  1. static class MultipleOutputReducer extends Reducer<Text, Text, NullWritable, Text> {
  2. private MultipleOutputs<NullWritable, Text> multipleOutputs;
  3. @Override
  4. protected void setup(Context context) {
  5. multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  6. }
  7. @Override
  8. public void reduce(Text key, Iterable<Text> values, Context context) {
  9. for (Text value : values) {
  10. multipleOutputs.write(NullWritable.get(), value, key.toString());
  11. }
  12. }
  13. @Override
  14. protected void cleanup(Context context) {
  15. multipleOutputs.close();
  16. }
  17. }

7. 计数

除了诸多自带的计数器,用户也可以通过mapreduce任务中的context进行任务统计。

  1. context.getCounter(Temperature.MISSING).increment(1);
  2. context.getCounter("TemperatureQuality", getQuality()).increment(1);