每个文件、目录、块都大概有150字节的元数据,文件数量的限制也由namenode内存大小决定,如果小文件过多则会造成namenode的压力过大,且HDFS能存储的数据总量也会变小。
小文件治理的方式,之前已经使用JavaAPI方式演示过,下面讲解其它方式。

一、HAR文件方案

HAR文件方案的本质是启动Mapreduce程序,所以需要启动yarn
通过命令即可完成:

  1. hadoop archive -archiveName <NAME>.har -p <parent path> [-r <replication factor>]<src>* <dest>
  2. /*
  3. -archiveName <NAME>.har: 归档文件(许多小文件形成的一个大文件)
  4. -p <parent path>:小文件的父级目录
  5. [-r <replication factor>]:可选,副本个数
  6. <src>* <dest>:归档文件存放的路径
  7. */

案例:

  1. 第一步:创建归档文件
  2. cd /kkb/install/hadoop-2.6.0-cdh5.14.2/bin
  3. hadoop archive -archiveName myhar.har -p /user/hadoop /user
  4. 第二步:查看归档文件内容
  5. [hadoop@node01 bin]$ hdfs dfs -ls -R har:///myhar.har/
  6. -rw-r--r-- 3 hadoop supergroup 0 2020-02-10 23:34 har:///myhar.har/k1.txt
  7. -rw-r--r-- 3 hadoop supergroup 0 2020-02-10 23:34 har:///myhar.har/k2.txt
  8. -rw-r--r-- 3 hadoop supergroup 0 2020-02-10 23:35 har:///myhar.har/k3.txt
  9. [hadoop@node01 bin]$ hdfs dfs -ls -R /myhar.har
  10. -rw-r--r-- 3 hadoop supergroup 0 2020-02-10 23:35 /myhar.har/_SUCCESS
  11. -rw-r--r-- 3 hadoop supergroup 326 2020-02-10 23:35 /myhar.har/_index
  12. -rw-r--r-- 3 hadoop supergroup 23 2020-02-10 23:35 /myhar.har/_masterindex
  13. -rw-r--r-- 3 hadoop supergroup 0 2020-02-10 23:35 /myhar.har/part-0
  14. //所有小文件的内容合并形成了part-0文件
  15. 第三步:解压归档文件
  16. hdfs dfs -mkdir -p /user/har
  17. hdfs dfs -cp har:///user/myhar.har/* /user/har/

image.png

二、Sequence File方案

SequenceFile文件,主要由一条条record记录组成;每个record是键值对形式的,SequenceFile文件可以作为小文件的存储容器。每条record保存一个小文件的内容。
小文件名作为当前record的键,小文件的内容作为当前record的值。
image.png

例如有10000100KB的小文件,可以编写程序将这些文件内容放到一个SequenceFile文件。
一个SequenceFile是可分割的,所以MapReduce可将SequenceFile文件切分成块,每一块独立操作。

SequenceFile的详细结构

  • 一个SequenceFile首先有一个4字节的header(文件版本号)
  • 接着是若干record记录
  • 记录间会随机的插入一些同步点sync marker,用于方便定位到记录边界。(同步点的作用就是如果位移点不在一个record的开始处,就会调用sync方法把位移点移到下一个record的开始处。)

    压缩SequenceFile文件的方式

    不像HARSequenceFile支持压缩。记录的结构取决于是否启动压缩,支持两类压缩:

  • 不压缩NONE方式以及压缩RECORD

image.png

  • 压缩BLOCK
    • 一次性压缩多条record记录。
    • 每一个新块Block开始处都需要插入同步点
    • 在大多数情况下,以block(注意:指的是SequenceFile中的block)为单位进行压缩是最好的选择,因为一个block包含多条记录,利用record间的相似性进行压缩,压缩效率更高

image.png

