04 HDFS Java API

一、搭建Linux下Eclipse开发环境

  1. 通过wincap传输Eclipse,并解压
    1. /usr/local/eclipse
  1. 在虚拟机中,使用命令打开Eclipse
    1. /usr/local/eclipse/eclipse
  1. 新建Java工程hdfs,并将/usr/local/hadoop/share/hadoop目录下的common、hdfs、mapreduce、yarn等4个子目录中的jar文件,以及这4个子目录下的lib文件夹中的所有jar文件作为外部jar文件添加到工程中

  2. 关闭Eclipse

  3. 复制hadoop-eclipse-plugin-2.7.1.jar到Eclipse的dropins目录下

  4. 打开Eclipse

  5. 单击Windows -- Perspective -- Open Perspective --Other --Map/Reduce
    image-20210110212240259
    image-20210110214803439

  6. 在Map/Reduce Locations视图空白处,单击鼠标右键,选择New Hadoop location
    image-20210110214942879

  7. Hadoop location的具体配置如下
    image-20210110215308406
    两个端口要与Hadoop的配置文件core-site.xmlhdfs-site.xml相一致

    core-site.xml —>DFS Master

hdfs-site.xml —>Map/Reduce(V2) Master


image-20210110220152857

二、HDFS Java API

Hadoop中的文件操作类基本上是在org.apache.hadoop.fs包中,这些API能够支持的操作包含打开文件、读写文件、删除文件等。

Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过该类的get方法得到具体类。

get方法存在几个重载版本,常用的如下

static FileSystem get(Configuration conf);

该类封装了几乎所有的文件操作,如mkdir、delete等。

  1. 构建获取FileSystem接口API的方法供以后的文件操作使用的具体方式如下:
  1. FileSystem getFileSystem() throws Exception{
  2. URI uri = new URI("hdfs://hadoop0:9000/");
  3. FileSystem fileSystem = FileSystem.get(uri, new Configuration());
  4. return fileSystem;
  5. }
  1. 使用FileSystem的copyFromLocalFile(本地文件,HDFS路径)方法可以将本地文件传到HDFS中
/**
 *  使用FileSystem的copyFromLocalFile(本地文件,HDFS路径)
 *  方法将本地文件传到HDFS中
 * @throws Exception
 */
public static void uploadFile() throws Exception {
    //通过FileSystem类和HDFS建立连接
    FileSystem hdfs = getFileSystem();
    //文件源路径
    Path src = new Path("/root/a.txt");
    //目标路径
    Path dst = new Path("/");
    //通过调用listStatus方法,把路径下所有文件存到数组中(上传前)
    FileStatus files[] = hdfs.listStatus(dst);
    //遍历数组
    for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
    System.out.println("------------after upload--------------------");
    //调用copyFromLocalFile把本地文件上传到HDFS中
    hdfs.copyFromLocalFile(src, dst);
    //通过调用listStatus方法,把路径下所有文件存到数组中(上传后)
    files = hdfs.listStatus(dst);
    for (FileStatus file : files) {
        System.out.println(file.getPath());
    }
}
  1. 通过FileSYstem.create(Path f)在HDFS上创建文件,其中f为文件的完整路径
/**
 *  在HDFS上创建文件
 * @throws Exception
 */
public static void createFile() throws Exception {
    byte[] buff = "Hello Hadoop".getBytes();
    FileSystem hdfs = getFileSystem();
    //在HDFS上创建文件,f为文件的完整路径
    Path dfs = new Path("/testcreate");
    //FSDataOutputStream重载了很多write方法,用于写入很多类型的数据
    FSDataOutputStream outputStream = hdfs.create(dfs);
    outputStream.write(buff, 0, buff.length);
    System.out.println("Finish...");
    outputStream.close();
}

image-20210110224340723

刷新

image-20210110224404457

  1. 通过FileSystem.rename(Path src,Path dst),为指定的HDFS文件重命名,其中src和dst均为文件的完整路径
/**
 *  为指定的HDFS文件重命名
 * @throws Exception
 */
public static void fileRename() throws Exception {
    FileSystem hdfs = getFileSystem();
    //原文件名
    Path frpaht = new Path("/a.txt");
    //新文件名
    Path topath = new Path("/b.txt");
    //判断是否重命名成功
    boolean isRename = hdfs.rename(frpaht, topath);
    String result = isRename ? "成功" : "失败";
    System.out.println("文件重命名结果为:" + result);
}

执行之后,刷新

image-20210110224850956

  1. 通过FileSystem.delete(Path f,Boolean recursive)可删除指定的HDFS文件,其中f为需要删除文件的完整路径,recursive用来确定是否进行递归删除
/**
 *  删除指定的HDFS文件
 * @throws Exception
 */
public static void deleteFile() throws Exception {
    FileSystem hdfs = getFileSystem();
    Path delef = new Path("/test1");
    boolean isDeleted = hdfs.delete(delef, false);
    // 递归删除
    // boolean isDeleted=hdfs.delete(delef,true);
    System.out.println("Delete?" + isDeleted);
}

image-20210110231744760

删除文件夹与删除文件的代码一样,只需要换成删除的目录路径即可,如果目录下有文件,要进行递归删除

  1. 通过FileSystem.open(Path f)可打开指定的HDFS文件进行读取
/**
 *  打开指定的HDFS文件进行读取
 * @throws Exception
 */
 public static void readFile() throws Exception {
    FileSystem fileSystem = getFileSystem();
    FSDataInputStream openStream = fileSystem.open(new Path("/testcreate"));
    //import org.apache.hadoop.io.IOUtils;
    IOUtils.copyBytes(openStream, System.out, 1024, false);
    IOUtils.closeStream(openStream);
}

image-20210110232309744

  1. 其他操作
    /**
     *  查看指定的HDFS文件是否存在
     * @throws Exception
     */
    public static void isFileExists() throws Exception {
        FileSystem hdfs = getFileSystem();
        Path findf = new Path("/test1");
        boolean isExists = hdfs.exists(findf);
        System.out.println("Exist?" + isExists);
    }

    /**
     *  查看指定的HDFS文件的修改时间
     * @throws Exception
     */
    public static void fileLastModify() throws Exception {
        FileSystem hdfs = getFileSystem();
        Path fpath = new Path("/testcreate");
        FileStatus fileStatus = hdfs.getFileStatus(fpath);
        long modiTime = fileStatus.getModificationTime();
        System.out.println("testcreate的修改时间是" + modiTime);
    }
        /**
     *  查看指定的HDFS中某个目录下的所有文件
     * @throws Exception
     */
    public static void fileLocation() throws Exception {
        FileSystem hdfs = getFileSystem();
        Path fpath = new Path("/testcreate");
        FileStatus filestatus = hdfs.getFileStatus(fpath);
        BlockLocation[] blkLocations = hdfs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
        int blockLen = blkLocations.length;
        for (int i = 0; i < blockLen; i++) {
            String[] hosts = blkLocations[i].getHosts();
            System.out.println("block_" + i + "_location:" + hosts[0]);
        }
    }

    /**
     *  获取HDFS集群上的所有节点名称
     * @throws Exception
     */
    public static void nodeList() throws Exception {
        FileSystem fs = getFileSystem();
        DistributedFileSystem hdfs = (DistributedFileSystem) fs;

        DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
        for (int i = 0; i < dataNodeStats.length; i++) {
            System.out.println("DataNode_" + i + "_Name:"
                    + dataNodeStats[i].getHostName());
        }
    }

运行结果

image-20210110233518912