Blob的中文意思是二进制大对象,Flink中提供了一种Blob服务,用来进行包的管理。在Flink云化的工作中涉及到这个组件,本文就来解析一下Blob server的实现。
Blob使用
k8s模式下,没有NodeManager来提供jar包管理的服务,例如用户的jar,引擎的jar,在Yarn的模式下是由NodeManager的LOCALIZED阶段download到本地的。k8s模式下container启动是去镜像仓库拉一个镜像启动。如果只是用户自己的应用程序管理,可以把用户jar打入镜像中,但是这个方式并不通用。比较通用的场景是镜像提供Flink引擎的框架包,如果用户有自己的依赖jar包,如datastream程序包,或者sql的udf包,单独上传到pod中进行分发。Blob Server在这里就承担这样的功能,下面我们就来看下在k8s中如何使用这个功能
在k8s模式中是使用RestClusterClient进行任务的提交,提交时会通过Rest api像jm启动的RestServer发起任务提交请求.
final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
requestAndFileUploads -> sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable())
);
对应服务端的处理Handler为JobSubmitHandler,收到请求后,会将jar包上传到BlobServer中
log.info("Uploading jarFiles {} and userArtifacts {} to blob server", jarFiles.toString(), artifacts.toString());
ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));
最终会将jar包传到BlobServer中,并将BlobKey(可以trace到相应的文件),记录在JobInformation中
/** Blob keys for the required jar files */
private final Collection<PermanentBlobKey> requiredJarFileBlobKeys;
等到TM启动之后,会收到submitTask请求,这时候就会尝试去blob中load必要的数据
tdd.loadBigData(blobCacheService.getPermanentBlobService());
tdd中存储了jobInformation,这里的information有两种状态
/**
* Serialized job information or <tt>null</tt> if offloaded.
*/
private MaybeOffloaded<JobInformation> serializedJobInformation;
if (jobInformationOrBlobKey.isLeft()) {
serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
} else {
serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
}
在ExecutionGraph创建时,尝试序列化jobInformation并存储,存储时会判断序列化后的大小是否超过1M,超过1MB就会将其OffLoad到BlobServer中,也就造成了上面在提交Task时需要判断这里的序列化后的数据是处于OffLoad还是处于NonOffLoad状态
/** Serialized job information or a blob key pointing to the offloaded job information. */
private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
loadBigData做的事情,就是将可能被持久化到blob中的jobInformation和taskInformation都反序列到对象中,但是这里并不会将jar包存储到内存中,jar包只是jobInformation中的BlobKey的列表,指向真实的jar包数据。
随后开始创建Task,创建Task时,会从blob服务开始download所需要的jar包
this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
// triggers the download of all missing jar files from the job manager
libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
这里的BlobLibraryCacheManager就提供了一种能力,从blob server中下载jar包,并创建一个classloader,能够从这些jar路径中解析使用的用户类。
Blob 实现
其实可以看出Blob的代码设计很好,以上的分析都是在使用端的视图,对于内部的存储,缓存逻辑都做了很好的封装,如果只是看使用部分,基本都不会看到相应的实现。
BlobView提供了对存储在BlobStore中的数据的访问视图,FileSystemBlobStore是基于文件系统实现的BlobStore,basepath是由 high-availability.storageDir
指定的HA path,ha模式下构建的都是FileSystemBlobStore
BlobServer负责接收请求,启动线程处理,并负责创建目录存储Blob和临时文件。主要方法是处理文件的存入和读取,文件类型分为永久和临时,永久类型的文件会存入BlobStore(文件系统),而临时文件只会存入本地的临时目录。继承Thread的run方法处理客户端请求
try {
while (!this.shutdownRequested.get()) {
BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
try {
synchronized (activeConnections) {
while (activeConnections.size() >= maxConnections) {
activeConnections.wait(2000);
}
activeConnections.add(conn);
}
conn.start();
conn = null;
}
finally {
if (conn != null) {
conn.close();
synchronized (activeConnections) {
activeConnections.remove(conn);
}
}
}
}
}
最后一部分实现是BlobCache,是用于在TaskExecutor中维护Blob的缓存,以免多次请求下载,整个代码的分层设计非常清晰,值得学习
Blob与classloader协作
在Blob jar被download到本地,task需要使用相应的类还需要通过classloader加载相应的类才能使用,这里是通过BlobLibraryCacheManager来实现了相应逻辑。
看下类的注释就知道这个类的大致用途
/**
* Provides facilities to download a set of libraries (typically JAR files) for a job from a
* {@link PermanentBlobService} and create a class loader with references to them.
*/
public class BlobLibraryCacheManager implements LibraryCacheManager
注册
org.apache.flink.runtime.execution.librarycache.LibraryCacheManager#registerTask
synchronized (lockObject) {
LibraryCacheEntry entry = cacheEntries.get(jobId);
if (entry == null) {
URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
int count = 0;
try {
// add URLs to locally cached JAR files
for (PermanentBlobKey key : requiredJarFiles) {
urls[count] = blobService.getFile(jobId, key).toURI().toURL();
++count;
}
// add classpaths
for (URL url : requiredClasspaths) {
urls[count] = url;
++count;
}
cacheEntries.put(jobId, new LibraryCacheEntry(
requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));
} catch (Throwable t) {
// rethrow or wrap
ExceptionUtils.tryRethrowIOException(t);
throw new IOException(
"Library cache could not register the user code libraries.", t);
}
} else {
entry.register(task, requiredJarFiles, requiredClasspaths);
}
}
LibraryCacheEntry记录了每个job所需要的library jar,并创建一个classloader来加载,如果没有Execution指向他的时候就会释放classloader以及相应的class了
这里意思上是每次任务发生fo,都会在这里重新注册一次LibraryEntry(reference + 1)
/**
* An entry in the per-job library cache. Tracks which execution attempts
* still reference the libraries. Once none reference it any more, the
* class loaders can be cleaned up.
*/
classloader
创建classloader来加载相应的blob jar
this.classLoader =
FlinkUserCodeClassLoaders.create(
classLoaderResolveOrder,
libraryURLs,
FlinkUserCodeClassLoaders.class.getClassLoader(),
alwaysParentFirstPatterns);
自定义顺序加载class文件
@Override
protected synchronized Class<?> loadClass(
String name, boolean resolve) throws ClassNotFoundException {
// First, check if the class has already been loaded
Class<?> c = findLoadedClass(name);
if (c == null) {
// check whether the class should go parent-first
for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
if (name.startsWith(alwaysParentFirstPattern)) {
return super.loadClass(name, resolve);
}
}
try {
// check the URLs 先自己加载,再由parent classloader加载
c = findClass(name);
} catch (ClassNotFoundException e) {
// let URLClassLoader do it, which will eventually call the parent
c = super.loadClass(name, resolve);
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
释放
在task执行结束或者fo时会从libraryCacheManager中unregisterTask
public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
checkNotNull(jobId, "The JobId must not be null.");
checkNotNull(task, "The task execution id must not be null.");
synchronized (lockObject) {
LibraryCacheEntry entry = cacheEntries.get(jobId);
if (entry != null) {
if (entry.unregister(task)) {
cacheEntries.remove(jobId);
entry.releaseClassLoader();
}
}
// else has already been unregistered
}
}
如果LibraryEntry引用计数为0,就会执行classLoader.close()来释放Metaspace中的类文件
void releaseClassLoader() {
try {
classLoader.close();
} catch (IOException e) {
LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray()));
}
}
但是从观察到的现象看这里的classloader对象和class文件不会立马被gc掉,而是等待fullgc的时候才会触发old去的classloader对象清理,并将Metaspace的class文件清理掉,线上也能观察到多次fo之后会存在很多的classloader对象。
这里可能会导致一个问题,在没有设置MaxMetaSpaceSize的情况下,可能会导致内存随着fo飙涨,而因为没有设置MaxMetaSpaceSize,不触发fgc,导致在k8s模式下很容易被cgroup杀死
参考
MetaSpace参考
https://www.atatech.org/articles/149842
https://www.atatech.org/articles/64871
https://www.atatech.org/articles/81068