title: 企业级elk+kafka集群日志分析系统 #标题tags: elk,kafka #标签
categories: elastic stack # 分类
date: 2020-12-09

该博文将记录下kafka接入elk的详细部署过程,作为笔记。
从网上扒拉了一个elk+kafka的架构图(懒得做图了),和我要实现的基本相似,如下:

企业级elk kafka集群日志分析系统 - 图1

图不图的不重要,跳过,我们接着往下看。

环境准备

系统 主机名 服务 IP
Cent OS 7.7 es1 kafka+zookeeper+es+kibana 192.168.20.3
Cent OS 7.7 es2 kafka+zookeeper+es+logstash 192.168.20.4
Cent OS 7.7 es3 kafka+zookeeper+es+filebeat+nginx 192.168.20.5

注:所有主机节点均为4G内存,4个CPU。

架构说明

  • filebeat:收集apps日志,发送到kafka,对于kafka集群而言,filebeat是扮演producer(生产者的角色)
  • kafka:中间件,用于缓解filebeat和logstash中间的压力(在部分公司的架构中,会用redis来代替kafka)。
  • logstash:做日志解析以及格式处理,统一做成json格式的数据输出给elasticsearch。
  • zookeeper:状态管理,用于管理kafka集群。
  • elasticsearch:实时日志分析服务的核心技术,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
  • kibana:基于elasticsearch的数据可视化组件,可以绘制图表等。

注:在上面的架构中,filebeat是用于收集日志的,我在最后会自行搭个web服务,生成一些日志,让filebeat来采集,此处不是重点,所以不会在本文展示(自行解决日志源的问题吧)。

部署前准备

注:所有节点均需此操作

  1. [root@es1 ~]# tail -3 /etc/hosts # 为了后面方便,我写了hosts文件如下
  2. 192.168.20.3 es1
  3. 192.168.20.4 es2
  4. 192.168.20.5 es3
  5. [root@es1 ~]# for i in es{2..3};do rsync -avz /etc/hosts root@${i}:/etc/;done
  6. [root@es3 ~]# wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo 配置阿里云epel
  7. [root@es2 ~]# yum -y install java # 安装java环境

注:在较高版本的elasticsearch的tar.gz安装包中已经配置了jdk环境,如果你选择elasticsearch单独部署,那么无需配置jdk环境,它会默认使用$HOME/jdk/目录下的java程序。

安装zookeeper

注:在es1主机上进行以下操作

  1. # 下载并配置
  2. [root@es1 ~]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
  3. [root@es1 ~]# tar zxf apache-zookeeper-3.6.1-bin.tar.gz
  4. [root@es1 ~]# mv apache-zookeeper-3.6.1-bin /usr/local/zk
  5. [root@es1 bin]# find /usr/local/zk/bin/ -name "*.cmd" -delete # 删除windwos中的命令
  6. [root@es1 ~]# cd /usr/local/zk/conf/
  7. [root@es1 conf]# cp -p zoo_sample.cfg zoo.cfg # 复制配置文件
  8. [root@es1 conf]# egrep -v '^#|^$' zoo.cfg # 修改后的配置文件如下
  9. # 注意:每项配置后面不要有空格,否则会报错
  10. tickTime=2000 # 节点之间的心跳检测时间单位为毫秒
  11. initLimit=10 # 节点之间检查失败次数超过后断开相应的节点
  12. syncLimit=5 # 达到5个访问进行同步数据
  13. dataDir=/data/zk/data # 指定数据存放目录
  14. clientPort=2181 # 监听端口
  15. # 以下是指定参与zookeeper集群的各个主机节点以及集群通信端口
  16. server.1 es1:2888:3888
  17. server.2 es2:2888:3888
  18. server.3 es3:2888:3888
  19. [root@es1 conf]# mkdir -p /data/zk/{log,data} # 创建所需目录
  20. [root@es1 conf]# echo 1 > /data/zk/data/myid # 设置自己的id号(需要和配置文件中指定的server.1、server.2...对应上)

其实至此,zookeeper就可以启动了,下面的日志配置可选。

日志相关配置

在启动zookeeper时,默认zookeeper的日志输出信息都打印到了执行启动命令时当前所在的目录下zookeeper.out文件中。显然,这样zookeeper的日志输出路径没法控制,因为我们不确定每次执行启动命令的路径在哪里。另外,默认zookeeper的日志输出到日志文件是不会自动轮循切割的,这对于之后我们查看日志和管理日志极为不方便。所以,现在需要我们修改zookeeper日志输出方式。

将zookeeper.root.logger的值与$ZOOKEEPER_HOME/bin目录下的zkEnv.sh文件中的ZOO_LOG4J_PROP变量的值保持一致。

  1. [root@es1 conf]# vim /usr/local/zk/conf/log4j.properties # 完整配置文件如下
  2. # 下面大多数配置我也不太清楚作用,所以就没写注释
  3. [root@es1 conf]# egrep -v '^#|^$' log4j.properties
  4. zookeeper.root.logger=INFO,ROLLINGFILE # 开启日志轮询功能
  5. zookeeper.console.threshold=INFO
  6. zookeeper.log.dir=/data/zk/log # 指定日志输出目录
  7. zookeeper.log.file=zookeeper.log # 指定日志名
  8. zookeeper.log.threshold=DEBUG
  9. zookeeper.tracelog.dir=/data/zk/log
  10. zookeeper.tracelog.file=zookeeper_trace.log
  11. log4j.rootLogger=${zookeeper.root.logger}
  12. log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
  13. log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
  14. log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
  15. log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
  16. log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender # 日志按大小轮循
  17. log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
  18. log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
  19. log4j.appender.ROLLINGFILE.MaxFileSize=50MB
  20. log4j.appender.ROLLINGFILE.DataPattern='.'yyyy-MM-dd
  21. log4j.appender.ROLLINGFILE.Threshold=info # 指定日志记录等级
  22. log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
  23. log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
  24. log4j.appender.ROLLINGFILE.encoding=UTF-8
  25. log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
  26. log4j.appender.TRACEFILE.Threshold=TRACE
  27. log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
  28. log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
  29. log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n

