DataFrame 基本概念
ODPS DataFrame中的三种对象:Collection(DataFrame) ,Sequence,Scalar。 这三个对象分别表示表结构(或者二维结构)、列(一维结构)、标量。需要注意的是,这些对象仅在使用 Pandas 数据创建后会包含实际数据, 而在 ODPS 表上创建的对象中并不包含实际的数据,而仅仅包含对这些数据的操作,实质的存储和计算会在 ODPS 中进行。
创建DataFrame
可以通过ODPS表、ODPS分区、Pandas DataFrame及SQLAlchemy table来创建。
from odps.df import DataFrame
# 从ODPS表创建
iris = DataFrame(o.get_table('pyodps_iris'))
iris2 = o.get_table('pyodps_iris').to_df() # 使用表的to_df方法
# 从ODPS分区创建
pt_df = DataFrame(o.get_table('partitioned_table').get_partition('pt=20171111'))
pt_df2 = o.get_table('partitioned_table').get_partition('pt=20171111').to_df() # 使用分区的to_df方法
# 从Pandas DataFrame创建
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame(np.arange(9).reshape(3, 3), columns=list('abc')))
# 从sqlalchemy Table创建
engine = sqlalchemy.create_engine('mysql://root:123456@localhost/movielens')
metadata = sqlalchemy.MetaData(bind=engine) # 需要绑定到engine
table = sqlalchemy.Table('top_users', metadata, extend_existing=True, autoload=True)
users = DataFrame(table)
在用Pandas DataFrame初始化时,指定unknown_as_string参数为True,可以将无法识别的列指定为string类型,也可以指定as_type参数,传入一个dict,将指定的列强行转化为指定的类型。
df2 = DataFrame(df, unknown_as_string=True, as_type={'null_col2': 'float'})
Sequence
获取列
可以使用Collection.column_name取出一列,如
df2.col1.head(5)
df2['col1'].head(5)
列类型
ODPS的字段和DataFrame的类型映射关系如下:
ODPS类型 | DataFrame类型 |
---|---|
bigint | int64 |
double | float64 |
string | string |
datetime | datetime |
boolean | boolean |
decimal | decimal |
array |
list |
map |
dict |
list和dict必须填写其包含值的类型,否则会报错。目前 DataFrame 暂不支持 MaxCompute 2.0 中新增的Timestamp及Struct类型,未来的版本会支持。
获取数据类型与数据类型修改
df2.col1.dtype # 获取列col1的数据类型
df2.col1.astype('int') # 修改列col1的数据类型
列名
在DataFrame的计算过程中,一个Sequence必须要有列名,在进行一些运算,如max()之后,DataFrame会自动将其结果集的字段进行命名,但用户也可以自行对列名重命名。
df2.col1.rename('col_rename')
Collection
DataFrame中所有二维数据集上的操作都属于CollectionExpr,可视为一张 ODPS 表或一张电子表单,DataFrame 对象也是CollectionExpr的特例(ps:理解不能)。CollectionExpr中包含针对二维数据集的列操作、筛选、变换等大量操作。
获取类型
dtypes可以用来获取 CollectionExpr 中所有列的类型。dtypes返回的是Schema类型。
df2.dtypes
列选择与增删
# 选择单列时,需要在列名后加逗号或用多一个中括号,否则返回的就是Sequence而不是Collection了
df['col1',]
df[['col1']]
# 选择多列
df['col1','col2']
# 排除某些列
df.exclude('col1', 'col2')[:5]
# 在数据集上就地删除某些列,这种写法只支持0.7.2之后的版本
del df['col1']
# 在已有的Collection中添加某一列变换的结果,新增的列会作为新Collection的一部分
# 下例为在原Collection后新增一列
df[df, (df.col1 + 1).rename('new_col1')].head(5)
# 0.7.2之后的版本支持在原数据集中直接追加
df['new_col1'] = df.col1 + 1
# 增删列以创建新Collection的另一种方法是调用select方法,传入需要用到的字段及计算表达式
df.select('name', col1=iris.col - 1).head(5)
# lambda表达式可接受Collection作为传入参数,以用来增删列,如下例就将col1列去掉了
df['name', 'col1'][[lambda x: x.name]].head(5)
# 0.7.2以后版本的 PyODPS 中,支持对数据进行条件赋值,下例为,当满足条件时,对col2字段重新赋值
df[df.col1 > 5.0, 'col2'] = iris.col * 2
引入常数和随机数
# 在Collection中追加一列常数。追加常数需要使用Scalar,引入时需要手动指定列名
from odps.df import Scalar
df2[df2, Scalar(1).rename('id')][:5]
# 指定一个空值列,可以使用NullScalar
from odps.df import NullScalar
df2[df2, NullScalar('float').rename('fid')][:5]
# 0.7.12及以后版本中,简化为
df2['id'] = 1
# 空值列的简化写法如下
from odps.df import NullScalar
df2['null_col'] = NullScalar('float')
# 在Collection中增加一列随机数列,该列类型为 float,范围为0-1
from odps.df import RandomScalar
df2[df2, RandomScalar().rename('rand_val')][:5]
过滤数据
# 单个条件
df2[df2.col1 > 5].head(5)
# 多个条件
df2[(df2.col1 < 5) & (df2['col2'] > 1.5)].head(5) # 与
df2[(df2.col1 < 5) | (df2['col2'] > 1.5)].head(5) # 或
df2[~(df2.col1 < 5)].head(5) # 非
# 使用filter方法过滤数据,等价于多个与运算
df2.filter(df2.col1 > 3.5, df2.col2 < 4).head(5)
# 对于连续的操作,可以使用lambda表达式
df2[df2.col1 > 3.8]['name', lambda x: x.col1 + 1]
# Collection如果包含一个boolean类型的字段,则可以直接使用该列作为过滤条件。
>>> df.dtypes
odps.Schema {
a boolean
b int64
}
>>> df[df.a]
a b
0 True 1
1 True 3
# 因此,如果在想取字段a的时候,应该这么写
df[df.a, ]
df[[df.a]]
df.select(df.a)
df.a
df['a']
# 使用query方法来对数据进行筛选,在query方法中,&和and都表示与操作,|和or都表示或操作。
df2.query("(col1 < 5) and (col2 > 1.5)").head(5)
# 在query方法中引用本地变量
var = 4
df2.query("(iris.col1 < 2.5) | (col2 > @var)").head(5)
并行多列输出
对于list及map类型的列,explode方法会将该列转换为多行输出,在有多个该类型列时,需要打散哪一列,就在哪列上调用explode方法。
>>> df
id a b
0 1 [a1, b1] [a2, b2, c2]
1 2 [c1] [d2, e2]
>>> df[df.id, df.a.explode(), df.b]
id a b
0 1 a1 [a2, b2, c2]
1 1 b1 [a2, b2, c2]
2 2 c1 [d2, e2]
# 如果多行输出方法对某个输入不产生任何输出,默认输入行将不在最终结果中出现
# 但可以通过keep_nulls参数进行记录行的保留
>>> df
id a
0 1 [a1, b1]
1 2 []
>>> df[df.id, df.a.explode()]
id a
0 1 a1
1 1 b1
>>> df[df.id, df.a.explode(keep_nulls=True)]
id a
0 1 a1
1 1 b1
2 2 None
限制条数
df.head(5)
df[:5] # 只能用于Collection,不能作用于sequence。
df.limit(5)
执行
延迟执行
DataFrame上的所有操作并不会立即执行,只有当用户显式调用execute方法,或者一些立即执行的方法时(内部调用的就是execute),才会真正去执行。
立即执行的方法包括:
方法 | 说明 | 返回值 |
---|---|---|
persist | 将执行结果保存到ODPS表 | PyODPS DataFrame |
execute | 执行并返回全部结果 | ResultFrame |
head | 查看开头N行数据,这个方法会执行所有结果,并取开头N行数据 | ResultFrame |
tail | 查看结尾N行数据,这个方法会执行所有结果,并取结尾N行数据 | ResultFrame |
to_pandas | 转化为 Pandas DataFrame 或者 Series,wrap 参数为 True 的时候,返回 PyODPS DataFrame 对象 | wrap为True返回PyODPS DataFrame,False(默认)返回pandas DataFrame |
plot,hist,boxplot | 画图有关 |
在交互式环境下,DataFrame会在打印结果前自动调用execute方法,但是也可以关闭这个功能,此时执行打印操作时,会显示整棵抽象语法树。
# 关闭交互式环境下的自动调用execute
from odps import options
options.interactive = False
保存执行结果为ODPS表
对 Collection,我们可以调用persist方法,参数为表名,返回一个新的DataFrame对象,且会在MaxCompute中创建对应表。
# 入口对象为o
df2 = df.persist('table_name',odps=o)
# 入口对象全局化后的写法
o.to_global()
df2 = df.persist('table_name')
# 指定表的生命周期
df2 = df.persist('table_name',lifecycle=10)
# 创建分区表,partitions参数传入list对象,元素为分区字段名称,而且df中应包含该字段
df3 = df.persist('table_name', partitions=['name'])
# 如果是写入已经存在的表,则可传入partition参数,指定写入的分区
# 此时,df中的所有字段都必须在该表存在,且类型相同
# drop_partition和create_partition参数只有在此时有效
df.persist('table_name', partition='ds=20200217', drop_partition=True, create_partition=True)
保存执行结果为Pandas DataFrame
可以对PyODPS DataFrame使用to_pandas方法,如果wrap参数为True,将返回PyODPS DataFrame对象。
>>> type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
>>> type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame
注意:to_pandas返回的pandas DataFrame与直接通过pandas创建的 DataFrame 没有任何区别,数据的存储和计算均在本地。如果wrap=True,生成的即便是PyODPS DataFrame,数据依然在本地。如果的数据很大,或者运行环境的内存限制较为严格,请谨慎使用 to_pandas。
立即运行设置运行参数
对于立即执行的方法,比如execute、persist、to_pandas等,可以设置运行时参数(仅对ODPS SQL后端有效 )。
一种方法是设置全局参数。详细参考SQL设置运行参数 。
也可以在这些立即执行的方法上,使用hints参数。这样,这些参数只会作用于当前的计算过程,如
>>> iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})
运行时显示详细信息
修改全局配置,用于查看运行时instance的logview(所以实际上被转化为odps sql了吗?)
>>> from odps import options
>>> options.verbose = True
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview
sepalwidth petallength petalwidth name
0 3.0 1.4 0.2 Iris-setosa
1 3.2 1.3 0.2 Iris-setosa
2 3.1 1.5 0.2 Iris-setosa
3 3.4 1.4 0.3 Iris-setosa
4 2.9 1.4 0.2 Iris-setosa
缓存中间Collection计算结果
DataFrame的计算过程中,一些Collection被多处使用,或者用户需要查看中间过程的执行结果,这时用户可以使用 cache标记某个collection需要被优先计算,但并不会触发立即计算。
>>> cached = iris[iris.sepalwidth < 3.5].cache()
>>> df = cached['sepallength', 'name'].head(3)
>>> df
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
>>> cached.head(3) # 由于cached已经被计算,所以能立刻取到计算结果
sepallength name
0 4.9 Iris-setosa
1 4.7 Iris-setosa
2 4.6 Iris-setosa
异步和并行执行
DataFrame支持异步操作,对于立即执行的方法,包括execute、persist、head、tail、to_pandas(其他方法不支持), 传入async参数,即可以将一个操作异步执行,timeout参数指定超时时间, 异步返回的是Future对象。
(ps:异步操作的本质我不是很懂)
>>> future = iris[iris.sepal_width < 10].head(10, async=True)
>>> future.done()
True
>>> future.result()
DataFrame的并行执行可以使用多线程来并行,单个 expr 的执行可以通过n_parallel参数来指定并发度。 比如,当一个 DataFrame 的执行依赖的多个 cache 的 DataFrame 能够并行执行时,该参数就会生效。
>>> expr1 = iris.groupby('category').agg(value=iris.sepal_width.sum()).cache()
>>> expr2 = iris.groupby('category').agg(value=iris.sepal_length.mean()).cache()
>>> expr3 = iris.groupby('category').agg(value=iris.petal_length.min()).cache()
>>> expr = expr1.union(expr2).union(expr3)
>>> future = expr.execute(n_parallel=3, async=True, timeout=2) # 并行和异步执行,2秒超时,返回Future对象
>>> future.result()
当同时执行多个 expr 时,我们可以用多线程执行,但会面临一个问题, 比如两个 DataFrame 有共同的依赖,这个依赖将会被执行两遍。现在我们提供了新的Delay API, 来将立即执行的操作(包括 execute、persist、head、tail、to_pandas,其他方法不支持)变成延迟操作,并返回 Future 对象。当用户触发delay执行的时候,会去寻找共同依赖,按用户给定的并发度执行,并支持异步执行。delay.execute 也接受async操作来指定是否异步执行,当异步的时候,也可以指定timeout参数来指定超时时间。
>>> from odps.df import Delay
>>> delay = Delay() # 创建Delay对象
>>>
>>> df = iris[iris.sepal_width < 5].cache() # 有一个共同的依赖
>>> future1 = df.sepal_width.sum().execute(delay=delay) # 立即返回future对象,此时并没有执行
>>> future2 = df.sepal_width.mean().execute(delay=delay)
>>> future3 = df.sepal_length.max().execute(delay=delay)
>>> delay.execute(n_parallel=3) # 并发度是3,此时才真正执行。
|==========================================| 1 / 1 (100.00%) 21s
>>> future1.result()
458.10000000000014
>>> future2.result()
3.0540000000000007
列运算
对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。(类似于apply?)
NULL相关(isnull,notnull,fillna)
isnull用于判断是否某字段是NULL,notnull与isnull作用相同,但返回值相反,而fillna是将NULL值填充为用户指定的值。
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))
iris.sepallength.isnull().head(5)
逻辑判断(ifelse,switch)
ifelse作用于boolean类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。
switch用于多条件判断的情况,参数形式为条件1,结果1,条件2,结果2…条件n,结果n。
(iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)
# default参数含义为不满足任何条件时取的值
iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
数学运算
对于数字类型的字段,支持+,-,*,/等操作,也支持log、sin等数学计算。
算术运算支持的操作包括:
- 绝对值:abs
- 平方根:sqr
- 正弦和反正弦:sin、arcsin、sinh、arcsinh
- 余弦和反余弦:cos、arccos、cosh、arccosh
- 正切和反正切:tan、arctan、tanh、arctanh
- 指数函数:exp、expm1(即exp(x)-1)
- 对数:log、log2、log10、log1p(即log(x+1))
- 弧度角度转换:redians(用角度计算弧度)、degrees(用弧度计算角度)
- 取整:ceil(向上取整)、floor(向下取整)、trunc(截尾取整)
对于sequence,也支持与其它sequence和scalar的比较,同时也提供了between函数用于判断数据是否在某个区间内。
(iris.sepallength.between(3, 5)).head(5)
(iris.sepallength.between(3, 5, inclusive=False)).head(5)
String相关操作
string相关操作包括:
string 操作 | 说明 |
---|---|
capitalize | 首字母大写 |
contains | 包含某个字符串,如果 regex 参数为 True,则是包含某个正则表达式,默认为 True |
count | 指定字符串出现的次数 |
endswith | 以某个字符串结尾 |
startswith | 以某个字符串开头 |
extract | 抽取出某个正则表达式,如果 group 不指定,则返回满足整个 pattern 的子串;否则,返回第几个 group |
find | 返回第一次出现的子串位置,若不存在则返回-1 |
rfind | 从右查找返回子串第一次出现的位置,不存在则返回-1 |
replace | 将某个 pattern 的子串全部替换成另一个子串, n 参数若指定,则替换n次 |
get | 返回某个位置上的字符串 |
len | 返回字符串的长度 |
ljust | 若未达到指定的 width 的长度,则在右侧填充 fillchar 指定的字符串(默认空格) |
rjust | 若未达到指定的 width 的长度,则在左侧填充 fillchar 指定的字符串(默认空格) |
lower | 变为全部小写 |
upper | 变为全部大写 |
lstrip | 在左侧删除空格(包括空行符) |
rstrip | 在右侧删除空格(包括空行符) |
strip | 在左右两侧删除空格(包括空行符) |
split | 将字符串按分隔符拆分为若干个字符串(返回 list |
pad | 在指定的位置(left,right 或者 both)用指定填充字符(用 fillchar 指定,默认空格)来对齐 |
repeat | 重复指定 n 次 |
slice | 切片操作 |
swapcase | 对调大小写 |
title | 同 str.title |
zfill | 长度没达到指定 width ,则左侧填充0 |
isalnum | 同 str.isalnum |
isalpha | 同 str.isalpha |
isdigit | 是否都是数字,同 str.isdigit |
isspace | 是否都是空格,同 str.isspace |
islower | 是否都是小写,同 str.islower |
isupper | 是否都是大写,同 str.isupper |
istitle | 同 str.istitle |
isnumeric | 同 str.isnumeric |
isdecimal | 同 str.isdecimal |
todict | 将字符串按分隔符拆分为一个 dict,传入的两个参数分别为项目分隔符和 Key-Value 分隔符(返回 dict |
strptime | 按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化 |
时间相关操作
对于datetime类型的Sequence或者Scalar,可以调用时间相关的内置函数,其本身也包含时间相关的。
与时间相关的属性包括:
时间相关属性 | 说明 |
---|---|
year | 年 |
month | 月 |
day | 日 |
hour | 小时 |
minute | 分钟 |
second | 秒 |
weekofyear | 返回日期位于那一年的第几周。周一作为一周的第一天 |
weekday | 返回日期当前周的第几天 |
dayofweek | 同 weekday |
strftime | 格式化时间,时间格式和 Python 标准库相同,详细参考 Python 时间格式化 |
PyODPS同样也支持时间的加减操作
# pyodps支持的所有时间类型
from odps.df import year,month,day,hour,minute,second,millisecond
df.a - day(3) # 获取时间字段a三天前的日期
df.b - df.a # 日期相减返回的是整数,即毫秒数
集合类型相关操作
PyODPS 支持的集合类型有List 和 Dict。这两个类型都可以使用下标获取集合中的某个元素。
支持的共有方法如下:
- len :可获得集合的大小。
- explode 方法:用于展开集合中的内容。对于 List,explode 默认返回一列,当传入参数 pos 时,将返回两列,其中一列为值在数组中的编号,而另一列则为元素,展开的元素按记录行顺序排列。对于 Dict,explode 会返回两列,分别表示 keys 及 values。explode 中也可以传入列名,作为最后生成的列。
List支持的独有方法如下:
- contains(v):列表是否包含某个元素
- sort:返回排序好的列表
Dict支持的独有方法如下:
- keys:返回字典的所有key,返回值为list
- values:返回字典的所有value,返回值为list
explode方法示例如下,同样也可以和并行多列输出结合:
>>> df
id a b
0 1 [a1, b1] {'a2': 0, 'b2': 1, 'c2': 2}
1 2 [c1] {'d2': 3, 'e2': 4}
# 按照下标和key来取值,如果key对应的值不存在,则返回结果的对应字段会返回NaN
df[df.id, df.a[0], df.b['b2']]
# 获取集合大小
df[df.id, df.a.len(), df.b.len()]
# 展开集合
df.a.explode()
# 展开集合并添加编号列,还可重新指定列名
df.a.explode(pos=True)
df.a.explode(['pos', 'value'], pos=True)
df.b.explode(['key', 'value'])
其它元素操作
# 判断Sequence中的元素是否在isin方法传入的集合里,而notin方法为反操作,均返回布尔值
iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
iris.sepallength.notin([4.9, 5.1]).rename('sepallength').head(5)
# 将Sequence的数据根据传入的第一个参数进行打散
# include_under和include_over可以分别包括向下和向上的区间。
iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5'],
include_under=True, include_over=True).rename('sepallength_cut').head(5)
使用自定义函数
Dataframe支持对Sequence使用map,即对每个元素执行自定义函数,例如
# 如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。
iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
注意:目前,受限于 Python UDF,自定义函数无法支持将list/dict类型作为输入或输出。
如果在函数中包含闭包,需要注意的是,函数外闭包变量值的变化会引起函数内该变量值的变化。
查看下面三段代码
dfs = []
for i in range(10):
dfs.append(df.sepal_length.map(lambda x: x + i))
dfs = []
def get_mapper(i):
return lambda x: x + i
for i in range(10):
dfs.append(df.sepal_length.map(get_mapper(i)))
import functools
dfs = []
for i in range(10):
dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
for j in dfs:
print(j.execute())
第一段代码中,当循环结束后,变量i的值固定在i=9,而SequenceExpr保存的是运算过程,lambda函数指向的是i,也就是值9,因此每次执行都是对Sequance里的每个元素+9,而并非是依次+0+1+2+…+9;
而第二和第三段代码中,由于是先把i的值传了进去,然后返回匿名函数,也就是说,每次返回的匿名函数都是不一样的,第一次返回lambda x:x+0,第二次返回lambda x:x+1,以此类推,因此在execute的时候,对Sequence里的每个元素依次执行了+0+1+2+…+9的操作。
【ps:不得不说这是真的细节】
map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者Function对象 。
引用资源
自定义函数也能读取ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。此时,自定义函数需要写成函数闭包或callable的类。
# 创建资源
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources): # resources按调用顺序传入
>>> names = set()
>>> fileobj = resources[0] # 传入的参数为list,因此通过下标进行取用
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # 这里可以通过字段名或者偏移来取
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
使用第三方Python库
值得注意的是,第三方库的依赖库,也必须指定,否则依然会有导入错误。
# 下载所需的库和依赖库
pip download python-dateutil -d /to/path/
# 通过全局配置对第三方库进行使用
from odps import options
options.df.libraries = ['six.whl', 'python_dateutil.whl']
# 通过execute方法的libraries参数指定
df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
使用计数器
【留白,不知道是干嘛的】
调用ODPS内建或者已定义函数
要想调用ODPS上的内建或者已定义函数,来生成列,我们可以使用 func 接口,该接口默认函数返回值为 String, 可以用 rtype 参数指定返回值。
>>> from odps.df import func
# func.rand即调用rand函数
>>> iris[iris.name, func.rand(rtype='float').rename('rand')][:4]
>>> iris[iris.name, func.rand(10, rtype='float').rename('rand')][:4]
>>> # 调用 ODPS 上定义的 UDF,列名无法确定时需要手动指定
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float').rename('new_col')]
>>> # 从其他 Project 调用 UDF,也可通过 name 参数指定列名
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float', project='udf_project',
>>> name='new_col')]
聚合操作
常用聚合操作
注意:所有的聚合操作都会忽略空值,且目前受限于 Python UDF,自定义聚合无法支持将 list / dict 类型作为初始输入或最终输出结果。
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
iris.describe() # 查看列的数量、最大值、最小值、平均值以及标准差,与pandas中的类似
iris.sepallength.max() # 查看最大值
iris.name.unique().cat(sep=',') # 去重后再进行聚合拼接所有值
iris.a.mean() # 如果每列都支持某种聚合操作,则可以直接对整个DataFrame进行聚合
iris.a.count() # 获取总行数
iris.a.size() # 获取总行数
iris.a.nunique() # 不重复值数量,等价于count distinct
iris.a.min() # 最小值
iris.a.max() # 最大值
iris.a.sum() # 求和
iris.a.mean() # 均值
iris.a.median() # 中位数
iris.a.quantile(p) # 分位数,仅在p为整数值下可取得准确值
iris.a.var() # 方差
iris.a.std() # 标准差
iris.a.moment() # n阶中心矩(n阶矩)
iris.a.skew() # 样本偏度(无偏估计)
iris.a.kurtosis() # 样本峰度(无偏估计)
iris.a.cat('sep') # 按分隔符sep做字符串连接操作
iris.a.tolist() # 将Sequence组合为list
分组聚合
分组聚合的理解与SQL类似,先分组,再聚合,因此是先使用DataFrame的groupby来执行分组操作,然后调用agg或者aggregate方法来执行聚合操作。
iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
# 下面二者等价,即分组后count,并按从大到小排列
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
iris['name'].value_counts().head(5)
# 聚合后的单列可以直接取出列名。但此时只能使用聚合函数
iris.groupby('name').petallength.sum()
# 同样支持对常量进行分组,但需要使用Scalar初始化
from odps.df import Scalar
iris.groupby(Scalar(1)).petallength.sum()
编写自定义聚合
自定义聚合需要提供一个类,这个类需要提供以下方法:
- buffer():返回一个mutable的object(比如 list、dict),buffer大小不应随数据而递增。
- call(buffer, *val):将值聚合到中间 buffer。
- merge(buffer, pbuffer):将 pbuffer 聚合到 buffer 中。
- getvalue(buffer):返回最终值。
# 求平均数
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
# 使用自定义聚合
iris.sepalwidth.agg(Agg)
iris.sepalwidth.agg(Agg, 'float') # 指定数据类型(输入/输出?)
iris.groupby('name').sepalwidth.agg(Agg) # 分组聚合
# 对多列调用自定义聚合
from odps.df import agg
# 对两列调用自定义聚合,即将其作为buffer的元素
to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')
iris.groupby('name').agg(val=to_agg) # 分组后进行聚合
HyperLogLog计数
适用于海量数据的去重统计。
>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
63270
>>> df.a.nunique()
63250
排序、去重、采样、数据变换
同样先进行声明
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
排序
排序操作只能作用于Collection,等同于SQL中的order by
iris.sort('sepalwidth',ascending=False).head(5) # 设ascending值设为False为降序排列,默认为升序
iris.sort(-iris.sepalwidth).head(5) # 降序排列的另外一种方式,但笔者觉得可读性没第一种好
# 多字段排序,ascending可传入list来按顺序标识字段为升序还是降序
iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
iris.sort(['sepalwidth', -iris.petallength]).head(5) # 上一语句等价变体
ODPS要求排序必须指定记录条数,所以在ODPS后端执行时, 会通过options.df.odps.sort.limit指定排序个数,这个值默认是10000, 如果要排序尽量多的数据,可以把这个值设到较大的值。不过注意,此时可能会导致 OOM。
去重
# 对Collection使用distinct方法
iris[['name']].distinct()
isrs.distinct('name')
iris.distinct('name', 'sepallength').head(3)
# 对Sequence使用unique方法,但调用unique的Sequence不能用于列选择中
iris.name.unique()
iris[iris.name, iris.name.unique()] # 错误用法
采样
除了按份数采样外,其余方法如果要在ODPS DataFrame上执行,需要Project支持XFlow,否则,这些方法只能在 Pandas DataFrame 后端上执行。【ps:已经习惯了残疾版dataframe了,还要啥自行车,又不是不能用(滑稽)】
# 按份数采样
iris.sample(parts=10) # 分成10份,默认取第0份
iris.sample(parts=10, i=0) # 手动指定取第0份
iris.sample(parts=10, i=[2, 5]) # 分成10份,取第2和第5份
iris.sample(parts=10, columns=['name', 'sepalwidth']) # 根据name和sepalwidth的值做采样
数据缩放
即对数据进行归一化与标准化。
# 线性归一化,公式为 (X-X_min)/(X_max-X_min)
# feature参数指定的是输出的范围
# preserve参数设为True可用于保留原始列,结果列会新增一列
df.min_max_scale(columns=['fid'], feature=(-1,1), preserve=True)
# group参数指定一个或多个分组列,在分组列中分别取最值进行缩放
df.min_max_scale(columns=['fid'], group=['name'])
# 标准化,参数与min_max_scale一致,不再赘述
df.std_scale(columns=['fid'])
空值处理
# 删除空行
df.dropna(subset=['f1', 'f2', 'f3', 'f4']) # 取出subset中,包含空值的行
df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4']) # 只要行中有非空值则不删除,必须要全字段为空才删除
df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4']) # 指定行中至少有几个非空值,不符合标准的会被删掉
# 填充空值
df.fillna(100, subset=['f1', 'f2', 'f3', 'f4']) # 使用常数填充
df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4']) # 使用指定列的值进行填充
# 向后填充及向前填充【我又一次被PyODPS DataFrame骚到了,pandas是按行向前后填充,而它是按列的顺序向前和向后ORZ】
df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])
对所有行/列调用自定义函数
要对一行数据使用自定义函数,可以使用 apply 方法,axis参数必须为 1,表示在行上操作。
# reduce=True,表示返回Sequence,False则返回Collection
# names参数可指定返回的列名(例子中没有写,因此只返回了相加后的结果列)
# types参数可指定返回的类型,如果不指定则默认返回String类型
>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
sepaladd
0 8.6
1 7.9
2 7.9
# 当reduce=False时,可利用yield返回多行结果
>>> iris.count()
150
>>>
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300
# 还可以利用注释来标明返回的字段名和类型
>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
# 使用map-only实现等价操作
iris.map_reduce(mapper=handle)
# 调用已经存在的UDTF,只需要指定函数名即可
iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
要对所有列调用自定义聚合,参考上文的编写自定义聚合。
MapReduce API
PyODPS DataFrame也支持MapReduce API,下面是WordCount的例子。
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> def reducer(keys):
>>> # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>> cnt = [0]
>>> def h(row, done): # done表示这个key已经迭代结束
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>>
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>> mapper_output_names=['word', 'cnt'],
>>> mapper_output_types=['string', 'int'],
>>> reducer_output_names=['word', 'cnt'],
>>> reducer_output_types=['string', 'int'])
word cnt
0 hello 2
1 i 1
2 is 1
3 life 1
4 python 2
5 short 1
6 use 1
7 world 1
这里按笔者的理解来分析下这段代码,因为官方文档给的解释实在太少了【ps:大概是因为我菜所以看了五分钟才懂】。首先是mapper,输入的参数是row,即mapper会按行读取,row是一个list对象,因此需要以row[0]的方式取出该行数据,并按delimiter来做split,切分出的每个单词以
然后是reducer,传入的是keys,即所有的word,cnt[0]作为一个计数器,在迭代不同key的时候对word出现的次数进行累加,这里的row[1]指的其实是在mapper中返回的kv对中的value,即那个1,如果中间有combiner,也可能是n(反正reducer的逻辑要与combiner相同)。而最终在reducer中的cnt[0]会将所有的结果累加起来,得到一个最终的结果,在这里并不关心前面是否有combiner处理过。done作为一个标识,应该是必定需要存在的,标志当前key的迭代完成,即以kv对的方式返回最终结果。
最后是driver,指定了mapper、reducer的输出字段及对应类型,指定了mapper和reducer方法,group存放用于分组的字段,如果不指定该参数,会对所有字段做分组。
实际上,写成callable类的方式会更为直观。
class reducer(object):
def __init__(self, keys):
self.cnt = 0
def __call__(self, row, done): # done表示这个key已经迭代结束
self.cnt += row.cnt
if done:
yield row.word, self.cnt
也可以使用output注释
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>> cnt = [0]
>>> def h(row, done): # done表示这个key已经迭代结束
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')
如果需要在执行MapReduce的时候对数据进行排序,可以使用sort参数,参数值为需要用于排序的字段列表;而至于升降序则可以通过ascending参数指定,参数值可以是一个bool值,表示所有的字段均按升/降序进行排列,也可以传入一个与sort参数值长度相等的列表,通过bool值指定哪些字段需要升序,哪些需要降序。
指定combiner
combiner用于在map端执行reducer的操作,即在map端先对数据进行聚合,用法和reducer是完全一致的,但不能引用资源。 并且combiner的输出的字段名和字段类型必须和mapper完全一致。
就如上文所说,可以使用reducer作为combiner。
words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
引用资源
使用mapper_resources和reducer_resources指定引用的资源
white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
<mapper>
<reducer>
words_df.map_reduce(mapper, reducer, group='word',
mapper_resources=[stop_words], reducer_resources=[white_list_file])
使用第三方Python库
参考前文使用第三方Python库。
重排数据
有时数据在集群上分布可能是不均匀的,需要对数据重排。
# 默认会按随机数做哈希来分布,也可以指定按那些列做分布,且可以指定重排后的排序顺序。
df1 = df.reshuffle()
df1.reshuffle('name', sort='id', ascending=False)
布隆过滤器(Bloom Filter)
在PyODPS DataFrame中,bloom filter是通过seq2去过滤seq1的内容,不存在于seq2中的数据,就会被过滤,作为一个近似的方法,当数据量大的时候,可能有一些数据不能正确的被过滤。
可以传入capacity和error_rate来设置数据的量以及错误率,默认值是3000和0.01,调大capacity或者减小 error_rate会增加内存的使用,所以应当根据实际情况选择一个合理的值。
df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'
透视表(pivot_table)
df['A', 'D', 'E'].pivot_table(rows='A') # rows参数表示根据哪个字段来做聚合,默认求平均数
df.pivot_table(rows=['A', 'B', 'C']) # rows参数也可以传入字段列表
df.pivot_table(rows=['A', 'B'], values='D') # values参数用于指定要计算的字段,同样可以传入列表
# aggfunc指定聚合的函数
df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
# columns参数的作用是,选择原始数据某一列的值,作为新的Collection的列,即行转列
df.pivot_table(rows=['A', 'B'], values='D', columns='C')
# 零值填充
df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
Key-Value字符串变换
DataFrame提供了将Key-Value对展开为列,以及将普通列转换为Key-Value列的功能。
>>> df
name kv
0 name1 k1=1,k2=3,k5=10
1 name1 k1=7.1,k7=8.2
2 name2 k2=1.2,k3=1.5
3 name2 k9=1.1,k2=1
# 使用extract方法进行kv对的展开,columns参数传入需要展开的字段列表,kv_delim参数传入kv的连接符
# item_delim参数传入kv对的分隔符,fill_value参数用于填充缺失值
>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',', fill_value=0)
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
>>> df
name k1 k2 k3 k5 k7 k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name1 7.0 NaN NaN NaN 8.2 NaN
2 name2 NaN 1.2 1.5 NaN NaN NaN
3 name2 NaN 1.0 NaN NaN NaN 1.1
# 使用to_kv方法将数据合成kv对,columns传入需要处理的字段列表
# kv_delim参数传入key和value的连接符,kv对分隔符默认为逗号
>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
name kv
0 name1 k1=1,k2=3,k5=10
1 name1 k1=7.1,k7=8.2
2 name2 k2=1.2,k3=1.5
3 name2 k9=1.1,k2=1
数据合并
Join操作
DataFrame也支持对两个Collection执行 join 的操作。
from odps.df import DataFrame
movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
# join中传入需要关联的dataframe,由于没有指定关联字段,DataFrame API会寻找名字相同的字段进行join
movies.join(ratings).head(3)
# 显式指定关联字段
movies.join(ratings, on='movie_id').head(3)
movies.join(ratings2, on=[movies.movie_id == ratings2.movie_id2]).head(3)
# 左关联
movies.left_join(ratings, on='movie_id').head(3)
# 右关联
movies.right_join(ratings, on='movie_id').head(3)
# 使用outer_join方法,通过指定left还是right来确定是左连接还是右连接
# 自关联
movies2 = movies.view() # view方法用于复制Collection
movies.join(movies2, movies.movie_id == movies2.movie_id)[movies, movies2.movie_id.rename('movie_id2')].head(3)
以上提到的方法都支持mapjoin,只需要在参数列表里加上mapjoin=True,即可对右表执行mapjoin;同时,join操作也支持来自不同数据源的dataframe,但计算操作会在ODPS上进行。
Union操作
当两个DataFrame字段与类型都相同的时候,可以使用union方法将两份数据合并
df1.union(df2) # 等价于union all
df1.union(df2,distinct=True) # 等价于union
窗口函数
grouped = iris.groupby('name')
grouped.mutate(grouped.sepallength.cumsum(), grouped.sort('sepallength').row_number()).head(10)
# 等价的SQL代码为
select name
,sum(sepallength) over(partition by name) as sepallength_sum
,row_number() over(partition by name order by sepallength) as rn
from iris
DataFrame API支持的窗口函数包括:
窗口函数 | 说明 |
---|---|
cumsum | 计算累积和 |
cummean | 计算累积均值 |
cummedian | 计算累积中位数 |
cumstd | 计算累积标准差 |
cummax | 计算累积最大值 |
cummin | 计算累积最小值 |
cumcount | 计算累积和 |
lag | 按偏移量取当前行之前第几行的值,如当前行号为rn,则取行号为rn-offset的值 |
lead | 按偏移量取当前行之后第几行的值,如当前行号为rn则取行号为rn+offset的值 |
rank | 计算排名 |
dense_rank | 计算连续排名 |
percent_rank | 计算一组数据中某行的相对排名 |
row_number | 计算行号,从1开始 |
qcut | 将分组数据按顺序切分成n片,并返回当前切片值,如果切片不均匀,默认增加第一个切片的分布 |
nth_value | 返回分组中的第n个值 |
cume_dist | 计算分组中值小于等于当前值的行数占分组总行数的比例 |
其中,rank、dense_rank、percent_rank和row_number支持下列参数:
参数名 | 说明 |
---|---|
sort | 排序关键字,默认为空 |
ascending | 排序时,是否依照升序排序,默认 True |
lag和lead除了rank的参数外,还支持下列参数:
参数名 | 说明 |
---|---|
offset | 取数据的行距离当前行的行数 |
default | 当 offset 指定的行不存在时的返回值 |
而cumsum、cummax、cummin、cummean、cummedian、cumcount和cumstd除了rank的上述参数外,还支持下列参数:
参数名 | 说明 |
---|---|
unique | 是否排除重复值,默认 False |
preceding | 窗口范围起点 |
following | 窗口范围终点 |
南航环境需要使用该参数才可以正常查看logview
【PyODPS完结撒花!】