注:本文中的o均默认为ODPS入口对象,后续不再赘述。

DataFrame 基本概念

ODPS DataFrame中的三种对象:Collection(DataFrame) ,Sequence,Scalar。 这三个对象分别表示表结构(或者二维结构)、列(一维结构)、标量。需要注意的是,这些对象仅在使用 Pandas 数据创建后会包含实际数据, 而在 ODPS 表上创建的对象中并不包含实际的数据,而仅仅包含对这些数据的操作,实质的存储和计算会在 ODPS 中进行。

创建DataFrame

可以通过ODPS表、ODPS分区、Pandas DataFrame及SQLAlchemy table来创建。

  1. from odps.df import DataFrame
  2. # 从ODPS表创建
  3. iris = DataFrame(o.get_table('pyodps_iris'))
  4. iris2 = o.get_table('pyodps_iris').to_df() # 使用表的to_df方法
  5. # 从ODPS分区创建
  6. pt_df = DataFrame(o.get_table('partitioned_table').get_partition('pt=20171111'))
  7. pt_df2 = o.get_table('partitioned_table').get_partition('pt=20171111').to_df() # 使用分区的to_df方法
  8. # 从Pandas DataFrame创建
  9. import pandas as pd
  10. import numpy as np
  11. df = DataFrame(pd.DataFrame(np.arange(9).reshape(3, 3), columns=list('abc')))
  12. # 从sqlalchemy Table创建
  13. engine = sqlalchemy.create_engine('mysql://root:123456@localhost/movielens')
  14. metadata = sqlalchemy.MetaData(bind=engine) # 需要绑定到engine
  15. table = sqlalchemy.Table('top_users', metadata, extend_existing=True, autoload=True)
  16. users = DataFrame(table)

在用Pandas DataFrame初始化时,指定unknown_as_string参数为True,可以将无法识别的列指定为string类型,也可以指定as_type参数,传入一个dict,将指定的列强行转化为指定的类型。

  1. df2 = DataFrame(df, unknown_as_string=True, as_type={'null_col2': 'float'})

Sequence

获取列

可以使用Collection.column_name取出一列,如

  1. df2.col1.head(5)
  2. df2['col1'].head(5)

列类型

ODPS的字段和DataFrame的类型映射关系如下:

ODPS类型 DataFrame类型
bigint int64
double float64
string string
datetime datetime
boolean boolean
decimal decimal
array list
map dict

list和dict必须填写其包含值的类型,否则会报错。目前 DataFrame 暂不支持 MaxCompute 2.0 中新增的Timestamp及Struct类型,未来的版本会支持。

获取数据类型与数据类型修改

  1. df2.col1.dtype # 获取列col1的数据类型
  2. df2.col1.astype('int') # 修改列col1的数据类型

列名

在DataFrame的计算过程中,一个Sequence必须要有列名,在进行一些运算,如max()之后,DataFrame会自动将其结果集的字段进行命名,但用户也可以自行对列名重命名。

  1. df2.col1.rename('col_rename')

Collection

DataFrame中所有二维数据集上的操作都属于CollectionExpr,可视为一张 ODPS 表或一张电子表单,DataFrame 对象也是CollectionExpr的特例(ps:理解不能)。CollectionExpr中包含针对二维数据集的列操作、筛选、变换等大量操作。

获取类型

dtypes可以用来获取 CollectionExpr 中所有列的类型。dtypes返回的是Schema类型。

  1. df2.dtypes

列选择与增删

  1. # 选择单列时,需要在列名后加逗号或用多一个中括号,否则返回的就是Sequence而不是Collection了
  2. df['col1',]
  3. df[['col1']]
  4. # 选择多列
  5. df['col1','col2']
  6. # 排除某些列
  7. df.exclude('col1', 'col2')[:5]
  8. # 在数据集上就地删除某些列,这种写法只支持0.7.2之后的版本
  9. del df['col1']
  10. # 在已有的Collection中添加某一列变换的结果,新增的列会作为新Collection的一部分
  11. # 下例为在原Collection后新增一列
  12. df[df, (df.col1 + 1).rename('new_col1')].head(5)
  13. # 0.7.2之后的版本支持在原数据集中直接追加
  14. df['new_col1'] = df.col1 + 1
  15. # 增删列以创建新Collection的另一种方法是调用select方法,传入需要用到的字段及计算表达式
  16. df.select('name', col1=iris.col - 1).head(5)
  17. # lambda表达式可接受Collection作为传入参数,以用来增删列,如下例就将col1列去掉了
  18. df['name', 'col1'][[lambda x: x.name]].head(5)
  19. # 0.7.2以后版本的 PyODPS 中,支持对数据进行条件赋值,下例为,当满足条件时,对col2字段重新赋值
  20. df[df.col1 > 5.0, 'col2'] = iris.col * 2