修改$ZOOKEEPER_HOME/bin目录下的zkEnv.sh文件

  1. [root@es1 bin]# vim /usr/local/zk/bin/zkEnv.sh
  2. ZOO_LOG_DIR="/data/zk/log" # 需要和上面log4j.properties指定的日志目录一致
  3. ZOO_LOG4J_PROP="INFO,ROLLINGFILE" # 修改此处

修改$ZOOKEEPER_HOME/bin/zkServer.sh文件

  1. [root@es1 bin]# vim /usr/local/zk/bin/zkServer.sh
  2. _ZOO_DAEMON_OUT="$ZOO_LOG_DIR/zk.log"

将修改后的配置文件发送到其他节点

由于上述修改操作我都是在kakfa03上进行的,所以把修改后的目录文件发送到es2和es3主机即可。

  1. [root@es1 bin]# for i in es{2..3};do rsync -avz /usr/local/zk ${i}:/usr/local/;done
  2. [root@es1 bin]# for i in es{2..3};do rsync -avz /data/zk ${i}:/data/;done

修改zk的myid

由于复制过去的id号都是一样的,所以要修改id号,避免冲突。

  1. # es2操作如下
  2. [root@es2 src]# echo 2 > /data/zk/data/myid
  3. # es3操作如下
  4. [root@es3 src]# echo 3 > /data/zk/data/myid

启动zookeeper

所有节点均需执行下面的启动命令

  1. [root@es1 bin]# /usr/local/zk/bin/zkServer.sh start # 启动zk
  2. [root@es1 bin]# ss -lnpt | egrep '2181|3888' # 确定端口在监听
  3. LISTEN 0 50 :::2181 :::* users:(("java",pid=3413,fd=55))
  4. LISTEN 0 50 ::ffff:192.168.20.5:3888 :::* users:(("java",pid=3413,fd=67))
  5. [root@es3 ~]# /usr/local/zk/bin/zkServer.sh status # 查看集群中角色信息
  6. /usr/bin/java
  7. ZooKeeper JMX enabled by default
  8. Using config: /usr/local/zk/bin/../conf/zoo.cfg
  9. Client port found: 2181. Client address: localhost.
  10. Mode: leader # es3主机的zk角色为leader

部署zkui(zookeeper的web管理界面,非必须部署)

参考:zkui

zookeeper的可视化管理web平台,该项目托管在github。在任意一台有java环境的主机部署即可。

  1. [root@es1 zookeeper]# yum -y install maven git # 安装所需指令
  2. [root@es1 zookeeper]# git clone https://github.com/DeemOpen/zkui.git # 克隆该项目
  1. # 为了加快maven编译的速度,建议更改maven源为国内源,我这里改为阿里的maven源
  2. [root@es1 maven]# vim /etc/maven/settings.xml
  3. ....................#省略部分内容
  4. <url>http://my.repository.com/repo/path</url>
  5. </mirror>
  6. --> #定位到该行(158行),写入以下内容
  7. <mirror>
  8. <id>aliyun</id>
  9. <mirrorOf>central</mirrorOf>
  10. <name>aliyun maven</name>
  11. <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
  12. </mirror>
  13. </mirrors> #写在这个标签上面
  14. [root@es1 zookeeper]# cd zkui/ # 进入克隆的项目
  15. [root@es1 zkui]# mvn clean install # 编译
  16. [root@es1 zkui]# cp config.cfg target/ # 拷贝配置文件
  17. [root@es1 zkui]# cd target/ # 进入编译后生成的目录
  18. [root@es1 target]# vim config.cfg # 编辑配置文件,指定zookeeper监听端口(可指定单个)
  19. zkServer=es1:2181,es2:2181,es3:2181
  20. # 启动此jar包
  21. [root@es1 target]# nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &

访问此主机的9090端口即可登录到web管理界面,如下:
默认管理员账号:admin,密码:manager

企业级elk kafka集群日志分析系统 - 图2

登录后即可看到如下界面:

企业级elk kafka集群日志分析系统 - 图3

部署kafka集群

以下操作只在es1主机配置即可。

  1. # 下载并解压
  2. [root@es1 local]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
  3. [root@es1 local]# tar zxf kafka_2.13-2.5.0.tgz
  4. [root@es1 local]# mv kafka_2.13-2.5.0 /usr/local/kafka

