使用API访问HDFS
一、环境准备
windows下需要下载winutils,且配置环境变量。
winutils下载地址:https://github.com/cdarlint/winutils
创建maven工程hdfs-client-demo,在pom.xml中引入依赖
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.28</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version></dependency></dependencies>
在src/main/resources目录下创建log4j.properties
## 输出到控制台
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
## 输出到文件
#log4j.appender.logfile=org.apache.log4j.FileAppender
#log4j.appender.logfile.File=target/spring.log
#log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
二、代码示例
调用示例(创建目录)
这里以上传文件为例,演示hdfs的api调用
// hadoop集群namenode的通信地址
URI uri = new URI("hdfs://hadoop102:8020");
// 创建配置文件
Configuration configuration = new Configuration();
// 获取到HDFS客户端对象
fs = FileSystem.get(uri, configuration, "hadoop");
// 创建目录
fs.mkdirs(new Path("/demo"));
// 关闭资源
fs.close();
完整示例代码
package org.example.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* HDFS客户端测试
*
* @author zhengpin
* @date 2021/06/07
*/
public class HdfsClient {
/**
* HDFS集群NameNode的通信地址
*/
private static final String HDFS_URI = "hdfs://hadoop102:8020";
/**
* 访问HDFS集群的用户
*/
private static final String HDFS_USER = "hadoop";
/**
* HDFS客户端对象
*/
private FileSystem fs;
/**
* 初始化HDFS客户端
*
* @throws URISyntaxException
* @throws IOException
* @throws InterruptedException
*/
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
// hadoop集群namenode的通信地址
URI uri = new URI(HDFS_URI);
// 创建配置文件
Configuration configuration = new Configuration();
// 设置副本数量
configuration.set("dfs.replication", "2");
// 获取到HDFS客户端对象
fs = FileSystem.get(uri, configuration, HDFS_USER);
}
/**
* 关闭资源
*
* @throws IOException
*/
@After
public void close() throws IOException {
fs.close();
}
/**
* 创建目录
*
* @throws Exception
*/
@Test
public void testMkdirs() throws IOException {
fs.mkdirs(new Path("/demo"));
}
/**
* 上传文件
*
* @throws IOException
*/
@Test
public void testPut() throws IOException {
// 参数说明
// 1 是否删除原文件
// 2 文件已存在时,是否覆盖
// 3 原文件路径(本地路径)
// 4 目标文件路径(HDFS路径)只写目录时,将在目录下创建一个和原文件名一样的文件;写了目录和文件名时,将按指定文件名创建文件
fs.copyFromLocalFile(false, true, new Path("/Users/zhengpin/Develop/DevOps/git-sync.sh"),
new Path("/demo/git-sync4.sh"));
}
/**
* 下载文件
*/
@Test
public void testGet() throws IOException {
// 参数说明
// 1 是否删除原文件
// 2 原文件路径(HDFS路径)
// 3 目标文件路径路径(本地路径)
// 4 是否开启本地校验(CRC),为false时,会同时下载一个.crc的文件
fs.copyToLocalFile(false, new Path("/demo/git-sync4.sh"), new Path("/Users/zhengpin/Downloads/"), true);
}
/**
* 删除文件
*
* @throws IOException
*/
@Test
public void testRm() throws IOException {
// 参数说明
// 1 要删除的文件
// 2 是否递归删除
fs.delete(new Path("/demo/git-sync4.sh"), true);
}
/**
* 移动文件
*/
@Test
public void testMv() throws IOException {
fs.rename(new Path("/demo/git-sync.sh"), new Path("/demo/mv/git-sync8.sh"));
}
/**
* 获取文件详情
*/
@Test
public void testFileDetail() throws IOException {
// 获取目录下所有文件信息
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/demo"), true);
while (files.hasNext()) {
LocatedFileStatus fileStatus = files.next();
System.out.println("=================== 文件详情 ====================");
System.out.println("文件路径:" + fileStatus.getPath());
System.out.println("文件权限:" + fileStatus.getPermission());
System.out.println("文件所属用户:" + fileStatus.getOwner());
System.out.println("文件所属分组:" + fileStatus.getGroup());
System.out.println("文件大小:" + fileStatus.getLen());
System.out.println("文件修改时间:" + fileStatus.getModificationTime());
System.out.println("文件副本数:" + fileStatus.getReplication());
System.out.println("文件块大小:" + fileStatus.getBlockSize());
System.out.println("文件名:" + fileStatus.getPath().getName());
// 获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println("---------文件块信息:--------");
for (BlockLocation blockLocation : blockLocations) {
// 起始位置、长度、NameNode列表
System.out.println(blockLocation);
}
}
}
/**
* 判断是文件还是文件夹
*
* @throws IOException
*/
@Test
public void testIsFile() throws IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/demo"));
for (FileStatus fileStatus : listStatus) {
if (fileStatus.isFile()) {
System.out.println("文件:" + fileStatus.getPath());
} else {
System.out.println("目录:" + fileStatus.getPath());
}
}
}
}
三、hadoop配置优先级
代码配置(本地) > .site.xml(本地) > -site.xml(HDFS)> *-defalut.xml
配置的优先级,这里以上传文件时,文件副本数配置为例
使用代码配置
这里使用代码设置副本数量
// hadoop集群namenode的通信地址
URI uri = new URI(HDFS_URI);
// 创建配置文件
Configuration configuration = new Configuration();
// 设置副本数量
configuration.set("dfs.replication", "2");
// 获取到HDFS客户端对象
fs = FileSystem.get(uri, configuration, HDFS_USER);
本地xml配置
在本地工程的resources目录下创建一个hdfs-site.xml文件,内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 设置文件的副本数-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
