视频地址:10数据采集-视频-day03-12-第二次flume基本配置

执行指令模板

bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第一个flume

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
(2)batchSize大小如何设置?
答:Event 1K左右时,500-1000合适(默认为100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
image.png

  1. #1、定义agent、source、channel的名称
  2. a1.sources = r1
  3. a1.channels = c1
  4. #====================================================================
  5. #2、描述source
  6. a1.sources.r1.type = TAILDIR
  7. #定义断点续传文件
  8. a1.sources.r1.positionFile = /opt/module/flume/position.json
  9. #定义监控的文件组
  10. a1.sources.r1.filegroups = f1
  11. #指定文件组监控的文件
  12. a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
  13. #定义source每个批次采集的数据
  14. a1.sources.r1.batchSize = 100
  15. #---------------------------------------------------------------------
  16. #3、描述拦截器[过滤非json数据]
  17. #定义拦截器名称
  18. a1.sources.r1.interceptors = i1
  19. #定义拦截器类型
  20. a1.sources.r1.interceptors.i1.type = Tcode.ETLInterceptor.Builder
  21. #====================================================================
  22. #4、描述channel
  23. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  24. #指定kafka集群地址
  25. a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  26. #指定写入的topic的名称
  27. a1.channels.c1.kafka.topic = applog
  28. #指定数据是否以Event数据格式写入kafka
  29. a1.channels.c1.parseAsFlumeEvent = false
  30. #====================================================================
  31. #5、关联source->channel
  32. a1.sources.r1.channels = c1

第二个flume

1)FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
选型:
金融类公司、对钱要求非常准确的公司通常会选择FileChannel
传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

2)FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件

拦截器

  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.apache.flume.Context;
  4. import org.apache.flume.Event;
  5. import org.apache.flume.interceptor.Interceptor;
  6. import java.util.List;
  7. public class MyTimestampInterceptor implements Interceptor {
  8. @Override
  9. public void initialize() {
  10. }
  11. @Override
  12. public Event intercept(Event event) {
  13. byte[] body = event.getBody();
  14. JSONObject jsonObject = JSON.parseObject(new String(body));
  15. Long ts = jsonObject.getLong("ts");
  16. event.getHeaders().put("timestamp",ts+"");
  17. return event;
  18. }
  19. @Override
  20. public List<Event> intercept(List<Event> list) {
  21. for (Event event : list) {
  22. intercept(event);
  23. }
  24. return list;
  25. }
  26. @Override
  27. public void close() {
  28. }
  29. public static class Builder implements Interceptor.Builder {
  30. @Override
  31. public Interceptor build() {
  32. return new MyTimestampInterceptor();
  33. }
  34. @Override
  35. public void configure(Context context) {
  36. }
  37. }
  38. }

参数:
重点:
①指定channel的容量 batchSize<=transactionCapacity<=capacity
image.png
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/applog/%Y%m%d —这里的%Y%m%d必须要有时间戳在hearder中定义

#1、定义agent、source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#2、描述source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#指定kafka集群地址
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
#指定消费者组的id
a1.sources.r1.kafka.consumer.group.id = g2222
#指定消费的topic的名称
a1.sources.r1.kafka.topics = applog
#指定source从kafka拉取数据的批次大小
a1.sources.r1.batchSize = 100
#是否以event数据类型读取kafka数据
a1.sources.r1.useFlumeEventFormat = false
#消费者组第一次消费topic数据的时候指定从哪个位置开始消费
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest

#2.1、描述拦截器[处理数据漂移问题]
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.MyTimeInterceptor$Builder

#3、描述channel
a1.channels.c1.type = file
#指定channel数据的存储目录
a1.channels.c1.dataDirs = /opt/module/flume/datas
#指定channel内存指针的保存路径
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoints
#指定事务的容量
a1.channels.c1.transactionCapacity = 1000
#指定channel的容量 batchSize<=transactionCapacity<=capacity
a1.channels.c1.capacity = 1000000

#4、描述sink
a1.sinks.k1.type = hdfs
#指定数据存储目录
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/applog/%Y%m%d --这里的%Y%m%d必须要有时间戳在hearder中定义
#指定数据存储的文件前缀
a1.sinks.k1.hdfs.filePrefix = log-
#指定flume向文件中写多长时间之后,生成新文件,数据向新文件中写入,老文件不再写数据
a1.sinks.k1.hdfs.rollInterval=30
#指定flume向文件写到多大之后,生成新文件,数据向新文件中写入,老文件不再写数据
a1.sinks.k1.hdfs.rollSize=132120576
#指定flume向文件写入多少个event之后,生成新文件,数据向新文件中写入,老文件不再写数据,生产环境中不配置,因为文件大小有可能不均衡
a1.sinks.k1.hdfs.rollCount=0
#指定sink每个批次从channel中拉取多少数据
a1.sinks.k1.hdfs.batchSize=100
#指定数据写入HDFS的时候压缩格式
#a1.sinks.k1.hdfs.codeC=lzop
#指定数据保存在HDFS的文件类型:SequenceFile-序列化文件, DataStream-文本文件, CompressedStream-压缩文件
#a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.fileType=DataStream
#指定是否按照指定的时间间隔生成文件夹
a1.sinks.k1.hdfs.round=false

#5、关联source->channel->sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

第二个flume参数说明
**auto.offset.reset**

#消费者组第一次消费topic数据的时候指定从哪个位置开始消费,
#再次消费则不会从最早位置开始,除非更换消费者
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest

chickpoint:
在磁盘中持久化file channel在内存中的指针,防止宕机指针丢失``

#指定channel内存指针的保存路径
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoints

image.png
相关参数:
image.png

内存优化

项目经验之Flume内存优化

1)问题描述:如果启动消费Flume抛出如下异常,说明堆内存不足
ERROR hdfs.HDFSEventSink: process failed:提交失败
java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解决方案步骤:
(1)在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到hadoop103、hadoop104服务器

[atguigu@hadoop102 conf]$ xsync flume-env.sh

3)Flume内存参数设置及优化
JVM heap一般设置为4G或更高
-Xmx与-Xms最好设置一致,减少内存抖动(GC删除原本加载的内存,重新加载更大的内存)带来的性能影响,如果设置不一致容易导致频繁fullgc(垃圾挥手,速度很慢)。

-Xms JVM Heap(堆内存,启动时内存)最小尺寸,初始分配;
-Xmx JVM Heap(堆内存,运行时内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc
-Xms>=-Xmx