修改kafka配置文件

  1. [root@es1 ~]# cd /usr/local/kafka/config/
  2. [root@es1 config]# egrep -v '^$|^#' server.properties
  3. broker.id=1 # broker id,要求是正数,并且集群中不可冲突
  4. num.network.threads=3
  5. num.io.threads=8
  6. partition.round_robin.reachable_only: false
  7. socket.send.buffer.bytes=102400
  8. socket.receive.buffer.bytes=102400
  9. socket.request.max.bytes=104857600
  10. log.dirs=/var/log/kafka/
  11. num.partitions=6
  12. num.recovery.threads.per.data.dir=1
  13. offsets.topic.replication.factor=1
  14. transaction.state.log.replication.factor=1
  15. transaction.state.log.min.isr=1
  16. delete.topic.enable=true
  17. log.retention.hours=72
  18. log.segment.bytes=1073741824
  19. log.retention.check.interval.ms=300000
  20. zookeeper.connect=es1:2181,es2:2181,es3:2181 # 指定zk监听地址
  21. zookeeper.connection.timeout.ms=6000
  22. group.initial.rebalance.delay.ms=0

将修改后的kafka目录发送到其他节点

  1. [root@es1 config]# for i in es{2..3};do rsync -avz /usr/local/kafka ${i}:/usr/local/;done

修改其他两个节点的broker.id

  1. # 主机es2上修改如下
  2. [root@es2 target]# sed -i 's#broker.id=1#broker.id=2#g' /usr/local/kafka/config/server.properties
  3. # 主机es3上修改如下
  4. [root@es3 zkui]# sed -i 's#broker.id=1#broker.id=3#g' /usr/local/kafka/config/server.properties

启动kafka集群

由于kafka需要前台启动,所以这里我采用了nohup的方式,但是生产中建议使用supervisord这个工具来控制前台应用的启停,关于supervisord可以参考博文:Supervisord——为前台应用而生

注:所有kafka节点都需要执行以下操作。

  1. [root@es1 bin]# cd /usr/local/kafka/bin/
  2. # 启动kafka
  3. [root@es1 bin]# nohup ./kafka-server-start.sh ../config/server.properties &
  4. [root@es1 bin]# ss -lnpt | grep 9092 # 确定端口在监听
  5. LISTEN 0 50 :::9092 :::* users:(("java",pid=60559,fd=123))

附加:kafka常用指令

  1. # 显示topic列表
  2. bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --list
  3. 也可以从一个节点上查看。下面简写查看一个节点。
  4. # 创建一个topic,并指定topic属性(副本数、分区数等)
  5. bin/kafka-topics.sh --create --zookeeper zk1:2181 --replication-factor 1 --partitions 3 --topic test - --partitions应等于或大于消费者
  6. # 查看某个topic的状态
  7. bin/kafka-topics.sh --zookeeper zk1:2181 --topic test --describe
  8. # 生产消息
  9. bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test
  10. # 消费消息
  11. bin/kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka1:9092 --topic test
  12. # 查看实时消息,如果从头看可在后面加 --from-beginning
  13. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  14. # 删除topic
  15. bin/kafka-topics.sh --delete --zookeeper zk1:2181 --topic test

部署es集群

参考文档:

注:以下操作在es1主机执行。

下载并解压

  1. [root@es3 local]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2-linux-x86_64.tar.gz
  2. [root@es1 local]# tar zxf elasticsearch-7.6.2-linux-x86_64.tar.gz
  3. [root@es1 local]# mv elasticsearch-7.6.2 /usr/local/es
  4. [root@es1 local]# rm -rf elasticsearch-7.6.2-linux-x86_64.tar.gz

在上线生产环境之前,必须考虑以下设置:

  • 禁用交换风区
  • 增加文件描述符
  • 确保有足够的虚拟内存
  • 确保足够的线程
  • JVM DNS缓存设置

优化es

配置es的临时目录

默认情况下,Elasticsearch使用启动脚本在系统tmp目录下立即创建的私有临时目录。但我们最好为他配置专有的临时目录。

  1. [root@es1 bin]# vim /usr/local/es/bin/elasticsearch-env
  2. ES_TMPDIR=/data/es/tmp # 在第二行指定即可

配置jvm致命错误日志
  1. [root@es1 bin]# vim /usr/local/es/config/jvm.options
  2. -XX:ErrorFile=/data/es/logs/hs_err_pid%p.log

配置虚拟内存(官方推荐)

es默认使用mmapfs目录来存储索引。mmap计数的默认操作系统限制可能太低,这可能导致内存异常。

  1. [root@es1 config]# tail -1 /etc/sysctl.conf # 写入以下配置
  2. vm.max_map_count=262144
  3. vm.swappiness = 0
  4. [root@es1 config]# sysctl -p # 刷新配置
  5. vm.max_map_count = 262144
  6. vm.swappiness = 0

修改其默认使用内存大小
  1. [root@es1 es]# pwd
  2. /usr/local/es
  3. [root@es1 es]# vim config/jvm.options
  4. # elasticsearch默认内存使用为1G,可以更改如下配置,修改其默认使用内存
  5. -Xms700m
  6. -Xmx700m

注:生产环境中建议将Xms和Xmx两个值设置为一致(最多不要超过物理内存的50%),我们线上的环境是设置为8g。

禁用交换分区

大多数操作系统尝试使用尽可能多的内存作为文件系统缓存,并急切地交换出未使用的应用程序内存。这可能导致部分JVM堆或其可执行页面被交换到磁盘,交换对性能和节点稳定性非常不利,应该不惜一切代价避免交换。它可能导致垃圾收集持续几分钟而不是几毫秒,并且可能导致节点响应缓慢,甚至与集群断开连接。在弹性分布式系统中,让操作系统杀死节点更有效。

  1. $ swapoff -a #临时禁用
  2. $ grep swap /etc/fstab # 注释掉交换分区挂载项
  3. #/dev/mapper/centos-swap swap swap defaults 0 0
  4. $ free -h | grep -i swap # 确认交换分区已关闭
  5. Swap: 0B 0B 0B

