1 Shell 命令行操作 HDFS

1.1 基本语法

  • 以下两种方式都可以

    • hadoop fs 具体命令
    • hdfs dfs 具体命令

      1.2 命令行大全

  • 输入 hadoop fs 或者 hdfs dfs 可以查看命令行大全

    1.3 其余操作与Linux相似

    -moveFromLocal

  • 从本地剪切并粘贴到 HDFS

  • hadoop fs -moveFromLocal ./hadoop.txt /lagou/bigdata

-get 或 -copyToLocal

  • 从 HDFS 拷贝到本地
  • hadoop fs -copyToLocal /lagou/bigdata/hadoop.txt ./

-put 或 -copyFromLocal

  • 复制,而 moveFromLocal 是剪切
  • hadoop fs -copyFromLocal ./hadoop.txt /lagou/bigdata

-appendToFile

  • 追加一个文件到一个已有文件的末尾
  • hadoop fs -appendToFile hdfs.txt /lagou/bigdata/hadoop.txt

    2 JAVA 客户端

  • 解压Hadoop安装包并配置环境变量

  • 创建一个 Maven 工程 ClientDemo
  • 导入相应的依赖坐标 + 日志配置文件

    • 导入三个模块
      • hadoop-common
      • hadoopclient
      • hadoop-hdfs ```xml
    org.apache.hadoop hadoop-common 2.9.2 org.apache.hadoop hadoop-client 2.9.2 org.apache.hadoop hadoop-hdfs 2.9.2 junit junit RELEASE org.apache.logging.log4j log4j-core 2.8.2

  1. 为了便于控制程序运行打印的日志数量,需要在项目的 `src/main/resources` 目录下,新建一个文件,命名为 `log4j.properties `,文件内容如下:
  2. ```xml
  3. log4j.rootLogger=INFO, stdout
  4. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  5. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  6. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  7. log4j.appender.logfile=org.apache.log4j.FileAppender
  8. log4j.appender.logfile.File=target/spring.log
  9. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  10. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 创建 HdfsClient

    public class HdfsClient{
    
      @Test
      public void testMkdirs() throws IOException, InterruptedException, URISyntaxException {
    
          // 1 获取文件系统配置对象
          Configuration configuration = new Configuration();
    
          // 配置在集群上运⾏,指定相关配置
          // configuration.set("fs.defaultFS", "hdfs://linux121:9000");
          // FileSystem fs = FileSystem.get(configuration);
          FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
    
          // 2 创建⽬录
          fs.mkdirs(new Path("/test"));
    
          // 3 关闭资源
          fs.close();
      }
    }
    

    2.1 问题集锦

  • 1. 如果不指定操作 HDFS 集群的用户信息,默认是 获取当前操作系统的用户信息,出现权限被拒绝的问题,报错如下:

image.png

  • 解决方案 ①
    • 由于 HDFS 的权限管理机制是:用户告诉 hdfs 自己是什么用户,hdfs 都会相信,认为用户不会做坏事。因此这种权限管理比较鸡肋
    • 把 根目录 的权限设置为 777 ,关于文件权限管理 交给其他软件去做
      • hdfs dfs -R 777 /
  • 解决方案 ②

    • 在配置文件 hdfs-site.xml 中添加以下属性,以关闭 HDFS 集群权限校验(简单粗暴)
      <property>
      <name>dfs.permissions.enabled</name>
      <value>false</value>
      </property>
      
  • 解决方案 ③

    • 指定用户信息获取 FileSystem 对象
    • FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root")
  • 2. 当 在 配置文件,java 代码中,包括hdfs服务器的默认配置,都设置副本数量时,参数的优先级是:
    • 参数优先级排序:(1)代码中设置的值 —>(2)用户自定义配置文件 —>(3)服务器的默认配置

**

  • 3. windows 解压安装 Hadoop后,在调用相关API操作 HDFS 集群时可能会报错,这是由于 Hadoop 安装缺少 windows 操作系统相关文件所致,如下图:

image.png

  • 解决方案
    • 将 winutils.exe 拷贝到 windows 系统 Hadoop 安装目录的 bin 目录下即可
  • 4. IDEA 启动 Hadoop 的任务时,出现如下报错:
    • image.png
    • 解决方案
      • 将 Hadoop.dll 文件 hadoop.rar (先解压) 添加到 C:\Windows\System32 中

2.2 HDFS 的 API 操作

