FileSystem is the most frequently used class in the HDFS client API: whenever we want to connect to HDFS and perform operations, it is the program's entry point. This post analyzes how local code connects to Hadoop through FileSystem and reads data.
Local code example
Since the cluster we are connecting to is an HA-enabled Hadoop cluster, the core-site.xml and hdfs-site.xml files must be placed in the resources directory so that they end up on the classpath.
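As an aside, if the two files cannot be placed on the classpath, the Configuration built in the example below can also load them explicitly. A minimal sketch (the local paths here are hypothetical):

```java
// Hypothetical local paths; unnecessary when the files are in resources/
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
```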
The example program:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsListDemo {
    public static void main(String[] args) throws IOException {
        // Build the configuration; core-site.xml and hdfs-site.xml are loaded by default
        Configuration conf = new Configuration();
        // Build the FileSystem -- the core of this example
        FileSystem fs = FileSystem.get(conf);
        // List the files under the root directory
        FileStatus[] list = fs.listStatus(new Path("/"));
        for (FileStatus file : list) {
            System.out.println(file.getPath().getName());
        }
    }
}
```
The Maven dependency used:

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
```
Analysis
FileSystem.get(conf)
```java
// The core line
FileSystem fs = FileSystem.get(conf);
```
```java
public static FileSystem get(Configuration conf) throws IOException {
  return get(getDefaultUri(conf), conf);
}

public static URI getDefaultUri(Configuration conf) {
  URI uri = URI.create(fixName(conf.getTrimmed(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
  if (uri.getScheme() == null) {
    throw new IllegalArgumentException("No scheme in default FS: " + uri);
  }
  return uri; // returns fs.defaultFS from core-site.xml, here: hdfs://master
}
```
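A quick way to see this in action (a sketch; fs.defaultFS would normally come from core-site.xml rather than be set in code):

```java
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://master");           // normally read from core-site.xml
System.out.println(FileSystem.getDefaultUri(conf));  // prints: hdfs://master
```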
FileSystem get(URI uri, Configuration conf)
The corresponding FileSystem is obtained through CACHE.get(uri, conf). By default the cache is used; if fs.<scheme>.impl.disable.cache is set to true, the cache is bypassed and a new instance is created directly via the createFileSystem method.
```java
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    LOGGER.debug("Bypassing cache to create filesystem {}", uri);
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
```
Inside FileSystem.Cache, get builds a key from the uri and conf and delegates to getInternal:

```java
FileSystem get(URI uri, Configuration conf) throws IOException {
  FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
  return this.getInternal(uri, conf, key);
}

/** The objects inserted into the cache using this method are all unique. */
FileSystem getUnique(URI uri, Configuration conf) throws IOException {
  FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf, unique.getAndIncrement());
  return this.getInternal(uri, conf, key);
}
```
The cache generates a key from the uri and conf and looks up whether a FileSystem has already been created for it. If one exists it is reused; otherwise a new one is created and added to the cache.
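Based on that logic, repeated get calls with the cache enabled return the same instance, while disabling the cache (or calling FileSystem.newInstance, which goes through the getUnique path shown above) yields fresh instances. A small sketch:

```java
Configuration conf = new Configuration();

FileSystem a = FileSystem.get(conf);
FileSystem b = FileSystem.get(conf);
System.out.println(a == b);   // true: same cached instance

// Bypass the cache for the hdfs scheme
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
FileSystem c = FileSystem.get(conf);
System.out.println(a == c);   // false: freshly created, must be closed by the caller

// newInstance always creates a unique instance
FileSystem d = FileSystem.newInstance(conf);
System.out.println(a == d);   // false
```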
createFileSystem(uri, conf)
```java
private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Tracer tracer = FsTracer.get(conf);
  try (TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
    scope.addKVAnnotation("scheme", uri.getScheme());
    // Determine which FileSystem implementation class to load, based on the configuration
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    // Instantiate the FileSystem via reflection
    FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
    // Initialize the FileSystem
    fs.initialize(uri, conf);
    return fs;
  }
}
```
In short, this method resolves the FileSystem implementation class from the configuration, instantiates it via reflection, calls its initialize method, and returns the resulting FileSystem object.
How the corresponding FileSystem implementation class is resolved from the configuration
```java
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
```
```java
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    loadFileSystems();
  }
  LOGGER.debug("Looking for FS supporting {}", scheme);
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    // For the hdfs scheme, this property is "fs.hdfs.impl"
    String property = "fs." + scheme + ".impl";
    LOGGER.debug("looking for configuration option {}", property);
    clazz = conf.getClass(property, (Class) null);
  } else {
    LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
  }
  if (clazz == null) {
    LOGGER.debug("Looking in service filesystems for implementation class");
    clazz = (Class) SERVICE_FILE_SYSTEMS.get(scheme);
  } else {
    LOGGER.debug("Filesystem {} defined in configuration option", scheme);
  }
  if (clazz == null) {
    throw new UnsupportedFileSystemException("No FileSystem for scheme \"" + scheme + "\"");
  } else {
    LOGGER.debug("FS for {} is {}", scheme, clazz);
    return clazz;
  }
}
```
The code above resolves the FileSystem implementation class. It first checks whether the configuration defines an implementation under fs.hdfs.impl; if so, that class is returned directly. Otherwise it falls back to the implementations that ship with the framework, i.e. the SERVICE_FILE_SYSTEMS registry populated via the ServiceLoader mechanism, which contains eight schemes by default.
From that registry we can see that the implementation class for the hdfs scheme is org.apache.hadoop.hdfs.DistributedFileSystem.
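This also means the implementation can be overridden per scheme through the configuration; a hedged sketch (the override here just points back at the stock class):

```java
Configuration conf = new Configuration();
// fs.<scheme>.impl takes precedence over the ServiceLoader registry
conf.setClass("fs.hdfs.impl",
    org.apache.hadoop.hdfs.DistributedFileSystem.class,
    FileSystem.class);
FileSystem fs = FileSystem.get(conf);  // resolved via the explicit property
```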
initialize(URI uri, Configuration conf)
```java
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  this.setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + uri);
  } else {
    // All communication goes through a DFSClient
    this.dfs = new DFSClient(uri, conf, this.statistics);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
    this.storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE.put(
        DFSOpsCountStatistics.NAME,
        new StorageStatisticsProvider() {
          @Override
          public StorageStatistics provide() {
            return new DFSOpsCountStatistics();
          }
        });
  }
}
```
DFSClient is the client class responsible for communicating with the Hadoop cluster.
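As a quick way to confirm this wiring (a sketch; getClient() is exposed mainly for internal and testing use, so treat this as illustrative):

```java
FileSystem fs = FileSystem.get(conf);
if (fs instanceof DistributedFileSystem) {
  DFSClient client = ((DistributedFileSystem) fs).getClient();
  // The client name follows the "DFSClient_..." pattern built in the constructor below
  System.out.println(client.getClientName());
}
```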
```java
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf,
    Statistics stats) throws IOException {
  this.clientRunning = true;
  this.r = new Random();
  this.filesBeingWritten = new HashMap();
  this.tracer = FsTracer.get(conf);
  this.dfsClientConf = new DfsClientConf(conf);
  this.conf = conf;
  this.stats = stats;
  // Build the socket factory; by default: StandardSocketFactory
  this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
  this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
  this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
  this.dtpReplaceDatanodeOnFailureReplication = (short) conf.getInt(
      "dfs.client.block.write.replace-datanode-on-failure.min-replication", 0);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Sets dfs.client.block.write.replace-datanode-on-failure.min-replication to "
        + this.dtpReplaceDatanodeOnFailureReplication);
  }
  this.ugi = UserGroupInformation.getCurrentUser();
  this.namenodeUri = nameNodeUri;
  // The client name
  this.clientName = "DFSClient_" + this.dfsClientConf.getTaskId() + "_"
      + ThreadLocalRandom.current().nextInt() + "_" + Thread.currentThread().getId();
  int numResponseToDrop = conf.getInt("dfs.client.test.drop.namenode.response.number", 0);
  // ClientProtocol proxy to the NameNode, created via NameNodeProxiesClient
  NameNodeProxiesClient.ProxyAndInfo<ClientProtocol> proxyInfo = null;
  AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
  if (numResponseToDrop > 0) {
    LOG.warn("dfs.client.test.drop.namenode.response.number is set to " + numResponseToDrop
        + ", this hacked client will proactively drop responses");
    proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf, nameNodeUri,
        ClientProtocol.class, numResponseToDrop, nnFallbackToSimpleAuth);
  }
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = (ClientProtocol) proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    this.dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null, "null URI");
    proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri,
        nnFallbackToSimpleAuth);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = (ClientProtocol) proxyInfo.getProxy();
  }
  String[] localInterfaces = conf.getTrimmedStrings("dfs.client.local.interfaces");
  this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
  if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
    LOG.debug("Using local interfaces [" + Joiner.on(',').join(localInterfaces)
        + "] with addresses [" + Joiner.on(',').join(this.localInterfaceAddrs) + "]");
  }
  Boolean readDropBehind = conf.get("dfs.client.cache.drop.behind.reads") == null
      ? null : conf.getBoolean("dfs.client.cache.drop.behind.reads", false);
  Long readahead = conf.get("dfs.client.cache.readahead") == null
      ? null : conf.getLong("dfs.client.cache.readahead", 0L);
  this.serverDefaultsValidityPeriod = conf.getLong(
      "dfs.client.server-defaults.validity.period.ms",
      HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT);
  Boolean writeDropBehind = conf.get("dfs.client.cache.drop.behind.writes") == null
      ? null : conf.getBoolean("dfs.client.cache.drop.behind.writes", false);
  this.defaultReadCachingStrategy = new CachingStrategy(readDropBehind, readahead);
  this.defaultWriteCachingStrategy = new CachingStrategy(writeDropBehind, readahead);
  this.clientContext = ClientContext.get(
      conf.get("dfs.client.context", "default"), this.dfsClientConf, conf);
  if (this.dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
    initThreadsNumForHedgedReads(this.dfsClientConf.getHedgedReadThreadpoolSize());
  }
  this.initThreadsNumForStripedReads(this.dfsClientConf.getStripedReadThreadpoolSize());
  this.saslClient = new SaslDataTransferClient(conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
```
DFSClient communicates over sockets; the default socket factory is org.apache.hadoop.net.StandardSocketFactory.
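A sketch of how that factory is resolved (assuming the standard configuration keys; a per-protocol key of the form hadoop.rpc.socket.factory.class.&lt;protocol&gt; is checked first, then the default key below):

```java
Configuration conf = new Configuration();
// Falls back to StandardSocketFactory when neither key is set
conf.set("hadoop.rpc.socket.factory.class.default",
    "org.apache.hadoop.net.StandardSocketFactory");
SocketFactory factory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
System.out.println(factory.getClass().getName());
```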
To summarize: the FileSystem class uses the configured fs.defaultFS value to match the corresponding FileSystem implementation class, and that implementation class carries out the actual operations.

In other words, hdfs://master matches the DistributedFileSystem implementation, and DistributedFileSystem wraps a DFSClient; every subsequent operation goes through that DFSClient to communicate with the Hadoop cluster.
