创建数据集
创建Range数据集
# ========RDD========sc.range(1,100)# PythonRDD[55] at RDD at PythonRDD.scala:53sc.range(1,3).collect()# [1, 2]# =====DataFrame=====s.range(1,100)# DataFrame[id: bigint]s.range(1,3).collect()# [Row(id=1), Row(id=2)]s.range(1,1000000).selectExpr('id % 3')# DataFrame[(id % 3): bigint]from pyspark.sql.functions import col, floor, ceils.range(1,100).select(ceil(col('id') / 3))# DataFrame[CEIL((id / 3)): bigint]s.range(1,3).union(s.range(100,103)).collect()# [Row(id=1), Row(id=2), Row(id=100), Row(id=101), Row(id=102)]
读取文件到RDD
不支持SQL语法操作
sc.textFile(filepath)
读取文件到DataFrame
支持SQL,强类型,可以用pyspark.sql.types里面的类型定义
from pyspark.sql.types import IntegerType, StructField, StructType,StringTypeschema = StructType([StructField("id", IntegerType(), True),StructField("age", IntegerType(), True),StructField("name", StringType(), True),StructField("score", IntegerType(), True),])schema = """`id` INT,`age` INT,`name` STRING,`score` INT,"""df=spark.read.csv(filepath,schema=schema)
显示数据
预览
df.show(10)
默认是10行,也可以指定显示多少行,显示的时候类似mysql-client会有ASCII做出的边框
取出部分数据使用
df.take(10)
需要指定行数,取出到python中是list类型,里面是Row的对象
取出完整数据使用
df.collect()
截取部分数据继续在Executor中计算
df.limit(20)
不提取数据,作为transform继续在executor中,直到action触发才计算
使用SQL处理数据
选择列
df.select('id','age')df.selectExpr('max(id) as m')df.selectExpr('id+1 as id')
使用 .select 是选择列名,而 .selectExpr 可以写SQL函数,就像在写SQL里面的 select 语句类似。
使用UDF
.select 或者 .withColumn 后面要写python中的函数名.selectExpr 或者 .sql 后面要写executor中注册的(java/scala)函数名
可以使用python处理DataFrame中的内容,可参考链接
