Blob stands for Binary Large Object. Flink provides a Blob service for managing jars and other large artifacts. This component came up in our work on running Flink in the cloud, so this post walks through the implementation of the BlobServer.
Using Blob
In Kubernetes mode there is no NodeManager to manage jars (user jars, engine jars); in YARN mode the NodeManager downloads them to the local machine during its LOCALIZED phase. In Kubernetes mode a container starts by pulling an image from the image registry. If all you need is to ship your own application, you can bake the user jar into the image, but that approach is not general. The more common setup is for the image to provide the Flink engine/framework jars, while extra user dependencies, such as a DataStream program jar or a SQL UDF jar, are uploaded separately and distributed to the pods. This is exactly the role the BlobServer plays. Let's look at how it is used in Kubernetes mode.
In Kubernetes mode, jobs are submitted with the RestClusterClient, which sends the submission request through the REST API to the RestServer started inside the JobManager.
final CompletableFuture<JobSubmitResponseBody> submissionFuture =
        requestFuture.thenCompose(
                requestAndFileUploads ->
                        sendRetriableRequest(
                                JobSubmitHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                requestAndFileUploads.f0,
                                requestAndFileUploads.f1,
                                isConnectionProblemOrServiceUnavailable()));
The corresponding server-side handler is JobSubmitHandler. When it receives the request, it uploads the jars to the BlobServer:
log.info("Uploading jarFiles {} and userArtifacts {} to blob server",
        jarFiles.toString(), artifacts.toString());
ClientUtils.uploadJobGraphFiles(
        jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));
The jars end up in the BlobServer, and the resulting BlobKeys (from which the corresponding files can be traced) are recorded in the JobInformation:
/** Blob keys for the required jar files. */
private final Collection<PermanentBlobKey> requiredJarFileBlobKeys;
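A BlobKey is essentially a content-addressed handle: it is derived from a message digest over the uploaded bytes (newer Flink versions also mix in a random component). A minimal sketch of the idea, using a hypothetical class rather than Flink's actual BlobKey:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of a content-addressed key, similar in spirit to
// Flink's PermanentBlobKey: the key is a digest of the blob's bytes.
class ContentKey {
    private final byte[] digest;

    ContentKey(byte[] blobBytes) {
        try {
            // SHA-1 is the digest Flink's BlobKey has historically used
            this.digest = MessageDigest.getInstance("SHA-1").digest(blobBytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 unavailable", e);
        }
    }

    /** Hex rendering of the digest, handy for file names on the blob store. */
    String toHex() {
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Because the key is derived from content, any node holding the key can verify the downloaded bytes against it.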
Once a TaskManager starts and receives a submitTask request, it tries to load the required data from the blob cache:
tdd.loadBigData(blobCacheService.getPermanentBlobService());
The TaskDeploymentDescriptor (tdd) carries the jobInformation, which can be in one of two states:
/** Serialized job information or <tt>null</tt> if offloaded. */
private MaybeOffloaded<JobInformation> serializedJobInformation;

if (jobInformationOrBlobKey.isLeft()) {
    serializedJobInformation =
            new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
} else {
    serializedJobInformation =
            new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
}
When the ExecutionGraph is created, the jobInformation is serialized and stored. If the serialized size exceeds 1 MB, it is offloaded to the BlobServer, which is why the task-submission path above has to distinguish between the Offloaded and NonOffloaded states.
/** Serialized job information or a blob key pointing to the offloaded job information. */
private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;

this.jobInformationOrBlobKey =
        BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
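The decision itself is just a size check against a threshold. A simplified, hypothetical sketch of what serializeAndTryOffload decides (the real threshold is configurable; 1 MB is the default):

```java
// Simplified, hypothetical sketch of the offload decision behind
// BlobWriter.serializeAndTryOffload: small payloads travel inline in the
// TaskDeploymentDescriptor, large ones are written to the BlobServer and
// replaced by a PermanentBlobKey.
class OffloadDecision {
    // Default threshold; in Flink this is a configurable option (1 MB default).
    static final int OFFLOAD_MIN_SIZE = 1024 * 1024;

    /** Returns true if the serialized payload should be offloaded to the blob store. */
    static boolean shouldOffload(byte[] serialized) {
        return serialized.length > OFFLOAD_MIN_SIZE;
    }
}
```

Keeping small payloads inline avoids a round trip to the blob store for the common case, while large payloads avoid bloating every RPC.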
What loadBigData does is deserialize the jobInformation and taskInformation that may have been persisted to the blob store back into objects. Note that the jar contents are not loaded into memory here: the jobInformation only holds the list of BlobKeys pointing at the actual jar data.
Next, the Task is created. During creation it downloads the required jars from the blob service:
this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
// triggers the download of all missing jar files from the job manager
libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
The BlobLibraryCacheManager provides exactly this capability: it downloads the jars from the blob server and creates a classloader that can resolve the user classes from those jar paths.
Blob implementation
The Blob code is well designed. Everything above is the client-side view; the internal storage and caching logic is cleanly encapsulated, so if you only look at the usage side you barely see the implementation at all.
BlobView provides a read view onto the data stored in a BlobStore. FileSystemBlobStore is the file-system-backed BlobStore implementation; its base path is the HA path configured by high-availability.storageDir, and in HA mode the store that gets built is always a FileSystemBlobStore.
The BlobServer accepts requests, spawns a thread per connection, and creates the directories used to store blobs and temporary files. Its main job is writing and reading files. Files come in two types, permanent and transient: permanent files are also written to the BlobStore (file system), while transient files only go to the local temporary directory. Client connections are handled in the run method (BlobServer extends Thread):
try {
    while (!this.shutdownRequested.get()) {
        BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
        try {
            synchronized (activeConnections) {
                while (activeConnections.size() >= maxConnections) {
                    activeConnections.wait(2000);
                }
                activeConnections.add(conn);
            }
            conn.start();
            conn = null;
        } finally {
            if (conn != null) {
                conn.close();
                synchronized (activeConnections) {
                    activeConnections.remove(conn);
                }
            }
        }
    }
}
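The throttling pattern in that loop (block while activeConnections is at capacity, re-check every 2 seconds) can be isolated into a small runnable sketch; ConnectionThrottle is a hypothetical name, not a Flink class:

```java
import java.util.HashSet;
import java.util.Set;

// Hedged sketch of the admission control in BlobServer's accept loop:
// a new connection waits until the number of active connections drops
// below maxConnections, re-checking periodically like activeConnections.wait(2000).
class ConnectionThrottle {
    private final Set<Object> activeConnections = new HashSet<>();
    private final int maxConnections;

    ConnectionThrottle(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Blocks until a slot is free, then registers the connection. */
    void admit(Object conn) throws InterruptedException {
        synchronized (activeConnections) {
            while (activeConnections.size() >= maxConnections) {
                activeConnections.wait(2000); // periodic re-check, as in BlobServer
            }
            activeConnections.add(conn);
        }
    }

    /** Removes the connection and wakes up any waiting admitters. */
    void close(Object conn) {
        synchronized (activeConnections) {
            activeConnections.remove(conn);
            activeConnections.notifyAll();
        }
    }

    int activeCount() {
        synchronized (activeConnections) {
            return activeConnections.size();
        }
    }
}
```

The timed wait means a waiter recovers even if a notification is missed, at the cost of polling every two seconds.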
The last piece is the BlobCache, which maintains a blob cache on the TaskExecutor so that the same blob is not downloaded repeatedly. The layering of this code is very clean and worth learning from.
Blob and classloader cooperation
After a blob jar has been downloaded locally, the task still needs a classloader to load the classes inside it before they can be used. This logic is implemented by the BlobLibraryCacheManager.
The class-level Javadoc summarizes its purpose well:
/**
 * Provides facilities to download a set of libraries (typically JAR files) for a job from a
 * {@link PermanentBlobService} and create a class loader with references to them.
 */
public class BlobLibraryCacheManager implements LibraryCacheManager
Registration
org.apache.flink.runtime.execution.librarycache.LibraryCacheManager#registerTask
synchronized (lockObject) {
    LibraryCacheEntry entry = cacheEntries.get(jobId);
    if (entry == null) {
        URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
        int count = 0;
        try {
            // add URLs to locally cached JAR files
            for (PermanentBlobKey key : requiredJarFiles) {
                urls[count] = blobService.getFile(jobId, key).toURI().toURL();
                ++count;
            }
            // add classpaths
            for (URL url : requiredClasspaths) {
                urls[count] = url;
                ++count;
            }
            cacheEntries.put(jobId, new LibraryCacheEntry(
                    requiredJarFiles, requiredClasspaths, urls, task,
                    classLoaderResolveOrder, alwaysParentFirstPatterns));
        } catch (Throwable t) {
            // rethrow or wrap
            ExceptionUtils.tryRethrowIOException(t);
            throw new IOException("Library cache could not register the user code libraries.", t);
        }
    } else {
        entry.register(task, requiredJarFiles, requiredClasspaths);
    }
}
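Stripped of the blob and URL handling, the register/unregister protocol is per-job reference counting. A self-contained sketch of that bookkeeping (JobLibraryCache and the String ids are illustrative, not Flink types):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the reference counting BlobLibraryCacheManager does
// per job: each register adds an execution attempt, and the entry (with its
// class loader) is released only when no attempt references it any more.
class JobLibraryCache {
    private final Map<String, Set<String>> referencingAttempts = new HashMap<>();

    /** Registers an execution attempt as a user of the job's libraries. */
    synchronized void registerTask(String jobId, String attemptId) {
        referencingAttempts.computeIfAbsent(jobId, k -> new HashSet<>()).add(attemptId);
    }

    /** Unregisters an attempt; returns true if the job's entry was released. */
    synchronized boolean unregisterTask(String jobId, String attemptId) {
        Set<String> attempts = referencingAttempts.get(jobId);
        if (attempts == null) {
            return false; // already unregistered
        }
        attempts.remove(attemptId);
        if (attempts.isEmpty()) {
            referencingAttempts.remove(jobId);
            return true; // the caller would now close the class loader
        }
        return false;
    }
}
```

Tracking attempts in a set (rather than a bare counter) makes unregistering the same attempt twice harmless.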
A LibraryCacheEntry records the library jars each job needs and creates a classloader to load them. Once no Execution references the entry any more, the classloader and the classes it loaded are released.
In effect, every task failover re-registers the LibraryCacheEntry here (reference count + 1).
/**
 * An entry in the per-job library cache. Tracks which execution attempts
 * still reference the libraries. Once none reference it any more, the
 * class loaders can be cleaned up.
 */
Classloader
A classloader is created to load the corresponding blob jars:
this.classLoader = FlinkUserCodeClassLoaders.create(
        classLoaderResolveOrder,
        libraryURLs,
        FlinkUserCodeClassLoaders.class.getClassLoader(),
        alwaysParentFirstPatterns);
Classes are then loaded with a custom resolve order:
@Override
protected synchronized Class<?> loadClass(String name, boolean resolve)
        throws ClassNotFoundException {
    // First, check if the class has already been loaded
    Class<?> c = findLoadedClass(name);
    if (c == null) {
        // check whether the class should go parent-first
        for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
            if (name.startsWith(alwaysParentFirstPattern)) {
                return super.loadClass(name, resolve);
            }
        }
        try {
            // check the URLs: try to load the class ourselves first
            c = findClass(name);
        } catch (ClassNotFoundException e) {
            // then let URLClassLoader do it, which will eventually call the parent
            c = super.loadClass(name, resolve);
        }
    }
    if (resolve) {
        resolveClass(c);
    }
    return c;
}
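The parent-first short-circuit in that loop is a simple prefix match against a configurable pattern list (Flink's defaults include entries such as "java." and "org.apache.flink.", exposed via a classloader.parent-first-patterns option). A hypothetical standalone helper showing just the check:

```java
// Hypothetical helper isolating the alwaysParentFirstPatterns check above:
// classes whose names match a pattern always delegate to the parent loader,
// so user jars cannot shadow JDK or Flink framework classes.
class ParentFirstCheck {
    static boolean goesParentFirst(String className, String[] patterns) {
        for (String pattern : patterns) {
            if (className.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }
}
```

Everything that does not match a pattern is tried child-first, which lets user code bring its own versions of common libraries without clashing with the framework's copies.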
Release
When a task finishes or fails over, it is unregistered from the libraryCacheManager:
public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
    checkNotNull(jobId, "The JobId must not be null.");
    checkNotNull(task, "The task execution id must not be null.");

    synchronized (lockObject) {
        LibraryCacheEntry entry = cacheEntries.get(jobId);
        if (entry != null) {
            if (entry.unregister(task)) {
                cacheEntries.remove(jobId);
                entry.releaseClassLoader();
            }
        }
        // else has already been unregistered
    }
}
If the LibraryCacheEntry's reference count drops to zero, classLoader.close() is called so that the class files in Metaspace can be released:
void releaseClassLoader() {
    try {
        classLoader.close();
    } catch (IOException e) {
        LOG.warn("Failed to release user code class loader for "
                + Arrays.toString(libraries.toArray()));
    }
}
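It is worth being precise about what close() actually buys you: URLClassLoader.close() releases the loader's open jar file handles, but the classes themselves are only unloaded from Metaspace once the loader and all its classes become unreachable and a class-unloading GC runs. A small hedged sketch (LoaderRelease is a hypothetical name):

```java
import java.io.IOException;
import java.net.URLClassLoader;

// Hedged sketch of the release step. close() frees the loader's jar file
// handles immediately; Metaspace reclamation happens later, when the loader
// becomes unreachable and a GC that performs class unloading runs.
class LoaderRelease {
    /** Closes the loader; returns true on success, false if close() failed. */
    static boolean releaseClassLoader(URLClassLoader loader) {
        boolean closed;
        try {
            loader.close();
            closed = true;
        } catch (IOException e) {
            closed = false;
        }
        // Dropping every reference to the loader (and to its loaded classes)
        // is what actually makes the class metadata collectable.
        return closed;
    }
}
```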
In practice, though, the classloader object and its class files are not collected right away; cleanup only happens when a full GC collects the classloader objects in the old generation and evicts their class metadata from Metaspace. On our production clusters we can indeed observe many lingering classloader objects after repeated failovers.
This can cause a real problem: if MaxMetaspaceSize is not set, memory keeps climbing with each failover, and because no limit is set the growth never triggers a full GC, so in Kubernetes mode the process is easily killed by the cgroup.
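One possible mitigation (my illustration, not from the original setup) is to cap Metaspace explicitly so that leaked classloaders surface as a full GC or OutOfMemoryError inside the JVM rather than silently growing until the cgroup limit is hit. Assuming the standard flink-conf.yaml key for passing TaskManager JVM options:

```yaml
# Illustrative only: cap Metaspace so class-loader leaks trigger GC/OOM
# inside the JVM instead of a cgroup OOM kill; 256m is an example value.
env.java.opts.taskmanager: "-XX:MaxMetaspaceSize=256m"
```

With a cap in place, repeated failovers that leak classloaders produce a diagnosable JVM error instead of an opaque container kill.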
References
Metaspace references:
https://www.atatech.org/articles/149842
https://www.atatech.org/articles/64871
https://www.atatech.org/articles/81068
