一、HDFS文件上传

  1. /**
  2. * @Description 通过流上传文件到hdfs
  3. * @MethodName putFileToHDFS
  4. * @Param
  5. * @Return void
  6. * @Date 2019年10月28日 18:18:25
  7. * @Author Wells
  8. */
  9. @Test
  10. public void putFileToHDFS() throws Exception {
  11. Configuration conf = new Configuration();
  12. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
  13. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));
  14. // 1.获取文件系统
  15. FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
  16. // 2.创建输入流
  17. FileInputStream fis = new FileInputStream(new File("/home/wells/Downloads/data"));
  18. // 3.获取输出流
  19. FSDataOutputStream fos = fs.create(new Path("/tmp/wells/testData"));
  20. // 4.流对拷
  21. IOUtils.copyBytes(fis, fos, conf);
  22. // 5.关闭资源
  23. IOUtils.closeStream(fos);
  24. IOUtils.closeStream(fis);
  25. fs.close();
  26. }

二、HDFS文件下载

  1. /**
  2. * @Description 通过流方式从hdfs读取文件到本地
  3. * @MethodName getFileFromHDFS
  4. * @Param
  5. * @Return void
  6. * @Date 2019年10月28日 18:21:35
  7. * @Author Wells
  8. */
  9. @Test
  10. public void getFileFromHDFS() throws Exception {
  11. Configuration conf = new Configuration();
  12. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
  13. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));
  14. // 1.获取文件系统
  15. FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
  16. // 2.创建输入流
  17. FSDataInputStream fis = fs.open(new Path("/tmp/wells/testData"));
  18. // 3.获取输出流
  19. FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/testData"));
  20. // 4.流对拷
  21. IOUtils.copyBytes(fis, fos, conf);
  22. // 5.关闭资源
  23. IOUtils.closeStream(fos);
  24. IOUtils.closeStream(fis);
  25. fs.close();
  26. }

三、HDFS定位读取文件

  1. /**
  2. * @Description 分块读取文件part1
  3. * @MethodName readFileSeek1
  4. * @Param
  5. * @Return void
  6. * @Date 2019年10月28日 18:28:32
  7. * @Author Wells
  8. */
  9. @Test
  10. public void readFileSeek1() throws Exception {
  11. Configuration conf = new Configuration();
  12. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
  13. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));
  14. // 1.获取文件系统
  15. FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
  16. // 2.获取输入流
  17. FSDataInputStream fis = fs.open(new Path("/tmp/wells/data"));
  18. // 3.创建输出流
  19. FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/data2"));
  20. // 4.流的拷贝
  21. byte[] buf = new byte[1024];
  22. for (int i = 0; i < 1024 * 20; i++) {
  23. fis.read(buf);
  24. fos.write(buf);
  25. }
  26. // 5关闭资源
  27. IOUtils.closeStream(fis);
  28. IOUtils.closeStream(fos);
  29. fs.close();
  30. }
  31. @Test
  32. public void readFileSeek2() throws Exception {
  33. Configuration conf = new Configuration();
  34. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
  35. conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));
  36. // 1.获取文件系统
  37. FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
  38. // 2.获取输入流
  39. FSDataInputStream fis = fs.open(new Path("/tmp/wells/data"));
  40. fis.seek(1024 * 1024 * 20);
  41. // 3.创建输出流
  42. FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/data2"));
  43. // 4.流的拷贝
  44. IOUtils.copyBytes(fis, fos, conf);
  45. // 5关闭资源
  46. IOUtils.closeStream(fis);
  47. IOUtils.closeStream(fos);
  48. fs.close();
  49. }