1. 数据采集模块

2. hadoop

2.1. 安装准备工作

安装工具包

  1. sudo yum install -y epel-release #额外软件源
  2. sudo yum install -y psmisc nc net-tools rsync vim lrzsz ntp libzstd openssl-static tree iotop git wget #安装 psmisc工具包 nc工具包 net-tools工具包 rsync 远程同步 vim编辑器 lrzsz上传下载 ntp时间同步

修改静态ip

  1. sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
  1. DEVICE=ens33
  2. TYPE=Ethernet
  3. ONBOOT=yes
  4. BOOTPROTO=static
  5. NAME="ens33"
  6. IPADDR=192.168.42.102
  7. PREFIX=24
  8. GATEWAY=192.168.42.2
  9. DNS1=192.168.42.2
  1. service network restart

修改主机名

  1. sudo vim /etc/hostname

修改host

  1. sudo vim /etc/hosts
  1. 192.168.42.102 hadoop102
  2. 192.168.42.103 hadoop103
  3. 192.168.42.104 hadoop104

windows上也配置host

关闭防火墙

  1. sudo systemctl stop firewalld
  2. sudo systemctl disable firewalld

创建用户 新版本hadoop无法使用root用户启动

  1. sudo useradd atguigu
  2. sudo passwd atguigu

修改atguigu权限

  1. visudo
  1. ## Allow root to run any commands anywhere
  2. root ALL=(ALL) ALL
  3. atguigu ALL=(ALL) ALL #在root下添加此行内容

创建opt下的存放目录

  1. cd /opt
  2. sudo mkdir module
  3. sudo mkdir software
  4. sudo chown atguigu:atguigu /opt/module /opt/software

2.2. 安装java

先卸载

  1. rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

解压

  1. cd /opt/software/
  2. tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

配置环境变量

  1. sudo vim /etc/profile.d/my_env.sh
  1. #JAVA_HOME
  2. #yum 为/usr/lib/jvm/java
  3. export JAVA_HOME=/opt/module/jdk1.8.0_212
  4. export PATH=$PATH:$JAVA_HOME/bin

更新环境变量

  1. source /etc/profile
  2. java -version

2.3. 安装hadoop

解压

  1. tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

配置环境变量

  1. sudo vim /etc/profile.d/my_env.sh
  1. ##HADOOP_HOME
  2. export HADOOP_HOME=/opt/module/hadoop-3.1.3
  3. export PATH=$PATH:$HADOOP_HOME/bin
  4. export PATH=$PATH:$HADOOP_HOME/sbin

更新

  1. source /etc/profile
  2. hadoop version