修改其打开文件数的大小

如果服务器文件数上线和线程上线较低,就会产生如下异常:

  1. 1. max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]每个进程最大同时打开文件数太小
  2. 2. max number of threads [3818] for user [es] is too low, increase to at least [4096]最大线程个数太低

可以进行以下修改,以便修改可打开文件数的大小

  1. [root@es1 elasticsearch]# vim /etc/security/limits.conf
  2. * soft nofile 65536
  3. * hard nofile 65536
  4. * soft nproc 4096
  5. * hard nproc 4096
  6. * soft memlock unlimited
  7. * hard memlock unlimited
  8. #注:修改上述配置后,需要退出当前用户再重新登录才可生效
  9. #重新登录后,可以使用以下命令查看是否生效
  10. #查看最大线程个数
  11. [root@es1 ~]# ulimit -Su
  12. 4096
  13. [root@es1 ~]# ulimit -Hu
  14. 4096
  15. #查看每个进程最大同时打开文件数
  16. [root@es1 ~]# ulimit -Hn
  17. 65536
  18. [root@es1 ~]# ulimit -Sn
  19. 65536

修改es配置文件

关于日志的优化配置可以参考官方文档,一般默认即可,无需改动(官方文档修改的日志配置文件为/usr/local/es/config/log4j2.properties)。

  1. [root@es1 ~]# egrep -v '^$|^#' /usr/local/es/config/elasticsearch.yml
  2. cluster.name: my-es
  3. node.name: es1
  4. node.master: true
  5. node.data: false
  6. path.data: /data/es
  7. path.logs: /data/es/logs
  8. bootstrap.memory_lock: true
  9. network.host: 192.168.20.2 # 这里请指定具体IP,而不是0.0.0.0
  10. # 如果不指定具体IP,最后有大概率无法形成集群
  11. http.port: 9200 # 绑定传入http请求的端口
  12. transport.port: 9331 # 绑定用于群集间通信的端口
  13. discovery.seed_hosts:
  14. - es1:9331
  15. - es2:9331
  16. - es3:9331
  17. cluster.initial_master_nodes: # 定义初始集群时的节点,仅在第一次启动es时需要配置此项
  18. - es1 # 此名字必须和node.name一致。
  19. # 开启跨域访问
  20. http.cors.enabled: true
  21. http.cors.allow-origin: "*"

配置文件中的其他选项解释

配置数据存放路径

path.data设置可以被设置为多条路径,在这种情况下,所有的路径将被用于存储数据(虽然属于单个碎片文件将全部存储相同的数据路径上):

  1. path:
  2. data:
  3. - /data/es/elasticsearch_1
  4. - /data/es/elasticsearch_2
  5. - /data/es/elasticsearch_3

附加:docker stack/docker-compose启动es集群

以下yml文件来源于官方文档

  1. version: '2.2'
  2. services:
  3. es01:
  4. image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
  5. container_name: es01
  6. environment:
  7. - node.name=es01
  8. - cluster.name=es-docker-cluster
  9. - discovery.seed_hosts=es02,es03
  10. - cluster.initial_master_nodes=es01,es02,es03
  11. - bootstrap.memory_lock=true
  12. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  13. ulimits:
  14. memlock:
  15. soft: -1
  16. hard: -1
  17. volumes:
  18. - data01:/usr/share/elasticsearch/data
  19. ports:
  20. - 9200:9200
  21. networks:
  22. - elastic
  23. es02:
  24. image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
  25. container_name: es02
  26. environment:
  27. - node.name=es02
  28. - cluster.name=es-docker-cluster
  29. - discovery.seed_hosts=es01,es03
  30. - cluster.initial_master_nodes=es01,es02,es03
  31. - bootstrap.memory_lock=true
  32. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  33. ulimits:
  34. memlock:
  35. soft: -1
  36. hard: -1
  37. volumes:
  38. - data02:/usr/share/elasticsearch/data
  39. networks:
  40. - elastic
  41. es03:
  42. image: docker.elastic.co/elasticsearch/elasticsearch:7.6.2
  43. container_name: es03
  44. environment:
  45. - node.name=es03
  46. - cluster.name=es-docker-cluster
  47. - discovery.seed_hosts=es01,es02
  48. - cluster.initial_master_nodes=es01,es02,es03
  49. - bootstrap.memory_lock=true
  50. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  51. ulimits:
  52. memlock:
  53. soft: -1
  54. hard: -1
  55. volumes:
  56. - data03:/usr/share/elasticsearch/data
  57. networks:
  58. - elastic
  59. volumes:
  60. data01:
  61. driver: local
  62. data02:
  63. driver: local
  64. data03:
  65. driver: local
  66. networks:
  67. elastic:
  68. driver: bridge

es的启动与停止

第一种启停方式
  1. $ /usr/local/es/bin/elasticsearch & # 启动es
  2. $ jps | grep Elasticsearch # 停止es
  3. # 或者
  4. $ ps -ef | grep elasticsearch | xargs kill -SIGTERM

