接下来就是上代码了,这里使用java调用API说明。

配置环境:

这里是在windows下的环境配置,我在mac环境下没有配置程序也能够运行。

  1. 将hadoop的插件包放置在eclipse的安装目录的plugins
    hadoop-eclipse-plugin-2.7.5.jar 添加过去。
    编译好的jarhttps://github.com/HuangDongdong666/Hadoop-eclipse-plugin-2.8.3
  2. 配置本地的hadoop环境 安装单机版的windows下的hadoop ,用于代码调试

    1. 解压hadoop的安装包
    2. 配置hadoop的windows下的环境变量

      1. HADOOP_HOME=C:\soft\hadoop-3.2.1
      2. PATH= 末尾追加 ;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin
    3. 添加插件https://github.com/cdarlint/winutils

      • hadoop.dll放在windows下的System32文件夹下
      • 将winutil.exe放在hadoop安装目录的bin目录下
    4. 重启eclipse
    5. 配置eclipse的可视化操作界面
      1. 配置eclipse下的hadoop的环境变量
        • windows>>prefrences>>左侧菜单栏搜索 hadoop >> browse 导入hadoop的windows下的安装目录
      2. 配置hadoop的控制台界面
        • window>>>show view>>other>>搜索map 选择map/reduce,面板会出现黄色小象
      3. 配置hadoop集群的本地连接
        • 前提:先配置windows下的hosts文件
        • 修改eclipse的显示方式:project explorer

创建工程

导入依赖包

有下面3种方式,为了方便我使用maven进行依赖管理

  1. 1)在工程下建一个lib的包 bulid PATH
  2. 工程移动的时候比较方便
  3. 不能解决jar包冲入问题 工程比较臃肿
  4. 2maven
  5. 自动解决jar包依赖冲突问题 便于项目的管理
  6. 工程移动的时候需要重新构建 构建工程比较麻烦
  7. 3)自己创建一个本地的library 导入

案例

本地文件系统对象上传

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import java.io.IOException;
  6. public class TestHdfs1 {
  7. public static void main(String[] args) throws IOException {
  8. // Configuration 配置文件对象 加载配置文件 或设置配置文件
  9. Configuration conf = new Configuration();
  10. // 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象
  11. // hdfs的文件系统对象 想要操作hdfs必须先拿到这个对象
  12. FileSystem fileSystem = FileSystem.get(conf);
  13. // org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的
  14. System.out.println(fileSystem);
  15. Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt");
  16. Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name2.txt");
  17. /**
  18. *
  19. * 文件上传
  20. * put 本地 hdfs路径
  21. * copyfromlocal-----api
  22. * movefromlocal
  23. */
  24. //文件上传 上传到本地 文件传到了工程的根目录下
  25. //文件上传的时候生成了两个文件 原始文件name2.txt 另外一个文件 .name2.txt.crc
  26. fileSystem.copyFromLocalFile(src, dst);
  27. fileSystem.close();
  28. }
  29. }

结果

  1. SUN:resources sun$ pwd
  2. /Users/sun/Documents/idea/bigdata/src/main/resources
  3. SUN:resources sun$ ll
  4. total 24
  5. drwxr-xr-x 5 sun staff 160 Oct 24 16:01 ./
  6. drwxr-xr-x 4 sun staff 128 Oct 22 20:19 ../
  7. -rw-r--r-- 1 sun staff 12 Oct 24 16:01 .name2.txt.crc
  8. -rw-r--r-- 1 sun staff 18 Oct 24 15:58 name.txt
  9. -rw-r--r-- 1 sun staff 18 Oct 24 16:01 name2.txt

分布式文件系统的对象

  • conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
  • FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import java.io.IOException;
  6. public class TestHdfs1 {
  7. public static void main(String[] args) throws IOException {
  8. // Configuration 配置文件对象 加载配置文件 或设置配置文件
  9. Configuration conf = new Configuration();
  10. //没有指定集群的连接入口 想要获取集群的需要指定集群的连接入口
  11. //在conf对象上设置连接入口
  12. /**
  13. * 参数1:配置文件的属性名
  14. * 参数2:配置文件的属性的值
  15. */
  16. //conf.set("dfs.replication", "4");
  17. //conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
  18. //参数1:namenode的连接入口
  19. FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  20. // 获取hdfs的操作对象 获取的是hdfs的操作句柄对象 FileSystem对象
  21. FileSystem fileSystem = FileSystem.get(conf);
  22. // org.apache.hadoop.fs.LocalFileSystem@37574691 本地文件系统对象:文件上传和下载都在本地操作的
  23. System.out.println(fileSystem);
  24. Path src=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name.txt");
  25. Path dst=new Path("/Users/sun/Documents/idea/bigdata/src/main/resources/name3.txt");
  26. //文件上传
  27. fileSystem.copyFromLocalFile(src, dst);
  28. fileSystem.close();
  29. }
  30. }

