1 代码示例
- 安装kafka的python模块
```bash
pip install kafka-python
```
## (1) 生产者
- producer.py
```python
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
    """Send three JSON-serialized key/value messages to partition 0 of topic 'demo'.

    Connects to the broker at ``localhost:9092``, sends the values ``"0"``,
    ``"1"`` and ``"2"`` under the key ``"count_num"``, and waits up to
    10 seconds for each send to complete. A failed send is reported by
    printing its traceback; the remaining messages are still attempted.
    The producer is closed before returning.
    """
    # Imported locally so this function does not depend on the file-level
    # import list. KafkaError is the exception class to catch here;
    # NOTE(review): `kafka.errors.kafka_errors` appears to be a lookup table
    # of broker error classes rather than an exception class, so it cannot be
    # used in an `except` clause -- confirm against the installed version.
    from kafka.errors import KafkaError

    # Messages are key/value pairs; both sides are serialized as JSON.
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    try:
        # Send three messages.
        for i in range(3):
            future = producer.send(
                'demo',
                key='count_num',  # messages with the same key go to the same partition
                value=str(i),
                partition=0)  # send explicitly to partition 0
            print("send {}".format(str(i)))
            try:
                future.get(timeout=10)  # wait for the send result
            except KafkaError:
                # Print the traceback; format_exc() alone would only build
                # the string and silently discard it.
                traceback.print_exc()
    finally:
        # Release the producer's network resources even if a send raised.
        producer.close()
# Run the producer demo when this script is executed.
producer_demo()
```
<a name="vnB3P"></a>
## (2) 消费者
- consumer.py
```python
from kafka import KafkaConsumer
import json
def _decode_json(raw):
    """Decode raw message bytes as JSON; pass ``None`` through unchanged."""
    if raw is None:
        return None
    return json.loads(raw.decode())


def consumer_demo():
    """Print the JSON-decoded key and value of each message read from topic 'demo'.

    Connects to the broker at ``localhost:9092`` as a member of consumer
    group ``'test'`` and prints one line per received message. A message
    without a key (or with a null value) is printed with ``None`` in that
    position instead of raising ``AttributeError``. The consumer is closed
    when iteration ends or an exception (including ``KeyboardInterrupt``)
    propagates.
    """
    consumer = KafkaConsumer(
        'demo',
        bootstrap_servers='localhost:9092',
        group_id='test'
    )
    try:
        for message in consumer:
            print("receive, key: {}, value: {}".format(
                _decode_json(message.key),
                _decode_json(message.value)
            ))
    finally:
        # Close the consumer explicitly so its connections are released.
        consumer.close()
# Run the consumer demo when this script is executed.
consumer_demo()