第二种启停方式
  1. $ ./bin/elasticsearch -p /tmp/elasticsearch.pid -d
  2. $ ps -ef | grep elasticsearch | grep -v grep # 查看到该程序的pid,也可以cat /tmp/elasticsearch.pid来查看pid号
  3. $ kill -SIGTERM 15516

停止时的致命错误

在Elasticsearch虚拟机的生命周期内,可能会出现某些致命错误,使虚拟机处于可疑状态。此类致命错误包括内存不足错误,虚拟机内部错误以及严重的I/O错误。

当Elasticsearch检测到虚拟机遇到此类致命错误时,Elasticsearch将尝试记录该错误,然后停止该虚拟机。当Elasticsearch启动此类关闭时,它不会如上所述进行有序关闭。Elasticsearch流程还将返回一个特殊的状态代码,以指示错误的性质。状态码如下:

错误信息 退出状态码
JVM内部错误 128
内存不足错误 127
堆栈溢出错误 126
未知的虚拟机错 125
严重的I/O错误 124
未知的致命错误 1

全集群重新启动和滚动重启操作

参考: 官方文档

启动es1节点

  1. $ useradd es # 创建es用户
  2. # 更改相应的目录属主
  3. $ chown -R es.es /usr/local/es/
  4. $ mkdir -p /data/es/{logs,tmp}
  5. $ chown -R es.es /data/es/
  6. # 使用es用户启动es程序,-p是指定pid文件名,-d是后台启动
  7. $ su -s /bin/bash -c "/usr/local/es/bin/elasticsearch -p /data/es/es.pid -d" es
  8. $ ss -lnpt | egrep "9200|9331" # 确定端口在监听
  9. LISTEN 0 128 :::9200 :::* users:(("java",pid=58589,fd=248))
  10. LISTEN 0 128 :::9331 :::* users:(("java",pid=58589,fd=222))
  11. # 当群集启动成功后,一定要删除配置的初始群集引导
  12. $ vim /usr/local/es/config/elasticsearch.yml # 修改配置文件,删除初始群集引导
  13. cluster.initial_master_nodes:
  14. - es1

将修改后的相应目录发送到其他节点

  1. # 在复制前,先在es2和es3上执行下面的指令创建es用户
  2. [root@es2 ~]# useradd es
  3. # 在es1上将es目录发送到其他节点
  4. [root@es1 ~]# for i in es{2..3};do rsync -avz /usr/local/es root@${i}:/usr/local/;done
  5. [root@es1 ~]# for i in es{2..3};do rsync -avz /etc/security/limits.conf root@${i}:/etc/security/;done
  6. [root@es1 ~]# for i in es{2..3};do rsync -avz /etc/sysctl.conf root@${i}:/etc/;done

配置es2主机

  1. # 确定某些限制已生效,若没有生效,请重新登录
  2. [root@es2 ~]$ ulimit -Hu
  3. 4096
  4. [root@es2 ~]$ ulimit -Hn
  5. 65536
  6. [root@es2 ~]# sysctl -p # 刷新配置
  7. vm.max_map_count = 262144
  8. vm.swappiness = 0
  9. [root@es2 ~]# mkdir -p /data/es/{logs,tmp}
  10. [root@es2 ~]# chown -R es.es /data/es
  11. [root@es2 ~]# chown -R es.es /usr/local/es
  12. # 修改es配置文件
  13. [root@es2 ~]# egrep -v '^$|^#' /usr/local/es/config/elasticsearch.yml
  14. cluster.name: my-es
  15. node.name: es2 # 这里改下名字
  16. node.master: true
  17. node.data: true # 这里改为true
  18. path.data: /data/es
  19. path.logs: /data/es/logs
  20. bootstrap.memory_lock: true
  21. network.host: 192.168.20.3 # 指定具体ip
  22. http.port: 9200 # 绑定传入http请求的端口
  23. transport.port: 9331 # 绑定用于群集间通信的端口
  24. discovery.seed_hosts:
  25. - es1:9331
  26. - es2:9331
  27. - es3:9331 # 注意,删除了初始master引导配置
  28. http.cors.enabled: true
  29. http.cors.allow-origin: "*"
  30. [root@es2 ~]$ su -s /bin/bash -c "/usr/local/es/bin/elasticsearch -p /data/es/es.pid -d" es # 使用es用户后台启动es程序
  31. [root@es2 ~]$ ss -lnpt | egrep "9200|9331"
  32. LISTEN 0 128 :::9200 :::* users:(("java",pid=65224,fd=266))
  33. LISTEN 0 128 :::9331 :::* users:(("java",pid=65224,fd=222))

注:es3主机上,除了需要稍微改动下配置文件外,其他操作与es2完全一致,这里就不写了。参考下面更改后配置文件自行改动并启动下es。

  1. # es3主机的配置文件如下
  2. [root@es3 ~]# egrep -v '^$|^#' /usr/local/es/config/elasticsearch.yml
  3. cluster.name: my-es
  4. node.name: es3 # 改下node name
  5. node.master: true
  6. node.data: true
  7. path.data: /data/es
  8. path.logs: /data/es/logs
  9. bootstrap.memory_lock: true
  10. network.host: 192.168.20.4 # 同样指定具体IP
  11. http.port: 9200 # 绑定传入http请求的端口
  12. transport.port: 9331 # 绑定用于群集间通信的端口
  13. discovery.seed_hosts:
  14. - es1:9331
  15. - es2:9331
  16. - es3:9331
  17. http.cors.enabled: true
  18. http.cors.allow-origin: "*"

