flume段定义
flume_kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source信息
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.brokerList = s2:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
bin/flume-ng agent -f ../conf/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
kafka端数据采集
生产者-消费者测试
生产者
kafka-console-producer.sh --broker-list s2:9092 --topic test
消费者
kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test
测试方式:
1.启动nc
[root@s3 ~]# nc localhost 8888
2.启动flume
[root@s3 flumeconfig]# flume-ng agent -f flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
3.启动kafka消费者
[root@s2 ~]# kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test
在nc输入abc,在kafka消费者有消息显示