Doris数据导入
1.数据导入总体说明
Doris的数据导入实现有以下共性特征,这里分别介绍,以帮助大家更好的使用数据导入功能
原子性保证
Doris的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。
同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。
Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used
。通过这个机制,可以在Doris 测做到 At-Most-Once
语义。如果结合上游系统的 At-Least-Once
语义,则可以实现导入数据的 Exactly-Once
语义。
关于原子性保证的最佳实践,可以参阅 导入事务和原子性。
同步和异步
导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。
支持的数据格式
不同的导入方式支持的数据格式略有不同。
导入方式 | 支持的格式 |
---|---|
Broker Load | Parquet,ORC,csv,gzip |
Stream Load | csv, gzip, json |
Routine Load | csv, json |
2.导入本地数据
Stream Load 用于将本地文件导入到Doris 中。
不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与Doris 进行连接交互的。
该方式中涉及 HOST:PORT 应为 HTTP 协议端口。
- 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
- 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。
本文文档我们以 curl 命令为例演示如何进行数据导入。
文档最后,我们给出一个使用 Java 导入数据的代码示例。
导入数据
Stream Load 的请求体如下:
PUT /api/{db}/{table}/_stream_load
- 创建一张表
通过CREATE TABLE
命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:CREATE TABLE IF NOT EXISTS load_test
(
id INT,
name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;
- 导入数据
执行以下 curl 命令导入本地文件:curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
- user:passwd 为在Doris 中创建的用户。初始用户为 admin,密码为创建Doris 集群时设置的密码。
- host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云Doris 集群详情页面查看。
- label: 可以在 Header 中指定 Label 唯一标识这个导入任务。
- 等待导入结果
Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:{
"TxnId": 1003,
"Label": "example_label_1",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 1000000,
"NumberLoadedRows": 1000000,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 40888898,
"LoadTimeMs": 2144,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 325,
"WriteDataTimeMs": 1933,
"CommitAndPublishTimeMs": 106,
"ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
Status
字段状态为Success
即表示导入成功。- 其他字段的详细介绍,请参阅 Stream Load 命令文档。
使用建议
- Stream Load 只能导入本地文件。
- 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。
Java 代码示例
这里通过一个简单的 JAVA 示例来执行 Stream Load:
package demo.doris;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class PaloStreamLoadDemo {
private final static String HOST = "127.0.0.1"; // Compute Node host
private final static int PORT = 8040; // Compute Node HTTP port
private static final String STREAM_LOAD_URL_PATTERN = "http://%s:%d/api/%s/%s/_stream_load";
private final static String DB = "example_db";
private final static String TABLE = "example_tbl";
private final static String USER = "user";
private final static String PASSWD = "passwd";
// local file to be loaded
private final static String LOAD_FILE = "./data.txt";
public static void main(String[] args) throws Exception {
streamLoad();
}
private static void streamLoad() throws Exception {
String loadUrlStr = String.format(STREAM_LOAD_URL_PATTERN, HOST, PORT, DB, TABLE);
URL loadUrl = new URL(loadUrlStr);
HttpURLConnection conn = (HttpURLConnection) loadUrl.openConnection();
conn.setRequestMethod("PUT");
String auth = String.format("%s:%s", USER, PASSWD);
String authEncoding = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
// set header.
// your add add any other headers here.
conn.addRequestProperty("column_separator", ",");
conn.addRequestProperty("label", "example_label");
conn.setDoOutput(true);
conn.setDoInput(true);
// read and send file content
File loadFile = new File(LOAD_FILE);
try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream());
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(loadFile));) {
int i;
while ((i = bis.read()) > 0) {
bos.write(i);
}
}
// get response
int status = conn.getResponseCode();
String respMsg = conn.getResponseMessage();
System.out.println("get status: " + status + ", response msg: " + respMsg);
// parse the response json
InputStream stream = (InputStream) conn.getContent();
BufferedReader br = new BufferedReader(new InputStreamReader(stream));
StringBuilder sb = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
Type type = new TypeToken<SubmitResult>() {
}.getType();
SubmitResult result = new Gson().fromJson(sb.toString(), type);
System.out.println("Get result status: " + result.Status);
}
// The response json class
public static class SubmitResult {
public String TxnId;
public String Label;
public String Status;
public String ExistingJobStatus;
public String Message;
public String NumberTotalRows;
public String NumberLoadedRows;
public String NumberFilteredRows;
public String NumberUnselectedRows;
public String LoadBytes;
public String LoadTimeMs;
public String BeginTxnTimeMs;
public String StreamLoadPutTimeMs;
public String ReadDataTimeMs;
public String WriteDataTimeMs;
public String CommitAndPublishTimeMs;
public String ErrorURL;
}
}
3.订阅Kafka日志
用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。
Doris自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once
消费语义。
自建 Kafka 服务
如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris集群在同一个 VPC 内,并且相互之间的网络能够互通。
订阅 Kafka 消息
订阅 Kafka 消息使用了 Doris中的例行导入(Routine Load)功能。
用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。
请注意以下使用限制:
- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
支持的消息格式如下:
- csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
- Json 格式,详见 导入 Json 格式数据。
- 仅支持 Kafka 0.10.0.0(含) 以上版本。
访问 SSL 认证的 Kafka 集群
例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。
访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE
命令上传到 Plao 中,并且 catalog 名称为 kafka
。CREATE FILE
命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:
- 上传文件
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
上传完成后,可以通过 SHOW FILES 命令查看已上传的文件。
创建例行导入作业
创建例行导入任务的具体命令,请参阅 ROUTINE LOAD 命令手册。这里给出示例:
- 访问无认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
max_batch_interval/max_batch_rows/max_batch_size
用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。- 访问 SSL 认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ",",
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);
- 访问 SSL 认证的 Kafka 集群
- 对于百度消息服务,
property.ssl.key.password
属性可以在client.properties
文件中获取。
查看导入作业状态
查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。
查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
修改作业属性
用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。
作业控制
用户可以通过 STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。
具体命令请参阅 STOP ROUTINE LOAD ,PAUSE ROUTINE LOAD ,RESUME ROUTINE LOAD 命令文档。
更多帮助
关于 ROUTINE LOAD 的更多详细语法和最佳实践,请参阅 ROUTINE LOAD 命令手册。
4.使用JDBC同步数据
用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。
INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法:
* INSERT INTO table SELECT ...
* INSERT INTO table VALUES(...)
这里我们仅介绍第二种方式。关于 INSERT 命令的详细说明,请参阅 INSERT 命令文档。
单次写入
单次写入是指用户直接执行一个 INSERT 命令。示例如下:
INSERT INTO example_tbl (col1, col2, col3) VALUES (1000, "baidu", 3.25);
对于 Doris来说,一个 INSERT 命令就是一个完整的导入事务。
因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频词的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。
该方式仅用于线下简单测试或低频少量的操作。
或者可以使用以下方式进行批量的插入操作:
INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);
我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,使用 PreparedStatement 来进行批量插入。
JDBC 示例
这里我们给出一个简单的 JDBC 批量 INSERT 代码示例:
package demo.doris;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class PaloJDBCDemo {
private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
private static final String HOST = "127.0.0.1"; // Leader Node host
private static final int PORT = 8030; // http port of Leader Node
private static final String DB = "example_db";
private static final String TBL = "example_tbl";
private static final String USER = "admin";
private static final String PASSWD = "my_pass";
private static final int INSERT_BATCH_SIZE = 10000;
public static void main(String[] args) {
insert();
}
private static void insert() {
// 注意末尾不要加 分号 ";"
String query = "insert into " + TBL + " values(?, ?)";
// 设置 Label 以做到幂等。
// String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";
Connection conn = null;
PreparedStatement stmt = null;
String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
try {
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
stmt = conn.prepareStatement(query);
for (int i =0; i < INSERT_BATCH_SIZE; i++) {
stmt.setInt(1, i);
stmt.setInt(2, i * 100);
stmt.addBatch();
}
int[] res = stmt.executeBatch();
System.out.println(res);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (stmt != null) {
stmt.close();
}
} catch (SQLException se2) {
se2.printStackTrace();
}
try {
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}
请注意以下几点:
- JDBC 连接串需添加
rewriteBatchedStatements=true
参数,并使用PreparedStatement
方式。
目前 Doris暂不支持服务器端的 PrepareStatemnt,所以 JDBC Driver 会在客户端进行批量 Prepare。rewriteBatchedStatements=true
会确保 Driver 执行批处理。并最终形成如下形式的 INSERT 语句发往 Palo:INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);
批次大小
因为是在客户端进行批量处理,因此一批次如果过大的话,话占用客户端的内存资源,需关注。
Doris后续会支持服务端的 PrepareStatemnt,敬请期待。导入原子性
和其他到导入方式一样,INSERT 操作本身也支持原子性。每一个 INSERT 操作都是一个导入事务,能够保证一个 INSERT 中的所有数据原子性的写入。
前面提到,我们建议在使用 INSERT 导入数据时,采用 ”批“ 的方式进行导入,而不是单条插入。
同时,我们可以为每次 INSERT 操作设置一个 Label。通过 Label 机制 可以保证操作的幂等性和原子性,最终做到数据的不丢不重。关于 INSERT 中 Label 的具体用法,可以参阅 INSERT 文档。
5.通过外部表同步数据
Doris可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT
的方式导入外部表的数据。
本文档主要介绍如何创建通过 ODBC 协议访问的外部表,以及如何导入这些外部表的数据。目前支持的数据源包括:
- MySQL
- Oracle
- PostgreSQL
创建外部表
创建 ODBC 外部表的详细介绍请参阅 [CREATE ODBC TABLE] 语法帮助手册。
这里仅通过示例说明使用方式。
- 创建 ODBC Resource
ODBC Resource 的目的是用于统一管理外部表的连接信息。CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle"
);
这里我们创建了一个名为 oracle_odbc
的 Resource,其类型为 odbc_catalog
,表示这是一个用于存储 ODBC 信息的 Resource。odbc_type
为 oracle
,表示这个 OBDC Resource 是用于连接 Oracle 数据库的。关于其他类型的资源,具体可参阅 [资源管理] 文档。
- 创建外部表
CREATE EXTERNAL TABLE `ext_oracle_tbl` (
`k1` decimal(9, 3) NOT NULL COMMENT "",
`k2` char(10) NOT NULL COMMENT "",
`k3` datetime NOT NULL COMMENT "",
`k5` varchar(20) NOT NULL COMMENT "",
`k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
"odbc_catalog_resource" = "oracle_odbc",
"database" = "test",
"table" = "baseall"
);
这里我们创建一个 ext_oracle_tbl
外部表,并引用了之前创建的 oracle_odbc
Resource。
连接外部数据库
- 创建资源
CREATE EXTERNAL RESOURCE `rds_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "mysql56.rdsxxxxx.rds.gz.baidubce.com",
"port" = "3306",
"user" = "rdsroot",
"password" = "12345",
"odbc_type" = "mysql",
"driver" = "MySQL"
);
需修改其中 host
,port
,user
,password
对应的参数。 host port 可以在 RDS 实例信息也查看。user 和 password 需要在 RDS 控制台创建账户后获取。
- 创建外部表
CREATE EXTERNAL TABLE `mysql_table` (
k1 int,
k2 int
) ENGINE=ODBC
PROPERTIES (
"odbc_catalog_resource" = "rds_odbc",
"database" = "mysql_db",
"table" = "mysql_tbl"
);
创建之后,就可以进行查询等操作了。
导入数据
- 创建 Doris表
这里我们创建一张 Doris的表,列信息和上一步创建的外部表ext_oracle_tbl
一样:CREATE EXTERNAL TABLE `palo_tbl` (
`k1` decimal(9, 3) NOT NULL COMMENT "",
`k2` char(10) NOT NULL COMMENT "",
`k3` datetime NOT NULL COMMENT "",
`k5` varchar(20) NOT NULL COMMENT "",
`k6` double NOT NULL COMMENT ""
)
COMMENT "Palo Table"
DISTRIBUTED BY HASH(k1) BUCKETS 2;
PROPERTIES (
"replication_num" = "1"
);
关于创建 Doris表的详细说明,请参阅 [CREATE-TABLE] 语法帮助。
- 导入数据 (从
ext_oracle_tbl
表 导入到palo_tbl
表)INSERT INTO palo_tbl SELECT k1,k2,k3 FROM ext_oracle_tbl limit 100;
INSERT 命令是同步命令,返回成功,即表示导入成功。
注意事项
- 必须保证外部数据源与 Doris集群在同一个VPC内,并且 Compute Node 可以和外部数据源的网络是互通的。
- ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。
6.JSON格式数据导入说明
Doris支持导入 JSON 格式的数据。本文档主要说明在进行JSON格式数据导入时的注意事项。
支持的导入方式
目前只有以下导入方式支持 Json 格式的数据导入:
- 将本地 JSON 格式的文件通过 [STREAM LOAD]方式导入。
- 通过 [ROUNTINE LOAD] 订阅并消费 Kafka 中的 JSON 格式消息。
暂不支持其他方式的 JSON 格式数据导入。
支持的 Json 格式
当前前仅支持以下两种 Json 格式:
- 以 Array 表示的多行数据
以 Array 为根节点的 Json 格式。Array 中的每个元素表示要导入的一行数据,通常是一个 Object。示例如下:[
{ "id": 123, "city" : "beijing"},
{ "id": 456, "city" : "shanghai"},
...
]
[
{ "id": 123, "city" : { "name" : "beijing", "region" : "haidian"}},
{ "id": 456, "city" : { "name" : "beijing", "region" : "chaoyang"}},
...
]
这种方式通常用于 Stream Load 导入方式,以便在一批导入数据中表示多行数据。
这种方式必须配合设置 stripe_outer_array=true
使用。Doris在解析时会将数组展开,然后依次解析其中的每一个 Object 作为一行数据。
- 以 Object 表示的单行数据
以 Object 为根节点的 Json 格式。整个 Object 即表示要导入的一行数据。示例如下:{ "id": 123, "city" : "beijing"}
{ "id": 123, "city" : { "name" : "beijing", "region" : "haidian" }}
这种方式通常用于 Routine Load 导入方式,如表示 Kafka 中的一条消息,即一行数据。
fuzzy_parse 参数
在 [STREAM LOAD] 中,可以添加 fuzzy_parse
参数来加速 JSON 数据的导入效率。
这个参数通常用于导入 以 Array 表示的多行数据 这种格式,所以一般要配合 strip_outer_array=true
使用。
这个功能要求 Array 中的每行数据的字段顺序完全一致。Doris仅会根据第一行的字段顺序做解析,然后以下标的形式访问之后的数据。该方式可以提升 3-5X 的导入效率。
Json Path
Doris支持通过 Json Path 抽取 Json 中指定的数据。
注:因为对于 Array 类型的数据,Doris会先进行数组展开,最终按照 Object 格式进行单行处理。所以本文档之后的示例都以单个 Object 格式的 Json 数据进行说明。
- 不指定 Json Path
如果没有指定 Json Path,则 Doris会默认使用表中的列名查找 Object 中的元素。示例如下:
表中包含两列:id
,city
Json 数据如下:{ "id": 123, "city" : "beijing"}
则 Doris会使用 id
, city
进行匹配,得到最终数据 123
和 beijing
。
如果 Json 数据如下:
{ "id": 123, "name" : "beijing"}
则使用 id
, city
进行匹配,得到最终数据 123
和 null
。
- 指定 Json Path
通过一个 Json 数据的形式指定一组 Json Path。数组中的每个元素表示一个要抽取的列。示例如下:["$.id", "$.name"]
["$.id.sub_id", "$.name[0]", "$.city[0]"]
Doris会使用指定的 Json Path 进行数据匹配和抽取。
- 匹配非基本类型
前面的示例最终匹配到的数值都是基本类型,如整型、字符串等。Doris当前暂不支持复合类型,如 Array、Map 等。所以当匹配到一个非基本类型时,Doris会将该类型转换为 Json 格式的字符串,并以字符串类型进行导入。示例如下:
Json 数据为:{ "id": 123, "city" : { "name" : "beijing", "region" : "haidian" }}
Json Path 为 ["$.city"]
。则匹配到的元素为:
{ "name" : "beijing", "region" : "haidian" }
该元素会被转换为字符串进行后续导入操作:
"{'name':'beijing','region':'haidian'}"
- 匹配失败
当匹配失败时,将会返回null
。示例如下:
Json 数据为:{ "id": 123, "name" : "beijing"}
Json Path 为 ["$.id", "$.info"]
。则匹配到的元素为 123
和 null
。
Doris当前不区分 Json 数据中表示的 null 值,和匹配失败时产生的 null 值。假设 Json 数据为:
{ "id": 123, "name" : null }
则使用以下两种 Json Path 会获得相同的结果:123
和 null
。
["$.id", "$.name"]
["$.id", "$.info"]
- 完全匹配失败
为防止一些参数设置错误导致的误操作。Doris在尝试匹配一行数据时,如果所有列都匹配失败,则会认为这个是一个错误行。假设 Json 数据为:{ "id": 123, "city" : "beijing" }
如果 Json Path 错误的写为(或者不指定 Json Path 时,表中的列不包含 id
和 city
):
["$.ad", "$.infa"]
则会导致完全匹配失败,则该行会标记为错误行,而不是产出 null, null
。
Json Path 和 Columns
Json Path 用于指定如何对 JSON 格式中的数据进行抽取,而 Columns 指定列的映射和转换关系。两者可以配合使用。
换句话说,相当于通过 Json Path,将一个 Json 格式的数据,按照 Json Path 中指定的列顺序进行了列的重排。之后,可以通过 Columns,将这个重排后的源数据和表的列进行映射。举例如下:
数据内容:
{"k1" : 1, "k2": 2}
表结构:
k2 int, k1 int
导入语句1(以 Stream Load 为例):
curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
导入语句1中,仅指定了 Json Path,没有指定 Columns。其中 Json Path 的作用是将 Json 数据按照 Json Path 中字段的顺序进行抽取,之后会按照表结构的顺序进行写入。最终导入的数据结果如下:
+------+------+
| k1 | k2 |
+------+------+
| 2 | 1 |
+------+------+
会看到,实际的 k1 列导入了 Json 数据中的 “k2” 列的值。这是因为,Json 中字段名称并不等同于表结构中字段的名称。我们需要显式的指定这两者之间的映射关系。
导入语句2:
curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, k1" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
相比如导入语句1,这里增加了 Columns 字段,用于描述列的映射关系,按 k2, k1
的顺序。即按Json Path 中字段的顺序抽取后,指定第一列为表中 k2 列的值,而第二列为表中 k1 列的值。最终导入的数据结果如下:
+------+------+
| k1 | k2 |
+------+------+
| 1 | 2 |
+------+------+
当然,如其他导入一样,可以在 Columns 中进行列的转换操作。示例如下:
curl -v --location-trusted -u root: -H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" -H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
上述示例会将 k1 的值乘以 100 后导入。最终导入的数据结果如下:
+------+------+
| k1 | k2 |
+------+------+
| 100 | 2 |
+------+------+
NULL 和 Default 值
示例数据如下:
[
{"k1": 1, "k2": "a"},
{"k1": 2},
{"k1": 3, "k2": "c"},
]
表结构为:k1 int null, k2 varchar(32) null default "x"
导入语句如下:
curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
用户可能期望的导入结果如下,即对于缺失的列,填写默认值。
+------+------+
| k1 | k2 |
+------+------+
| 1 | a |
+------+------+
| 2 | x |
+------+------+
| 3 | c |
+------+------+
但实际的导入结果如下,即对于缺失的列,补上了 NULL。
+------+------+
| k1 | k2 |
+------+------+
| 1 | a |
+------+------+
| 2 | NULL |
+------+------+
| 3 | c |
+------+------+
这是因为通过导入语句中的信息,Doris并不知道 “缺失的列是表中的 k2 列”。 如果要对以上数据按照期望结果导入,则导入语句如下:
curl -v --location-trusted -u root: -H "format: json" -H "strip_outer_array: true" -H "jsonpaths: [\"$.k1\", \"$.k2\"]" -H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" -T example.json http://127.0.0.1:8030/api/db1/tbl1/_stream_load
应用示例
Stream Load
因为 Json 格式的不可拆分特性,所以在使用 Stream Load 导入 Json 格式的文件时,文件内容会被全部加载到内存后,才开始处理。因此,如果文件过大的话,可能会占用较多的内存。
假设表结构为:
id INT NOT NULL,
city VARHCAR NULL,
code INT NULL
- 导入单行数据1
{"id": 100, "city": "beijing", "code" : 1}
- 不指定 Json Path
curl --location-trusted -u user:passwd -H "format: json" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load
导入结果:
100 beijing 1
- 指定 Json Path
curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load
导入结果:
100 beijing 1
- 导入单行数据2
{"id": 100, "content": {"city": "beijing", "code" : 1}}
- 指定 Json Path
curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.content.city\",\"$.content.code\"]" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load
导入结果:
100 beijing 1
- 导入多行数据
[
{"id": 100, "city": "beijing", "code" : 1},
{"id": 101, "city": "shanghai"},
{"id": 102, "city": "tianjin", "code" : 3},
{"id": 103, "city": "chongqing", "code" : 4},
{"id": 104, "city": ["zhejiang", "guangzhou"], "code" : 5},
{
"id": 105,
"city": {
"order1": ["guangzhou"]
},
"code" : 6
}
]
- 指定 Json Path
curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -H "strip_outer_array: true" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load
导入结果:
100 beijing 1
101 shanghai NULL
102 tianjin 3
103 chongqing 4
104 ["zhejiang","guangzhou"] 5
105 {"order1":["guangzhou"]} 6
- 对导入数据进行转换
数据依然是示例3中的多行数据,现需要对导入数据中的code
列加1后导入。curl --location-trusted -u user:passwd -H "format: json" -H "jsonpaths: [\"$.id\",\"$.city\",\"$.code\"]" -H "strip_outer_array: true" -H "columns: id, city, tmpc, code=tmpc+1" -T data.json http://localhost:8030/api/db1/tbl1/_stream_load
导入结果:
100 beijing 2
101 shanghai NULL
102 tianjin 4
103 chongqing 5
104 ["zhejiang","guangzhou"] 6
105 {"order1":["guangzhou"]} 7
Routine Load
Routine Load 对 Json 数据的处理原理和 Stream Load 相同。在此不再赘述。
对于 Kafka 数据源,每个 Massage 中的内容被视作一个完整的 Json 数据。如果一个 Massage 中是以 Array 格式的表示的多行数据,则会导入多行,而 Kafka 的 offset 只会增加 1。而如果一个 Array 格式的 Json 表示多行数据,但是因为 Json 格式错误导致解析 Json 失败,则错误行只会增加 1(因为解析失败,实际上 Doris无法判断其中包含多少行数据,只能按一行错误数据记录)。
7.导入事务和原子性
导入原子性
Doris中的所有导入操作都有原子性保证,即一个导入作业中的数据要么全部成功,要么全部失败。不会出现仅部分数据导入成功的情况。
在 [BROKER LOAD]中我们也可以实现多多表的原子性导入。
对于表所附属的 [物化视图](TODO),也同时保证和基表的原子性和一致性。
Label 机制
Doris的导入作业都可以设置一个 Label。这个 Label 通常是用户自定义的、具有一定业务逻辑属性的字符串。
Label 的主要作用是唯一标识一个导入任务,并且能够保证相同的 Label 仅会被成功导入一次。
Label 机制可以保证导入数据的不丢不重。如果上游数据源能够保证 At-Least-Once 语义,则配合 Doris的 Label 机制,能够保证 Exactly-Once 语义。
Label 在一个数据库下具有唯一性。Label 的保留期限默认是 3 天。即 3 天后,已完成的 Label 会被自动清理,之后 Label 可以被重复使用。
最佳实践
Label 通常被设置为 业务逻辑+时间
的格式。如 my_business1_20201010_125000
。
这个 Label 通常用于表示:业务 my_business1
这个业务在 2020-10-10 12:50:00
产生的一批数据。通过这种 Label 设定,业务上可以通过 Label 查询导入任务状态,来明确的获知该时间点批次的数据是否已经导入成功。如果没有成功,则可以使用这个 Label 继续重试导入。
8.列的映射、转换与过滤
Doris支持丰富的列映射、转换和过滤操作。可以非常灵活的处理需要导入的原始数据。
本文档主要介绍如何在导入中使用这些功能。
总体介绍
Doris在导入过程中对数据处理步骤分为以下几步:
- 数据按原始文件中的列的顺序读入到 Palo
- 通过前置过滤条件(PRECEDING FILTER)对原始数据进行一次过滤。
- 通过列映射和转换,将原始数据映射到目标列顺序。
- 通过后置过滤条件(WHERE)对转换后的数据在进行一次过滤。
- 写入最终数据。
列的映射、转换和过滤参数在导入作业中皆为可选操作。在默认空缺的情况下,Doris会将源文件中的行按默认的列分割符 \t
分割后,按顺序对应到表中的列。如果源文件中的列数量和表中的列数量不匹配,则会产生数据质量问题,导致数据无法导入。此时则需要显式的描述列的映射、转换和过滤信息。
支持的导入方式
- BROKER LOAD
LOAD LABEL example_db.label1
(
DATA INFILE("bos://bucket/input/file")
INTO TABLE `my_table`
(k1, k2, tmpk3)
PRECEDING FILTER k1 = 1
SET (
k3 = tmpk3 + 1
)
WHERE k1 > k2
)
WITH BROKER bos
(
...
);
- STREAM LOAD
curl
--location-trusted
-u user:passwd
-H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1"
-H "where: k1 > k2"
-T file.txt
http://host:port/api/testDb/testTbl/_stream_load
- ROUTINE LOAD
CREATE ROUTINE LOAD example_db.label1 ON my_table
COLUMNS(k1, k2, tmpk3, k3 = tmpk3 + 1),
PRECEDING FILTER k1 = 1,
WHERE k1 > k2
...
以上导入方式都支持对源数据进行列映射、转换和过滤操作:
- 前置过滤:对读取到的原始数据进行一次过滤。
PRECEDING FILTER k1 = 1
- 映射:定义源数据中的列。如果定义的列名和表中的列相同,则直接映射为表中的列。如果不同,则这个被定义的列可以用于之后的转换操作。如上面示例中的:
(k1, k2, tmpk3)
- 转换:将第一步中经过映射的列进行转换,可以使用内置表达式、函数、自定义函数进行转化,并重新映射到表中对应的列上。如上面示例中的:
k3 = tmpk3 + 1
- 后置过滤:对经过映射和转换后的列,通过表达式进行过滤。被过滤的数据行不会导入到系统中。如上面示例中的:
WHERE k1 > k2
列映射
列映射的目的主要是描述导入文件中各个列的信息,相当于为源数据中的列定义名称。通过描述列映射关系,我们可以将于表中列顺序不同、列数量不同的源文件导入到 Doris中。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
---|---|---|---|
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | \N | chongqing | 1.4 |
注:
\N
在源文件中表示 null。
- 调整映射顺序
假设表中有k1,k2,k3,k4
4列。我们希望的导入映射关系如下:列1 -> k1
列2 -> k3
列3 -> k2
列4 -> k4
则列映射的书写顺序应如下:
(k1, k3, k2, k4)
- 源文件中的列数量多于表中的列
假设表中有k1,k2,k3
3列。我们希望的导入映射关系如下:列1 -> k1
列2 -> k3
列3 -> k2
则列映射的书写顺序应如下:
(k1, k3, k2, tmpk4)
其中 tmpk4
为一个自定义的、表中不存在的列名。Doris会忽略这个不存在的列名。
- 源文件中的列数量少于表中的列,使用默认值填充
假设表中有k1,k2,k3,k4,k5
5列。我们希望的导入映射关系如下:列1 -> k1
列2 -> k3
列3 -> k2
这里我们仅使用源文件中的前3列。k4,k5
两列希望使用默认值填充。
则列映射的书写顺序应如下:
(k1, k3, k2)
如果 k4,k5
列有默认值,则会填充默认值。否则如果是 nullable
的列,则会填充 null
值。否则,导入作业会报错。
列前置过滤
前置过滤是对读取到的原始数据进行一次过滤。目前仅支持 BROKER LOAD 和 ROUTINE LOAD。
前置过滤有以下应用场景:
转换前做过滤
希望在列映射和转换前做过滤的场景。能够先行过滤掉部分不需要的数据。过滤列不存在于表中,仅作为过滤标识
比如源数据中存储了多张表的数据(或者多张表的数据写入了同一个 Kafka 消息队列)。数据中每行有一列表名来标识该行数据属于哪个表。用户可以通过前置过滤条件来筛选对应的表数据进行导入。
列转换
列转换功能允许用户对源文件中列值进行变换。目前 Doris支持使用绝大部分内置函数、用户自定义函数进行转换。
注:自定义函数隶属于某一数据库下,在使用自定义函数进行转换时,需要用户对这个数据库有读权限。
转换操作通常是和列映射一起定义的。即先对列进行映射,再进行转换。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
---|---|---|---|
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | 400 | chongqing | 1.4 |
- 将源文件中的列值经转换后导入表中
假设表中有k1,k2,k3,k4
4列。我们希望的导入映射和转换关系如下:列1 -> k1
列2 * 100 -> k3
列3 -> k2
列4 -> k4
则列映射的书写顺序应如下:
(k1, tmpk3, k2, k4, k3 = tmpk3 * 100)
这里相当于我们将源文件中的第2列命名为 tmpk3
,同时指定表中 k3
列的值为 tmpk3 * 100
。最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 1 | beijing | 10000 | 1.1 |
| 2 | shanghai | 20000 | 1.2 |
| 3 | guangzhou | 30000 | 1.3 |
| null | chongqing | 40000 | 1.4 |
- 通过 case when 函数,有条件的进行列转换。
假设表中有k1,k2,k3,k4
4列。我们希望对于源数据中的beijing, shanghai, guangzhou, chongqing
分别转换为对应的地区id后导入:列1 -> k1
列2 -> k2
列3 进行地区id转换后 -> k3
列4 -> k4
则列映射的书写顺序应如下:
(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 1 | 100 | 1 | 1.1 |
| 2 | 200 | 2 | 1.2 |
| 3 | 300 | 3 | 1.3 |
| null | 400 | 4 | 1.4 |
- 将源文件中的 null 值转换成 0 导入。同时也进行示例2中的地区id转换。
假设表中有k1,k2,k3,k4
4列。在对地区id转换的同时,我们也希望对于源数据中 k1 列的 null 值转换成 0 导入:列1 如果为null 则转换成0 -> k1
列2 -> k2
列3 -> k3
列4 -> k4
则列映射的书写顺序应如下:
(tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 1 | 100 | 1 | 1.1 |
| 2 | 200 | 2 | 1.2 |
| 3 | 300 | 3 | 1.3 |
| 0 | 400 | 4 | 1.4 |
列过滤
经过列映射和转换后,我们可以通过过滤条件将不希望导入到Palo中的数据进行过滤。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
---|---|---|---|
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | 400 | chongqing | 1.4 |
- 在列映射和转换缺省的情况下,直接过滤
假设表中有k1,k2,k3,k4
4列。我们可以在缺省列映射和转换的情况下,直接定义过滤条件。如我们希望只导入源文件中第4列为大于 1.2 的数据行,则过滤条件如下:where k4 > 1.2
最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 3 | 300 | guangzhou | 1.3 |
| null | 400 | chongqing | 1.4 |
缺省情况下,Doris会按照顺序进行列映射,因此源文件中的第4列自动被映射到表中的 k4
列。
- 对经过列转换的数据进行过滤
假设表中有k1,k2,k3,k4
4列。在 列转换 示例中,我们将省份名称转换成了id。这里我们想过滤掉 id 为 3 的数据。则转换、过滤条件如下:(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
where k3 != 3
最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 1 | 100 | 1 | 1.1 |
| 2 | 200 | 2 | 1.2 |
| null | 400 | 4 | 1.4 |
这里我们看到,执行过滤时的列值,为经过映射和转换后的最终列值,而不是原始数据。
- 多条件过滤
假设表中有k1,k2,k3,k4
4列。我们想过滤掉k1
列为null
的数据,同时过滤掉k4
列小于 1.2 的数据,则过滤条件如下:where k1 is null and k4 < 1.2
最终表中的数据如下:
| k1 | k2 | k3 | k4 |
| —- | —- | —- | —- |
| 2 | 200 | 2 | 1.2 |
| 3 | 300 | 3 | 1.3 |
数据质量问题和过滤阈值
导入作业中被处理的数据行可以分为如下三种:
Filtered Rows
因数据质量不合格而被过滤掉的数据。数据质量不合格包括类型错误、精度错误、字符串长度超长、文件列数不匹配等数据格式问题,以及因没有对应的分区而被过滤掉的数据行。Unselected Rows
这部分为因preceding filter
或where
列过滤条件而被过滤掉的数据行。Loaded Rows
被正确导入的数据行。
Doris的导入任务允许用户设置最大错误率(max_filter_ratio
)。如果导入的数据的错误率低于阈值,则这些错误行将被忽略,其他正确的数据将被导入。
错误率的计算方式为:
#Filtered Rows / (#Filtered Rows + #Loaded Rows)
也就是说 Unselected Rows
不会参与错误率的计算。
9.严格模式
严格模式(strict_mode)为导入操作中的一个参数配置。该参数会影响某些数值的导入行为和最终导入的数据。
本文档主要说明如何设置严格模式,以及严格模式产生的影响。
如何设置
严格模式默认情况下都为 False,即关闭状态。
不同的导入方式设置严格模式的方式不尽相同。
- BROKER LOAD
LOAD LABEL example_db.label1
(
DATA INFILE("bos://my_bucket/input/file.txt")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
)
WITH BROKER bos
(
"bos_endpoint" = "http://bj.bcebos.com",
"bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
"bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
)
PROPERTIES
(
"strict_mode" = "true"
)
- STREAM LOAD
curl --location-trusted -u user:passwd \
-H "strict_mode: true" \
-T 1.txt \
http://host:port/api/example_db/my_table/_stream_load
- ROUTINE LOAD
CREATE ROUTINE LOAD example_db.test_job ON my_table
PROPERTIES
(
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic"
);
- INSERT
通过[会话变量]设置:SET enable_insert_strict = true;
INSERT INTO my_table ...;
严格模式的作用
严格模式的意思是,对于导入过程中的列类型转换进行严格过滤。
严格过滤的策略如下:
对于列类型转换来说,如果开启严格模式,则错误的数据将被过滤。这里的错误数据是指:原始数据并不为 null
,而在进行列类型转换后结果为 null
的这一类数据。
这里说指的 列类型转换
,并不包括用函数计算得出的 null
值。
对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,严格模式对其也不产生影响。例如:如果类型是 decimal(1,0)
, 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。
- 以列类型为 TinyInt 来举例: | 原始数据类型 | 原始数据举例 | 转换为 TinyInt 后的值 | 严格模式 | 结果 | | —- | —- | —- | —- | —- | | 空值 | \N | NULL | 开启或关闭 | NULL | | 非空值 | “abc” or 2000 | NULL | 开启 | 非法值(被过滤) | | 非空值 | “abc” | NULL | 关闭 | NULL | | 非空值 | 1 | 1 | 开启或关闭 | 正确导入 |
说明:
- 表中的列允许导入空值
abc
及2000
在转换为 TinyInt 后,会因类型或精度问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入null
。
- 以列类型为 Decimal(1,0) 举例 | 原始数据类型 | 原始数据举例 | 转换为 Decimal 后的值 | 严格模式 | 结果 | | —- | —- | —- | —- | —- | | 空值 | \N | null | 开启或关闭 | NULL | | 非空值 | aaa | NULL | 开启 | 非法值(被过滤) | | 非空值 | aaa | NULL | 关闭 | NULL | | 非空值 | 1 or 10 | 1 or 10 | 开启或关闭 | 正确导入 |
说明:
- 表中的列允许导入空值
abc
在转换为 Decimal 后,会因类型问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入null
。10
虽然是一个超过范围的值,但是因为其类型符合 decimal 的要求,所以严格模式对其不产生影响。10
最后会在其他导入处理流程中被过滤。但不会被严格模式过滤。