当三台主机启动后,可以访问每个节点的9200端口,通过比较各个节点的uuid,来判断是否加入到同一个集群中,如下(如果uuid为“na”则表示群集有问题,虽然都是一样的,但绝对不正常,正常的话都是很长一串字母):

企业级elk kafka集群日志分析系统 - 图4

集群状态正常的话,是下面这样的:

企业级elk kafka集群日志分析系统 - 图5

三个节点的uuid一致,则表示在同一个集群中。

安装ElasticSearch Head

此应用用于管理es集群,任意一个节点安装即可,非必须安装,也可以使用chrome提供的插件:ElasticSearch Head。

  1. [root@es3 ~]# git clone git://github.com/mobz/elasticsearch-head.git
  2. [root@es3 ~]# cd elasticsearch-head/
  3. [root@es1 elasticsearch-head]# yum -y install npm openssl
  4. [root@es3 elasticsearch-head]# npm install
  5. [root@es3 elasticsearch-head]# npm run start &

部署filebeat+nginx

nginx非必须配置,只要有error日志和access日志即可。

配置nginx及filebeat

  1. [root@es1 $]# yum -y install nginx # 安装nginx
  2. [root@es1 $]# nginx # 启动nginx,然后自己想法生成一些access和error日志
  3. # 部署filebeat
  4. [root@es1 ~]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.2-linux-x86_64.tar.gz
  5. [root@es1 ~]# tar zxf filebeat-7.6.2-linux-x86_64.tar.gz
  6. [root@es1 ~]# mv filebeat-7.6.2-linux-x86_64 /usr/local/filebeat
  7. [root@es1 ~]# cd /usr/local/filebeat/
  8. [root@es1 ~]# cat /usr/local/filebeat/filebeat.yml # 修改后的配置文件如下
  9. filebeat.inputs:
  10. - type: log
  11. enabled: true
  12. paths:
  13. - /var/log/nginx/access.log # 收集nginx的访问日志
  14. fields:
  15. log_topics: ngx_access # 向消息中添加字段log_topics
  16. - type: log
  17. enabled: true
  18. paths:
  19. - /var/log/nginx/error.log # 收集nginx的错误日志
  20. fields:
  21. log_topics: ngx_error
  22. output.kafka:
  23. enabled: true
  24. hosts: ["es1:9092", "es2:9092", "es3:9092"] # 指定kafka主机列表
  25. topic: "%{[fields.log_topics]}" # 指定写入到哪个topic,就是上面input添加的字段命名的
  26. partition.round_robin:
  27. reachable_only: false

测试并启动filebeat

参考相关指令:官方文档

测试
# 测试配置文件语法格式
$ /usr/local/filebeat/filebeat test config -c /usr/local/filebeat/filebeat.yml
Config OK
# 测试定义的output是否正常
/usr/local/filebeat/filebeat test output -c /usr/local/filebeat/filebeat.yml
Kafka: es1:9092...
  parse host... OK
  dns lookup... OK
  addresses: 172.26.108.45
  dial up... OK
Kafka: es2:9092...
  parse host... OK
  dns lookup... OK
  addresses: 172.26.108.43
  dial up... OK
Kafka: es3:9092...
  parse host... OK
  dns lookup... OK
  addresses: 172.26.108.44
  dial up... OK

启动filebeat

$ /usr/local/filebeat/filebeat run -c /usr/local/filebeat/filebeat.yml &

验证kafka已收到日志消息

# 查看topics
/usr/local/kafka/bin/kafka-topics.sh --zookeeper es1:2181 --list
ngx_access
ngx_error
# 查看某个topic的状态
[17::root@es2::~]# >>>/usr/local/kafka/bin/kafka-topics.sh --zookeeper es1:2181 --topic ngx_access --describe
Topic: ngx_access    PartitionCount: 6    ReplicationFactor: 1    Configs: 
    Topic: ngx_access    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: ngx_access    Partition: 1    Leader: 3    Replicas: 3    Isr: 3
    Topic: ngx_access    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
    Topic: ngx_access    Partition: 3    Leader: 2    Replicas: 2    Isr: 2
    Topic: ngx_access    Partition: 4    Leader: 3    Replicas: 3    Isr: 3
    Topic: ngx_access    Partition: 5    Leader: 1    Replicas: 1    Isr: 1
# 查看指定topic中的消息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ngx_access --from-beginning
# --from-beginning表示从开头查看topic中的所有消息,若不加此选项,则表示查看实时消息

部署logstash

logstash工作原理及jvm优化

在开始之前,先来写下logstash是如何工作的。

Logstash事件处理管道包括三个阶段:输入 ==》过滤器 ==》输出。输入会生成事件,过滤器会对其进行修改,输出会将它们发送到其他地方。输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。

input

可以使用以下常用的输入方法,将数据获取到logstash:

  • file:从文件系统上的文件读取,就像unix命令中的tailf一样。
  • syslog:在syslog服务上的514端口侦听syslog消息并根据RFC3164格式进行解析。
  • kafka:使用kafka作为中间件来读取日志消息,通常是filebeat将收集的日志发送到kafka,然后logstash去kafka消费消息,同样也支持redis、rabbitmq。不过更常见的还是kafka。
  • 更多输入源请参考:官方文档

Filters

