在本文中,我们将讨论如何设置我们的开发环境以创建高质量的python代码以及如何自动执行一些繁琐的任务来加速部署。
我们将介绍以下步骤:

  • 使用pipenv在隔离的虚拟环境中设置我们的依赖
  • 如何为多个作业设置项目结构
  • 如何运行pyspark工作
  • 如何使用Makefile 自动执行开发步骤
  • 如何使用flake8测试代码的质量
  • 如何使用pytest-spark为PySpark应用程序运行单元测试
  • 运行测试覆盖率,看看我们是否使用pytest-cov创建了足够的单元测试

第1步:设置虚拟环境

虚拟环境有助于我们将特定应用程序的依赖关系与系统的整体依赖关系隔离开来。这很好,因为我们不会涉及现有库的依赖性问题,并且在单独的系统(例如docker容器或服务器)上安装或卸载它们更容易。对于此任务,我们将使用pipenv。
要在mac os系统上安装它,例如运行:

  1. brew install pipenv

要为应用程序声明我们的依赖项(库),我们需要在项目的路径路径中创建一个Pipfile

  1. [[source]]
  2. url = 'https://pypi.python.org/simple'
  3. verify_ssl = true
  4. name = 'pypi'
  5. [requires]
  6. python_version = "3.6"
  7. [packages]
  8. flake8 = "*"
  9. pytest-spark = ">=0.4.4"
  10. pyspark = ">=2.4.0"
  11. pytest-cov = "*"

这里有三个组件。在[[source]]标签中,我们声明了下载所有软件包的url,在[requires]中我们定义了python版本,最后在[packages]中声明了我们需要的依赖项。我们可以将依赖项绑定到某个版本,或者使用**“符号来获取最新版本。
要创建虚拟环境并激活它,我们需要在终端中运行两个命令:

  1. pipenv --three install
  2. pipenv shell

一旦完成这一步,你应该看到你在一个新的venv中,让项目的名字出现在命令行的终端中(默认情况下,env采用项目的名称):

  1. (pyspark-project-template) host:project$

现在,您可以使用两个命令进出。
停用env并返回标准环境:

  1. deactivate

再次激活虚拟环境(您需要位于项目的根目录中):

  1. source `pipenv --venv`/bin/activate

第2步:项目结构

该项目可以具有以下结构:

  1. pyspark-project-template
  2. src/
  3. jobs/
  4. pi/
  5. __init__.py
  6. resources/
  7. args.json
  8. word_count/
  9. __init__.py
  10. resources/
  11. args.json
  12. word_count.csv
  13. main.py
  14. test/
  15. jobs/
  16. pi/
  17. test_pi.py
  18. word_count/
  19. test_word_count.py

排除一些init.py文件以简化操作,但您可以在本教程末尾的github上找到完整项目的链接。我们基本上有源代码和测试。每个作业都分成一个文件夹,每个作业都有一个资源文件夹,我们在其中添加该作业所需的额外文件和配置。
在本教程中,我使用了两个经典示例 -  pi,生成最多小数的pi数和字数,以计算csv文件中的单词数。

第3步:使用spark-submit运行作业

我们先来看看main.py文件的样子:

  1. if __name__ == '__main__':
  2. parser = argparse.ArgumentParser(description='My pyspark job arguments')
  3. parser.add_argument('--job', type=str, required=True, dest='job_name',
  4. help='The name of the spark job you want to run')
  5. parser.add_argument('--res-path', type=str, required=True, dest='res_path',
  6. help='Path to the jobs resurces')
  7. args = parser.parse_args()
  8. spark = SparkSession\
  9. .builder\
  10. .appName(args.job_name)\
  11. .getOrCreate()
  12. job_module = importlib.import_module('jobs.%s' % args.job_name)
  13. res = job_module.run(spark, get_config(args.res_path, args.job_name))
  14. print('[JOB {job} RESULT]: {result}'.format(job=args.job_name, result=res))