结果

  1. [root@hadoop01 ~]# hadoop fs -ls /wordcount/output
  2. Found 3 items
  3. -rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS
  4. -rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000
  5. -rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001
  6. [root@hadoop01 ~]# hadoop fs -ls /wordcount/output
  7. Found 4 items
  8. -rw-r--r-- 3 root supergroup 0 2019-10-22 13:28 /wordcount/output/_SUCCESS
  9. -rw-r--r-- 3 root supergroup 18 2019-10-23 00:16 /wordcount/output/name3.txt
  10. -rw-r--r-- 3 root supergroup 77 2019-10-22 13:28 /wordcount/output/part-r-00000
  11. -rw-r--r-- 3 root supergroup 119 2019-10-22 13:28 /wordcount/output/part-r-00001

文件上传的时候报一个权限的问题

原因是用户在eclipse中进行上传文件的时候 使用windows下的用户 而不是hdfs的安装用户。

解决权限问题:

  1. 在代码提交的时候指定用户

    1. 代码运行的时候 右键》》 run configurations>> 配置运行代码需要的参数 -DHADOOP_USER_NAME=hadoop
    2. -> program arguments:代码程序运行需要的参数
    3. 代码中需要控制台传入的参数 这个参数通过main(String[] args)
    4. -> VM arguments:JVM运行过程中需要的参数
    5. 比如jvm内存大小 jvm运行的用户指定
  2. 在代码中指定用户

    • 指定系统的运行参数 System ``` System.setProperty(“HADOOP_USER_NAME”, “hadoop”);

//或者

FileSystem fs=FileSystem.get(new URI(“hdfs://hadoop01:9000”), conf, “hadoop”);

  1. 3. windows下添加一个hadoop用户不建议
  2. <a name="b1067c44"></a>
  3. ## Configuration配置文件加载说明
  4. 配置文件加载的是**默认的配置文件**
  5. - 默认加载的配置文件是 工程下hadoop-hdfs-3.2.1.jarjar包下的hdfs-default.xml这个文件
  6. - 进行文件上传的时候 所有的参数都是默认的参数

Configuration对象默认加载:

  • hdfs-default.xml
  • mapred-default.xml
  • yarn-default.xml
  • core-default.xml
  • 想要加载自己的配置文件,需要将自己的配置文件拷贝到工程的src下才能,默认加载到自己的配置文件
  • 自动加载的配置文件只能识别
  • hdfs-default.xml
  • hdfs-site.xml
  • mapred-default.xml
  • mapred-site.xml
  • yarn-default.xml
  • yarn-site.xml
  • core-default.xml
  • core-site.xml
  • 配置如果没有放在src下?需要代码中手动加载配置文件
  • conf.addResource(“conf/hdfs-site.xml”);
  • 在代码中也可以手动设置配置文件 ```

配置文件的加载顺序:

  1. * 1jar
  2. * 2src
  3. * 3)代码中的
  4. * 生效:
  5. * 3 >> 2 >> 1