2.4. xsync脚本

  1. cd /home/atguigu
  2. vim xsync
  1. #!/bin/bash
  2. #1. 判断参数个数
  3. if [ $# -lt 1 ]
  4. then
  5. echo Not Enough Arguement!
  6. exit;
  7. fi
  8. #2. 遍历集群所有机器
  9. for host in hadoop102 hadoop103 hadoop104
  10. do
  11. echo ==================== $host ====================
  12. #3. 遍历所有目录,挨个发送
  13. for file in $@
  14. do
  15. #4 判断文件是否存在
  16. if [ -e $file ]
  17. then
  18. #5. 获取父目录
  19. pdir=$(cd -P $(dirname $file); pwd)
  20. #6. 获取当前文件的名称
  21. fname=$(basename $file)
  22. ssh $host "mkdir -p $pdir"
  23. rsync -av $pdir/$fname $host:$pdir
  24. else
  25. echo $file does not exists!
  26. fi
  27. done
  28. done
  1. chmod +x xsync
  2. sudo mv xsync /bin/ #将脚本移动到/bin中,以便全局调用

2.5. ssh免密

生成ssh密钥

  1. ssh-keygen -t rsa

分发ssh密钥

  1. ssh-copy-id hadoop102
  2. ssh-copy-id hadoop103
  3. ssh-copy-id hadoop104

以上生成和分发 需要在每台机器上执行

2.6. 分发软件

  1. cd /opt/module/
  2. xsync jdk1.8.0_212/
  3. xsync hadoop-3.1.3/

分发环境变量

  1. sudo scp /etc/profile.d/my_env.sh root@hadoop103:/etc/profile.d/my_env.sh
  2. sudo scp /etc/profile.d/my_env.sh root@hadoop104:/etc/profile.d/my_env.sh

2.7. 集群配置

集群规划

服务器hadoop102 服务器hadoop103 服务器hadoop104
HDFS NameNode DataNode DataNode DataNode SecondaryNameNode
Yarn NodeManager Resourcemanager NodeManager NodeManager

进配置文件夹

  1. cd /opt/module/hadoop-3.1.3/etc/hadoop

core-site.xml

  1. vim core-site.xml
  1. <property>
  2. <name>fs.defaultFS</name>
  3. <!-- 配置hdfs默认的地址 -->
  4. <value>hdfs://hadoop102:8020</value>
  5. </property>
  6. <property>
  7. <!-- 配置hadoop临时存放路径-->
  8. <name>hadoop.tmp.dir</name>
  9. <value>/opt/module/hadoop-3.1.3/data</value>
  10. </property>
  11. <property>
  12. <!-- 兼容性配置hive -->
  13. <name>hadoop.proxyuser.atguigu.hosts</name>
  14. <value>*</value>
  15. </property>
  16. <property>
  17. <name>hadoop.proxyuser.atguigu.groups</name>
  18. <value>*</value>
  19. </property>
  20. <property>
  21. <name>hadoop.http.staticuser.user</name>
  22. <value>atguigu</value>
  23. </property>

hdfs-site.xml

  1. <property>
  2. <!-- 2nn的地址 -->
  3. <name>dfs.namenode.secondary.http-address</name>
  4. <value>hadoop104:9868</value>
  5. </property>

yarn-site.xml

  1. <property>
  2. <!--设置NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序-->
  3. <name>yarn.nodemanager.aux-services</name>
  4. <value>mapreduce_shuffle</value>
  5. </property>
  6. <property>
  7. <name>yarn.resourcemanager.hostname</name>
  8. <value>hadoop103</value>
  9. </property>
  10. <property>
  11. <name>yarn.nodemanager.env-whitelist</name>
  12. <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  13. </property>
  14. <!--设定单个容器可以申领到的最小内存资源-->
  15. <property>
  16. <name>yarn.scheduler.minimum-allocation-mb</name>
  17. <value>2048</value>
  18. </property>
  19. <property>
  20. <name>yarn.scheduler.maximum-allocation-mb</name>
  21. <value>4096</value>
  22. </property>
  23. <!--设定物理节点有4G内存加入资源池-->
  24. <property>
  25. <name>yarn.nodemanager.resource.memory-mb</name>
  26. <value>4096</value>
  27. </property>
  28. <property>
  29. <name>yarn.nodemanager.pmem-check-enabled</name>
  30. <value>false</value>
  31. </property>
  32. <property>
  33. <name>yarn.nodemanager.vmem-check-enabled</name>
  34. <value>false</value>
  35. </property>

mapred-site.xml

  1. <!--Hadoop对MapReduce运行框架一共提供了3种实现,在mapred-site.xml中通过"mapreduce.framework.name"这个属性来设置为"classic"、"yarn"或者"local"-->
  2. <property>
  3. <name>mapreduce.framework.name</name>
  4. <value>yarn</value>
  5. </property>

workers

  1. vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

删除localhost 文件中添加的内容结尾不允许有空格,文件中不允许有空行。

  1. hadoop102
  2. hadoop103
  3. hadoop104

同步

  1. xsync /opt/module/hadoop-3.1.3/etc/hadoop/

2.8. 格式化

格式化hdfs

  1. hdfs namenode -format

启动dfs

  1. start-dfs.sh

如果报java未找到 修改hadoop.env.sh文件

  1. vim /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh

修改JAVA_HOME为 并同步

  1. export JAVA_HOME=/opt/module/jdk1.8.0_212

启动yarn

  1. start-yarn.sh #在103机器上启动

2.9. hdfs存储多目录

若HDFS存储空间紧张,需要对DataNode进行磁盘扩展。

  1. 在DataNode节点增加磁盘并进行挂载。
    04. 数据采集模块 - 图1
  2. 在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。
    1. <property>
    2. <name>dfs.datanode.data.dir</name>
    3. <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value>
    4. </property>
  1. 增加磁盘后,保证每个目录数据均衡
    1. bin/start-balancer.sh threshold 10

    对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
    停止数据均衡命令:

2.10. LZO压缩配置

2.10.1. 编译

hadoop本身并不支持lzo压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译,编译步骤如下

下载、安装并编译LZO

  1. sudo yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
  2. wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
  3. tar -zxvf lzo-2.10.tar.gz
  4. cd lzo-2.10
  5. ./configure -prefix=/usr/local/hadoop/lzo/
  6. make
  7. sudo make install

编译hadoop-lzo源码

下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip

解压之后,修改pom.xml

  1. <hadoop.current.version>2.7.2</hadoop.current.version>

声明两个临时环境变量

  1. export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
  2. export LIBRARY_PATH=/usr/local/hadoop/lzo/lib

编译打包

  1. mvn package -Dmaven.test.skip=true

进入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的hadoop-lzo组件

2.10.2. 使用

将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/

  1. cd /opt/software
  2. mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.1.3/share/hadoop/common/

同步分发

  1. xsync /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar

core-site.xml增加配置支持LZO压缩

  1. vim /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml

添加以下内容

  1. <property>
  2. <name>io.compression.codecs</name>
  3. <value>
  4. org.apache.hadoop.io.compress.GzipCodec,
  5. org.apache.hadoop.io.compress.DefaultCodec,
  6. org.apache.hadoop.io.compress.BZip2Codec,
  7. org.apache.hadoop.io.compress.SnappyCodec,
  8. com.hadoop.compression.lzo.LzoCodec,
  9. com.hadoop.compression.lzo.LzopCodec
  10. </value>
  11. </property>
  12. <property>
  13. <name>io.compression.codec.lzo.class</name>
  14. <value>com.hadoop.compression.lzo.LzoCodec</value>
  15. </property>

分发

  1. xsync /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml

重新启动集群

  1. stop-all.sh
  2. start-dfs.sh #102
  3. start-yarn.sh #103

LZO创建索引

将bigtable.lzo(150M)上传到集群的/input目录

  1. hadoop fs -mkdir /input
  2. hadoop fs -put bigtable.lzo /input

执行wordcount程序

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output1

没有创建索引则LZO文件的切片只有一个

04. 数据采集模块 - 图2

创建索引

  1. # hadoop jar 编译好的lzo jar包 类引用路径 lzo数据文件
  2. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

04. 数据采集模块 - 图3

重新执行wordcount

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output2

04. 数据采集模块 - 图4

2.11. 基准测试

2.11.1. 测试HDFS写性能

向HDFS写10个128M的文件

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

04. 数据采集模块 - 图5

2.11.2. 测试HDFS读性能

读取HDFS集群10个128M的文件

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

04. 数据采集模块 - 图6

2.11.3. 删除测试生成数据

测试数据默认存储在 /benchmarks/TestDFSIO上

04. 数据采集模块 - 图7

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

2.11.4. 使用sort测试mapreduce

使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data

执行sort程序

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data

验证数据是否真正排好序了

  1. hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

2.12. hadoop参数调优

  • HDFS参数调优hdfs-site.xml

    1. dfs.namenode.handler.count=20 * log2(Cluster Size)


    如集群规模为8台时,此参数设置为60
    NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小

  • YARN参数调优yarn-site.xml

    • 情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
    • 面临问题:数据统计主要用HiveSQL,没有数据倾斜(每个reduce数据没有偏大或偏小),小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
    • 解决办法:内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
      • yarn.nodemanager.resource.memory-mb
        表示该节点上YARN可使用的物理内存总量默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
      • yarn.scheduler.maximum-allocation-mb
        单个任务可申请的最多物理内存量默认是8192(MB)
      • 04. 数据采集模块 - 图8
  • Hadoop宕机
    • 如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
    • 如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

3. zookeeper

集群规划

服务器hadoop102 服务器hadoop103 服务器hadoop104
Zookeeper Zookeeper Zookeeper Zookeeper
  1. tar -zxvf /opt/software/apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
  2. mv /opt/module/apache-zookeeper-3.5.7-bin/ /opt/module/zookeeper
  3. #环境变量
  4. sudo vim /etc/profile.d/my_env.sh
  5. #追加
  6. #ZOOKEEPER_HOME
  7. export ZOOKEEPER_HOME=/opt/module/zookeeper
  8. export PATH=$PATH:$ZOOKEEPER_HOME/bin
  9. #同步环境变量
  10. source /etc/profile.d/my_env.sh
  11. sudo xsync /etc/profile.d/my_env.sh
  12. #将配置文件改名称为zoo.cfg
  13. cd /opt/module/zookeeper/conf/
  14. cp zoo_sample.cfg zoo.cfg
  15. #配置zookeeper文件
  16. vim zoo.cfg
  17. #追加下内容
  18. server.2=hadoop102:2888:3888
  19. server.3=hadoop103:2888:3888
  20. server.4=hadoop104:2888:3888
  21. #修改数据存储位置
  22. dataDir=/opt/module/zookeeper/zkData
  23. cd /opt/module/zookeeper
  24. mkdir zkData
  25. #创建myid用于zookeeper标记机器
  26. #编辑为102的2用于唯一标识 103为3 104为4
  27. echo 2 > /opt/module/zookeeper/zkData/myid
  28. #同步
  29. xsync /opt/module/zookeeper/
  30. #103
  31. echo 3 > /opt/module/zookeeper/zkData/myid
  32. #104
  33. echo 4 > /opt/module/zookeeper/zkData/myid

启动

  1. #在三台机器上分别启动
  2. zkServer.sh start

3.1. 群起脚本

在hadoop102的/home/atguigu/bin目录下创建脚本

  1. mkdir -p /home/atguigu/bin
  2. cd /home/atguigu/bin
  3. vim zk.sh
  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop102 hadoop103 hadoop104
  5. do
  6. echo "------------- $i -------------"
  7. ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop102 hadoop103 hadoop104
  12. do
  13. echo "------------- $i -------------"
  14. ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
  15. done
  16. };;
  17. "status"){
  18. for i in hadoop102 hadoop103 hadoop104
  19. do
  20. echo "------------- $i -------------"
  21. ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
  22. done
  23. };;
  24. esac

