因为做的项目中有Greenplum数据同步到Kafka的需求,因此做了这方面的研究。由于Greenplum是基于PostgreSQL开发的,所以首先去找PostgreSQL对接Kafka的解决方案。

    网上搜索了大量资料,锁定在了Kafka Connector + Debezium Connector这种方式,于是一顿折腾之后,终于调试通了。不幸的事情发生了,Debezium官网上有说明,此插件只适用于PostgreSQL9.6及以后的版本,而我们项目中用到的Greenplum是6.4.0的,再仔细一查,此版本是基于PostgreSQL9.4.24的,晴天霹雳!因此Debezium这条路是走不通了,只好另寻他法。
    image-20200814115135471.png

    又继续网上搜索资料,发现还有一种叫做pgkafka的工具,而且没有看到此工具对PostgreSQL的版本有什么限制,于是上了github上下载了源码,而就在make编译时,它居然报错了,而且报的错误在github的README文件中也做了说明。细心一看,pgkafka是基于PostgreSQL9.2做的开发,可能会和高版本PostgreSQL存在兼容问题,而且这仓库已经有6年的时间没做更新了,基本上可以判断此项目停止维护了,如果我在项目上使用此工具,出了什么问题,真的不好办,于是此方案又被否决。

    此方案的研究陆陆续续已经过去了快两周的时间,心态有点崩了,但是不能放弃啊,继续查资料……

    无意中,查Greenplum官网资料中发现,Greenplum支持外部写表这种东西,而且刚好也支持把表里数据同步到Kafka中,但这种写表只能INSERT,不能做其他操作,正好项目的需求就是只有INSERT操作,符合要求。再根据网上的博客资料:greenplum链接kafka写入与导出数据](https://blog.csdn.net/xfg0218/article/details/102605788#15.3 greenplum数据写入到kafka)),这就开始测试。

    在这之前,我的Greenplum6.4.0已经安装好,且Kafka集群也都准备就绪。

    注意:在Greenplum所在的机器上,也要安装Kakfa的客户端

    1. CREATE DATABASE test;
    2. \c test;
    3. --写法1
    4. CREATE SCHEMA kafkaschema;
    5. DROP EXTERNAL TABLE IF EXISTS kafkaschema.kafka_test;
    6. CREATE WRITABLE EXTERNAL WEB TABLE kafkaschema.kafka_test (
    7. id varchar,
    8. name varchar
    9. )
    10. EXECUTE '/opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic greenplum_kafka ' FORMAT 'TEXT' (DELIMITER E',' ESCAPE 'OFF' NULL '');
    11. --上面的DELIMITER E',' ,指的是输出到Kafka的数据,列与列之间的分隔符,这里用的,做测试。
    12. --写法2
    13. DROP EXTERNAL TABLE IF EXISTS kafkaschema.kafka_test;
    14. CREATE WRITABLE EXTERNAL WEB TABLE kafkaschema.kafka_test (
    15. id varchar,
    16. name varchar
    17. )
    18. EXECUTE '/opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic gp_kafka' FORMAT 'CSV' (DELIMITER ',');
    19. insert into kafkaschema.kafka_test(id, name) values('1', 'jim');
    20. insert into kafkaschema.kafka_test(id, name) values('2', 'tom');
    21. insert into kafkaschema.kafka_test(id, name) values('3', 'jane');
    22. insert into kafkaschema.kafka_test(id, name) values('4', 'ethan');

    去到Kafka集群对应的集群hadoop102上去消费topic为greenplum_kafka的数据,可以消费的到。至此,关于Greenplum数据同步到Kafka的方案终于落实。
    image-20200909180546440.png