Flume Configuration Templates

1. spooldir -> hdfs configuration

  # Conf: agent component names
  agentDw.sources = SrcAccessLog
  agentDw.channels = ChAccesslog
  agentDw.sinks = SinkAccesslog
  # SrcAccessLog source configuration
  agentDw.sources.SrcAccessLog.type = spooldir
  agentDw.sources.SrcAccessLog.spoolDir = /data/log/uba/access_log
  # Regex for files to ignore
  agentDw.sources.SrcAccessLog.ignorePattern = ^(.)*\\.tmp$
  # Input character encoding
  agentDw.sources.SrcAccessLog.inputCharset = UTF-8
  # Deserializer
  agentDw.sources.SrcAccessLog.deserializer = LINE
  # Maximum characters per line
  agentDw.sources.SrcAccessLog.deserializer.maxLineLength = 204800
  agentDw.sources.SrcAccessLog.deserializer.outputCharset = UTF-8
  # Decode-error policy: FAIL | REPLACE | IGNORE
  agentDw.sources.SrcAccessLog.decodeErrorPolicy = IGNORE
  # Delete completed files: immediate | never
  agentDw.sources.SrcAccessLog.deletePolicy = immediate
  # Batch size
  agentDw.sources.SrcAccessLog.batchSize = 1000
  # Scan the spool directory recursively (must be enabled)
  agentDw.sources.SrcAccessLog.recursiveDirectorySearch = true
  # Put the absolute path of the source file in a header (must be enabled)
  agentDw.sources.SrcAccessLog.fileHeader = true
  agentDw.sources.SrcAccessLog.fileHeaderKey = file
  # Put the base name of the source file in a header (must be enabled)
  agentDw.sources.SrcAccessLog.basenameHeader = true
  agentDw.sources.SrcAccessLog.basenameHeaderKey = basename
  agentDw.sources.SrcAccessLog.channels = ChAccesslog
  # ChAccesslog channel configuration
  agentDw.channels.ChAccesslog.type = file
  agentDw.channels.ChAccesslog.checkpointDir = /data/log/test/checkpoint
  agentDw.channels.ChAccesslog.dataDirs = /data/log/test/data
  # Maximum number of threads
  agentDw.channels.ChAccesslog.threads = 10
  # SinkAccesslog sink configuration
  agentDw.sinks.SinkAccesslog.type = hdfs
  agentDw.sinks.SinkAccesslog.channel = ChAccesslog
  agentDw.sinks.SinkAccesslog.hdfs.path = hdfs://uhadoop-ociicy-master2:8020/flume/test/access_log_%Y%m%d
  # Prefix for files created on HDFS
  agentDw.sinks.SinkAccesslog.hdfs.filePrefix = access_log
  # Suffix for files created on HDFS
  agentDw.sinks.SinkAccesslog.hdfs.fileSuffix = .log
  # Prefix/suffix while a file is still being written
  agentDw.sinks.SinkAccesslog.hdfs.inUsePrefix = .
  agentDw.sinks.SinkAccesslog.hdfs.inUseSuffix = .tmp
  agentDw.sinks.SinkAccesslog.hdfs.round = true
  agentDw.sinks.SinkAccesslog.hdfs.roundValue = 10
  # Unit for rounding down the timestamp: second, minute or hour
  agentDw.sinks.SinkAccesslog.hdfs.roundUnit = minute
  # Minimum HDFS block replicas; 1 keeps replication from triggering early rolls
  agentDw.sinks.SinkAccesslog.hdfs.minBlockReplicas = 1
  # Roll by file size (bytes); 0 = never roll on size
  agentDw.sinks.SinkAccesslog.hdfs.rollSize = 0
  # Roll by event count; 0 = never roll on count
  agentDw.sinks.SinkAccesslog.hdfs.rollCount = 0
  # Roll by elapsed seconds; 0 = never roll on time. Here a new file starts every 60 seconds
  agentDw.sinks.SinkAccesslog.hdfs.rollInterval = 60
  # Write format
  agentDw.sinks.SinkAccesslog.hdfs.writeFormat = Text
  # File format: SequenceFile, DataStream (no compression) or CompressedStream (compressed; requires a codec)
  agentDw.sinks.SinkAccesslog.hdfs.fileType = DataStream
  # Flush to HDFS once this many events are batched
  agentDw.sinks.SinkAccesslog.hdfs.batchSize = 100
  # Timeout for HDFS open/write/flush/close calls, in milliseconds
  agentDw.sinks.SinkAccesslog.hdfs.callTimeout = 60000
  # Use the local time for path escapes
  agentDw.sinks.SinkAccesslog.hdfs.useLocalTimeStamp = true
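
  • Quick smoke test for the template above: drop a finished file into the spooling directory. A minimal sketch (paths taken from the config; run on the agent host):

  # The spooldir source expects files to be complete and immutable once
  # visible, so write under a .tmp name (skipped by ignorePattern above)
  # and then rename the file into place.
  from pathlib import Path

  spool = Path("/data/log/uba/access_log")      # spoolDir from the template
  tmp = spool / "access_test.log.tmp"           # still matches ignorePattern
  tmp.write_text("line one\nline two\n", encoding="UTF-8")
  tmp.rename(spool / "access_test.log")         # now picked up by the source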

2. syslogtcp -> thrift configuration

  • Flume must be started first so the TCP/UDP port is open and syslog can deliver log data to the configured port.
  agentDw.sources = SrcUbaAppActionLog
  agentDw.channels = ChUbaAppActionLog
  agentDw.sinks = SinkUbaAppActionLog
  # UbaAppActionLog source configuration
  agentDw.sources.SrcUbaAppActionLog.type = syslogtcp
  agentDw.sources.SrcUbaAppActionLog.port = 10001
  agentDw.sources.SrcUbaAppActionLog.host = 0.0.0.0
  agentDw.sources.SrcUbaAppActionLog.channels = ChUbaAppActionLog
  # UbaAppActionLog channel configuration
  agentDw.channels.ChUbaAppActionLog.type = file
  agentDw.channels.ChUbaAppActionLog.checkpointDir = /var/log/flume/uba_app_action/checkpoint
  agentDw.channels.ChUbaAppActionLog.dataDirs = /var/log/flume/uba_app_action/data
  agentDw.channels.ChUbaAppActionLog.threads = 2
  # UbaAppActionLog sink configuration
  agentDw.sinks.SinkUbaAppActionLog.channel = ChUbaAppActionLog
  agentDw.sinks.SinkUbaAppActionLog.type = thrift
  agentDw.sinks.SinkUbaAppActionLog.hostname = log1
  agentDw.sinks.SinkUbaAppActionLog.port = 18889
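
  • To verify the port accepts data, send one syslog-formatted line over TCP; a sketch assuming the agent runs locally:

  # The syslogtcp source parses the <priority> prefix into headers
  # and keeps the rest of the line as the event body.
  import socket

  with socket.create_connection(("127.0.0.1", 10001)) as s:   # port from the template
      s.sendall(b"<13>Oct 11 22:14:15 testhost app: hello flume\n")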

3. Multiple input ports, multiple outputs

  # Name the components on this agent
  a1.sources = r1 r2
  a1.sinks = k1 k2
  a1.channels = c1 c2
  # Two sources, listening on syslogtcp ports 44441 and 44442
  a1.sources.r1.type = syslogtcp
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 44441
  a1.sources.r2.type = syslogtcp
  a1.sources.r2.bind = localhost
  a1.sources.r2.port = 44442
  # Two sinks, writing to different HDFS directories
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/events1/%y-%m-%d/
  a1.sinks.k1.hdfs.fileType = DataStream
  a1.sinks.k1.hdfs.writeFormat = Text
  a1.sinks.k1.hdfs.filePrefix = events-
  a1.sinks.k1.hdfs.rollCount = 0
  a1.sinks.k1.hdfs.rollSize = 0
  a1.sinks.k1.hdfs.rollInterval = 300
  a1.sinks.k1.hdfs.batchSize = 10000
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  a1.sinks.k2.type = hdfs
  a1.sinks.k2.hdfs.path = hdfs://localhost:9000/flume/events2/%y-%m-%d/
  a1.sinks.k2.hdfs.fileType = DataStream
  a1.sinks.k2.hdfs.writeFormat = Text
  a1.sinks.k2.hdfs.filePrefix = events-
  a1.sinks.k2.hdfs.rollCount = 0
  a1.sinks.k2.hdfs.rollSize = 0
  a1.sinks.k2.hdfs.rollInterval = 300
  a1.sinks.k2.hdfs.batchSize = 10000
  a1.sinks.k2.hdfs.useLocalTimeStamp = true
  # Two channels, one per consuming sink
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 100
  a1.channels.c2.type = memory
  a1.channels.c2.capacity = 1000
  a1.channels.c2.transactionCapacity = 100
  # Bind the sources and sinks to the channels
  a1.sources.r1.channels = c1
  a1.sources.r2.channels = c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2
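
  • Each port feeds its own channel and HDFS directory; a quick local test makes that visible (a sketch assuming both listeners run on this host):

  import socket

  # One event per listener; k1/k2 land them under events1/ and events2/.
  for port in (44441, 44442):
      with socket.create_connection(("localhost", port)) as s:
          s.sendall(b"<13>test event for port %d\n" % port)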

4. One source, multiple sinks

  • Example: Flume writing to HDFS and Kafka at the same time.
  a2.sources = r1
  a2.sinks = k1 k2
  a2.channels = c1 c2
  # Source: spooldir
  a2.sources.r1.type = spooldir
  # Use an absolute path here; "~" is not expanded
  a2.sources.r1.spoolDir = ~/work/test/flume_source
  a2.sources.r1.fileHeader = true
  # Sink k1: write to the "test" topic on the Kafka broker at port 9092
  a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a2.sinks.k1.channel = c1
  a2.sinks.k1.kafka.topic = test
  a2.sinks.k1.kafka.bootstrap.servers = localhost:9092
  a2.sinks.k1.kafka.flumeBatchSize = 20
  a2.sinks.k1.kafka.producer.acks = 1
  a2.sinks.k1.kafka.producer.linger.ms = 1
  #a2.sinks.k1.kafka.producer.compression.type = snappy
  # Sink k2: write to HDFS
  a2.sinks.k2.type = hdfs
  a2.sinks.k2.channel = c2
  a2.sinks.k2.hdfs.path = hdfs://localhost:9000/flume/events/%y-%m-%d/%H%M/%S
  a2.sinks.k2.hdfs.fileType = DataStream
  a2.sinks.k2.hdfs.writeFormat = Text
  a2.sinks.k2.hdfs.filePrefix = events-
  a2.sinks.k2.hdfs.round = true
  a2.sinks.k2.hdfs.roundValue = 10
  a2.sinks.k2.hdfs.roundUnit = minute
  a2.sinks.k2.hdfs.useLocalTimeStamp = true
  # Two sinks need two channels
  a2.channels.c1.type = memory
  a2.channels.c1.capacity = 1000
  a2.channels.c1.transactionCapacity = 100
  a2.channels.c2.type = memory
  a2.channels.c2.capacity = 1000
  a2.channels.c2.transactionCapacity = 100
  # Bind the source to both channels (the default replicating selector copies each event to both)
  a2.sources.r1.channels = c1 c2
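
  • To confirm the Kafka leg of the fan-out, consume the test topic; a sketch assuming the third-party kafka-python package is installed (pip install kafka-python):

  from kafka import KafkaConsumer

  # Reads everything Flume has produced to the topic so far.
  consumer = KafkaConsumer("test",
                           bootstrap_servers="localhost:9092",
                           auto_offset_reset="earliest")
  for record in consumer:
      print(record.value.decode("utf-8", errors="replace"))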

5. Load balancing and failover

  # Declare the sources, channels, and sinks to wire up
  agentDw.sources = SrcDwAccessLog
  agentDw.channels = ChDwAccesslog
  agentDw.sinks = SinkDwAccesslog0 SinkDwAccesslog1
  # Put all outgoing sinks behind a sink processor (load balancing or failover)
  # to avoid a single point of failure on the remote end
  agentDw.sinkgroups = SinkGroupSinkDwAccesslog
  # --- DwAccessLog configuration: start --- #
  # SrcDwAccessLog source configuration
  agentDw.sources.SrcDwAccessLog.type = syslogudp
  agentDw.sources.SrcDwAccessLog.port = 10004
  agentDw.sources.SrcDwAccessLog.host = 0.0.0.0
  agentDw.sources.SrcDwAccessLog.channels = ChDwAccesslog
  # SrcDwAccessLog interceptors
  agentDw.sources.SrcDwAccessLog.interceptors = in1 in2
  # Search-and-replace interceptor: strip the leading tag,
  # e.g. ^[a-zA-Z_]+\:[ ]{1} or ^lb_access\:[ ]{1}
  agentDw.sources.SrcDwAccessLog.interceptors.in1.type = search_replace
  agentDw.sources.SrcDwAccessLog.interceptors.in1.searchPattern = ^[a-zA-Z_]+\:[ ]{1}
  agentDw.sources.SrcDwAccessLog.interceptors.in1.replaceString =
  agentDw.sources.SrcDwAccessLog.interceptors.in1.charset = UTF-8
  # Timestamp interceptor
  agentDw.sources.SrcDwAccessLog.interceptors.in2.type = timestamp
  agentDw.sources.SrcDwAccessLog.interceptors.in2.preserveExisting = true
  # ChDwAccesslog channel configuration
  agentDw.channels.ChDwAccesslog.type = file
  agentDw.channels.ChDwAccesslog.checkpointDir = /var/log/flume/dw_access_log/checkpoint
  agentDw.channels.ChDwAccesslog.dataDirs = /var/log/flume/dw_access_log/data
  agentDw.channels.ChDwAccesslog.capacity = 10000
  agentDw.channels.ChDwAccesslog.threads = 2
  # SinkDwAccesslog file_roll sink (disabled)
  #agentDw.sinks.SinkDwAccesslog.channel = ChDwAccesslog
  #agentDw.sinks.SinkDwAccesslog.type = file_roll
  #agentDw.sinks.SinkDwAccesslog.sink.directory = /var/log/flume/dw_access_log/test
  # SinkDwAccesslogKafka Kafka sink (disabled)
  #agentDw.sinks.SinkDwAccesslogKafka.channel = ChDwAccesslog
  #agentDw.sinks.SinkDwAccesslogKafka.type = org.apache.flume.sink.kafka.KafkaSink
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.bootstrap.servers = bi4:9092
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.topic = accessLogTest
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.flumeBatchSize = 20
  # acks: 0 = never wait for acknowledgement, 1 = leader only, -1 = all replicas
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.producer.acks = 1
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.producer.linger.ms = 1
  #agentDw.sinks.SinkDwAccesslogKafka.kafka.producer.compression.type = snappy
  # SinkDwAccesslog0 thrift sink configuration
  agentDw.sinks.SinkDwAccesslog0.channel = ChDwAccesslog
  agentDw.sinks.SinkDwAccesslog0.type = thrift
  agentDw.sinks.SinkDwAccesslog0.hostname = log0
  agentDw.sinks.SinkDwAccesslog0.port = 18889
  # Events per batch
  agentDw.sinks.SinkDwAccesslog0.batch-size = 1000
  # Request timeout, in milliseconds
  agentDw.sinks.SinkDwAccesslog0.request-timeout = 20000
  # Connect timeout, in milliseconds
  agentDw.sinks.SinkDwAccesslog0.connect-timeout = 3000
  # Seconds before the connection is reset, so the sink picks up backend
  # rotation; in failover mode a sink down longer than the threshold hands
  # its traffic to another sink
  agentDw.sinks.SinkDwAccesslog0.connection-reset-interval = 300
  # SinkDwAccesslog1 thrift sink configuration
  agentDw.sinks.SinkDwAccesslog1.channel = ChDwAccesslog
  agentDw.sinks.SinkDwAccesslog1.type = thrift
  agentDw.sinks.SinkDwAccesslog1.hostname = log1
  agentDw.sinks.SinkDwAccesslog1.port = 18889
  agentDw.sinks.SinkDwAccesslog1.batch-size = 1000
  agentDw.sinks.SinkDwAccesslog1.request-timeout = 20000
  agentDw.sinks.SinkDwAccesslog1.connect-timeout = 3000
  agentDw.sinks.SinkDwAccesslog1.connection-reset-interval = 300
  # SinkGroupSinkDwAccesslog, option A: load balancing
  agentDw.sinkgroups.SinkGroupSinkDwAccesslog.sinks = SinkDwAccesslog0 SinkDwAccesslog1
  agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.type = load_balance
  # Selector: random or round_robin
  agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.selector = round_robin
  # A failed sink is blacklisted and retried only after its backoff expires
  agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.backoff = true
  # Maximum backoff, in milliseconds (here 1800000 ms = 1800 s)
  agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.selector.maxTimeOut = 1800000
  # SinkGroupSinkDwAccesslog, option B: failover (enable either A or B, not both;
  # in a properties file the later assignment would silently win)
  #agentDw.sinkgroups.SinkGroupSinkDwAccesslog.sinks = SinkDwAccesslog0 SinkDwAccesslog1
  #agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.type = failover
  # Higher priority is preferred; on failure traffic moves to the next sink
  #agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.priority.SinkDwAccesslog0 = 1
  #agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.priority.SinkDwAccesslog1 = 100
  #agentDw.sinkgroups.SinkGroupSinkDwAccesslog.processor.maxpenalty = 10000
  # --- DwAccessLog configuration: end --- #
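
  • A one-shot UDP test carrying the lb_access: tag that the search_replace interceptor above strips from the event body (the exact body framing depends on how the syslog source parses the datagram):

  import socket

  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  sock.sendto(b"<13>lb_access: 10.0.0.1 GET /index.html 200",
              ("127.0.0.1", 10004))   # port from the template
  sock.close()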

6. hdfs sink parameter reference

  agentDw.sinks.SinkDwAccesslog1.type = hdfs
  agentDw.sinks.SinkDwAccesslog1.channel = ChDwAccesslog
  # Target directory and file naming
  agentDw.sinks.SinkDwAccesslog1.hdfs.path = hdfs://uhadoop-ociicy-master2:8020/flume/dw_access_log/dw_access_log_%Y%m%d
  agentDw.sinks.SinkDwAccesslog1.hdfs.filePrefix = dw_access_log
  agentDw.sinks.SinkDwAccesslog1.hdfs.fileSuffix = .log
  # Prefix/suffix while a file is still being written
  agentDw.sinks.SinkDwAccesslog1.hdfs.inUsePrefix = .
  agentDw.sinks.SinkDwAccesslog1.hdfs.inUseSuffix = .tmp
  # Round down the timestamp used in path escapes
  agentDw.sinks.SinkDwAccesslog1.hdfs.round = true
  # Unit for rounding: second, minute, hour
  agentDw.sinks.SinkDwAccesslog1.hdfs.roundUnit = minute
  # Rounding value: e.g. 2015-10-16 17:38:59 rounds down to 17:35; every
  # timestamp inside a 5-minute window maps to the same value
  agentDw.sinks.SinkDwAccesslog1.hdfs.roundValue = 5
  # Minimum HDFS block replicas; 1 keeps replication from triggering early rolls
  agentDw.sinks.SinkDwAccesslog1.hdfs.minBlockReplicas = 1
  # Seconds before the in-use file is renamed to its final name and a new
  # one is opened; 0 = never roll on time
  agentDw.sinks.SinkDwAccesslog1.hdfs.rollInterval = 300
  # Roll once the in-use file reaches this size in bytes; 0 = never roll on size
  agentDw.sinks.SinkDwAccesslog1.hdfs.rollSize = 0
  # Roll once this many events are written; 0 = never roll on count
  agentDw.sinks.SinkDwAccesslog1.hdfs.rollCount = 0
  # Write format: Text or Writable (use Text for plain-text output)
  agentDw.sinks.SinkDwAccesslog1.hdfs.writeFormat = Text
  # File format: SequenceFile, DataStream (uncompressed output) or
  # CompressedStream (compressed; requires a codec)
  agentDw.sinks.SinkDwAccesslog1.hdfs.fileType = DataStream
  # Compression codec, only meaningful with CompressedStream: gzip, bzip2, lzo, lzop, snappy
  #agentDw.sinks.SinkDwAccesslog1.hdfs.codeC = snappy
  # Serializer "text" writes the event body to the output stream unchanged;
  # event headers are ignored
  agentDw.sinks.SinkDwAccesslog1.serializer = text
  # Append a newline to every event
  agentDw.sinks.SinkDwAccesslog1.serializer.appendNewline = true
  # Events flushed to HDFS per batch
  agentDw.sinks.SinkDwAccesslog1.hdfs.batchSize = 10000
  # Timeout for HDFS open/write/flush/close calls, in milliseconds
  agentDw.sinks.SinkDwAccesslog1.hdfs.callTimeout = 60000
  # Close and rename the in-use file after this many seconds without writes; 0 = disabled
  agentDw.sinks.SinkDwAccesslog1.hdfs.idleTimeout = 0
  # Use the local time for path escapes
  agentDw.sinks.SinkDwAccesslog1.hdfs.useLocalTimeStamp = true
  # Keep batchSize <= the channel's transactionCapacity
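
  • The %Y%m%d escapes in hdfs.path follow strftime-style patterns, filled from the event timestamp (here the agent's local clock, since useLocalTimeStamp = true). A rough illustration of the resulting directory name:

  from datetime import datetime

  # Mirrors how the sink expands the date escapes in hdfs.path.
  print(datetime.now().strftime("/flume/dw_access_log/dw_access_log_%Y%m%d"))
  # e.g. /flume/dw_access_log/dw_access_log_20151016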

Source

  # HTTP source
  agentDw.sources.srcHttp.type = http
  agentDw.sources.srcHttp.port = 10102
  agentDw.sources.srcHttp.bind = 0.0.0.0
  agentDw.sources.srcHttp.handler = com.dw.flume.source.http.HTTPCustomHandler
  agentDw.sources.srcHttp.threads = 8
  agentDw.sources.srcHttp.selector.type = replicating
  agentDw.sources.srcHttp.channels = chHdfs chKafka
  ## Interceptors
  agentDw.sources.srcHttp.interceptors = in1
  agentDw.sources.srcHttp.interceptors.in1.type = timestamp
  agentDw.sources.srcHttp.interceptors.in1.preserveExisting = true
  # Thrift source
  agentDw.sources.srcThrift.type = thrift
  agentDw.sources.srcThrift.port = 10202
  agentDw.sources.srcThrift.bind = 0.0.0.0
  agentDw.sources.srcThrift.threads = 8
  agentDw.sources.srcThrift.selector.type = replicating
  agentDw.sources.srcThrift.channels = chHdfs chKafka
  ## Interceptors
  agentDw.sources.srcThrift.interceptors = in1
  agentDw.sources.srcThrift.interceptors.in1.type = timestamp
  agentDw.sources.srcThrift.interceptors.in1.preserveExisting = true
  # Avro source
  agentDw.sources.srcAvro.type = avro
  agentDw.sources.srcAvro.port = 10302
  agentDw.sources.srcAvro.bind = 0.0.0.0
  agentDw.sources.srcAvro.threads = 8
  # Compression; the matching avro sink must also set compression-type = deflate (default: none)
  agentDw.sources.srcAvro.compression-type = deflate
  agentDw.sources.srcAvro.selector.type = replicating
  agentDw.sources.srcAvro.channels = chHdfs chKafka
  ## Interceptors
  agentDw.sources.srcAvro.interceptors = in1
  agentDw.sources.srcAvro.interceptors.in1.type = timestamp
  agentDw.sources.srcAvro.interceptors.in1.preserveExisting = true
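
  • Example POST against the HTTP source. The payload below matches Flume's default JSONHandler; this template plugs in a custom handler (HTTPCustomHandler), so adjust the body to whatever that class expects:

  import json
  import urllib.request

  # A batch is a JSON array of {headers, body} events.
  events = [{"headers": {"origin": "test"}, "body": "hello flume"}]
  req = urllib.request.Request(
      "http://127.0.0.1:10102",                   # bind/port from the template
      data=json.dumps(events).encode("utf-8"),
      headers={"Content-Type": "application/json"},
  )
  urllib.request.urlopen(req)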

Channel

  # File channel
  agentDw.channels.ch.type = file
  agentDw.channels.ch.checkpointDir = /var/log/flume/Hdfs/checkpoint
  agentDw.channels.ch.dataDirs = /var/log/flume/Hdfs/data
  # Maximum number of events held in the channel
  agentDw.channels.ch.capacity = 100000000
  # Maximum size of a single data file: 128 MB
  agentDw.channels.ch.maxFileSize = 134217728
  # Minimum free disk space required: 512 MB
  agentDw.channels.ch.minimumRequiredSpace = 524288000
  # Seconds a put/take waits for free space or data before timing out
  agentDw.channels.ch.keep-alive = 3
  agentDw.channels.ch.threads = 8
  # Maximum events per transaction
  agentDw.channels.ch.transactionCapacity = 2000000
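
  • Optionally pre-create the checkpoint and data directories so their ownership matches the user the agent runs as (Flume can also create them itself on first start):

  from pathlib import Path

  base = Path("/var/log/flume/Hdfs")              # paths from the template
  for sub in ("checkpoint", "data"):
      (base / sub).mkdir(parents=True, exist_ok=True)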

Sink

  # HTTP sink
  agentDw.sinks.sinkHttp.type = http
  agentDw.sinks.sinkHttp.channel = ch
  agentDw.sinks.sinkHttp.endpoint = http://node1:10501
  agentDw.sinks.sinkHttp.connectTimeout = 2000
  agentDw.sinks.sinkHttp.requestTimeout = 2000
  # Header values only; the header names are implied by the property
  agentDw.sinks.sinkHttp.contentTypeHeader = application/json;charset=UTF-8
  agentDw.sinks.sinkHttp.acceptHeader = application/json;charset=UTF-8
  agentDw.sinks.sinkHttp.defaultBackoff = true
  agentDw.sinks.sinkHttp.defaultRollback = true
  agentDw.sinks.sinkHttp.defaultIncrementMetrics = false
  agentDw.sinks.sinkHttp.backoff.200 = false
  agentDw.sinks.sinkHttp.rollback.200 = false
  agentDw.sinks.sinkHttp.incrementMetrics.200 = true
  agentDw.sinks.sinkHttp.serializer.compressionCodec = snappy
  # Avro sink
  agentDw.sinks.sinkAvro.type = avro
  agentDw.sinks.sinkAvro.channel = ch
  agentDw.sinks.sinkAvro.hostname = log1
  agentDw.sinks.sinkAvro.port = 10302
  # Request timeout (ms)
  agentDw.sinks.sinkAvro.request-timeout = 20000
  # Connect timeout (ms)
  agentDw.sinks.sinkAvro.connect-timeout = 20000
  # Connection reset interval
  agentDw.sinks.sinkAvro.reset-connection-interval = 20000
  # Compression; the receiving avro source must also set compression-type = deflate (default: none)
  agentDw.sinks.sinkAvro.compression-type = deflate
  # Compression level: 0 = no compression, 1-9 = higher means more compression; default 6
  agentDw.sinks.sinkAvro.compression-level = 6
  # Events taken per batch; keep batch-size <= the channel's transactionCapacity
  agentDw.sinks.sinkAvro.batch-size = 100000
  # HDFS sink
  agentDw.sinks.sinkHdfs.type = hdfs
  agentDw.sinks.sinkHdfs.channel = ch
  agentDw.sinks.sinkHdfs.hdfs.path = hdfs://nameservice1/ods/safe_realtime_click_tmp/%Y%m%d/%H
  agentDw.sinks.sinkHdfs.hdfs.filePrefix = from_the_stream
  agentDw.sinks.sinkHdfs.hdfs.inUsePrefix = .
  agentDw.sinks.sinkHdfs.hdfs.inUseSuffix = .tmp
  agentDw.sinks.sinkHdfs.hdfs.round = true
  agentDw.sinks.sinkHdfs.hdfs.roundValue = 5
  agentDw.sinks.sinkHdfs.hdfs.roundUnit = minute
  agentDw.sinks.sinkHdfs.hdfs.minBlockReplicas = 1
  agentDw.sinks.sinkHdfs.hdfs.rollSize = 0
  agentDw.sinks.sinkHdfs.hdfs.rollCount = 0
  agentDw.sinks.sinkHdfs.hdfs.rollInterval = 600
  agentDw.sinks.sinkHdfs.hdfs.writeFormat = Text
  agentDw.sinks.sinkHdfs.hdfs.fileType = CompressedStream
  agentDw.sinks.sinkHdfs.hdfs.codeC = gzip
  agentDw.sinks.sinkHdfs.serializer = text
  agentDw.sinks.sinkHdfs.serializer.appendNewline = true
  agentDw.sinks.sinkHdfs.hdfs.callTimeout = 30000
  agentDw.sinks.sinkHdfs.hdfs.idleTimeout = 0
  agentDw.sinks.sinkHdfs.hdfs.useLocalTimeStamp = true
  # Events flushed per batch; keep batchSize <= the channel's transactionCapacity
  agentDw.sinks.sinkHdfs.hdfs.batchSize = 100000
  # Kafka sink
  agentDw.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink
  agentDw.sinks.sinkKafka.channel = ch
  agentDw.sinks.sinkKafka.kafka.bootstrap.servers = node4:9092,node5:9092,node6:9092
  agentDw.sinks.sinkKafka.kafka.topic = kafka_topic
  # Fixed Kafka partition ID for all events from this channel; if unset,
  # the Kafka producer's partitioner assigns partitions
  # agentDw.sinks.sinkKafka.defaultPartitionId
  # How many replicas must acknowledge a message before the write counts as
  # successful: 0 = never wait (fastest), 1 = leader only, -1 = all replicas
  # (slowest, but no data loss)
  agentDw.sinks.sinkKafka.kafka.producer.acks = 1
  agentDw.sinks.sinkKafka.kafka.producer.linger.ms = 1
  agentDw.sinks.sinkKafka.kafka.producer.compression.type = snappy
  # Events taken per batch; keep flumeBatchSize <= the channel's transactionCapacity
  agentDw.sinks.sinkKafka.kafka.flumeBatchSize = 100000
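
  • A sanity check for the batch-size rule repeated above: every sink batch size must stay at or below the channel's transactionCapacity. A sketch for a flat, single-channel config file (the file name agentDw.conf is a placeholder):

  import re

  # Scan the agent config for the channel capacity and all batch sizes.
  text = open("agentDw.conf", encoding="utf-8").read()
  cap = int(re.search(r"transactionCapacity\s*=\s*(\d+)", text).group(1))
  for m in re.finditer(r"(batchSize|batch-size|flumeBatchSize)\s*=\s*(\d+)", text):
      assert int(m.group(2)) <= cap, "batch too large: " + m.group(0)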

Load balancing

  # Load-balancing sink group. Note: the processor delivers each event to
  # exactly one sink in the group, so balancing across an HTTP, a Kafka,
  # and an HDFS sink splits the stream between them; to copy every event
  # to every destination, fan out over separate channels instead.
  agentDw.sinkgroups.sinkGroupsCollector.sinks = sinkHttp sinkKafka sinkHdfs
  agentDw.sinkgroups.sinkGroupsCollector.processor.type = load_balance
  agentDw.sinkgroups.sinkGroupsCollector.processor.selector = round_robin
  agentDw.sinkgroups.sinkGroupsCollector.processor.backoff = true
  agentDw.sinkgroups.sinkGroupsCollector.processor.selector.maxTimeOut = 1800000