测试时,我们使用jupyter notebook可以方便开发调试程序。但是,当我们实际生产过程中,需要执行一个计划作业时,该作业每小时都在运行,jupyter notebook就工作不了了。如何去利用可以重用的模块形式去编写脚本,打包好我们的Spark应用程序,直至最后提交Spark作业用于生产,这是需要我们掌握的一项技能。

1 spark-submit命令

  1. # 一般级别的,语法如下:
  2. spark-submit [options] <python file> [app arguments]

提交作业到Spark的入口点(本地或集群)是spark-submit脚本,该脚本不仅可以提交作业,也可以终止作业或检查其状态。
spark-submit命令提供了一个统一的API把应用程序部署到各种Spark支持的集群管理器上(如YARN),从而免除了单独配置每个应用程序。

PySpark命令行参数

—master:用于设置主节点的URL参数。
(1)local 用于执行本地机器的代码。
如果你传递local参数,Spark会运行一个单一的线程(不会利用任何并行线程)。
在一个多核机器上,local[n]来为Spark指定一个具体使用的内核数,n指的是使用的内核数。
通过loacl[*]来制定运行和Spark机器内核一样多的复杂线程。
(2)spark://host:port 这是一个URL和一个Spark单机集群的端口(不运行任何作业调度,如Mesos或者Yarn)
(3)mesos://host:port 这是一个URL和一个部署在Mesos上的Spark集群端口
(4)yarn 作为一个负载均衡器,用于从运行Yarn的主节点提交作业。

—deploy-mode:允许你决定是否在本地(使用client)启动Spark驱动程序的参数,或者在集群内(使用cluster)的任意一台机器上启动。此参数默认值为client。

—name:应用程序的名字,如果在创建SparkSession时,以编程方式指定应用程序名称,那么来自命令行的参数会被重写。

—py-files : .py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序。这些文件将被交付给每一个执行器来使用。

—file :命令给出一个逗号分隔的文件列表,这些文件将被交付到每一个执行器来使用。

—conf : 参数通过命令行动态地更改应用程序的配置。语法是:

  1. <Spark property>=<value for the property>
  2. 例如:
  3. --conf spark.local.dir=/opt/Spark2.2.0/
  4. --conf spark.app.name=com.xxx.passengerflow.metro_jh

需要注意的是Spark有3个地方使用配置参数:
(1)最高优先级的是在SparkContext时,指定了SparkConf的参数获得最高优先权;
(2)第二优先权:spark-submit传递给的参数;
(3)第三优先权:conf/spark-default.conf文件中指定的参数。

—properties-file :配置文件,它应该有和conf/spark-defaults.conf文件相同的属性设置,也是可读的。

—driver-memory:指定应用程序在驱动程序上分配多少内存的参数。允许的值有一个语法限制,类似于1000M,2G。默认值为1024M。

—executor-memory:参数指定每个执行器上为应用程序分配多少内存。默认值为1G。

—help:展示帮助信息和退出

—verbose:在运行应用程序时打印附加调试信息。

—version:打印spark版本

—driver-cores:(在spark单机cluster或者Yarn上部署cluster模式下),允许指定驱动程序的内核数量(默认值为1)

—kill:将完成的过程赋予submission_id

—status:如果指定了该命令,它将请求指定的应用程序的状态。

—total-executor-cores:(在spark单机或Mesos client部署模式下)该参数会为所有执行器(不是每一个)请求指定的内核数量。

在YARN集群提交时可以指定的:

—queue:该参数指定了YARN上的队列,以便于将该作业提交到队列(默认值是default)

—num-executors:指定需要多个执行器来请求该作业的参数。如果启动了动态分配,则执行器的初始数量至少是指定的数量。

2 以编程方式部署应用程序

如何创建和配置SparkSession?

如何对Spark使用外部模块?

2.1 配置你的SparkSession(或者sc)

以编程方式使用Jupyter和提交作业的主要区别是,你必须创建Spark context上下文背景环境,而使用Jupyter运行Spark,上下文背景会自动开始。

