SpringBoot2.* Hadoop3.3

前言:HDFS安装参照

CentOS7搭建HDFS(Hadoop3.3)

1、在SpringBoot项目中添加Hadoop客户端操作的pom依赖

:::tips Hadoop核心依赖包采用log4j方式记录日志,spring-boot-start-web 核心jar包日志采用logback 方式记录。这里采用spring-boot-start-web的日志记录方式而排除log4j的相关依赖 :::

  1. <dependency>
  2. <groupId>org.apache.hadoop</groupId>
  3. <artifactId>hadoop-common</artifactId>
  4. <version>3.3.0</version>
  5. <exclusions>
  6. <exclusion>
  7. <groupId>org.slf4j</groupId>
  8. <artifactId>slf4j-log4j12</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.hadoop</groupId>
  14. <artifactId>hadoop-hdfs</artifactId>
  15. <version>3.3.0</version>
  16. <exclusions>
  17. <exclusion>
  18. <groupId>org.slf4j</groupId>
  19. <artifactId>slf4j-log4j12</artifactId>
  20. </exclusion>
  21. </exclusions>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.hadoop</groupId>
  25. <artifactId>hadoop-client</artifactId>
  26. <version>3.3.0</version>
  27. <exclusions>
  28. <exclusion>
  29. <groupId>org.slf4j</groupId>
  30. <artifactId>slf4j-log4j12</artifactId>
  31. </exclusion>
  32. </exclusions>
  33. </dependency>
  34. <dependency>
  35. <groupId>com.alibaba</groupId>
  36. <artifactId>fastjson</artifactId>
  37. <version>1.2.72</version>
  38. </dependency>

2、Hadoop Client需要做的配置

在客户端环境变量配置HADOOP_USER_NAME

因为在启动Hadoop时,使用root用户启动,Hadoop客户端会读取环境变量中配置的用户名,如果没有则默认会读取系统计算机管理员名称,会报无权限的异常。
image.png

如果是访问云主机,则需要在本地配置主机名称和公网IP的映射

image.png

3、在Java中配置HDFS的信息

A.在ymal配置文件配置主机信息

  1. application:
  2. config:
  3. hdfs:
  4. host: hdfs://120.79.178.68:9100

B.在Config配置文件引用配置参数即可

:::info 以下几个配置项说明:

  1. dfs.replication:设置副本的数量,在JAVA API客户端设置该参数只是为了和服务端启动时设置的副本数量保持一致,不设置则默认为3个副本。
  2. dfs.client.use.datanode.hostname:解决服务Java 操作HDFS时,NameNode 和DataNode 是通过局域网进行通信,NameNode返回地址为 DataNode 的私有 IP,本地无法访问的问题。 :::

    问题的原因:NameNode节点存放的是文件目录,也就是文件夹、文件名称,本地可以通过公网访问 NameNode,所以可以进行文件夹的创建,当上传文件需要写入数据到DataNode时,NameNode 和DataNode 是通过局域网进行通信,NameNode返回地址为 DataNode 的私有 IP,本地无法访问。

  1. import io.renren.service.HdfsService;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. /**
  5. * HDFS相关配置
  6. *
  7. * @author Fcant
  8. * @since 1.0.0
  9. */
  10. @Configuration
  11. public class HdfsConfig {
  12. @Value("${application.config.hdfs.host}")
  13. private String defaultHdfsUri;
  14. @Bean
  15. public HdfsService getHbaseService(){
  16. org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
  17. conf.set("fs.defaultFS",defaultHdfsUri);
  18. conf.set("dfs.client.use.datanode.hostname", "true");
  19. conf.set("dfs.replication", "1");
  20. return new HdfsService(conf,defaultHdfsUri);
  21. }
  22. }