API 操作

  • 上传
  • 下载
  • 创建文件夹
  • 删除文件夹和文件
  • 判断文件是否存在
  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.*;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import java.util.Arrays;
  8. public class TestAPI {
  9. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  10. Configuration conf = new Configuration();
  11. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  12. //文件上传 本地---hdfs
  13. fs.copyFromLocalFile(new Path("D:\\movies.dat"), new Path("/movie_tt"));
  14. //文件下载
  15. fs.copyToLocalFile(new Path("/movie_tt"), new Path("D:\\movie01.avi"));
  16. //创建文件夹 hdfs dfs -mkdir 级联创建文件夹
  17. boolean isfinish = fs.mkdirs(new Path("/bd1807/aa/ss/ff"));
  18. // System.out.println(isfinish);
  19. //delete() 既可以删除文件 也可以删除文件夹的
  20. //删除文件 返回值 bool 删除--true 否则--false
  21. System.out.println(fs.delete(new Path("/aa001"),false));
  22. //删除文件夹
  23. /**
  24. * 参数1:需要删除的路径 文件 文件夹
  25. * 参数2:是否需要级联删除 true需要 false --不需要
  26. */
  27. fs.delete(new Path("/ss"),true);
  28. //判断文件夹/文件是否存在 如果存在 则返回true 否则--false
  29. boolean ise=fs.exists(new Path("/bd1807")); //mkdir/delete
  30. //System.out.println(ise);
  31. if(fs.exists(new Path("/movie_tt"))){
  32. fs.delete(new Path("/movie_tt"),false);
  33. }
  34. fs.close();
  35. }
  36. }

API查看文件的信息

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.*;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import java.util.Arrays;
  8. public class TestAPI {
  9. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  10. Configuration conf = new Configuration();
  11. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  12. //1.查看文件的信息 指定路径下的文件信息 参数1--需要查看的路径 参数2--是否级联 ls -R
  13. RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), false);
  14. //迭代器 迭代 hasNext next()
  15. //LocatedFileStatus 文件状态对象
  16. //封装的文件状态 -- 文件的路径 文件的大小 文件的用户 组 文件的副本
  17. while(listFiles.hasNext()){
  18. System.out.println("------------------------------");
  19. //next对象 获取的是文件的状态信息 一个next代表的是一个文件
  20. LocatedFileStatus next = listFiles.next();
  21. System.out.println(next.getPath());
  22. System.out.println(next.getBlockSize());//获取的配置文件的分块大小 128M
  23. System.out.println(next.getLen());//获取文件的真实大小 块的实际大小
  24. System.out.println(next.getReplication());
  25. //获取的是文件的切块信息 每一个文件的切块信息
  26. //BlockLocation 数据块的封装对象
  27. //包括数据块的存储位置 数据块的大小 数据块的所属用户 组
  28. BlockLocation[] blockLocations = next.getBlockLocations();
  29. //[0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02]
  30. //[0起始的偏移量, 268435456 当前块的实际大小,hadoop03,hadoop02, 副本存放的位置==> 文件的第一个块
  31. //268435456,90761455,hadoop03,hadoop02] ==> 第二个块
  32. System.out.println(Arrays.toString(blockLocations));
  33. //循环遍历每一个文件的每一个数据块
  34. for(BlockLocation bl:blockLocations){
  35. System.out.print("这个块的起始偏移量"+bl.getOffset()+"\t");
  36. System.out.print("这个块的长度"+bl.getLength()+"\t");
  37. //这个是同一个数据块的所有副本存放的位置
  38. String[] hosts = bl.getHosts();
  39. System.out.println("这个数据块的副本存放位置"+Arrays.toString(hosts));
  40. }
  41. }
  42. fs.close();
  43. }
  44. }
  45. ------------------------------
  46. hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz
  47. 268435456
  48. 359196911
  49. 2
  50. [0,268435456,hadoop03,hadoop02, 268435456,90761455,hadoop03,hadoop02]
  51. 这个块的起始偏移量0 这个块的长度268435456 这个数据块的副本存放位置[hadoop03, hadoop02]
  52. 这个块的起始偏移量268435456 这个块的长度90761455 这个数据块的副本存放位置[hadoop03, hadoop02]
  53. ------------------------------
  54. hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz
  55. 268435456
  56. 195094741
  57. 2
  58. [0,195094741,hadoop03,hadoop02]
  59. 这个块的起始偏移量0 这个块的长度195094741 这个数据块的副本存放位置[hadoop03, hadoop02]

API查看文件夹的信息2

