exec—Kafka

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source: tail the order log
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/admin/didi-car/output/order/order
    # fileHeader is a spooldir/taildir property; the exec source ignores it
    #a1.sources.r1.fileHeader = true
    # Describe the sink: publish events to a Kafka topic
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = hai_kou_order_topic
    a1.sinks.k1.brokerList = cdh01:9092,cdh02:9092,cdh03:9092
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.requiredAcks = 1
    # Kafka producer properties need the kafka.producer. prefix to take effect
    a1.sinks.k1.kafka.producer.linger.ms = 1
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
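
To confirm that the lines tailed by the exec source actually reach Kafka, read the topic back with a quick consumer (or with the stock kafka-console-consumer tool). The sketch below is only a verification aid: it assumes the kafka-clients library is on the classpath; the class name OrderTopicCheck and the flume-verify group id are placeholders, while the broker list and topic name are taken from the configuration above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class OrderTopicCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker list and topic match the Flume sink configuration above
            props.put("bootstrap.servers", "cdh01:9092,cdh02:9092,cdh03:9092");
            props.put("group.id", "flume-verify");        // throwaway consumer group (placeholder)
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");   // read from the beginning of the topic

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("hai_kou_order_topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }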

TailDir—Kafka

One source fans out to two channels (a sketch of the custom interceptors it references follows the configuration):

    a1.sources = r1
    a1.channels = c1 c2
    # configure source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
    a1.sources.r1.fileHeader = true
    a1.sources.r1.channels = c1 c2
    # interceptors
    # com.atguigu.flume.interceptor.LogETLInterceptor and
    # com.atguigu.flume.interceptor.LogTypeInterceptor are the fully qualified
    # class names of custom interceptors; change them to match your own classes.
    a1.sources.r1.interceptors = i1 i2
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
    # route each event by the "topic" header that the interceptors set
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = topic
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    # configure the Kafka channels (no sink is needed: events land directly in Kafka)
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c1.kafka.topic = topic_start
    a1.channels.c1.parseAsFlumeEvent = false
    a1.channels.c1.kafka.consumer.group.id = flume-consumer
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent = false
    a1.channels.c2.kafka.consumer.group.id = flume-consumer
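
The multiplexing selector above routes each event by its topic header, but that header only exists if an interceptor puts it there. LogETLInterceptor and LogTypeInterceptor are custom classes that are not shown in this post; the following is a minimal sketch of what the type-routing interceptor might look like, assuming flume-ng-core is on the classpath and assuming start-up logs can be recognized by a "start" marker in the body (the matching rule, like the ETL filtering done by the other interceptor, must be adapted to the real log format). The nested Builder is what the configuration refers to as LogTypeInterceptor$Builder.

    package com.atguigu.flume.interceptor;

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    public class LogTypeInterceptor implements Interceptor {

        @Override
        public void initialize() { }

        @Override
        public Event intercept(Event event) {
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            Map<String, String> headers = event.getHeaders();
            // Assumed convention: start-up logs carry a "start" marker in the body.
            if (body.contains("start")) {
                headers.put("topic", "topic_start");
            } else {
                headers.put("topic", "topic_event");
            }
            return event;
        }

        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> out = new ArrayList<>(events.size());
            for (Event event : events) {
                out.add(intercept(event));
            }
            return out;
        }

        @Override
        public void close() { }

        // Flume instantiates interceptors through this nested Builder,
        // which is why the configured type ends with $Builder.
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new LogTypeInterceptor();
            }

            @Override
            public void configure(Context context) { }
        }
    }

Package the class into a jar and place it in Flume's lib directory so that the interceptors.i2.type value above can resolve.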

spooldir—HDFS

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    # Note: never drop a file with the same name into the monitored directory twice
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /export/servers/dirfile
    a1.sources.r1.fileHeader = true
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.rollInterval = 3
    a1.sinks.k1.hdfs.rollSize = 20
    a1.sinks.k1.hdfs.rollCount = 5
    a1.sinks.k1.hdfs.batchSize = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # Output file type: the default is SequenceFile; DataStream writes plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
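
To verify that the sink is rolling files as configured, list the output directory, either with hdfs dfs -ls from the shell or programmatically as in the sketch below. It assumes hadoop-client is on the classpath; the class name SpooldirSinkCheck is a placeholder, while the NameNode address and output path are taken from the hdfs.path above.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class SpooldirSinkCheck {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // NameNode address taken from the sink's hdfs.path
            try (FileSystem fs = FileSystem.get(URI.create("hdfs://node01:8020"), conf)) {
                // Recursively list every rolled file under the sink's output directory
                RemoteIterator<LocatedFileStatus> files =
                        fs.listFiles(new Path("/spooldir/files"), true);
                while (files.hasNext()) {
                    LocatedFileStatus file = files.next();
                    System.out.printf("%s (%d bytes)%n", file.getPath(), file.getLen());
                }
            }
        }
    }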