1 代码示例
- 安装kafka的python模块
pip install kafka-python
(1) 生产者
- producer.py ```python from kafka import KafkaProducer from kafka.errors import kafka_errors import traceback import json
def producer_demo():
# 假设生产的消息为键值对,且序列化方式为jsonproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=lambda k: json.dumps(k).encode(),value_serializer=lambda v: json.dumps(v).encode())# 发送三条消息for i in range(3):future = producer.send('demo',key='count_num', # 同一个key值,会被送至同一个分区value=str(i),partition=0) # 向分区0发送消息print("send {}".format(str(i)))try:future.get(timeout=10) # 监控是否发送成功except kafka_errors: # 发送失败抛出kafka_errorstraceback.format_exc()
producer_demo()
<a name="vnB3P"></a>## (2) 消费者- consumer.py```pythonfrom kafka import KafkaConsumerimport jsondef consumer_demo():consumer = KafkaConsumer('demo',bootstrap_servers='localhost:9092',group_id='test')for message in consumer:print("receive, key: {}, value: {}".format(json.loads(message.key.decode()),json.loads(message.value.decode())))consumer_demo()