引入常数和随机数

  1. # 在Collection中追加一列常数。追加常数需要使用Scalar,引入时需要手动指定列名
  2. from odps.df import Scalar
  3. df2[df2, Scalar(1).rename('id')][:5]
  4. # 指定一个空值列,可以使用NullScalar
  5. from odps.df import NullScalar
  6. df2[df2, NullScalar('float').rename('fid')][:5]
  7. # 0.7.12及以后版本中,简化为
  8. df2['id'] = 1
  9. # 空值列的简化写法如下
  10. from odps.df import NullScalar
  11. df2['null_col'] = NullScalar('float')
  12. # 在Collection中增加一列随机数列,该列类型为 float,范围为0-1
  13. from odps.df import RandomScalar
  14. df2[df2, RandomScalar().rename('rand_val')][:5]

过滤数据

  1. # 单个条件
  2. df2[df2.col1 > 5].head(5)
  3. # 多个条件
  4. df2[(df2.col1 < 5) & (df2['col2'] > 1.5)].head(5) # 与
  5. df2[(df2.col1 < 5) | (df2['col2'] > 1.5)].head(5) # 或
  6. df2[~(df2.col1 < 5)].head(5) # 非
  7. # 使用filter方法过滤数据,等价于多个与运算
  8. df2.filter(df2.col1 > 3.5, df2.col2 < 4).head(5)
  9. # 对于连续的操作,可以使用lambda表达式
  10. df2[df2.col1 > 3.8]['name', lambda x: x.col1 + 1]
  11. # Collection如果包含一个boolean类型的字段,则可以直接使用该列作为过滤条件。
  12. >>> df.dtypes
  13. odps.Schema {
  14. a boolean
  15. b int64
  16. }
  17. >>> df[df.a]
  18. a b
  19. 0 True 1
  20. 1 True 3
  21. # 因此,如果在想取字段a的时候,应该这么写
  22. df[df.a, ]
  23. df[[df.a]]
  24. df.select(df.a)
  25. df.a
  26. df['a']
  27. # 使用query方法来对数据进行筛选,在query方法中,&和and都表示与操作,|和or都表示或操作。
  28. df2.query("(col1 < 5) and (col2 > 1.5)").head(5)
  29. # 在query方法中引用本地变量
  30. var = 4
  31. df2.query("(iris.col1 < 2.5) | (col2 > @var)").head(5)

并行多列输出

对于list及map类型的列,explode方法会将该列转换为多行输出,在有多个该类型列时,需要打散哪一列,就在哪列上调用explode方法。

  1. >>> df
  2. id a b
  3. 0 1 [a1, b1] [a2, b2, c2]
  4. 1 2 [c1] [d2, e2]
  5. >>> df[df.id, df.a.explode(), df.b]
  6. id a b
  7. 0 1 a1 [a2, b2, c2]
  8. 1 1 b1 [a2, b2, c2]
  9. 2 2 c1 [d2, e2]
  10. # 如果多行输出方法对某个输入不产生任何输出,默认输入行将不在最终结果中出现
  11. # 但可以通过keep_nulls参数进行记录行的保留
  12. >>> df
  13. id a
  14. 0 1 [a1, b1]
  15. 1 2 []
  16. >>> df[df.id, df.a.explode()]
  17. id a
  18. 0 1 a1
  19. 1 1 b1
  20. >>> df[df.id, df.a.explode(keep_nulls=True)]
  21. id a
  22. 0 1 a1
  23. 1 1 b1
  24. 2 2 None

限制条数

  1. df.head(5)
  2. df[:5] # 只能用于Collection,不能作用于sequence。
  3. df.limit(5)

执行

延迟执行

DataFrame上的所有操作并不会立即执行,只有当用户显式调用execute方法,或者一些立即执行的方法时(内部调用的就是execute),才会真正去执行。
立即执行的方法包括:

方法 说明 返回值
persist 将执行结果保存到ODPS表 PyODPS DataFrame
execute 执行并返回全部结果 ResultFrame
head 查看开头N行数据,这个方法会执行所有结果,并取开头N行数据 ResultFrame
tail 查看结尾N行数据,这个方法会执行所有结果,并取结尾N行数据 ResultFrame
to_pandas 转化为 Pandas DataFrame 或者 Series,wrap 参数为 True 的时候,返回 PyODPS DataFrame 对象 wrap为True返回PyODPS DataFrame,False(默认)返回pandas DataFrame
plot,hist,boxplot 画图有关

在交互式环境下,DataFrame会在打印结果前自动调用execute方法,但是也可以关闭这个功能,此时执行打印操作时,会显示整棵抽象语法树。

# 关闭交互式环境下的自动调用execute
from odps import options
options.interactive = False

保存执行结果为ODPS表

对 Collection,我们可以调用persist方法,参数为表名,返回一个新的DataFrame对象,且会在MaxCompute中创建对应表。

# 入口对象为o
df2 = df.persist('table_name',odps=o)

# 入口对象全局化后的写法
o.to_global()
df2 = df.persist('table_name')

# 指定表的生命周期
df2 = df.persist('table_name',lifecycle=10)