2.2 创建SparkSession

Spark2.2.0

  1. from pyspark.sql import SparkSession
  2. spark = SparkSession \
  3. .builder \
  4. .appName('CalculatingGeoDistances') \
  5. .getOrCreate()
  6. print('Session created')
  7. 如果此时你想创建一个SparkContext,可以直接使用:
  8. sc = spark.SparkContext

Spark1.6.2

  1. from pyspark import SparkContext
  2. sc = SparkContext \
  3. .builder \
  4. .appName('CalculatingGeoDistances') \
  5. .getOrCreate()
  6. print('sc created')

2.3 模块化代码(重要重要!!!)

以模块化的形式构建代码,以便于以后重用该代码是一件值得做的事情。
例子:建立一个模块,并且在数据集上做一些计算,计算出上车和下车位置的直线距离(英里),并且将英里转换为公里。
(1)模块结构

11-打包Spark应用程序 - 图1

11-打包Spark应用程序 - 图2

在Python包的结构中,在顶层有一个setup.py文件,所以可以打包我们的模块。

setup.py文件内容如下:

  1. from setuptools import setup
  2. setup(
  3. name='PySparkUtilities',
  4. version='0.1dev',
  5. packages=['utilities', 'utilities/converters'],
  6. license='''
  7. Creative Commons
  8. Attribution-Noncommercial-Share Alike license''',
  9. long_description='''
  10. An example of how to package code for PySpark'''
  11. )

关于如何定义其他项目的setup.py文件,可以参考:
https://pythonhosted.org/an_example_pypi_project/setuptools.html
11-打包Spark应用程序 - 图3
init.py文件内容如下:

  1. from .geoCalc import geoCalc
  2. __all__ = ['geoCalc','converters']

(2)计算两点之间的距离
该代码位于该模块的geoCalc.py文件中

  1. import math
  2. class geoCalc(object):
  3. @staticmethod
  4. def calculateDistance(p1, p2):
  5. '''
  6. calculates the distance using Haversine formula
  7. '''
  8. R = 3959 # earth's radius in miles
  9. # get the coordinates
  10. lat1, lon1 = p1[0], p1[1]
  11. lat2, lon2 = p2[0], p2[1]
  12. # convert to radians
  13. deltaLat_radians = math.radians(lat2-lat1)
  14. deltaLon_radians = math.radians(lon2-lon1)
  15. lat1_radians = math.radians(lat1)
  16. lat2_radians = math.radians(lat2)
  17. # apply the formula
  18. hav = math.sin(deltaLat_radians / 2.0) * \
  19. math.sin(deltaLat_radians / 2.0) + \
  20. math.sin(deltaLon_radians / 2.0) * \
  21. math.sin(deltaLon_radians / 2.0) * \
  22. math.cos(lat1_radians) * \
  23. math.cos(lat2_radians)
  24. dist = 2 * R * math.asin(math.sqrt(hav))
  25. return dist
  26. if __name__ == '__main__':
  27. p1 = {'address': '301 S Jackson St, Seattle, WA 98104',
  28. 'lat': 47.599200,
  29. 'long': -122.329841}
  30. p2 = {'address': 'Thunderbird Films Inc 533, Smithe St #401, Vancouver, BC V6B 6H1, Canada',
  31. 'lat': 49.279688,
  32. 'long': -123.119190}
  33. print(geoCalc.calculateDistance((p1['lat'], p1['long']), (p2['lat'], p2['long'])))

calculateDistance()是geoCalc类的静态方法,它需要两个地理位置,表示为一个元祖或者一个具有两个元素的列表(按照顺序排列的维度和经度),并且使用Haversine公式计算距离(英里)。

(3)转变距离单位
为了便于使用,作为converter实现的任何类都应该公开类似的窗口,查看base.py文件内容:

  1. from abc import ABCMeta, abstractmethod
  2. class BaseConverter(metaclass=ABCMeta):
  3. @staticmethod
  4. @abstractmethod
  5. def convert(f, t):
  6. raise NotImplementedError
  7. if __name__ == '__main__':
  8. i = BaseConverter()