看不到块信息 可以看到文件夹

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.*;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.net.URISyntaxException;
  7. import java.util.Arrays;
  8. public class TestAPI {
  9. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  10. Configuration conf = new Configuration();
  11. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  12. //2.查看指定目录的文件或文件夹的状态信息
  13. //FileStatus 文件或文件夹的状态对象 封装的文件或文件夹的 用户 组 长度 路径
  14. FileStatus[] listStatus = fs.listStatus(new Path("/"));
  15. for (FileStatus fss : listStatus) {
  16. System.out.println("=======================");
  17. System.out.println(fss.getPath());
  18. System.out.println(fss.getBlockSize());
  19. System.out.println(fss.getLen());
  20. System.out.println(fss.getReplication());
  21. }
  22. fs.close();
  23. }
  24. }
  25. =======================
  26. hdfs://hadoop01:9000/hadoop-3.2.1.tar.gz
  27. 268435456
  28. 359196911
  29. 2
  30. =======================
  31. hdfs://hadoop01:9000/jdk-8u221-linux-x64.tar.gz
  32. 268435456
  33. 195094741
  34. 2
  35. =======================
  36. hdfs://hadoop01:9000/wordcount
  37. 0
  38. 0
  39. 0
  40. Process finished with exit code 0

IO流本地上传

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FSDataOutputStream;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IOUtils;
  7. import java.io.FileInputStream;
  8. import java.io.IOException;
  9. import java.net.URI;
  10. import java.net.URISyntaxException;
  11. public class TestIO {
  12. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  13. Configuration conf = new Configuration();
  14. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  15. // 文件上传
  16. // 本地 -> 读 -> 输入流
  17. // hdfs -> 写 -> 输出流
  18. //创建本地的输入流
  19. FileInputStream in = new FileInputStream("D:\\movie01.avi");
  20. //创建hdfs的输出流 fs
  21. FSDataOutputStream out = fs.create(new Path("/movie_yy01"));
  22. //进行读写 IOUtils hadoop进行文件读写的工具类
  23. //参数1:输入流 参数2:输出流 参数3:缓冲大小
  24. IOUtils.copyBytes(in, out, 1024);
  25. //关闭流
  26. in.close();
  27. out.close();
  28. //参数1:输入流 参数2:输出流 参数3:缓冲大小 参数4:执行完成 是否关闭流
  29. IOUtils.copyBytes(in, out, 1024, true);
  30. //参数3---long 读取的字节数 读取指定的字节数
  31. IOUtils.copyBytes(in, out, 2L, true);
  32. }
  33. }

IO 下载

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FSDataInputStream;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IOUtils;
  7. import java.io.FileOutputStream;
  8. import java.io.IOException;
  9. import java.net.URI;
  10. import java.net.URISyntaxException;
  11. public class testIO2 {
  12. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  13. Configuration conf = new Configuration();
  14. FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
  15. //文件下载
  16. // hdfs -> 读 -> 输入流
  17. // 本地 -> 写 -> 输出流
  18. //hdfs的输入流 fs
  19. FSDataInputStream in = fs.open(new Path("/movie_yy"));
  20. //这个方法将流的指针设置到某一个字节开始读取 参数--偏移量
  21. in.seek(10L);
  22. //创建本地的输出流
  23. FileOutputStream out = new FileOutputStream("D:\\moo02");
  24. //进行流的复制
  25. //参数3是Long类型:读取的字节数 读取指定的字节数
  26. IOUtils.copyBytes(in, out, 100L, true);
  27. }
  28. }

crc校验

检测数据是否损坏的措施是,在数据第一次引入系统时候计算校验和(checksum),并在数据通过一个不可靠的通道时候进行传输时再次计算校验和,这样就能发现数据是否损坏了,如果两次计算的校验和不匹配,你就认为数据已经损坏了,但是该技术不能修复数据,它只能检测出错误。常用的错误检测码是CRC-32(循环冗余校验),任何大小的数据输入均计算得到一个32位的整数校验和。

  1. package pers.sfl.hdfs;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import java.io.IOException;
  6. import java.net.URI;
  7. import java.net.URISyntaxException;
  8. public class TestCrc {
  9. public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
  10. Configuration conf=new Configuration();
  11. FileSystem fs=FileSystem.get(new URI("hdfs://hadoop01:9000"), conf,"hadoop");
  12. Path src=new Path("/testcrc");
  13. Path dst=new Path("D:\\test04");
  14. /**
  15. * 文件下载过程中 只要有一个是没有损坏的副本 下载是没有损坏的块 crc校验是可通过的
  16. *
  17. * crc校验的时候:校验的内容只是原始文件的偏移量内的内容
  18. * 只要这部分内容没有发生变化 -> 校验通过
  19. * 这部分内容 发生变化 -> 校验不通过的
  20. */
  21. fs.copyToLocalFile(src, dst);
  22. fs.close();
  23. }
  24. }