After the collection-tier Flume agents ship the logs to the Kafka cluster, the next requirement is to land the log data on HDFS, again via Flume. A Flume agent is deployed on hadoop003, so that hadoop001 and hadoop002 handle log generation and collection while hadoop003 handles log consumption and storage.

Consumption-tier Flume cluster plan


                        hadoop001    hadoop002    hadoop003
Flume (consume Kafka)       -            -          flume

Consumption-tier Flume configuration

Configuration analysis

The consumption-tier Flume agent mainly reads messages from Kafka, so the Kafka Source is the natural choice of source.
The channel is a File Channel, which persists events to disk and so minimizes the risk of data loss.
The sink is an HDFS Sink, which writes the logs directly to HDFS. A quick way to verify that the source topics exist is sketched below.
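
Before wiring up the sources, it helps to confirm that both topics are present on the Kafka cluster. A minimal check, assuming Kafka is installed under /root/kafka (a guess based on the /root/flume layout used below; adjust the path to your installation, and note that Kafka releases before 2.2 take --zookeeper instead of --bootstrap-server):

  /root/kafka/bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list
  # expect topic_start and topic_event in the output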

Flume configuration

Create the configuration file

  touch /root/flume/conf/kafka-flume-hdfs.conf
  vi kafka-flume-hdfs.conf
  # the file content is as follows

  # Flume agent component declarations
  a1.sources = r1 r2
  a1.channels = c1 c2
  a1.sinks = k1 k2

  # Source1 properties
  # source type: Kafka source
  a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  # max number of events pulled from the Kafka topic per batch
  a1.sources.r1.batchSize = 5000
  # max wait time per batch: 2000 ms
  a1.sources.r1.batchDurationMillis = 2000
  # Kafka cluster address
  a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
  # Kafka topic this source consumes
  a1.sources.r1.kafka.topics = topic_start

  # Source2 properties: same as source1 except it consumes a different topic
  a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
  a1.sources.r2.batchSize = 5000
  a1.sources.r2.batchDurationMillis = 2000
  a1.sources.r2.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
  a1.sources.r2.kafka.topics = topic_event

  # Channel1 properties
  # channel1 type: file channel
  a1.channels.c1.type = file
  # directory where the file channel stores its checkpoint (resume-point) information
  a1.channels.c1.checkpointDir = /root/flume/checkpoint/behavior1
  # directory where the file channel stores its data files
  a1.channels.c1.dataDirs = /root/flume/data/behavior1/
  # max size (bytes) of a single file channel data file
  a1.channels.c1.maxFileSize = 2146435071
  # max number of events the channel can hold
  a1.channels.c1.capacity = 1000000
  # timeout (seconds) for a put transaction when the channel is full
  a1.channels.c1.keep-alive = 6

  # Channel2 properties: same as channel1, but with its own directories
  a1.channels.c2.type = file
  a1.channels.c2.checkpointDir = /root/flume/checkpoint/behavior2
  a1.channels.c2.dataDirs = /root/flume/data/behavior2/
  a1.channels.c2.maxFileSize = 2146435071
  a1.channels.c2.capacity = 1000000
  a1.channels.c2.keep-alive = 6

  # Sink1 properties
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
  a1.sinks.k1.hdfs.filePrefix = logstart-
  # Sink2 properties: same as sink1, with the event-log path and prefix
  a1.sinks.k2.type = hdfs
  a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
  a1.sinks.k2.hdfs.filePrefix = logevent-

  # roll settings that avoid producing large numbers of small files
  a1.sinks.k1.hdfs.rollInterval = 10
  a1.sinks.k1.hdfs.rollSize = 134217728
  a1.sinks.k1.hdfs.rollCount = 0
  a1.sinks.k2.hdfs.rollInterval = 10
  a1.sinks.k2.hdfs.rollSize = 134217728
  a1.sinks.k2.hdfs.rollCount = 0

  # write the output files as compressed streams
  a1.sinks.k1.hdfs.fileType = CompressedStream
  a1.sinks.k2.hdfs.fileType = CompressedStream
  a1.sinks.k1.hdfs.codeC = lzop
  a1.sinks.k2.hdfs.codeC = lzop

  # wire sources, channels, and sinks together
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  a1.sources.r2.channels = c2
  a1.sinks.k2.channel = c2
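
Once the agent is running and the collection tier is producing logs, the sink output can be spot-checked on HDFS. A minimal sketch, assuming the Hadoop client is on the PATH; the date subdirectory comes from the %Y-%m-%d escape in hdfs.path, and the lzop codec requires the hadoop-lzo libraries to be installed on the cluster:

  # list today's landed files for each topic
  hdfs dfs -ls /origin_data/gmall/log/topic_start/$(date +%Y-%m-%d)
  hdfs dfs -ls /origin_data/gmall/log/topic_event/$(date +%Y-%m-%d)
  # expect files with the logstart-/logevent- prefixes and a .lzo extension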

Consumption-tier Flume start/stop script

  vi f2.sh
  #!/bin/bash
  case $1 in
  "start"){
          for i in hadoop003
          do
                  echo "------- starting $i consumption flume ---------"
                  ssh $i "source /etc/profile; nohup /root/flume/bin/flume-ng agent --conf-file /root/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/flume/log.txt 2>&1 &"
          done
  };;
  "stop"){
          for i in hadoop003
          do
                  echo "------- stopping $i consumption flume ---------"
                  ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs kill -9"
          done
  };;
  esac
  :wq

Note: if the grep pipeline matches no process, xargs invokes kill without any PID arguments, and kill exits with a usage error (prompting you to pass it arguments). A more defensive variant of the stop command is sketched below.
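
A hedged alternative for the stop branch that avoids this error, assuming GNU xargs (for the -r / --no-run-if-empty flag) or procps pkill is available on hadoop003:

  # GNU xargs: -r skips running kill when grep matches no PIDs
  # (run directly on hadoop003; inside an ssh "..." string, escape $2 as \$2)
  ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print $2}' | xargs -r kill -9

  # or match the agent by its command line in one step (procps pkill)
  pkill -9 -f kafka-flume-hdfs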

  chmod 777 f2.sh    # grant execute permission
  ./f2.sh start
  ./f2.sh stop
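
To confirm the agent actually started or stopped, a quick check against the same process pattern the script greps for, plus a look at the stdout log the start command redirects to:

  ssh hadoop003 "ps -ef | grep kafka-flume-hdfs | grep -v grep"
  ssh hadoop003 "tail -n 20 /root/flume/log.txt"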

The result is as follows:

[image.png: screenshot of the run output]