小文件问题

image.png

需求: 10个小文件,统计里面单词出现的情况

image.png

SequenceFile

image.png
可以理解为把很多小文件压缩为一个大压缩包

  1. /**
  2. * 小文件解决方案之SequenceFile
  3. */
  4. public class SmallFileSeq {
  5. public static void main(String[] args) throws Exception {
  6. //生成SequenceFile文件
  7. write("D:\\smallFile", "/seqFile");
  8. //读取SequenceFile文件
  9. read("/seqFile");
  10. }
  11. /**
  12. * 生成SequenceFile文件
  13. *
  14. * @param inputDir 输入目录-windows目录
  15. * @param outputFile 输出文件-hdfs文件
  16. * @throws Exception
  17. */
  18. private static void write(String inputDir, String outputFile) throws Exception {
  19. //创建一个配置对象
  20. Configuration conf = new Configuration();
  21. //指定HDFS的地址
  22. conf.set("fs.defaultFS", "hdfs://bigdata1:9000");
  23. //获取操作HDFS的对象
  24. FileSystem fileSystem = FileSystem.get(conf);
  25. //删除HDFS上的输出文件
  26. fileSystem.delete(new Path(outputFile), true);
  27. //构造opts数组,有三个元素
  28. /*
  29. 第一个是输出路径【文件】
  30. 第二个是key的类型
  31. 第三个是value的类型
  32. */
  33. SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
  34. SequenceFile.Writer.file(new Path(outputFile)),
  35. SequenceFile.Writer.keyClass(Text.class),
  36. SequenceFile.Writer.valueClass(Text.class)
  37. };
  38. //创建了一个writer实例
  39. SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
  40. //指定需要压缩的文件的目录
  41. File inputDirPath = new File(inputDir);
  42. if (inputDirPath.isDirectory()) {
  43. //获取目录中的文件
  44. File[] files = inputDirPath.listFiles();
  45. //迭代文件
  46. for (File file : files) {
  47. //获取文件的全部内容
  48. String content = FileUtils.readFileToString(file, "UTF-8");
  49. //获取文件名
  50. String fileName = file.getName();
  51. Text key = new Text(fileName);
  52. Text value = new Text(content);
  53. //向SequenceFile中写入数据
  54. writer.append(key, value);
  55. }
  56. }
  57. writer.close();
  58. }
  59. /**
  60. * 读取SequenceFile文件
  61. *
  62. * @param inputFile SequenceFile文件路径
  63. * @throws Exception
  64. */
  65. private static void read(String inputFile) throws Exception {
  66. //创建一个配置对象
  67. Configuration conf = new Configuration();
  68. //指定HDFS的地址
  69. conf.set("fs.defaultFS", "hdfs://bigdata1:9000");
  70. //创建阅读器
  71. SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
  72. Text key = new Text();
  73. Text value = new Text();
  74. //循环读取数据
  75. while (reader.next(key, value)) {
  76. //输出文件名称
  77. System.out.print("文件名:" + key.toString() + ",");
  78. //输出文件内容
  79. System.out.println("文件内容:" + value.toString() + "");
  80. }
  81. reader.close();
  82. }
  83. }

运行结果:
image.png

MapFile

image.png

  1. /**
  2. * 小文件解决方案之MapFile
  3. */
  4. public class SmallFileMap {
  5. public static void main(String[] args) throws Exception{
  6. //生成MapFile文件
  7. write("D:\\smallFile","/mapFile");
  8. //读取MapFile文件
  9. read("/mapFile");
  10. }
  11. /**
  12. * 生成MapFile文件
  13. * @param inputDir 输入目录-windows目录
  14. * @param outputDir 输出目录-hdfs目录
  15. * @throws Exception
  16. */
  17. private static void write(String inputDir,String outputDir) throws Exception{
  18. //创建一个配置对象
  19. Configuration conf = new Configuration();
  20. //指定HDFS的地址
  21. conf.set("fs.defaultFS","hdfs://bigdata1:9000");
  22. //获取操作HDFD的对象
  23. FileSystem fileSystem = FileSystem.get(conf);
  24. //删除HDFS上的输出文件
  25. fileSystem.delete(new Path(outputDir),true);
  26. //构造opts数组,有两个元素
  27. /*
  28. 第一个是key的类型
  29. 第二个是value的类型
  30. */
  31. SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
  32. MapFile.Writer.keyClass(Text.class),
  33. MapFile.Writer.valueClass(Text.class)
  34. };
  35. //创建了一个writer实例
  36. MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);
  37. //指定需要压缩的文件的目录
  38. File inputDirPath = new File(inputDir);
  39. if(inputDirPath.isDirectory()){
  40. //获取目录中的文件
  41. File[] files = inputDirPath.listFiles();
  42. //迭代文件
  43. for (File file: files) {
  44. //获取文件的全部内容
  45. String content = FileUtils.readFileToString(file, "UTF-8");
  46. //获取文件名
  47. String fileName = file.getName();
  48. Text key = new Text(fileName);
  49. Text value = new Text(content);
  50. //向SequenceFile中写入数据
  51. writer.append(key,value);
  52. }
  53. }
  54. writer.close();
  55. }
  56. /**
  57. * 读取MapFile文件
  58. * @param inputDir MapFile文件路径
  59. * @throws Exception
  60. */
  61. private static void read(String inputDir)throws Exception{
  62. //创建一个配置对象
  63. Configuration conf = new Configuration();
  64. //指定HDFS的地址
  65. conf.set("fs.defaultFS","hdfs://bigdata1:9000");
  66. //创建阅读器
  67. MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf);
  68. Text key = new Text();
  69. Text value = new Text();
  70. //循环读取数据
  71. while(reader.next(key,value)){
  72. //输出文件名称
  73. System.out.print("文件名:"+key.toString()+",");
  74. //输出文件内容
  75. System.out.println("文件内容:"+value.toString()+"");
  76. }
  77. reader.close();
  78. }
  79. }