增加权限

  1. chmod 755 zk.sh
  2. zk.sh start

4. 日志生成

将之前日志生成的带依赖jar上传到102上

4.1. 代码参数说明

  1. // 参数一:控制发送每条的延时时间,默认是0
  2. Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L;
  3. // 参数二:循环遍历次数
  4. int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

4.2. 启动方式

启动方式有两种:

  1. 通过jar -classpath指定全类名启动
    1. java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain >/opt/module/test.log
  1. 通过jar -jar 启动 必须打包时指定启动类类引用否则无法启动
    1. java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar >/opt/module/test.log

4.3. 日志收集

我们通过上面方式启动 java程序会收集控制台中的输出 但我们已经通过logback收集日志到指定路径文件了 我们无需保存控制台上的输出内容 可以通过将输出内容直接输出到/dev/null中

/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

名称 代码 操作符 Java中表示 Linux 下文件描述符(Debian 为例)
标准输入(stdin) 0 < 或 << System.in /dev/stdin -> /proc/self/fd/0 -> /dev/pts/0
标准输出(stdout) 1 >, >>, 1> 或 1>> System.out /dev/stdout -> /proc/self/fd/1 -> /dev/pts/0
标准错误输出(stderr) 2 2> 或 2>> System.err /dev/stderr -> /proc/self/fd/2 -> /dev/pts/0
  1. java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 2>/dev/null 1>/dev/null