distance.py内容:

  1. from ..base import BaseConverter
  2. class metricImperial(BaseConverter):
  3. pass
  4. @staticmethod
  5. def convert(f, t):
  6. conversionTable = {
  7. 'in': {
  8. 'mm': 25.4, 'cm': 2.54, 'm': 0.0254,
  9. 'km': 0.0000254
  10. }, 'ft': {
  11. 'mm': 304.8, 'cm': 30.48, 'm': 0.3048,
  12. 'km': 0.0003048
  13. }, 'yd': {
  14. 'mm': 914.4, 'cm': 91.44, 'm': 0.9144,
  15. 'km': 0.0009144
  16. }, 'mile': {
  17. 'mm': 1609344, 'cm': 160934.4, 'm': 1609.344,
  18. 'km': 1.609344
  19. }
  20. }
  21. f_val, f_unit = f.split(' ')
  22. f_val = float(f_val)
  23. if f_unit in conversionTable.keys():
  24. if t in conversionTable[f_unit].keys():
  25. conv = 1 / conversionTable[f_unit][t]
  26. else:
  27. raise KeyError('Key {0} not found...' \
  28. .format(t))
  29. elif t in conversionTable.keys():
  30. if f_unit in conversionTable[t].keys():
  31. conv = conversionTable[t][f_unit]
  32. else:
  33. raise KeyError('Key {0} not found...' \
  34. .format(f_unit))
  35. else:
  36. raise KeyError('Neither {0} nor {1} key found'\
  37. .format(t, f_unit))
  38. return f_val / conv
  39. if __name__ == '__main__':
  40. f = metricImperial()
  41. print(f.convert('10 mile', 'km'))

(4)打包:创建一个egg文件
PySpark文档指出,可以用逗号来分隔传递.py文件给spark-submit脚本。其实最方便的是把模块打包进一个.zip或者一个.egg。当setup.py文件方便使用时,调用additionalCode文件夹里的内容:

  1. python setup.py bdist_egg

可以看到3个文件夹:PySparkUtilities.egg-info、build和dist
dist文件夹有:PySparkUtilities-0.1.dev0-py3.5.egg

(5)Spark中用户定义函数

为了对在PySpark中的DataFrame执行操作,有两个选择:使用内置函数来处理数据(大多数情况下都足以达到你的需求,且作为更高性能的代码而被推荐使用);但是有的时候需要自定义函数去实现若干功能选项。

为了定义一个UDF,必须把Python函数封装在.udf()方法中,并且定义它的返回值类型。以下是我们如何在脚本中实现
calculatingGeoDistance.py文件

  1. import utilities.geoCalc as geo
  2. from utilities.converters import metricImperial
  3. from pyspark.sql import SparkSession
  4. from pyspark.sql.types import *
  5. import pyspark.sql.functions as func
  6. def geoEncode(spark):
  7. # read the data in
  8. uber = spark.read.csv(
  9. 'uber_data_nyc_2016-06_3m_partitioned.csv',
  10. header=True,
  11. inferSchema=True
  12. )\
  13. .repartition(4) \
  14. # .select('VendorID','tpep_pickup_datetime', 'pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude','total_amount')
  15. # prepare the UDFs
  16. getDistance = func.udf(
  17. lambda lat1, long1, lat2, long2:
  18. geo.calculateDistance(
  19. (lat1, long1),
  20. (lat2, long2)
  21. )
  22. )
  23. convertMiles = func.udf(lambda m:
  24. metricImperial.convert(str(m) + ' mile', 'km'))
  25. # create new columns
  26. uber = uber.withColumn(
  27. 'miles',
  28. getDistance(
  29. func.col('pickup_latitude'),
  30. func.col('pickup_longitude'),
  31. func.col('dropoff_latitude'),
  32. func.col('dropoff_longitude')
  33. )
  34. )
  35. uber = uber.withColumn(
  36. 'kilometers',
  37. convertMiles(func.col('miles')))
  38. # print 10 rows
  39. # uber.show(10)
  40. # save to csv (partitioned)
  41. uber.write.csv(
  42. 'uber_data_nyc_2016-06_new.csv',
  43. mode='overwrite',
  44. header=True,
  45. compression='gzip'
  46. )
  47. if __name__ == '__main__':
  48. spark = SparkSession \
  49. .builder \
  50. .appName('CalculatingGeoDistances') \
  51. .getOrCreate()
  52. print('Session created')
  53. try:
  54. geoEncode(spark)
  55. finally:
  56. spark.stop()

