Flink作业提交
Φ Local模式
# 上传jar包至固定目录,如:/share/BigDataFlinkDemo-1.0.0.jar
cd /share
export FLINK_HOME=/home/vagrant/modules/flink-1.10.0
${FLINK_HOME}/bin/flink run -d -c org.polaris.bigdata.flink.wordcount.BoundStreamOplimizeDemo4 \
/share/BigDataFlinkDemo-1.0.0.jar \
--inputpath hdfs://bigdata-node1:9000/tmp/input/ \
--outputpath hdfs://bigdata-node1:9000/tmp/output/wordcount.txt5
${FLINK_HOME}/bin/flink list --all # 查看job列表(--all:查看所有任务,包括已经取消和停止的)
${FLINK_HOME}/bin/flink cancel ${JobID} # 停止job
说明:
- StandaloneSessionClusterEntrypoint(JobManager),负责计算资源调度。
- TaskManagerRunner(TaskManager),负责具体的计算任务的执行。
CliFrontend(WebUI方式提交无此进程),任务提交过程产生的进程(类似于Driver),作业完成后退出。
Φ Standalone
# 上传jar包至固定目录,如:/share/BigDataFlinkDemo-1.0.0.jar
cd /share
export FLINK_HOME=/home/vagrant/modules/flink-1.10.0
${FLINK_HOME}/bin/flink run -d -c org.polaris.bigdata.flink.wordcount.BoundStreamOplimizeDemo4 \
/share/BigDataFlinkDemo-1.0.0.jar \
--inputpath hdfs://bigdata-node1:9000/tmp/input/ \
--outputpath hdfs://bigdata-node1:9000/tmp/output/wordcount.txt5
${FLINK_HOME}/bin/flink list --all # 查看job列表(--all:查看所有任务,包括已经取消和停止的)
${FLINK_HOME}/bin/flink cancel ${JobID} # 停止job
说明:
StandaloneSessionClusterEntrypoint(JobManager),负责计算资源调度。
- TaskManagerRunner(TaskManager),负责具体的计算任务的执行。
- CliFrontend(WebUI方式提交无此进程),任务提交过程产生的进程(类似于Driver),作业完成后退出。
Φ Flink on yarn
https://www.cnblogs.com/asker009/p/11327533.html
Flink作业重启策略
Φ 固定延迟重启策略
Φ 故障率重启策略
Φ 无重启策略
Φ 后备重启策略
Flink作业补跑
Flink作业重跑
Flink SQL参数设置
SET table.exec.mini-batch.enabled=true;
SET table.exec.mini-batch.allow-latency=1s;
SET table.exec.mini-batch.size=1000;
SET table.optimizer.agg-phase-strategy=TWO_PHASE;
SET table.optimizer.distinct-agg.split.enabled=true;
# Flink SQL Client两种展示模式
SET execution.result-mode=table; # table mode:效果像是对普通数据表的查询
SET execution.result-mode=changelog; # changelog mode:效果像是打印每一次数据变更的日志
参考
博文:Flink重启策略
https://blog.csdn.net/qq_33689414/article/details/93761787