Blob的中文意思是二进制大对象,Flink中提供了一种Blob服务,用来进行包的管理。在Flink云化的工作中涉及到这个组件,本文就来解析一下Blob server的实现。

Blob使用

k8s模式下,没有NodeManager来提供jar包管理的服务,例如用户的jar,引擎的jar,在Yarn的模式下是由NodeManager的LOCALIZED阶段download到本地的。k8s模式下container启动是去镜像仓库拉一个镜像启动。如果只是用户自己的应用程序管理,可以把用户jar打入镜像中,但是这个方式并不通用。比较通用的场景是镜像提供Flink引擎的框架包,如果用户有自己的依赖jar包,如datastream程序包,或者sql的udf包,单独上传到pod中进行分发。Blob Server在这里就承担这样的功能,下面我们就来看下在k8s中如何使用这个功能

在k8s模式中是使用RestClusterClient进行任务的提交,提交时会通过Rest api像jm启动的RestServer发起任务提交请求.

  1. final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
  2. requestAndFileUploads -> sendRetriableRequest(
  3. JobSubmitHeaders.getInstance(),
  4. EmptyMessageParameters.getInstance(),
  5. requestAndFileUploads.f0,
  6. requestAndFileUploads.f1,
  7. isConnectionProblemOrServiceUnavailable())
  8. );

对应服务端的处理Handler为JobSubmitHandler,收到请求后,会将jar包上传到BlobServer中

  1. log.info("Uploading jarFiles {} and userArtifacts {} to blob server", jarFiles.toString(), artifacts.toString());
  2. ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));

最终会将jar包传到BlobServer中,并将BlobKey(可以trace到相应的文件),记录在JobInformation中

  1. /** Blob keys for the required jar files */
  2. private final Collection<PermanentBlobKey> requiredJarFileBlobKeys;

等到TM启动之后,会收到submitTask请求,这时候就会尝试去blob中load必要的数据

  1. tdd.loadBigData(blobCacheService.getPermanentBlobService());

tdd中存储了jobInformation,这里的information有两种状态

  1. /**
  2. * Serialized job information or <tt>null</tt> if offloaded.
  3. */
  4. private MaybeOffloaded<JobInformation> serializedJobInformation;
  5. if (jobInformationOrBlobKey.isLeft()) {
  6. serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
  7. } else {
  8. serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
  9. }

在ExecutionGraph创建时,尝试序列化jobInformation并存储,存储时会判断序列化后的大小是否超过1M,超过1MB就会将其OffLoad到BlobServer中,也就造成了上面在提交Task时需要判断这里的序列化后的数据是处于OffLoad还是处于NonOffLoad状态

  1. /** Serialized job information or a blob key pointing to the offloaded job information. */
  2. private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
  3. this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);

loadBigData做的事情,就是将可能被持久化到blob中的jobInformation和taskInformation都反序列到对象中,但是这里并不会将jar包存储到内存中,jar包只是jobInformation中的BlobKey的列表,指向真实的jar包数据。

随后开始创建Task,创建Task时,会从blob服务开始download所需要的jar包

  1. this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
  1. // triggers the download of all missing jar files from the job manager
  2. libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);

这里的BlobLibraryCacheManager就提供了一种能力,从blob server中下载jar包,并创建一个classloader,能够从这些jar路径中解析使用的用户类。

Blob 实现

其实可以看出Blob的代码设计很好,以上的分析都是在使用端的视图,对于内部的存储,缓存逻辑都做了很好的封装,如果只是看使用部分,基本都不会看到相应的实现。

Flink Blob Server | binary large objects - 图1 BlobView提供了对存储在BlobStore中的数据的访问视图,FileSystemBlobStore是基于文件系统实现的BlobStore,basepath是由 high-availability.storageDir 指定的HA path,ha模式下构建的都是FileSystemBlobStore

Flink Blob Server | binary large objects - 图2 BlobServer负责接收请求,启动线程处理,并负责创建目录存储Blob和临时文件。主要方法是处理文件的存入和读取,文件类型分为永久和临时,永久类型的文件会存入BlobStore(文件系统),而临时文件只会存入本地的临时目录。继承Thread的run方法处理客户端请求

  1. try {
  2. while (!this.shutdownRequested.get()) {
  3. BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
  4. try {
  5. synchronized (activeConnections) {
  6. while (activeConnections.size() >= maxConnections) {
  7. activeConnections.wait(2000);
  8. }
  9. activeConnections.add(conn);
  10. }
  11. conn.start();
  12. conn = null;
  13. }
  14. finally {
  15. if (conn != null) {
  16. conn.close();
  17. synchronized (activeConnections) {
  18. activeConnections.remove(conn);
  19. }
  20. }
  21. }
  22. }
  23. }

Flink Blob Server | binary large objects - 图3 最后一部分实现是BlobCache,是用于在TaskExecutor中维护Blob的缓存,以免多次请求下载,整个代码的分层设计非常清晰,值得学习