# 创建分区表,partitions参数传入list对象,元素为分区字段名称,而且df中应包含该字段
df3 = df.persist('table_name', partitions=['name'])

# 如果是写入已经存在的表,则可传入partition参数,指定写入的分区
# 此时,df中的所有字段都必须在该表存在,且类型相同
# drop_partition和create_partition参数只有在此时有效
df.persist('table_name', partition='ds=20200217', drop_partition=True, create_partition=True)

保存执行结果为Pandas DataFrame

可以对PyODPS DataFrame使用to_pandas方法,如果wrap参数为True,将返回PyODPS DataFrame对象。

>>> type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
>>> type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame

注意:to_pandas返回的pandas DataFrame与直接通过pandas创建的 DataFrame 没有任何区别,数据的存储和计算均在本地。如果wrap=True,生成的即便是PyODPS DataFrame,数据依然在本地。如果的数据很大,或者运行环境的内存限制较为严格,请谨慎使用 to_pandas。

立即运行设置运行参数

对于立即执行的方法,比如execute、persist、to_pandas等,可以设置运行时参数(仅对ODPS SQL后端有效 )。
一种方法是设置全局参数。详细参考SQL设置运行参数 。
也可以在这些立即执行的方法上,使用hints参数。这样,这些参数只会作用于当前的计算过程,如

>>> iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})

运行时显示详细信息

修改全局配置,用于查看运行时instance的logview(所以实际上被转化为odps sql了吗?)

>>> from odps import options
>>> options.verbose = True
>>>
>>> iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview

   sepalwidth  petallength  petalwidth         name
0         3.0          1.4         0.2  Iris-setosa
1         3.2          1.3         0.2  Iris-setosa
2         3.1          1.5         0.2  Iris-setosa
3         3.4          1.4         0.3  Iris-setosa
4         2.9          1.4         0.2  Iris-setosa

缓存中间Collection计算结果

DataFrame的计算过程中,一些Collection被多处使用,或者用户需要查看中间过程的执行结果,这时用户可以使用 cache标记某个collection需要被优先计算,但并不会触发立即计算。

>>> cached = iris[iris.sepalwidth < 3.5].cache()
>>> df = cached['sepallength', 'name'].head(3)
>>> df
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa
>>> cached.head(3)  # 由于cached已经被计算,所以能立刻取到计算结果
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa

异步和并行执行

DataFrame支持异步操作,对于立即执行的方法,包括execute、persist、head、tail、to_pandas(其他方法不支持), 传入async参数,即可以将一个操作异步执行,timeout参数指定超时时间, 异步返回的是Future对象。
(ps:异步操作的本质我不是很懂)

>>> future = iris[iris.sepal_width < 10].head(10, async=True)
>>> future.done()
True
>>> future.result()

DataFrame的并行执行可以使用多线程来并行,单个 expr 的执行可以通过n_parallel参数来指定并发度。 比如,当一个 DataFrame 的执行依赖的多个 cache 的 DataFrame 能够并行执行时,该参数就会生效。

>>> expr1 = iris.groupby('category').agg(value=iris.sepal_width.sum()).cache()
>>> expr2 = iris.groupby('category').agg(value=iris.sepal_length.mean()).cache()
>>> expr3 = iris.groupby('category').agg(value=iris.petal_length.min()).cache()
>>> expr = expr1.union(expr2).union(expr3)
>>> future = expr.execute(n_parallel=3, async=True, timeout=2)  # 并行和异步执行,2秒超时,返回Future对象
>>> future.result()

当同时执行多个 expr 时,我们可以用多线程执行,但会面临一个问题, 比如两个 DataFrame 有共同的依赖,这个依赖将会被执行两遍。现在我们提供了新的Delay API, 来将立即执行的操作(包括 execute、persist、head、tail、to_pandas,其他方法不支持)变成延迟操作,并返回 Future 对象。当用户触发delay执行的时候,会去寻找共同依赖,按用户给定的并发度执行,并支持异步执行。delay.execute 也接受async操作来指定是否异步执行,当异步的时候,也可以指定timeout参数来指定超时时间。

>>> from odps.df import Delay
>>> delay = Delay()  # 创建Delay对象
>>>
>>> df = iris[iris.sepal_width < 5].cache()  # 有一个共同的依赖
>>> future1 = df.sepal_width.sum().execute(delay=delay)  # 立即返回future对象,此时并没有执行
>>> future2 = df.sepal_width.mean().execute(delay=delay)
>>> future3 = df.sepal_length.max().execute(delay=delay)
>>> delay.execute(n_parallel=3)  # 并发度是3,此时才真正执行。
|==========================================|   1 /  1  (100.00%)        21s
>>> future1.result()
458.10000000000014
>>> future2.result()
3.0540000000000007

列运算

对于一个Sequence来说,对它加上一个常量、或者执行sin函数的这类操作时,是作用于每个元素上的。(类似于apply?)

NULL相关(isnull,notnull,fillna)

