一、环境准备

1. 开启MariaDB的Binlog日志

1.1 修改/etc/my.cnf文件,在配置文件[mysqld]下增加如下配置

  1. server-id=1
  2. log-bin=mysql-bin
  3. binlog_format=ROW

image.png
注意:MySQL Binlog支持多种数据更新格式包括Row、Statement和mix(Row和Statement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。

1.2 重启MariaDB服务

  1. systemctl restart mariadb
  2. systemctl status mariadb

image.png

1.3 登录MariaDB创建同步账号

  1. GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
  2. GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
  3. FLUSH PRIVILEGES;

image.png

1.4 StreamSets安装MySQL驱动

将MySQL的JDBC驱动拷贝至/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录
image.png

1.5 在MariaDB数据库中创建测试表

  1. create database test character set utf8;
  2. use test;
  3. create table student (id int(6),name varchar(30),sex char(3),age Int(3));
  4. insert into student values(000001,'张三','男',18);
  5. insert into student values(000002,'李四','男',20);
  6. insert into student values(000003,'王五','女',20);
  7. select * from student;

image.png

1.6 使用Hue创建Kudu表

  1. CREATE TABLE student (
  2. id BIGINT,
  3. age INT,
  4. name STRING,
  5. sex STRING,
  6. PRIMARY KEY (id, age)
  7. )
  8. PARTITION BY HASH (id) PARTITIONS 4,
  9. RANGE (age)
  10. (
  11. PARTITION VALUES < 20,
  12. PARTITION 20 <= VALUES < 30,
  13. PARTITION 30 <= VALUES < 50,
  14. PARTITION 50 <= VALUES
  15. ) STORED AS KUDU
  16. TBLPROPERTIES("kudu.master_addresses" = "cdh1.macro.com:7051");

image.png

二、创建StreamSets的Pipline

1.1 登录StreamSets,创建一个新的Pipline

image.png

1.2 选择Origins类别,搜索MySQL Binary Log

image.png

  1. 配置MySQL Binary Log

image.png

  1. 配置MySQL信息

image.png

  1. 配置同步账号信息

image.png

  1. 高级配置,根据自己的需要进行配置

image.png

  1. 到此MySQL Binary Log的配置完成。

    1.3 添加表过滤的Stream Selector

    image.png

  2. Stream Selector基本配置

image.png

  1. 配置分流条件

image.png

1.4 添加插入类型分流的Stream Selector

image.png

  1. Stream Selector基本配置

image.png

  1. 配置分流条件

    1. ${record:value('/Type')=='DELETE'}

    image.png

    1.5 添加处理Delete类型日志的JavaScript Evaluator

  2. 该JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志

image.png

  1. 配置基本属性

image.png

  1. 配置JavaScript脚本,脚本如下:

    1. for(var i = 0; i < records.length; i++) {
    2. try {
    3. var newRecord = sdcFunctions.createRecord(true);
    4. newRecord.value = records[i].value['OldData'];
    5. newRecord.value.Type = records[i].value['Type'];
    6. newRecord.value.Database = records[i].value['Database'];
    7. newRecord.value.Table = records[i].value['Table'];
    8. log.info(records[i].value['Type'])
    9. output.write(newRecord);
    10. } catch (e) {
    11. // Send record to error
    12. error.write(records[i], e);
    13. }
    14. }

    image.png

    1.6 添加处理INSRET和UPDATE类型日志的JavaScript Evaluator

  2. 该JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志

image.png

  1. 配置基本属性

image.png

  1. 配置JavaScript脚本,脚本如下:

    1. for(var i = 0; i < records.length; i++) {
    2. try {
    3. var newRecord = sdcFunctions.createRecord(true);
    4. newRecord.value = records[i].value['Data'];
    5. newRecord.value.Type = records[i].value['Type'];
    6. newRecord.value.Database = records[i].value['Database'];
    7. newRecord.value.Table = records[i].value['Table'];
    8. log.info(records[i].value['Type'])
    9. output.write(newRecord);
    10. } catch (e) {
    11. // Send record to error
    12. error.write(records[i], e);
    13. }
    14. }

    image.png

    1.7 为JavaScript Evaluator-DELETE添加Kudu

    image.png

  2. 配置Kudu基本属性

image.png

  1. 配置Kudu环境

image.png

  1. Kudu的高级配置

image.png

1.8 为JavaScript Evaluator-UPSERT添加Kudu并配置基础属性

image.png

  1. 配置Kudu环境

image.png

  1. Kudu高级配置

image.png

1.9 流程创建完成后,启动该Pipelines

image.png
image.png

三、Pipeline流程测试

1. insert测试

  1. 登录MariaDB数据库,向student表中插入数据

    1. insert into student values(4,'赵六','男',30);

    image.png

  2. 查看StreamSets的Pipeline实时状态

image.png

  1. 可以看到Kudu-Upsert成功的处理了一条数据

image.png

  1. 使用Hue查看Kudu表数据

image.png

2. update测试

  1. 数据成功的插入到Kudu的student表中。
  2. 登录MariaDB数据库修改student表中数据

    1. update student set name='AAA' where id=4;

    image.png

  3. 查看StreamSets的Pipeline实时状态

image.png

  1. 可以看到Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE

image.png

  1. 使用Hue查看Kudu的student表

image.png

3. delete测试

  1. 登录MariaDB数据,删除student表中数据

    1. delete from student where id=4;

    image.png

  2. 查看StreamSets的Pipeline实时状态

image.png

  1. 可以看到Kudu-Delete成功处理一条日志

image.png

  1. 使用Hue查看Kudu的student表,id为4的数据已不存在

image.png

四、总结

  1. 实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave。
  2. 向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据。
  3. JavaScript脚本需要注意在解析每一条Record是需要使用其内置的Function,在示例中Fayson将MySQL Binary Log复杂的JSON数据解析重组为简单的Map对象,这里就省去了Kudu入库时“Field to Column Mapping”的映射,需要去确保组装的Map数据中Key与Kudu表中的column字段一致。
  4. 在Kudu插入数据时指定Kudu表名需要注意,如果使用Impala创建的表,则需要加上impala的前缀格式impala::。