04 HDFS Java API
一、搭建Linux下Eclipse开发环境
- 通过wincap传输Eclipse,并解压
/usr/local/eclipse
- 在虚拟机中,使用命令打开Eclipse
/usr/local/eclipse/eclipse
新建Java工程
hdfs,并将/usr/local/hadoop/share/hadoop目录下的common、hdfs、mapreduce、yarn等4个子目录中的jar文件,以及这4个子目录下的lib文件夹中的所有jar文件作为外部jar文件添加到工程中关闭Eclipse
复制
hadoop-eclipse-plugin-2.7.1.jar到Eclipse的dropins目录下打开Eclipse
单击
Windows -- Perspective -- Open Perspective --Other --Map/Reduce

在Map/Reduce Locations视图空白处,单击鼠标右键,选择New Hadoop location

Hadoop location的具体配置如下
两个端口要与Hadoop的配置文件core-site.xml和hdfs-site.xml相一致core-site.xml —>DFS Master
hdfs-site.xml —>Map/Reduce(V2) Master

二、HDFS Java API
Hadoop中的文件操作类基本上是在org.apache.hadoop.fs包中,这些API能够支持的操作包含打开文件、读写文件、删除文件等。
Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过该类的get方法得到具体类。
get方法存在几个重载版本,常用的如下
static FileSystem get(Configuration conf);
该类封装了几乎所有的文件操作,如mkdir、delete等。
- 构建获取FileSystem接口API的方法供以后的文件操作使用的具体方式如下:
FileSystem getFileSystem() throws Exception{URI uri = new URI("hdfs://hadoop0:9000/");FileSystem fileSystem = FileSystem.get(uri, new Configuration());return fileSystem;}
- 使用FileSystem的copyFromLocalFile(本地文件,HDFS路径)方法可以将本地文件传到HDFS中
/**
* 使用FileSystem的copyFromLocalFile(本地文件,HDFS路径)
* 方法将本地文件传到HDFS中
* @throws Exception
*/
public static void uploadFile() throws Exception {
//通过FileSystem类和HDFS建立连接
FileSystem hdfs = getFileSystem();
//文件源路径
Path src = new Path("/root/a.txt");
//目标路径
Path dst = new Path("/");
//通过调用listStatus方法,把路径下所有文件存到数组中(上传前)
FileStatus files[] = hdfs.listStatus(dst);
//遍历数组
for (FileStatus file : files) {
System.out.println(file.getPath());
}
System.out.println("------------after upload--------------------");
//调用copyFromLocalFile把本地文件上传到HDFS中
hdfs.copyFromLocalFile(src, dst);
//通过调用listStatus方法,把路径下所有文件存到数组中(上传后)
files = hdfs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
}
- 通过
FileSYstem.create(Path f)在HDFS上创建文件,其中f为文件的完整路径
/**
* 在HDFS上创建文件
* @throws Exception
*/
public static void createFile() throws Exception {
byte[] buff = "Hello Hadoop".getBytes();
FileSystem hdfs = getFileSystem();
//在HDFS上创建文件,f为文件的完整路径
Path dfs = new Path("/testcreate");
//FSDataOutputStream重载了很多write方法,用于写入很多类型的数据
FSDataOutputStream outputStream = hdfs.create(dfs);
outputStream.write(buff, 0, buff.length);
System.out.println("Finish...");
outputStream.close();
}

刷新

- 通过
FileSystem.rename(Path src,Path dst),为指定的HDFS文件重命名,其中src和dst均为文件的完整路径
/**
* 为指定的HDFS文件重命名
* @throws Exception
*/
public static void fileRename() throws Exception {
FileSystem hdfs = getFileSystem();
//原文件名
Path frpaht = new Path("/a.txt");
//新文件名
Path topath = new Path("/b.txt");
//判断是否重命名成功
boolean isRename = hdfs.rename(frpaht, topath);
String result = isRename ? "成功" : "失败";
System.out.println("文件重命名结果为:" + result);
}
执行之后,刷新

- 通过
FileSystem.delete(Path f,Boolean recursive)可删除指定的HDFS文件,其中f为需要删除文件的完整路径,recursive用来确定是否进行递归删除
/**
* 删除指定的HDFS文件
* @throws Exception
*/
public static void deleteFile() throws Exception {
FileSystem hdfs = getFileSystem();
Path delef = new Path("/test1");
boolean isDeleted = hdfs.delete(delef, false);
// 递归删除
// boolean isDeleted=hdfs.delete(delef,true);
System.out.println("Delete?" + isDeleted);
}

删除文件夹与删除文件的代码一样,只需要换成删除的目录路径即可,如果目录下有文件,要进行递归删除
- 通过
FileSystem.open(Path f)可打开指定的HDFS文件进行读取
/**
* 打开指定的HDFS文件进行读取
* @throws Exception
*/
public static void readFile() throws Exception {
FileSystem fileSystem = getFileSystem();
FSDataInputStream openStream = fileSystem.open(new Path("/testcreate"));
//import org.apache.hadoop.io.IOUtils;
IOUtils.copyBytes(openStream, System.out, 1024, false);
IOUtils.closeStream(openStream);
}

- 其他操作
/**
* 查看指定的HDFS文件是否存在
* @throws Exception
*/
public static void isFileExists() throws Exception {
FileSystem hdfs = getFileSystem();
Path findf = new Path("/test1");
boolean isExists = hdfs.exists(findf);
System.out.println("Exist?" + isExists);
}
/**
* 查看指定的HDFS文件的修改时间
* @throws Exception
*/
public static void fileLastModify() throws Exception {
FileSystem hdfs = getFileSystem();
Path fpath = new Path("/testcreate");
FileStatus fileStatus = hdfs.getFileStatus(fpath);
long modiTime = fileStatus.getModificationTime();
System.out.println("testcreate的修改时间是" + modiTime);
}
/**
* 查看指定的HDFS中某个目录下的所有文件
* @throws Exception
*/
public static void fileLocation() throws Exception {
FileSystem hdfs = getFileSystem();
Path fpath = new Path("/testcreate");
FileStatus filestatus = hdfs.getFileStatus(fpath);
BlockLocation[] blkLocations = hdfs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
int blockLen = blkLocations.length;
for (int i = 0; i < blockLen; i++) {
String[] hosts = blkLocations[i].getHosts();
System.out.println("block_" + i + "_location:" + hosts[0]);
}
}
/**
* 获取HDFS集群上的所有节点名称
* @throws Exception
*/
public static void nodeList() throws Exception {
FileSystem fs = getFileSystem();
DistributedFileSystem hdfs = (DistributedFileSystem) fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
for (int i = 0; i < dataNodeStats.length; i++) {
System.out.println("DataNode_" + i + "_Name:"
+ dataNodeStats[i].getHostName());
}
}
运行结果

