在实际工作中flume和kafka会深度结合使用
1:flume采集数据,将数据实时写入kafka
2:flume从kafka中消费数据,保存到hdfs,做数据备份

看一个综合案例

使用flume采集日志文件中产生的实时数据,写入到kafka中,然后再使用flume从kafka中将数据消费出来,保存到hdfs上面

那为什么不直接使用flume将采集到的日志数据保存到hdfs上面呢?
因为中间使用kafka进行缓冲之后,后面既可以实现实时计算,又可以实现离线数据备份,最终实现离线计算,所以这一份数据就可以实现两种需求,使用起来很方便,所以在工作中一般都会这样做。
image.png
在Flume中,针对Kafka提供的有KafkaSourceKafkaSink
KafkaSource:从kafka中读取数据
KafkaSink:向kafka中写入数据
需要配置两个Agent
第一个Agent:负责实时采集日志文件,将采集到的数据写入kafka中,使用KafkaSink
第二个Agent:负责从kafka中读取数据,将数据写入HDFS中进行备份(落盘),使用KafkaSource

Log→Flume→Kafka

第一个Agent负责实时采集日志文件,将采集到的数据写入Kafka中,使用KafkaSink :::info source:ExecSource,使用tail -F监控日志文件即可
channel:MemoryChannel
sink:KafkaSink ::: file-to-kafka.conf

  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # 配置source组件
  6. a1.sources.r1.type = exec
  7. a1.sources.r1.command = tail -F /data/log/test.log
  8. # 配置channel组件
  9. a1.channels.c1.type = memory
  10. a1.channels.c1.capacity = 1000
  11. a1.channels.c1.transactionCapacity = 100
  12. # 配置sink组件
  13. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  14. a1.sinks.k1.kafka.topic = test_r1p5
  15. a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092
  16. # 一次向kafka中写多少条数据,默认值为100,在这里为了演示方便,改为1。表示采集到1条就写一次
  17. # 在实际工作中这个值具体设置多少需要在传输效率和数据延迟上进行取舍
  18. # 如果kafak后面的实时计算程序对数据的要求是低延迟,那么这个值小一点比较好
  19. # 如果kafka后面的实时计算程序对数据延迟没什么要求,那么就考虑传输性能,一次多传输一些数据,这样吞吐量会有所提升
  20. a1.sinks.k1.kafka.flumeBatchSize = 1
  21. a1.sinks.k1.kafka.producer.acks = 1
  22. # 一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去
  23. # linger.ms和flumeBatchSize,哪个先满足先按哪个规则执行,这个值默认是0,在这设置为1表示每隔1毫秒就将这一个Batch中的数据发送出去
  24. a1.sinks.k1.kafka.producer.linger.ms = 1
  25. # 指定数据传输时的压缩格式,对数据进行压缩,提高传输效率
  26. a1.sinks.k1.kafka.producer.compression.type = snappy
  27. # 把组件连接起来
  28. a1.sources.r1.channels = c1
  29. a1.sinks.k1.channel = c1

Kafka→Flume→HDFS

第二个Agent负责从Kafka中读取数据,将数据写入HDFS中进行备份,使用KafkaSource :::info Source:KafkaSource
channel:MemoryChannel
sink:HdfsSink ::: kafka-to-hdfs.conf

  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # 配置source组件
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. # 一次性向channel中写入的最大数据量,在这为了演示方便,设置为1
  8. # 这个参数的值不要大于MemoryChannel中transactionCapacity的值
  9. a1.sources.r1.batchSize = 1
  10. # 最大多长时间向channel写一次数据
  11. a1.sources.r1.batchDurationMillis = 2000
  12. # kafka地址
  13. a1.sources.r1.kafka.bootstrap.servers = bigdata1:9092
  14. # topic名称,可以指定一个或者多个,多个topic之间使用逗号隔开
  15. # 也可以使用正则表达式指定一个topic名称规则
  16. a1.sources.r1.kafka.topics = test_r1p5
  17. # 指定消费者组
  18. a1.sources.r1.kafka.consumer.group.id = flume-con1
  19. # 配置channel组件
  20. a1.channels.c1.type = memory
  21. a1.channels.c1.capacity = 1000
  22. a1.channels.c1.transactionCapacity = 100
  23. # 配置sink组件
  24. a1.sinks.k1.type = hdfs
  25. a1.sinks.k1.hdfs.path = hdfs://192.168.1.21:9000/kafkaout
  26. a1.sinks.k1.hdfs.filePrefix = data-
  27. a1.sinks.k1.hdfs.fileType = DataStream
  28. a1.sinks.k1.hdfs.writeFormat = Text
  29. a1.sinks.k1.hdfs.rollInterval = 3600
  30. a1.sinks.k1.hdfs.rollSize = 134217728
  31. a1.sinks.k1.hdfs.rollCount = 0
  32. # 把组件连接起来
  33. a1.sources.r1.channels = c1
  34. a1.sinks.k1.channel = c1

在flume目录下复制两个conf目录,便于区分两个agent的日志

  1. cd /data/soft/apache-flume-1.9.0-bin
  2. cp -r conf conf-file-to-kafka
  3. cp -r conf conf-kafka-to-hdfs

修改 conf_file_to_kafkaconf_kafka_to_hdfs中log4j的配置

  1. cd conf-file-to-kafka
  2. vi log4j.properties
  3. ...
  4. flume.root.logger=ERROR,LOGFILE
  5. flume.log.file=flume-file-to-kafka.log
  6. ...
  1. cd conf-kafka-to-hdfs
  2. vi log4j.properties
  3. ...
  4. flume.root.logger=ERROR,LOGFILE
  5. flume.log.file=flume-kafka-to-hdfs.log
  6. ...

把刚才配置的两个Agent的配置文件复制到这两个目录下

  1. cd conf-file-to-kafka
  2. vi file-to-kafka.conf
  3. .....把file-to-kafka.conf文件中的内容复制进来即可

image.png

  1. cd conf-kafka-to-hdfs/
  2. vi kafka-to-hdfs.conf
  3. .....把kafka-to-hdfs.conf文件中的内容复制进来即可

image.png
启动这两个Flume Agent
确保zookeeper集群、kafka集群和Hadoop集群是正常运行的
以及Kafka中的topic需要提前创建好

创建topic

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic test_r2p5

测试的时候忘记创建topic,默认生成了一个r1p1的test_r2p5

先启动第二个Agent,再启动第一个Agent

  1. bin/flume-ng agent --name a1 --conf conf-kafka-to-hdfs --conf-file conf-kafka-to-hdfs/kafka-to-hdfs.conf
  2. bin/flume-ng agent --name a1 --conf conf-file-to-kafka --conf-file conf-file-to-kafka/file-to-kafka.conf

模拟产生日志数据,

  1. cd /data/log/
  2. echo hello world >> /data/log/test.log

标准输出重定向,会覆盖掉之前的内容

标准输出重定向,追加

image.png
到HDFS上查看数据,验证结果:
image.png

此时Flume可以通过tail -F命令实时监控文件中的新增数据,发现有新数据就写入kafka,然后kafka后面的flume落盘程序,以及kafka后面的实时计算程序就可以使用这份数据了。