可以简写为

  1. java -jar log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null 2>&1

在/tmp/logs路径下查看生成的日志文件

  1. cd /tmp/logs/
  2. ls

4.4. 集群日志生成脚本

先同步日志jar到集群其他机器上

  1. xsync /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

在/home/atguigu/bin目录下创建脚本lg.sh

  1. vim /home/atguigu/bin/lg.sh
  1. #! /bin/bash
  2. for i in hadoop102 hadoop103
  3. do
  4. ssh $i "java -jar /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar $1 $2 >/dev/null 2>&1 &"
  5. done
  1. chmod 755 /home/atguigu/bin/lg.sh
  2. /home/atguigu/bin/lg.sh

4.5. 集群时间同步修改脚本(非正规临时脚本)

在/home/atguigu/bin目录下创建脚本dt.sh

  1. vim /home/atguigu/bin/dt.sh
  1. #!/bin/bash
  2. for i in hadoop102 hadoop103 hadoop104
  3. do
  4. echo "========== $i =========="
  5. ssh -t $i "sudo date -s $1"
  6. done

ssh -t 通常用于ssh远程执行sudo命令

  1. chmod 755 /home/atguigu/bin/dt.sh
  2. /home/atguigu/bin/dt.sh 2020-03-10

4.6. 集群所有进程查看脚本

在/home/atguigu/bin目录下创建脚本xcall.sh

  1. vim /home/atguigu/bin/xcall.sh
  1. #! /bin/bash
  2. for i in hadoop102 hadoop103 hadoop104
  3. do
  4. echo --------- $i ----------
  5. ssh $i "$*"
  6. done
  1. chmod 755 /home/atguigu/bin/xcall.sh
  2. xcall.sh jps

5. 采集日志Flume

04. 数据采集模块 - 图9

集群规划:

服务器hadoop102 服务器hadoop103 服务器hadoop104
Flume(采集日志) Flume Flume

5.1. 安装

  1. tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  2. cd /opt/module/
  3. mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
  4. #将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
  5. rm /opt/module/flume/lib/guava-11.0.2.jar
  6. sudo vim /etc/profile.d/my_env.sh
  7. #FLUME_HOME
  8. export FLUME_HOME=/opt/module/flume
  9. export PATH=$PATH:$FLUME_HOME/bin
  10. source /etc/profile.d/my_env.sh

同步

  1. cd /opt/module/
  2. xsync flume/
  3. sudo xsync /etc/profile.d/my_env.sh

5.2. Flume组件选择

  1. Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失

Spooling Directory Source监控目录,不支持断点续传

(2)batchSize大小如何设置?

答:Event 1K左右时,500-1000合适(默认为100)

  1. Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

5.3. Flume配置

04. 数据采集模块 - 图10

file-flume-kafka.conf文件

  1. cd /opt/module/flume/conf
  2. vim file-flume-kafka.conf
  1. a1.sources=r1
  2. a1.channels=c1 c2
  3. # configure source
  4. a1.sources.r1.type = TAILDIR
  5. a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
  6. a1.sources.r1.filegroups = f1
  7. a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
  8. a1.sources.r1.fileHeader = true
  9. a1.sources.r1.channels = c1 c2
  10. #interceptor
  11. a1.sources.r1.interceptors = i1 i2
  12. a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder
  13. a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder
  14. a1.sources.r1.selector.type = multiplexing
  15. a1.sources.r1.selector.header = topic
  16. a1.sources.r1.selector.mapping.topic_start = c1
  17. a1.sources.r1.selector.mapping.topic_event = c2
  18. # configure channel
  19. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  20. a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  21. a1.channels.c1.kafka.topic = topic_start
  22. a1.channels.c1.parseAsFlumeEvent = false
  23. a1.channels.c1.kafka.consumer.group.id = flume-consumer
  24. a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
  25. a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  26. a1.channels.c2.kafka.topic = topic_event
  27. a1.channels.c2.parseAsFlumeEvent = false
  28. a1.channels.c2.kafka.consumer.group.id = flume-consumer

04. 数据采集模块 - 图11

5.4. ETL和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不

在ide创建flume-interceptor工程 导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flume</groupId>
  4. <artifactId>flume-ng-core</artifactId>
  5. <version>1.9.0</version>
  6. </dependency>
  7. </dependencies>
  8. <build>
  9. <plugins>
  10. <plugin>
  11. <artifactId>maven-compiler-plugin</artifactId>
  12. <version>2.3.2</version>
  13. <configuration>
  14. <source>1.8</source>
  15. <target>1.8</target>
  16. </configuration>
  17. </plugin>
  18. <plugin>
  19. <artifactId>maven-assembly-plugin</artifactId>
  20. <configuration>
  21. <descriptorRefs>
  22. <descriptorRef>jar-with-dependencies</descriptorRef>
  23. </descriptorRefs>
  24. </configuration>
  25. <executions>
  26. <execution>
  27. <id>make-assembly</id>
  28. <phase>package</phase>
  29. <goals>
  30. <goal>single</goal>
  31. </goals>
  32. </execution>
  33. </executions>
  34. </plugin>
  35. </plugins>
  36. </build>

