1. 从dataframe arraytype列获取前n个元素
有一个Spark dataframe ,其中的行为
df.show()
+---+---------+
| id| letters|
+---+---------+
| 1|[a, b, c]|
| 2|[d, e, f]|
| 3|[g, h, i]|
+---+---------+
# 记住这个东西,数据类型很重要,用做定位、分类针对性解决问题
df.printSchema()
|-- id: long (nullable = true)
|-- letters: array (nullable = true)
| |-- element: string (containsNull = true)
只想保留 array列的前2个元素
df.show()
+---+---------+
| id| letters|
+---+-----+
| 1|[a, b]|
| 2|[d, e]|
| 3|[g, h]|
+---+------+
操作:
可以使用方括号按索引访问Letters列中的元素,并将其包装在对pyspark.sql.functions.array()的调用中以创建新的arrayType列。
import pyspark.sql.functions as f
df.withColumn("first_two", f.array([f.col("letters")[0], f.col("letters")[1]])).show()
+---+---------+---------+
| id| letters|first_two|
+---+---------+---------+
| 1|[a, b, c]| [a, b]|
| 2|[d, e, f]| [d, e]|
| 3|[g, h, i]| [g, h]|
+---+---------+---------+
# 或者如果有太多的索引要列出,可以使用列表理解:
df.withColumn("first_two", f.array([f.col("letters")[i] for i in range(2)])).show()
+---+---------+---------+
| id| letters|first_two|
+---+---------+---------+
| 1|[a, b, c]| [a, b]|
| 2|[d, e, f]| [d, e]|
| 3|[g, h, i]| [g, h]|
+---+---------+---------+
2. 将一列string转为一列array
这里用的是分词
#原DataFrame
col1 | my_str_col
----- +-----------
1 | qxl《电力设备》杂志
2 | qxl《电力设备》杂志
# 期望结果 1
col1 | my_str_col
----- +-----------
1 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]
2 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]
# 期望结果 2
col1 | my_str_col
----- +-----------
1 | [qxl, 电, 力, 设, 备, 杂, 志]
2 | [qxl, 电, 力, 设, 备, 杂, 志]
# 实现
# 注意jieba分词的文件路径和导入方式
import re
import jieba
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
# 实现期待结果 1
@udf(returnType=ArrayType(StringType()))
def splitToken(cols):
"""
1. 输入参数长度可变,字段可变
2. 正则去除特殊字符
"""
col=""
for i in cols:
if i is not None:
col+=i
col = re.sub('\W+', ' ', col).replace("_", '').replace(" ", '')
if not jieba.dt.initialized:
jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")
return [i for i in col] # 视情况修改返回值格式
# 实现期待结果 2
def splitEN_CZ(oneStr):
"""分隔中英文字符串"""
uncn = re.compile(r'[^\u4e00-\u9fa5]')
en = "".join(uncn.findall(oneStr))
en = re.sub('\W+', ' ', en).replace("_", '')
uncz = re.compile(r'[\u4e00-\u9fa5]')
cz = "".join(uncz.findall(oneStr))
cz = re.sub('\W+', ' ', cz).replace("_", '')
tmp =[j for j in cz] + [j for j in en.split()]
return tmp
@udf(returnType=ArrayType(StringType()))
def splitToken(cols):
"""
my_func()函数的改进
1. 输入参数长度可变,字段可变
2. 正则去除特殊字符
"""
col=""
for i in cols:
if i is not None:
col+= " ".join(splitEN_CZ(i))
col = re.sub('\W+', ' ', col).replace("_", '')
if not jieba.dt.initialized:
jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")
return col.split() # 视情况修改返回值格式
df = df.withColumn('my_array_col', splitToken(array('my_str_col')) )
# 同时对多列操作
df = df.withColumn('my_array_col',splitToken(array(col("my_str_col1"), col("my_str_col2"))))
3. 将一列string转为多列string
更多例子查看https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns
使用 函数 pyspark.sql.functions.split()
# 原DataFrame
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
# 期待结果
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
# 操作
split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))
#结果
col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
4. 对两个df实现行的拼接(第2列接到第1列末尾)
df1=df.select(df['age'],df['name'])
df2=df.select(df['age'],df['weigh'])
df1.unionAll(df2).show()
5. 增加一列有序id,从0计数
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn(‘id’,monotonically_increasing_id() )
6. 将一个array()扩展成一列
用 explode()
# 数据
|id| label |
|0 |['a','b']|
# 期待
|id| label | after|
|0 |['a','b']| a |
|0 |['a','b']| b |
df.withColumn('after', explode(col('label'))).show()
7. 将一个字符串拆分成每行一个
结合使用explode、split
.withColumn('one_item', explode(split('items', ',')))