FileSystem is the most frequently used class: whenever we want to connect to HDFS and perform operations on it, it is the entry point of the program. This post analyzes how local code connects to Hadoop through FileSystem and reads data.

Local code example

Since the cluster we are connecting to is an HA-enabled Hadoop cluster, core-site.xml and hdfs-site.xml need to be placed in the resources directory.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsClientDemo {   // the class name is only for this example
        public static void main(String[] args) throws IOException {
            // Build the configuration; core-site.xml and hdfs-site.xml are loaded by default
            Configuration conf = new Configuration();
            // Build the FileSystem (the core of this walkthrough)
            FileSystem fs = FileSystem.get(conf);
            // List the entries under the root directory
            FileStatus[] list = fs.listStatus(new Path("/"));
            for (FileStatus file : list) {
                System.out.println(file.getPath().getName());
            }
        }
    }

The Maven dependency used:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>

Analysis

FileSystem.get(conf)

    // The core of the example
    FileSystem fs = FileSystem.get(conf);

get(Configuration conf) resolves the default URI from fs.defaultFS and delegates to the two-argument get(URI, Configuration):

    public static FileSystem get(Configuration conf) throws IOException {
        return get(getDefaultUri(conf), conf);
    }

    public static URI getDefaultUri(Configuration conf) {
        URI uri =
            URI.create(fixName(conf.getTrimmed(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("No scheme in default FS: " + uri);
        }
        return uri; // returns fs.defaultFS from core-site.xml, here hdfs://master
    }
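
As a small illustration (a snippet reusing the imports of the opening example, and assuming fs.defaultFS is hdfs://master as in this cluster's core-site.xml), getDefaultUri simply surfaces that value, so the one-argument and two-argument get calls end up on the same path:

    // getDefaultUri picks up fs.defaultFS, so both calls below resolve the same way.
    Configuration conf = new Configuration();          // loads core-site.xml / hdfs-site.xml
    URI defaultUri = FileSystem.getDefaultUri(conf);
    System.out.println(defaultUri);                    // hdfs://master
    FileSystem fs1 = FileSystem.get(conf);
    FileSystem fs2 = FileSystem.get(defaultUri, conf);
    System.out.println(fs1 == fs2);                    // true while caching is enabled (the default)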

FileSystem get(URI uri, Configuration conf)

The matching FileSystem is obtained through CACHE.get(uri, conf). The cache is used by default; if the configuration disables the cache for the scheme, a new instance is created directly instead, which is what the createFileSystem method does:

    public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {        // use default FS
            return get(conf);
        }
        if (scheme != null && authority == null) {        // no authority
            URI defaultUri = getDefaultUri(conf);
            if (scheme.equals(defaultUri.getScheme())     // if scheme matches default
                && defaultUri.getAuthority() != null) {   // & default has authority
                return get(defaultUri, conf);             // return default
            }
        }
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        if (conf.getBoolean(disableCacheName, false)) {
            LOGGER.debug("Bypassing cache to create filesystem {}", uri);
            return createFileSystem(uri, conf);
        }
        return CACHE.get(uri, conf);
    }

Inside FileSystem.Cache, get builds a key from the uri and conf and hands it to getInternal:

    FileSystem get(URI uri, Configuration conf) throws IOException {
        FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
        return this.getInternal(uri, conf, key);
    }

    /** The objects inserted into the cache using this method are all unique. */
    FileSystem getUnique(URI uri, Configuration conf) throws IOException {
        FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf, unique.getAndIncrement());
        return this.getInternal(uri, conf, key);
    }

The cache methods use the key built from the uri and conf to check whether a FileSystem has already been created for it: if one exists it is reused, otherwise a new one is created and added to the cache.
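
A hedged sketch of the two paths described above: flipping the per-scheme switch fs.hdfs.impl.disable.cache (the fs.&lt;scheme&gt;.impl.disable.cache property checked in get) forces every call through createFileSystem instead of the cache:

    // With caching disabled for hdfs, each get() builds a new instance via createFileSystem().
    Configuration conf = new Configuration();
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
    FileSystem fs1 = FileSystem.get(conf);
    FileSystem fs2 = FileSystem.get(conf);
    System.out.println(fs1 == fs2);   // false: two separate, uncached instances

This can be handy when separate components need isolated FileSystem instances; since uncached instances are not shared, each caller is responsible for closing its own.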

createFileSystem(uri, conf)

    private static FileSystem createFileSystem(URI uri, Configuration conf)
            throws IOException {
        Tracer tracer = FsTracer.get(conf);
        // The decompiled try/catch/finally around the TraceScope is just an expanded
        // try-with-resources; the essential work is the three commented steps below.
        try (TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
            scope.addKVAnnotation("scheme", uri.getScheme());
            // Resolve the FileSystem implementation class for this scheme from the configuration
            Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
            // Instantiate the FileSystem via reflection
            FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
            // Initialize the FileSystem
            fs.initialize(uri, conf);
            return fs;
        }
    }

In short, this method resolves the FileSystem implementation class from the configuration, instantiates it via reflection, calls initialize on it, and returns the resulting FileSystem object.

How the matching FileSystem implementation class is resolved from the configuration

    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);

The implementation of getFileSystemClass:

    public static Class<? extends FileSystem> getFileSystemClass(String scheme,
            Configuration conf) throws IOException {
        if (!FILE_SYSTEMS_LOADED) {
            loadFileSystems();
        }
        LOGGER.debug("Looking for FS supporting {}", scheme);
        Class<? extends FileSystem> clazz = null;
        if (conf != null) {
            // for the hdfs scheme this property is "fs.hdfs.impl"
            String property = "fs." + scheme + ".impl";
            LOGGER.debug("looking for configuration option {}", property);
            clazz = conf.getClass(property, (Class) null);
        } else {
            LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
        }
        if (clazz == null) {
            LOGGER.debug("Looking in service filesystems for implementation class");
            clazz = (Class) SERVICE_FILE_SYSTEMS.get(scheme);
        } else {
            LOGGER.debug("Filesystem {} defined in configuration option", scheme);
        }
        if (clazz == null) {
            throw new UnsupportedFileSystemException("No FileSystem for scheme \"" +
                scheme + "\"");
        } else {
            LOGGER.debug("FS for {} is {}", scheme, clazz);
            return clazz;
        }
    }

The code above resolves the FileSystem implementation class. It first checks whether the configuration defines fs.hdfs.impl; if so, that class is returned directly. Otherwise it falls back to the implementations that ship with Hadoop, i.e. the SERVICE_FILE_SYSTEMS defaults, of which there are the following eight:
[Figure: the eight built-in FileSystem implementations registered in SERVICE_FILE_SYSTEMS]
From this we can see that the implementation class for hdfs is org.apache.hadoop.hdfs.DistributedFileSystem.
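
Since a configured fs.&lt;scheme&gt;.impl entry takes precedence over the service-loaded defaults, the mapping can also be pinned explicitly in code; a minimal sketch (a snippet, same setup as the opening example):

    // Explicitly pin the implementation for the hdfs scheme. Without this,
    // the SERVICE_FILE_SYSTEMS default (DistributedFileSystem) is used.
    Configuration conf = new Configuration();
    conf.setClass("fs.hdfs.impl",
        org.apache.hadoop.hdfs.DistributedFileSystem.class,
        FileSystem.class);
    FileSystem fs = FileSystem.get(URI.create("hdfs://master"), conf);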

DistributedFileSystem.initialize(URI uri, Configuration conf)

    public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        this.setConf(conf);
        String host = uri.getHost();
        if (host == null) {
            throw new IOException("Incomplete HDFS URI, no host: " + uri);
        } else {
            // All communication goes through a DFSClient
            this.dfs = new DFSClient(uri, conf, this.statistics);
            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
            this.workingDir = getHomeDirectory();
            this.storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.
                INSTANCE.put(DFSOpsCountStatistics.NAME, new StorageStatisticsProvider() {
                    @Override
                    public StorageStatistics provide() {
                        return new DFSOpsCountStatistics();
                    }
                });
        }
    }

DFSClient is the client class responsible for communicating with Hadoop:

    public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
            Statistics stats) throws IOException {
        this.clientRunning = true;
        this.r = new Random();
        this.filesBeingWritten = new HashMap();
        this.tracer = FsTracer.get(conf);
        this.dfsClientConf = new DfsClientConf(conf);
        this.conf = conf;
        this.stats = stats;
        // Build the socket factory; by default StandardSocketFactory
        this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
        this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
        this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
        this.dtpReplaceDatanodeOnFailureReplication = (short) conf.getInt(
            "dfs.client.block.write.replace-datanode-on-failure.min-replication", 0);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sets dfs.client.block.write.replace-datanode-on-failure.min-replication to "
                + this.dtpReplaceDatanodeOnFailureReplication);
        }
        this.ugi = UserGroupInformation.getCurrentUser();
        this.namenodeUri = nameNodeUri;
        // The client name
        this.clientName = "DFSClient_" + this.dfsClientConf.getTaskId() + "_"
            + ThreadLocalRandom.current().nextInt() + "_" + Thread.currentThread().getId();
        int numResponseToDrop = conf.getInt("dfs.client.test.drop.namenode.response.number", 0);
        // ClientProtocol RPC proxy to the NameNode, built by NameNodeProxiesClient
        ProxyAndInfo<ClientProtocol> proxyInfo = null;
        AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
        if (numResponseToDrop > 0) {
            LOG.warn("dfs.client.test.drop.namenode.response.number is set to "
                + numResponseToDrop + ", this hacked client will proactively drop responses");
            proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,
                nameNodeUri, ClientProtocol.class, numResponseToDrop, nnFallbackToSimpleAuth);
        }
        if (proxyInfo != null) {
            this.dtService = proxyInfo.getDelegationTokenService();
            this.namenode = (ClientProtocol) proxyInfo.getProxy();
        } else if (rpcNamenode != null) {
            Preconditions.checkArgument(nameNodeUri == null);
            this.namenode = rpcNamenode;
            this.dtService = null;
        } else {
            Preconditions.checkArgument(nameNodeUri != null, "null URI");
            proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
                nameNodeUri, nnFallbackToSimpleAuth);
            this.dtService = proxyInfo.getDelegationTokenService();
            this.namenode = (ClientProtocol) proxyInfo.getProxy();
        }
        String[] localInterfaces = conf.getTrimmedStrings("dfs.client.local.interfaces");
        this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
        if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
            LOG.debug("Using local interfaces [" + Joiner.on(',').join(localInterfaces)
                + "] with addresses [" + Joiner.on(',').join(this.localInterfaceAddrs) + "]");
        }
        Boolean readDropBehind = conf.get("dfs.client.cache.drop.behind.reads") == null
            ? null : conf.getBoolean("dfs.client.cache.drop.behind.reads", false);
        Long readahead = conf.get("dfs.client.cache.readahead") == null
            ? null : conf.getLong("dfs.client.cache.readahead", 0L);
        this.serverDefaultsValidityPeriod = conf.getLong(
            "dfs.client.server-defaults.validity.period.ms",
            HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT);
        Boolean writeDropBehind = conf.get("dfs.client.cache.drop.behind.writes") == null
            ? null : conf.getBoolean("dfs.client.cache.drop.behind.writes", false);
        this.defaultReadCachingStrategy = new CachingStrategy(readDropBehind, readahead);
        this.defaultWriteCachingStrategy = new CachingStrategy(writeDropBehind, readahead);
        this.clientContext = ClientContext.get(conf.get("dfs.client.context", "default"),
            this.dfsClientConf, conf);
        if (this.dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
            initThreadsNumForHedgedReads(this.dfsClientConf.getHedgedReadThreadpoolSize());
        }
        this.initThreadsNumForStripedReads(this.dfsClientConf.getStripedReadThreadpoolSize());
        this.saslClient = new SaslDataTransferClient(conf,
            DataTransferSaslUtil.getSaslPropertiesResolver(conf),
            TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
    }

DFSClient communicates over sockets; the default socket factory class is org.apache.hadoop.net.StandardSocketFactory.
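
A quick way to check which factory would be used (a snippet; NetUtils.getSocketFactory is the same call made in the constructor above, and the default can be overridden through the hadoop.rpc.socket.factory.class.* properties, which is an assumption worth verifying against your Hadoop version):

    // Inspect the socket factory DFSClient would use for ClientProtocol.
    // With no overrides this prints org.apache.hadoop.net.StandardSocketFactory.
    Configuration conf = new Configuration();
    javax.net.SocketFactory factory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
    System.out.println(factory.getClass().getName());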

To summarize: the FileSystem class uses the fs.defaultFS configuration parameter to find the matching FileSystem implementation class, and that implementation class carries out the actual operations.

That is, hdfs://master maps to the DistributedFileSystem implementation class, DistributedFileSystem wraps a DFSClient, and all subsequent operations go through that DFSClient to communicate with Hadoop.
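
Finally, a short snippet (same wrapper as the opening example) to verify the whole chain on the HA cluster:

    // Confirm which implementation class fs.defaultFS (hdfs://master) resolved to.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    System.out.println(fs.getClass().getName());   // org.apache.hadoop.hdfs.DistributedFileSystem
    // Each call below is delegated to the embedded DFSClient, which sends RPCs
    // to the NameNode through the ClientProtocol proxy built in the constructor.
    for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath().getName());
    }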