With the foundations covered earlier, we can now use PySpark for data analysis and mining. In practice, 80% to 90% of the time is typically spent on data preprocessing, which underpins all subsequent modeling. This article works through large-scale data preprocessing with PySpark, starting from real business scenarios so the examples stay practical.

1 Duplicate Data

Spark 2.0+:

    df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M')],
        ['id', 'weight', 'height', 'age', 'gender']
    )

Spark 1.6.2:

    df = sqlContext.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M')],
        ['id', 'weight', 'height', 'age', 'gender']
    )

1.1 Checking for Duplicates

Use .distinct() to find rows that are exact duplicates of one another:

    print('Count of rows: {0}'.format(df.count()))
    print('Count of distinct rows: {0}'.format(df.distinct().count()))

    Count of rows: 7
    Count of distinct rows: 6

Use the .dropDuplicates(...) method to remove the duplicate rows; see the Pyspark.SQL module introduction in this collection for detailed usage.

    df = df.dropDuplicates()
    df.show()

    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    | 5| 129.2| 5.3| 42| M|
    | 4| 144.5| 5.9| 33| M|
    | 3| 124.1| 5.2| 23| F|
    | 2| 167.2| 5.4| 45| M|
    | 1| 144.5| 5.9| 33| M|
    | 5| 133.2| 5.7| 54| F|
    +---+------+------+---+------+

Count duplicates across all columns except the id field:

    print('Count of ids: {0}'.format(df.count()))
    # print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))
    print('Count of distinct ids:{0}'.format(df.select(["weight", "height", "age", "gender"]).distinct().count()))

    Count of ids: 6
    Count of distinct ids:5

Use .dropDuplicates(...) again to remove the duplicate rows, this time passing the subset parameter (here, every column except id):

    # df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
    df = df.dropDuplicates(["weight", "height", "age", "gender"])
    df.show()

    +---+------+------+---+------+
    | id|weight|height|age|gender|
    +---+------+------+---+------+
    | 5| 133.2| 5.7| 54| F|
    | 4| 144.5| 5.9| 33| M|
    | 2| 167.2| 5.4| 45| M|
    | 5| 129.2| 5.3| 42| M|
    | 3| 124.1| 5.2| 23| F|
    +---+------+------+---+------+

Use the .agg(...) method to count the total number of ids and the number of distinct ids:

    import pyspark.sql.functions as fn

    df.agg(
        fn.count('id').alias('id_count'),
        fn.countDistinct('id').alias('id_distinct')
    ).show()

    +--------+-----------+
    |id_count|id_distinct|
    +--------+-----------+
    | 5| 4|
    +--------+-----------+

Give each row a unique ID, which amounts to adding a new numeric column:

    df.withColumn('new_id', fn.monotonically_increasing_id()).show()

    +---+------+------+---+------+------------+
    | id|weight|height|age|gender| new_id|
    +---+------+------+---+------+------------+
    | 5| 133.2| 5.7| 54| F|283467841536|
    | 4| 144.5| 5.9| 33| M|352187318272|
    | 2| 167.2| 5.4| 45| M|386547056640|
    | 5| 129.2| 5.3| 42| M|438086664192|
    | 3| 124.1| 5.2| 23| F|463856467968|
    +---+------+------+---+------+------------+

1.2 Missing Values

    df_miss = sqlContext.createDataFrame([
        (1, 143.5, 5.6, 28, 'M', 100000),
        (2, 167.2, 5.4, 45, 'M', None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, 'M', None),
        (5, 133.2, 5.7, 54, 'F', None),
        (6, 124.1, 5.2, None, 'F', None),
        (7, 129.2, 5.3, 42, 'M', 76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])
    df_miss.show()

    +---+------+------+----+------+------+
    | id|weight|height| age|gender|income|
    +---+------+------+----+------+------+
    | 1| 143.5| 5.6| 28| M|100000|
    | 2| 167.2| 5.4| 45| M| null|
    | 3| null| 5.2|null| null| null|
    | 4| 144.5| 5.9| 33| M| null|
    | 5| 133.2| 5.7| 54| F| null|
    | 6| 124.1| 5.2|null| F| null|
    | 7| 129.2| 5.3| 42| M| 76000|
    +---+------+------+----+------+------+

Count the number of missing values in each row:

    df_miss.rdd.\
        map(lambda row: (sum([c is None for c in row]), row['id'])).\
        sortByKey(ascending=False).collect()  # (number of missing values, id)

    [(4, 3), (2, 6), (1, 2), (1, 4), (1, 5), (0, 1), (0, 7)]

    df_miss.where('id = 3').show()

    +---+------+------+----+------+------+
    | id|weight|height| age|gender|income|
    +---+------+------+----+------+------+
    | 3| null| 5.2|null| null| null|
    +---+------+------+----+------+------+
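The same per-row tally can also be done without dropping to the RDD API. A minimal sketch (the n_missing alias is just an illustrative name):

    from functools import reduce
    import pyspark.sql.functions as fn

    # cast each "is this column null?" flag to 0/1 and sum the flags across columns
    n_missing = reduce(
        lambda a, b: a + b,
        [fn.col(c).isNull().cast('int') for c in df_miss.columns]
    )
    df_miss.select('id', n_missing.alias('n_missing')) \
        .orderBy(fn.desc('n_missing')).show()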

Percentage of missing observations in each column:

    # fn.count(c) / fn.count('*') is the share of non-missing records in column c
    # fn.count('*') counts all rows, regardless of nulls in any particular column
    # the leading * in .agg(*[...]) unpacks the list into separate arguments to .agg(...)
    df_miss.agg(*[
        (1.00 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns
    ]).show()

    +----------+------------------+--------------+------------------+------------------+------------------+
    |id_missing| weight_missing|height_missing| age_missing| gender_missing| income_missing|
    +----------+------------------+--------------+------------------+------------------+------------------+
    | 0.0|0.1428571428571429| 0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
    +----------+------------------+--------------+------------------+------------------+------------------+

Drop the income column, since most of its values are missing (about 71%); an equivalent .drop() one-liner is sketched after the output:

    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
    df_miss_no_income.show()

    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    | 1| 143.5| 5.6| 28| M|
    | 2| 167.2| 5.4| 45| M|
    | 3| null| 5.2|null| null|
    | 4| 144.5| 5.9| 33| M|
    | 5| 133.2| 5.7| 54| F|
    | 6| 124.1| 5.2|null| F|
    | 7| 129.2| 5.3| 42| M|
    +---+------+------+----+------+
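The same column removal can also be written with the DataFrame .drop() method; a minimal equivalent sketch:

    # drop a single column by name instead of re-selecting all the others
    df_miss_no_income = df_miss.drop('income')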

Use the .dropna(...) method to remove rows; the thresh parameter sets the minimum number of non-null values a row must have to be kept (here, at least 3):

    df_miss_no_income.dropna(thresh=3).show()

    +---+------+------+----+------+
    | id|weight|height| age|gender|
    +---+------+------+----+------+
    | 1| 143.5| 5.6| 28| M|
    | 2| 167.2| 5.4| 45| M|
    | 4| 144.5| 5.9| 33| M|
    | 5| 133.2| 5.7| 54| F|
    | 6| 124.1| 5.2|null| F|
    | 7| 129.2| 5.3| 42| M|
    +---+------+------+----+------+
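.dropna(...) also accepts how and subset parameters; a small sketch of the variants (the column choices here are arbitrary examples):

    # drop rows where any of the listed columns is null
    df_miss_no_income.dropna(how='any', subset=['age', 'gender']).show()

    # drop rows only when all of the listed columns are null
    df_miss_no_income.dropna(how='all', subset=['weight', 'age', 'gender']).show()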

Use the .fillna(...) method to impute the missing observations:

    means = df_miss_no_income.agg(*[
        fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender'
    ]).toPandas().to_dict('records')[0]

    # gender is categorical, so handle it separately
    means['gender'] = 'missing'

    df_miss_no_income.fillna(means).show()

    +---+-------------+------+---+-------+
    | id| weight|height|age| gender|
    +---+-------------+------+---+-------+
    | 1| 143.5| 5.6| 28| M|
    | 2| 167.2| 5.4| 45| M|
    | 3|140.283333333| 5.2| 40|missing|
    | 4| 144.5| 5.9| 33| M|
    | 5| 133.2| 5.7| 54| F|
    | 6| 124.1| 5.2| 40| F|
    | 7| 129.2| 5.3| 42| M|
    +---+-------------+------+---+-------+

1.3 Outliers

    df_outliers = sqlContext.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])
    df_outliers.show()

    +---+------+------+---+
    | id|weight|height|age|
    +---+------+------+---+
    | 1| 143.5| 5.3| 28|
    | 2| 154.2| 5.5| 45|
    | 3| 342.3| 5.1| 99|
    | 4| 144.5| 5.5| 33|
    | 5| 133.2| 5.4| 54|
    | 6| 124.1| 5.1| 21|
    | 7| 129.2| 5.3| 42|
    +---+------+------+---+

Flag outliers using the interquartile-range (IQR) rule: with Q1 and Q3 the 25th and 75th percentiles and IQR = Q3 - Q1, a value is treated as an outlier if it falls outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR].

    # approxQuantile(col, probabilities, relativeError) -- new in Spark 2.0
    cols = ['weight', 'height', 'age']
    bounds = {}

    for col in cols:
        quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
        IQR = quantiles[1] - quantiles[0]
        bounds[col] = [
            quantiles[0] - 1.5 * IQR,
            quantiles[1] + 1.5 * IQR
        ]

The bounds dictionary holds the lower and upper limits for each feature:

    bounds

    {'age': [9.0, 51.0],
     'height': [4.8999999995, 5.6],
     'weight': [115.0, 146.8499999997]}

Flag the outliers in the sample data:

    # | on Column objects is a logical OR: a row is flagged when either condition holds
    outliers = df_outliers.select(*['id'] + [
        ((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o')
        for c in cols
    ])
    outliers.show()

    +---+--------+--------+-----+
    | id|weight_o|height_o|age_o|
    +---+--------+--------+-----+
    | 1| false| false|false|
    | 2| true| false|false|
    | 3| true| false| true|
    | 4| false| false|false|
    | 5| false| false| true|
    | 6| false| false|false|
    | 7| false| false|false|
    +---+--------+--------+-----+

List the weight and age values that clearly differ from the rest of the distribution (the outliers):

    df_outliers = df_outliers.join(outliers, on='id')
    df_outliers.filter('weight_o').select('id', 'weight').show()
    df_outliers.filter('age_o').select('id', 'age').show()

    +---+------+
    | id|weight|
    +---+------+
    | 3| 342.3|
    | 2| 154.2|
    +---+------+

    +---+---+
    | id|age|
    +---+---+
    | 5| 54|
    | 3| 99|
    +---+---+

With the methods above we can quickly clean the data of duplicates, missing values, and outliers; a combined sketch follows.
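As a recap, a minimal end-to-end sketch that chains the steps above on the df_miss example (the thresholds and fill values are simply the ones used earlier, not recommendations):

    import pyspark.sql.functions as fn

    # drop exact duplicates, drop the mostly-missing income column,
    # then drop rows with fewer than 3 non-null values
    df_clean = (df_miss
                .dropDuplicates()
                .drop('income')
                .dropna(thresh=3))

    # impute remaining numeric gaps with column means and gender with 'missing'
    means = df_clean.agg(*[
        fn.mean(c).alias(c) for c in df_clean.columns if c != 'gender'
    ]).toPandas().to_dict('records')[0]
    means['gender'] = 'missing'
    df_clean = df_clean.fillna(means)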

2 Getting to Know Your Data

2.1 Descriptive Statistics (EDA)

Load the data and convert it into a Spark DataFrame:

    # pyspark.sql.types lists all the data types we can use, e.g. IntegerType() and FloatType()
    from pyspark.sql import types

    fraud = sc.textFile('file:///root/ydzhao/PySpark/Chapter04/ccFraud.csv.gz')  # credit-card fraud data set
    header = fraud.first()
    header

    '"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"'

    fraud.take(3)

    ['"custID","gender","state","cardholder","balance","numTrans","numIntlTrans","creditLine","fraudRisk"',
     '1,1,35,1,3000,4,14,2,0',
     '2,2,2,1,0,9,0,18,0']

    fraud.count()

    10000001

    # (1) Drop the header row and cast every element to int; the result is still an RDD
    fraud = fraud.filter(lambda row: row != header).map(lambda row: [int(x) for x in row.split(',')])
    fraud.take(3)

    [[1, 1, 35, 1, 3000, 4, 14, 2, 0],
     [2, 2, 2, 1, 0, 9, 0, 18, 0],
     [3, 2, 2, 1, 0, 27, 9, 16, 0]]

    # (2) Build the DataFrame schema
    # h[1:-1] strips the surrounding double quotes from each header name
    schema = [
        *[
            types.StructField(h[1:-1], types.IntegerType(), True) for h in header.split(',')
        ]
    ]
    schema = types.StructType(schema)

    # (3) Create the DataFrame
    # Spark 2.0+
    # fraud_df = spark.createDataFrame(fraud, schema)
    # Spark 1.6.2
    fraud_df = sqlContext.createDataFrame(fraud, schema)
    fraud_df.printSchema()

    root
    |-- custID: integer (nullable = true)
    |-- gender: integer (nullable = true)
    |-- state: integer (nullable = true)
    |-- cardholder: integer (nullable = true)
    |-- balance: integer (nullable = true)
    |-- numTrans: integer (nullable = true)
    |-- numIntlTrans: integer (nullable = true)
    |-- creditLine: integer (nullable = true)
    |-- fraudRisk: integer (nullable = true)

    fraud_df.show()

    +------+------+-----+----------+-------+--------+------------+----------+---------+
    |custID|gender|state|cardholder|balance|numTrans|numIntlTrans|creditLine|fraudRisk|
    +------+------+-----+----------+-------+--------+------------+----------+---------+
    | 1| 1| 35| 1| 3000| 4| 14| 2| 0|
    | 2| 2| 2| 1| 0| 9| 0| 18| 0|
    | 3| 2| 2| 1| 0| 27| 9| 16| 0|
    | 4| 1| 15| 1| 0| 12| 0| 5| 0|
    | 5| 1| 46| 1| 0| 11| 16| 7| 0|
    | 6| 2| 44| 2| 5546| 21| 0| 13| 0|
    | 7| 1| 3| 1| 2000| 41| 0| 1| 0|
    | 8| 1| 10| 1| 6016| 20| 3| 6| 0|
    | 9| 2| 32| 1| 2428| 4| 10| 22| 0|
    | 10| 1| 23| 1| 0| 18| 56| 5| 0|
    | 11| 1| 46| 1| 4601| 54| 0| 4| 0|
    | 12| 1| 10| 1| 3000| 20| 0| 2| 0|
    | 13| 1| 6| 1| 0| 45| 2| 4| 0|
    | 14| 2| 38| 1| 9000| 41| 3| 8| 0|
    | 15| 1| 27| 1| 5227| 60| 0| 17| 0|
    | 16| 1| 44| 1| 0| 22| 0| 5| 0|
    | 17| 2| 18| 1| 13970| 20| 0| 13| 0|
    | 18| 1| 35| 1| 3113| 13| 6| 8| 0|
    | 19| 1| 5| 1| 9000| 20| 2| 8| 0|
    | 20| 2| 31| 1| 1860| 21| 10| 8| 0|
    +------+------+-----+----------+-------+--------+------------+----------+---------+
    only showing top 20 rows

Use the .groupby(...) method to compute the frequency of the gender values; the sample we are dealing with is clearly imbalanced with respect to gender (a quick ratio check follows the output):

    fraud_df.groupby('gender').count().show()

    +------+-------+
    |gender| count|
    +------+-------+
    | 1|6178231|
    | 2|3821769|
    +------+-------+
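A small sketch of the ratio check mentioned above (the ratio column name is just illustrative):

    import pyspark.sql.functions as fn

    total = fraud_df.count()
    fraud_df.groupby('gender').count() \
        .withColumn('ratio', fn.col('count') / total) \
        .show()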

Use the .describe() method for numeric summary statistics:

    desc = fraud_df.describe(['balance', 'numTrans', 'numIntlTrans'])
    desc.show()

    +-------+-----------------+------------------+-----------------+
    |summary| balance| numTrans| numIntlTrans|
    +-------+-----------------+------------------+-----------------+
    | count| 10000000| 10000000| 10000000|
    | mean| 4109.9199193| 28.9351871| 4.0471899|
    | stddev|3996.847309737077|26.553781024522852|8.602970115863767|
    | min| 0| 0| 0|
    | max| 41485| 100| 60|
    +-------+-----------------+------------------+-----------------+

Check the skewness of balance:

    fraud_df.agg({'balance': 'skewness'}).show()

    +------------------+
    | skewness(balance)|
    +------------------+
    |1.1818315552996432|
    +------------------+
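The same dictionary style works for other aggregates; alternatively the functions in pyspark.sql.functions can be called directly, which also allows aliases. A small sketch (the alias names are just illustrative):

    import pyspark.sql.functions as fn

    fraud_df.agg(
        fn.skewness('balance').alias('balance_skew'),
        fn.kurtosis('balance').alias('balance_kurt'),
        fn.stddev('balance').alias('balance_std')
    ).show()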

2.2 Correlations

At the moment the .corr() method only supports the Pearson correlation coefficient, and only between two columns at a time.
Of course, you can convert the Spark DataFrame to a pandas DataFrame and then do whatever you like, but the overhead can be large depending on the data volume (a rough sketch of this route follows the example below); hopefully later versions will extend this.

    fraud_df.corr('balance', 'numTrans')
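As a rough illustration of the pandas route mentioned above, one could sample the numeric columns down first and compute the full matrix locally; a sketch only, with an arbitrary 0.02% sampling fraction chosen to keep the collected data small:

    # sample a small fraction of rows, pull them to the driver, let pandas do the rest
    pdf = (fraud_df
           .select('balance', 'numTrans', 'numIntlTrans')
           .sample(False, 0.0002)
           .toPandas())
    print(pdf.corr())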

Build the correlation matrix:

    numerical = ['balance', 'numTrans', 'numIntlTrans']
    n_numerical = len(numerical)

    corr = []
    for i in range(0, n_numerical):
        temp = [None] * i
        for j in range(i, n_numerical):
            temp.append(fraud_df.corr(numerical[i], numerical[j]))
        corr.append(temp)

    corr

    [[1.0, 0.00044523140172659576, 0.00027139913398184604],
     [None, 1.0, -0.0002805712819816179],
     [None, None, 1.0]]

2.3 Data Visualization

Load the matplotlib and bokeh packages; this only involves the Python interpreter on the driver:

    %matplotlib inline
    import matplotlib.pyplot as plt
    plt.style.use('ggplot')

    import bokeh.charts as chrt
    from bokeh.io import output_notebook
    output_notebook()

    Loading BokehJS ...

2.3.1 Histograms

Method 1: aggregate the data on the worker nodes and return only a list of bin edges and the count in each bin to the driver (suitable for large data sets).

(1) Aggregate the data first:

    hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)
    type(hists)

    tuple

(2) Plot the histogram with matplotlib:

    data = {
        'bins': hists[0][:-1],
        'freq': hists[1]
    }

    fig = plt.figure(figsize=(12, 9))
    ax = fig.add_subplot(1, 1, 1)
    ax.bar(data['bins'], data['freq'], width=2000)
    ax.set_title('Histogram of balance')
    plt.savefig('balance.png', dpi=300)

[Figure 1: Histogram of balance]

Plot the histogram with Bokeh:

    b_hist = chrt.Bar(data, values='freq', label='bins', title='Histogram of balance')
    chrt.show(b_hist)

Method 2: bring all of the data to the driver (suitable for small data sets); method 1 of course performs better.

    data_driver = {'obs': fraud_df.select('balance').rdd.flatMap(lambda row: row).collect()}

    fig = plt.figure(figsize=(12, 9))
    ax = fig.add_subplot(1, 1, 1)
    ax.hist(data_driver['obs'], bins=20)
    ax.set_title('Histogram of balance using .hist()')
    plt.savefig('balance_hist.png', dpi=300)

[Figure 2: Histogram of balance using .hist()]

Plot the histogram with Bokeh:

    b_hist_driver = chrt.Histogram(data_driver, values='obs', title='Histogram of balance using .Histogram()', bins=20)
    chrt.show(b_hist_driver)

2.3.2 Interactions Between Features

First sample 0.02% of the 10,000,000 rows:

    data_sample = fraud_df.sampleBy('gender', {1: 0.0002, 2: 0.0002}).select(numerical)
    data_sample.show()

    +-------+--------+------------+
    |balance|numTrans|numIntlTrans|
    +-------+--------+------------+
    | 0| 15| 0|
    | 4000| 3| 0|
    | 6000| 21| 50|
    | 0| 28| 10|
    | 8630| 100| 0|
    | 4000| 17| 0|
    | 5948| 76| 0|
    | 3000| 9| 4|
    | 0| 5| 0|
    | 1588| 55| 0|
    | 3882| 87| 0|
    | 1756| 12| 0|
    | 4000| 4| 0|
    | 0| 10| 0|
    | 5000| 17| 0|
    | 4188| 50| 0|
    | 3141| 2| 1|
    | 8000| 52| 5|
    | 9000| 40| 0|
    | 2423| 11| 1|
    +-------+--------+------------+
    only showing top 20 rows

    data_multi = dict([
        (x, data_sample.select(x).rdd.flatMap(lambda row: row).collect())
        for x in numerical
    ])

    sctr = chrt.Scatter(data_multi, x='balance', y='numTrans')
    chrt.show(sctr)