1. 从dataframe arraytype列获取前n个元素

有一个Spark dataframe ,其中的行为

  1. df.show()
  2. +---+---------+
  3. | id| letters|
  4. +---+---------+
  5. | 1|[a, b, c]|
  6. | 2|[d, e, f]|
  7. | 3|[g, h, i]|
  8. +---+---------+
  9. # 记住这个东西,数据类型很重要,用做定位、分类针对性解决问题
  10. df.printSchema()
  11. |-- id: long (nullable = true)
  12. |-- letters: array (nullable = true)
  13. | |-- element: string (containsNull = true)

只想保留 array列的前2个元素

  1. df.show()
  2. +---+---------+
  3. | id| letters|
  4. +---+-----+
  5. | 1|[a, b]|
  6. | 2|[d, e]|
  7. | 3|[g, h]|
  8. +---+------+

操作:
可以使用方括号按索引访问Letters列中的元素,并将其包装在对pyspark.sql.functions.array()的调用中以创建新的arrayType列。

  1. import pyspark.sql.functions as f
  2. df.withColumn("first_two", f.array([f.col("letters")[0], f.col("letters")[1]])).show()
  3. +---+---------+---------+
  4. | id| letters|first_two|
  5. +---+---------+---------+
  6. | 1|[a, b, c]| [a, b]|
  7. | 2|[d, e, f]| [d, e]|
  8. | 3|[g, h, i]| [g, h]|
  9. +---+---------+---------+
  10. # 或者如果有太多的索引要列出,可以使用列表理解:
  11. df.withColumn("first_two", f.array([f.col("letters")[i] for i in range(2)])).show()
  12. +---+---------+---------+
  13. | id| letters|first_two|
  14. +---+---------+---------+
  15. | 1|[a, b, c]| [a, b]|
  16. | 2|[d, e, f]| [d, e]|
  17. | 3|[g, h, i]| [g, h]|
  18. +---+---------+---------+

2. 将一列string转为一列array

这里用的是分词

  1. #原DataFrame
  2. col1 | my_str_col
  3. ----- +-----------
  4. 1 | qxl《电力设备》杂志
  5. 2 | qxl《电力设备》杂志
  6. # 期望结果 1
  7. col1 | my_str_col
  8. ----- +-----------
  9. 1 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]
  10. 2 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]
  11. # 期望结果 2
  12. col1 | my_str_col
  13. ----- +-----------
  14. 1 | [qxl, 电, 力, 设, 备, 杂, 志]
  15. 2 | [qxl, 电, 力, 设, 备, 杂, 志]
  16. # 实现
  17. # 注意jieba分词的文件路径和导入方式
  18. import re
  19. import jieba
  20. from pyspark.sql.functions import UserDefinedFunction
  21. from pyspark.sql.types import ArrayType, StringType
  22. from pyspark.sql.functions import udf
  23. # 实现期待结果 1
  24. @udf(returnType=ArrayType(StringType()))
  25. def splitToken(cols):
  26. """
  27. 1. 输入参数长度可变,字段可变
  28. 2. 正则去除特殊字符
  29. """
  30. col=""
  31. for i in cols:
  32. if i is not None:
  33. col+=i
  34. col = re.sub('\W+', ' ', col).replace("_", '').replace(" ", '')
  35. if not jieba.dt.initialized:
  36. jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")
  37. return [i for i in col] # 视情况修改返回值格式
  38. # 实现期待结果 2
  39. def splitEN_CZ(oneStr):
  40. """分隔中英文字符串"""
  41. uncn = re.compile(r'[^\u4e00-\u9fa5]')
  42. en = "".join(uncn.findall(oneStr))
  43. en = re.sub('\W+', ' ', en).replace("_", '')
  44. uncz = re.compile(r'[\u4e00-\u9fa5]')
  45. cz = "".join(uncz.findall(oneStr))
  46. cz = re.sub('\W+', ' ', cz).replace("_", '')
  47. tmp =[j for j in cz] + [j for j in en.split()]
  48. return tmp
  49. @udf(returnType=ArrayType(StringType()))
  50. def splitToken(cols):
  51. """
  52. my_func()函数的改进
  53. 1. 输入参数长度可变,字段可变
  54. 2. 正则去除特殊字符
  55. """
  56. col=""
  57. for i in cols:
  58. if i is not None:
  59. col+= " ".join(splitEN_CZ(i))
  60. col = re.sub('\W+', ' ', col).replace("_", '')
  61. if not jieba.dt.initialized:
  62. jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")
  63. return col.split() # 视情况修改返回值格式
  64. df = df.withColumn('my_array_col', splitToken(array('my_str_col')) )
  65. # 同时对多列操作
  66. df = df.withColumn('my_array_col',splitToken(array(col("my_str_col1"), col("my_str_col2"))))

3. 将一列string转为多列string

更多例子查看https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns
使用 函数 pyspark.sql.functions.split()

  1. # 原DataFrame
  2. col1 | my_str_col
  3. -----+-----------
  4. 18 | 856-yygrm
  5. 201 | 777-psgdg
  6. # 期待结果
  7. col1 | my_str_col | _col3 | _col4
  8. -----+------------+-------+------
  9. 18 | 856-yygrm | 856 | yygrm
  10. 201 | 777-psgdg | 777 | psgdg
  11. # 操作
  12. split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
  13. df = df.withColumn('NAME1', split_col.getItem(0))
  14. df = df.withColumn('NAME2', split_col.getItem(1))
  15. #结果
  16. col1 | my_str_col | NAME1 | NAME2
  17. -----+------------+-------+------
  18. 18 | 856-yygrm | 856 | yygrm
  19. 201 | 777-psgdg | 777 | psgdg

4. 对两个df实现行的拼接(第2列接到第1列末尾)

  1. df1=df.select(df['age'],df['name'])
  2. df2=df.select(df['age'],df['weigh'])
  3. df1.unionAll(df2).show()

5. 增加一列有序id,从0计数

from pyspark.sql.functions import monotonically_increasing_id
df.withColumn(‘id’,monotonically_increasing_id() )

6. 将一个array()扩展成一列

用 explode()

  1. # 数据
  2. |id| label |
  3. |0 |['a','b']|
  4. # 期待
  5. |id| label | after|
  6. |0 |['a','b']| a |
  7. |0 |['a','b']| b |
  8. df.withColumn('after', explode(col('label'))).show()

7. 将一个字符串拆分成每行一个

结合使用explode、split

  1. .withColumn('one_item', explode(split('items', ',')))