创建com.atguigu.flume.interceptor包 包下创建 LogETLInterceptor 类 并实现Interceptor接口

  1. package com.atguigu.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.nio.charset.Charset;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class LogETLInterceptor implements Interceptor {
  10. @Override
  11. public void initialize() {
  12. }
  13. @Override
  14. public Event intercept(Event event) {
  15. //将event转换为string 方便处理
  16. byte[] body = event.getBody();
  17. String log = new String(body, StandardCharsets.UTF_8);
  18. //判断日志是启动日志还是事件日志 并向header中添加标识
  19. if (log.contains("start")) {
  20. //清洗启动日志
  21. if (LogUtils.validateStart(log)) {
  22. return event;
  23. }
  24. } else {
  25. //清洗事件日志
  26. if (LogUtils.validateEvent(log)) {
  27. return event;
  28. }
  29. }
  30. return null;
  31. }
  32. @Override
  33. public List<Event> intercept(List<Event> list) {
  34. ArrayList<Event> intercepts = new ArrayList<>();
  35. for (Event event : list) {
  36. Event intercept = intercept(event);
  37. if (intercept != null) {
  38. intercepts.add(intercept);
  39. }
  40. }
  41. return intercepts;
  42. }
  43. @Override
  44. public void close() {
  45. }
  46. public static class Builder implements Interceptor.Builder{
  47. @Override
  48. public Interceptor build() {
  49. return new LogETLInterceptor();
  50. }
  51. @Override
  52. public void configure(Context context) {
  53. }
  54. }
  55. }

LogUtils 工具类

  1. package com.atguigu.flume.interceptor;
  2. import org.apache.commons.lang.math.NumberUtils;
  3. public class LogUtils {
  4. public static boolean validateStart(String log) {
  5. if (log == null) {
  6. return false;
  7. }
  8. //开头不是以{或结尾不是以}结束的 不是json字符串
  9. if (!log.trim().startsWith("{") || log.trim().endsWith("}")) {
  10. return false;
  11. }
  12. return true;
  13. }
  14. public static boolean validateEvent(String log) {
  15. if (log == null) {
  16. return false;
  17. }
  18. //以 | 切割
  19. String[] logConents = log.split("\\|");
  20. //判断长度
  21. if (logConents.length != 2) {
  22. return false;
  23. }
  24. //判断服务器时间是否够13位 并且判断是否全为数字
  25. if (logConents[0].length() != 13 || !NumberUtils.isDigits(logConents[0])) {
  26. return false;
  27. }
  28. //判断json完整性
  29. if (!logConents[1].trim().startsWith("{") || !logConents[1].trim().endsWith("}")) {
  30. return false;
  31. }
  32. return true;
  33. }
  34. }

分类型拦截器 LogTypeInterceptor

  1. package com.atguigu.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. import java.util.Map;
  9. public class LogTypeInterceptor implements Interceptor {
  10. @Override
  11. public void initialize() {
  12. }
  13. @Override
  14. public Event intercept(Event event) {
  15. //取出body数据
  16. byte[] body = event.getBody();
  17. String log = new String(body, StandardCharsets.UTF_8);
  18. //header
  19. Map<String, String> headers = event.getHeaders();
  20. if (log.contains("start")) {
  21. headers.put("topic", "topic_start");
  22. } else {
  23. headers.put("topic", "topic_event");
  24. }
  25. return event;
  26. }
  27. @Override
  28. public List<Event> intercept(List<Event> list) {
  29. ArrayList<Event> resultEvent = new ArrayList<>();
  30. for (Event event : list) {
  31. resultEvent.add(event);
  32. }
  33. return resultEvent;
  34. }
  35. @Override
  36. public void close() {
  37. }
  38. public static class Builder implements Interceptor.Builder{
  39. @Override
  40. public Interceptor build() {
  41. return new LogTypeInterceptor();
  42. }
  43. @Override
  44. public void configure(Context context) {
  45. }
  46. }
  47. }

打包上传jar包到flume的lib中 选择不带依赖即可

同步分发

  1. xsync /opt/module/flume/

5.5. flume启动脚本

在/home/atguigu/bin目录下创建脚本f1.sh

  1. vim /home/atguigu/bin/f1.sh
  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop102 hadoop103
  5. do
  6. echo " --------启动 $i 采集flume-------"
  7. ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop102 hadoop103
  12. do
  13. echo " --------停止 $i 采集flume-------"
  14. ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
  15. done
  16. };;
  17. esac

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

说明2:awk 默认分隔符为空格

说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

  1. cd /home/atguigu/bin/
  2. chmod 755 f1.sh
  3. f1.sh start

6. Kafka

集群规划:

服务器hadoop102 服务器hadoop103 服务器hadoop104
Kafka Kafka Kafka Kafka