案例一、向SequenceFile写入数据
把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介.
下列代码主要步骤是创建SequenceFile.Writer类的对象,然后使用该对象的append()方法

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IOUtils;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.SequenceFile;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.io.compress.BZip2Codec;
  8. import java.io.IOException;
  9. public class T1 {
  10. //模拟数据源
  11. private static final String[] DATA = {
  12. "I love you krystal","i love you kob3","lakers champion","hahahahhahaha"};
  13. public static void main(String[] args) throws IOException {
  14. //创建hadoop配置文件对象
  15. Configuration conf = new Configuration();
  16. //conf.set("fs.defaultFS","hdfs://node01:8020");
  17. //-->此行可写可不写,可能是默认调用系统的一些资源
  18. /*创建向SequenceFile文件写入数据时的一些选项,
  19. 为等会创建SequenceFile.Writer对象提供参数*/
  20. //要写入的SequenceFile的路径
  21. SequenceFile.Writer.Option pathOption
  22. = SequenceFile.Writer.file(new Path("hdfs://node01:8020//Seq1"));
  23. //record的key类型选项
  24. SequenceFile.Writer.Option keyOption
  25. = SequenceFile.Writer.keyClass(IntWritable.class);
  26. //record的value类型选项
  27. SequenceFile.Writer.Option valueOption
  28. = SequenceFile.Writer.valueClass(Text.class);
  29. //SequenceFile压缩方式:NONE | RECORD | BLOCK三选一
  30. //方案一:RECORD压缩、不指定压缩算法
  31. SequenceFile.Writer.Option compressOption
  32. = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
  33. SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
  34. /* //方案二:BLOCK压缩、不指定压缩算法
  35. SequenceFile.Writer.Option compressOption = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK);
  36. SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressOption);
  37. //方案三:使用BLOCK压缩、指定压缩算法:BZip2Codec;压缩耗时间
  38. BZip2Codec codec = new BZip2Codec();
  39. codec.setConf(conf);
  40. SequenceFile.Writer.Option compressAlgorithm = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, codec);
  41. //创建写数据的Writer实例
  42. SequenceFile.Writer writer = SequenceFile.createWriter(conf, pathOption, keyOption, valueOption, compressAlgorithm);
  43. */
  44. //因为SequenceFile每个record是键值对的
  45. //指定key类型,value类型:
  46. IntWritable key = new IntWritable();
  47. Text value = new Text();
  48. //IntWritable是Java的int类型的对应Hadoop的的可序列化类型
  49. //Text是Java的String类型的对应Hadoop的可序列化类型
  50. for (int i = 0; i < DATA.length; i++) {
  51. //分别设置key、value值
  52. key.set(i);
  53. value.set(DATA[i]);
  54. System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
  55. //在SequenceFile末尾追加内容
  56. writer.append(key, value);
  57. }
  58. //关闭流
  59. IOUtils.closeStream(writer);
  60. }
  61. }

案例二、查看SequenceFile
shell命令方式: hadoop fs -text /writeSequenceFile
JavaApi方式

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IOUtils;
  4. import org.apache.hadoop.io.SequenceFile;
  5. import org.apache.hadoop.io.Writable;
  6. import org.apache.hadoop.util.ReflectionUtils;
  7. import java.io.IOException;
  8. public class T1 {
  9. public static void main(String[] args) throws IOException {
  10. Configuration conf = new Configuration();
  11. //创建Reader对象,赋初值为null
  12. SequenceFile.Reader reader = null;
  13. try {
  14. //读取SequenceFile的Reader的路径选项
  15. SequenceFile.Reader.Option pathOption = SequenceFile.Reader.file(new Path("hdfs://node01:8020/writeSequenceFile"));
  16. //实例化Reader对象
  17. reader = new SequenceFile.Reader(conf, pathOption);
  18. //根据反射,求出key类型(hadoop对应的什么类型的可序列化类型)
  19. Writable key = (Writable)
  20. ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  21. //根据反射,求出value类型
  22. Writable value = (Writable)
  23. ReflectionUtils.newInstance(reader.getValueClass(),conf);
  24. long position = reader.getPosition();
  25. System.out.println(position);
  26. while (reader.next(key, value)) {
  27. String syncSeen = reader.syncSeen() ? "*" : "";
  28. System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
  29. position = reader.getPosition();
  30. // beginning of next record定位到下一个record
  31. }
  32. } finally {
  33. //关闭资源
  34. IOUtils.closeStream(reader);
  35. }
  36. }
  37. }