exec -> Kafka
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/didi-car/output/order/order
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hai_kou_order_topic
a1.sinks.k1.brokerList = cdh01:9092,cdh02:9092,cdh03:9092
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.producer.linger.ms = 1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
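As a quick sanity check (not part of the original configuration), a minimal Java consumer like the following can confirm that the tailed order lines are actually reaching hai_kou_order_topic. The broker list matches the sink config above; the group id "flume-check" is an assumption for illustration only.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal sketch: consume hai_kou_order_topic to verify the exec -> Kafka flow.
public class OrderTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh01:9092,cdh02:9092,cdh03:9092");
        props.put("group.id", "flume-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("hai_kou_order_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record value is one line tailed from the order file
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}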
TailDir -> Kafka
One source fans out to two channels:
a1.sources = r1
a1.channels = c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

# interceptors
# com.atguigu.flume.interceptor.LogETLInterceptor and
# com.atguigu.flume.interceptor.LogTypeInterceptor are the fully qualified class
# names of custom interceptors; change them to match your own interceptor classes.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
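The multiplexing selector above routes each event on a "topic" header, which the custom interceptors must set; the com.atguigu classes themselves are not shown here. The following is a minimal, assumed sketch of what such a type interceptor could look like, tagging events as topic_start or topic_event based on a marker in the body so the selector mappings above have something to match.

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Minimal sketch of a log-type interceptor (the real
// com.atguigu.flume.interceptor.LogTypeInterceptor may differ).
// It sets the "topic" header that the multiplexing selector reads.
public class LogTypeInterceptorSketch implements Interceptor {

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // Assumed convention: start-up logs contain the marker "start"
        if (body.contains("start")) {
            headers.put("topic", "topic_start");
        } else {
            headers.put("topic", "topic_event");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    // Flume instantiates interceptors through a nested Builder class,
    // which is why the config references ...LogTypeInterceptor$Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptorSketch();
        }

        @Override
        public void configure(Context context) { }
    }
}

An ETL interceptor would follow the same skeleton, dropping malformed events by returning null from intercept(Event) and filtering them out of the list.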
spooldir -> HDFS
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Note: never drop files with duplicate names into the monitored directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/dirfile
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node01:8020/spooldir/files/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type: the default is SequenceFile; use DataStream for plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
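Because a spooldir source treats each file as complete and immutable once it appears, the usual hand-off pattern is to write the file outside the monitored directory and then move it in under a unique name. The sketch below illustrates that pattern; the staging path and file naming are assumptions, and the staging directory must sit on the same filesystem as the spool directory for the atomic move to succeed.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Minimal sketch of the safe hand-off into a spooldir source:
// write the file in a staging directory first, then move it in
// under a unique name so no file name is ever reused.
public class SpoolDirDrop {
    public static void main(String[] args) throws IOException {
        Path staging = Paths.get("/export/servers/staging");   // assumed staging dir
        Path spoolDir = Paths.get("/export/servers/dirfile");  // matches spoolDir above
        Files.createDirectories(staging);

        // Unique name so the same file name never lands in the spool dir twice
        String fileName = "orders-" + System.currentTimeMillis() + ".log";
        Path tmp = staging.resolve(fileName);
        Files.write(tmp, Arrays.asList("order-1", "order-2"), StandardCharsets.UTF_8);

        // Atomic move: Flume only ever sees a complete, immutable file
        Files.move(tmp, spoolDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }
}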
