一般我们有几种方式使用Flink(以Flink1.13/1.14为例)

  • 本地local模式,主要用于开发与逻辑调试
  • standalone(一般不用,自行搭建调试用)
  • k8s
    • Flink session模式
    • Flink application模式(生产)
  • yarn
    • Flink session模式
    • Flink application模式(生产)

      local模式

      比如使用idea+maven开发,直接在pom里面引入flink相关的包。

standalone

提交一个Flink wordcount作业 - 图1
默认start-cluster.sh会在master起一个JM,worker列表节点上各起一个TM
还可以在maser或者worker节点上执行以下命令增加JM/TM。增加JM主要用于HA。

  1. bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
  2. bin/taskmanager.sh start|start-foreground|stop|stop-all

详细的集群部署操作参考官网
standalone模式的高可用需要借助zk。

Flink on K8S(Native)

session模式

此外我们还在k8s上启动一个session集群,对于平时一些flink sql的调试任务,可以在这个集群完成。
通过踩坑实践我觉得首先你需要对flink和k8s的一些概念、使用方式等等都非常了解,这样你才能对flink在k8s的部署的时候遇到问题很好的定位和解决问题。

创建一个名字为flink-cluster的namespace

kubectl create namespace flink-cluster
复制

创建一个账户

kubectl create serviceaccount flink -n flink-cluster
复制

service account和角色的绑定

kubectl create clusterrolebinding flink-role-binding-flink \ —clusterrole=edit \ —serviceaccount=flink-cluster:flink

k8s session 集群

启动session集群

./bin/kubernetes-session.sh \ -Dkubernetes.namespace=flink-cluster \ -Dkubernetes.jobmanager.service-account=flink \ -Dkubernetes.cluster-id=my-session \ -Dtaskmanager.memory.process.size=4096m \ -Dkubernetes.taskmanager.cpu=1 \ -Dtaskmanager.numberOfTaskSlots=4 \ -Dresourcemanager.taskmanager-timeout=3600000

往session集群提交jar任务

./bin/flink run -d \ -e kubernetes-session \ -Dkubernetes.namespace=flink-cluster \ -Dkubernetes.cluster-id=my-session \ examples/streaming/WindowJoin.jar

=============待调试:映射个端口出来访问=============

往session集群提交sql任务

在sql客户端执行以下的命令之后,可以把sql任务提交到刚才新建的flink session集群。
set kubernetes.cluster-id=my-session; set kubernetes.namespace=flink-cluster; set execution.target = kubernetes-session;