数据上传下载概述

MaxCompute提供了两种数据上传下载的通道:

  • DataHub实时数据通道:包含的工具有OGG插件、Flume插件、LogStash插件和Fluentd插件。
  • Tunnel批量数据通道:包含的工具有MaxCompute客户端、DataWorks、DTS、Sqoop、Kettle插件以及MMA迁移工具。

DataHub和Tunnel各自也提供了SDK。

使用限制

  • Tunnel Upload命令上传限制
    • Tunnel命令不支持上传下载Array、Map和Struct类型的数据。
    • 上传没有速度限制,上传速度的瓶颈为网络带宽以及服务器性能。
    • 重传retry有次数的限制,当重传次数超过限制,就会继续上传下一个block。因此上传完成后,需要通过查询语句,检查数据是否有丢失(直接报失败它不香吗❓)。
    • 一个项目下Tunnel支持并发的连接数默认上限为2000个。
    • 每个Tunnel的Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用。
    • 当遇到并发写入时,MaxCompute会根据ACID进行并发写的保障。
  • DataHub上传数据限制
    • 每个字段的大小不能超过这个字段本身的限制。
      说明 STRING的长度不能超过8M。
    • 上传的过程中,会将多条数据打包成一个Package来进行上传。
  • TableTunnel SDK接口限制
    • BlockId的取值范围是[0, 20000),单个block上传的数据限制为100G。
    • Session的超时时间为24小时。大批量数据传送导致超过24小时,需要自行拆分成多个Session。
    • RecordWriter对应的HTTP Request超时时间为120s。如果120s内HTTP连接上没有数据流过,Service端会主动关闭连接。

数据通道服务连接

不同网络环境下需要选择不同的服务地址(Endpoint)来连接服务,否则将无法向服务发起请求,DataHub和Tunnel在不同网络环境场景下,所使用的EndPoint会有所区别。

数据上云的工具选择

Hadoop数据迁移

可使用MMA、Sqoop和DataWorks进行Hadoop数据迁移。

  • 可使用DataWorks结合DataX进行Hadoop数据迁移。
  • Sqoop执行时,会在原来的Hadoop集群上执行MR作业,可以分布式地将数据传输到MaxCompute上。
  • MMA利用Meta Carrier连接您的Hive Metastore服务,抓取Hive Metadata,并利用这些数据生成用于创建MaxComputer表和分区的DDL语句以及用于迁移数据的Hive UDTF SQL。

(后续要在知识库中撰写对应的实施方案)

数据库数据同步

数据库的数据同步到MaxCompute需要根据数据库的类型和同步策略来选择相应的工具。

  • 离线批量的数据库数据同步:可以使用DataWorks/Dataphin/DataX,但其底层的本质都是调用DataX。
  • Oracle数据库数据实时同步时,可以选择OGG插件工具
  • RDS数据库数据实时同步时,可以选择DTS同步,详情请参见数据传输服务DTS

(后续要在知识库中撰写对应的实施方案)

日志采集

日志采集时,您可以选用Flume、Fluentd、LogStash等工具。具体场景示例请参见Flume收集网站日志数据到MaxCompute海量日志数据分析与应用
(后续要在知识库中撰写对应的实施方案)

工具介绍