isnull用于判断是否某字段是NULL,notnull与isnull作用相同,但返回值相反,而fillna是将NULL值填充为用户指定的值。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))

iris.sepallength.isnull().head(5)

逻辑判断(ifelse,switch)

ifelse作用于boolean类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。
switch用于多条件判断的情况,参数形式为条件1,结果1,条件2,结果2…条件n,结果n。

(iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)

# default参数含义为不满足任何条件时取的值
iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)

数学运算

对于数字类型的字段,支持+,-,*,/等操作,也支持log、sin等数学计算。
算术运算支持的操作包括:

  • 绝对值:abs
  • 平方根:sqr
  • 正弦和反正弦:sin、arcsin、sinh、arcsinh
  • 余弦和反余弦:cos、arccos、cosh、arccosh
  • 正切和反正切:tan、arctan、tanh、arctanh
  • 指数函数:exp、expm1(即exp(x)-1)
  • 对数:log、log2、log10、log1p(即log(x+1))
  • 弧度角度转换:redians(用角度计算弧度)、degrees(用弧度计算角度)
  • 取整:ceil(向上取整)、floor(向下取整)、trunc(截尾取整)

对于sequence,也支持与其它sequence和scalar的比较,同时也提供了between函数用于判断数据是否在某个区间内。

(iris.sepallength.between(3, 5)).head(5)
(iris.sepallength.between(3, 5, inclusive=False)).head(5)

String相关操作

string相关操作包括:

string 操作 说明
capitalize 首字母大写
contains 包含某个字符串,如果 regex 参数为 True,则是包含某个正则表达式,默认为 True
count 指定字符串出现的次数
endswith 以某个字符串结尾
startswith 以某个字符串开头
extract 抽取出某个正则表达式,如果 group 不指定,则返回满足整个 pattern 的子串;否则,返回第几个 group
find 返回第一次出现的子串位置,若不存在则返回-1
rfind 从右查找返回子串第一次出现的位置,不存在则返回-1
replace 将某个 pattern 的子串全部替换成另一个子串, n 参数若指定,则替换n次
get 返回某个位置上的字符串
len 返回字符串的长度
ljust 若未达到指定的 width 的长度,则在右侧填充 fillchar 指定的字符串(默认空格)
rjust 若未达到指定的 width 的长度,则在左侧填充 fillchar 指定的字符串(默认空格)
lower 变为全部小写
upper 变为全部大写
lstrip 在左侧删除空格(包括空行符)
rstrip 在右侧删除空格(包括空行符)
strip 在左右两侧删除空格(包括空行符)
split 将字符串按分隔符拆分为若干个字符串(返回 list 类型)
pad 在指定的位置(left,right 或者 both)用指定填充字符(用 fillchar 指定,默认空格)来对齐
repeat 重复指定 n
slice 切片操作
swapcase 对调大小写
title 同 str.title
zfill 长度没达到指定 width ,则左侧填充0
isalnum 同 str.isalnum
isalpha 同 str.isalpha
isdigit 是否都是数字,同 str.isdigit
isspace 是否都是空格,同 str.isspace
islower 是否都是小写,同 str.islower
isupper 是否都是大写,同 str.isupper
istitle 同 str.istitle
isnumeric 同 str.isnumeric
isdecimal 同 str.isdecimal
todict 将字符串按分隔符拆分为一个 dict,传入的两个参数分别为项目分隔符和 Key-Value 分隔符(返回 dict 类型)
strptime 按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化

时间相关操作

对于datetime类型的Sequence或者Scalar,可以调用时间相关的内置函数,其本身也包含时间相关的。
与时间相关的属性包括:

时间相关属性 说明
year
month
day
hour 小时
minute 分钟
second
weekofyear 返回日期位于那一年的第几周。周一作为一周的第一天
weekday 返回日期当前周的第几天
dayofweek 同 weekday
strftime 格式化时间,时间格式和 Python 标准库相同,详细参考 Python 时间格式化

PyODPS同样也支持时间的加减操作

# pyodps支持的所有时间类型
from odps.df import year,month,day,hour,minute,second,millisecond
df.a - day(3) # 获取时间字段a三天前的日期
df.b - df.a   # 日期相减返回的是整数,即毫秒数

集合类型相关操作

PyODPS 支持的集合类型有List 和 Dict。这两个类型都可以使用下标获取集合中的某个元素。
支持的共有方法如下:

  • len :可获得集合的大小。
  • explode 方法:用于展开集合中的内容。对于 List,explode 默认返回一列,当传入参数 pos 时,将返回两列,其中一列为值在数组中的编号,而另一列则为元素,展开的元素按记录行顺序排列。对于 Dict,explode 会返回两列,分别表示 keys 及 values。explode 中也可以传入列名,作为最后生成的列。

List支持的独有方法如下:

  • contains(v):列表是否包含某个元素
  • sort:返回排序好的列表

Dict支持的独有方法如下:

  • keys:返回字典的所有key,返回值为list
  • values:返回字典的所有value,返回值为list

