title: Kafka Broker接受不到Producer发送的消息原因探究date: 2020-09-21 13:49:42
tags: [Greenplum, Kafka]
categories: Kafka
copyright: true

公司做项目过程中,需要将Greenplum数据实时同步到PostgreSQL,但Greenplum所在机器是在外网上,而PostgreSQL是在内网上,二者不能直接通信,所以想到了使用Kafka作为中间媒介同步传输数据。

之前研究过Debezium + Kafka Connector这种方式,但无奈公司的Greenplum库是基于PostgreSQL 9.4.24的,而Debezium官网最低支持的版本为9.6,因此不得不放弃这种方案。

而后,又得知Greenplum是支持外部写表这样的方式,这种表只可以INSERT,不可SELECT和UPDATE,在建表时,加上执行kafka producer的命令,相当于将此表的变化作为一个生产者的客户端,向Kafka Broker传输数据,但前提是Greenplum所在的机器上,必须要安装对应的Kafka客户端工具。

由于我们当时的shell脚本,java和python程序,SQL文件全都在另外一台服务器上,为的就是减轻Greenplum的master服务器的压力,所以再在master上部署Kafka客户端不太现实,因此此方案最终也被否决。

最终我们采用的方案是直接用Python编写Producer和Consumer。

Prodcuer:利用pandas读取Greenplum指定表的数据,将生成的dataframe作为消息发送给Kafka Broker

Consumer:监控Kafka Broker中的不同topic的消息,一旦有新消息,则取出来将其入库到PostgreSQL

首先在编写代码,并拿小量数据测试通过后,将代码同步到服务器上。测试时,一边开着kafka自带的kafka-console-consumer.sh,一边执行生产者程序,但是奇怪的事情发生了,程序显示执行成功,但consumer并没有收到消息。由于对kafka算是入门,了解了大致的框架,遇到这个问题我有2个思路方向:

  • Producer发送消息到Kafka Broker失败
  • 前者成功了,但是Consumer由于某种原因消费不到消息

那么就按照这两个思路排查,首先去到kafka的data目录下看对应的topic数据情况,发现生产者程序里定义的几个Topic均已经创建成功,但是每一个Topic下的log文件都是0K的大小,很明显,就是消息没有传过来。在经过了一番查阅资料之后发现,最大的可能就是由于消息过大,超过某个限制,因此传送失败。我在本地测试的时候,数据量不超过10条,数据也非常小,因此不会有问题。但真实Greenplum库中的数据都是几千上万条的,数据量一下就上来了。

所以冲着Kafka 消息过大这个话题,继续在网上搜索资料,发现在Kafka Producer、Kafka Broker、Kafka Consumer这三端,都存在这消息大小的限制。

数据流向:Kafka producer —> Kafka Broker —> Kafka Consumer

因此需要在每一端都做相关的配置。

  • Kafka producer(producer.properties)
    1. max_request_size=1073741824
  • Kafka Broker(server.properties)
    1. message.max.bytes=1073741824
    2. replica.fetch.max.bytes=1073741824
  • Kafka Consumer(consumer.properties)
    1. fetch.message.max.bytes=1073741824

在配置完之后,重新进行测试,发现消息还是没有传送到Broker,正在纠结的时候,突然想到,我的生产者程序是用Python代码编写的,并不是自带的kafka-console-producer.sh,因此是否是因为我需要在代码里单独设定Producer的参数。

去查看了kafka-python这个包里的KafkaProducer类,发现里面确实有max_request_size这个参数,于是赶紧加上这个参数,修改代码,重新测试,发现果然传送成功。

至于后面的Consumer程序,由于是部署在别的机器上,连接的是内网的PostgreSQL库,所以暂时没有进行测试,如果真的收不到消息,解决方案也很明确了,只需更改相应的参数fetch.message.max.bytes即可。