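There are currently three Kafka client libraries for Python:
kafka-python, imported as kafka (recommended for consumers)
pykafka (recommended for producers)
confluent-kafka (fastest at producing messages)
- Consumer side
Standard demo (kafka-python consumer that decodes Avro-encoded messages)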
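from io import BytesIO
import json

import avro.schema
from avro.io import BinaryDecoder, DatumReader
from kafka import KafkaConsumer

# Subscribe to topic 'pqchen' as a member of consumer group 'my_favorite_group'
consumer = KafkaConsumer('pqchen', bootstrap_servers='192.168.202.37:9092', group_id='my_favorite_group')

# Avro schema of each message: a map of string headers plus a raw byte body
SCHEMA = {
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "headers", "type": {"type": "map", "values": "string"}},
        {"name": "body", "type": "bytes"}
    ]
}
# Assumes the avro package (>= 1.10); the deprecated avro-python3 package
# used avro.schema.SchemaFromJSONData(SCHEMA) instead
schema = avro.schema.parse(json.dumps(SCHEMA))


def avro_decode(binary_data, schema=None):
    """Decode one Avro binary-encoded record (raw datum, no container-file header)."""
    decoder = BinaryDecoder(BytesIO(binary_data))
    # Positional argument is the writer's schema (works with avro and avro-python3)
    return DatumReader(schema).read(decoder)


for msg in consumer:
    event = avro_decode(msg.value, schema)  # dict with 'headers' and 'body'
    text = event['body'].decode('utf8')
    print(text)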
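To show roughly what avro_decode does on the wire, here is a minimal, dependency-free sketch of Avro binary decoding restricted to the types the Event schema uses: zigzag varint longs, length-prefixed bytes and strings, and block-encoded maps. The helper names (`read_long`, `decode_event`, `encode_event`, and so on) are hypothetical, the encoder exists only to build sample input, and this is not a substitute for the avro library used in the demo.

```python
from io import BytesIO

# Hypothetical, minimal Avro binary codec for the Event schema only:
# {"headers": map<string>, "body": bytes}. Not a general Avro implementation.


def read_long(buf):
    """Read an Avro int/long: a zigzag-encoded base-128 varint."""
    shift = 0
    result = 0
    while True:
        b = buf.read(1)
        if not b:
            raise EOFError("truncated varint")
        byte = b[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    # Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (result >> 1) ^ -(result & 1)


def write_long(n):
    """Encode a 64-bit signed integer as a zigzag varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def read_bytes(buf):
    """Avro bytes: a long length followed by that many raw bytes."""
    n = read_long(buf)
    data = buf.read(n)
    if len(data) != n:
        raise EOFError("truncated bytes")
    return data


def read_string(buf):
    """Avro string: encoded like bytes, holding UTF-8 text."""
    return read_bytes(buf).decode("utf-8")


def read_string_map(buf):
    """Avro map<string>: blocks of key/value pairs, ended by a zero count."""
    result = {}
    while True:
        count = read_long(buf)
        if count == 0:
            return result
        if count < 0:
            # Negative count: abs(count) items follow, preceded by the
            # block's size in bytes, which this sketch reads and ignores.
            count = -count
            read_long(buf)
        for _ in range(count):
            key = read_string(buf)
            result[key] = read_string(buf)


def decode_event(binary_data):
    """Decode one Event record; fields are read strictly in schema order."""
    buf = BytesIO(binary_data)
    return {"headers": read_string_map(buf), "body": read_bytes(buf)}


def encode_event(headers, body):
    """Encode an Event record (used here only to produce sample input)."""
    out = bytearray()
    if headers:
        out += write_long(len(headers))  # one block holding every pair
        for key, value in headers.items():
            for s in (key, value):
                raw = s.encode("utf-8")
                out += write_long(len(raw)) + raw
    out += write_long(0)  # end of map
    out += write_long(len(body)) + body
    return bytes(out)


# Round trip: encode a sample event, then decode it like a consumer would.
sample = encode_event({"host": "web01"}, "hello".encode("utf8"))
event = decode_event(sample)
print(event["headers"], event["body"].decode("utf8"))
```

Reading fields in schema order is what makes this work: Avro binary data carries no field names or tags, which is also why the consumer must decode with the same schema the producer wrote with.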
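Troubleshooting installation errors
If messages are snappy-compressed, kafka-python fails with:
UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found
Fix: install the Python snappy bindings:
pip install python-snappy
Installing python-snappy can in turn fail with: snappy/snappymodule.cc:31:22: fatal error: snappy-c.h: No such file or directory
Reference: https://stackoverflow.com/questions/11416024/error-installing-python-snappy-snappy-c-h-no-such-file-or-directory
On CentOS 7, I fixed this by installing the snappy C headers with yum install snappy-devel and then re-running pip install python-snappy.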
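After installing, a quick stdlib check can confirm that the snappy module (the import name of python-snappy) is visible to the same interpreter that runs the consumer, which helps when several Python installations coexist. `module_available` is a hypothetical helper name; it only locates the module and does not test snappy itself.

```python
import importlib.util


def module_available(name):
    """Return True if a top-level module `name` can be found, without importing it."""
    return importlib.util.find_spec(name) is not None


# python-snappy installs a module named "snappy"
print("snappy available:", module_available("snappy"))
```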
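Python error when using kafka: return '…' % self.async (SyntaxError: invalid syntax)
On Python 3.7, async is a reserved keyword, so the old kafka package, which still uses self.async as an attribute name, no longer parses. The package named kafka on PyPI has not been updated to the latest release; install kafka-python instead, which provides the same kafka module: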
pip install kafka-python
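The root cause can be reproduced with the standard library alone: since Python 3.7, async is a reserved keyword, so source that uses it as an attribute name, like the self.async line above, fails to compile. `is_syntax_error` is a hypothetical helper written for this illustration, and self.asynchronous is just an arbitrary legal name for comparison.

```python
import keyword


def is_syntax_error(source):
    """Return True if `source` fails to compile as Python code."""
    try:
        compile(source, "<demo>", "exec")
    except SyntaxError:
        return True
    return False


# On Python 3.7+ async is a keyword, so "self.async" no longer parses.
print(keyword.iskeyword("async"))
print(is_syntax_error("value = self.async"))
print(is_syntax_error("value = self.asynchronous"))
```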