PyFlink 作为 Flink 的Python 语言入口大家应该都知道,Python语言的确很简单易学,但是说实话PyFlink的开发环境可不容易搭建,稍有不慎,你的PyFlink环境就会乱掉,而且很难排查原因。今天给大家介绍一款能够帮你解决这些问题的PyFlink开发环境利器:Zeppelin Notebook。也许你早就听说过Zeppelin,但是之前的文章都偏重讲述如何在Zeppelin里开发 Flink SQL, 今天特地写这篇文章来介绍下如何在Zeppelin里高效的开发PyFlink Job,特别是解决PyFlink的环境问题。

一句来总结这篇文章的主题就是在Zeppelin notebook里利用conda来创建Python env自动部署到yarn集群中,你无需手动在集群上去安装任何PyFlink的包,并且你可以在一个yarn集群里同时使用互相隔离的多个版本的PyFlink。最后你能看到的效果就是这样:

  1. 你能够在PyFlink 客户端使用第三方python库,比如matplotlib

image.png

  1. 你可以在PyFlink udf里使用第三方python库,如

image.png

好,废话不多说,我们来看看怎么来实现吧。


准备工作

Step 1.

准备好最新版本的Zeppelin的搭建,这个就不在这边展开了,如果有问题可以加入Flink on Zeppelin钉钉群(34517043)咨询。另外需要注意的是你的Zeppelin部署集群需要是Linux,如果是Mac的话,会导致在mac机器上打的conda 环境无法在Yarn集群里使用(因为conda包在不同系统间是不兼容的)

Step 2.

下载Flink 1.13, 注意本文的功能只能用在Flink 1.13以上版本,然后

搭建PyFlink环境

接下来你就可以在Zeppelin里搭建并且使用PyFlink了。

Step 1. 制作JobManager上的PyFlink conda环境

因为Zeppelin天生支持Shell,所以你可以在Zeppelin里用Shell来制作你的PyFlink 环境。注意这里的Python第三方包是你在PyFlink客户端(JobManager)需要的包,比如Matplotlib这些,并且确保你至少安装了下面这些包:

  • 某个版本的python(这里用的是3.7)
  • apache-flink (这里用的还是1.13.1)
  • jupyter,grpcio,protobuf(这三个包是Zeppelin需要的)

剩下的包你可以自己根据需要来指定

  1. %sh
  2. # make sure you have conda and momba installed.
  3. # install miniconda: https://docs.conda.io/en/latest/miniconda.html
  4. # install mamba: https://github.com/mamba-org/mamba
  5. echo "name: pyflink_env
  6. channels:
  7. - conda-forge
  8. - defaults
  9. dependencies:
  10. - python=3.7
  11. - pip
  12. - pip:
  13. - apache-flink==1.13.1
  14. - jupyter
  15. - grpcio
  16. - protobuf
  17. - matplotlib
  18. - pandasql
  19. - pandas
  20. - scipy
  21. - seaborn
  22. - plotnine
  23. " > pyflink_env.yml
  24. mamba env remove -n pyflink_env
  25. mamba env create -f pyflink_env.yml


运行下面的代码打包PyFlink的conda 环境并且上传到hdfs(注意这里打包出来的文件格式是tar.gz)

  1. %sh
  2. rm -rf pyflink_env.tar.gz
  3. conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz
  4. hadoop fs -rmr /tmp/pyflink_env.tar.gz
  5. hadoop fs -put pyflink_env.tar.gz /tmp
  6. # The python conda tar should be public accessible, so need to change permission here.
  7. hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz

Step 2. 制作TaskManager上的PyFlink conda环境

运行下面的代码来创建TaskManager上的PyFlink conda环境,TaskManager上的PyFlink环境至少包含以下2个包:

  • 某个版本的python(这里用的是3.7)
  • apache-flink (这里用的还是1.13.1)

剩下的包是你的Python UDF需要依赖的包,比如这里指定了pandas

  1. echo "name: pyflink_tm_env
  2. channels:
  3. - conda-forge
  4. - defaults
  5. dependencies:
  6. - python=3.7
  7. - pip
  8. - pip:
  9. - apache-flink==1.13.1
  10. - pandas
  11. " > pyflink_tm_env.yml
  12. mamba env remove -n pyflink_tm_env
  13. mamba env create -f pyflink_tm_env.yml

运行下面的代码打包PyFlink的conda 环境并且上传到hdfs(注意这里使用的是zip格式)

  1. %sh
  2. rm -rf pyflink_tm_env.zip
  3. conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip
  4. hadoop fs -rmr /tmp/pyflink_tm_env.zip
  5. hadoop fs -put pyflink_tm_env.zip /tmp
  6. # The python conda tar should be public accessible, so need to change permission here.
  7. hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip

Step 3. 在PyFlink中使用Conda 环境

接下来你就可以在Zeppelin中使用上面创建的conda环境了,首先你需要在Zeppelin里配置你的Flink,主要配置的选项有:

  • flink.execution.mode 为 yarn-application, 本文所讲的方法只适用于yarn-application 模式
  • 指定 yarn.ship-archives,zeppelin.pyflink.python 以及 zeppelin.interpreter.conda.env.name 来配置JobManager 侧的PyFlink conda环境
  • 指定 python.archives 以及 python.executable 来指定 TaskManager 侧的PyFlink conda 环境
  • 指定其他可选的flink配置,比如这里的 flink.jm.memory 和 flink.tm.memory ``` %flink.conf

flink.execution.mode yarn-application

yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz zeppelin.pyflink.python pyflink_env.tar.gz/bin/python zeppelin.interpreter.conda.env.name pyflink_env.tar.gz

python.archives hdfs:///tmp/pyflink_tm_env.zip python.executable pyflink_tm_env.zip/bin/python3.7

flink.jm.memory 2048 flink.tm.memory 2048 ``` 接下来你就可以如一开始所说的那样在Zeppelin里使用PyFlink以及指定的conda环境了。有2种场景:

下面的例子里,你可以在PyFlink 客户端(JobManager侧)使用上面创建的JobManager 侧的conda环境,比如下边使用了Matplotlib

image.png

下面的例子里是在PyFlink UDF里使用上面创建的TaskManager 侧 conda 环境里的库,比如下面在UDF里使用Pandas

image.png

总结与未来

本文讲就是在Zeppelin notebook里利用conda来创建python env自动部署到yarn集群中,你无需手动在集群上去安装任何Pyflink的包,并且你可以在一个yarn集群里同时使用多个版本的PyFlink。每个PyFlink的环境都是隔离的,而且你可以随时定制更改你的conda 环境。你可以下载下面这个note并导入到你的Zeppelin就可以复现今天讲的内容:http://23.254.161.240/#/notebook/2G8N1WTTS

此外还有很多可以改进的地方:

  • 这里我们需要创建2个conda env ,注意原因是Zeppelin支持tar.gz 格式,而Flink只支持 zip 格式,后期2边统一之后,我们只要创建一个conda env就可以
  • apache-flink 现在包含了Flink的jar包,这就导致打出来的conda env特别大,yarn container在初始化的时候耗时会比较长,这个需要Flink社区提供一个轻量级的python包(不包含Flink jar包),就可以大大减小conda env的大小。

希望了解更多Flink on Zeppelin使用的同学可以加入下面的钉钉群来讨论。
image.png