explode方法示例如下,同样也可以和并行多列输出结合:

>>> df
   id         a                            b
0   1  [a1, b1]  {'a2': 0, 'b2': 1, 'c2': 2}
1   2      [c1]           {'d2': 3, 'e2': 4}

# 按照下标和key来取值,如果key对应的值不存在,则返回结果的对应字段会返回NaN
df[df.id, df.a[0], df.b['b2']]

# 获取集合大小
df[df.id, df.a.len(), df.b.len()]

# 展开集合
df.a.explode()

# 展开集合并添加编号列,还可重新指定列名
df.a.explode(pos=True)
df.a.explode(['pos', 'value'], pos=True)
df.b.explode(['key', 'value'])

其它元素操作

# 判断Sequence中的元素是否在isin方法传入的集合里,而notin方法为反操作,均返回布尔值
iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
iris.sepallength.notin([4.9, 5.1]).rename('sepallength').head(5)

# 将Sequence的数据根据传入的第一个参数进行打散
# include_under和include_over可以分别包括向下和向上的区间。
iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5'], 
                     include_under=True, include_over=True).rename('sepallength_cut').head(5)

使用自定义函数

Dataframe支持对Sequence使用map,即对每个元素执行自定义函数,例如

# 如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。
iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)

注意:目前,受限于 Python UDF,自定义函数无法支持将list/dict类型作为输入或输出。
如果在函数中包含闭包,需要注意的是,函数外闭包变量值的变化会引起函数内该变量值的变化。

查看下面三段代码

dfs = []
for i in range(10):
    dfs.append(df.sepal_length.map(lambda x: x + i))

dfs = []
def get_mapper(i):
    return lambda x: x + i
for i in range(10):
    dfs.append(df.sepal_length.map(get_mapper(i)))

import functools
dfs = []
for i in range(10):
    dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

for j in dfs:
    print(j.execute())

第一段代码中,当循环结束后,变量i的值固定在i=9,而SequenceExpr保存的是运算过程,lambda函数指向的是i,也就是值9,因此每次执行都是对Sequance里的每个元素+9,而并非是依次+0+1+2+…+9;
而第二和第三段代码中,由于是先把i的值传了进去,然后返回匿名函数,也就是说,每次返回的匿名函数都是不一样的,第一次返回lambda x:x+0,第二次返回lambda x:x+1,以此类推,因此在execute的时候,对Sequence里的每个元素依次执行了+0+1+2+…+9的操作。
【ps:不得不说这是真的细节】

map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者Function对象 。

引用资源

自定义函数也能读取ODPS上的资源(表资源或文件资源),或者引用一个collection作为资源。此时,自定义函数需要写成函数闭包或callable的类。

# 创建资源
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor

>>> def myfunc(resources):  # resources按调用顺序传入
>>>     names = set()
>>>     fileobj = resources[0] # 传入的参数为list,因此通过下标进行取用
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # 这里可以通过字段名或者偏移来取
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False

使用第三方Python库

值得注意的是,第三方库的依赖库,也必须指定,否则依然会有导入错误。

# 下载所需的库和依赖库
pip download python-dateutil -d /to/path/

# 通过全局配置对第三方库进行使用
from odps import options
options.df.libraries = ['six.whl', 'python_dateutil.whl']

# 通过execute方法的libraries参数指定
df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])

使用计数器

【留白,不知道是干嘛的】

调用ODPS内建或者已定义函数

要想调用ODPS上的内建或者已定义函数,来生成列,我们可以使用 func 接口,该接口默认函数返回值为 String, 可以用 rtype 参数指定返回值。

>>> from odps.df import func

# func.rand即调用rand函数
>>> iris[iris.name, func.rand(rtype='float').rename('rand')][:4]
>>> iris[iris.name, func.rand(10, rtype='float').rename('rand')][:4]
>>> # 调用 ODPS 上定义的 UDF,列名无法确定时需要手动指定
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float').rename('new_col')]
>>> # 从其他 Project 调用 UDF,也可通过 name 参数指定列名
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float', project='udf_project',
>>>                               name='new_col')]

聚合操作

常用聚合操作

注意:所有的聚合操作都会忽略空值,且目前受限于 Python UDF,自定义聚合无法支持将 list / dict 类型作为初始输入或最终输出结果。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

iris.describe() # 查看列的数量、最大值、最小值、平均值以及标准差,与pandas中的类似
iris.sepallength.max() # 查看最大值
iris.name.unique().cat(sep=',') # 去重后再进行聚合拼接所有值

iris.a.mean() # 如果每列都支持某种聚合操作,则可以直接对整个DataFrame进行聚合
iris.a.count() # 获取总行数
iris.a.size() # 获取总行数
iris.a.nunique() # 不重复值数量,等价于count distinct
iris.a.min() # 最小值
iris.a.max() # 最大值
iris.a.sum() # 求和
iris.a.mean() # 均值
iris.a.median() # 中位数
iris.a.quantile(p) # 分位数,仅在p为整数值下可取得准确值
iris.a.var() # 方差
iris.a.std() # 标准差
iris.a.moment() # n阶中心矩(n阶矩)
iris.a.skew() # 样本偏度(无偏估计)
iris.a.kurtosis() # 样本峰度(无偏估计)
iris.a.cat('sep') # 按分隔符sep做字符串连接操作 
iris.a.tolist() # 将Sequence组合为list