上传文件

  • 可通过 configuration.set("dfs.replication", "2") 设置副本数量

    @Test
    public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
    
      // 1 获取⽂文件系统
      Configuration configuration = new Configuration();
      configuration.set("dfs.replication", "2");
      FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
    
      // 2上传⽂文件
      fs.copyFromLocalFile(new Path("e:/lagou.txt"), new Path("/lagou.txt"));
    
      // 3 关闭资源
      fs.close();
    }
    
  • 也可通过 配置文件 hdfs-site.xml 来指定,文件放在 resources 目录下 ```xml <?xml version=”1.0” encoding=”UTF-8”?> <?xml-stylesheet type=”text/xsl” href=”configuration.xsl”?>

dfs.replication 1

<a name="BBYTk"></a>
### 下载文件
```java
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取⽂文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");

    // 2 执⾏行行下载操作
    // boolean delSrc 指是否将原⽂文件删除
    // Path src 指要下载的⽂文件路路径
    // Path dst 指将⽂文件下载到的路路径
    // boolean useRawLocalFileSystem 是否开启⽂文件校验
    fs.copyToLocalFile(false, new Path("/lagou.txt"), new Path("e:/lagou_copy.txt"), true);

    // 3 关闭资源
    fs.close();
}

删除文件/文件夹

@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取⽂文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");

    // 2 执⾏行行删除
    fs.delete(new Path("/api_test/"), true);

    // 3 关闭资源
    fs.close();
}

查看 文件名称、权限、长度、块信息

@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{

    // 1获取⽂文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");

    // 2 获取⽂文件详情
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while(listFiles.hasNext()){
        LocatedFileStatus status = listFiles.next();
        // 输出详情

        // 文件路径名称
        System.out.println(status.getPath().getName());

        // 长度
        System.out.println(status.getLen());

        // 权限
        System.out.println(status.getPermission());

        // 分组
        System.out.println(status.getGroup());

        // 获取存储的块信息
        BlockLocation[] blockLocations = status.getBlockLocations();
        for (BlockLocation blockLocation : blockLocations) {

            // 获取块存储的主机节点
            String[] hosts = blockLocation.getHosts();
            for (String host : hosts) {
                System.out.println(host);
            }
        }

        System.out.println("-----------华丽的分割线----------");
    }

    // 3 关闭资源
    fs.close();
}

区分文件和文件夹

@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取⽂文件配置信息
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");

    // 2 判断是⽂文件还是⽂文件夹
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        // 如果是⽂文件
        if (fileStatus.isFile()){ 
            System.out.println("f:"+fileStatus.getPath().getName());
        } else{ 
            System.out.println("d:"+fileStatus.getPath().getName());         
        }
    }

       // 3 关闭资源
    fs.close();
}

2.3 I/O流操作 HDFS

文件上传 fs.create

  • 将本地文件上传到 hdfs 上

    @Test
    public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    
      // 1 获取文件系统
      Configuration configuration = new Configuration();
      FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
    
      // 2 创建输⼊流
      FileInputStream fis = new FileInputStream(new File("e:/lagou.txt"));
    
      // 3 获取输出流
      FSDataOutputStream fos = fs.create(new Path("/lagou_io.txt"));
    
      // 4 流的拷贝
      IOUtils.copyBytes(fis, fos, configuration);
    
      // 5 关闭资源(这里其实不需要关闭) 因为参数configuration自动关闭IO流
      IOUtils.closeStream(fos);
      IOUtils.closeStream(fis);
      fs.close();
    }
    

    文件下载 fs.open

  • 将 hdfs 文件下载到本地

    @Test
    public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{
    
      // 1 获取⽂文件系统
      Configuration configuration = new Configuration();
      FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
    
      // 2 获取输⼊入流
      FSDataInputStream fis = fs.open(new Path("/lagou_io.txt"));
    
      // 3 获取输出流
      FileOutputStream fos = new FileOutputStream(new File("e:/lagou_io_copy.txt"));
    
      // 4 流的对拷
      IOUtils.copyBytes(fis, fos, configuration);
    
      // 5 关闭资源(这里其实不需要关闭) 因为参数configuration自动关闭IO流
      IOUtils.closeStream(fos);
      IOUtils.closeStream(fis);
      fs.close();
    }
    

    定位读取 seek

  • 需求:将 HDFS上 的 lagou.txt 的内容在控制台输出两次

    @Test
    public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{
    
      // 1 获取⽂文件系统
      Configuration configuration = new Configuration();
      FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
    
      // 2 打开输⼊入流,读取数据输出到控制台
      FSDataInputStream in = null;
      try{
          in = fs.open(new Path("/lagou.txt"));
          IOUtils.copyBytes(in, System.out, 4096, false);
    
          //从头再次读取,偏移量为0 表示从头读起
          in.seek(0); 
          IOUtils.copyBytes(in, System.out, 4096, false);
      }finally {
          IOUtils.closeStream(in);
          fs.close();
      }
    }