4、通用Java操作HDFS的Service

  1. import com.alibaba.fastjson.JSON;
  2. import org.apache.commons.io.IOUtils;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.BlockLocation;
  5. import org.apache.hadoop.fs.FSDataInputStream;
  6. import org.apache.hadoop.fs.FileStatus;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.fs.PathFilter;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.io.IOException;
  13. import java.nio.charset.Charset;
  14. import java.text.MessageFormat;
  15. import java.util.ArrayList;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.Map;
  19. /**
  20. * HDFS相关的基本操作
  21. *
  22. * @author Fcant
  23. * @since 1.0.0
  24. */
  25. public class HdfsService {
  26. private Logger logger = LoggerFactory.getLogger(HdfsService.class);
  27. private Configuration conf = null;
  28. /**
  29. * 默认的HDFS路径,比如:hdfs://192.168.197.130:9000
  30. */
  31. private String defaultHdfsUri;
  32. public HdfsService(Configuration conf,String defaultHdfsUri) {
  33. this.conf = conf;
  34. this.defaultHdfsUri = defaultHdfsUri;
  35. }
  36. /**
  37. * 获取HDFS文件系统
  38. * @return org.apache.hadoop.fs.FileSystem
  39. */
  40. private FileSystem getFileSystem() throws IOException {
  41. return FileSystem.get(conf);
  42. }
  43. /**
  44. * 创建HDFS目录
  45. * @author Fcant
  46. * @since 1.0.0
  47. * @param path HDFS的相对目录路径,比如:/testDir
  48. * @return boolean 是否创建成功
  49. */
  50. public boolean mkdir(String path){
  51. //如果目录已经存在,则直接返回
  52. if(checkExists(path)){
  53. return true;
  54. }else{
  55. FileSystem fileSystem = null;
  56. try {
  57. fileSystem = getFileSystem();
  58. //最终的HDFS文件目录
  59. String hdfsPath = generateHdfsPath(path);
  60. //创建目录
  61. return fileSystem.mkdirs(new Path(hdfsPath));
  62. } catch (IOException e) {
  63. logger.error(MessageFormat.format("创建HDFS目录失败,path:{0}",path),e);
  64. return false;
  65. }finally {
  66. close(fileSystem);
  67. }
  68. }
  69. }
  70. /**
  71. * 接口上传的文件上传到HDFS
  72. *
  73. * @param filePath 文件路径及文件名 Example:/business/2020/12/23/fbfc71d2-7/hello.png
  74. * @param file 接口收到的文件
  75. * @author Fcant 上午 10:36 2020/12/23/0023
  76. */
  77. public void uploadFileToHdfs(String filePath, MultipartFile file) {
  78. Path path = new Path(filePath);
  79. FSDataOutputStream outputStream;
  80. try {
  81. outputStream = getFileSystem().create(path);
  82. outputStream.write(file.getBytes());
  83. outputStream.close();
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. /**
  89. * 上传文件至HDFS
  90. * @author Fcant
  91. * @since 1.0.0
  92. * @param srcFile 本地文件路径,比如:D:/test.txt
  93. * @param dstPath HDFS的相对目录路径,比如:/testDir
  94. */
  95. public void uploadFileToHdfs(String srcFile, String dstPath){
  96. this.uploadFileToHdfs(false, true, srcFile, dstPath);
  97. }
  98. /**
  99. * 上传文件至HDFS
  100. * @author Fcant
  101. * @since 1.0.0
  102. * @param delSrc 是否删除本地文件
  103. * @param overwrite 是否覆盖HDFS上面的文件
  104. * @param srcFile 本地文件路径,比如:D:/test.txt
  105. * @param dstPath HDFS的相对目录路径,比如:/testDir
  106. */
  107. public void uploadFileToHdfs(boolean delSrc, boolean overwrite, String srcFile, String dstPath){
  108. //源文件路径
  109. Path localSrcPath = new Path(srcFile);
  110. //目标文件路径
  111. Path hdfsDstPath = new Path(generateHdfsPath(dstPath));
  112. FileSystem fileSystem = null;
  113. try {
  114. fileSystem = getFileSystem();
  115. fileSystem.copyFromLocalFile(delSrc,overwrite,localSrcPath,hdfsDstPath);
  116. } catch (IOException e) {
  117. logger.error(MessageFormat.format("上传文件至HDFS失败,srcFile:{0},dstPath:{1}",srcFile,dstPath),e);
  118. }finally {
  119. close(fileSystem);
  120. }
  121. }
  122. /**
  123. * 判断文件或者目录是否在HDFS上面存在
  124. * @author Fcant
  125. * @since 1.0.0
  126. * @param path HDFS的相对目录路径,比如:/testDir、/testDir/a.txt
  127. * @return boolean
  128. */
  129. public boolean checkExists(String path){
  130. FileSystem fileSystem = null;
  131. try {
  132. fileSystem = getFileSystem();
  133. //最终的HDFS文件目录
  134. String hdfsPath = generateHdfsPath(path);
  135. //创建目录
  136. return fileSystem.exists(new Path(hdfsPath));
  137. } catch (IOException e) {
  138. logger.error(MessageFormat.format("'判断文件或者目录是否在HDFS上面存在'失败,path:{0}",path),e);
  139. return false;
  140. }finally {
  141. close(fileSystem);
  142. }
  143. }
  144. /**
  145. * 获取HDFS上面的某个路径下面的所有文件或目录(不包含子目录)信息
  146. * @author Fcant
  147. * @since 1.0.0
  148. * @param path HDFS的相对目录路径,比如:/testDir
  149. * @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
  150. */
  151. public List<Map<String,Object>> listFiles(String path, PathFilter pathFilter){
  152. //返回数据
  153. List<Map<String,Object>> result = new ArrayList<>();
  154. //如果目录已经存在,则继续操作
  155. if(checkExists(path)){
  156. FileSystem fileSystem = null;
  157. try {
  158. fileSystem = getFileSystem();
  159. //最终的HDFS文件目录
  160. String hdfsPath = generateHdfsPath(path);
  161. FileStatus[] statuses;
  162. //根据Path过滤器查询
  163. if(pathFilter != null){
  164. statuses = fileSystem.listStatus(new Path(hdfsPath),pathFilter);
  165. }else{
  166. statuses = fileSystem.listStatus(new Path(hdfsPath));
  167. }
  168. if(statuses != null){
  169. for(FileStatus status : statuses){
  170. //每个文件的属性
  171. Map<String,Object> fileMap = new HashMap<>(2);
  172. fileMap.put("path",status.getPath().toString());
  173. fileMap.put("isDir",status.isDirectory());
  174. result.add(fileMap);
  175. }
  176. }
  177. } catch (IOException e) {
  178. logger.error(MessageFormat.format("获取HDFS上面的某个路径下面的所有文件失败,path:{0}",path),e);
  179. }finally {
  180. close(fileSystem);
  181. }
  182. }
  183. return result;
  184. }
  185. /**
  186. * 从HDFS下载文件至本地
  187. * @author Fcant
  188. * @since 1.0.0
  189. * @param srcFile HDFS的相对目录路径,比如:/testDir/a.txt
  190. * @param dstFile 下载之后本地文件路径(如果本地文件目录不存在,则会自动创建),比如:D:/test.txt
  191. */
  192. public void downloadFileFromHdfs(String srcFile, String dstFile){
  193. //HDFS文件路径
  194. Path hdfsSrcPath = new Path(generateHdfsPath(srcFile));
  195. //下载之后本地文件路径
  196. Path localDstPath = new Path(dstFile);
  197. FileSystem fileSystem = null;
  198. try {
  199. fileSystem = getFileSystem();
  200. fileSystem.copyToLocalFile(hdfsSrcPath,localDstPath);
  201. } catch (IOException e) {
  202. logger.error(MessageFormat.format("从HDFS下载文件至本地失败,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
  203. }finally {
  204. close(fileSystem);
  205. }
  206. }
  207. /**
  208. * 打开HDFS上面的文件并返回 InputStream
  209. * @author Fcant
  210. * @since 1.0.0
  211. * @param path HDFS的相对目录路径,比如:/testDir/c.txt
  212. * @return FSDataInputStream
  213. */
  214. public FSDataInputStream open(String path){
  215. //HDFS文件路径
  216. Path hdfsPath = new Path(generateHdfsPath(path));
  217. FileSystem fileSystem = null;
  218. try {
  219. fileSystem = getFileSystem();
  220. return fileSystem.open(hdfsPath);
  221. } catch (IOException e) {
  222. logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
  223. }
  224. return null;
  225. }
  226. /**
  227. * 打开HDFS上面的文件并返回byte数组,方便Web端下载文件
  228. * <p>new ResponseEntity<byte[]>(byte数组, headers, HttpStatus.CREATED);</p>
  229. * <p>或者:new ResponseEntity<byte[]>(FileUtils.readFileToByteArray(templateFile), headers, HttpStatus.CREATED);</p>
  230. * @author Fcant
  231. * @since 1.0.0
  232. * @param path HDFS的相对目录路径,比如:/testDir/b.txt
  233. * @return FSDataInputStream
  234. */
  235. public byte[] openWithBytes(String path){
  236. //HDFS文件路径
  237. Path hdfsPath = new Path(generateHdfsPath(path));
  238. FileSystem fileSystem = null;
  239. FSDataInputStream inputStream = null;
  240. try {
  241. fileSystem = getFileSystem();
  242. inputStream = fileSystem.open(hdfsPath);
  243. return IOUtils.toByteArray(inputStream);
  244. } catch (IOException e) {
  245. logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
  246. }finally {
  247. if(inputStream != null){
  248. try {
  249. inputStream.close();
  250. } catch (IOException e) {
  251. // ignore
  252. }
  253. }
  254. }
  255. return null;
  256. }
  257. /**
  258. * 打开HDFS上面的文件并返回String字符串
  259. * @author Fcant
  260. * @since 1.0.0
  261. * @param path HDFS的相对目录路径,比如:/testDir/b.txt
  262. * @return FSDataInputStream
  263. */
  264. public String openWithString(String path){
  265. //HDFS文件路径
  266. Path hdfsPath = new Path(generateHdfsPath(path));
  267. FileSystem fileSystem = null;
  268. FSDataInputStream inputStream = null;
  269. try {
  270. fileSystem = getFileSystem();
  271. inputStream = fileSystem.open(hdfsPath);
  272. return IOUtils.toString(inputStream, Charset.forName("UTF-8"));
  273. } catch (IOException e) {
  274. logger.error(MessageFormat.format("打开HDFS上面的文件失败,path:{0}",path),e);
  275. }finally {
  276. if(inputStream != null){
  277. try {
  278. inputStream.close();
  279. } catch (IOException e) {
  280. // ignore
  281. }
  282. }
  283. }
  284. return null;
  285. }
  286. /**
  287. * 打开HDFS上面的文件并转换为Java对象(需要HDFS上门的文件内容为JSON字符串)
  288. * @author Fcant
  289. * @since 1.0.0
  290. * @param path HDFS的相对目录路径,比如:/testDir/c.txt
  291. * @return FSDataInputStream
  292. */
  293. public <T extends Object> T openWithObject(String path, Class<T> clazz){
  294. //1、获得文件的json字符串
  295. String jsonStr = this.openWithString(path);
  296. //2、使用com.alibaba.fastjson.JSON将json字符串转化为Java对象并返回
  297. return JSON.parseObject(jsonStr, clazz);
  298. }
  299. /**
  300. * 重命名
  301. * @author Fcant
  302. * @since 1.0.0
  303. * @param srcFile 重命名之前的HDFS的相对目录路径,比如:/testDir/b.txt
  304. * @param dstFile 重命名之后的HDFS的相对目录路径,比如:/testDir/b_new.txt
  305. */
  306. public boolean rename(String srcFile, String dstFile) {
  307. //HDFS文件路径
  308. Path srcFilePath = new Path(generateHdfsPath(srcFile));
  309. //下载之后本地文件路径
  310. Path dstFilePath = new Path(dstFile);
  311. FileSystem fileSystem = null;
  312. try {
  313. fileSystem = getFileSystem();
  314. return fileSystem.rename(srcFilePath,dstFilePath);
  315. } catch (IOException e) {
  316. logger.error(MessageFormat.format("重命名失败,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
  317. }finally {
  318. close(fileSystem);
  319. }
  320. return false;
  321. }
  322. /**
  323. * 删除HDFS文件或目录
  324. * @author Fcant
  325. * @since 1.0.0
  326. * @param path HDFS的相对目录路径,比如:/testDir/c.txt
  327. * @return boolean
  328. */
  329. public boolean delete(String path) {
  330. //HDFS文件路径
  331. Path hdfsPath = new Path(generateHdfsPath(path));
  332. FileSystem fileSystem = null;
  333. try {
  334. fileSystem = getFileSystem();
  335. return fileSystem.delete(hdfsPath,true);
  336. } catch (IOException e) {
  337. logger.error(MessageFormat.format("删除HDFS文件或目录失败,path:{0}",path),e);
  338. }finally {
  339. close(fileSystem);
  340. }
  341. return false;
  342. }
  343. /**
  344. * 获取某个文件在HDFS集群的位置
  345. * @author Fcant
  346. * @since 1.0.0
  347. * @param path HDFS的相对目录路径,比如:/testDir/a.txt
  348. * @return org.apache.hadoop.fs.BlockLocation[]
  349. */
  350. public BlockLocation[] getFileBlockLocations(String path) {
  351. //HDFS文件路径
  352. Path hdfsPath = new Path(generateHdfsPath(path));
  353. FileSystem fileSystem = null;
  354. try {
  355. fileSystem = getFileSystem();
  356. FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
  357. return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
  358. } catch (IOException e) {
  359. logger.error(MessageFormat.format("获取某个文件在HDFS集群的位置失败,path:{0}",path),e);
  360. }finally {
  361. close(fileSystem);
  362. }
  363. return null;
  364. }
  365. /**
  366. * 将相对路径转化为HDFS文件路径
  367. * @author Fcant
  368. * @since 1.0.0
  369. * @param dstPath 相对路径,比如:/data
  370. * @return java.lang.String
  371. */
  372. private String generateHdfsPath(String dstPath){
  373. String hdfsPath = defaultHdfsUri;
  374. if(dstPath.startsWith("/")){
  375. hdfsPath += dstPath;
  376. }else{
  377. hdfsPath = hdfsPath + "/" + dstPath;
  378. }
  379. return hdfsPath;
  380. }
  381. /**
  382. * close方法
  383. */
  384. private void close(FileSystem fileSystem){
  385. if(fileSystem != null){
  386. try {
  387. fileSystem.close();
  388. } catch (IOException e) {
  389. logger.error(e.getMessage());
  390. }
  391. }
  392. }
  393. }