分组聚合

分组聚合的理解与SQL类似,先分组,再聚合,因此是先使用DataFrame的groupby来执行分组操作,然后调用agg或者aggregate方法来执行聚合操作。

iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())

# 下面二者等价,即分组后count,并按从大到小排列
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
iris['name'].value_counts().head(5)

# 聚合后的单列可以直接取出列名。但此时只能使用聚合函数
iris.groupby('name').petallength.sum()

# 同样支持对常量进行分组,但需要使用Scalar初始化
from odps.df import Scalar
iris.groupby(Scalar(1)).petallength.sum()

编写自定义聚合

自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个mutable的object(比如 list、dict),buffer大小不应随数据而递增。
  • call(buffer, *val):将值聚合到中间 buffer。
  • merge(buffer, pbuffer):将 pbuffer 聚合到 buffer 中。
  • getvalue(buffer):返回最终值。
# 求平均数
class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

# 使用自定义聚合
iris.sepalwidth.agg(Agg)
iris.sepalwidth.agg(Agg, 'float') # 指定数据类型(输入/输出?)
iris.groupby('name').sepalwidth.agg(Agg) # 分组聚合

# 对多列调用自定义聚合
from odps.df import agg
# 对两列调用自定义聚合,即将其作为buffer的元素
to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  
iris.groupby('name').agg(val=to_agg) # 分组后进行聚合

HyperLogLog计数

适用于海量数据的去重统计。

>>> df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
>>> df.a.hll_count()
63270
>>> df.a.nunique()
63250

排序、去重、采样、数据变换

同样先进行声明

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

排序

排序操作只能作用于Collection,等同于SQL中的order by

iris.sort('sepalwidth',ascending=False).head(5) # 设ascending值设为False为降序排列,默认为升序
iris.sort(-iris.sepalwidth).head(5) # 降序排列的另外一种方式,但笔者觉得可读性没第一种好

# 多字段排序,ascending可传入list来按顺序标识字段为升序还是降序
iris.sort(['sepalwidth', 'petallength'], ascending=[True, False]).head(5)
iris.sort(['sepalwidth', -iris.petallength]).head(5) # 上一语句等价变体

ODPS要求排序必须指定记录条数,所以在ODPS后端执行时, 会通过options.df.odps.sort.limit指定排序个数,这个值默认是10000, 如果要排序尽量多的数据,可以把这个值设到较大的值。不过注意,此时可能会导致 OOM。

去重

# 对Collection使用distinct方法
iris[['name']].distinct()
isrs.distinct('name')
iris.distinct('name', 'sepallength').head(3)

# 对Sequence使用unique方法,但调用unique的Sequence不能用于列选择中
iris.name.unique()
iris[iris.name, iris.name.unique()] # 错误用法

采样

除了按份数采样外,其余方法如果要在ODPS DataFrame上执行,需要Project支持XFlow,否则,这些方法只能在 Pandas DataFrame 后端上执行。【ps:已经习惯了残疾版dataframe了,还要啥自行车,又不是不能用(滑稽)】

# 按份数采样
iris.sample(parts=10)  # 分成10份,默认取第0份
iris.sample(parts=10, i=0)  # 手动指定取第0份
iris.sample(parts=10, i=[2, 5])   # 分成10份,取第2和第5份
iris.sample(parts=10, columns=['name', 'sepalwidth'])  # 根据name和sepalwidth的值做采样

数据缩放

即对数据进行归一化与标准化。

# 线性归一化,公式为 (X-X_min)/(X_max-X_min)
# feature参数指定的是输出的范围
# preserve参数设为True可用于保留原始列,结果列会新增一列
df.min_max_scale(columns=['fid'], feature=(-1,1), preserve=True)

# group参数指定一个或多个分组列,在分组列中分别取最值进行缩放
df.min_max_scale(columns=['fid'], group=['name'])

# 标准化,参数与min_max_scale一致,不再赘述
df.std_scale(columns=['fid'])

空值处理

# 删除空行
df.dropna(subset=['f1', 'f2', 'f3', 'f4']) # 取出subset中,包含空值的行
df.dropna(how='all', subset=['f1', 'f2', 'f3', 'f4']) # 只要行中有非空值则不删除,必须要全字段为空才删除
df.dropna(thresh=3, subset=['f1', 'f2', 'f3', 'f4']) # 指定行中至少有几个非空值,不符合标准的会被删掉

# 填充空值
df.fillna(100, subset=['f1', 'f2', 'f3', 'f4']) # 使用常数填充
df.fillna(df.f2, subset=['f1', 'f2', 'f3', 'f4']) # 使用指定列的值进行填充