要先启动zookeeper

  1. tar -zxvf /opt/software/kafka_2.11-2.4.1.tgz -C /opt/module/
  2. cd /opt/module/
  3. mv kafka_2.11-2.4.1/ kafka
  4. cd kafka
  5. mkdir logs
  6. #环境变量
  7. sudo vim /etc/profile.d/my_env.sh
  8. #KAFKA_HOME
  9. export KAFKA_HOME=/opt/module/kafka
  10. export PATH=$PATH:$KAFKA_HOME/bin
  11. source /etc/profile.d/my_env.sh

配置kafka

  1. vim /opt/module/kafka/config/server.properties

除了删除功能是新添加属性 其他都是修改

  1. #broker的全局唯一编号,不能重复
  2. broker.id=0
  3. #添加删除功能
  4. #删除topic功能使能
  5. delete.topic.enable=true
  6. #kafka运行日志存放的路径
  7. log.dirs=/opt/module/kafka/logs
  8. #配置连接Zookeeper集群地址
  9. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

同步

  1. xsync /opt/module/kafka
  2. sudo xsync /etc/profile.d/my_env.sh

修改每个kafka中 broker.id

  1. vim /opt/module/kafka/config/server.properties
  2. #102 为 0, 103为1 , 104为2

3.1. 群起脚本

在/home/atguigu/bin目录下创建脚本kf.sh

  1. vim /home/atguigu/bin/kf.sh
  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop102 hadoop103 hadoop104
  5. do
  6. echo " --------启动 $i Kafka-------"
  7. ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop102 hadoop103 hadoop104
  12. do
  13. echo " --------停止 $i Kafka-------"
  14. ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
  15. done
  16. };;
  17. esac
  1. cd /home/atguigu/bin/
  2. chmod 755 kf.sh
  3. kf.sh start

6.2. 创建Kafka Topic

进入到/opt/module/kafka/目录下分别创建:启动日志主题、事件日志主题

  1. kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_start
  2. kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --replication-factor 1 --partitions 1 --topic topic_event

查询topic列表

  1. kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

删除topic

  1. kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_start
  2. kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --topic topic_event

生产消息

  1. kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start

消费消息

  1. kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start

6.3. 压力测试

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

6.3.1. 生产者

  1. kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

record-size是一条信息有多大,单位是字节。

num-records是总共发送多少条信息。

throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。

04. 数据采集模块 - 图12

本例中一共写入10w条消息,吞吐量为7.05 MB/sec,每次写入的平均延迟为408.12毫秒,最大的延迟为603.00毫秒。

6.3.2. 消费者

  1. kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1

—zookeeper 指定zookeeper的链接信息

—topic 指定topic的名称

—fetch-size 指定每次fetch的数据的大小

—messages 总共要消费的消息个数

04. 数据采集模块 - 图13

  1. start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec

开始测试时间,测试结束数据,共消费数据9.5367MB,吞吐量0.7875MB/s,共消费100000条,平均每秒消费**8257条。

6.4. kafka机器数量计算

Kafka机器数量(经验公式)=2(峰值生产速度副本数/100)+1

先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。

比如我们的峰值生产速度是50M/s。副本数为2。

Kafka机器数量=2_(50_2/100)+ 1=3台

7. 消费kafka数据flume

此flume用于消费kafka中的数据 上传至hdfs上

集群规划

服务器hadoop102 服务器hadoop103 服务器hadoop104
Flume(消费Kafka) Flume

04. 数据采集模块 - 图14

7.1. 日志消费flume配置

目前是启动日志和事件日志写在同一个config配置中 建议拆分为两个配置文件分开运行 解耦

hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

  1. vim /opt/module/flume/conf/kafka-flume-hdfs.conf
  1. ## 组件
  2. a1.sources=r1 r2
  3. a1.channels=c1 c2
  4. a1.sinks=k1 k2
  5. ## source1
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. #控制消费速度 每秒5000条
  8. a1.sources.r1.batchSize = 5000
  9. #延迟时间如果指定毫秒数没有到达指定消费速度 同样消费速度
  10. a1.sources.r1.batchDurationMillis = 2000
  11. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  12. a1.sources.r1.kafka.topics=topic_start
  13. ## source2
  14. a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
  15. a1.sources.r2.batchSize = 5000
  16. a1.sources.r2.batchDurationMillis = 2000
  17. a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  18. a1.sources.r2.kafka.topics=topic_event
  19. ## channel1
  20. a1.channels.c1.type = file
  21. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
  22. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
  23. a1.channels.c1.maxFileSize = 2146435071
  24. a1.channels.c1.capacity = 1000000
  25. a1.channels.c1.keep-alive = 6
  26. ## channel2
  27. a1.channels.c2.type = file
  28. #checkpointDir为存储日志 dataDirs为存储数据位置 两目录可以指定一个多目录路径提高读写性能
  29. a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
  30. a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
  31. #文件最大值
  32. a1.channels.c2.maxFileSize = 2146435071
  33. #数据容量
  34. a1.channels.c2.capacity = 1000000
  35. #等待时长 如果容量未达到指定大小,并且超过指定时间则提交到sink
  36. a1.channels.c2.keep-alive = 6
  37. ## sink1
  38. a1.sinks.k1.type = hdfs
  39. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
  40. a1.sinks.k1.hdfs.filePrefix = logstart-
  41. ##sink2
  42. a1.sinks.k2.type = hdfs
  43. a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
  44. a1.sinks.k2.hdfs.filePrefix = logevent-
  45. ## 不要产生大量小文件 以下3个配置属性满足任意一个则滚动到下一个文件
  46. #等待多少秒才滚动到下一个文件 默认为30s 建议为3600s
  47. a1.sinks.k1.hdfs.rollInterval = 10
  48. #多少字节滚动一次 默认为1024
  49. a1.sinks.k1.hdfs.rollSize = 134217728
  50. #event写入的个数滚动 默认为10 0则为禁用
  51. a1.sinks.k1.hdfs.rollCount = 0
  52. a1.sinks.k2.hdfs.rollInterval = 10
  53. a1.sinks.k2.hdfs.rollSize = 134217728
  54. a1.sinks.k2.hdfs.rollCount = 0
  55. ## 控制输出文件是否原生文件 此处设置为压缩流
  56. a1.sinks.k1.hdfs.fileType = CompressedStream
  57. a1.sinks.k2.hdfs.fileType = CompressedStream
  58. # 设置压缩格式
  59. a1.sinks.k1.hdfs.codeC = lzop
  60. a1.sinks.k2.hdfs.codeC = lzop
  61. ## 拼装
  62. a1.sources.r1.channels = c1
  63. a1.sinks.k1.channel= c1
  64. a1.sources.r2.channels = c2
  65. a1.sinks.k2.channel= c2