Fileters是Logstash管道中的中间处理设备。如果事件符合特定条件,则可以将过滤器与条件语句结合使用以对事件执行操作。

一些常用的Fileters包括:

  • grok:解析和构造任意文本。当前,Grok是Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方法,Logstash内置了120种模式。
  • mutate:对事件字段执行常规转换。可以重命名,删除,替换和修改事件中的字段。
  • drop:完全删除事件,例如调试事件。
  • clone:复制事件,可能会添加或删除字段。
  • geoip:添加有关IP地址地理位置的信息(可以通过此fileter获取到的信息在kibana绘制地理图表)。
  • date:解析字段中的日期以用作事件的Logstash时间戳。
  • 更多fileter插件请参考官方文档

output

output是Logstash管道的最后阶段。一个事件可以通过多个输出,但是一旦完成所有输出处理,该事件就完成了执行。

一些常用的输出包括:

  • elasticsearch:将事件数据发送到Elasticsearch。如果您打算以一种高效,便捷且易于查询的格式保存数据,那么Elasticsearch是您的最佳选择。elasticsearch是最常见的输出)。
  • file:将事件数据写入磁盘上的文件。
  • graphite:将事件数据发送到graphite,graphite是一种流行的开源工具,用于存储和绘制指标图形。graphite官方文档
  • statsd:将事件数据发送到statsd,该服务“通过UDP侦听统计信息(如计数器和计时器),并将聚合发送到一个或多个可插拔后端服务”。如果您已经在使用statsd,这可能对您有用!
  • 更多输出插件请参考官方文档

Codecs

Codecs基本上是流过滤器,可以作为输入或输出的一部分进行操作。编解码器使您可以轻松地将消息的传输与序列化过程分开。流行的编解码器包括json,msgpack和plain (文本)。

  • json:以JSON格式编码或解码数据。
  • multiline:将多行文本事件(例如java异常和stacktrace消息)合并为一个事件。
  • 更多Codecs插件请参考官方文档

logstash的执行模型

Logstash事件处理管道协调输入,过滤器和输出的执行。

Logstash管道中的每个输入阶段都在其自己的线程中运行。输入将事件写入内存(默认值)或磁盘中的中央队列。每个管道工作程序线程都会从该队列中除去一批事件,通过配置的筛选器运行这批事件,然后通过任何输出运行经过筛选的事件。批处理的大小和管道工作线程的数量是可配置的,具体请参考官方文档

logstash.yml文件配置

关于logstash.yml都能配置写什么请参考官方文档

配置jvm虚拟机堆大小

在jvm.options文件中可以设置jvm堆大小。官方建议堆大小应该设置在4GB到8GB之间。

以下是一些有关调整JVM堆大小的其他技巧:

  • 如果堆大小太低,CPU利用率可能会不必要地增加,从而导致JVM不断进行垃圾回收。可以通过将堆大小加倍来检查此问题,以查看性能是否有所提高。
  • 不要将堆大小增加到超过物理内存量的水平。为操作系统和其他进程留出至少1GB的可用空间。
  • 将最小(Xms)和最大(Xmx)堆分配大小设置为相同的值,以防止在运行时调整堆大小,这是一个非常昂贵的过程。
  • 可以使用jmap随Java一起分发的命令行实用程序或使用VisualVM对JVM堆进行更准确的测量 。

启动logstash

logstash支持的其他选项请参考官方文档

在docker上运行logstash

参考官方文档

grok过滤插件解析web日志

使用grok过滤器插件,可以将非结构化日志数据解析为结构化和可查询的内容。

由于grok过滤器插件会在传入的日志数据中查找模式,因此配置插件需要我们就如何识别用例感兴趣的模式做出决策。Web服务器日志示例中的代表行如下所示:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

该行开头的IP地址很容易识别,括号中的时间戳也很容易识别。要解析数据,可以使用%{COMBINEDAPACHELOG}grok模式,该模式使用以下模式从日志构造行:

信息 列名
IP地址 clientip
用户身份 ident
用户认证 auth
时间戳记 timestamp
HTTP动词 verb
请求体 request
HTTP版本 httpversion
HTTP状态码 response
字节数 bytes
推荐连结网址 referrer
用户代理 agent

更多grok语法格式请参考官方文档,官方文档中还包含了很多其他常用的插件,如下:

企业级elk kafka集群日志分析系统 - 图6

安装并运行logstash

