在利用Python编写Kafka Producer端代码时,有一个很简单的操作,部分代码如下:

    1. # -*- coding: utf-8 -*-
    2. from kafka import KafkaProducer
    3. from public_funcs import logInfo, pgRetrieve, pgConnection
    4. import datetime
    5. # import logging
    6. import sys
    7. from config import Config
    8. import json
    9. def load_data(tablename, batchid, engine):
    10. query_sql = Config.INPUT_QUERY.format(tablename, batchid)
    11. df = pgRetrieve(engine, query_sql)
    12. return df.to_json(orient="split",date_format = 'iso') #df转换为json '{"columns":["A","C","B"],"index":[5,4,3],"data":[[0,1,2],[3,4,5],[6,7,8]]}'
    13. class Kafka_producer():
    14. def __init__(self, kafkaserver, topic):
    15. self.kafkaserver = kafkaserver
    16. self.topic = topic
    17. self.producer = KafkaProducer(bootstrap_servers=self.kafkaserver, max_request_size=1073741824) # max_request_size该参数控制了能够发往kafka broker的数据量大小
    18. def sendjsondata(self, tablename, batchid, engine):
    19. try:
    20. msgs = load_data(tablename, batchid, engine)
    21. print(msgs)
    22. msgs_dict = json.loads(msgs)
    23. producer = self.producer
    24. msg_cnt = 0
    25. if len(msgs_dict["index"]) > 0:
    26. for i in range(len(msgs_dict["index"])):
    27. msg = dict(
    28. columns=msgs_dict["columns"],
    29. index=msgs_dict["index"][i],
    30. data=msgs_dict["data"][i],
    31. )
    32. producer.send(self.topic, value=json.dumps(msg).encode('utf-8'))
    33. producer.flush()
    34. msg_cnt += 1
    35. meta_info = dict(
    36. tablename=tablename,
    37. batchid=batchid,
    38. cnt=msg_cnt,
    39. )
    40. print(meta_info)
    41. print(producer)
    42. producer.send(Config.META_TOPIC, value=json.dumps(meta_info).encode('utf-8'))
    43. logInfo('(table_name:{0} batch_id:{1}) 数据发送到kafka成功'.format(tablename, batchid))
    44. except KafkaError as e:
    45. print(e)
    46. if __name__ == '__main__':
    47. batchid = sys.argv[1]
    48. # topic = 'gp_topic1'
    49. engine, conn = pgConnection(Config.gp_database)
    50. for i in range(len(Config.TOPICS)):
    51. producer = Kafka_producer(Config.KAFKA_SERVER, Config.TOPICS[i])
    52. producer.sendjsondata(Config.SOURCE_TABLES[i], batchid, engine)

    第49行,有一个统计上述不同topic发送成功的消息数量,再把该数量发送到指定的元数据META_TOPIC中,结果发生了一个比较奇怪的现象。

    在执行代码时,只有一个topic的信息被统计,且也发送到了META_TOPIC中:

    1. {"tablename": "public.baseinfo", "batchid": "1", "cnt": 4}

    但实际上是监控了获取了2张表的数据,分别发送到2个不同的topic中,正确的结果应该是:

    1. {"tablename": "public.baseinfo", "batchid": "1", "cnt": 4}
    2. {"tablename": "public.seemployees", "batchid": "1", "cnt": 1}

    但如果去Pycharm中debug该程序,kafka是能收到第2条消息的。

    随后,我又多次尝试运行该程序,发现一个情况:kafka有时能收到2条消息,有时只能收到1条

    因此,怀疑是否是在send数据时,会不定时的发生阻塞情况。

    而后,在网上查询过程中无意间发现,每次在send数据之后,最好是加上producer.flush()这段代码。因此有必要对producer.flush()的作用进行深究。

    官网上flush()的定义为:

    Invoking this method makes all buffered records immediately available to send (even if linger_ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.is_done() == True). A request is considered completed when either it is successfully acknowledged according to the ‘acks’ configuration for the producer, or it results in an error.

    Other threads can continue sending messages while one thread is blocked waiting for a flush call to complete; however, no guarantee is made about the completion of messages sent after the flush call begins.

    翻译一下就是:

    调用此方法可以立即发送所有Producer中缓冲的记录(即使linger_ms大于0),并在与这些记录关联的请求完成时阻塞。flush()的后置条件是以前发送的任何记录都已经完成(例如Future.is_done() == True)。根据Producer的’ acks ‘配置,如果请求成功确认,或者请求导致错误,则认为该请求已完成。

    当一个线程被阻塞,等待flush()调用完成时,其他线程可以继续发送消息;但是,不能保证在flush()调用开始后发送的消息一定能够真正完成。

    这里注意到一个点,就是flush()是负责发送所有的已在Producer缓存区的消息到Broker中,而我上面的代码49行中,只有一个send()的动作,并没有flush()。因此,在程序运行到最后一个循环时,在缓存区还保留着最后一条消息,并没有发送到Broker。

    因此在49行的send操作后,再加上producer.flush(),即可保证所有的消息都能发送到对应的topic中。