Flink
部署模式
- local
- standalone
- Flink on Yarn
on Yarn 中有两类:
- 内存集中管理模式:在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的 Flink Jon 都在这个Flink yarn-session 中,也就是说不管提交多少个 job,这些 job 都会共用开始时在 yarn 中申请的资源。这个 Flink 集群会常驻在 Yarn 集群中,除非手动停止。
- 内存Job管理模式(per-job)【推荐使用】:在 Yarn 中,每次提交 job 都会创建一个新的 Flink 集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
CDH 添加 Flink 服务
- 编译 CDH parcel: https://github.com/pkeropen/flink-parcel
- Flink下载: https://archive.apache.org/dist/flink/flink-1.9.2/
修改阿里仓库地址
- /usr/local/apache-maven/conf/settings.xml
<mirror><!--This sends everything else to /public --><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</url></mirror>
参考 github 内容
导读
CDH除了能够管理自生所提供的一些大数据相关服务外,还允许将第三方服务添加到CDH集群(托管在CDH上)。你需要做的就是按照一定的规则流程制作相关程序包,最后发布到CDH上。虽然过程并不困难,但是手动操作尤其是一些关键配置容易出错,往往导致最终服务无法正常在CDH上安装运行。
本文就是指导大家如何打包自己的服务,发布到CDH上,并且由CDH控制服务的运行、监控服务的基本运行状态。
相关介绍
(1) parcel: 以 “.parcel” 结尾的压缩文件。parcel 包内共两个目录,其中 lib 包含了服务组件,meta 包含一个重要的描述性文件 parcel.json,这个文件记录了服务的信息,如版本、所属用户、适用的 CDH 平台版本等。
命名规则必须如下:
文件名称格式为三段,第一段是包名,第二段是版本号,第三段是运行平台。
例如:FLINK-1.9.1-bin-scala_2.12-el7.parcel
包名:FLINK
版本号:1.9.1-bin-scala_2.12
运行环境:el7
el6是代表 centos6 系统,centos7 则用 el7 表示
ps:
parcel 必须包置于 /opt/cloudera/parcel-repo/ 目录下才可以被 CDH 发布程序时识别到。
(2) csd:csd 文件是一个 jar 包,它记录了服务在 CDH 上的管理规则里面包含三个文件目录,images、descriptor、scripts 分别对应。如服务在 CDH 页面上显示的图标、依赖的服务、暴露的端口、启动规则等。
csd的jar包必须置于/opt/cloudera/csd/目录才可以在添加集群服务时被识别到。
flink-parcel 制作过程
以CDH5.14、FLINK1.9.1为例
(1) 下载制作包
git clone https://github.com/pkeropen/flink-parcel.git
(2) 修改配置文件 flink-parcel.properties
#FLINK 下载地址FLINK_URL=https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz#flink版本号FLINK_VERSION=1.9.1#扩展版本号EXTENS_VERSION=BIN-SCALA_2.12#操作系统版本,以centos为例OS_VERSION=7#CDH 小版本CDH_MIN_FULL=5.2CDH_MAX_FULL=5.15#CDH大版本CDH_MIN=5CDH_MAX=5
(2) 生成parcel文件
./build.sh parcel
(3) 生成csd文件
on yarn 版本
./build.sh csd_on_yarn
standalone版本
./build.sh csd_standalone
CDH 中安装flink服务
此处假设你已经安装好CDH集群
(1) 将上面生成的 parcel 文件 copy 至 cloudera/parcel-repo 子目录下
(2) 将上述生成的 jar 文件 copy 至 cloudera /parcel-repo 子目录下
(3) 在 CDH 中添加 flink 的 parcel 包:
打开 CDH 管理界面 -> 集群 -> 检查 parcel 包-> flink -> 分配 -> 激活
(4) 重启 CDH 服务后 ,点击 CDH 所管理的集群添加服务,在列表中找到 flink,按提示添加启动并运行。
说明:
(1) 在如果集群开启了安全,需要配置 security.kerberos.login.keytab 和 security.kerberos.login.principal 两个参数才能正正常启动。如未启动 kerberos, 则在 CDH 中添加 FLINK 服务时请清空这两个参数的内容
(2) 如果你计划将 Apache Flink 与 Apache Hadoop 一起使用(在 YARN 上运行 Flink ,连接到 HDFS ,连接到 HBase ,或使用一些基于 Hadoop 文件系统的 connector ),请选择包含匹配的 Hadoop 版本的下载包,且另外下載对应版本的 Hadoop 库,将官方指定 Pre-bundled Hadoop 2.6.5 ,并且把下载后的 Hadoop 库放置 到 Flink 安装目录下的 lib 目录 包并设置 HADOOP_CLASSPATH 环境变量。
例如:export HADOOP_CLASSPATH=/opt/cloudera/parcels/FLINK/lib/flink/lib
常见报错补充:
测试时发现提交任务失败,无法上传jar,查看日志
- PUT operation failedjava.io.IOException: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:73)at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:69)at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:444)at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:694)at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:351)at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)- Unhandled exception.org.apache.flink.util.FlinkException: Could not upload job files.at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:80)at org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:57)at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:93)at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by: java.io.IOException: PUT operation failed: Server side error: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:372)at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:428)at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:102)at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:95)at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:77)... 9 moreCaused by: java.io.IOException: Server side error: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499at org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:171)at org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:104)at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:369)... 13 moreCaused by: java.io.IOException: Mkdirs failed to create /user/flink/ha/cluster_standalone/blob/job_2af01f6cb48eeb97ac9120f1b9325499at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:73)at org.apache.flink.runtime.blob.FileSystemBlobStore.put(FileSystemBlobStore.java:69)at org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:444)at org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:694)at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:351)at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:114)
根据错误提示,猜测是写入到机器本地目录而不是 HDFS 上,在 CDH 配置页面将配置 high-availability storageDir 修改为 hdfs 路径即可解决。
high-availability hdfs:///user/flink/ha