当我们运行我们的工作时,我们需要两个命令行参数:  - job,是我们想要运行的作业的名称(在例外pi或word_count中)和  - res-path,是作业的相对路径。我们需要第二个参数,因为spark需要知道我们资源的完整路径。在生产环境中,我们将代码部署在集群上,我们将资源转移到HDFS或S3,我们将使用该路径。
在进一步解释代码之前,我们需要提一下,我们必须压缩作业文件夹并将其传递给spark-submit语句假设我们在项目的根目录中:

  1. cd src/
  2. zip -r ../jobs.zip jobs/

这将使代码在我们的应用程序中作为模块提供。基本上在第16行的main.py中,我们以编程方式导入作业模块。
我们的作业piword_count都有一个run函数,所以我们只需要运行这个函数来启动这个作业(main.py中的第17行)。我们还在那里传递了工作的配置。
让我们看一下word_count作业,进一步了解这个例子:

  1. from operator import add
  2. def get_keyval(row):
  3. words = filter(lambda r: r is not None, row)
  4. return [[w.strip().lower(), 1] for w in words]
  5. def run(spark, config):
  6. df = spark.read.csv(config['relative_path'] + config['words_file_path'])
  7. mapped_rdd = df.rdd.flatMap(lambda row: get_keyval(row))
  8. counts_rdd = mapped_rdd.reduceByKey(add)
  9. return counts_rdd.collect()

此代码在word_count文件夹的init.py文件中定义。我们在这里可以看到,我们使用两个配置参数来读取资源文件夹中的csv文件:相对路径和csv文件的位置。其余的代码只计算单词,所以我们不会在这里详细介绍。值得一提的是,每个作业在resources文件夹中都有一个args.json文件。这里我们实际定义了传递给作业的配置。这是word_count作业的配置文件:

  1. {
  2. "words_file_path": "/word_count/resources/word_count.csv"
  3. }

所以我们现在有了所有细节来运行我们的spark-submit命令:

  1. spark-submit --py-files jobs.zip src/main.py --job word_count --res-path /your/path/pyspark-project-template/src/jobs

要运行另一个作业pi,我们只需要更改- job标志的参数  。

第4步:编写单元测试,并使用覆盖率运行它们

要为pyspark应用程序编写测试,我们使用pytest-spark,一个非常易于使用的模块。
WORD_COUNT工作单元测试:

  1. from src.jobs.word_count import get_keyval, run
  2. def test_get_keyval():
  3. words=['this', 'are', 'words', 'words']
  4. expected_results=[['this', 1], ['are', 1], ['words', 1], ['words', 1]]
  5. assert expected_results == get_keyval(words)
  6. def test_word_count_run(spark_session):
  7. expected_results = [('one', 1), ('two', 1), ('three', 2), ('four', 2), ('test', 1)]
  8. conf = {
  9. 'relative_path': '/your/path/pyspark-project-template/src/jobs',
  10. 'words_file_path': '/word_count/resources/word_count.csv'
  11. }
  12. assert expected_results == run(spark_session, conf)

我们需要从src模块导入我们想要测试的函数。这里更有趣的部分是我们如何进行test_word_count_run。我们可以看到没有初始化的spark会话,我们只是在测试中将其作为参数接收。这要归功于pytest-spark模块,因此我们可以专注于编写测试,而不是编写样板代码。
接下来让我们讨论一下代码覆盖率。我们怎么知道我们是否编写了足够的单元测试?很简单,我们运行测试覆盖工具,告诉我们尚未测试的代码。对于python,我们可以使用pytest-cov模块。要使用代码覆盖率运行所有测试,我们必须运行:

  1. pytest --cov=src test/jobs/

where  - cov flag告诉pytest在哪里检查覆盖范围。
测试覆盖率结果:

  1. ---------- coverage: platform darwin, python 3.7.2-final-0 -----------
  2. Name Stmts Miss Cover
  3. -----------------------------------------------------
  4. src/__init__.py 0 0 100%
  5. src/jobs/__init__.py 0 0 100%
  6. src/jobs/pi/__init__.py 11 0 100%
  7. src/jobs/word_count/__init__.py 9 0 100%
  8. -----------------------------------------------------
  9. TOTAL 20 0 100%