04. 数据采集模块 - 图15

5.2. Flume组件选择

  1. FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

企业选型:

  • 金融类公司、对钱要求非常准确的公司通常会选择FileChannel
  • 传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。
  1. FileChannel优化
    通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
    官方说明如下:
    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
    checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
  2. Sink:HDFS Sink
    (1)HDFS存入大量小文件,有什么影响?
    元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
    计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
    (2)HDFS小文件处理
    官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
    基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
    (1)文件在达到128M时会滚动生成新文件
    (2)文件创建超3600秒时会滚动生成新文件

7.3. 日志消费Flume启动脚本

  1. cd /home/atguigu/bin
  2. vim f2.sh
  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop104
  5. do
  6. echo " --------启动 $i 消费flume-------"
  7. ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop104
  12. do
  13. echo " --------停止 $i 消费flume-------"
  14. ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
  15. done
  16. };;
  17. esac
  1. chmod 755 f2.sh
  2. f2.sh start
  3. f2.sh stop

7.4. Flume内存优化

如果启动消费Flume抛出如下异常则表示该机器最小内存不满足于Flume配置的最小启动内存

ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded

我们可以修改Flume的默认内存配置

  1. vim /opt/module/flume/conf/flume-env.sh
  1. #添加以下配置
  2. export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

分发配置文件

  1. xsync flume-env.sh

Flume内存参数设置及优化

  1. JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
  2. -Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc
  3. -Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

8. 采集通道启动/停止脚本

该脚本控制全部组件的启动和停止

  1. cd /home/atguigu/bin
  2. vim cluster.sh

注意要先启动zookeeper再起kafka 关闭时候先关kafka再关zookeeper

  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. echo " -------- 启动 集群 -------"
  5. echo " -------- 启动 hadoop集群 -------"
  6. /opt/module/hadoop-3.1.3/sbin/start-dfs.sh
  7. ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
  8. #启动 Zookeeper集群
  9. zk.sh start
  10. sleep 4s;
  11. #启动 Flume采集集群
  12. f1.sh start
  13. #启动 Kafka采集集群
  14. kf.sh start
  15. sleep 6s;
  16. #启动 Flume消费集群
  17. f2.sh start
  18. };;
  19. "stop"){
  20. echo " -------- 停止 集群 -------"
  21. #停止 Flume消费集群
  22. f2.sh stop
  23. #停止 Kafka采集集群
  24. kf.sh stop
  25. sleep 6s;
  26. #停止 Flume采集集群
  27. f1.sh stop
  28. #停止 Zookeeper集群
  29. zk.sh stop
  30. echo " -------- 停止 hadoop集群 -------"
  31. ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
  32. /opt/module/hadoop-3.1.3/sbin/stop-dfs.sh
  33. };;
  34. esac
  1. chmod 755 cluster.sh
  2. cluster.sh start
  3. cluster.sh stop

9. 采集模块总结

9.1. Linux&Shell相关总结

Linux常用高级命令

序号 命令 命令解释
1 top 查看内存
2 df -h 查看磁盘存储情况
3 iotop 查看磁盘IO读写(yum install iotop安装)
4 iotop -o 直接查看比较高的磁盘读写程序
5 netstat -tunlp | grep 端口号 查看端口占用情况
6 uptime 查看报告系统运行时长及平均负载
7 ps aux 查看进程

Shell常用工具

awk、sed、cut、sort

