可视化后台

Checkpoint

  1. 其中 Acknowledged 一列表示有多少个 subtask 对这个 Checkpoint 进行了 ack,
  2. Latest Acknowledgement 表示该 operator 的所有 subtask 最后 ack 的时间;
  3. End to End Duration 表示整个 operator 的所有 subtask 中完成 snapshot 的最长时间;
  4. State Size 表示当前 Checkpoint 的 state 大小 – 主要这里如果是增量 checkpoint 的话,则表示增量大小;
  5. Buffered During Alignment 表示在 barrier 对齐阶段积攒了多少数据,如果这个数据过大也间接表示对齐比较慢);

    坑收集

    提交作业报akka超时错误

    升级jdk版本 8u_202版本及以上实测可以。
    1. Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
    2. akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#-63839359]] after [10000 ms]

    版本

    image.png
    上图Scala版本是用户编程使用Scala版本,如果版本不对会出现下面这种序列化错误
    1. Caused by: java.io.InvalidClassException: org.apache.flink.streaming.api.scala.DataStream$$anon$6; local class incompatible: stream classdesc serialVersionUID =

    虚拟机集群问题

    virbr0 192.168.122.1

    要把NAT网卡删除掉,要不然TaskManager监听地址会变成192.168.122.1。不会是自身真是IP。
    1. virsh net-destroy default

命令备忘

Standalone模式启动

  1. #standalone
  2. bin/start-cluster.sh
  3. bin/flink run -m tail1:8081
  4. bin/flink run -m tail1:8081 -c me.yanri.WordCount -p 1 ~/opt/jars/me.yanri.jar
  5. #检查点恢复
  6. bin/flink run -s hdfs:///flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata me.wordcount.jar

Yarn模式启动

  1. #yarn-session
  2. bin/yarn-session.sh -tm 4096 -s 4
  3. bin/yarn-session.sh -n 2 -tm 1024 -s 2
  4. bin/flink run -yid application_1570379089091_0001 -c me.yanri.WordCount ~/opt/jars/me.yanri.jar
  5. #per-job
  6. bin/flink run -d -m yarn-cluster -yn 2 /opt/jars/me.yanri.jar
  7. bin/flink run -d -m yarn-cluster -p 4 -yjm 2048m -ytm 4096m ./examples/batch/WordCount.jar
  8. bin/flink run -d -m yarn-cluster -yjm 2048m /opt/jars/me.yanri.jar

flink run -m yarn-cluster 后面的 -yxx参数,都是yarn-session.sh的参数。

常用参数

  1. -yn,--yarncontainer Number of Task Managers
  2. -yqu,--yarnqueue Specify YARN queue.
  3. -ys,--yarnslots Number of slots per TaskManager

其他

  1. flink list
  2. flink run -d -s /tmp/savepoint/savepoint-xxxx xxx.jar
  3. flink stop #优雅停止 Source要实现StoppableFunction才能stop
  4. flink cancel #暴力停止
  5. flink savepoint
  6. flink modify -p 4 ${id} #改变并行度、底层借助savepoint机制
  7. flink info #展示StreamGraph,以JSON形式

/tmp/.yarn-properties-${user}保存了创建yarn-session集群的信息,所以上面命令可以不指定集群位置创建任务。

解析命令参数项并初始化,启动指定运行模式,如果是per-job运行模式将根据命令行参数指定的Job主类创建job graph;

  • 如果可以从命令行参数(-yid )或YARN properties临时文件(${java.io.tmpdir}/.yarn-properties-${user.name})中获取应用ID,向指定的应用提交Job;
  • 否则当命令行参数中包含 -d(表示detached模式)和 -m yarn-cluster(表示指定YARN集群模式),启动per-job运行模式;
  • 否则当命令行参数项不包含 -yq(表示查询YARN集群可用资源)时,启动session运行模式;

编译

指定版本Scala和Hadoop版本

  1. mvn clean package -Dscala=2.12 -DskipTests -Dhadoop.version=2.9.2

Hadoop依赖

1.11版本之后Hadoop依赖通过HADOOP_CLASSPATH指定。要在每个节点都设置好这个环境变量。

  1. export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`

坑收集

0x01

Could not find a file system implementation for scheme ‘hdfs’问题
Client所在机器的flink目录lib里缺少flink-shaded-hadoop-2-uber-xxx-xxx.jar

0x02

最好配置HADOOP_CONF_DIR,如果它和HADOOP_HOME都没配置,Yarn的AM会报 /lib/slf4j-log4j12-1.7.15.jar does not exist错误。
flink run 提交任务也会失败。

0x03

docker 必须把jar包拷贝到容器才能提交作业

  1. $ JOBMANAGER_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
  2. $ docker cp path/to/jar "$JOBMANAGER_CONTAINER":/job.jar
  3. $ docker exec -t -i "$JOBMANAGER_CONTAINER" flink run /job.jar

下载地址
https://flink.apache.org/downloads.html
官网不提供的版本需要自己编译

参考资料

Checkpoint问题排查
http://blog.itpub.net/69915408/viewspace-2657298/