pyspark.sql.DataFrame
A distributed collection of data grouped into named columns
Create
```python
# cache the DataFrame
df.cache()
```
Read
```python
from pyspark.sql.functions import col

# set aliases
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')

# print rows
df.show(n=20, truncate=True, vertical=False)

# column names
df.columns

# all column names and their data types
df.dtypes

# schema
df.printSchema()
df.schema

# all records
df.collect()

# first row
df.first()
df.head(n=1)
df.take(1)

# row count
df.count()

# limit the number of returned rows
df.limit(5).show()

# compute statistics for numeric and string columns (count, mean, stddev, min, max)
df.describe(['age']).show()

# count distinct rows (distinct() returns a new DataFrame)
df.distinct().count()

# select columns
df.select('*').show()
df.select('name').show()

# sort descending
df.sort(df.age.desc()).collect()
```
1. join(other, on=None, how='inner')
Parameters
- other - right side of the join
- on - column names (string, list, expr)
- how - str, default is "inner". One of inner, outer, left_outer, right_outer, leftsemi
- left_outer: keeps every row of the left DataFrame; matching rows from the right are joined in, with nulls where there is no match.
```python
df.join(df1, ['name', 'age']).select(df.name, df.age).collect()
```
2. filter()
```python
# filter on multiple conditions
similarity.filter(
    (similarity.dot > 0.1) &
    ((similarity.title_1 == book) | (similarity.title_2 == book))
).sort('dot', ascending=False).collect()
```
Update
1. withColumn(colName, col)
Returns a new DataFrame by adding a column, or replacing an existing column that has the same name.

```python
df.withColumn('age2', df.age + 2).collect()
```

withColumnRenamed() renames a column and returns a new DataFrame:

```python
df.withColumnRenamed('age', 'age2').collect()
```

concat() concatenates two columns:

```python
from pyspark.sql.functions import concat
df.withColumn('new_col', concat(df['name'], df['age']))
```
2. groupBy(*cols)

- groupby() is an alias
- agg() is shorthand for `df.groupBy().agg()`

Groups the DataFrame using the specified columns, so we can run aggregations on them.

```python
df.groupBy().avg().collect()
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
```
3. orderBy(cols, *kwargs)
Returns a new DataFrame sorted by the specified columns, ascending by default.
```python
df.sort(df.age.desc()).collect()
df.orderBy(df.age.desc()).collect()
df.orderBy(['age', 'name'], ascending=[0, 1]).collect()
```
```python
# apply func to each row
df.foreach(func)

# rename the columns, returning a new DataFrame
df.toDF('f1', 'f2').collect()

# convert to a pandas DataFrame
df.toPandas()
```
Delete
drop() returns a new DataFrame with the specified column removed.
```python
df.drop('age').collect()
df.drop(df.age)
df.join(df2, 'name', 'inner').drop('age').collect()
```
dropDuplicates(subset=None)
Alias: drop_duplicates()
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.
```python
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()
df.dropDuplicates(['name']).show()
```
