关于在Spark上运行Python有无数文章和论坛帖子,但大多数人认为要提交的工作包含在一个.py文件中:spark-submit wordcount.py - 完成!
如果你的Python程序不仅仅是一个脚本怎么办?也许它为Spark生成动态SQL来执行,或使用Spark的输出刷新模型。随着您的Python代码变得更像一个应用程序(具有目录结构,配置文件和库依赖项),将其提交给Spark需要更多考虑。
以下是我最近考虑使用Spark 2.3将一个这样的Python应用程序用于生产时的替代方案。第一篇文章重点介绍Spark独立集群。另一篇文章介绍了EMR Spark(YARN)。
我远不是Spark的权威,更不用说Python了。我的决定试图平衡正确性和易于部署,以及应用程序对群集的限制。让我知道你的想法。

趋势AI文章:

1.使用机器学习预测购买行为> 2.理解和构建生成对抗网络(GAN)> 3.使用OpenCV和Haar Cascades构建Django POST面部检测API> 4.通过Hindsight Experience Replay从错误中学习

示例Python应用程序

为了模拟完整的应用程序,下面的场景假定Python 3应用程序具有以下结构:

  1. project.py
  2. data /
  3. data_source.py
  4. data_source.ini

data_source.ini包含各种配置参数:

  1. [spark]
  2. app_name =myPySpark App
  3. master_url = spark// sparkmaster:7077

data_source.py是一个模块,负责在Spark中获取和处理数据,使用NumPy进行数学转换,并将Pandas数据帧返回给客户端。依赖关系:

  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import SparkSession
  3. from pyspark.sql.types import StructType, StructField, FloatType
  4. import pandas as pd
  5. import numpy as np
  6. import configparser

它定义了一个创建和初始化的DataSource类……SparkContext``SparkSession

  1. class DataSource:
  2. def __init__(self):
  3. config = configparser.ConfigParser()
  4. config.read('./data/data_source.ini')
  5. master_url = config['spark']['master_url']
  6. app_name = config['spark']['app_name']
  7. conf = SparkConf().setAppName(app_name) \
  8. .setMaster(master_url)
  9. self.sc = SparkContext(conf=conf)
  10. self.spark = SparkSession.builder \
  11. .config(conf=conf) \
  12. .getOrCreate()

…和一个get_data()方法:

  1. 从NumPy正态分布创建RDD。
  2. 应用函数将每个元素的值加倍。
  3. 将RDD转换为Spark数据帧并在顶部定义临时视图。
  4. 应用Python UDF,使用SQL对每个dataframe元素的内容进行平方。
  5. 将结果作为Pandas数据帧返回给客户端。
  1. def get_data(self, num_elements=1000) -> pd.DataFrame:
  2. mu, sigma = 2, 0.5
  3. v = np.random.normal(mu, sigma, num_elements)
  4. rdd1 = self.sc.parallelize(v)
  5. def mult(x): return x * np.array([2])
  6. rdd2 = rdd1.map(mult).map(lambda x: (float(x),))
  7. schema = StructType([StructField("value", FloatType(), True)])
  8. df1 = self.spark.createDataFrame(rdd2, schema)
  9. df1.registerTempTable("test")
  10. def square(x): return x ** 2
  11. self.spark.udf.register("squared", square)
  12. df2 = self.spark.sql("SELECT squared(value) squared FROM test")
  13. return df2.toPandas()

project.py是我们的主程序,充当上述模块的客户端:

  1. from data.data_source import DataSource
  2. def main():
  3. src = DataSource()
  4. df = src.get_data(num_elements=100000)
  5. print(f"Got Pandas dataframe with {df.size} elements")
  6. print(df.head(10))
  7. main()

克隆回购:https//bitbucket.org/calidoteam/pyspark.git
在开始之前,让我们回顾一下向Spark提交工作时可用的选项。

spark-submit,客户端和集群模式

  • Spark支持各种集群管理器:独立(即内置于Spark),Hadoop的YARN,Mesos,Kubernetes,所有这些都控制着工作负载在一组资源上的运行方式。
  • spark-submit是唯一与所有集群管理器一致的接口。对于Python应用程序,spark-submit可以在需要时上载和暂存您提供的所有依赖项,如.py,.zip或.egg文件。
  • 客户端模式下驱动程序_)将在运行的同一主机上spark-submit运行。确保此类主机靠近工作节点以减少网络延迟符合您的最佳利益。
  • 集群模式下驱动程序某个工作节点并从其运行。从远程主机提交作业时,这很有用。从Spark 2.4独立开始,Spark 2.4.0集群模式不是一个选项。
  • 或者,可以spark-submit通过SparkSession在Python应用程序中配置连接到群集来绕过。这需要正确的配置和匹配的PySpark二进制文件。您的Python应用程序将在客户端模式下有效运行:它将从您启动它的主机上运行。

以下部分描述了几种部署方案,以及每种方案中需要的配置。


#1:直接连接到Spark(客户端模式,无spark-submit)

1_HcQRb-GgmYnWRF15yobCOw.png
针对Spark独立的客户端模式的Python应用程序

这是最简单的部署方案:Python应用程序通过指向Spark主URL直接建立spark上下文,并使用它来提交工作:

  1. conf = SparkConf().setAppName("My PySpark App") \
  2. .setMaster("spark://192.168.1.10:7077")
  3. sc = SparkContext(conf=conf)
  4. spark = SparkSession.builder \
  5. .config(conf=conf) \
  6. .getOrCreate()

