pyspark.sql.DataFrame
A distributed collection of data grouped into named columns
Create
# Cache the DataFrame
df.cache()
Read
# Set aliases (useful for self-joins)
from pyspark.sql.functions import col
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
# Print the first n rows to the console
df.show(n=20, truncate=True, vertical=False)
# Column names
df.columns
# Return all column names and their data types
df.dtypes
# Return the schema
df.printSchema()
df.schema
# Return all records
df.collect()
# Return the first row
df.first()
df.head(n=1)
df.take(1)
# Number of rows
df.count()
# Limit the number of results
df.limit(5).show()
# computes statistics for numeric and string columns (max, min, mean, count, stddev)
df.describe(['age']).show()
# Deduplicate rows, returning a new DataFrame
df.distinct().count()
# Select columns
df.select('*').show()
df.select('name').show()
# Sort (desc() is a Column method, not a DataFrame method)
df.sort(df.age.desc()).collect()
1. join(other, on=None, how='inner')
Parameters
- other - right side of the join
- on - column names (string, list, expr)
- how - str, default is "inner". One of inner, outer, left_outer, right_outer, leftsemi
- left_outer: keeps every row of the left DataFrame; columns from the right are null where no match is found
df.join(df1, ['name', 'age']).select(df.name, df.age).collect()
2. filter()
# Filter on multiple conditions
similarity.filter((similarity.dot>0.1) & ((similarity.title_1==book) | (similarity.title_2==book))).sort('dot', ascending=False).collect()
Update
1. withColumn(colName, col)
Returns a new DataFrame by adding a column, or replacing an existing column of the same name.
df.withColumn('age2', df.age + 2).collect()
# Rename a column, returning a new DataFrame
df.withColumnRenamed('age', 'age2').collect()
# concat() concatenates two columns
from pyspark.sql.functions import concat
df.withColumn('new_col', concat(df['name'], df['age']))
2. groupBy(*cols)
- groupby() is an alias for groupBy()
- df.agg() is shorthand for df.groupBy().agg()
Groups the DataFrame using the specified columns so that aggregations can be run on them.
```python
df.groupBy().avg().collect()
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
```
3. orderBy(*cols, **kwargs)
Returns a new DataFrame sorted by the specified column(s); ascending by default.
df.sort(df.age.desc()).collect()
df.orderBy(df.age.desc()).collect()
df.orderBy(['age', 'name'], ascending=[0, 1]).collect()
# Apply a function to each row
df.foreach(func)
# Return a new DataFrame with the specified column names
df.toDF('f1', 'f2').collect()
# Convert to a pandas DataFrame
df.toPandas()
Delete
drop returns a new DataFrame with the specified column removed
df.drop('age').collect()
df.drop(df.age)
df.join(df2, 'name', 'inner').drop('age').collect()
dropDuplicates(subset=None)
Alias: drop_duplicates()
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show()
df.dropDuplicates(['name']).show()