独立集群

这里给出如何在集群上以完全分布式方式运行Flink程序的说明 .

需求

软件需求

Flink 需要运行在类unix环境下,比如 Linux, Mac OS X, 和 Cygwin (适用于Windows) 希望集群中至少由一个主节点和最少一个工作节点组成。 在开始运行前,需要保证 每个节点上都安装了一下软件:

  • Java 1.8.x 及其以上版本,
  • ssh (必须运行sshd才能使用管理远程组件的Flink脚本)

如果您的群集不满足这些软件要求,则需要安装/升级软件来达到要求.

通过 SSH互信相同的目录结构 这样我们的脚本就能控制所有内容.

JAVA_HOME 配置

Flink程序要求所有节点上必须设置JAVA_HOME环境变量,并JAVA_HOME环境变量指向JAVA的安装目录.

可以通过 conf/flink-conf.yaml 文件中的 env.java.home 配置来设置配置.

Flink 安装

转到 下载页 下载Flink安装程序. 程序中如果需要使用Hadoop,则选择的Flink版本要保证和 Hadoop 版本相匹配。否则可以选择任何版本

下载完毕后,复制到主节点并解压:

  1. tar xzf flink-*.tgz
  2. cd flink-*

配置 Flink

解压缩之后,需要修改Flink的配置文件,配置文件名称为 conf/flink-conf.yaml

修改 jobmanager.rpc.address 配置值为主节点,设置 jobmanager.heap.mbtaskmanager.heap.mb 用来指定JVM在每个节点上分配的最大堆内存量。

这些设置值单位为 MB. 如果某工作节点有更多主内存可以分配给Flink系统,可以修改节点上的 FLINK_TM_HEAP 配置来覆盖默认值。

最后,必须提供集群中所有节点的列表,这些节点将用作工作节点。与HDFS配置类似, 编辑conf/slaves文件,并输入每个工作节点的IP /主机名. 每个工作节点将运行 TaskManager.

下图说明了具有三个节点 (IP 地址 从10.0.0.110.0.0.3 和对应的主机名 master, worker1, worker2)的集群配置, 并显示了配置文件的内容(需要在所有计算机上的相同路径上访问)):

独立集群 - 图1

/path/to/flink/conf/ flink-conf.yaml

  1. jobmanager.rpc.address: 10.0.0.1

/path/to/flink/ conf/slaves

  1. 10.0.0.2
  2. 10.0.0.3

Flink目录必须每个worker上同一路径下都可用。可以使用共享NFS目录,也可以将整个Flink目录复制到每个工作节点。

有关详细信息和其他配置选项,请参阅 configuration page

需要额外注意的一些配置是,

  • 每个JobManager的可用内存总量 (jobmanager.heap.mb),
  • 每个TaskManager的可用内容总量 (taskmanager.heap.mb),
  • 每台机器的可用CPU数量 (taskmanager.numberOfTaskSlots),
  • 集群中的CPU总数 (parallelism.default)
  • 临时目录 (taskmanager.tmp.dirs)

这些是对Flink非常重要的配置值。

启动Flink

以下脚本在本地节点上启动JobManager,并通过ssh连接到所有slaves文件中定义的所有工作节点,启动TaskManger。 现在Flink系统已启动并正在运行。 在本地运行的JobManager 将通过配置的RPC端口来接受作业提交。

A假设您在主节点上并在Flink目录中:

  1. bin/start-cluster.sh

停止Flink对应的脚本是 stop-cluster.sh

将JobManager/TaskManager实例添加到群集

可以使用 bin/jobmanager.shbin/taskmanager.sh 脚本将JobManager和TaskManager实例添加到正在运行的集群中.

添加 JobManager

  1. bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

添加 TaskManager

  1. bin/taskmanager.sh start|start-foreground|stop|stop-all

需要保证在对应的机器上调用启动/停止脚本。