kafka source
配置文件:
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource --source类型
a1.sources.r1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092 -- kafka的集群
a1.sources.r1.kafka.topics=topic_log -- 订阅的话题
a1.sources.r1.batchSize=6000 --putlist中数据达到了6K以后提交到channel中
a1.sources.r1.batchDurationMillis=2000 --拉取数据的时间达到2s以后,将获取的数据提交到channel中
kakfa channel
- kakfa channel这种情况使用的最多,
- 此时的flume可以是
- 消费者、生产者、source和sink之间的缓冲区(具有高吞吐量的优势),Channel是位于Source和Sink之间的缓冲区。
配置文件:
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel ----channel类型
a1.channels.c1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092 --kafka集群
a1.channels.c1.kafka.topic =topic_log --话题
a1.channels.c1.parseAsFlumeEvent=false --不需要event的header数据
kafka sink
作用:将数据拉取到kafka的topic中。
配置文件:
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink --sink类型
a1.sinks.k1.kafka.topic =topic_log --话题
a1.sinks.k1.kafka.bootstrap.servers = hadoop105:9092,hadoop106:9092,hadoop107:9092 --kafka集群
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1 --副本策略
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy --压缩格式