阿里云数加产品

  • MaxCompute客户端(Tunnel通道系列)

    1. 客户端基于[批量数据通道](https://help.aliyun.com/document_detail/27837.html)的SDK,实现了内置的Tunnel命令。
  • DataWorks数据集成/Dataphin同步任务

    支持多种数据源,基于DataX及各类插件,主要用于离线批量数据同步。
    
  • DTS(Data Transmission)数据传输服务

    DTS支持RDBMS(关系型数据库)、NoSQL、OLAP等多种数据源之间数据交互的数据服务。它提供了数据迁移、实时数据订阅及数据实时同步等多种数据传输功能,同时支持RDS、MySQL实例的数据实时同步MaxCompute表中。
    

开源产品

  • MaxCompute Sqoop(ODPS Sqoop)

    odps-sqoop是基于社区sqoop 1.4.6版本开发的,增加了对MaxCompute的支持,详情参见[MaxCompute Sqoop](https://github.com/aliyun/aliyun-maxcompute-data-collectors/wiki/odps-sqoop)。
    
  • Kettle

    Kettle是一款开源的ETL工具,只需要下载插件即可使用MaxCompute数据源的导入导出,详情参见[基于Kettle的MaxCompute插件实现数据上云](https://yq.aliyun.com/articles/68911?spm=a2c4g.11186623.2.18.728a361fWzPGLB)。
    
  • Flume(DataHub通道系列)

    Apache Flume是一个分布式的、可靠的、可用的系统,可高效地从不同的数据源中收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。<br />      Apache Flume的DataHub Sink插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。详情参见[flume_plugin](https://github.com/aliyun/aliyun-maxcompute-data-collectors/wiki/flume_plugin?spm=a2c4g.11186623.2.16.oSXp9R)。
    
  • Fluentd(DataHub通道系列)

    Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log)进行处理后并存储到不同的目标端。<br />      Fluentd的DataHub插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。详情请参见 [Fluentd插件介绍](https://help.aliyun.com/document_detail/47450.html?spm=a2c4g.11186623.2.18.oSXp9R)。
    
  • LogStash(DataHub通道系列)

    LogStash是一款开源日志收集处理框架,LogStash的DataHub插件可以将日志数据实时上传到DataHub,并归档到MaxCompute表中。具体示例请参见[Logstash + DataHub + MaxCompute和StreamCompute 进行实时数据分析](https://yq.aliyun.com/articles/61766?spm=a2c4g.11186623.2.19.oSXp9R)。
    
  • OGG(DataHub通道系列)

    OGG的DataHub插件可以支持将Oracle数据库的数据实时地以增量方式同步到DataHub中,并最终归档到MaxCompute表中。详情请参见[基于OGG DataHub插件将Oracle数据同步上云](https://yq.aliyun.com/articles/66139?spm=a2c4g.11186623.2.20.oSXp9R)。
    
  • MMA迁移工具

    MMA利用Meta Carrier连接用户的Hive Metastore服务,抓取用户的Hive Metadata,并利用这些数据生成用于创建MaxComputer表和分区的DDL语句以及用于迁移数据的Hive UDTF SQL。
    

(后续要在知识库中撰写对应的实施方案)

使用Tunnel命令上传下载数据

Tunnel命令使用说明

Tunnel命令功能

客户端提供的Tunnel命令主要用于数据的上传和下载等,其功能如下:

  • Upload:上传本地数据至MaxCompute表中。支持文件或目录(指一级目录)的上传,每一次上传只支持数据上传到一张表或表的一个分区。分区表一定要指定上传的分区,多级分区一定要指定到末级分区。
  • Download:下载MaxCompute表或指定Instance执行结果至本地。只支持下载到单个文件,每一次下载只支持下载一张表或一个分区到一个文件。分区表一定要指定下载的分区,多级分区一定要指定到末级分区。
  • Resume:因为网络或Tunnel服务的原因造成上传出错,可以通过该命令对文件或目录进行续传。可以继续上一次的数据上传操作,但Resume命令暂时不支持下载操作。
  • Show:显示历史任务信息。
  • Purge:清理session目录,默认清理3天内的日志。
  • help:获取帮助信息,每个命令和选择支持短命令格式。

Tunnel上传下载限制

  • Tunnel功能及Tunnel SDK当前不支持外部表操作。您可以通过Tunnel直接上传数据到MaxCompute内部表,或者是通过OSS Python SDK上传到OSS后,在MaxCompute使用外部表做映射。
  • Tunnel命令不支持上传下载ARRAY、MAP和STRUCT类型的数据。
  • 每个Tunnel的Session在服务端的生命周期为24小时,创建后24小时内均可使用,也可以跨进程/线程共享使用,但是必须保证同一个BlockId没有重复使用。
  • 如果您根据Session下载,请注意只有主账号创建的Session,可以被主账号及所属子账号下载。

使用Tunnel命令过程中遇到的各类问题请参见Tunnel命令相关问题

Tunnel命令示例

--建表
CREATE TABLE IF NOT EXISTS sale_detail(
      shop_name     STRING,
      customer_id   STRING,
      total_price   DOUBLE)
PARTITIONED BY (sale_date STRING,region STRING);

--添加分区
alter table sale_detail add partition (sale_date='201312', region='hangzhou');

--准备需要上传的数据文件data.txt,内容如下所示,保存路径为d:\data.txt。
--这份文件的第三行数据与sale_detail的表定义不符。sale_detail定义了三列,但数据只有两列。
shopx,x_id,100
shopy,y_id,200
shopz,z_id

--使用upload命令上传文件,由于有脏数据,因此会上传失败
odps@ project_name>tunnel upload d:\data.txt sale_detail/sale_date=201312,region=hangzhou -s false
Upload session: 20150610xxxxxxxxxxx70a002ec60c
Start upload:d:\data.txt
Total bytes:41   Split input to 1 blocks
2015-06-10 16:39:22     upload block: '1'
ERROR: column mismatch -,expected 3 columns, 2 columns found, please check data or delimiter

--使用show命令查询历史任务信息
odps@ project_name>tunnel show history;
20150610xxxxxxxxxxx70a002ec60c  failed  'u --config-file /D:/console/conf/odps_config.ini --project odpstest_ay52c_ay52 --endpoint http://service.cn-shanghai.maxcompute.aliyun.com/api --id UlxxxxxxxxxxxrI1 --key 2m4r3WvTxxxxxxxxxx0InVke7UkvR d:\data.txt sale_detail/sale_date=201312,region=hangzhou -s false'

--修改示例数据,删除最后一行后,执行resume命令修复执行上传数据。其中,20150610xxxxxxxxxxx70a002ec60c为上传失败的session ID。
odps@ project_name>tunnel resume 20150610xxxxxxxxxxx70a002ec60c --force;
start resume
20150610xxxxxxxxxxx70a002ec60c
Upload session: 20150610xxxxxxxxxxx70a002ec60c
Start upload:d:\data.txt
Resume 1 blocks 
2015-06-10 16:46:42     upload block: '1'
2015-06-10 16:46:42     upload block complete, blockid=1
upload complete, average speed is 0 KB/s
OK

--执行download命令将表sale_detail上的数据下载至本地result.txt
$ ./tunnel download sale_detail/sale_date=201312,region=hangzhou result.txt;
Download session: 20150610xxxxxxxxxxx70a002ed0b9
Total records: 2
2015-06-10 16:58:24     download records: 2
2015-06-10 16:58:24     file size: 30 bytes
OK

下载Instance数据

  • 方式一:使用tunnel download命令将执行结果下载到本地文件。
--执行SELECT语句查询表sale_detail
odps@ odps_test_project>select * from sale_detail;
ID = 20170724071705393ge3csfb8
... ...

--执行如下Tunnel命令下载执行结果到本地文件
odps@ odps_test_project>tunnel download instance://20170724071705393ge3csfb8 result;
2017-07-24 15:18:47  -  new session: 2017072415184785b6516400090ca8    total lines: 8
2017-07-24 15:18:47  -  file [0]: [0, 8), result
downloading 8 records into 1 file
2017-07-24 15:18:47  -  file [0] start
2017-07-24 15:18:48  -  file [0] OK. total: 44 bytes
download OK

--执行cat命令查看文件
cat result
  • 方法二:通过配置参数使SQL查询默认采用InstanceTunnel方式输出执行结果。

打开use_instance_tunnel参数后,执行的查询会使用InstanceTunnel来下载结果,并且可以设置instance_tunnel_max_record参数的值来限定最大返回记录数,如果值为空则不受限制。

# download sql results by instance tunnel
use_instance_tunnel=true
# the max records when download sql results by instance tunnel
instance_tunnel_max_record=10000

或者,也可以通过在客户端中输入如下命令开启此功能

odps@ odps_test_tunnel_project>set console.sql.result.instancetunnel=true;

Tunnel命令参数表

详见Tunnel命令参数表

批量数据通道SDK

主要接口 描述
TableTunnel 访问MaxCompute Tunnel服务的入口类。可以通过公网或者阿里云内网环境对MaxCompute及其Tunnel进行访问。当您在阿里云内网环境中使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。
TableTunnel.UploadSession 表示一个向MaxCompute表中上传数据的会话。
TableTunnel.DownloadSession 表示一个从MaxCompute表中下载数据的会话。
InstanceTunnel 访问MaxCompute Tunnel服务的入口类。您可以通过公网或者阿里云内网环境对MaxCompute及其Tunnel进行访问。当您在阿里云内网环境中使用Tunnel内网连接下载数据时,MaxCompute不会将该操作产生的流量计入计费。
InstanceTunnel.DownloadSession 表示一个向MaxCompute Instance中下载数据的会话。但只能用于下载以SELECT关键字开头的用于获取数据的SQL Instance。

详情参考接口定义与说明SDK示例

数据集成导入导出数据

DataWorks数据集成读写MaxCompute数据底层功能为Tunnel功能。

操作步骤

  1. 数据集成页面配置MaxCompute数据源
  2. 数据开发页面新建业务流程
  3. 新建离线同步节点
  4. 选择数据来源
    • 从MaxCompute导出数据时,数据来源选择MaxCompute,详细配置请参见配置MaxCompute Reader
    • 导入数据至MaxCompute时,数据来源选择其他数据源,详情请参见配置Reader
  5. 选择数据去向
    • 导入数据至MaxCompute时,数据去向选择MaxCopute,详细配置请参见配置MaxCompute Writer
    • 从MaxCompute导出数据时,数据去向选择其他数据源,详情请参见配置Writer
  6. 配置字段的映射关系
  7. 配置通道控制
  8. 配置调度属性
  9. 配置完成后保存,单击上方MaxCompute学习笔记 Part 7 数据上传下载 - 图1按钮完成数据的导入导出。

DataHub实时数据通道

DataHub是MaxCompute提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布(Publish)和订阅 (Subscribe)的功能,并支持将流式数据归档至MaxCompute。
同时,DataHub提供了Java和Python两种语言的SDK。