可视化后台
Checkpoint
- 其中
Acknowledged
一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack, Latest Acknowledgement
表示该 operator 的所有 subtask 最后 ack 的时间;End to End Duration
表示整个 operator 的所有 subtask 中完成 snapshot 的最长时间;State Size
表示当前 Checkpoint 的 state 大小 – 主要这里如果是增量 checkpoint 的话,则表示增量大小;Buffered During Alignment
表示在 barrier 对齐阶段积攒了多少数据,如果这个数据过大也间接表示对齐比较慢);坑收集
提交作业报akka超时错误
升级jdk版本 8u_202版本及以上实测可以。Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-63839359]] after [10000 ms]
版本
上图Scala版本是用户编程使用Scala版本,如果版本不对会出现下面这种序列化错误Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.scala.DataStream$$anon$6; local class incompatible: stream classdesc serialVersionUID =
虚拟机集群问题
virbr0 192.168.122.1
要把NAT网卡删除掉,要不然TaskManager监听地址会变成192.168.122.1。不会是自身真是IP。virsh net-destroy default
命令备忘
Standalone模式启动
#standalone
bin/start-cluster.sh
bin/flink run -m tail1:8081
bin/flink run -m tail1:8081 -c me.yanri.WordCount -p 1 ~/opt/jars/me.yanri.jar
#检查点恢复
bin/flink run -s hdfs:///flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata me.wordcount.jar
Yarn模式启动
#yarn-session
bin/yarn-session.sh -tm 4096 -s 4
bin/yarn-session.sh -n 2 -tm 1024 -s 2
bin/flink run -yid application_1570379089091_0001 -c me.yanri.WordCount ~/opt/jars/me.yanri.jar
#per-job
bin/flink run -d -m yarn-cluster -yn 2 /opt/jars/me.yanri.jar
bin/flink run -d -m yarn-cluster -p 4 -yjm 2048m -ytm 4096m ./examples/batch/WordCount.jar
bin/flink run -d -m yarn-cluster -yjm 2048m /opt/jars/me.yanri.jar
flink run -m yarn-cluster 后面的 -yxx参数,都是yarn-session.sh的参数。
常用参数
-yn,--yarncontainer Number of Task Managers
-yqu,--yarnqueue Specify YARN queue.
-ys,--yarnslots Number of slots per TaskManager
其他
flink list
flink run -d -s /tmp/savepoint/savepoint-xxxx xxx.jar
flink stop #优雅停止 Source要实现StoppableFunction才能stop
flink cancel #暴力停止
flink savepoint
flink modify -p 4 ${id} #改变并行度、底层借助savepoint机制
flink info #展示StreamGraph,以JSON形式
/tmp/.yarn-properties-${user}保存了创建yarn-session集群的信息,所以上面命令可以不指定集群位置创建任务。
解析命令参数项并初始化,启动指定运行模式,如果是per-job运行模式将根据命令行参数指定的Job主类创建job graph;
- 如果可以从命令行参数(-yid )或YARN properties临时文件(${java.io.tmpdir}/.yarn-properties-${user.name})中获取应用ID,向指定的应用提交Job;
- 否则当命令行参数中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集群模式),启动per-job运行模式;
- 否则当命令行参数项不包含 -yq(表示查询YARN集群可用资源)时,启动session运行模式;
编译
指定版本Scala和Hadoop版本
mvn clean package -Dscala=2.12 -DskipTests -Dhadoop.version=2.9.2
Hadoop依赖
1.11版本之后Hadoop依赖通过HADOOP_CLASSPATH指定。要在每个节点都设置好这个环境变量。
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
坑收集
0x01
Could not find a file system implementation for scheme ‘hdfs’问题
Client所在机器的flink目录lib里缺少flink-shaded-hadoop-2-uber-xxx-xxx.jar
0x02
最好配置HADOOP_CONF_DIR,如果它和HADOOP_HOME都没配置,Yarn的AM会报 /lib/slf4j-log4j12-1.7.15.jar does not exist错误。
flink run 提交任务也会失败。
0x03
docker 必须把jar包拷贝到容器才能提交作业
$ JOBMANAGER_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
$ docker cp path/to/jar "$JOBMANAGER_CONTAINER":/job.jar
$ docker exec -t -i "$JOBMANAGER_CONTAINER" flink run /job.jar
下载地址
https://flink.apache.org/downloads.html
官网不提供的版本需要自己编译
参考资料
Checkpoint问题排查
http://blog.itpub.net/69915408/viewspace-2657298/