1 代码示例

  • 安装kafka的python模块

    pip install kafka-python

image.png

(1) 生产者

  • producer.py ```python from kafka import KafkaProducer from kafka.errors import kafka_errors import traceback import json

def producer_demo():

  1. # 假设生产的消息为键值对,且序列化方式为json
  2. producer = KafkaProducer(
  3. bootstrap_servers=['localhost:9092'],
  4. key_serializer=lambda k: json.dumps(k).encode(),
  5. value_serializer=lambda v: json.dumps(v).encode())
  6. # 发送三条消息
  7. for i in range(3):
  8. future = producer.send(
  9. 'demo',
  10. key='count_num', # 同一个key值,会被送至同一个分区
  11. value=str(i),
  12. partition=0) # 向分区0发送消息
  13. print("send {}".format(str(i)))
  14. try:
  15. future.get(timeout=10) # 监控是否发送成功
  16. except kafka_errors: # 发送失败抛出kafka_errors
  17. traceback.format_exc()

producer_demo()

  1. <a name="vnB3P"></a>
  2. ## (2) 消费者
  3. - consumer.py
  4. ```python
  5. from kafka import KafkaConsumer
  6. import json
  7. def consumer_demo():
  8. consumer = KafkaConsumer(
  9. 'demo',
  10. bootstrap_servers='localhost:9092',
  11. group_id='test'
  12. )
  13. for message in consumer:
  14. print("receive, key: {}, value: {}".format(
  15. json.loads(message.key.decode()),
  16. json.loads(message.value.decode())
  17. )
  18. )
  19. consumer_demo()

image.pngimage.png