1. 从dataframe arraytype列获取前n个元素
有一个Spark dataframe ,其中的行为
df.show()+---+---------+| id| letters|+---+---------+| 1|[a, b, c]|| 2|[d, e, f]|| 3|[g, h, i]|+---+---------+# 记住这个东西,数据类型很重要,用做定位、分类针对性解决问题df.printSchema()|-- id: long (nullable = true)|-- letters: array (nullable = true)| |-- element: string (containsNull = true)
只想保留 array列的前2个元素
df.show()+---+---------+| id| letters|+---+-----+| 1|[a, b]|| 2|[d, e]|| 3|[g, h]|+---+------+
操作:
可以使用方括号按索引访问Letters列中的元素,并将其包装在对pyspark.sql.functions.array()的调用中以创建新的arrayType列。
import pyspark.sql.functions as fdf.withColumn("first_two", f.array([f.col("letters")[0], f.col("letters")[1]])).show()+---+---------+---------+| id| letters|first_two|+---+---------+---------+| 1|[a, b, c]| [a, b]|| 2|[d, e, f]| [d, e]|| 3|[g, h, i]| [g, h]|+---+---------+---------+# 或者如果有太多的索引要列出,可以使用列表理解:df.withColumn("first_two", f.array([f.col("letters")[i] for i in range(2)])).show()+---+---------+---------+| id| letters|first_two|+---+---------+---------+| 1|[a, b, c]| [a, b]|| 2|[d, e, f]| [d, e]|| 3|[g, h, i]| [g, h]|+---+---------+---------+
2. 将一列string转为一列array
这里用的是分词
#原DataFramecol1 | my_str_col----- +-----------1 | qxl《电力设备》杂志2 | qxl《电力设备》杂志# 期望结果 1col1 | my_str_col----- +-----------1 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]2 | [q, x, l, , 电, 力, 设, 备, , 杂, 志]# 期望结果 2col1 | my_str_col----- +-----------1 | [qxl, 电, 力, 设, 备, 杂, 志]2 | [qxl, 电, 力, 设, 备, 杂, 志]# 实现# 注意jieba分词的文件路径和导入方式import reimport jiebafrom pyspark.sql.functions import UserDefinedFunctionfrom pyspark.sql.types import ArrayType, StringTypefrom pyspark.sql.functions import udf# 实现期待结果 1@udf(returnType=ArrayType(StringType()))def splitToken(cols):"""1. 输入参数长度可变,字段可变2. 正则去除特殊字符"""col=""for i in cols:if i is not None:col+=icol = re.sub('\W+', ' ', col).replace("_", '').replace(" ", '')if not jieba.dt.initialized:jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")return [i for i in col] # 视情况修改返回值格式# 实现期待结果 2def splitEN_CZ(oneStr):"""分隔中英文字符串"""uncn = re.compile(r'[^\u4e00-\u9fa5]')en = "".join(uncn.findall(oneStr))en = re.sub('\W+', ' ', en).replace("_", '')uncz = re.compile(r'[\u4e00-\u9fa5]')cz = "".join(uncz.findall(oneStr))cz = re.sub('\W+', ' ', cz).replace("_", '')tmp =[j for j in cz] + [j for j in en.split()]return tmp@udf(returnType=ArrayType(StringType()))def splitToken(cols):"""my_func()函数的改进1. 输入参数长度可变,字段可变2. 正则去除特殊字符"""col=""for i in cols:if i is not None:col+= " ".join(splitEN_CZ(i))col = re.sub('\W+', ' ', col).replace("_", '')if not jieba.dt.initialized:jieba.load_userdict("/opt/cloudera/parcels/Anaconda-5.2.0/envs/py3.6/lib/python3.6/site-packages/jieba/dict.txt")return col.split() # 视情况修改返回值格式df = df.withColumn('my_array_col', splitToken(array('my_str_col')) )# 同时对多列操作df = df.withColumn('my_array_col',splitToken(array(col("my_str_col1"), col("my_str_col2"))))
3. 将一列string转为多列string
更多例子查看https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns
使用 函数 pyspark.sql.functions.split()
# 原DataFramecol1 | my_str_col-----+-----------18 | 856-yygrm201 | 777-psgdg# 期待结果col1 | my_str_col | _col3 | _col4-----+------------+-------+------18 | 856-yygrm | 856 | yygrm201 | 777-psgdg | 777 | psgdg# 操作split_col = pyspark.sql.functions.split(df['my_str_col'], '-')df = df.withColumn('NAME1', split_col.getItem(0))df = df.withColumn('NAME2', split_col.getItem(1))#结果col1 | my_str_col | NAME1 | NAME2-----+------------+-------+------18 | 856-yygrm | 856 | yygrm201 | 777-psgdg | 777 | psgdg
4. 对两个df实现行的拼接(第2列接到第1列末尾)
df1=df.select(df['age'],df['name'])df2=df.select(df['age'],df['weigh'])df1.unionAll(df2).show()
5. 增加一列有序id,从0计数
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn(‘id’,monotonically_increasing_id() )
6. 将一个array()扩展成一列
用 explode()
# 数据|id| label ||0 |['a','b']|# 期待|id| label | after||0 |['a','b']| a ||0 |['a','b']| b |df.withColumn('after', explode(col('label'))).show()
7. 将一个字符串拆分成每行一个
结合使用explode、split
.withColumn('one_item', explode(split('items', ',')))