# 向后填充及向前填充【我又一次被PyODPS DataFrame骚到了,pandas是按行向前后填充,而它是按列的顺序向前和向后ORZ】
df.fillna(method='bfill', subset=['f1', 'f2', 'f3', 'f4'])
df.fillna(method='ffill', subset=['f1', 'f2', 'f3', 'f4'])

对所有行/列调用自定义函数

要对一行数据使用自定义函数,可以使用 apply 方法,axis参数必须为 1,表示在行上操作。

# reduce=True,表示返回Sequence,False则返回Collection
# names参数可指定返回的列名(例子中没有写,因此只返回了相加后的结果列)
# types参数可指定返回的类型,如果不指定则默认返回String类型
>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
   sepaladd
0       8.6
1       7.9
2       7.9

# 当reduce=False时,可利用yield返回多行结果
>>> iris.count()
150
>>>
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
300

# 还可以利用注释来标明返回的字段名和类型
>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth

# 使用map-only实现等价操作
iris.map_reduce(mapper=handle)

# 调用已经存在的UDTF,只需要指定函数名即可
iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])

要对所有列调用自定义聚合,参考上文的编写自定义聚合

MapReduce API

PyODPS DataFrame也支持MapReduce API,下面是WordCount的例子。

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group=['word', ],
>>>                     mapper_output_names=['word', 'cnt'],
>>>                     mapper_output_types=['string', 'int'],
>>>                     reducer_output_names=['word', 'cnt'],
>>>                     reducer_output_types=['string', 'int'])
     word  cnt
0   hello    2
1       i    1
2      is    1
3    life    1
4  python    2
5   short    1
6     use    1
7   world    1

这里按笔者的理解来分析下这段代码,因为官方文档给的解释实在太少了【ps:大概是因为我菜所以看了五分钟才懂】。首先是mapper,输入的参数是row,即mapper会按行读取,row是一个list对象,因此需要以row[0]的方式取出该行数据,并按delimiter来做split,切分出的每个单词以返回,注意这里用的是yield。
然后是reducer,传入的是keys,即所有的word,cnt[0]作为一个计数器,在迭代不同key的时候对word出现的次数进行累加,这里的row[1]指的其实是在mapper中返回的kv对中的value,即那个1,如果中间有combiner,也可能是n(反正reducer的逻辑要与combiner相同)。而最终在reducer中的cnt[0]会将所有的结果累加起来,得到一个最终的结果,在这里并不关心前面是否有combiner处理过。done作为一个标识,应该是必定需要存在的,标志当前key的迭代完成,即以kv对的方式返回最终结果。
最后是driver,指定了mapper、reducer的输出字段及对应类型,指定了mapper和reducer方法,group存放用于分组的字段,如果不指定该参数,会对所有字段做分组。

实际上,写成callable类的方式会更为直观。

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done表示这个key已经迭代结束
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

也可以使用output注释

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # 这里使用 list 而不是 cnt = 0,否则 h 内的 cnt 会被认为是局部变量,其中的赋值无法输出
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')

如果需要在执行MapReduce的时候对数据进行排序,可以使用sort参数,参数值为需要用于排序的字段列表;而至于升降序则可以通过ascending参数指定,参数值可以是一个bool值,表示所有的字段均按升/降序进行排列,也可以传入一个与sort参数值长度相等的列表,通过bool值指定哪些字段需要升序,哪些需要降序。

指定combiner

combiner用于在map端执行reducer的操作,即在map端先对数据进行聚合,用法和reducer是完全一致的,但不能引用资源。 并且combiner的输出的字段名和字段类型必须和mapper完全一致。
就如上文所说,可以使用reducer作为combiner。

words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

引用资源

使用mapper_resources和reducer_resources指定引用的资源

white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
<mapper>
<reducer>
words_df.map_reduce(mapper, reducer, group='word',
                    mapper_resources=[stop_words], reducer_resources=[white_list_file])

使用第三方Python库

参考前文使用第三方Python库

重排数据

有时数据在集群上分布可能是不均匀的,需要对数据重排。

# 默认会按随机数做哈希来分布,也可以指定按那些列做分布,且可以指定重排后的排序顺序。
df1 = df.reshuffle()
df1.reshuffle('name', sort='id', ascending=False)

布隆过滤器(Bloom Filter)

在PyODPS DataFrame中,bloom filter是通过seq2去过滤seq1的内容,不存在于seq2中的数据,就会被过滤,作为一个近似的方法,当数据量大的时候,可能有一些数据不能正确的被过滤。
可以传入capacity和error_rate来设置数据的量以及错误率,默认值是3000和0.01,调大capacity或者减小 error_rate会增加内存的使用,所以应当根据实际情况选择一个合理的值。

df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'

透视表(pivot_table)

df['A', 'D', 'E'].pivot_table(rows='A') # rows参数表示根据哪个字段来做聚合,默认求平均数
df.pivot_table(rows=['A', 'B', 'C']) # rows参数也可以传入字段列表
df.pivot_table(rows=['A', 'B'], values='D') # values参数用于指定要计算的字段,同样可以传入列表