9.2. Hadoop相关总结

  1. Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。需要掌握让LZO文件支持切片。
  2. Hadoop常用端口号,50070,8088,19888,9000
  3. Hadoop配置文件以及简单的Hadoop集群搭建。8个配置文件
    core-site.xml hdfs-site.xml yarn-site.xml mapred-site.xml workers(2.7为slaves)
  4. 默认块大小
    • 集群模式 128m
    • 本地模式 32m
    • hadoop1.x 64m
    • 在业务开发中hdfs块大小通常设置为 128m或256m
    • hive的文件块大小 256m
  5. 小文件问题
    1. 危害
      • 占用namenode内存 一个文件块占用namenode 150字节
      • 增加切片数 每个文件开启一个maptask(默认内存为1g)
    2. 解决办法
      • har归档 自定义FileInputFormat
      • combineInputformat 减少切片个数,进而减少的是maptask
      • 开启JVM重用
  6. MapReduce
    • shuffle优化
    • map方法之后 reduce方法之前 混洗过程
  7. Yarn
    1. 工作机制
    2. 调度器
      1. FIFO、容量、公平调度器 Apache默认调度器:容量 CDH默认调度器:公平调度器
      2. FIFO调度器特点:单队列,先进先出,在企业开发中没人使用
      3. 容量调度器:支持多队列,先进来的任务优先享有资源
      4. 公平调度器:支持多队列,每个任务公平享有资源 并发度最高。
      5. 对并发度要求比较高,同时机器性能比较好,选择公平; 大公司
        如果并发度不高,机器性能比较差,选择容量: 中小公司
      6. 生产环境下队列怎么创建?
        容量调度器默认只有一个default队列;
        按照框架名称:hive、spark、flink
        按照业务名称:登录、购物车、支付模块、部门1、部门2 (居多)
        好处:解耦、降低风险、可以实现任务降级(部门1》部门2》购物车)

9.3. Flme相关总结

9.3.1. 组成

Flume组成,Put事务,Take事务

  1. Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
  2. File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
  3. Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
  4. Kafka Channel:减少了FlumeSink阶段,提高了传输效率。
  5. SourceChannelPut事务
  6. ChannelSinkTake事务

9.3.1.1. taildir source

  1. 断点续传、多目录
  2. 在Apache flume1.7之后产生的;如果是CDH,1.6之后;
  3. 自定义source实现断点续传的功能(只要能自定义,有很多想象空间了)
  4. taildir source挂了怎么办? 不会丢失、可能产生数据重复
  5. 对重复数据怎么处理? 不处理;
    处理:(自身:自定义source实现事务,额外增加了运算量)
    1. 在下一级处理:hive的数仓里面(dwd层,数据清洗ETL)
    2. spark Streaming里面处理 去重的手段:group by (id) 开窗(id),只取窗口第一条
  6. 是否支持递归读取文件? 不支持;自定义tail source(递归+读取) 消费kafka+上传hdfs

9.3.1.2. channel

  1. File Channel :基于磁盘、效率低、可靠性高
  2. memoryChannel:基于内存、效率高、可靠性低
  3. KafkaChannel:数据存储在Kafka里面,基于磁盘、效率高于memoryChannel+kafkasink,因为省了sink
    flume1.6时 topic + 内容; 无论true还是false都不起作用; bug
    flume1.7解决bug,被大家广泛使用;
  4. 在生产环境:如果下一级是kafka的话,优先选择KafkaChannel;
    如果不是kafka,如果更关心可靠性选择FileChannel;如果更关心性能,选择memoryChannel

9.3.1.3. HDFS sink

  1. 控制小文件:
    时间(1-2小时)、大小(128m)、event个数(0禁止)
  2. 压缩
    开启压缩流;指定压缩编码方式(lzop/snappy)

9.3.2. 拦截器

  1. ETL(判断json的完整性 { }; 服务器时间(13 全数字)) 项目中自定义了:ETL拦截器和区分类型拦截器。 用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
  2. 分类型(启动日志、事件日志) kafka(的topic要满足下一级所又消费者) 一张表一个topic
    1. 商品列表、商品详情、商品点击
    2. 广告
    3. 点赞、评论、收藏
    4. 后台活跃、通知
    5. 故障
    6. 启动日志
  1. 自定义拦截器步骤
    1. 定义一个类 实现interceptor接口
    2. 重写4个方法:初始化、关闭、单event、多event()
      • initialize 初始化
      • public Event intercept(Event event) 处理单个Event
      • public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
      • close 方法
    3. 创建一个静态内部类Builder
      • 静态内部类,实现Interceptor.Builder
  2. 拦截器不要行不行? 看具体业务需求决定

9.3.3. 选择器

04. 数据采集模块 - 图16

rep(默认) mul(选择性发往下一级通道)

9.3.4. 监控器

ganglia 发现尝试提交的次数 远远大于最终提交成功次数; 说明flume性能不行;

自身;提高自己的内存 4-6g flume_env.sh
外援:增加flume台数 服务器配置(16g/32g 8T)

9.3.5. Flume 内存

开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。

9.3.6. 优化

  1. File Channel 能多目录就多目录(要求在不同的磁盘),提高吞吐量 checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
  2. 控制小文件; 时间(1-2小时)、大小(128m)、event个数(0禁止)
    1. HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

  1. HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件

  1. 监控器

9.3.7. Flume采集数据会丢失吗?

不会,Channel存储可以存储在File中,数据传输自身有事务。