在利用Python编写Kafka Producer端代码时,有一个很简单的操作,部分代码如下:

    1. # -*- coding: utf-8 -*-
    2. from kafka import KafkaProducer
    3. from public_funcs import logInfo, pgRetrieve, pgConnection
    4. import datetime
    5. # import logging
    6. import sys
    7. from config import Config
    8. import json
    def load_data(tablename, batchid, engine):
        """Query one batch of a table and return it as a split-oriented JSON string.

        The SQL text is ``Config.INPUT_QUERY`` formatted with ``tablename`` and
        ``batchid``; ``pgRetrieve`` executes it on ``engine``. The result
        (presumably a pandas DataFrame -- confirm against ``pgRetrieve``) is
        serialized with ``orient="split"`` and ISO-formatted dates, giving a
        string shaped like
        '{"columns":["A","C","B"],"index":[5,4,3],"data":[[0,1,2],[3,4,5],[6,7,8]]}'.
        """
        statement = Config.INPUT_QUERY.format(tablename, batchid)
        frame = pgRetrieve(engine, statement)
        serialized = frame.to_json(date_format='iso', orient="split")
        return serialized
    class Kafka_producer:
        """Publish one table batch to a Kafka topic, one message per row.

        Attributes:
            kafkaserver: bootstrap server address(es) handed to ``KafkaProducer``.
            topic: topic that receives the per-row data messages.
            producer: the underlying ``KafkaProducer`` instance.
        """

        def __init__(self, kafkaserver, topic):
            """Create the underlying producer for ``kafkaserver`` and remember ``topic``."""
            self.kafkaserver = kafkaserver
            self.topic = topic
            # max_request_size limits how much data a single request sent to
            # the Kafka broker may carry (1073741824 == 2**30).
            self.producer = KafkaProducer(
                bootstrap_servers=self.kafkaserver,
                max_request_size=1073741824,
            )

        def sendjsondata(self, tablename, batchid, engine):
            """Send every row of a table batch, then a summary record.

            Each row is sent to ``self.topic`` as UTF-8 encoded JSON with the
            keys ``columns``, ``index`` and ``data``. Afterwards one record with
            ``tablename``, ``batchid`` and ``cnt`` (number of rows sent) is sent
            to ``Config.META_TOPIC``.

            Args:
                tablename: source table name, passed to ``load_data``.
                batchid: batch identifier, passed to ``load_data``.
                engine: database handle, passed to ``load_data``.

            Kafka errors are printed and swallowed (best effort), as before.
            """
            # Imported locally: the module header only imports KafkaProducer,
            # so without this the ``except`` clause itself raised NameError.
            from kafka.errors import KafkaError

            try:
                msgs = load_data(tablename, batchid, engine)
                print(msgs)
                msgs_dict = json.loads(msgs)
                producer = self.producer
                columns = msgs_dict["columns"]
                msg_cnt = 0
                # "index" and "data" are parallel lists in the split layout.
                for row_index, row in zip(msgs_dict["index"], msgs_dict["data"]):
                    msg = dict(columns=columns, index=row_index, data=row)
                    producer.send(self.topic, value=json.dumps(msg).encode('utf-8'))
                    msg_cnt += 1
                # One flush for the whole batch instead of one per row: the
                # rows are still all delivered before the summary is sent,
                # but the producer is allowed to batch them.
                producer.flush()
                meta_info = dict(
                    tablename=tablename,
                    batchid=batchid,
                    cnt=msg_cnt,
                )
                print(meta_info)
                print(producer)
                producer.send(Config.META_TOPIC, value=json.dumps(meta_info).encode('utf-8'))
                # send() only queues the record; without this flush the
                # summary can be lost if the process exits right afterwards.
                producer.flush()
                logInfo('(table_name:{0} batch_id:{1}) 数据发送到kafka成功'.format(tablename, batchid))
            except KafkaError as e:
                print(e)
    if __name__ == '__main__':
        # The batch id is the single required command-line argument; fail
        # with a usage message instead of a bare IndexError when it is missing.
        if len(sys.argv) < 2:
            sys.exit('usage: {0} <batchid>'.format(sys.argv[0]))
        batchid = sys.argv[1]
        engine, conn = pgConnection(Config.gp_database)
        # Config.TOPICS and Config.SOURCE_TABLES are paired by position;
        # indexing (rather than zip) keeps a length mismatch a loud error.
        for i, topic in enumerate(Config.TOPICS):
            producer = Kafka_producer(Config.KAFKA_SERVER, topic)
            try:
                producer.sendjsondata(Config.SOURCE_TABLES[i], batchid, engine)
            finally:
                # close() waits for queued records to be sent and releases
                # the producer's network resources; previously each
                # per-topic producer was simply abandoned.
                producer.producer.close()



    1. {"tablename": "public.baseinfo", "batchid": "1", "cnt": 4}


    1. {"tablename": "public.baseinfo", "batchid": "1", "cnt": 4}
    2. {"tablename": "public.seemployees", "batchid": "1", "cnt": 1}






    Invoking this method makes all buffered records immediately available to send (even if linger_ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.is_done() == True). A request is considered completed when either it is successfully acknowledged according to the ‘acks’ configuration for the producer, or it results in an error.

    Other threads can continue sending messages while one thread is blocked waiting for a flush call to complete; however, no guarantee is made about the completion of messages sent after the flush call begins.


    调用此方法会使Producer中所有缓冲的记录立即可以发送(即使linger_ms大于0),并阻塞直到与这些记录关联的请求全部完成。flush()的后置条件是:此前发送的任何记录都已经完成(例如Future.is_done() == True)。当请求按照Producer的'acks'配置被成功确认,或者请求导致错误时,即认为该请求已完成。