独立群集中,资源在作业持续时间内分配,默认配置为客户端应用程序提供所有可用资源,因此需要对多租户环境进行微调。执行程序进程(JVM或python)由每个节点本地的工作进程启动。
这类似于传统的客户端 - 服务器应用程序,因为客户端只是“连接”到“远程”集群。建议:
确保驱动程序和群集之间有足够的带宽。大多数网络活动发生在驱动程序和它的执行程序之间,因此这个“远程”集群实际上必须在近距离(LAN)内。
通过启用Apache Arrow改进Java-Python序列化: Python工作负载(NumPy,Pandas和其他应用于Spark RDD,数据帧和数据集的转换)默认需要大量的Java和Python进程的序列化和反序列化,并且会迅速降低性能。从Spark 2.3开始,启用Apache Arrow(包含在下面列出的步骤中)使这些传输更加高效
跨所有群集节点和驱动程序主机部署依赖关系。这包括下载和安装Python 3,pip安装PySpark(必须与目标集群的版本匹配),PyArrow以及其他库依赖项:

  1. sudo yum install python36
  2. pip install pyspark==2.3.1
  3. pip install pyspark[sql]
  4. pip install numpy pandas msgpack sklearn

注意:在安装像PySpark这样的大型库(~200MB)时,可能会遇到以“ MemoryError” 结尾的错误。如果是这样,请尝试:

  1. pip install --no-cache-dir pyspark==2.3.1

配置和环境变量:在客户端,$SPARK_HOME必须指向pip安装PySpark的位置:

  1. $ pip show pyspark
  2. Name: pyspark
  3. Version: 2.3.1
  4. Summary: Apache Spark Python API
  5. Home-page: https://github.com/apache/spark/tree/master/python
  6. Author: Spark Developers
  7. Author-email: dev@spark.apache.org
  8. License: http://www.apache.org/licenses/LICENSE-2.0
  9. Location: /opt/anaconda/lib/python3.6/site-packages
  10. Requires: py4j
  11. $ export SPARK_HOME=/opt/anaconda/lib/python3.6/site-packages

在每个群集节点上,设置其他默认参数和环境变量。特别是对于Python应用程序:
$SPARK_HOME/conf/spark-defaults.sh

  • spark.sql.execution.arrow.enabled true

$SPARK_HOME/conf/spark-env.sh

  • export PYSPARK_PYTHON=/usr/bin/python3 :Python可执行文件,所有节点。
  • export PYSPARK_DRIVER_PYTHON=/usr/bin/python3 :驱动程序的Python可执行文件,如果与执行程序节点不同。

注:环境变量是从哪里读spark-submit推出,不一定是从群集主机内。

运行它

将工作负载提交到集群只需运行Python应用程序(例如spark-submit,不需要):

  1. $ cd my-project-dir/
  2. $ python3 project.py

在运行时,可以看到运行多个python3进程的从属节点在作业上运行:

1_l-aJ47luaPs9wpj_FTlMSQ.png

#2:集装箱式应用(客户端模式,无火花提交)

1_zcpriTNz1FM6kWUpQNr_RA.png
这是前一个场景的扩展,由于可移植性等原因,最好将Python应用程序作为Docker容器运行,作为CI / CD管道的一部分。
除了针对上一个方案推荐的配置外,还需要以下内容:
构建容器以包含所有依赖项:从包含Python 3和/或Java 8 OpenJDK的映像开始,然后pip-install PySpark,PyArrow以及应用程序所需的所有其他库。
配置Spark驱动程序主机和端口,在容器中打开它们:这是执行程序到达容器内驱动程序所必需的。可以通过编程方式设置驱动程序的Spark属性(spark.conf.set(“property”, “value”)):

  1. spark.driver.host : host_ip_address (e.g. 192.168.1.10)
  2. spark.driver.port : static_port (e.g. 51400)
  3. spark.driver.bindAddress : container_internal_ip (e.g. 10.192.6.81)
  4. spark.driver.blockManagerPort : static_port (e.g. 51500)

在Docker中,端口可以使用-p选项从命令行公开到外部:-p 51400:51400 -p 51500:51500。其他文章建议只发布此端口范围:-p 5000–5010:5000–5010

运行它

与前一个场景一样,运行容器将启动Python驱动程序:

  1. docker run -p 5140051400 -p 5150051500 <docker_image_url>

#3:通过spark-submit提供Python应用程序(客户端模式)

此方案实际上与方案#1相同,仅为了清楚起见包含在此处。唯一的区别是Python应用程序是使用该spark-submit进程启动的。除了日志文件之外,还会将群集事件发送到stdout

  1. $ cd my-project-dir/
  2. $ ls -l
  3. rwxrwxr-x. 3 centos centos 70 Feb 25 02:11 data
  4. -rw-rw-r--. 1 centos centos 220 Feb 25 01:09 project.py
  5. $ spark-submit project.py

笔记:

  • 根据我的经验,spark-submit只要从项目根目录(my-project-dir/)调用它就没有必要在调用时传递依赖的子目录/文件。
  • 由于示例应用程序已指定主URL,因此无需将其传递给spark-submit。否则,更完整的命令将是:
  1. $ spark-submit --master spark://sparkcas1:7077 --deploy-mode client project.py
  • 从Spark 2.3开始,无法将集群模式下的Python应用程序提交给独立的Spark集群。这样做会产生错误:
  1. $ spark-submit --master spark://sparkcas1:7077 --deploy-mode cluster project.py
  2. Error: Cluster deploy mode is currently not supported for python applications on standalone clusters.

Takeaways- Python on Spark独立集群:

  • 虽然独立群集在生产中不受欢迎(可能是因为商业支持的分发包括群集管理器),但只要不需要多租户和动态资源分配,它们的占用空间就会更小并且做得很好。
  • 对于Python应用程序,部署选项仅限于客户端模式。
  • 使用Docker来容纳Python应用程序具有所有预期的优势,非常适合客户端模式部署。