因为做的项目中有Greenplum数据同步到Kafka的需求,因此做了这方面的研究。由于Greenplum是基于PostgreSQL开发的,所以首先去找PostgreSQL对接Kafka的解决方案。
网上搜索了大量资料,锁定在了Kafka Connector + Debezium Connector
这种方式,于是一顿折腾之后,终于调试通了。不幸的事情发生了,Debezium官网上有说明,此插件只适用于PostgreSQL9.6及以后的版本,而我们项目中用到的Greenplum是6.4.0的,再仔细一查,此版本是基于PostgreSQL9.4.24的,晴天霹雳!因此Debezium这条路是走不通了,只好另寻他法。
又继续网上搜索资料,发现还有一种叫做pgkafka
的工具,而且没有看到此工具对PostgreSQL的版本有什么限制,于是上了github上下载了源码,而就在make编译时,它居然报错了,而且报的错误在github的README文件中也做了说明。细心一看,pgkafka是基于PostgreSQL9.2做的开发,可能会和高版本PostgreSQL存在兼容问题,而且这仓库已经有6年的时间没做更新了,基本上可以判断此项目停止维护了,如果我在项目上使用此工具,出了什么问题,真的不好办,于是此方案又被否决。
此方案的研究陆陆续续已经过去了快两周的时间,心态有点崩了,但是不能放弃啊,继续查资料……
无意中,查Greenplum官网资料中发现,Greenplum支持外部写表
这种东西,而且刚好也支持把表里数据同步到Kafka中,但这种写表只能INSERT,不能做其他操作,正好项目的需求就是只有INSERT操作,符合要求。再根据网上的博客资料:greenplum链接kafka写入与导出数据](https://blog.csdn.net/xfg0218/article/details/102605788#15.3 greenplum数据写入到kafka)),这就开始测试。
在这之前,我的Greenplum6.4.0已经安装好,且Kafka集群也都准备就绪。
注意:在Greenplum所在的机器上,也要安装Kakfa的客户端
CREATE DATABASE test;
\c test;
--写法1
CREATE SCHEMA kafkaschema;
DROP EXTERNAL TABLE IF EXISTS kafkaschema.kafka_test;
CREATE WRITABLE EXTERNAL WEB TABLE kafkaschema.kafka_test (
id varchar,
name varchar
)
EXECUTE '/opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic greenplum_kafka ' FORMAT 'TEXT' (DELIMITER E',' ESCAPE 'OFF' NULL '');
--上面的DELIMITER E',' ,指的是输出到Kafka的数据,列与列之间的分隔符,这里用的,做测试。
--写法2
DROP EXTERNAL TABLE IF EXISTS kafkaschema.kafka_test;
CREATE WRITABLE EXTERNAL WEB TABLE kafkaschema.kafka_test (
id varchar,
name varchar
)
EXECUTE '/opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic gp_kafka' FORMAT 'CSV' (DELIMITER ',');
insert into kafkaschema.kafka_test(id, name) values('1', 'jim');
insert into kafkaschema.kafka_test(id, name) values('2', 'tom');
insert into kafkaschema.kafka_test(id, name) values('3', 'jane');
insert into kafkaschema.kafka_test(id, name) values('4', 'ethan');
去到Kafka集群对应的集群hadoop102上去消费topic为greenplum_kafka的数据,可以消费的到。至此,关于Greenplum数据同步到Kafka的方案终于落实。