使用API访问HDFS

一、环境准备

windows下需要下载winutils,且配置环境变量。

winutils下载地址:https://github.com/cdarlint/winutils

创建maven工程hdfs-client-demo,在pom.xml中引入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>4.12</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.slf4j</groupId>
  9. <artifactId>slf4j-log4j12</artifactId>
  10. <version>1.7.28</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.hadoop</groupId>
  14. <artifactId>hadoop-client</artifactId>
  15. <version>3.2.1</version>
  16. </dependency>
  17. </dependencies>

src/main/resources目录下创建log4j.properties

## 输出到控制台
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d    %p    [%c] - %m%n

## 输出到文件
#log4j.appender.logfile=org.apache.log4j.FileAppender
#log4j.appender.logfile.File=target/spring.log
#log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=%d    %p    [%c] - %m%n

二、代码示例

调用示例(创建目录)

这里以上传文件为例,演示hdfs的api调用

// hadoop集群namenode的通信地址
URI uri = new URI("hdfs://hadoop102:8020");
// 创建配置文件
Configuration configuration = new Configuration();
// 获取到HDFS客户端对象
fs = FileSystem.get(uri, configuration, "hadoop");
// 创建目录
fs.mkdirs(new Path("/demo"));
// 关闭资源
fs.close();

完整示例代码

package org.example.hadoop.hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * HDFS客户端测试
 *
 * @author zhengpin
 * @date 2021/06/07
 */
public class HdfsClient {

    /**
     * HDFS集群NameNode的通信地址
     */
    private static final String HDFS_URI = "hdfs://hadoop102:8020";
    /**
     * 访问HDFS集群的用户
     */
    private static final String HDFS_USER = "hadoop";

    /**
     * HDFS客户端对象
     */
    private FileSystem fs;

    /**
     * 初始化HDFS客户端
     *
     * @throws URISyntaxException
     * @throws IOException
     * @throws InterruptedException
     */
    @Before
    public void init() throws URISyntaxException, IOException, InterruptedException {
        // hadoop集群namenode的通信地址
        URI uri = new URI(HDFS_URI);
        // 创建配置文件
        Configuration configuration = new Configuration();
        // 设置副本数量
        configuration.set("dfs.replication", "2");
        // 获取到HDFS客户端对象
        fs = FileSystem.get(uri, configuration, HDFS_USER);
    }

    /**
     * 关闭资源
     *
     * @throws IOException
     */
    @After
    public void close() throws IOException {
        fs.close();
    }

    /**
     * 创建目录
     *
     * @throws Exception
     */
    @Test
    public void testMkdirs() throws IOException {
        fs.mkdirs(new Path("/demo"));
    }

    /**
     * 上传文件
     *
     * @throws IOException
     */
    @Test
    public void testPut() throws IOException {
        // 参数说明
        //    1 是否删除原文件
        //    2 文件已存在时,是否覆盖
        //    3 原文件路径(本地路径)
        //    4 目标文件路径(HDFS路径)只写目录时,将在目录下创建一个和原文件名一样的文件;写了目录和文件名时,将按指定文件名创建文件
        fs.copyFromLocalFile(false, true, new Path("/Users/zhengpin/Develop/DevOps/git-sync.sh"),
            new Path("/demo/git-sync4.sh"));
    }

    /**
     * 下载文件
     */
    @Test
    public void testGet() throws IOException {
        // 参数说明
        //    1 是否删除原文件
        //    2 原文件路径(HDFS路径)
        //    3 目标文件路径路径(本地路径)
        //    4 是否开启本地校验(CRC),为false时,会同时下载一个.crc的文件
        fs.copyToLocalFile(false, new Path("/demo/git-sync4.sh"), new Path("/Users/zhengpin/Downloads/"), true);
    }

    /**
     * 删除文件
     *
     * @throws IOException
     */
    @Test
    public void testRm() throws IOException {
        // 参数说明
        //    1 要删除的文件
        //    2 是否递归删除
        fs.delete(new Path("/demo/git-sync4.sh"), true);
    }

    /**
     * 移动文件
     */
    @Test
    public void testMv() throws IOException {
        fs.rename(new Path("/demo/git-sync.sh"), new Path("/demo/mv/git-sync8.sh"));
    }

    /**
     * 获取文件详情
     */
    @Test
    public void testFileDetail() throws IOException {
        // 获取目录下所有文件信息
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/demo"), true);
        while (files.hasNext()) {
            LocatedFileStatus fileStatus = files.next();
            System.out.println("=================== 文件详情 ====================");
            System.out.println("文件路径:" + fileStatus.getPath());
            System.out.println("文件权限:" + fileStatus.getPermission());
            System.out.println("文件所属用户:" + fileStatus.getOwner());
            System.out.println("文件所属分组:" + fileStatus.getGroup());
            System.out.println("文件大小:" + fileStatus.getLen());
            System.out.println("文件修改时间:" + fileStatus.getModificationTime());
            System.out.println("文件副本数:" + fileStatus.getReplication());
            System.out.println("文件块大小:" + fileStatus.getBlockSize());
            System.out.println("文件名:" + fileStatus.getPath().getName());

            // 获取块信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println("---------文件块信息:--------");
            for (BlockLocation blockLocation : blockLocations) {
                // 起始位置、长度、NameNode列表
                System.out.println(blockLocation);
            }
        }
    }

    /**
     * 判断是文件还是文件夹
     *
     * @throws IOException
     */
    @Test
    public void testIsFile() throws IOException {
        FileStatus[] listStatus = fs.listStatus(new Path("/demo"));
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isFile()) {
                System.out.println("文件:" + fileStatus.getPath());
            } else {
                System.out.println("目录:" + fileStatus.getPath());
            }
        }
    }
}

三、hadoop配置优先级

代码配置(本地) > .site.xml(本地) > -site.xml(HDFS)> *-defalut.xml

配置的优先级,这里以上传文件时,文件副本数配置为例

使用代码配置

这里使用代码设置副本数量

// hadoop集群namenode的通信地址
URI uri = new URI(HDFS_URI);
// 创建配置文件
Configuration configuration = new Configuration();
// 设置副本数量
configuration.set("dfs.replication", "2");
// 获取到HDFS客户端对象
fs = FileSystem.get(uri, configuration, HDFS_USER);

本地xml配置

在本地工程的resources目录下创建一个hdfs-site.xml文件,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <!--  设置文件的副本数-->
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

HDFS服务端XML配置

HDFS服务端默认配置