Flume agent definition
    flume_kafka.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Define the source (netcat source listening on localhost:8888)
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    # Define the channel
    a1.channels.c1.type = memory
    # Define the Kafka sink (Flume 1.7+ property names; older releases used brokerList)
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test
    a1.sinks.k1.kafka.bootstrap.servers = s2:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
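Before starting the agent, it is safest to create the target topic explicitly (unless the broker has `auto.create.topics.enable=true`). A minimal sketch, assuming Kafka 2.2+ where `kafka-topics.sh` accepts `--bootstrap-server` (older releases take `--zookeeper` instead):

```shell
# Create the "test" topic on broker s2:9092 (single partition, no replication)
kafka-topics.sh --create \
  --bootstrap-server s2:9092 \
  --topic test \
  --partitions 1 \
  --replication-factor 1

# Verify the topic exists
kafka-topics.sh --describe --bootstrap-server s2:9092 --topic test
```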

    Start Flume

    bin/flume-ng agent -f ../conf/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console

    Kafka-side data collection

    Producer-consumer test

    Producer:
    kafka-console-producer.sh --broker-list s2:9092 --topic test
    Consumer:
    kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test
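To smoke-test the Kafka side on its own before involving Flume, the console tools can also be run non-interactively. A sketch (the timeout value is arbitrary):

```shell
# Publish a single message without an interactive prompt
echo "kafka-ok" | kafka-console-producer.sh --broker-list s2:9092 --topic test

# Read it back, exiting after one message or 10 seconds
kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test \
  --from-beginning --max-messages 1 --timeout-ms 10000
```

If the consumer exits without printing the message, check broker connectivity and that the topic exists before debugging the Flume agent.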

    Test procedure:

    1. Start Flume (the netcat source must be listening before nc can connect)
       [root@s3 flumeconfig]# flume-ng agent -f flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
    2. Connect with nc
       [root@s3 ~]# nc localhost 8888
    3. Start the Kafka consumer
       [root@s2 ~]# kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test
    4. Type abc into nc; the message appears in the Kafka consumer.
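With the agent running, the whole pipeline can also be exercised non-interactively from the Flume host (hostnames as in the steps above):

```shell
# Push one event through the netcat source
# (-q 1 closes the connection after sending; it is a GNU netcat flag, use -w on other variants)
echo "abc" | nc -q 1 localhost 8888

# On the Kafka side, confirm the event arrived
kafka-console-consumer.sh --bootstrap-server s2:9092 --topic test \
  --from-beginning --max-messages 1 --timeout-ms 10000
```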