创建数据集

创建Range数据集

  1. # ========RDD========
  2. sc.range(1,100)
  3. # PythonRDD[55] at RDD at PythonRDD.scala:53
  4. sc.range(1,3).collect()
  5. # [1, 2]
  6. # =====DataFrame=====
  7. s.range(1,100)
  8. # DataFrame[id: bigint]
  9. s.range(1,3).collect()
  10. # [Row(id=1), Row(id=2)]
  11. s.range(1,1000000).selectExpr('id % 3')
  12. # DataFrame[(id % 3): bigint]
  13. from pyspark.sql.functions import col, floor, ceil
  14. s.range(1,100).select(ceil(col('id') / 3))
  15. # DataFrame[CEIL((id / 3)): bigint]
  16. s.range(1,3).union(s.range(100,103)).collect()
  17. # [Row(id=1), Row(id=2), Row(id=100), Row(id=101), Row(id=102)]

读取文件到RDD

不支持SQL语法操作

  1. sc.textFile(filepath)

读取文件到DataFrame

支持SQL,强类型,可以用pyspark.sql.types里面的类型定义

  1. from pyspark.sql.types import IntegerType, StructField, StructType,StringType
  2. schema = StructType([
  3. StructField("id", IntegerType(), True),
  4. StructField("age", IntegerType(), True),
  5. StructField("name", StringType(), True),
  6. StructField("score", IntegerType(), True),])
  7. schema = """`id` INT,
  8. `age` INT,
  9. `name` STRING,
  10. `score` INT,
  11. """
  12. df=spark.read.csv(filepath,schema=schema)

上面两种schema定义方法都可以

显示数据

预览

  1. df.show(10)

默认是10行,也可以指定显示多少行,显示的时候类似mysql-client会有ASCII做出的边框

取出部分数据使用

  1. df.take(10)

需要指定行数,取出到python中是list类型,里面是Row的对象

取出完整数据使用

  1. df.collect()

类型同take,但是会完成全部计算,提取完整的df出来

截取部分数据继续在Executor中计算

  1. df.limit(20)

不提取数据,作为transform继续在executor中,直到action触发才计算

使用SQL处理数据

选择列

  1. df.select('id','age')
  2. df.selectExpr('max(id) as m')
  3. df.selectExpr('id+1 as id')

使用 .select 是选择列名,而 .selectExpr 可以写SQL函数,就像在写SQL里面的 select 语句类似。

使用UDF

.select 或者 .withColumn 后面要写python中的函数名
.selectExpr 或者 .sql 后面要写executor中注册的(java/scala)函数名

可以使用python处理DataFrame中的内容,可参考链接

参考链接
使用selectExpr进行sql操作的例子
RDD与DF速度对比
钨丝计划与催化器介绍