Blob与classloader协作

在Blob jar被download到本地,task需要使用相应的类还需要通过classloader加载相应的类才能使用,这里是通过BlobLibraryCacheManager来实现了相应逻辑。

看下类的注释就知道这个类的大致用途

  1. /**
  2. * Provides facilities to download a set of libraries (typically JAR files) for a job from a
  3. * {@link PermanentBlobService} and create a class loader with references to them.
  4. */
  5. public class BlobLibraryCacheManager implements LibraryCacheManager

注册

org.apache.flink.runtime.execution.librarycache.LibraryCacheManager#registerTask

  1. synchronized (lockObject) {
  2. LibraryCacheEntry entry = cacheEntries.get(jobId);
  3. if (entry == null) {
  4. URL[] urls = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
  5. int count = 0;
  6. try {
  7. // add URLs to locally cached JAR files
  8. for (PermanentBlobKey key : requiredJarFiles) {
  9. urls[count] = blobService.getFile(jobId, key).toURI().toURL();
  10. ++count;
  11. }
  12. // add classpaths
  13. for (URL url : requiredClasspaths) {
  14. urls[count] = url;
  15. ++count;
  16. }
  17. cacheEntries.put(jobId, new LibraryCacheEntry(
  18. requiredJarFiles, requiredClasspaths, urls, task, classLoaderResolveOrder, alwaysParentFirstPatterns));
  19. } catch (Throwable t) {
  20. // rethrow or wrap
  21. ExceptionUtils.tryRethrowIOException(t);
  22. throw new IOException(
  23. "Library cache could not register the user code libraries.", t);
  24. }
  25. } else {
  26. entry.register(task, requiredJarFiles, requiredClasspaths);
  27. }
  28. }

LibraryCacheEntry记录了每个job所需要的library jar,并创建一个classloader来加载,如果没有Execution指向他的时候就会释放classloader以及相应的class了

这里意思上是每次任务发生fo,都会在这里重新注册一次LibraryEntry(reference + 1)

  1. /**
  2. * An entry in the per-job library cache. Tracks which execution attempts
  3. * still reference the libraries. Once none reference it any more, the
  4. * class loaders can be cleaned up.
  5. */

classloader

创建classloader来加载相应的blob jar

  1. this.classLoader =
  2. FlinkUserCodeClassLoaders.create(
  3. classLoaderResolveOrder,
  4. libraryURLs,
  5. FlinkUserCodeClassLoaders.class.getClassLoader(),
  6. alwaysParentFirstPatterns);

自定义顺序加载class文件

  1. @Override
  2. protected synchronized Class<?> loadClass(
  3. String name, boolean resolve) throws ClassNotFoundException {
  4. // First, check if the class has already been loaded
  5. Class<?> c = findLoadedClass(name);
  6. if (c == null) {
  7. // check whether the class should go parent-first
  8. for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
  9. if (name.startsWith(alwaysParentFirstPattern)) {
  10. return super.loadClass(name, resolve);
  11. }
  12. }
  13. try {
  14. // check the URLs 先自己加载,再由parent classloader加载
  15. c = findClass(name);
  16. } catch (ClassNotFoundException e) {
  17. // let URLClassLoader do it, which will eventually call the parent
  18. c = super.loadClass(name, resolve);
  19. }
  20. }
  21. if (resolve) {
  22. resolveClass(c);
  23. }
  24. return c;
  25. }

释放

在task执行结束或者fo时会从libraryCacheManager中unregisterTask

  1. public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
  2. checkNotNull(jobId, "The JobId must not be null.");
  3. checkNotNull(task, "The task execution id must not be null.");
  4. synchronized (lockObject) {
  5. LibraryCacheEntry entry = cacheEntries.get(jobId);
  6. if (entry != null) {
  7. if (entry.unregister(task)) {
  8. cacheEntries.remove(jobId);
  9. entry.releaseClassLoader();
  10. }
  11. }
  12. // else has already been unregistered
  13. }
  14. }

如果LibraryEntry引用计数为0,就会执行classLoader.close()来释放Metaspace中的类文件

  1. void releaseClassLoader() {
  2. try {
  3. classLoader.close();
  4. } catch (IOException e) {
  5. LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray()));
  6. }
  7. }

但是从观察到的现象看这里的classloader对象和class文件不会立马被gc掉,而是等待fullgc的时候才会触发old去的classloader对象清理,并将Metaspace的class文件清理掉,线上也能观察到多次fo之后会存在很多的classloader对象。

这里可能会导致一个问题,在没有设置MaxMetaSpaceSize的情况下,可能会导致内存随着fo飙涨,而因为没有设置MaxMetaSpaceSize,不触发fgc,导致在k8s模式下很容易被cgroup杀死

参考

MetaSpace参考
https://www.atatech.org/articles/149842
https://www.atatech.org/articles/64871
https://www.atatech.org/articles/81068