kafka source

配置文件:

  1. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource --source类型
  2. a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092 -- kafka的集群
  3. a1.sources.r1.kafka.topics=topic_log -- 订阅的话题
  4. a1.sources.r1.batchSize=6000 --putlist中数据达到了6K以后提交到channel
  5. a1.sources.r1.batchDurationMillis=2000 --拉取数据的时间达到2s以后,将获取的数据提交到channel

kakfa channel

  • kakfa channel这种情况使用的最多,
  • 此时的flume可以是
    • 消费者、生产者、source和sink之间的缓冲区(具有高吞吐量的优势),Channel是位于Source和Sink之间的缓冲区。

配置文件:

  1. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel ----channel类型
  2. a1.channels.c1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092 --kafka集群
  3. a1.channels.c1.kafka.topic =topic_log --话题
  4. a1.channels.c1.parseAsFlumeEvent=false --不需要eventheader数据

kafka sink

作用:将数据拉取到kafka的topic中。
配置文件:

  1. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink --sink类型
  2. a1.sinks.k1.kafka.topic =topic_log --话题
  3. a1.sinks.k1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092 --kafka集群
  4. a1.sinks.k1.kafka.flumeBatchSize = 20
  5. a1.sinks.k1.kafka.producer.acks = 1 --副本策略
  6. a1.sinks.k1.kafka.producer.linger.ms = 1
  7. a1.sinks.k1.kafka.producer.compression.type = snappy --压缩格式