- 一、环境准备
- 二、创建StreamSets的Pipline
- 1.1 登录StreamSets,创建一个新的Pipline
- 1.2 选择Origins类别,搜索MySQL Binary Log
- 1.3 添加表过滤的Stream Selector
- 1.4 添加插入类型分流的Stream Selector
- 1.5 添加处理Delete类型日志的JavaScript Evaluator
- 1.6 添加处理INSRET和UPDATE类型日志的JavaScript Evaluator
- 1.7 为JavaScript Evaluator-DELETE添加Kudu
- 1.8 为JavaScript Evaluator-UPSERT添加Kudu并配置基础属性
- 1.9 流程创建完成后,启动该Pipelines
- 三、Pipeline流程测试
- 四、总结
一、环境准备
1. 开启MariaDB的Binlog日志
1.1 修改/etc/my.cnf文件,在配置文件[mysqld]下增加如下配置
server-id=1
log-bin=mysql-bin
binlog_format=ROW
注意:MySQL Binlog支持多种数据更新格式包括Row、Statement和mix(Row和Statement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。
1.2 重启MariaDB服务
systemctl restart mariadb
systemctl status mariadb
1.3 登录MariaDB创建同步账号
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;
1.4 StreamSets安装MySQL驱动
将MySQL的JDBC驱动拷贝至/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录
1.5 在MariaDB数据库中创建测试表
create database test character set utf8;
use test;
create table student (id int(6),name varchar(30),sex char(3),age Int(3));
insert into student values(000001,'张三','男',18);
insert into student values(000002,'李四','男',20);
insert into student values(000003,'王五','女',20);
select * from student;
1.6 使用Hue创建Kudu表
CREATE TABLE student (
id BIGINT,
age INT,
name STRING,
sex STRING,
PRIMARY KEY (id, age)
)
PARTITION BY HASH (id) PARTITIONS 4,
RANGE (age)
(
PARTITION VALUES < 20,
PARTITION 20 <= VALUES < 30,
PARTITION 30 <= VALUES < 50,
PARTITION 50 <= VALUES
) STORED AS KUDU
TBLPROPERTIES("kudu.master_addresses" = "cdh1.macro.com:7051");
二、创建StreamSets的Pipline
1.1 登录StreamSets,创建一个新的Pipline
1.2 选择Origins类别,搜索MySQL Binary Log
- 配置MySQL Binary Log
- 配置MySQL信息
- 配置同步账号信息
- 高级配置,根据自己的需要进行配置
- 配置分流条件
1.4 添加插入类型分流的Stream Selector
- Stream Selector基本配置
配置分流条件
${record:value('/Type')=='DELETE'}
1.5 添加处理Delete类型日志的JavaScript Evaluator
该JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志
- 配置基本属性
配置JavaScript脚本,脚本如下:
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value['OldData'];
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
log.info(records[i].value['Type'])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
1.6 添加处理INSRET和UPDATE类型日志的JavaScript Evaluator
该JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志
- 配置基本属性
配置JavaScript脚本,脚本如下:
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value['Data'];
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
log.info(records[i].value['Type'])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
1.7 为JavaScript Evaluator-DELETE添加Kudu
配置Kudu基本属性
- 配置Kudu环境
- Kudu的高级配置
1.8 为JavaScript Evaluator-UPSERT添加Kudu并配置基础属性
- 配置Kudu环境
- Kudu高级配置
1.9 流程创建完成后,启动该Pipelines
三、Pipeline流程测试
1. insert测试
登录MariaDB数据库,向student表中插入数据
insert into student values(4,'赵六','男',30);
查看StreamSets的Pipeline实时状态
- 可以看到Kudu-Upsert成功的处理了一条数据
- 使用Hue查看Kudu表数据
2. update测试
- 数据成功的插入到Kudu的student表中。
登录MariaDB数据库修改student表中数据
update student set name='AAA' where id=4;
查看StreamSets的Pipeline实时状态
- 可以看到Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE
- 使用Hue查看Kudu的student表
3. delete测试
登录MariaDB数据,删除student表中数据
delete from student where id=4;
查看StreamSets的Pipeline实时状态
- 可以看到Kudu-Delete成功处理一条日志
- 使用Hue查看Kudu的student表,id为4的数据已不存在
四、总结
- 实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave。
- 向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据。
- JavaScript脚本需要注意在解析每一条Record是需要使用其内置的Function,在示例中Fayson将MySQL Binary Log复杂的JSON数据解析重组为简单的Map对象,这里就省去了Kudu入库时“Field to Column Mapping”的映射,需要去确保组装的Map数据中Key与Kudu表中的column字段一致。
- 在Kudu插入数据时指定Kudu表名需要注意,如果使用Impala创建的表,则需要加上impala的前缀格式impala:
:。