1. Main classes for HDFS operations

    Configuration: encapsulates client- or server-side configuration information.

    FileSystem: represents the file system; its methods operate on files. An instance is obtained through FileSystem's static get method, e.g. FileSystem hdfs = FileSystem.get(conf);

    FSDataInputStream: the HDFS input stream, obtained through FileSystem's open method.

    FSDataOutputStream: the HDFS output stream, obtained through FileSystem's create method.
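
    The four classes fit together as in this minimal sketch (a hedged example, not from the original post: the NameNode address hdfs://localhost:9000 matches the hdfs.path property used later, and the file path is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsQuickStart {
    public static void main(String[] args) throws Exception {
    // Configuration holds the client-side settings, here just the NameNode address (assumed)
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    FileSystem hdfs = FileSystem.get(conf);

    // FSDataOutputStream: write a file via FileSystem.create
    try (FSDataOutputStream out = hdfs.create(new Path("/demo/hello.txt"))) {
    out.writeUTF("hello hdfs");
    }

    // FSDataInputStream: read it back via FileSystem.open
    try (FSDataInputStream in = hdfs.open(new Path("/demo/hello.txt"))) {
    System.out.println(in.readUTF());
    }
    hdfs.close();
    }
    }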

2. Dependency configuration

    pom.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.hdfs</groupId>
        <artifactId>HadoopTest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>HadoopTest</name>
        <url>http://maven.apache.org</url>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>3.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.1.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>3.1.1</version>
            </dependency>
            <dependency>
                <groupId>cn.bestwu</groupId>
                <artifactId>ik-analyzers</artifactId>
                <version>5.1.0</version>
            </dependency>
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    application.properties:

    # Tomcat worker thread pool (the default is 200)
    server.tomcat.max-threads=1000

    server.port=8900
    # session timeout in seconds (server.session-timeout was renamed in Spring Boot 2.0)
    server.servlet.session.timeout=60

    spring.application.name=hadoop
    spring.servlet.multipart.max-file-size=50MB
    spring.servlet.multipart.max-request-size=50MB

    hdfs.path=hdfs://localhost:9000
    hdfs.username=linhaiy
    logging.config=classpath:logback.xml
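
    The Spring Boot entry point is not included in the original listing; a minimal bootstrap class (its name and package are assumptions) would look like:

    package com.hadoop;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    // Hypothetical main class; any @SpringBootApplication class in a parent package of
    // com.hadoop.config, com.hadoop.hdfs and com.hadoop.util will pick up the beans below.
    @SpringBootApplication
    public class HadoopTestApplication {
    public static void main(String[] args) {
    SpringApplication.run(HadoopTestApplication.class, args);
    }
    }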
3. HDFS file operation API development

    HdfsConfig.java:
    package com.hadoop.config;

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;

    /**
     * HDFS configuration class
     * @author linhaiy
     * @date 2019.05.18
     */
    @Configuration
    public class HdfsConfig {
    @Value("${hdfs.path}")
    private String path;

    public String getPath() {
    return path;
    }

    public void setPath(String path) {
    this.path = path;
    }
    }

    User.java:

    package com.hadoop.hdfs.entity;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * User entity class
     * @author linhaiy
     * @date 2019.05.18
     */
    public class User implements Writable {

    private String username;
    private Integer age;
    private String address;

    public User() {
    super();
    }

    public User(String username, Integer age, String address) {
    super();
    this.username = username;
    this.age = age;
    this.address = address;
    }

    @Override
    public void write(DataOutput output) throws IOException {
    // Serialize the object. writeUTF must be used here so that readFields()
    // below can read the fields back with readUTF in the same order.
    output.writeUTF(username);
    output.writeInt(age);
    output.writeUTF(address);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
    // Deserialize the fields in the order they were written
    username = input.readUTF();
    age = input.readInt();
    address = input.readUTF();
    }

    public String getUsername() {
    return username;
    }

    public void setUsername(String username) {
    this.username = username;
    }

    public Integer getAge() {
    return age;
    }

    public void setAge(Integer age) {
    this.age = age;
    }

    public String getAddress() {
    return address;
    }

    public void setAddress(String address) {
    this.address = address;
    }

    @Override
    public String toString() {
    return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    }

    }
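
    Since write and readFields must stay symmetric (writeUTF paired with readUTF, writeInt with readInt), a quick round-trip through plain Java data streams checks the contract; this test class is a sketch, not part of the original code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class UserWritableTest {
    public static void main(String[] args) throws Exception {
    User original = new User("linhaiy", 25, "Beijing");

    // serialize with write()
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // deserialize with readFields() and compare
    User copy = new User();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy); // should print the same field values as original
    }
    }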
    HdfsService.java:

    package com.hadoop.hdfs.service;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import javax.annotation.PostConstruct;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.io.IOUtils;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.web.multipart.MultipartFile;

    import com.hadoop.util.JsonUtil;

    @Component
    public class HdfsService {

    @Value("${hdfs.path}")
    private String path;
    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;
    private static final int bufferSize = 1024 * 1024 * 64;

    /**
     * Get the HDFS configuration
     * @return
     */
    private static Configuration getConfiguration() {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", hdfsPath);
    return configuration;
    }

    /**
     * Get the HDFS file system object
     * @return
     * @throws Exception
     */
    public static FileSystem getFileSystem() throws Exception {
    // Client operations on HDFS carry a user identity. By default the HDFS client API
    // takes it from a JVM parameter, e.g. -DHADOOP_USER_NAME=hadoop.
    // It can also be passed in explicitly when constructing the client FileSystem object.
    FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
    return fileSystem;
    }

    /**
     * Create a directory on HDFS
     * @param path
     * @return
     * @throws Exception
     */
    public static boolean mkdir(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return false;
    }
    if (existFile(path)) {
    return true;
    }
    FileSystem fs = getFileSystem();
    // target path
    Path srcPath = new Path(path);
    boolean isOk = fs.mkdirs(srcPath);
    fs.close();
    return isOk;
    }

    /**
     * Check whether a file exists on HDFS
     * @param path
     * @return
     * @throws Exception
     */
    public static boolean existFile(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return false;
    }
    FileSystem fs = getFileSystem();
    Path srcPath = new Path(path);
    boolean isExists = fs.exists(srcPath);
    return isExists;
    }

    /**
     * Read HDFS directory information
     * @param path
     * @return
     * @throws Exception
     */
    public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }
    FileSystem fs = getFileSystem();
    // target path
    Path newPath = new Path(path);
    FileStatus[] statusList = fs.listStatus(newPath);
    List<Map<String, Object>> list = new ArrayList<>();
    if (null != statusList && statusList.length > 0) {
    for (FileStatus fileStatus : statusList) {
    Map<String, Object> map = new HashMap<>();
    map.put("filePath", fileStatus.getPath());
    map.put("fileStatus", fileStatus.toString());
    list.add(map);
    }
    return list;
    } else {
    return null;
    }
    }

    /**
     * Create a file on HDFS
     * @param path
     * @param file
     * @throws Exception
     */
    public static void createFile(String path, MultipartFile file) throws Exception {
    if (StringUtils.isEmpty(path) || null == file.getBytes()) {
    return;
    }
    String fileName = file.getOriginalFilename();
    FileSystem fs = getFileSystem();
    // upload under the given directory, appending the original file name
    Path newPath = new Path(path + "/" + fileName);
    // open an output stream
    FSDataOutputStream outputStream = fs.create(newPath);
    outputStream.write(file.getBytes());
    outputStream.close();
    fs.close();
    }

    /**
     * Read the contents of an HDFS file
     * @param path
     * @return
     * @throws Exception
     */
    public static String readFile(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }
    FileSystem fs = getFileSystem();
    // target path
    Path srcPath = new Path(path);
    FSDataInputStream inputStream = null;
    try {
    inputStream = fs.open(srcPath);
    // wrap in a UTF-8 reader to avoid garbled multi-byte (e.g. Chinese) text
    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
    String lineTxt = "";
    StringBuffer sb = new StringBuffer();
    while ((lineTxt = reader.readLine()) != null) {
    sb.append(lineTxt);
    }
    return sb.toString();
    } finally {
    inputStream.close();
    fs.close();
    }
    }

    /**
     * List the files under an HDFS path
     * @param path
     * @return
     * @throws Exception
     */
    public static List<Map<String, String>> listFile(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }

    FileSystem fs = getFileSystem();
    // target path
    Path srcPath = new Path(path);
    // recursively list all files
    RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
    List<Map<String, String>> returnList = new ArrayList<>();
    while (filesList.hasNext()) {
    LocatedFileStatus next = filesList.next();
    String fileName = next.getPath().getName();
    Path filePath = next.getPath();
    Map<String, String> map = new HashMap<>();
    map.put("fileName", fileName);
    map.put("filePath", filePath.toString());
    returnList.add(map);
    }
    fs.close();
    return returnList;
    }

    /**
     * Rename an HDFS file
     * @param oldName
     * @param newName
     * @return
     * @throws Exception
     */
    public static boolean renameFile(String oldName, String newName) throws Exception {
    if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
    return false;
    }
    FileSystem fs = getFileSystem();
    // original file path
    Path oldPath = new Path(oldName);
    // target path after renaming
    Path newPath = new Path(newName);
    boolean isOk = fs.rename(oldPath, newPath);
    fs.close();
    return isOk;
    }

    /**
     * Delete an HDFS file
     * @param path
     * @return
     * @throws Exception
     */
    public static boolean deleteFile(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return false;
    }
    if (!existFile(path)) {
    return false;
    }
    FileSystem fs = getFileSystem();
    Path srcPath = new Path(path);
    // deleteOnExit only marks the path; it is actually removed when fs.close() runs below
    boolean isOk = fs.deleteOnExit(srcPath);
    fs.close();
    return isOk;
    }

    /**
     * Upload a file to HDFS
     * @param path
     * @param uploadPath
     * @throws Exception
     */
    public static void uploadFile(String path, String uploadPath) throws Exception {
    if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
    return;
    }
    FileSystem fs = getFileSystem();
    // local source path
    Path clientPath = new Path(path);
    // HDFS target path
    Path serverPath = new Path(uploadPath);

    // copy to HDFS; the first argument controls whether the local source is deleted (true deletes it, default false)
    fs.copyFromLocalFile(false, clientPath, serverPath);
    fs.close();
    }

    /**
     * Download a file from HDFS
     * @param path
     * @param downloadPath
     * @throws Exception
     */
    public static void downloadFile(String path, String downloadPath) throws Exception {
    if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
    return;
    }
    FileSystem fs = getFileSystem();
    // HDFS source path
    Path clientPath = new Path(path);
    // local target path
    Path serverPath = new Path(downloadPath);

    // copy to the local file system; the first argument controls whether the HDFS source is deleted (true deletes it, default false)
    fs.copyToLocalFile(false, clientPath, serverPath);
    fs.close();
    }

    /**
     * Copy a file within HDFS
     * @param sourcePath
     * @param targetPath
     * @throws Exception
     */
    public static void copyFile(String sourcePath, String targetPath) throws Exception {
    if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
    return;
    }
    FileSystem fs = getFileSystem();
    // source file path
    Path oldPath = new Path(sourcePath);
    // target path
    Path newPath = new Path(targetPath);

    FSDataInputStream inputStream = null;
    FSDataOutputStream outputStream = null;
    try {
    inputStream = fs.open(oldPath);
    outputStream = fs.create(newPath);

    IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
    } finally {
    inputStream.close();
    outputStream.close();
    fs.close();
    }
    }

    /**
     * Open a file on HDFS and return its contents as a byte array
     * @param path
     * @return
     * @throws Exception
     */
    public static byte[] openFileToBytes(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }
    FileSystem fs = getFileSystem();
    // target path
    Path srcPath = new Path(path);
    try {
    FSDataInputStream inputStream = fs.open(srcPath);
    return IOUtils.readFullyToByteArray(inputStream);
    } finally {
    fs.close();
    }
    }

    /**
     * Open a file on HDFS and deserialize it into a Java object
     * @param path
     * @return
     * @throws Exception
     */
    public static <T> T openFileToObject(String path, Class<T> clazz) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }
    String jsonStr = readFile(path);
    return JsonUtil.fromObject(jsonStr, clazz);
    }

    /**
     * Get the cluster block locations of a file on HDFS
     * @param path
     * @return
     * @throws Exception
     */
    public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return null;
    }
    if (!existFile(path)) {
    return null;
    }
    FileSystem fs = getFileSystem();
    // target path
    Path srcPath = new Path(path);
    FileStatus fileStatus = fs.getFileStatus(srcPath);
    return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

    // Copy the injected instance values into the static fields after construction,
    // so the static utility methods above can use them.
    @PostConstruct
    public void init() {
    hdfsPath = this.path;
    hdfsName = this.username;
    }

    public static String getHdfsPath() {
    return hdfsPath;
    }

    public String getUsername() {
    return username;
    }
    }
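
    JsonUtil is imported by HdfsService but its source is not shown. Judging from the single call site, JsonUtil.fromObject(jsonStr, clazz), a minimal Jackson-based sketch (the implementation is an assumption) could be:

    package com.hadoop.util;

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Sketch of the JSON helper referenced by HdfsService.openFileToObject;
    // Jackson is already on the classpath via spring-boot-starter-web.
    public class JsonUtil {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T fromObject(String json, Class<T> clazz) throws Exception {
    return MAPPER.readValue(json, clazz);
    }
    }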
    HdfsAction.java:

    package com.hadoop.hdfs.controller;

    import java.util.List;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.fs.BlockLocation;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.multipart.MultipartFile;

    import com.hadoop.hdfs.entity.User;
    import com.hadoop.hdfs.service.HdfsService;
    import com.hadoop.util.Result;

    @RestController
    @RequestMapping("/hadoop/hdfs")
    public class HdfsAction {

    private static Logger LOGGER = LoggerFactory.getLogger(HdfsAction.class);

    /**
     * Create a directory
     * @param path
     * @return
     * @throws Exception
     */
    @RequestMapping(value = "mkdir", method = RequestMethod.POST)
    @ResponseBody
    public Result mkdir(@RequestParam("path") String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    LOGGER.debug("request parameter is empty");
    return new Result(Result.FAILURE, "request parameter is empty");
    }
    // create an empty directory
    boolean isOk = HdfsService.mkdir(path);
    if (isOk) {
    LOGGER.debug("directory created successfully");
    return new Result(Result.SUCCESS, "directory created successfully");
    } else {
    LOGGER.debug("directory creation failed");
    return new Result(Result.FAILURE, "directory creation failed");
    }
    }

    /**
     * Read HDFS directory information
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/readPathInfo")
    public Result readPathInfo(@RequestParam("path") String path) throws Exception {
    List<Map<String, Object>> list = HdfsService.readPathInfo(path);
    return new Result(Result.SUCCESS, "read HDFS directory info successfully", list);
    }

    /**
     * Get the cluster locations of an HDFS file
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/getFileBlockLocations")
    public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
    BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
    return new Result(Result.SUCCESS, "got HDFS file block locations", blockLocations);
    }

    /**
     * Create a file
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/createFile")
    public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
    throws Exception {
    if (StringUtils.isEmpty(path) || null == file.getBytes()) {
    return new Result(Result.FAILURE, "request parameter is empty");
    }
    HdfsService.createFile(path, file);
    return new Result(Result.SUCCESS, "file created successfully");
    }

    /**
     * Read the contents of an HDFS file
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/readFile")
    public Result readFile(@RequestParam("path") String path) throws Exception {
    String targetPath = HdfsService.readFile(path);
    return new Result(Result.SUCCESS, "read HDFS file contents", targetPath);
    }

    /**
     * Read an HDFS file as a byte array
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/openFileToBytes")
    public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
    byte[] files = HdfsService.openFileToBytes(path);
    return new Result(Result.SUCCESS, "read HDFS file as bytes", files);
    }

    /**
     * Read an HDFS file and convert it into a User object
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/openFileToUser")
    public Result openFileToUser(@RequestParam("path") String path) throws Exception {
    User user = HdfsService.openFileToObject(path, User.class);
    return new Result(Result.SUCCESS, "converted HDFS file into a User object", user);
    }

    /**
     * Read the file list
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/listFile")
    public Result listFile(@RequestParam("path") String path) throws Exception {
    if (StringUtils.isEmpty(path)) {
    return new Result(Result.FAILURE, "request parameter is empty");
    }
    List<Map<String, String>> returnList = HdfsService.listFile(path);
    return new Result(Result.SUCCESS, "read file list successfully", returnList);
    }

    /**
     * Rename a file
     * @param oldName
     * @param newName
     * @return
     * @throws Exception
     */
    @PostMapping("/renameFile")
    public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
    throws Exception {
    if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
    return new Result(Result.FAILURE, "request parameter is empty");
    }
    boolean isOk = HdfsService.renameFile(oldName, newName);
    if (isOk) {
    return new Result(Result.SUCCESS, "file renamed successfully");
    } else {
    return new Result(Result.FAILURE, "file rename failed");
    }
    }

    /**
     * Delete a file
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/deleteFile")
    public Result deleteFile(@RequestParam("path") String path) throws Exception {
    boolean isOk = HdfsService.deleteFile(path);
    if (isOk) {
    return new Result(Result.SUCCESS, "delete file success");
    } else {
    return new Result(Result.FAILURE, "delete file fail");
    }
    }

    /**
     * Upload a file
     * @param path
     * @param uploadPath
     * @return
     * @throws Exception
     */
    @PostMapping("/uploadFile")
    public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
    throws Exception {
    HdfsService.uploadFile(path, uploadPath);
    return new Result(Result.SUCCESS, "upload file success");
    }

    /**
     * Download a file
     * @param path
     * @param downloadPath
     * @return
     * @throws Exception
     */
    @PostMapping("/downloadFile")
    public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
    throws Exception {
    HdfsService.downloadFile(path, downloadPath);
    return new Result(Result.SUCCESS, "download file success");
    }

    /**
     * Copy a file within HDFS
     * @param sourcePath
     * @param targetPath
     * @return
     * @throws Exception
     */
    @PostMapping("/copyFile")
    public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
    throws Exception {
    HdfsService.copyFile(sourcePath, targetPath);
    return new Result(Result.SUCCESS, "copy file success");
    }

    /**
     * Check whether a file already exists
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/existFile")
    public Result existFile(@RequestParam("path") String path) throws Exception {
    boolean isExist = HdfsService.existFile(path);
    return new Result(Result.SUCCESS, "file isExist: " + isExist);
    }
    }
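
    The Result wrapper is also not shown in the original. From its call sites, Result(code, message) and Result(code, message, data), a minimal sketch (field names and the types of SUCCESS/FAILURE are assumptions) could be:

    package com.hadoop.util;

    // Sketch of the response wrapper used by HdfsAction; not the original implementation.
    public class Result {

    public static final String SUCCESS = "success";
    public static final String FAILURE = "failure";

    private String code;
    private String message;
    private Object data;

    public Result(String code, String message) {
    this(code, message, null);
    }

    public Result(String code, String message, Object data) {
    this.code = code;
    this.message = message;
    this.data = data;
    }

    public String getCode() {
    return code;
    }

    public String getMessage() {
    return message;
    }

    public Object getData() {
    return data;
    }
    }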
4. Some test result screenshots

    [test result screenshots]