安装指南
推荐使用pip安装。
修改pypi镜像源为阿里源
Windows环境
- Win+R调出运行,输入%APPDATA%,回车。
- 在弹出的目录下新建目录pip。
- 在pip目录下新建文件pip.ini,内容如下,修改好后保存。
[global]
trusted-host=mirrors.aliyun.com
index-url=http://mirrors.aliyun.com/pypi/simple/
Linux环境
输入以下命令,对pip.conf文件进行修改。
vi ~/.pip/pip.conf
在文件中添加或修改如下,保存退出。 ```shell [global] index-url = https://mirrors.aliyun.com/pypi/simple/
[install] trusted-host=mirrors.aliyun.com
<a name="pQs5T"></a>
## 安装依赖包
确保以下包的版本都达标。
```shell
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # 可选,安装后能加速 Tunnel 上传
pip install cython>=0.19.0
安装PyODPS并检查
依次键入以下命令安装PyODPS,并检查是否能正常调用。
pip install pyodps
python -c "from odps import ODPS"
基本操作
ODPS入口对象
在做所有操作之前,都需要先实例化一个ODPS入口对象,需要提供ak信息,项目空间及endpoint作为传入的参数。
odps = ODPS(access_id='**your-access-id**',
secret_access_key='**your-secret-access-key**',
project='**your-default-project**',
endpoint='**your-end-point**')
项目空间
获取项目空间
project = o.get_project('my_project') # 取到某个项目
project = o.get_project() # 取到默认项目
判断项目空间是否存在
if o.exist_project('my_project'):
<do something>
表
获取项目空间下的所有表
for table in o.list_tables(): # 返回的是可迭代对象
<do something>
判断表是否存在
if o.exist_table('my_project'):
<do something>
操作表对象
t = o.get_table('dual') # 通过传入表名获取表对象
t = o.get_table('dual', project='other_project') # 跨项目空间获取表对象
t.schema # 获取表的Schema
t.lifecycle # 获取表的生命周期
t.creation_time # 获取表的创建时间
t.is_virtual_view # 是否为视图,返回布尔值
t.size # 返回表占用的空间(但应该并非实际存储大小?有压缩和多两份backup的原因)
t.comment # 获取表注释
t.schema.columns # 获取所有字段信息,包含字段名与数据类型
t.schema['c_int_a'] # 获取指定字段信息,包含字段名与数据类型
t.schema['c_int_a'].comment # 获取指定字段的注释
t.schema.names # 获取非分区字段的字段名称
t.schema.types # 获取非分区字段的字段类型
创建表的Schema
创建表的Schema有两种方式
第一种方式是通过传入表字段信息及分区字段信息来初始化,推荐使用,可读性上来说会好很多。
from odps.models import Schema, Column, Partition
# 传入表字段信息,包括列名,字段类型及字段注释
columns = [Column(name='num', type='bigint', comment='the column'),
Column(name='num2', type='double', comment='the column2')]
# 传入分区字段信息
partitions = [Partition(name='pt', type='string', comment='the partition')]
# 将Column对象和Partition对象作为传入参数,实例化Schema对象
schema = Schema(columns=columns, partitions=partitions)
第二种方式是使用Schema.from_lists方法,但并不推荐,一个是可读性的问题,另一个是无法直接设置列和分区的注释,此处仍给出示例代码。
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
默认只允许使用 bigint、double、decimal、string、datetime、boolean、map 和 array 类型,如果要使用的数据类型为2.0新增的数据类型,则需要在代码中加入
from odps import options
options.sql.use_odps2_extension = True
创建表
可以使用表 schema 来创建表,也可以通过传入“字段名 字段类型”字符串来创建表
table = o.create_table('my_new_table', schema) # 传入表名和Schema对象
table = o.create_table('my_new_table', 'num bigint, num2 double') # 传入字符串
# 传入字符串创建分区表,注意此处将分区字段单独用了另一个字符串存放,普通字段与分区字段合成一个元组
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'))
此外,create_table方法可使用的传入参数如下表
传入参数 | 含义 |
---|---|
project | 传入项目空间名,如果不显式指定则表示使用ODPS对象的默认项目空间 |
comment | 指定表注释 |
if_not_exists | 如果该表不存在则创建,默认值为False |
lifecycle | 指定生命周期,但该参数在api里已标为弃用,如果该参数未指定值,则使用options.lifecycle |
shard_num | table’s shard num(尚未知具体作用) |
hub_lifestyle | hub lifecycle(尚未知具体作用) |
async_ | if True, will run asynchronously(尚未知具体作用) |
同步表更新
有时候,一个表可能被别的程序做了更新,比如schema有了变化。
table.reload()
行记录Record
Record表示表的一行记录,我们在 Table 对象上调用 new_record 就可以创建一个新的 Record。
t = o.get_table('mytable')
r = t.new_record(['val0', 'val1']) # 值的个数必须等于表schema的字段数
r2 = t.new_record() # 也可以不传入值
r2[0] = 'val0' # 可以通过偏移量设置值
r2['field1'] = 'val1' # 也可以通过字段名设置值
r2.field1 = 'val1' # 通过属性设置值,与上一种方法等价
print(record[0]) # 取第0个位置的值
print(record['c_double_a']) # 通过字段取值
print(record.c_double_a) # 通过属性取值
print(record[0: 3]) # 切片操作,取位置为0-3(不含3)的字段值
print(record[0, 2, 3]) # 取多个位置的值
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
获取表数据
如果只是查看每个表的开始的小于1万条数据,则可以使用head方法,或者使用切片的方式。
t = o.get_table('dual')
for record in t.head(3):
# 处理每个Record对象
for record in t[:3]:
# 处理每个Record对象
使用with表达式的写法
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
<do something>
不使用with表达式的写法
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
<do something>
reader.close() # 不使用with的话就需要手动关闭句柄
使用ODPS对象的read_table方法
for record in o.read_table('test_table', partition='pt=test'):
<do something>
向表写数据
使用 with 表达式的写法,不使用with的写法同reader,如果分区不存在,可以使用create_partition参数指定创建分区,如。
with t.open_writer(partition='pt=test',create_parition=True) as writer:
records = [[111, 'aaa', True], # 这里可以是list
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
writer.write(records) # 这里records可以是可迭代对象
records = [t.new_record([111, 'aaa', True]), # 也可以是Record对象
t.new_record([222, 'bbb', False]),
t.new_record([333, 'ccc', True]),
t.new_record([444, '中文', False])]
writer.write(records)
ps:尽可能的用list的append方法去构建records,而不是用pandas的DataFrame的append方法,后者会非常吃性能。
使用ODPS对象的write_table方法
records = [[111, 'aaa', True], # 这里可以是list
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
o.write_table('test_table', records, partition='pt=test', create_partition=True)
注意:每次调用 write_table,MaxCompute 都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率。因此,建议在使用write_table方法时,一次性写入多组数据,或者传入一个 generator对象。
write_table 写表时会追加到原有数据。PyODPS 不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除 原有数据。对于非分区表,需要调用 table.truncate(),对于分区表,需要删除分区后再建立。
删除表
o.delete_table('my_table_name', if_exists=True) # 只有表存在时删除
t.drop() # Table对象存在的时候可以直接执行drop函数
创建DataFrame
注意,这里创建的是odps的DataFrame,并非是Pandas的DataFrame
table = o.get_table('my_table_name')
df = table.to_df()
分区表
if table.schema.partitions:
<do something> # 判断是否为分区表
for partition in table.partitions:
<do something> # 遍历表全部分区
for partition in table.iterate_partitions(spec='pt=test'):
<do something> # 用于遍历表的二级分区
table.exist_partition('pt=test,sub=2015') # 判断分区是否存在,如果有多级分区则需要按序全部填写
table.exist_partitions('pt=test') # 判断给定前缀的分区是否存在
partition = table.get_partition('pt=test') # 获取分区
partition.creation_time # 获取分区创建时间
partition.size # 获取分区所占空间
t.create_partition('pt=test', if_not_exists=True) # 创建分区,仅当不存在的时候才创建
t.delete_partition('pt=test', if_exists=True) # 删除分区,存在的时候才删除
partition.drop() # 删除分区,当Partition对象存在的时候直接drop
数据上传下载通道
上传
from odps.tunnel import TableTunnel
# 获取表对象
table = o.get_table('my_table')
# 获取Tunnel对象,创建上传会话
tunnel = TableTunnel(odps)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
# 组装record对象进行写操作
with upload_session.open_record_writer(0) as writer:
record = table.new_record()
record[0] = 'test1'
record[1] = 'id1'
writer.write(record)
record = table.new_record(['test2', 'id2'])
writer.write(record)
# 提交事务,参数为block id列表对象,id与writer中的相同
upload_session.commit([0])
下载
from odps.tunnel import TableTunnel
# 获取Tunnel对象,创建下载会话
tunnel = TableTunnel(odps)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
with download_session.open_record_reader(0, download_session.count) as reader:
for record in reader:
# 处理每条记录
SQL
并非所有在ODPS Console中可以执行的命令都是ODPS可以接受的SQL语句。 在调用非DDL/DML语句时,请使用其他方法,例如GRANT/REVOKE等语句请使用run_security_query方法,PAI 命令需要使用run_xflow或execute_xflow方法。
执行SQL
执行SQL有两种方式,同步和异步,且execute_sql和run_sql返回的对象均为任务实例(instance对象)。
o.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
instance = o.run_sql('select * from dual') # 异步的方式执行
print(instance.get_logview_address()) # 获取logview地址
instance.wait_for_success() # 阻塞直到完成
设置运行参数
设置运行参数有两种级别,会话级别(当前会话全局有效)和语句运行时有效。
# 设置运行时参数
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
# 设置会话级全局参数
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # 会根据全局配置添加hints
读取SQL执行结果
读取SQL执行结果有两种情况(ps:怎么又是两种>,<),一种是返回结构化数据(如执行查询语句),一种是返回非结构化数据(如desc)。
# 返回结构化数据
with o.execute_sql('select * from dual').open_reader() as reader:
for record in reader:
# 处理每一个record
# 返回非结构化数据
with o.execute_sql('desc dual').open_reader() as reader:
print(reader.raw)
如果options.tunnel.use_instance_tunnel=True,在调用 open_reader 时,PyODPS会默认调用Instance Tunnel, 否则会调用旧的Result接口。如果你使用了版本较低的MaxCompute服务,或者调用Instance Tunnel出现了问题,PyODPS会给出警告并自动降级到旧的Result接口,可根据警告信息判断导致降级的原因。如果Instance Tunnel 的结果不合预期,请将该选项设为False。在调用open_reader时,也可以使用tunnel参数来指定使用何种结果接口,例如
# tunnel参数为True时使用Instance Tunnel接口,为False时使用Result接口
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
for record in reader:
# 处理每一个record
注意:
- PyODPS默认不限制能够从 Instance 读取的数据规模,但对于受保护的Project,通过Tunnel下载数据会受限。
- 如果options.tunnel.limit_instance_tunnel未设置,会自动打开数据量限制,此时可下载的数据条数受到 Project 配置限制,通常该限制为 10000 条。如果想要手动限制下载数据的规模,可以为open_reader方法增加 limit 选项,或者设置 options.tunnel.limit_instance_tunnel = True 。
- 如果所使用的 MaxCompute只能支持旧 Result 接口,同时需要读取所有数据,可将SQL结果写入另一张表后用读表接口读取(可能受到 Project 安全设置的限制)。
设置alias
有时在运行时,比如某个UDF引用的资源是动态变化的,可以alias旧的资源名到新的资源,这样免去了重新删除并重新创建UDF的麻烦。(get_cache_file方法含义不明)。
from odps.models import Schema
# 构建函数代码,该函数的作用是读取文件资源中的值,然后与传入参数相加并返回结果
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
# get_cache_file的含义不明
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
# 创建file类型的资源,该文件只有一个字符'1'
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
# 根据myfunc中的代码创建py类型的资源
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
# 创建函数,指定类和需要引用的资源
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
# 创建测试表,只有一个数据类型为bigint的字段
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
# 写入一行数据,只包含一个值1
data = [[1, ], ]
o.write_table(table, 0, [table.new_record(it) for it in data])
# 执行sql,调用创建好的函数并打印结果
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
# 输出结果为 2
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源映射成内容为2的资源,不需要修改UDF或资源
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
设置biz_id
在少数情形下,可能在提交 SQL 时,需要同时提交 biz_id,否则执行会报错。此时,你可以设置全局 options 里的 biz_id。(wdnmd啥时候才需要提交biz_id?)
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')
任务实例
获取项目空间下所有实例
for instance in o.list_instances():
print(instance.id)
判断实例是否存在
o.exist_instance('my_instance_id')
获取指定实例
o.get_instance('my_instance_id')
停止实例
# 调用instance对象的stop方法
instance_obj = o.get_instance('my_instance_id')
instance_obj.stop()
# 调用ODPS入口对象的stop_instance方法
o.stop_instance('my_instance_id')
获取LogView地址
对于SQL等任务,可以直接调用instance对象的方法即可
# 从已有的instance对象获取Logview地址
instance = o.run_sql('desc pyodps_iris')
print(instance.get_logview_address())
# 从instance id获取Logview地址
instance = o.get_instance('2016042605520945g9k5pvyi2')
print(instance.get_logview_address())
对于 XFlow 任务,需要枚举其子任务,再获取子任务的 LogView:
instance = o.run_xflow('AppendID', 'algo_public',
{'inputTableName': 'input_table', 'outputTableName': 'output_table'}
)
for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items():
print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
在专有云环境,需要设置host才能正常获取logview
from odps import options
options.log_view_host = 'http://logview.cn-gz-csa-d01.odps.inner.y.csair.com:9000'
任务实例状态
一个instance的状态可以是Running、Suspended 或者Terminated,用户可以通过 status 属性来获取状态。 is_terminated方法返回当前instance是否已经执行完成,is_successful方法返回当前instance是否正确完成执行, 任务处于运行中或者执行失败都会返回False。
# 通过status属性来获取状态
>>> instance = o.get_instance('2016042605520945g9k5pvyi2')
>>> instance.status
<Status.TERMINATED: 'Terminated'>
>>> from odps.models import Instance
>>> instance.status == Instance.Status.TERMINATED
True
>>> instance.status.value
'Terminated'
# 调用instance对象方法
instance.is_terminated() # 等价于line 6的判断
instance.is_successful()
子任务操作
一个Instance真正运行时,可能包含一个或者多个子任务,我们称为Task。
# 获取实例下所有的Task,返回一个所有子任务的名称列表。
>>> instance.get_task_names()
['SQLDropTableTask']
# 获取Task的执行结果
>>> instance.get_task_result('AnonymousSQLTask')
'"sepallength","sepalwidth","petallength","petalwidth","name"\n5.1,3.5,1.4,0.2,"Iris-setosa"\n'
>>> instance.get_task_results()
OrderedDict([('AnonymousSQLTask',
'"sepallength","sepalwidth","petallength","petalwidth","name"\n5.1,3.5,1.4,0.2,"Iris-setosa"\n')])
# 获取Task当前的执行进度
while not instance.is_terminated():
for task_name in instance.get_task_names():
print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string())
time.sleep(10)
20160519101349613 gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]
资源
基本操作
o.list_resources(project=None) # 列出所有资源,可指定项目空间
o.delete_resource(name, project=None) # 删除资源,也可调用Resource对象的drop方法进行删除
o.exist_resource(name, project=None) # 判断资源是否存在
文件资源
文件资源支持以多种方式进行打开:
- r,读模式,只能打开不能写
- w,写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空
- a,追加模式,只能写入内容到文件末尾
- r+,读写模式,能任意读写内容
- w+,类似于 r+,但会先清空文件内容
- a+,类似于 r+,但写入时只能写入文件末尾
- 以二进制模式打开,如打开压缩文件, 因此 rb 就是指以二进制读模式打开文件,r+b 是指以二进制读写模式打开。
```python
创建文件资源,指定资源名称,资源类型,文件对象
resource = o.create_resource(‘test_file_resource’, ‘file’, file_obj=open(‘/to/path/file’)) # 使用file-like的对象 resource = o.create_resource(‘test_py_resource’, ‘py’, file_obj=’import this’) # 使用字符串
读取文件资源,类似于Python内置的open方法
with resource.open(‘r’) as fp: # 以读模式打开 content = fp.read() # 读取全部的内容 fp.seek(0) # 回到资源开头 lines = fp.readlines() # 读成多行
使用odps入口对象的open_resource方法读取
with o.open_resource(‘test_file_resource’, mode=’r+’) as fp: # 读写模式打开 fp.read() fp.tell() # 当前位置 fp.seek(10) fp.truncate() # 截断后面的内容 fp.writelines([‘Hello\n’, ‘World\n’]) # 写入多行 fp.write(‘Hello World’) fp.flush() # 手动调用会将更新提交到ODPS
<a name="Gm8qs"></a>
### 表资源
```python
# 创建表资源,指定资源名称,资源类型,表名,分区
o.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
# 更新表资源
table_resource = o.get_resource('test_table_resource')
table_resource.update(partition='pt=test2', project_name='my_project2')
# 获取表或分区
table_resource = o.get_resource('test_table_resource')
table = table_resource.table
print(table.name)
partition = table_resource.partition
print(partition.spec)
# 对表对象写入记录行
table_resource = o.get_resource('test_table_resource')
with table_resource.open_writer() as writer:
writer.write([0, 'aaaa'])
writer.write([1, 'bbbbb'])
# 读取表对象
with table_resource.open_reader() as reader:
for rec in reader:
print(rec)
函数
# 基本操作
o.list_functions(project=None) # 列出所有函数,可指定项目空间
o.delete_function(name, project=None) # 删除函数,也可调用Function对象的drop方法进行删除
o.exist_function(name, project=None) # 判断函数是否存在
# 引用资源创建函数,project参数用于引用其他project 中的资源
resource = o.get_resource('my_udf.py', project='another_project')
function = o.create_function('test_function2', class_type='my_udf.Test', resources=[resource])
# 更新函数
function.update()
XFlow和模型
(ps:等到看ML的时候再来补充吧,不想纯抄书)
配置选项
PyODPS 提供了一系列的配置选项,可通过odps.options进行配置
from odps import options
# 设置所有输出表的生命周期(lifecycle 选项)
options.lifecycle = 30
完整的配置参数列表参见PyODPS配置选项。