创建数据集
创建Range数据集
# ========RDD========
sc.range(1,100)
# PythonRDD[55] at RDD at PythonRDD.scala:53
sc.range(1,3).collect()
# [1, 2]
# =====DataFrame=====
s.range(1,100)
# DataFrame[id: bigint]
s.range(1,3).collect()
# [Row(id=1), Row(id=2)]
s.range(1,1000000).selectExpr('id % 3')
# DataFrame[(id % 3): bigint]
from pyspark.sql.functions import col, floor, ceil
s.range(1,100).select(ceil(col('id') / 3))
# DataFrame[CEIL((id / 3)): bigint]
s.range(1,3).union(s.range(100,103)).collect()
# [Row(id=1), Row(id=2), Row(id=100), Row(id=101), Row(id=102)]
读取文件到RDD
不支持SQL语法操作
sc.textFile(filepath)
读取文件到DataFrame
支持SQL,强类型,可以用pyspark.sql.types里面的类型定义
from pyspark.sql.types import IntegerType, StructField, StructType,StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("age", IntegerType(), True),
StructField("name", StringType(), True),
StructField("score", IntegerType(), True),])
schema = """`id` INT,
`age` INT,
`name` STRING,
`score` INT,
"""
df=spark.read.csv(filepath,schema=schema)
显示数据
预览
df.show(10)
默认是10行,也可以指定显示多少行,显示的时候类似mysql-client会有ASCII做出的边框
取出部分数据使用
df.take(10)
需要指定行数,取出到python中是list类型,里面是Row的对象
取出完整数据使用
df.collect()
截取部分数据继续在Executor中计算
df.limit(20)
不提取数据,作为transform继续在executor中,直到action触发才计算
使用SQL处理数据
选择列
df.select('id','age')
df.selectExpr('max(id) as m')
df.selectExpr('id+1 as id')
使用 .select
是选择列名,而 .selectExpr
可以写SQL函数,就像在写SQL里面的 select
语句类似。
使用UDF
.select
或者 .withColumn
后面要写python中的函数名.selectExpr
或者 .sql
后面要写executor中注册的(java/scala)函数名
可以使用python处理DataFrame中的内容,可参考链接