使用pip等python安装包安装pyflink的时候会包含一些主要的jar包,所以pyflink的python安装包很大,有200m左右。 而这些jar在下载的java Flink安装文件中都是有的。所以在使用zeppelin整合使用PyFlink的时候并不要通过pip安装PyFlink。

安装Python依赖

在TaskManager运行的机子和Zeppelin运行的机子上安装Python依赖

  1. ipython=7.20.0
  2. ipykernel=5.3.4
  3. jupyter-client==6.1.7
  4. grpcio=1.35.0
  5. protobuf=3.11.2
  6. pandas=1.2.1
  7. apache-beam==2.23.0 # 在Zeppelin运行的机子上可以没有

处理pyflink.zip包

TaskManager上面运行Python的udf时会运行shell脚本pyflink/bin/pyflink-udf-runner.sh在Flink的源码包中flink-python模块下有文件夹bin,这个文件夹中有需要的shell文件。解压Flink安装文件夹中opt/python/pyflink.zip文件,生成pyflink文件夹,把上述的bin文件夹拷贝到pyflink文件夹中,删除原有的pyflink.zip包。重新打包生成新的pyflink.zip包。

拷贝Flink安装文件夹中的python包

拷贝文件夹opt/python到TaskManager运行的主机上的/data/program/pyflink/opt/目录中。解压python文件夹中的cloudpickle-1.2.2-src.zip,py4j-0.10.8.1-src.zip,pyflink.zip有三个文件。生成cloudpickle,py4j,pyflink有三个Python包。

Zeppelin安装主机不需要设置Pyflink相关Python依赖

Zeppelin在运行PyFlink的时候会自动添加Flink包装中PyFlink相关的包到PYTHONPATH中,所以不需要设置PyFlink相关的依赖。

运行定义了Python UDF的Flink作业

在运行Flink作业的时候配置集群参数containerized.taskmanager.env.PYTHONPATH=/data/program/pyflink/opt/python, 这个参数设置了环境变量PYTHONPATH,Python会通过这个变量查找安装的Python包。在目录/data/program/pyflink/opt/python中有事先放入的cloudpickle,py4j,pyflink三个python包。