运行结果:
image.png

计算生成的SequenceFile

  1. /**
  2. * 需求:读取SequenceFile文件
  3. */
  4. @Slf4j
  5. public class WordCountJobSeq {
  6. /**
  7. * Map阶段
  8. */
  9. public static class MyMapper extends Mapper<Text, Text, Text, LongWritable> {
  10. /**
  11. * 需要实现map函数
  12. * 这个map函数就是可以接收<k1,v1>,产生<k2,v2>
  13. *
  14. * @param k1 这里的k1不是偏移量,而是SequenceFile的文件名
  15. * @param v1 文件内容
  16. * @param context
  17. * @throws IOException
  18. * @throws InterruptedException
  19. */
  20. @Override
  21. protected void map(Text k1, Text v1, Context context) throws IOException, InterruptedException {
  22. log.info("<k1,v1>=<{},{}>", k1.toString(), v1.toString());
  23. //对获取到的每一行数据进行切割,把单词切割出来
  24. String[] words = v1.toString().split(" ");
  25. //迭代切割出来的单词数据
  26. for (String word : words) {
  27. //把迭代出来的单词封装成<k2,v2>的形式
  28. Text k2 = new Text(word);
  29. LongWritable v2 = new LongWritable(1L);
  30. //把<k2,v2>写出去
  31. context.write(k2, v2);
  32. }
  33. }
  34. }
  35. /**
  36. * Reduce阶段
  37. */
  38. public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  39. /**
  40. * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
  41. *
  42. * @param k2
  43. * @param v2s
  44. * @param context
  45. * @throws IOException
  46. * @throws InterruptedException
  47. */
  48. @Override
  49. protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
  50. //创建一个sum变量,保存v2s的和
  51. long sum = 0L;
  52. //对v2s中的数据进行累加求和
  53. for (LongWritable v2 : v2s) {
  54. log.info("<k2,v2>=<{},{}>", k2.toString(), v2.get());
  55. sum += v2.get();
  56. }
  57. //组装k3,v3
  58. Text k3 = k2;
  59. LongWritable v3 = new LongWritable(sum);
  60. log.info("<k3,v3>=<{},{}>", k3.toString(), v3.get());
  61. // 把结果写出去
  62. context.write(k3, v3);
  63. }
  64. }
  65. /**
  66. * 组装Job=Map+Reduce
  67. */
  68. public static void main(String[] args) {
  69. try {
  70. if (args.length != 2) {
  71. //如果传递的参数不够,程序直接退出
  72. System.exit(100);
  73. }
  74. //指定Job需要的配置参数
  75. Configuration conf = new Configuration();
  76. //创建一个Job
  77. Job job = Job.getInstance(conf);
  78. //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
  79. job.setJarByClass(WordCountJobSeq.class);
  80. //指定输入路径(可以是文件,也可以是目录)
  81. FileInputFormat.setInputPaths(job, new Path(args[0]));
  82. //指定输出路径(只能指定一个不存在的目录)
  83. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  84. //指定map相关的代码
  85. job.setMapperClass(MyMapper.class);
  86. //指定k2的类型
  87. job.setMapOutputKeyClass(Text.class);
  88. //指定v2的类型
  89. job.setMapOutputValueClass(LongWritable.class);
  90. //设置输入数据处理类
  91. job.setInputFormatClass(SequenceFileInputFormat.class);
  92. //指定reduce相关的代码
  93. job.setReducerClass(MyReducer.class);
  94. //指定k3的类型
  95. job.setOutputKeyClass(Text.class);
  96. //指定v3的类型
  97. job.setOutputValueClass(LongWritable.class);
  98. //提交job
  99. job.waitForCompletion(true);
  100. } catch (Exception e) {
  101. e.printStackTrace();
  102. }
  103. }
  104. }

计算之前生成的seqFile

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSeq /seqFile /out10

查看计算结果
image.png
查看日志
Map阶段:
image.png
Reduce阶段
image.png

总结

1、将一堆小文件合并为一个SequenceFile
2、使用MapReduce计算SequenceFile

附:一个小文件有多行的情况时,可能需要修改

image.png
image.png
image.png