PyFlink 作为 Flink 的Python 语言入口大家应该都知道,Python语言的确很简单易学,但是说实话PyFlink的开发环境可不容易搭建,稍有不慎,你的PyFlink环境就会乱掉,而且很难排查原因。今天给大家介绍一款能够帮你解决这些问题的PyFlink开发环境利器:Zeppelin Notebook。也许你早就听说过Zeppelin,但是之前的文章都偏重讲述如何在Zeppelin里开发 Flink SQL, 今天特地写这篇文章来介绍下如何在Zeppelin里高效的开发PyFlink Job,特别是解决PyFlink的环境问题。
一句来总结这篇文章的主题就是在Zeppelin notebook里利用conda来创建Python env自动部署到yarn集群中,你无需手动在集群上去安装任何PyFlink的包,并且你可以在一个yarn集群里同时使用互相隔离的多个版本的PyFlink。最后你能看到的效果就是这样:
- 你能够在PyFlink 客户端使用第三方python库,比如matplotlib
- 你可以在PyFlink udf里使用第三方python库,如
准备工作
Step 1.
准备好最新版本的Zeppelin的搭建,这个就不在这边展开了,如果有问题可以加入Flink on Zeppelin钉钉群(34517043)咨询。另外需要注意的是你的Zeppelin部署集群需要是Linux,如果是Mac的话,会导致在mac机器上打的conda 环境无法在Yarn集群里使用(因为conda包在不同系统间是不兼容的)
Step 2.
下载Flink 1.13, 注意本文的功能只能用在Flink 1.13以上版本,然后
- 把 flink-python-*.jar 这个jar包copy到Flink的lib文件夹下
把 opt/python 这个文件夹copy到Flink的lib文件夹下
Step 3.
安装以下软件(这些软件是用于创建Conda env的):
- conda pack https://conda.github.io/conda-pack/
- mamba https://github.com/mamba-org/mamba
搭建PyFlink环境
接下来你就可以在Zeppelin里搭建并且使用PyFlink了。
Step 1. 制作JobManager上的PyFlink conda环境
因为Zeppelin天生支持Shell,所以你可以在Zeppelin里用Shell来制作你的PyFlink 环境。注意这里的Python第三方包是你在PyFlink客户端(JobManager)需要的包,比如Matplotlib这些,并且确保你至少安装了下面这些包:
- 某个版本的python(这里用的是3.7)
- apache-flink (这里用的还是1.13.1)
- jupyter,grpcio,protobuf(这三个包是Zeppelin需要的)
剩下的包你可以自己根据需要来指定
%sh
# make sure you have conda and momba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mamba
echo "name: pyflink_env
channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- pip
- pip:
- apache-flink==1.13.1
- jupyter
- grpcio
- protobuf
- matplotlib
- pandasql
- pandas
- scipy
- seaborn
- plotnine
" > pyflink_env.yml
mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml
运行下面的代码打包PyFlink的conda 环境并且上传到hdfs(注意这里打包出来的文件格式是tar.gz)
%sh
rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz
hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz
Step 2. 制作TaskManager上的PyFlink conda环境
运行下面的代码来创建TaskManager上的PyFlink conda环境,TaskManager上的PyFlink环境至少包含以下2个包:
- 某个版本的python(这里用的是3.7)
- apache-flink (这里用的还是1.13.1)
剩下的包是你的Python UDF需要依赖的包,比如这里指定了pandas
echo "name: pyflink_tm_env
channels:
- conda-forge
- defaults
dependencies:
- python=3.7
- pip
- pip:
- apache-flink==1.13.1
- pandas
" > pyflink_tm_env.yml
mamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml
运行下面的代码打包PyFlink的conda 环境并且上传到hdfs(注意这里使用的是zip格式)
%sh
rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip
hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip
Step 3. 在PyFlink中使用Conda 环境
接下来你就可以在Zeppelin中使用上面创建的conda环境了,首先你需要在Zeppelin里配置你的Flink,主要配置的选项有:
- flink.execution.mode 为 yarn-application, 本文所讲的方法只适用于yarn-application 模式
- 指定 yarn.ship-archives,zeppelin.pyflink.python 以及 zeppelin.interpreter.conda.env.name 来配置JobManager 侧的PyFlink conda环境
- 指定 python.archives 以及 python.executable 来指定 TaskManager 侧的PyFlink conda 环境
- 指定其他可选的flink配置,比如这里的 flink.jm.memory 和 flink.tm.memory ``` %flink.conf
flink.execution.mode yarn-application
yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz zeppelin.pyflink.python pyflink_env.tar.gz/bin/python zeppelin.interpreter.conda.env.name pyflink_env.tar.gz
python.archives hdfs:///tmp/pyflink_tm_env.zip python.executable pyflink_tm_env.zip/bin/python3.7
flink.jm.memory 2048 flink.tm.memory 2048 ``` 接下来你就可以如一开始所说的那样在Zeppelin里使用PyFlink以及指定的conda环境了。有2种场景:
下面的例子里,你可以在PyFlink 客户端(JobManager侧)使用上面创建的JobManager 侧的conda环境,比如下边使用了Matplotlib
下面的例子里是在PyFlink UDF里使用上面创建的TaskManager 侧 conda 环境里的库,比如下面在UDF里使用Pandas
总结与未来
本文讲就是在Zeppelin notebook里利用conda来创建python env自动部署到yarn集群中,你无需手动在集群上去安装任何Pyflink的包,并且你可以在一个yarn集群里同时使用多个版本的PyFlink。每个PyFlink的环境都是隔离的,而且你可以随时定制更改你的conda 环境。你可以下载下面这个note并导入到你的Zeppelin就可以复现今天讲的内容:http://23.254.161.240/#/notebook/2G8N1WTTS
此外还有很多可以改进的地方:
- 这里我们需要创建2个conda env ,注意原因是Zeppelin支持tar.gz 格式,而Flink只支持 zip 格式,后期2边统一之后,我们只要创建一个conda env就可以
- apache-flink 现在包含了Flink的jar包,这就导致打出来的conda env特别大,yarn container在初始化的时候耗时会比较长,这个需要Flink社区提供一个轻量级的python包(不包含Flink jar包),就可以大大减小conda env的大小。
希望了解更多Flink on Zeppelin使用的同学可以加入下面的钉钉群来讨论。