Flink

部署模式

  • local
  • standalone
  • Flink on Yarn

    on Yarn 中有两类:

    1. 内存集中管理模式:在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的 Flink Jon 都在这个Flink yarn-session 中,也就是说不管提交多少个 job,这些 job 都会共用开始时在 yarn 中申请的资源。这个 Flink 集群会常驻在 Yarn 集群中,除非手动停止。
    2. 内存Job管理模式(per-job)【推荐使用】:在 Yarn 中,每次提交 job 都会创建一个新的 Flink 集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。

CDH 添加 Flink 服务

修改阿里仓库地址

  • /usr/local/apache-maven/conf/settings.xml
  1. <mirror>
  2. <!--This sends everything else to /public -->
  3. <id>nexus-aliyun</id>
  4. <mirrorOf>central</mirrorOf>
  5. <name>Nexus aliyun</name>
  6. <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  7. </mirror>

参考 github 内容

导读

CDH除了能够管理自生所提供的一些大数据相关服务外,还允许将第三方服务添加到CDH集群(托管在CDH上)。你需要做的就是按照一定的规则流程制作相关程序包,最后发布到CDH上。虽然过程并不困难,但是手动操作尤其是一些关键配置容易出错,往往导致最终服务无法正常在CDH上安装运行。

本文就是指导大家如何打包自己的服务,发布到CDH上,并且由CDH控制服务的运行、监控服务的基本运行状态。

相关介绍

(1) parcel: 以 “.parcel” 结尾的压缩文件。parcel 包内共两个目录,其中 lib 包含了服务组件,meta 包含一个重要的描述性文件 parcel.json,这个文件记录了服务的信息,如版本、所属用户、适用的 CDH 平台版本等。

命名规则必须如下
文件名称格式为三段,第一段是包名,第二段是版本号,第三段是运行平台。
例如:FLINK-1.9.1-bin-scala_2.12-el7.parcel
包名:FLINK
版本号:1.9.1-bin-scala_2.12
运行环境:el7
el6是代表 centos6 系统,centos7 则用 el7 表示

ps:
parcel 必须包置于 /opt/cloudera/parcel-repo/ 目录下才可以被 CDH 发布程序时识别到。

(2) csd:csd 文件是一个 jar 包,它记录了服务在 CDH 上的管理规则里面包含三个文件目录,images、descriptor、scripts 分别对应。如服务在 CDH 页面上显示的图标、依赖的服务、暴露的端口、启动规则等。

csd的jar包必须置于/opt/cloudera/csd/目录才可以在添加集群服务时被识别到。

flink-parcel 制作过程

以CDH5.14、FLINK1.9.1为例

(1) 下载制作包

  1. git clone https://github.com/pkeropen/flink-parcel.git

(2) 修改配置文件 flink-parcel.properties

  1. #FLINK 下载地址
  2. FLINK_URL=https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz
  3. #flink版本号
  4. FLINK_VERSION=1.9.1
  5. #扩展版本号
  6. EXTENS_VERSION=BIN-SCALA_2.12
  7. #操作系统版本,以centos为例
  8. OS_VERSION=7
  9. #CDH 小版本
  10. CDH_MIN_FULL=5.2
  11. CDH_MAX_FULL=5.15
  12. #CDH大版本
  13. CDH_MIN=5
  14. CDH_MAX=5

(2) 生成parcel文件

  1. ./build.sh parcel

(3) 生成csd文件

  • on yarn 版本

    1. ./build.sh csd_on_yarn
  • standalone版本

    1. ./build.sh csd_standalone

CDH 中安装flink服务

此处假设你已经安装好CDH集群
(1) 将上面生成的 parcel 文件 copy 至 cloudera/parcel-repo 子目录下
(2) 将上述生成的 jar 文件 copy 至 cloudera /parcel-repo 子目录下
(3) 在 CDH 中添加 flink 的 parcel 包:
打开 CDH 管理界面 -> 集群 -> 检查 parcel 包-> flink -> 分配 -> 激活
(4) 重启 CDH 服务后 ,点击 CDH 所管理的集群添加服务,在列表中找到 flink,按提示添加启动并运行。

说明:

(1) 在如果集群开启了安全,需要配置 security.kerberos.login.keytab 和 security.kerberos.login.principal 两个参数才能正正常启动。如未启动 kerberos, 则在 CDH 中添加 FLINK 服务时请清空这两个参数的内容
(2) 如果你计划将 Apache Flink 与 Apache Hadoop 一起使用(在 YARN 上运行 Flink ,连接到 HDFS ,连接到 HBase ,或使用一些基于 Hadoop 文件系统的 connector ),请选择包含匹配的 Hadoop 版本的下载包,且另外下載对应版本的 Hadoop 库,将官方指定 Pre-bundled Hadoop 2.6.5 ,并且把下载后的 Hadoop 库放置 到 Flink 安装目录下的 lib 目录 包并设置 HADOOP_CLASSPATH 环境变量。
例如:export HADOOP_CLASSPATH=/opt/cloudera/parcels/FLINK/lib/flink/lib

常见报错补充:

测试时发现提交任务失败,无法上传jar,查看日志

  1. - PUT operation failed
  2. java.io.IOException: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499
  3. at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)
  4. at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:73)
  5. at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:69)
  6. at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:444)
  7. at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:694)
  8. at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:351)
  9. at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)
  10. - Unhandled exception.
  11. org.apache.flink.util.FlinkException: Could not upload job files.
  12. at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)
  13. at org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:57)
  14. at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:93)
  15. at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
  16. at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
  17. at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
  18. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
  19. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  20. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  21. at java.lang.Thread.run(Thread.java:748)
  22. Caused by: java.io.IOException: PUT operation failed: Server side error: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499
  23. at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:372)
  24. at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:428)
  25. at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:102)
  26. at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:95)
  27. at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:77)
  28. ... 9 more
  29. Caused by: java.io.IOException: Server side error: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499
  30. at org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:171)
  31. at org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:104)
  32. at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:369)
  33. ... 13 more
  34. Caused by: java.io.IOException: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499
  35. at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)
  36. at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:73)
  37. at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:69)
  38. at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:444)
  39. at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:694)
  40. at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:351)
  41. at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)

根据错误提示,猜测是写入到机器本地目录而不是 HDFS 上,在 CDH 配置页面将配置 high-availability storageDir 修改为 hdfs 路径即可解决。

  1. high-availability hdfs:///user/flink/ha

相关参考:

  1. Cloudera Manager Extensions
  2. csd参考模板
  3. FLINK官方下载地址
  4. CDH添加第三方服务的方法