我们的测试覆盖率是100%,但是等一下,缺少一个文件!为什么main.py没有在那里列出?
如果我们认为我们有不需要测试的python代码,我们可以将它从报告中排除。为此,我们需要在项目的根目录中创建一个 .coveragerc文件。对于此示例,它看起来像这样:

  1. [run]
  2. omit = src/main.py

第5步:运行静态代码分析

很好,我们有一些代码,我们可以运行它,我们有良好的覆盖率的单元测试。我们做对了吗?还没!我们还需要确保按照python最佳实践编写易于阅读的代码。为此,我们必须使用名为flake8的python模块检查我们的代码
要运行它:

  1. flake8 ./src

它将分析src文件夹。如果我们有干净的代码,我们就不应该收到任何警告。但不,我们有一些问题:

  1. flake8 ./src
  2. ./src/jobs/pi/__init__.py:13:1: E302 expected 2 blank lines, found 1
  3. ./src/jobs/pi/__init__.py:15:73: E231 missing whitespace after ','
  4. ./src/jobs/pi/__init__.py:15:80: E501 line too long (113 > 79 characters)

我们来看看代码:

  1. from random import random
  2. from operator import add
  3. NUMBER_OF_STEPS_FACTOR = 100000
  4. def f(_):
  5. x = random() * 2 - 1
  6. y = random() * 2 - 1
  7. return 1 if x ** 2 + y ** 2 <= 1 else 0
  8. def run(spark, config):
  9. number_of_steps = config['partitions'] * NUMBER_OF_STEPS_FACTOR
  10. count = spark.sparkContext.parallelize(range(1, number_of_steps + 1),config['partitions']).map(f).reduce(add)
  11. return 4.0 * count / number_of_steps

我们可以看到在第13行我们有一个E302警告这意味着我们需要在两种方法之间增加一条线。然后是第15行的E231E501。这一行的第一个警告告诉我们,我们需要在和之间留出一个额外的空间,第二个警告通知我们线路太长,而且很难读(我们可以’甚至在要点中完整地看到它!)。**range(1, number_of_steps +1),** **config[**
解决所有警告后,代码看起来更容易阅读:

  1. from random import random
  2. from operator import add
  3. NUMBER_OF_STEPS_FACTOR = 100000
  4. def f(_):
  5. x = random() * 2 - 1
  6. y = random() * 2 - 1
  7. return 1 if x ** 2 + y ** 2 <= 1 else 0
  8. def run(spark, config):
  9. number_of_steps = config['partitions'] * NUMBER_OF_STEPS_FACTOR
  10. count = spark.sparkContext\
  11. .parallelize(range(1, number_of_steps + 1),
  12. config['partitions']).map(f).reduce(add)
  13. return 4.0 * count / number_of_steps

第6步:将所有内容与Makefile放在一起

因为我们在终端中运行了一堆命令,所以在最后一步中我们将研究如何简化和自动执行此任务。
我们可以在项目的根目录中创建一个Makefile,如下所示:

  1. .DEFAULT_GOAL := run
  2. init:
  3. pipenv --three install
  4. pipenv shell
  5. analyze:
  6. flake8 ./src
  7. run_tests:
  8. pytest --cov=src test/jobs/
  9. run:
  10. find . -name '__pycache__' | xargs rm -rf
  11. rm -f jobs.zip
  12. cd src/ && zip -r ../jobs.zip jobs/
  13. spark-submit --py-files jobs.zip src/main.py --job $(JOB_NAME) --res-path $(CONF_PATH)

如果我们想要使用coverage运行测试,我们只需输入:

  1. make run_tests

如果我们想要运行pi工作:

  1. make run JOB_NAME=pi CONF_PATH=/your/path/pyspark-project-template/src/jobs

这就是所有人!希望这个对你有帮助。
一如既往,代码存储在github上