Official doc: DataFrame

CSDN: advanced DataFrame operations

pyspark.sql.DataFrame
A distributed collection of data grouped into named columns.

Create

```python
# cache the DataFrame in memory
df.cache()
```
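The Create section above only shows caching; as a minimal sketch of actually constructing a DataFrame (assuming an existing SparkSession named spark, which is not part of the original notes), one could write:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('notes-demo').getOrCreate()

# build a DataFrame from local tuples with named columns
df = spark.createDataFrame([('Alice', 2), ('Bob', 5)], ['name', 'age'])
df.show()
```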

Read

```python
from pyspark.sql.functions import col

# set aliases
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
# print the first n rows to the console
df.show(n=20, truncate=True, vertical=False)
# column names
df.columns
# all column names and their data types
df.dtypes
# print / return the schema
df.printSchema()
df.schema
# return all records as a list of Rows
df.collect()
# return the first row
df.first()
df.head(n=1)
df.take(1)
# number of rows
df.count()
# limit the number of results
df.limit(5).show()
# compute statistics for numeric and string columns (count, mean, stddev, min, max)
df.describe(['age']).show()
# deduplicate rows, returns a new DataFrame
df.distinct().count()
# select columns
df.select('*').show()
df.select('name').show()
# sort by age, descending
df.sort(df.age.desc()).collect()
```

1. join(other, on=None, how='inner')

Parameters

  • other - right side of the join
  • on - join column name(s): a string, a list of strings, or a join expression (Column)
  • how - str, default 'inner'. One of inner, outer, left_outer, right_outer, leftsemi (see the sketch after the example below)
    • left_outer: keep every row from the left DataFrame; columns from the right DataFrame are filled in where the join condition matches, and null otherwise.

```python
# join on multiple columns, then keep only the left-side columns
df.join(df1, ['name', 'age']).select(df.name, df.age).collect()
```
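A small sketch of the how= argument (the toy DataFrames and the spark session here are assumptions for illustration, not part of the original notes):

```python
# hypothetical toy data to illustrate join types
left = spark.createDataFrame([('Alice', 1), ('Bob', 2)], ['name', 'dept'])
right = spark.createDataFrame([('Alice', 80)], ['name', 'height'])

# inner join drops Bob; left_outer keeps him with height = null
left.join(right, on='name', how='inner').show()
left.join(right, on='name', how='left_outer').show()
```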

2. filter()

```python
# filter with multiple conditions (each condition in its own parentheses)
similarity.filter(
    (similarity.dot > 0.1) &
    ((similarity.title_1 == book) | (similarity.title_2 == book))
).sort('dot', ascending=False).collect()
```
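The snippet above relies on a pre-existing similarity DataFrame and a book variable; a self-contained sketch of the same multi-condition pattern (all names here are hypothetical) would be:

```python
# hypothetical DataFrame to demonstrate the pattern
people = spark.createDataFrame(
    [('Alice', 5), ('Bob', 12), ('Carol', 30)], ['name', 'age'])

# combine Column conditions with & / |; each condition needs its own parentheses
people.filter((people.age > 10) & (people.name != 'Bob')).show()
```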

Update

1. withColumn(colName, col)

Returns a new DataFrame by adding a column, or replacing an existing column of the same name.

  • concat() concatenates two columns

```python
# add a derived column
df.withColumn('age2', df.age + 2).collect()

# rename a column, returns a new DataFrame
df.withColumnRenamed('age', 'age2').collect()

# concatenate two columns into a new column
from pyspark.sql.functions import concat
df.withColumn('new_col', concat(df['name'], df['age']))
```

2. groupBy(*cols)

  • groupby() is an alias
  • agg() is the shorthand for df.groupBy().agg()

Groups the DataFrame using the specified columns, so we can run aggregations on them.

```python
# global aggregation over the whole DataFrame
df.groupBy().avg().collect()
# group by name and take the mean age
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
```
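Since agg() on a bare DataFrame is shorthand for df.groupBy().agg(), a quick sketch of the equivalence (reusing the df from these notes) might look like:

```python
from pyspark.sql import functions as F

# these two produce the same single-row result
df.agg({'age': 'max'}).show()
df.groupBy().agg(F.max('age')).show()
```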

3. orderBy(*cols, **kwargs)

Returns a new DataFrame sorted by the specified column(s), ascending by default.

```python
df.sort(df.age.desc()).collect()
df.orderBy(df.age.desc()).collect()
# per-column order: 0 = descending, 1 = ascending
df.orderBy(['age', 'name'], ascending=[0, 1]).collect()
```

```python
# apply func to each row (runs on the executors)
df.foreach(func)
# return a new DataFrame with columns renamed to f1, f2
df.toDF('f1', 'f2').collect()
# convert to a pandas DataFrame (collects all rows to the driver)
df.toPandas()
```

Delete

drop returns a new DataFrame with the specified column removed.

```python
df.drop('age').collect()
df.drop(df.age)
# drop the now-redundant age column after a join
df.join(df2, 'name', 'inner').drop('age').collect()
```

dropDuplicates(subset=None)

Alias: drop_duplicates()
Returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.

```python
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
# drop fully duplicate rows
df.dropDuplicates().show()
# deduplicate on the name column only
df.dropDuplicates(['name']).show()
```