# 下载并解压
[root@es2 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.6.2.tar.gz
[root@es2 ~]# tar zxf logstash-7.6.2.tar.gz 
[root@es2 ~]# mv logstash-7.6.2 /usr/local/logstash
[root@es2 ~]# cd /usr/local/logstash
[root@es2 ~]# find bin/ -name "*.bat" -delete   # 删除windows中的脚本
[root@es2 config]# cat /usr/local/logstash/config/test_kafka.conf    # 编写配置文件
input{
  kafka {
    bootstrap_servers => "es1:9092,es2:9092,es3:9092"   # 这里指定的是kafka地址及监听端口
    topics => ["ngx_access"]
        codec => json {
           charset => "UTF-8"
        }
        add_field => { "[@metadata][myid]" => "ngx_access" }
  }
  kafka {
    bootstrap_servers => "es1:9092,es2:9092,es3:9092"
    topics => ["ngx_error"]
        codec => json {
           charset => "UTF-8"
        }
        add_field => { "[@metadata][myid]" => "ngx_error" }
  }

}



filter {
    if [@metadata][myid] == "ngx_access" {
     mutate {
            remove_field => "source"    
            remove_field => "input"
            remove_field => "offset"   
            remove_field => "fields" 
            remove_field => "host" 
            remove_field => "@version" 
    }
  }
    if [@metadata][myid] == "ngx_error" {
     mutate {
            remove_field => "prospector"
            remove_field => "beat"      
            remove_field => "source"    
            remove_field => "input"
            remove_field => "offset"
            remove_field => "fields"
            remove_field => "host"
            remove_field => "kafka"
            remove_field => "@version"
    }
  }
}

output {
  if [@metadata][myid] == "ngx_access" {
    elasticsearch {
      hosts => ["http://es1:9200" ,"http://es2:9200", "http://es3:9200"]
      index => "nginx_access-%{+yyyy.MM.dd}"
        }
  }
  if [@metadata][myid] == "ngx_error" {
    elasticsearch {
      hosts => ["http://es1:9200" ,"http://es2:9200", "http://es3:9200"]
      index => "nginx_error-%{+yyyy.MM.dd}"
        }
  }
}

[root@es2 config]# nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/config/test_kafka.conf &

至此,es即可看到收集到的nginx日志了,如果没有,则是因为logstash读取的是nginx的最新日志,只要访问nginx生成新的日志,即可在es-head界面看到收集的日志(会有3到5秒的延迟),如下:

企业级elk kafka集群日志分析系统 - 图7

部署安装kibana

kibana用于展示es存储的数据,可以基于数据绘制图饼、线型图、地理位置等,功能强大。

# 下载并解压kibana
[root@es1 ~]# wget https://artifacts.elastic.co/downloads/kibana/kibana-7.6.2-linux-x86_64.tar.gz
[root@es1 ~]# tar zxf kibana-7.6.2-linux-x86_64_.tar.gz 
[root@es1 ~]# mv kibana-7.6.2-linux-x86_64 /usr/local/kibana
[root@es1 ~]# cd /usr/local/kibana/config/
[root@es1 config]# egrep -v '$^|^#' kibana.yml   # 修改后的配置文件如下
server.port: 5601    # 监听端口
server.host: "0.0.0.0"   # 监听地址
# 指定es集群,必须在同一个集群内
elasticsearch.hosts: ["http://es1:9200","http://es2:9200","http://es3:9200"]
kibana.index: ".kibana"    # 增加kibana索引,用于存储kibana保存的仪表盘、图饼等信息。
elasticsearch.requestTimeout: 90000    # 连接es的超时时间
i18n.locale: "zh-CN"    # 配置kibana为中文
# 以下是增加导出数据相关配置
xpack.reporting.csv.maxSizeBytes: 2097152000
xpack.reporting.queue.timeout: 3000000
xpack.reporting.encryptionKey: "a_random_string"
xpack.security.encryptionKey: "something_at_least_32_characters"

启动kibana

[root@es1 ~]# useradd kibana
[root@es1 ~]# chown -R kibana.kibana /usr/local/kibana/
# 需要给这个文件666的权限,我尝试过给777权限,第一次可以启动,但再启动就会权限拒绝
[root@es1 ~]# chmod 666 /usr/local/kibana/.i18nrc.json
[root@es1 ~]# su -s /bin/bash -c "/usr/local/kibana/bin/kibana &" kibana

输出以下信息,则表示启动成功:

企业级elk kafka集群日志分析系统 - 图8

访问kibana主机5601端口

企业级elk kafka集群日志分析系统 - 图9

若选择了示例数据,则这里可以选择将这些仪表盘添加到kibana中,如下:

企业级elk kafka集群日志分析系统 - 图10

下面的索引中包含了kibana的默认索引

企业级elk kafka集群日志分析系统 - 图11

为nginx的error和access日志创建索引模式

企业级elk kafka集群日志分析系统 - 图12

企业级elk kafka集群日志分析系统 - 图13

按照上面的套路给error日志来一遍即可。

kibana怎么玩,在实践中学习吧,至此结束。

为kibana配置鉴权认证

在7.x以上的kibana中好像已经实现了授权访问,但还没有研究这个东西,我们的线上环境还是使用nginx来实现这个鉴权的。这里写一下配置方法。

安装所需工具并创建相应用户

[root@es1 ~]# yum -y install httpd-tools      #安装所需htpasswd工具
[root@es1 ~]# htpasswd -c /etc/nginx/.passwd admin    #创建一个admin用户
# 输入并确认密码
New password: 
Re-type new password: 
Adding password for user admin
#注:若要向.passwd中添加第二个用户,需要省略“-c”选项,否则会覆盖之前的所有用户。
[root@es1 ~]# vim /etc/nginx/nginx.conf      # 编辑nginx配置文件
#  location规则如下:
        location / {     
            proxy_pass  http://172.26.108.45:5601;    # 定义kibana的监听地址
            auth_basic "请输入登录账号";        #添加提示语句
            auth_basic_user_file /etc/nginx/.passwd;   #指定密码文件的>存放路径
        }
[root@es1 ~]# nginx -s reload     # 重启nginx生效

访问nginx,确定鉴权生效,并可以访问到kibana:

注意,访问的ip是nginx的,看到以下页面后,输入nginx上生成的用户名和密码进行登录:

企业级elk kafka集群日志分析系统 - 图14

认证成功后,即可看到kibana的页面,如下:

企业级elk kafka集群日志分析系统 - 图15