# aggfunc指定聚合的函数
df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])

# columns参数的作用是,选择原始数据某一列的值,作为新的Collection的列,即行转列
df.pivot_table(rows=['A', 'B'], values='D', columns='C')

# 零值填充
df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)

Key-Value字符串变换

DataFrame提供了将Key-Value对展开为列,以及将普通列转换为Key-Value列的功能。

>>> df
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1

# 使用extract方法进行kv对的展开,columns参数传入需要展开的字段列表,kv_delim参数传入kv的连接符
# item_delim参数传入kv对的分隔符,fill_value参数用于填充缺失值
>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',', fill_value=0)
   name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
0  name1    1.0    3.0    NaN   10.0    NaN    NaN
1  name1    7.0    NaN    NaN    NaN    8.2    NaN
2  name2    NaN    1.2    1.5    NaN    NaN    NaN
3  name2    NaN    1.0    NaN    NaN    NaN    1.1

>>> df
   name    k1   k2   k3    k5   k7   k9
0  name1  1.0  3.0  NaN  10.0  NaN  NaN
1  name1  7.0  NaN  NaN   NaN  8.2  NaN
2  name2  NaN  1.2  1.5   NaN  NaN  NaN
3  name2  NaN  1.0  NaN   NaN  NaN  1.1

# 使用to_kv方法将数据合成kv对,columns传入需要处理的字段列表
# kv_delim参数传入key和value的连接符,kv对分隔符默认为逗号
>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
    name               kv
0  name1  k1=1,k2=3,k5=10
1  name1    k1=7.1,k7=8.2
2  name2    k2=1.2,k3=1.5
3  name2      k9=1.1,k2=1

数据合并

Join操作

DataFrame也支持对两个Collection执行 join 的操作。

from odps.df import DataFrame

movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

# join中传入需要关联的dataframe,由于没有指定关联字段,DataFrame API会寻找名字相同的字段进行join
movies.join(ratings).head(3)

# 显式指定关联字段
movies.join(ratings, on='movie_id').head(3)
movies.join(ratings2, on=[movies.movie_id == ratings2.movie_id2]).head(3)

# 左关联
movies.left_join(ratings, on='movie_id').head(3)

# 右关联
movies.right_join(ratings, on='movie_id').head(3)

# 使用outer_join方法,通过指定left还是right来确定是左连接还是右连接


# 自关联
movies2 = movies.view() # view方法用于复制Collection
movies.join(movies2, movies.movie_id == movies2.movie_id)[movies, movies2.movie_id.rename('movie_id2')].head(3)

以上提到的方法都支持mapjoin,只需要在参数列表里加上mapjoin=True,即可对右表执行mapjoin;同时,join操作也支持来自不同数据源的dataframe,但计算操作会在ODPS上进行。

Union操作

当两个DataFrame字段与类型都相同的时候,可以使用union方法将两份数据合并

df1.union(df2) # 等价于union all
df1.union(df2,distinct=True) # 等价于union

窗口函数

grouped = iris.groupby('name')
grouped.mutate(grouped.sepallength.cumsum(), grouped.sort('sepallength').row_number()).head(10)

# 等价的SQL代码为
select name
       ,sum(sepallength) over(partition by name) as sepallength_sum
       ,row_number() over(partition by name order by sepallength) as rn
from   iris

DataFrame API支持的窗口函数包括:

窗口函数 说明
cumsum 计算累积和
cummean 计算累积均值
cummedian 计算累积中位数
cumstd 计算累积标准差
cummax 计算累积最大值
cummin 计算累积最小值
cumcount 计算累积和
lag 按偏移量取当前行之前第几行的值,如当前行号为rn,则取行号为rn-offset的值
lead 按偏移量取当前行之后第几行的值,如当前行号为rn则取行号为rn+offset的值
rank 计算排名
dense_rank 计算连续排名
percent_rank 计算一组数据中某行的相对排名
row_number 计算行号,从1开始
qcut 将分组数据按顺序切分成n片,并返回当前切片值,如果切片不均匀,默认增加第一个切片的分布
nth_value 返回分组中的第n个值
cume_dist 计算分组中值小于等于当前值的行数占分组总行数的比例

其中,rank、dense_rank、percent_rank和row_number支持下列参数:

参数名 说明
sort 排序关键字,默认为空
ascending 排序时,是否依照升序排序,默认 True

lag和lead除了rank的参数外,还支持下列参数:

参数名 说明
offset 取数据的行距离当前行的行数
default 当 offset 指定的行不存在时的返回值

而cumsum、cummax、cummin、cummean、cummedian、cumcount和cumstd除了rank的上述参数外,还支持下列参数:

参数名 说明
unique 是否排除重复值,默认 False
preceding 窗口范围起点
following 窗口范围终点

南航环境需要使用该参数才可以正常查看logview
【PyODPS完结撒花!】