There are currently three native Kafka client libraries for Python:
kafka (recommended for Consumers)
pykafka (recommended for Producers)
confluent-kafka (fastest Producer sends)

  1. Consumer side

Standard demo

    from io import BytesIO

    import avro.schema
    from avro.io import BinaryDecoder, DatumReader
    from kafka import KafkaConsumer

    # Subscribe to topic 'pqchen' as a member of consumer group 'my_favorite_group'.
    consumer = KafkaConsumer(
        'pqchen',
        bootstrap_servers='192.168.202.37:9092',
        group_id='my_favorite_group',
    )

    # Avro schema of each message: a map of string headers plus a raw bytes body.
    SCHEME = {
        "type": "record",
        "name": "Event",
        "fields": [
            {"name": "headers", "type": {"type": "map", "values": "string"}},
            {"name": "body", "type": "bytes"},
        ],
    }
    # SchemaFromJSONData and writer_schema are the avro-python3 spellings.
    scheme = avro.schema.SchemaFromJSONData(SCHEME)


    def avro_decode(binary_data, schema=None):
        """Decode one Avro-encoded message into a Python dict."""
        bio = BytesIO(binary_data)
        binary_decoder = BinaryDecoder(bio)
        return DatumReader(writer_schema=schema).read(binary_decoder)


    # Iterating over the consumer blocks and yields messages as they arrive.
    for msg in consumer:
        event = avro_decode(msg.value, scheme)
        print(event['body'].decode('utf8'))
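To make it clearer what the decode step in the demo does, here is a minimal sketch that decodes the same Event record by hand, using only the standard library. It follows the Avro binary encoding rules (zigzag varint longs, length-prefixed strings and bytes, maps written as counted blocks). The function names (read_long, read_string_map, decode_event) are made up for this illustration and are not part of the avro package. In real code, keep using DatumReader.

```python
from io import BytesIO


def read_long(buf):
    """Read one Avro long: a variable-length integer with zigzag encoding."""
    shift = 0
    raw = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro data")
        b = byte[0]
        raw |= (b & 0x7F) << shift
        if not (b & 0x80):  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)  # undo zigzag


def read_bytes(buf):
    """Read Avro bytes: a long length followed by that many raw bytes."""
    length = read_long(buf)
    return buf.read(length)


def read_string(buf):
    """Read an Avro string: same layout as bytes, decoded as UTF-8."""
    return read_bytes(buf).decode("utf-8")


def read_string_map(buf):
    """Read an Avro map<string>: blocks of key/value pairs, ended by count 0."""
    result = {}
    while True:
        count = read_long(buf)
        if count == 0:
            return result
        if count < 0:
            # A negative count is followed by the block size in bytes.
            count = -count
            read_long(buf)
        for _ in range(count):
            key = read_string(buf)
            result[key] = read_string(buf)


def decode_event(binary_data):
    """Decode the Event record: fields are read in schema order."""
    buf = BytesIO(binary_data)
    headers = read_string_map(buf)
    body = read_bytes(buf)
    return {"headers": headers, "body": body}


# Hand-built sample: headers {"k": "v"}, body b"hi".
sample = b"\x02\x02k\x02v\x00\x04hi"
print(decode_event(sample))
```

The sample bytes are built by hand, so the sketch can be run without a broker or the avro package.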


Resolving installation errors

  1. UnsupportedCodecError: Libraries for snappy compression codec not found

  2. Error when using Kafka from Python: return '' % self.async

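In Python 3.7, async became a reserved keyword, so older client code that uses it as an attribute name no longer parses. Python 3.7 therefore needs the latest version of the Kafka client, but the kafka package on PyPI has not yet been updated to that version. Upgrade the Kafka Python client with the following command instead: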
pip install kafka-python
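To check which of the two errors above applies to a given environment, a small diagnostic like the one below may help. It is a sketch under two assumptions: that the snappy codec is looked up as an importable module named snappy (provided on PyPI by the python-snappy package), and that the second error comes from async being a reserved keyword in the running interpreter. The function names and hint texts are illustrative only.

```python
import importlib.util
import keyword
import sys


def snappy_available():
    """Return True if a module named `snappy` can be imported."""
    return importlib.util.find_spec("snappy") is not None


def async_is_reserved():
    """Return True if this interpreter treats `async` as a reserved keyword."""
    return keyword.iskeyword("async")


def diagnose():
    """Collect hints for the two errors listed above (wording is illustrative)."""
    hints = []
    if not snappy_available():
        hints.append("snappy module missing: try `pip install python-snappy`")
    if async_is_reserved():
        hints.append("`async` is reserved here: use an up-to-date kafka-python")
    return hints


if __name__ == "__main__":
    print("Python", sys.version.split()[0])
    for hint in diagnose():
        print("-", hint)
```

Installing python-snappy may also require the system snappy library to be present, depending on the platform.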