一、HDFS文件上传
/**
 * Uploads a local file to HDFS by copying it through streams.
 *
 * <p>Reads {@code /home/wells/Downloads/data} from the local file system and
 * writes it to {@code /tmp/wells/testData} on the cluster, connecting as user
 * {@code root}. The cluster settings are loaded from the {@code core-site.xml}
 * and {@code hdfs-site.xml} files at the hard-coded local paths below.
 *
 * <p>All resources are managed by try-with-resources, so they are released
 * even if opening a stream or the copy itself fails, and a failure while
 * closing the HDFS output stream is reported instead of being swallowed.
 *
 * @throws Exception if the file system cannot be reached, a stream cannot be
 *                   opened, or the copy/close fails
 * @Date 2019-10-28 18:18:25
 * @Author Wells
 */
@Test
public void putFileToHDFS() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));

    // Resources are closed in reverse declaration order: fos, fis, then fs.
    try (
        // 1. Obtain the file system.
        FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
        // 2. Open the local input stream.
        FileInputStream fis = new FileInputStream(new File("/home/wells/Downloads/data"));
        // 3. Open the HDFS output stream.
        FSDataOutputStream fos = fs.create(new Path("/tmp/wells/testData"))
    ) {
        // 4. Copy the stream contents. Pass close=false because the enclosing
        //    try-with-resources owns the streams.
        IOUtils.copyBytes(fis, fos, conf, false);
    }
}
二、HDFS文件下载
/**
 * Downloads a file from HDFS to the local file system by copying it through
 * streams.
 *
 * <p>Reads {@code /tmp/wells/testData} from the cluster (connecting as user
 * {@code root}) and writes it to {@code /home/wells/Downloads/testData}. The
 * cluster settings are loaded from the {@code core-site.xml} and
 * {@code hdfs-site.xml} files at the hard-coded local paths below.
 *
 * <p>All resources are managed by try-with-resources, so they are released
 * even if opening a stream or the copy itself fails.
 *
 * @throws Exception if the file system cannot be reached, a stream cannot be
 *                   opened, or the copy/close fails
 * @Date 2019-10-28 18:21:35
 * @Author Wells
 */
@Test
public void getFileFromHDFS() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));

    // Resources are closed in reverse declaration order: fos, fis, then fs.
    try (
        // 1. Obtain the file system.
        FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
        // 2. Open the HDFS input stream.
        FSDataInputStream fis = fs.open(new Path("/tmp/wells/testData"));
        // 3. Open the local output stream.
        FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/testData"))
    ) {
        // 4. Copy the stream contents. Pass close=false because the enclosing
        //    try-with-resources owns the streams.
        IOUtils.copyBytes(fis, fos, conf, false);
    }
}
三、HDFS定位读取文件
/**
 * Reads a file from HDFS in pieces, part 1: copies at most the first 20 MiB
 * of {@code /tmp/wells/data} to the local file
 * {@code /home/wells/Downloads/data2}.
 *
 * <p>The copy honours the number of bytes actually returned by each
 * {@code read} call: only those bytes are written, and the loop stops early
 * at end of stream. A source shorter than 20 MiB therefore produces an output
 * of the same length rather than one padded with stale buffer contents.
 *
 * <p>NOTE(review): {@code readFileSeek2} writes to the same local path, so
 * running both leaves only the output of whichever ran last — confirm whether
 * distinct output files were intended.
 *
 * @throws Exception if the file system cannot be reached, a stream cannot be
 *                   opened, or reading/writing/closing fails
 * @Date 2019-10-28 18:28:32
 * @Author Wells
 */
@Test
public void readFileSeek1() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));

    // Resources are closed in reverse declaration order: fos, fis, then fs.
    try (
        // 1. Obtain the file system.
        FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
        // 2. Open the HDFS input stream.
        FSDataInputStream fis = fs.open(new Path("/tmp/wells/data"));
        // 3. Open the local output stream.
        FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/data2"))
    ) {
        // 4. Copy up to 20 MiB (the same total as 1024 * 20 reads of 1 KiB).
        byte[] buf = new byte[1024];
        long remaining = 1024L * 1024 * 20;
        while (remaining > 0) {
            int wanted = (int) Math.min(buf.length, remaining);
            int count = fis.read(buf, 0, wanted);
            if (count < 0) {
                // End of stream reached before the 20 MiB limit.
                break;
            }
            // A read may return fewer bytes than requested; write only those.
            fos.write(buf, 0, count);
            remaining -= count;
        }
    }
}
/**
 * Reads a file from HDFS in pieces, part 2: skips the first 20 MiB of
 * {@code /tmp/wells/data} and copies everything after that offset to the
 * local file {@code /home/wells/Downloads/data2}.
 *
 * <p>All resources are managed by try-with-resources, so they are released
 * even if the seek, opening the output file, or the copy fails. The local
 * output file is only created after the seek has succeeded.
 *
 * <p>NOTE(review): {@code readFileSeek1} writes to the same local path, so
 * running both leaves only the output of whichever ran last — confirm whether
 * distinct output files were intended.
 *
 * @throws Exception if the file system cannot be reached, a stream cannot be
 *                   opened, the seek fails, or the copy/close fails
 */
@Test
public void readFileSeek2() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/core-site.xml"));
    conf.addResource(new Path("/home/wells/Projects/05-Github/01-java/mapreduce-demo/src/main/resources/hdfs-site.xml"));

    try (
        // 1. Obtain the file system.
        FileSystem fs = FileSystem.get(new URI("hdfs://xxx-xx-xx"), conf, "root");
        // 2. Open the HDFS input stream.
        FSDataInputStream fis = fs.open(new Path("/tmp/wells/data"))
    ) {
        // Position the stream at the 20 MiB mark before any local file is created.
        fis.seek(1024 * 1024 * 20);

        // 3. Open the local output stream.
        try (FileOutputStream fos = new FileOutputStream(new File("/home/wells/Downloads/data2"))) {
            // 4. Copy the remainder of the file. Pass close=false because the
            //    enclosing try-with-resources blocks own the streams.
            IOUtils.copyBytes(fis, fos, conf, false);
        }
    }
}