FileSystem is the most commonly used class: whenever we want to connect to HDFS and perform operations, it is the entry point of the program. This post analyzes how local code uses FileSystem to connect to Hadoop and read data.
Local code example
Since the cluster we are connecting to is an HA-enabled Hadoop cluster, core-site.xml and hdfs-site.xml must be placed in the resources directory.
public static void main(String[] args) throws IOException {
    // Build the configuration; core-site.xml and hdfs-site.xml are loaded by default
    Configuration conf = new Configuration();
    // Build the FileSystem — the core step
    FileSystem fs = FileSystem.get(conf);
    // List the files in the root directory
    FileStatus[] list = fs.listStatus(new Path("/"));
    for (FileStatus file : list) {
        System.out.println(file.getPath().getName());
    }
}
Maven dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
Analysis
FileSystem.get(conf)
// The core call
FileSystem fs = FileSystem.get(conf);
public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
}

public static URI getDefaultUri(Configuration conf) {
    URI uri =
        URI.create(fixName(conf.getTrimmed(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
    if (uri.getScheme() == null) {
        throw new IllegalArgumentException("No scheme in default FS: " + uri);
    }
    return uri; // returns fs.defaultFS from core-site.xml, i.e. hdfs://master
}
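To illustrate what getDefaultUri extracts, the standard java.net.URI class splits an fs.defaultFS value into a scheme (which selects the FileSystem implementation) and an authority (the NameNode address or nameservice). A minimal stdlib-only sketch:

```java
import java.net.URI;

public class DefaultFsUriDemo {
    public static void main(String[] args) {
        // Example fs.defaultFS value from core-site.xml
        URI uri = URI.create("hdfs://master");
        // scheme selects the FileSystem implementation, here "hdfs"
        System.out.println(uri.getScheme());
        // authority is the NameNode address / HA nameservice, here "master"
        System.out.println(uri.getAuthority());
    }
}
```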
FileSystem get(URI uri, Configuration conf)
It obtains the corresponding FileSystem via CACHE.get(uri, conf). By default the cache is used; if caching is disabled for the scheme, a new instance is created directly via the createFileSystem method.
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    if (scheme == null && authority == null) {       // use default FS
        return get(conf);
    }
    if (scheme != null && authority == null) {       // no authority
        URI defaultUri = getDefaultUri(conf);
        if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
            && defaultUri.getAuthority() != null) {  // & default has authority
            return get(defaultUri, conf);            // return default
        }
    }
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
        LOGGER.debug("Bypassing cache to create filesystem {}", uri);
        return createFileSystem(uri, conf);
    }
    return CACHE.get(uri, conf);
}
Inside FileSystem.Cache, the lookup key is built from the uri and conf:

FileSystem get(URI uri, Configuration conf) throws IOException {
    FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf);
    return this.getInternal(uri, conf, key);
}

/** The objects inserted into the cache using this method are all unique. */
FileSystem getUnique(URI uri, Configuration conf) throws IOException {
    FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf, unique.getAndIncrement());
    return this.getInternal(uri, conf, key);
}
The cache method builds a key from the uri and conf and checks whether a FileSystem has already been created for that key: if so it is reused, otherwise a new one is created and added to the cache.
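The caching behavior described above can be mimicked with a small, self-contained sketch (plain Java, no Hadoop dependency): a key built from the URI's scheme and authority, looked up in a map, with creation on a miss. The real FileSystem.Cache.Key also includes the caller's UserGroupInformation, which is omitted here.

```java
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for FileSystem.Cache: one instance per (scheme, authority)
public class FsCacheSketch {
    // Mirrors FileSystem.Cache.Key minus the UserGroupInformation part;
    // like the real key, scheme and authority are compared case-insensitively
    static final class Key {
        final String scheme;
        final String authority;
        Key(URI uri) {
            this.scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
            this.authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
        }
        @Override public boolean equals(Object o) {
            return o instanceof Key
                && scheme.equals(((Key) o).scheme)
                && authority.equals(((Key) o).authority);
        }
        @Override public int hashCode() { return Objects.hash(scheme, authority); }
    }

    private final Map<Key, Object> map = new ConcurrentHashMap<>();

    // Reuse the cached instance if present, otherwise create and cache one
    public Object get(URI uri) {
        return map.computeIfAbsent(new Key(uri), k -> new Object());
    }
}
```

Two URIs that differ only in path or case map to the same key, so they share one cached instance.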
createFileSystem(uri, conf)
private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
    Tracer tracer = FsTracer.get(conf);
    // The decompiled try/finally boilerplate is the expansion of this try-with-resources
    try (TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
        scope.addKVAnnotation("scheme", uri.getScheme());
        // Look up the FileSystem implementation class for this scheme
        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
        // Instantiate the implementation via reflection
        FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
        // Initialize the FileSystem
        fs.initialize(uri, conf);
        return fs;
    }
}
In short, this resolves the FileSystem implementation class from the configuration, instantiates it via reflection, calls its initialize method, and returns the resulting FileSystem object.
How the FileSystem implementation class is resolved from the configuration
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) {
        loadFileSystems();
    }
    LOGGER.debug("Looking for FS supporting {}", scheme);
    Class<? extends FileSystem> clazz = null;
    if (conf != null) {
        // for HDFS this property is "fs.hdfs.impl"
        String property = "fs." + scheme + ".impl";
        LOGGER.debug("looking for configuration option {}", property);
        clazz = conf.getClass(property, (Class) null);
    } else {
        LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
    }
    if (clazz == null) {
        LOGGER.debug("Looking in service filesystems for implementation class");
        clazz = (Class) SERVICE_FILE_SYSTEMS.get(scheme);
    } else {
        LOGGER.debug("Filesystem {} defined in configuration option", scheme);
    }
    if (clazz == null) {
        throw new UnsupportedFileSystemException("No FileSystem for scheme \"" +
            scheme + "\"");
    } else {
        LOGGER.debug("FS for {} is {}", scheme, clazz);
        return clazz;
    }
}
The code above resolves the FileSystem implementation class. It first checks the configuration for an implementation under fs.hdfs.impl and returns it directly if present; otherwise it falls back to the system's built-in implementations registered in SERVICE_FILE_SYSTEMS, of which there are eight by default.
From that registry we can see that the implementation class for hdfs is org.apache.hadoop.hdfs.DistributedFileSystem.
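The two-step lookup (explicit configuration first, built-in registry second) can be sketched without Hadoop as a map-backed fallback; the conf map plays the role of Configuration entries like fs.hdfs.impl, and serviceFileSystems corresponds to the SERVICE_FILE_SYSTEMS registry that Hadoop fills via the JDK ServiceLoader:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of getFileSystemClass: an explicit config entry wins,
// otherwise fall back to the built-in registry
public class FsClassLookupSketch {
    // stands in for Configuration entries such as "fs.hdfs.impl"
    static final Map<String, String> conf = new HashMap<>();
    // stands in for SERVICE_FILE_SYSTEMS (two illustrative entries)
    static final Map<String, String> serviceFileSystems = new HashMap<>();
    static {
        serviceFileSystems.put("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem");
        serviceFileSystems.put("file", "org.apache.hadoop.fs.LocalFileSystem");
    }

    static String getFileSystemClass(String scheme) {
        String clazz = conf.get("fs." + scheme + ".impl"); // user override
        if (clazz == null) {
            clazz = serviceFileSystems.get(scheme);        // built-in default
        }
        if (clazz == null) {
            throw new IllegalArgumentException(
                "No FileSystem for scheme \"" + scheme + "\"");
        }
        return clazz;
    }
}
```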
initialize(URI uri, Configuration conf)
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    this.setConf(conf);
    String host = uri.getHost();
    if (host == null) {
        throw new IOException("Incomplete HDFS URI, no host: " + uri);
    } else {
        // all communication goes through a DFSClient
        this.dfs = new DFSClient(uri, conf, this.statistics);
        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
        this.workingDir = getHomeDirectory();
        this.storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.
            INSTANCE.put(DFSOpsCountStatistics.NAME, new StorageStatisticsProvider() {
                @Override
                public StorageStatistics provide() {
                    return new DFSOpsCountStatistics();
                }
            });
    }
}
DFSClient is the client class responsible for communicating with Hadoop:
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
this.clientRunning = true;
this.r = new Random();
this.filesBeingWritten = new HashMap();
this.tracer = FsTracer.get(conf);
this.dfsClientConf = new DfsClientConf(conf);
this.conf = conf;
this.stats = stats;
// build the socket factory, by default StandardSocketFactory
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
this.dtpReplaceDatanodeOnFailureReplication = (short)conf.getInt("dfs.client.block.write.replace-datanode-on-failure.min-replication", 0);
if (LOG.isDebugEnabled()) {
LOG.debug("Sets dfs.client.block.write.replace-datanode-on-failure.min-replication to " + this.dtpReplaceDatanodeOnFailureReplication);
}
this.ugi = UserGroupInformation.getCurrentUser();
this.namenodeUri = nameNodeUri;
// the client's name
this.clientName = "DFSClient_" + this.dfsClientConf.getTaskId() + "_" + ThreadLocalRandom.current().nextInt() + "_" + Thread.currentThread().getId();
int numResponseToDrop = conf.getInt("dfs.client.test.drop.namenode.response.number", 0);
// the ClientProtocol proxy to the NameNode, built by NameNodeProxiesClient
ProxyAndInfo<ClientProtocol> proxyInfo = null;
AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
if (numResponseToDrop > 0) {
LOG.warn("dfs.client.test.drop.namenode.response.number is set to " + numResponseToDrop + ", this hacked client will proactively drop responses");
proxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf, nameNodeUri, ClientProtocol.class, numResponseToDrop, nnFallbackToSimpleAuth);
}
if (proxyInfo != null) {
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = (ClientProtocol)proxyInfo.getProxy();
} else if (rpcNamenode != null) {
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
this.dtService = null;
} else {
Preconditions.checkArgument(nameNodeUri != null, "null URI");
proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, nameNodeUri, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = (ClientProtocol)proxyInfo.getProxy();
}
String[] localInterfaces = conf.getTrimmedStrings("dfs.client.local.interfaces");
this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug("Using local interfaces [" + Joiner.on(',').join(localInterfaces) + "] with addresses [" + Joiner.on(',').join(this.localInterfaceAddrs) + "]");
}
Boolean readDropBehind = conf.get("dfs.client.cache.drop.behind.reads") == null ? null : conf.getBoolean("dfs.client.cache.drop.behind.reads", false);
Long readahead = conf.get("dfs.client.cache.readahead") == null ? null : conf.getLong("dfs.client.cache.readahead", 0L);
this.serverDefaultsValidityPeriod = conf.getLong("dfs.client.server-defaults.validity.period.ms", HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT);
Boolean writeDropBehind = conf.get("dfs.client.cache.drop.behind.writes") == null ? null : conf.getBoolean("dfs.client.cache.drop.behind.writes", false);
this.defaultReadCachingStrategy = new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy = new CachingStrategy(writeDropBehind, readahead);
this.clientContext = ClientContext.get(conf.get("dfs.client.context", "default"), this.dfsClientConf, conf);
if (this.dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
initThreadsNumForHedgedReads(this.dfsClientConf.getHedgedReadThreadpoolSize());
}
this.initThreadsNumForStripedReads(this.dfsClientConf.getStripedReadThreadpoolSize());
this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
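The clientName field above is just a string assembled from a task id, a random int, and the current thread id. A small sketch of the same format (the taskId normally comes from DfsClientConf; "NONMAPREDUCE" here is an illustrative placeholder for code not running inside a MapReduce task):

```java
import java.util.concurrent.ThreadLocalRandom;

public class ClientNameDemo {
    // mirrors the clientName construction in the DFSClient constructor
    static String clientName(String taskId) {
        return "DFSClient_" + taskId + "_"
            + ThreadLocalRandom.current().nextInt() + "_"
            + Thread.currentThread().getId();
    }

    public static void main(String[] args) {
        System.out.println(clientName("NONMAPREDUCE"));
    }
}
```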
DFSClient communicates over sockets; the default socket factory is org.apache.hadoop.net.StandardSocketFactory.
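Conceptually, StandardSocketFactory hands back plain java.net.Socket instances. A minimal stand-in built on the JDK's javax.net.SocketFactory is sketched below; it is illustrative only, not the actual Hadoop class:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import javax.net.SocketFactory;

// Minimal sketch in the spirit of org.apache.hadoop.net.StandardSocketFactory:
// hand out ordinary sockets; the no-arg variant is unconnected and the
// caller (DFSClient-style code) connects it later.
public class PlainSocketFactory extends SocketFactory {
    @Override
    public Socket createSocket() {
        return new Socket(); // unconnected
    }
    @Override
    public Socket createSocket(String host, int port) throws IOException {
        return new Socket(host, port);
    }
    @Override
    public Socket createSocket(String host, int port,
                               InetAddress localAddr, int localPort) throws IOException {
        return new Socket(host, port, localAddr, localPort);
    }
    @Override
    public Socket createSocket(InetAddress addr, int port) throws IOException {
        return new Socket(addr, port);
    }
    @Override
    public Socket createSocket(InetAddress addr, int port,
                               InetAddress localAddr, int localPort) throws IOException {
        return new Socket(addr, port, localAddr, localPort);
    }
}
```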
To sum up: the FileSystem class matches the fs.defaultFS configuration value to the corresponding FileSystem implementation class, and that implementation performs the actual operations.
In other words, hdfs://master matches the DistributedFileSystem implementation, which wraps a DFSClient; all subsequent operations go through that DFSClient to communicate with Hadoop.