2.4 提交作业

可以在命令行键入以下命令:

  1. ./launch_spark_submit.sh \
  2. --master local[4] \
  3. --py-files additionalCode/dist/PySparkUtilities-0.1.dev0-
  4. py3.5.egg \
  5. calculatingGeoDistance.py

其中launch_spark_submit.sh是spark-submit命令的封装,通过对jupyter设置了PYSPARK_DRIVER_PYTHON系统变量:

  1. #!/bin/bash
  2. unset PYSPARK_DRIVER_PYTHON
  3. spark-submit $*
  4. export PYSPARK_DRIVER_PYTHON=jupyter

2.5 实际项目中,spark-submit的设置和打包

(1)命令行模式

集群模式:

读取的文件必须放在HDFS上:

  1. spark-submit --master yarn \
  2. --deploy-mode cluster \
  3. --num-executors 25 \
  4. --executor-cores 2 \
  5. --driver-memory 4g \
  6. --executor-memory 4g \
  7. --conf spark.broadcast.compress=true \
  8. --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1

(2)单节点模式

单节点启动PySpark Shell
  1. 1pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar"
  2. 2pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar" \
  3. --num-executors 25 \
  4. --executor-cores 2 \
  5. --driver-memory 4g \
  6. --executor-memory 8g

单节点提交任务
  1. spark-submit --jars "/data/spark/ojdbc6-11.2.0.3.jar" /pyspark_app/test.py
  2. ./bin/spark-submit --master lcoal[*] /home/ydzhao/pyspark_app/com.meihuichina.passengerflow.grid1km_laccell_grid100m.py

(3)编辑打包成test.sh Shell脚本

  1. #!/usr/bin/env bash
  2. spark-submit --master yarn \
  3. --deploy-mode cluster \
  4. --num-executors 25 \
  5. --executor-cores 2 \
  6. --driver-memory 4g \
  7. --executor-memory 4g \
  8. --conf spark.broadcast.compress=true \
  9. --conf spark.yarn.executor.memoryOverhead=900 \
  10. --conf spark.sql.shuffle.partitions=20 \
  11. --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
  12. # ./test.sh
  13. # 给test.sh相关权限
  14. chmod u+x test.sh

运行test.sh

./test.sh

2.6 监控执行

11-打包Spark应用程序 - 图4
11-打包Spark应用程序 - 图5

3 PySpark实例——Spark On YARN将HDFS的数据写入Redis

首先把redis包引入工程,这样就不需要在集群里每台机器上安装redis客户端了。

  1. $pip install redis
  2. $cd /usr/local/lib/python3.4/dist-packages
  3. $zip -r redis.zip redis/*
  4. $hadoop fs -put redis.zip /user/data/

然后就可以在代码里使用 addPyFile加载redis.zip了。
11-打包Spark应用程序 - 图6

  1. 运行:
  2. $SPARK_HOME/bin/spark-submit
  3. --conf spark.yarn.submit.waitAppCompletion=true
  4. --master yarn-cluster
  5. --num-executors 4
  6. --driver-memory 32G
  7. --executor-memory 32G
  8. --executor-cores 4
  9. --queue root.default
  10. /opt/spark_redis.py