1 Shell 命令行操作 HDFS
1.1 基本语法
以下两种方式都可以
输入
hadoop fs
或者hdfs dfs
可以查看命令行大全1.3 其余操作与Linux相似
-moveFromLocal
从本地剪切并粘贴到 HDFS
hadoop fs -moveFromLocal ./hadoop.txt /lagou/bigdata
-get 或 -copyToLocal
- 从 HDFS 拷贝到本地
hadoop fs -copyToLocal /lagou/bigdata/hadoop.txt ./
-put 或 -copyFromLocal
- 复制,而 moveFromLocal 是剪切
hadoop fs -copyFromLocal ./hadoop.txt /lagou/bigdata
-appendToFile
- 追加一个文件到一个已有文件的末尾
hadoop fs -appendToFile hdfs.txt /lagou/bigdata/hadoop.txt
2 JAVA 客户端
解压Hadoop安装包并配置环境变量
- 创建一个 Maven 工程 ClientDemo
导入相应的依赖坐标 + 日志配置文件
- 导入三个模块
hadoop-common
hadoopclient
hadoop-hdfs
```xml
org.apache.hadoop hadoop-common 2.9.2 org.apache.hadoop hadoop-client 2.9.2 org.apache.hadoop hadoop-hdfs 2.9.2 junit junit RELEASE org.apache.logging.log4j log4j-core 2.8.2 - 导入三个模块
为了便于控制程序运行打印的日志数量,需要在项目的 `src/main/resources` 目录下,新建一个文件,命名为 `log4j.properties `,文件内容如下:
```xml
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建
HdfsClient
类public class HdfsClient{ @Test public void testMkdirs() throws IOException, InterruptedException, URISyntaxException { // 1 获取文件系统配置对象 Configuration configuration = new Configuration(); // 配置在集群上运⾏,指定相关配置 // configuration.set("fs.defaultFS", "hdfs://linux121:9000"); // FileSystem fs = FileSystem.get(configuration); FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root"); // 2 创建⽬录 fs.mkdirs(new Path("/test")); // 3 关闭资源 fs.close(); } }
2.1 问题集锦
1. 如果不指定操作 HDFS 集群的用户信息,默认是 获取当前操作系统的用户信息,出现权限被拒绝的问题,报错如下:
- 解决方案 ①
- 由于 HDFS 的权限管理机制是:用户告诉 hdfs 自己是什么用户,hdfs 都会相信,认为用户不会做坏事。因此这种权限管理比较鸡肋
- 把 根目录 的权限设置为 777 ,关于文件权限管理 交给其他软件去做
hdfs dfs -R 777 /
解决方案 ②
- 在配置文件 hdfs-site.xml 中添加以下属性,以关闭 HDFS 集群权限校验(简单粗暴)
<property> <name>dfs.permissions.enabled</name> <value>false</value> </property>
- 在配置文件 hdfs-site.xml 中添加以下属性,以关闭 HDFS 集群权限校验(简单粗暴)
解决方案 ③
- 指定用户信息获取 FileSystem 对象
FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "
root
")
- 2. 当 在 配置文件,java 代码中,包括hdfs服务器的默认配置,都设置副本数量时,参数的优先级是:
- 参数优先级排序:(1)代码中设置的值 —>(2)用户自定义配置文件 —>(3)服务器的默认配置
**
- 3. windows 解压安装 Hadoop后,在调用相关API操作 HDFS 集群时可能会报错,这是由于 Hadoop 安装缺少 windows 操作系统相关文件所致,如下图:
- 解决方案
- 将 winutils.exe 拷贝到 windows 系统 Hadoop 安装目录的 bin 目录下即可
- winutils.rar (需要解压)
- 将 winutils.exe 拷贝到 windows 系统 Hadoop 安装目录的 bin 目录下即可
- 4. IDEA 启动 Hadoop 的任务时,出现如下报错:
- 解决方案
- 将 Hadoop.dll 文件 hadoop.rar (先解压) 添加到 C:\Windows\System32 中
- 解决方案
2.2 HDFS 的 API 操作
上传文件
可通过
configuration.set("dfs.replication", "2")
设置副本数量@Test public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException { // 1 获取⽂文件系统 Configuration configuration = new Configuration(); configuration.set("dfs.replication", "2"); FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root"); // 2上传⽂文件 fs.copyFromLocalFile(new Path("e:/lagou.txt"), new Path("/lagou.txt")); // 3 关闭资源 fs.close(); }
也可通过 配置文件
hdfs-site.xml
来指定,文件放在 resources 目录下 ```xml <?xml version=”1.0” encoding=”UTF-8”?> <?xml-stylesheet type=”text/xsl” href=”configuration.xsl”?>
<a name="BBYTk"></a>
### 下载文件
```java
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{
// 1 获取⽂文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
// 2 执⾏行行下载操作
// boolean delSrc 指是否将原⽂文件删除
// Path src 指要下载的⽂文件路路径
// Path dst 指将⽂文件下载到的路路径
// boolean useRawLocalFileSystem 是否开启⽂文件校验
fs.copyToLocalFile(false, new Path("/lagou.txt"), new Path("e:/lagou_copy.txt"), true);
// 3 关闭资源
fs.close();
}
删除文件/文件夹
@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException{
// 1 获取⽂文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
// 2 执⾏行行删除
fs.delete(new Path("/api_test/"), true);
// 3 关闭资源
fs.close();
}
查看 文件名称、权限、长度、块信息
@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException{
// 1获取⽂文件系统
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
// 2 获取⽂文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while(listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
// 输出详情
// 文件路径名称
System.out.println(status.getPath().getName());
// 长度
System.out.println(status.getLen());
// 权限
System.out.println(status.getPermission());
// 分组
System.out.println(status.getGroup());
// 获取存储的块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取块存储的主机节点
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("-----------华丽的分割线----------");
}
// 3 关闭资源
fs.close();
}
区分文件和文件夹
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException{
// 1 获取⽂文件配置信息
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root");
// 2 判断是⽂文件还是⽂文件夹
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : listStatus) {
// 如果是⽂文件
if (fileStatus.isFile()){
System.out.println("f:"+fileStatus.getPath().getName());
} else{
System.out.println("d:"+fileStatus.getPath().getName());
}
}
// 3 关闭资源
fs.close();
}
2.3 I/O流操作 HDFS
文件上传 fs.create
将本地文件上传到 hdfs 上
@Test public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException { // 1 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root"); // 2 创建输⼊流 FileInputStream fis = new FileInputStream(new File("e:/lagou.txt")); // 3 获取输出流 FSDataOutputStream fos = fs.create(new Path("/lagou_io.txt")); // 4 流的拷贝 IOUtils.copyBytes(fis, fos, configuration); // 5 关闭资源(这里其实不需要关闭) 因为参数configuration自动关闭IO流 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
文件下载 fs.open
将 hdfs 文件下载到本地
@Test public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{ // 1 获取⽂文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root"); // 2 获取输⼊入流 FSDataInputStream fis = fs.open(new Path("/lagou_io.txt")); // 3 获取输出流 FileOutputStream fos = new FileOutputStream(new File("e:/lagou_io_copy.txt")); // 4 流的对拷 IOUtils.copyBytes(fis, fos, configuration); // 5 关闭资源(这里其实不需要关闭) 因为参数configuration自动关闭IO流 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
定位读取 seek
需求:将 HDFS上 的 lagou.txt 的内容在控制台输出两次
@Test public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{ // 1 获取⽂文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://linux121:9000"), configuration, "root"); // 2 打开输⼊入流,读取数据输出到控制台 FSDataInputStream in = null; try{ in = fs.open(new Path("/lagou.txt")); IOUtils.copyBytes(in, System.out, 4096, false); //从头再次读取,偏移量为0 表示从头读起 in.seek(0); IOUtils.copyBytes(in, System.out, 4096, false); }finally { IOUtils.closeStream(in); fs.close(); } }