并行度、分区

  • 算子子任务数就是其对应算子的并行度、分区。在同一程序中,不同算子也可能具有不同的并行度
  • 数据在算子间传输数据的模式:一对一、重新分发

    分层API

    image.png

flink部署的四种方式

  • local模式:一个节点的单机模式 ```java tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/

cd flink-1.13.0/ bin/start-cluster.sh bin/stop-cluster.sh

访问 http://hadoop102:8081/

  1. - standalone模式:多个节点、不依靠yarn调度的集群
  2. ```java
  3. // 三台都要操作
  4. vim conf/flink-conf.yaml
  5. # JobManager 节点地址
  6. jobmanager.rpc.address: hadoop102
  7. vim workers
  8. hadoop103
  9. hadoop104
  10. bin/start-cluster.sh
  11. 访问http://hadoop102:8081/
  • flink on yarn
  • yarn on k8s

submit-job的三种模式

在standlone、native k8s、on yarn三种部署方式下,都分别有三种提交方式,下文主要讲的 on yarn,其他部署方式详见官方文档

session mode:常驻

  • 在yarn中初始化一个flink集群,所有job共用。 这个集群会常驻在yarn中 除非手动停止 所有作业共享 dispatcher、resourceManager
  • main方法运行在client上, 然后将 JobGraph 提交到yarn-jobmanager
  • 缺点:资源隔离做的不好。如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
  • 优点:节省大量时间申请资源和启动 TaskManager

    1. # 创建yarn-session集群
    2. ./yarn-session.sh
    3. -n 2 taskmanager数量
    4. -s 2 每个taskmanagerslot数量
    5. -jm 1024 jobmanager内存
    6. -tm 1024 每个taskmanager的内存
    7. -nm test yarn上的appname
    8. -d 后台执行
    9. # 在session集群上提交一个job
    10. ./bin/flink run
    11. -t yarn-session \
    12. -Dyarn.application.id=application_XXXX_YY \ # 手动指定运行在刚创建的session集群
    13. ./examples/streaming/TopSpeedWindowing.jar
    14. # 取消yarn-session集群
    15. yarn application --kill application_1577
    16. 或者 echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

    per-job mode:job独占+client 生产常用

  • 每个job独立一个集群,作业之间互不影响。job完成后 对应的集群也会消失

  • main方法运行在client上, 然后将 JobGraph 提交到yarn-jobmanager。如果您传递—detached参数,一旦提交被接受,客户端将停止
    • -d、—detached :加上本参数是cluster模式 不加是client模式(client一直输出日志)。
  • 优点:资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。 适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业 ```bash ./bin/flink run -t yarn-per-job —detached // cluster模式 ./examples/streaming/TopSpeedWindowing.jar

List running job on the cluster

./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

Cancel running job

./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

  1. <a name="LdrYg"></a>
  2. #### application mode:job独占+cluster
  3. - 每个job独立一个集群,作业之间互不影响。job完成后 对应的集群也会消失
  4. - main方法运行在yarn-jobmanager上。可以看做 client与cluster的区别 (除了client、cluster,其他都和per-job模式相同)
  5. - Kubernetes 支持 application cluster,不支持per-job
  6. ```sql
  7. ./bin/flink
  8. run-application
  9. -t yarn-application
  10. ./examples/streaming/TopSpeedWindowing.jar
  11. # List running job on the cluster
  12. ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
  13. # Cancel running job
  14. ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
  15. # 停止job
  16. yarn application --kill application_1577