数据上传下载概述
MaxCompute提供了两种数据上传下载的通道:
- DataHub实时数据通道:包含的工具有OGG插件、Flume插件、LogStash插件和Fluentd插件。
- Tunnel批量数据通道:包含的工具有MaxCompute客户端、DataWorks、DTS、Sqoop、Kettle插件以及MMA迁移工具。
DataHub和Tunnel各自也提供了SDK。
使用限制
- Tunnel Upload命令上传限制
- Tunnel命令不支持上传下载Array、Map和Struct类型的数据。
- 上传没有速度限制,上传速度的瓶颈为网络带宽以及服务器性能。
- 重传retry有次数的限制,当重传次数超过限制,就会继续上传下一个block。因此上传完成后,需要通过查询语句,检查数据是否有丢失(直接报失败它不香吗❓)。
- 一个项目下Tunnel支持并发的连接数默认上限为2000个。
- 每个Tunnel的Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用。
- 当遇到并发写入时,MaxCompute会根据ACID进行并发写的保障。
- DataHub上传数据限制
- 每个字段的大小不能超过这个字段本身的限制。
说明 STRING的长度不能超过8M。 - 上传的过程中,会将多条数据打包成一个Package来进行上传。
- 每个字段的大小不能超过这个字段本身的限制。
- TableTunnel SDK接口限制
- BlockId的取值范围是[0, 20000),单个block上传的数据限制为100G。
- Session的超时时间为24小时。大批量数据传送导致超过24小时,需要自行拆分成多个Session。
- RecordWriter对应的HTTP Request超时时间为120s。如果120s内HTTP连接上没有数据流过,Service端会主动关闭连接。
数据通道服务连接
不同网络环境下需要选择不同的服务地址(Endpoint)来连接服务,否则将无法向服务发起请求,DataHub和Tunnel在不同网络环境场景下,所使用的EndPoint会有所区别。
数据上云的工具选择
Hadoop数据迁移
可使用MMA、Sqoop和DataWorks进行Hadoop数据迁移。
- 可使用DataWorks结合DataX进行Hadoop数据迁移。
- Sqoop执行时,会在原来的Hadoop集群上执行MR作业,可以分布式地将数据传输到MaxCompute上。
- MMA利用Meta Carrier连接您的Hive Metastore服务,抓取Hive Metadata,并利用这些数据生成用于创建MaxComputer表和分区的DDL语句以及用于迁移数据的Hive UDTF SQL。
(后续要在知识库中撰写对应的实施方案)
数据库数据同步
数据库的数据同步到MaxCompute需要根据数据库的类型和同步策略来选择相应的工具。
- 离线批量的数据库数据同步:可以使用DataWorks/Dataphin/DataX,但其底层的本质都是调用DataX。
- Oracle数据库数据实时同步时,可以选择OGG插件工具。
- RDS数据库数据实时同步时,可以选择DTS同步,详情请参见数据传输服务DTS。
(后续要在知识库中撰写对应的实施方案)
日志采集
日志采集时,您可以选用Flume、Fluentd、LogStash等工具。具体场景示例请参见Flume收集网站日志数据到MaxCompute和海量日志数据分析与应用。
(后续要在知识库中撰写对应的实施方案)
工具介绍
阿里云数加产品
MaxCompute客户端(Tunnel通道系列)
客户端基于[批量数据通道](https://help.aliyun.com/document_detail/27837.html)的SDK,实现了内置的Tunnel命令。
DataWorks数据集成/Dataphin同步任务
支持多种数据源,基于DataX及各类插件,主要用于离线批量数据同步。
DTS(Data Transmission)数据传输服务
DTS支持RDBMS(关系型数据库)、NoSQL、OLAP等多种数据源之间数据交互的数据服务。它提供了数据迁移、实时数据订阅及数据实时同步等多种数据传输功能,同时支持RDS、MySQL实例的数据实时同步MaxCompute表中。
开源产品
MaxCompute Sqoop(ODPS Sqoop)
odps-sqoop是基于社区sqoop 1.4.6版本开发的,增加了对MaxCompute的支持,详情参见[MaxCompute Sqoop](https://github.com/aliyun/aliyun-maxcompute-data-collectors/wiki/odps-sqoop)。
Kettle
Kettle是一款开源的ETL工具,只需要下载插件即可使用MaxCompute数据源的导入导出,详情参见[基于Kettle的MaxCompute插件实现数据上云](https://yq.aliyun.com/articles/68911?spm=a2c4g.11186623.2.18.728a361fWzPGLB)。
Flume(DataHub通道系列)
Apache Flume是一个分布式的、可靠的、可用的系统,可高效地从不同的数据源中收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。<br /> Apache Flume的DataHub Sink插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。详情参见[flume_plugin](https://github.com/aliyun/aliyun-maxcompute-data-collectors/wiki/flume_plugin?spm=a2c4g.11186623.2.16.oSXp9R)。
Fluentd(DataHub通道系列)
Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log)进行处理后并存储到不同的目标端。<br /> Fluentd的DataHub插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。详情请参见 [Fluentd插件介绍](https://help.aliyun.com/document_detail/47450.html?spm=a2c4g.11186623.2.18.oSXp9R)。
LogStash(DataHub通道系列)
LogStash是一款开源日志收集处理框架,LogStash的DataHub插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。具体示例请参见[Logstash + DataHub + MaxCompute和StreamCompute 进行实时数据分析](https://yq.aliyun.com/articles/61766?spm=a2c4g.11186623.2.19.oSXp9R)。
OGG(DataHub通道系列)
OGG的DataHub插件可以支持将Oracle数据库的数据实时地以增量方式同步到DataHub中,并最终归档到MaxCompute表中。详情请参见[基于OGG DataHub插件将Oracle数据同步上云](https://yq.aliyun.com/articles/66139?spm=a2c4g.11186623.2.20.oSXp9R)。
MMA迁移工具
MMA利用Meta Carrier连接用户的Hive Metastore服务,抓取用户的Hive Metadata,并利用这些数据生成用于创建MaxComputer表和分区的DDL语句以及用于迁移数据的Hive UDTF SQL。
(后续要在知识库中撰写对应的实施方案)
使用Tunnel命令上传下载数据
Tunnel命令使用说明
Tunnel命令功能
客户端提供的Tunnel命令主要用于数据的上传和下载等,其功能如下:
- Upload:上传本地数据至MaxCompute表中。支持文件或目录(指一级目录)的上传,每一次上传只支持数据上传到一张表或表的一个分区。分区表一定要指定上传的分区,多级分区一定要指定到末级分区。
- Download:下载MaxCompute表或指定Instance执行结果至本地。只支持下载到单个文件,每一次下载只支持下载一张表或一个分区到一个文件。分区表一定要指定下载的分区,多级分区一定要指定到末级分区。
- Resume:因为网络或Tunnel服务的原因造成上传出错,可以通过该命令对文件或目录进行续传。可以继续上一次的数据上传操作,但Resume命令暂时不支持下载操作。
- Show:显示历史任务信息。
- Purge:清理session目录,默认清理3天内的日志。
- help:获取帮助信息,每个命令和选择支持短命令格式。
Tunnel上传下载限制
- Tunnel功能及Tunnel SDK当前不支持外部表操作。您可以通过Tunnel直接上传数据到MaxCompute内部表,或者是通过OSS Python SDK上传到OSS后,在MaxCompute使用外部表做映射。
- Tunnel命令不支持上传下载ARRAY、MAP和STRUCT类型的数据。
- 每个Tunnel的Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用。
- 如果您根据Session下载,请注意只有主账号创建的Session,可以被主账号及所属子账号下载。
使用Tunnel命令过程中遇到的各类问题请参见Tunnel命令相关问题。
Tunnel命令示例
--建表
CREATE TABLE IF NOT EXISTS sale_detail(
shop_name STRING,
customer_id STRING,
total_price DOUBLE)
PARTITIONED BY (sale_date STRING,region STRING);
--添加分区
alter table sale_detail add partition (sale_date='201312', region='hangzhou');
--准备需要上传的数据文件data.txt,内容如下所示,保存路径为d:\data.txt。
--这份文件的第三行数据与sale_detail的表定义不符。sale_detail定义了三列,但数据只有两列。
shopx,x_id,100
shopy,y_id,200
shopz,z_id
--使用upload命令上传文件,由于有脏数据,因此会上传失败
odps@ project_name>tunnel upload d:\data.txt sale_detail/sale_date=201312,region=hangzhou -s false
Upload session: 20150610xxxxxxxxxxx70a002ec60c
Start upload:d:\data.txt
Total bytes:41 Split input to 1 blocks
2015-06-10 16:39:22 upload block: '1'
ERROR: column mismatch -,expected 3 columns, 2 columns found, please check data or delimiter
--使用show命令查询历史任务信息
odps@ project_name>tunnel show history;
20150610xxxxxxxxxxx70a002ec60c failed 'u --config-file /D:/console/conf/odps_config.ini --project odpstest_ay52c_ay52 --endpoint http://service.cn-shanghai.maxcompute.aliyun.com/api --id UlxxxxxxxxxxxrI1 --key 2m4r3WvTxxxxxxxxxx0InVke7UkvR d:\data.txt sale_detail/sale_date=201312,region=hangzhou -s false'
--修改示例数据,删除最后一行后,执行resume命令修复执行上传数据。其中,20150610xxxxxxxxxxx70a002ec60c为上传失败的session ID。
odps@ project_name>tunnel resume 20150610xxxxxxxxxxx70a002ec60c --force;
start resume
20150610xxxxxxxxxxx70a002ec60c
Upload session: 20150610xxxxxxxxxxx70a002ec60c
Start upload:d:\data.txt
Resume 1 blocks
2015-06-10 16:46:42 upload block: '1'
2015-06-10 16:46:42 upload block complete, blockid=1
upload complete, average speed is 0 KB/s
OK
--执行download命令将表sale_detail上的数据下载至本地result.txt
$ ./tunnel download sale_detail/sale_date=201312,region=hangzhou result.txt;
Download session: 20150610xxxxxxxxxxx70a002ed0b9
Total records: 2
2015-06-10 16:58:24 download records: 2
2015-06-10 16:58:24 file size: 30 bytes
OK
下载Instance数据
- 方式一:使用tunnel download命令将执行结果下载到本地文件。
--执行SELECT语句查询表sale_detail
odps@ odps_test_project>select * from sale_detail;
ID = 20170724071705393ge3csfb8
... ...
--执行如下Tunnel命令下载执行结果到本地文件
odps@ odps_test_project>tunnel download instance://20170724071705393ge3csfb8 result;
2017-07-24 15:18:47 - new session: 2017072415184785b6516400090ca8 total lines: 8
2017-07-24 15:18:47 - file [0]: [0, 8), result
downloading 8 records into 1 file
2017-07-24 15:18:47 - file [0] start
2017-07-24 15:18:48 - file [0] OK. total: 44 bytes
download OK
--执行cat命令查看文件
cat result
- 方法二:通过配置参数使SQL查询默认采用InstanceTunnel方式输出执行结果。
打开use_instance_tunnel参数后,执行的查询会使用InstanceTunnel来下载结果,并且可以设置instance_tunnel_max_record参数的值来限定最大返回记录数,如果值为空则不受限制。
# download sql results by instance tunnel
use_instance_tunnel=true
# the max records when download sql results by instance tunnel
instance_tunnel_max_record=10000
或者,也可以通过在客户端中输入如下命令开启此功能
odps@ odps_test_tunnel_project>set console.sql.result.instancetunnel=true;
Tunnel命令参数表
批量数据通道SDK
主要接口 | 描述 |
---|---|
TableTunnel | 访问MaxCompute Tunnel服务的入口类。可以通过公网或者阿里云内网环境对MaxCompute及其Tunnel进行访问。当您在阿里云内网环境中使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。 |
TableTunnel.UploadSession | 表示一个向MaxCompute表中上传数据的会话。 |
TableTunnel.DownloadSession | 表示一个从MaxCompute表中下载数据的会话。 |
InstanceTunnel | 访问MaxCompute Tunnel服务的入口类。您可以通过公网或者阿里云内网环境对MaxCompute及其Tunnel进行访问。当您在阿里云内网环境中使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。 |
InstanceTunnel.DownloadSession | 表示一个向MaxCompute Instance中下载数据的会话。但只能用于下载以SELECT关键字开头的用于获取数据的SQL Instance。 |
数据集成导入导出数据
DataWorks数据集成读写MaxCompute数据底层功能为Tunnel功能。
操作步骤
- 在数据集成页面配置MaxCompute数据源。
- 在数据开发页面新建业务流程。
- 新建离线同步节点。
- 选择数据来源。
- 从MaxCompute导出数据时,数据来源选择MaxCompute,详细配置请参见配置MaxCompute Reader。
- 导入数据至MaxCompute时,数据来源选择其他数据源,详情请参见配置Reader。
- 选择数据去向。
- 导入数据至MaxCompute时,数据去向选择MaxCopute,详细配置请参见配置MaxCompute Writer。
- 从MaxCompute导出数据时,数据去向选择其他数据源,详情请参见配置Writer。
- 配置字段的映射关系。
- 配置通道控制。
- 配置调度属性。
- 配置完成后保存,单击上方
按钮完成数据的导入导出。
DataHub实时数据通道
DataHub是MaxCompute提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布(Publish)和订阅 (Subscribe)的功能,并支持将流式数据归档至MaxCompute。
同时,DataHub提供了Java和Python两种语言的SDK。