Flume can feed messages into Kafka in two ways: through a Kafka channel or through a Kafka sink. The steps below use the Kafka sink.
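For reference, the Kafka channel variant stores events directly in a Kafka topic, so no separate sink is needed. A minimal sketch, assuming Flume 1.7+ property names and reusing the hosts and topic from this article:

  # Kafka channel sketch (illustrative, not part of the steps below)
  a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  a1.channels.c1.kafka.bootstrap.servers = zjj101:9092,zjj102:9092,zjj103:9092
  a1.channels.c1.kafka.topic = first
  # false = write raw event bodies rather than Avro-wrapped Flume events
  a1.channels.c1.parseAsFlumeEvent = false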

Kafka Sink

Steps:

1) Configure Flume (flume-kafka.conf)

  # define
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1

  # source
  a1.sources.r1.type = exec
  a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
  a1.sources.r1.shell = /bin/bash -c

  # sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.k1.kafka.bootstrap.servers = zjj101:9092,zjj102:9092,zjj103:9092
  a1.sinks.k1.kafka.topic = first
  # send up to 20 events per batch
  a1.sinks.k1.kafka.flumeBatchSize = 20
  a1.sinks.k1.kafka.producer.acks = 1
  a1.sinks.k1.kafka.producer.linger.ms = 1

  # channel
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100

  # bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
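Before starting the agent, make sure the first topic exists. If it does not, it can be created with the standard Kafka CLI from the Kafka installation directory; the partition and replication counts below are illustrative, and Kafka versions before 2.2 take --zookeeper zjj101:2181 instead of --bootstrap-server:

$ bin/kafka-topics.sh --create --bootstrap-server zjj101:9092 --topic first --partitions 3 --replication-factor 3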

2) Start a Kafka consumer in IDEA

package com.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "zjj101:9092");
        props.put("group.id", "test");
        // enable.auto.commit: commit offsets automatically
        props.put("enable.auto.commit", "true");
        // auto.commit.interval.ms: interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic named "first"; multiple topics may be listed
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            // block for at most 100 ms; an empty batch is returned if no
            // records arrive in time (on kafka-clients 2.0+, prefer
            // consumer.poll(Duration.ofMillis(100)))
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                // sample output: offset = 400, key = 0, value = 0
            }
        }
    }
}
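If you would rather not run a Java program, the console consumer bundled with Kafka verifies the same flow (run from the Kafka installation directory):

$ bin/kafka-console-consumer.sh --bootstrap-server zjj101:9092 --topic first --from-beginning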

3) From the Flume root directory, start the agent (-c sets the configuration directory, -n the agent name, -f the job file)

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) Append data to /opt/module/datas/flume.log and watch the Kafka consumer pick it up

$ echo hello >> /opt/module/datas/flume.log
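To produce a steady stream instead of a single line, a throwaway loop such as the following (illustrative only) makes it easy to watch batches arrive:

$ for i in $(seq 1 100); do echo "hello-$i" >> /opt/module/datas/flume.log; sleep 1; done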