3 系统概要设计

3.1 ELK基础架构

本设计基于开源项目ELK(Elastic Stack)设计,其中Zookeeper、Kafka、Elasticsearch、Kibana、Logstash都通过Linux环境下的Docker-Compose进行容器的编排部署。本设计中Beat组件在Windows和Linux客户端里收集系统、MySQL等日志数据,然后根据设置的日志主题发送日志数据到消息队列Kafka集群里,Logstash作为日志传输加工引擎根据主题从Kafka消息队列里消费日志数据,统一时间格式加工后输出到Elasticsearch。Kibana提供Elasticsearch里日志数据的展示,也可以在Kibana里基于Elasticsearch组件搜索日志里的详细内容。[1]
本设计中ELK系统采用流水线作业形式处理日志,流程为日志输入到Logstash,随后输出到Elasticsearch,最后到达Kibana。

基于ELK的企业日志分析系统的设计与实现 - 图1

3.2 日志数据分析模块

ELK系统中的核心模块是Elasticsearch,日志数据能够通过各种渠道传输到Elasticsearch。常用的输入源有Logstash和Beat组件。由于各个Beat组件发送的日志数据不够统一,且无法对不同日志格式进行统一转化,可能导致Elasticsearch无法处理。所以一般会将日志数据交给Logstash数据引擎进行统一编排处理后再交付给Elasticsearch。
基于ELK的企业日志分析系统的设计与实现 - 图2

3.3 消息队列模块

由于企业内日志数据量可能很多,所以在Logstash前会增加消息队列Kafka充当broker的角色,broker起数据缓存的作用,通过这个缓存器可以提高Logstash shipper发送日志到 Logstash indexer的速度,同时避免由于突然断电等导致的数据丢失。
Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据。Kafka由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息。
Kafka消息队列里的消息根据消息主题Topic进行消息管理存储,每个主题可以拆分成多个分区partition,消息被标识一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。


图片1.png


生产者(本设计为Beat组件)根据设定的topic,生产消息并发送到Kafka中,消费者(本设计为Logstash)选择要消费的topic,通过消息的id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费。
为了确保Kafka组件的高可用性,Kafka会交由Zookeeper进行管理。Zookeeper作为协调控制会管理broker与consumer的动态加入与离开;触发负载均衡,当broker或consumer加入或离开时触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡;维护消费关系及每个partition的消费信息。Zookeeper具体的管理流程为
(1) 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
(2) 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
(3) 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

3.3 日志收集模块

在ELK传统架构中Logstash由于功能较多,对于日志收集来说较为重型。所以一般采用Beat组件来代替logstash shipper。Beat组件是一种轻量开源数据传送器,具有不同类型的数据收集。Beats 可以将数据直接发送到 Elasticsearch 或着传输到 Logstash进行进一步处理和增强数据,然后在 Kibana 中将其可视化。




基于ELK的企业日志分析系统的设计与实现 - 图4

3.4 企业日志收集管理系统架构

企业的日志系统由ELK+Kafka+Beat三大部分组成
基于ELK的企业日志分析系统的设计与实现 - 图5


该日志收集分析系统在本次设计中,将满足以下几个功能:
(1) 将不同的服务器(包括虚拟机)和服务器上运行的程序日志通过Beat进行收集
(2) 使用Kafka+zookeeper确保数据缓存的作用,通过这个缓存器可以提高Logstash shipper发送日志到 Logstash indexer的速度,同时避免由于突然断电等导致的数据丢失
(3) Logstash把分散的、多样化的日志搜集起来,并进行自定义过滤分析处理,然后传输到Elasticsearch
(4) Elasticsearch对日志全文搜索,结构化搜索以及分析
(5) Kibana是基于Elasticsearch提供日志分析和可视化的网页平台。它可以通过Elasticsearch的索引中查找数据,并将数据转化成相应的图标方便查看。
(6) 使用集群确保ELK系统高可用性,能够在某一设备突然挂掉后或者某一网络连接不可用时,仍能最大程度确保整个日志分析系统仍是可用状态。

由于实验性能所限,概念设计中Zookeeper+Kafka集群里三台服务器实际实验中将会在主机zooka中以Docker的容器运行来代替,概念设计中Elasticsearch + Kibana集群里三台服务器实际实验中将会在主机els中以Docker的容器运行来代替。客户端分别部署在主机Ryzen和主机els中。

基于ELK的企业日志分析系统的设计与实现 - 图6


4 系统详细设计

4.1 Beats模块


Beats 是一个集合了多种单一用途数据采集器的平台。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。Beat包含的一些模块能够简化从关键数据源(例如云平台、容器和系统,以及网络技术)采集、解析和可视化信息的过程。在本次设计中使用了Filebeat、Winlogbeat和Metricbeat组件。
Filebeat:能够读取日志文件更新的内容并将相关数据发送到指定位置逐行读取每个文件,并将内容发送到输出。具体实现方法为Filebeath会对每个日志保持持续打开读取状态,首次读取时将会发送所有数据内容,并记下日志的偏移量保存到本地磁盘中。当日志更新后,Filebeat会从偏移量开始读取内容并发送,既能确保所有数据都已传输,也能减少传输数据大小。因为所有日志的偏移量均存储在磁盘中,所以即使Filebeat意外重启后也能重新按照偏移量继续进行传输。为了确保日志能够成功输出到指定位置,Filebeat会持续的对日志内容进行发送,直到输出端返回确认信息。本设计中Filebeat会在客户端里收集MySQL的错误日志和每个Docker容器里的告警数据。其中MySQL日志收集方法为启用Filebeat里的MySQL日志收集模块,MySQL模块里的modules.d/mysql.yml文件设定了MySQL日志存放的位置为”/var/log/mysql/error.log*”,即收集该目录下以error.log开头的日志文件。Docker容器日志收集为filebeat配置文件filebeat.yml添加新的数据输入源,类型为log,以root权限获取/var/lib/docker/containers/目录里面所有的log文件,当日志信息里包含关键词warn和关键词error即警告和错误信息时发送该条日志信息,其余信息不发送。
Winlogbeat:Winlogbeat 使用 Windows API 从Windows事件记录里读取一个或多个事件日志中,根据配置的条件和过滤事件,将事件数据发送到配置的输出。本设计中通过配置文件winlogbeat.yml指定Winlogbeat收集Windows安全类别事件日志,并通过安全事件模块根据对应事件 ID对应的事件类型识别日志内容,例如事件ID 4625对应的事件类型为帐户登录失败,该日志记录将会自动根据事件类型识别出关键信息账户名、发生时间,登陆失败原因等,方便在Kibana进行查看。指定Winlogbeat收集应用程序类别的事件日志,且来源为应用程序错误、应用程序挂起和Windows错误。指定Winlogbeat收集系统类别的事件日志,且事件级别为严重、错误和警告。
Metricbeat:Metricbea可以通过模块从Windows、Linux各大发行版等系统和MySQL、MongoDB、MSSQL等数据库中收集相关指标数据集。本设计中将会在Windows主机和Linux主机中安装Metricbeat。在Windows主机Ryzen中,Metricbeat将通过Windows模块借助Windows API监控Windows系统里服务的详细状态,通过System模块监控CPU、内存、网络、磁盘等详细数据;在Linux主机els中,Metricbeat将通过System模块监控CPU、内存、网络、磁盘等详细数据;通过MySQL模块使用拥有管理权限的用户连接MySQL数据库,获取与MySQL性能相关的指标(如事件语句和表 io 等待);通过Docker模块借助Docker API连接本机的Docker服务,收集容器有关容器的CPU占用, 磁盘读写, 健康度,占用内存和网络的指标参数。

4.2 Kafka模块


Kafka 是一个开源分布式的消息队列系统,具有高吞吐量、分布式发布订阅等特点。
Kafka提供了异步的消息队列服务。而且在Kafka消息队列中发送者和消费者无需对消息传输进行同步确认,消息数据会存储在队列中,直到消费者需要时将其取出消费。消息队列主要解决应用耦合、异步处理等问题。
Kafka的特性有:
(1) 高吞吐量、低延迟:kafka可以处理上万条消息在一秒之内,而且它的处理时间最低时不超过十毫秒。
(2) 可扩展性:kafka集群支持热扩展
(3) 持久性、可靠性:传入Kafka消息队列里的消息被写入到了到本地磁盘存储空间而非内存,并且运行将消息数据进行拷贝备份
(4) 容错性: 当Kafka以集群运行时,能够容忍节点掉线
(5) 高并发:支持数千个客户端同时读写
Kafka在本次设计的作用是接受Beat组件发送的各种服务的log等,通过kafka以统一接口服务的方式开放给Logstash进行接收消费。使用Kafka消息队列解耦了处理过程也提高了可扩展性。在传输流量达到峰值时,Kafka能够使Logstash不会受到数据传输流量激增的压力,在超负荷的数据传输流量时Logstash不会因此崩溃。
本次设计中Kafka工作流程为
(1) 生产者(各种Beat组件)根据配置定期向主题发送消息。
(2) Kafka broker将所有消息根据传入主题来分区存储,确保消息在分区之间共享平衡。
(3) 消费者(Logstash)订阅一个指定的主题。
(4) 一旦Logstash订阅了一个主题,Kafka将向Logstash提供该主题的当前偏移量,并将偏移量保存在ZooKeeper中。
(5) 消费者将定期请求Kafka新的消息。
(6) 一旦Kafka收到来自Beat组件的消息,它会将这些消息转发给Logstash。
(7) Logstash将收到消息并处理它。
(8) 一旦消息被处理,Logstash将向Kafka broker发送确认。Kafka收到确认,它会将偏移量更改为新值,并在ZooKeeper中进行更新。由于ZooKeeper中保留了偏移量,因此即使在服务器出现故障时,Logstash也可以正确读取下一条消息。

4.3 Zookeeper模块


ZooKeeper是一个分布式协调服务,它的主要作用确保Kafka集群的一致性。
本次设计中Kafka使用ZooKeeper管理自己的元数据配置,Kafka的运行依赖ZooKeeper。,ZooKeeper集群中有一个leader,两个个follower角色,其中leader提供写服务,follower提供读服务。ZooKeeper协调Kafka的各个broker,实现broker的负载均衡,且当broker增加或者某个broker故障时,ZooKeeper将会通知生产者Beat组件和消费者Logstash进行相应变更,保证整个系统正常运转。

4.4 Logstash模块


Logstash是一个用于日志收集的引擎,可以实时把不同位置的日志源收集到Logstash引擎里,并将日志的信息内容标准化输出到数据库、Elasticsearch或者其他地方,能够规避格式、复杂性带来的影响。Logstash过滤器拥有数据加工的能力,例如将数据匿名化,完全排除敏感字段。简化整体处理,。当 Logstash 节点发生故障时恢复后,未被正常处理的消息会被送往死信队列 (dead letter queue) 以便做进一步处理。
本次设计中Logstash 输入源为Kafka消息队列,指定收集消息主题为”Ryzenlog”,”Ryzenmetric”,”elslog”,”elsmetric”分别为主机els和主机Ryzen的模块输出主题。输入日志后为了使日志数据中的时间与本地时区保持一致,过滤器使用match语句将@Timestamp时间戳统一为CST时区,并且通过Ruby语句设置变量index_day为索引根据时间生成的依据,便于后续查看和确保Index索引根据时间命名正确。过滤器处理完数据后会将数据以连续的流式传输方式通过HTTPS协议传输到Elasticsearch中。

4.5 Elasticsearch模块


Elasticsearch是一个分布式、RESTful 风格的搜索和数据分析引擎。Elasticsearch能集中存储Logstash传输的数据。 Elasticsearch可以执行及合并多种类型的搜索并根据各项元素对搜索结果进行排序,优化向用户显示结果的方式。Elasticsearch可以识别并修正各种复杂情况的人为错误。[9]
本次设计中日志数据倒排索引存储在Elasticsearch中,当Kibana执行数据检索时,Elasticsearch会根据建立的索引进行非结构查询,为Kibana迅速的反馈查询结果。Elasticsearch集群能够确保Elasticsearch的高可用性。

4.6 Kibana模块


Kibana 是一个用户界面,能够对 Elasticsearch 数据进行可视化。Kibana 分析模块中的图表功能有柱状图、线状图、饼图等等。在Kibana中可以搜索收集到的所有日志数据。借助时序数据 UI,对Elasticsearch中的数据执行高级时间序列分析。Kibana利用功能强大、简单易学的表达式来描述查询、转换和可视化。快速创建仪表板,将图表、地图和筛选功能有机整合,从而展示数据的全景。跨仪表板打造定制的深入分析内容,以实现更深层次的分析。[10]
本次设计中Kibana将会展示基于各个Beat收集到的日志和指标的各种报表,也可以在Kibana通过KQL语句迅速的查到相关数据。Kibana也是Index索引管理的入口,kibana会根据索引生命周期自动优化索引状态,提升搜索性能和效率。

4.7 Docker模块

Docker 是一个开源的应用容器引擎,可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后部署到机器上。Docker是基于操作系统的虚拟化,相比传统的虚拟机性能方面会更高效。
本次设计中Zookeeper、Kafka集群、Elasticsearch集群、Logstash和Kibana都是以Docker容器形式进行编排部署的。使用Docker容器不仅方便在配置不同的主机上提供一致性的功能,在后期业务需要拓展增加节点时也能更轻松的加入。

5 系统实现

5.1 系统部署

5.1.1 服务器概况

(1) 虚拟机els
虚拟化平台:Hyper-V
内存:8G
操作系统:Ubuntu 20.04
IP:192.168.68.212
容器平台:Docker
数据搜索分析引擎:Elasticsearch 7.11.1
日志数据收集工具:Logstash 7.11.1
数据库可视化平台:Kibana 7.11.1
(2) 虚拟机zooka
虚拟化平台:Hyper-V
内存:8G
操作系统:Ubuntu 20.04
IP:192.168.68.213
容器平台:Docker
缓存/消息队列:Zookeeper、kafka

5.1.2 客户端概况

(1) 虚拟机els
操作系统:Ubuntu 20.04
IP:192.168.68.212
日志采集工具:Winlogbeat:7.11.1、Filebeat:7.11.1
指标性能采集工具:Metricbeat:7.11.1
(2) 主机Ryzen
操作系统:Windows 11 Pro
IP:192.168.68.110
日志采集工具:Winlogbeat:7.11.1
指标性能采集工具:Metricbeat:7.11.1

5.1.4 Docker部署

(1) 使用SSH登录虚拟机,服务器上安装必备软件包(apt-transport-https ;curl ;gnupg-agent ;software-properties-common )

基于ELK的企业日志分析系统的设计与实现 - 图7




(2) ,将官方Docker存储库的GPG密钥添加到系统中,添加官方Docker仓库。
基于ELK的企业日志分析系统的设计与实现 - 图8


(3) 安装Docker和Docker-Compose。
基于ELK的企业日志分析系统的设计与实现 - 图9


(4) 服务器系统优化:关闭swap和禁用交换,设置最大打开文件数,设置映射上限。
基于ELK的企业日志分析系统的设计与实现 - 图10

5.1.5 主机Ryzen部署

(1) 下载Winlogbeat和Metricbeat的zip压缩包,解压到Program File文件夹里
(2) 运行程序目录下的ps1命令,安装对应的服务依赖


基于ELK的企业日志分析系统的设计与实现 - 图11

5.1.6 虚拟机els部署

(1) 编写docker-compose.yml:配置配置内容为创建elasticsearch容器els1、els2、els3为一个elasticsearch集群并分配相应的JVM内存,创建容器Logstash和容器Kibana;所有容器都会连接一个桥接的网络els中,并且容器Logstash和容器Kibana会在els1容器启动后启动。配置文件内容还包括容器的端口映射、容器目录挂载。
(2) 下载Filebeat和Metricbeat的deb安装包,使用dpkg命令安装。
基于ELK的企业日志分析系统的设计与实现 - 图12

5.1.7 虚拟机zooka部署

(1) 创建容器挂载目录:在根目录下创建文件夹zoo-ka,进入后里面再创建文件夹kafka和文件夹zoo,kafka文件夹用来存储kafka产生的数据,文件夹zoo里面存储Zookeeper的配置文件和产生的数据
(2) 编写docker-compose.yml:配置内容为创建三个ZooKeeper容器zoo1、zoo2、zoo3为一个Zookeeper集群、创建三个Kafka容器kafka1、kafka2、kafka3为一个Kafka集群和创建一个Kafka Management容器。所有容器都会连接一个桥接的网络zoonet中,并且kafka容器会在zookeeper容器启动后启动,最后启动Kafka Management容器。配置文件内容还包括容器的端口映射、容器目录挂载。其中三个Kafka的配置是通过环境变量来完成的,包括了时区、BROKER ID标识、kafka监听地址格式、kafka连接Zookeeper的地址端口集合、服务可以接收的消息最大尺寸、服务处理网络请求的网络线程数目、服务处理请求的I/O线程的数目、SO_SNDBUFF 缓存大小、#SO_RCVBUFF缓存大小、服务允许的最大请求尺寸、每个日志文件删除之前保存的时间、JMX配置和JMX监控端口

5.2 系统功能配置

5.2.1 Elastic Stack安全性配置

(3) Elastic Stack安全性准备:开启一台测试的容器Elasticsearch容器(docker run —name es-test -it —rm elasticsearch:7.11.1 bash),进入到该容器的bash shell。运行命令 elasticsearch-certutil ca和elasticsearch-certutil cert —ca elastic-stack-ca.p12生成Elasticsearch集群之间的证书和密钥,再运行命令elasticsearch-certutil http生成elasticsearch与Logstash和Kibana之间Http传输加密所需的pem证书和密钥。退出 container 环境,使用命令docker cp将证书elastic-stack-ca.p12、密钥elastic-certificates.p12和Http加密传输所需压缩包elasticsearch-ssl-http.zip拷贝到本机,更改密钥权限为666。
(4) 创建Elastic Stack容器挂载目录:在根目录下创建文件els,里面包括容器产生数据的保存目录和容器所需的配置文件。

5.2.2 Logstash配置

编写logstash.conf文件:设置Logstash从Kafka消费数据,并且输出到Elasticsearch。输出index索引的命名规则是Beat名称-Beat版本号-数据日期,在index索引中由于Elasticsearch 内部对时间类型字段统一采用 UTC 时间,存储形式为 long 长整形数据,对日志统一采用 也使用UTC 时间存储。因此Elasticsearch会将索引里@timestamp这个时间类型字段又转成UTC时间(-8小时),此时数据就会出现统计偏差。由于时区转换问题,Elasticsearch读取的@timestamp被减了8小时,导致北京时间0点至8点,会被记录成16点到0点,而在这时间段里的日志数据会被统计到前一天的索引下。此时使用变量index_day代替@timestamp去建立索引。变量中存储的时间为@timestamp+8小时即CST,建立索引时index_day在Elasticsearch中存储的值就是北京时间,建立索引时与北京时间相符。具体方法为使用Logstash过滤器(filter),通过match的格式匹配转换日志中的access_time字段(北京时间),使得@timestamp字段也是北京时间,运用ruby语言将变量中存储的时间(@timestamp+8小时)赋值于index_day中,这样index可以直接使用index_day输出索引。在输出配置里添加连接Elasticsearch所需的用户名和密码,开启SSL传输并指定证书密钥的路径。

5.2.3 elasticsearch配置

编写elasticsearch.yml文件:开启elasticsearch在集群内部传输和elasticsearch与Logstash和Kibana之间传输使用SSL进行加密,指定加密类型为证书,并设置所需证书。

5.2.4 Kibana配置

编写kibana.yml文件:添加连接Elasticsearch所需的用户名和密码,开启SSL传输并指定证书密钥的路径

5.2.5 Winlogbeat配置

在主机Ryzen中安装Winlogbeat和相关服务,通过配置文件Winlogbeat.yml设置事件记录类型,记录72小时内的应用事件日志,系统日志和安全日志。输出参数使用Kafka输出模块配置。初始化Kibana参数为配置Kibana的连接地址和账号密码。

5.2.6 Metricbeat配置

在主机Ryzen和主机els中安装Metricbeat和相关服务。在主机Ryzen启动Windows指标性能模块,在主机els中启用系统指标性能模块和MySQL指标模块并在Modules.d中的mysql.yml根据DSN的连接格式指定连接MySQL数据库的主机和账号密码。通过配置文件Metricbeat.yml设置输出参数使用Kafka输出模块配置。初始化Kibana参数为配置Kibana的连接地址和账号密码。

5.2.7 Filebeat配置

在主机els中安装Filebeat并启用MySQL日志和系统日志收集的模块。通过配置文件Winlogbeat.yml设置搜集Docker中各个容器的日志和系统的日志,配置的参数为类型是container,路径为 /var/lib/docker/containers//.log,表明收集类型为容器,日志路径保存在默认路径下以各个容器ID命名的文件夹内。
Beat初始化: 在安装Beat执行相应命令完成初始化配置,Beat根据配置文件连接Kibana创建一个 index template索引模板,并创建好相应的 index pattern索引模式。同时也会在Kibana中生成一个相应的 Index Life Cycle Management policy(ILM Policy索引生命周期配置)。接着Beat会安装每个Beat的Dashboard模板方便数据进行可视化。
基于ELK的企业日志分析系统的设计与实现 - 图13


如果不执行Beat的初始化,或者输入的Index命名和初始化生成的索引模式不匹配,则需要在Kibana中自行创建索引模板。自行创建的索引模板会根据传入的数据自动匹配字段的类型,其中像host.name为keyword类型,自动匹配会导致生成了host.name.keyword字段,所以在整个索引中字段变成了变成 text 及 keyword 的 multifield 混合数据类型,导致无法聚合,在Dashboard和Discover中可能影响数据的展示。

5.2.8配置Zookeeper和Kafka

(1) 创建Zookeeper配置文件:编写名称为zoo.cfg的文件,保存在/zoo-ka/zoo/config的每个节点文件夹下。

基于ELK的企业日志分析系统的设计与实现 - 图14


(2) 配置文件夹zoo-ka权限:将文件夹所有着设置为当前用户,并修改权限为666递归处理
(3) Zookeeper+Kafka容器群集启动成功后可以看到已经开始监听2181,2182,2183端口
基于ELK的企业日志分析系统的设计与实现 - 图15


(4) 使用zookeeper查看工具zoolnspector,可以发现zoo.cfg配置文件已生效。

基于ELK的企业日志分析系统的设计与实现 - 图16


(5) 启动Beat组件传输数据到Kafka,通过Kafka Manager可以看到Kafka已经收到了Beat组件根据主题发送的消息,并且展示了该主题有多少分区以及多少Broker

基于ELK的企业日志分析系统的设计与实现 - 图17

6 系统测试

6.1 运行系统并完成启动后的配置

(1) 运行Zookeeper、Kafka和Elastic Stack:使用Docker-Compose up运行编排,容器启动。
(2) Elastic Stack配置密码:进入els1容器中执行命令./bin/elasticsearch-setup-passwords interactive,设置elastic,apm_system,kibana,kibana_system,logstash_system,beats_system,remote_monitoring_user等密码,用户名为默认的elastic。docker-compose down 后重启编排,密码不会丢失。
(3) 运行Beat服务:在主机Ryzen的Windows服务和主机els上Linux的Systemctl管理中分别启动Beat服务,并观察到Beat服务正常运行。
(4) 系统检查:访问Kafka-Management的网页,能看到各个模块所设置的Topic已经进入缓存。访问Kibana页面并登录用户,在Index Management能够看到每个Beat模块传输的索引且被索引模板正常识别。

6.2 日志数据查询

6.2.1 查看SSH登录日志

尝试以test用户SSH登录els主机,输错三次密码后连接被重置

基于ELK的企业日志分析系统的设计与实现 - 图18


在Kibana-Discover中选择Filebeat索引并使用KQL语句 event.dataset : “system.auth” and message : user and test 搜索事件数据类型为系统登录关键词为user和test,找到5项记录,其中首条记录是提示来自 192.168.68.110 端口 3629 的test用户SSH登录,中间三条记录为来自 192.168.68.110 端口 3629的 test用户SSH密码认证失败,最后一条记录为SSH连接已被重置。

基于ELK的企业日志分析系统的设计与实现 - 图19


基于ELK的企业日志分析系统的设计与实现 - 图20

6.2.2 查看MySQL日志

在主机els中/var/log/mysql/error.log记录了6条警告记录

基于ELK的企业日志分析系统的设计与实现 - 图21


在Kibana-Discover中选择Filebeat索引使用KQL语句service.type : “mysql” 搜索Filebeat的MySQL模块生成的数据,发现6条数据,在message中可以看到MySQL具体警告内容。同时在Dashboard中可以看到MySQL错误日志产生时间和日志内容的数据直观图。

基于ELK的企业日志分析系统的设计与实现 - 图22


基于ELK的企业日志分析系统的设计与实现 - 图23


基于ELK的企业日志分析系统的设计与实现 - 图24


6.2.3 查看Windows用户登录日志

在主机Ryzen本地组策略编辑器里的计算机配置-Windows设置-安全设置-本地策略-策略管理启动对系统事件和系统登录的审核。

基于ELK的企业日志分析系统的设计与实现 - 图25


在2022/4/15 18:44:15时尝试登录Administrator用户时由于输入密码错误导致登录失败,此时在Windows事件查看器查找事件4625(账号登录失败),发现有登录失败的记录共三条,事件包含了登录失败的用户名、登录失败原因、登录失败时间等信息。

基于ELK的企业日志分析系统的设计与实现 - 图26


在Kibana-Discover中选择Winlogbeat索引,搜索事件ID:4625,时间为最近15分钟,得到三条记录,记录内包含登录失败的用户名、登录失败原因、登录失败时间等信息。




基于ELK的企业日志分析系统的设计与实现 - 图27

基于ELK的企业日志分析系统的设计与实现 - 图28


在Dashboard中进入用户失败登录的仪表盘,也能看到最近登录失败的记录






基于ELK的企业日志分析系统的设计与实现 - 图29



6.3 索引生命周期测试

在Kibana管理中,Index Lifecycle Policies索引生命周期里可以根据需求设定索引在多久后进入热温冷三个不同阶段以达到性能优化。
根据企业实际情况,热阶段的索引允许持续写入数据,并允许繁重的搜索。温阶段索引为只读模式,允许适量的搜索。冷阶段索引不允许写入,搜索需求为较低。
本次设计中设置索引建立的三天后状态由热阶段转为温阶段,建立四天后状态由温阶段转为冷阶段,建立五天后删除索引腾出存储空间。实验设置考虑到测试时间各个阶段转化速度较快,实际情况下应按照实际需求和日志存储规定进行各阶段转化和定期删除。




基于ELK的企业日志分析系统的设计与实现 - 图30


基于ELK的企业日志分析系统的设计与实现 - 图31







基于ELK的企业日志分析系统的设计与实现 - 图32

以Metricbeat索引策略为例。设置完成后经过5天,索引管理可以正常看到各索引的阶段,且只保留了5天内的索引。

基于ELK的企业日志分析系统的设计与实现 - 图33



引用链接

代码

  1. version: "2.2"
  2. services:
  3. els1:
  4. image: els:7.11.1
  5. container_name: els1
  6. environment:
  7. - node.name=els1
  8. - cluster.name=els
  9. - discovery.seed_hosts=els2,els3
  10. - cluster.initial_master_nodes=els1,els2,els3
  11. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  12. - bootstrap.memory_lock=true
  13. ulimits:
  14. memlock:
  15. soft: -1
  16. hard: -1
  17. volumes:
  18. - /els/data/els1:/usr/share/elasticsearch/data
  19. - /els/logs/els1:/usr/share/elasticsearch/logs
  20. - /els/config/els1/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
  21. - /els/config/els1/elastic-certificates.p12:/usr/share/elasticsearch/config/elastic-certificates.p12
  22. - /els/config/els1/http.p12:/usr/share/elasticsearch/config/http.p12
  23. ports:
  24. - 9200:9200
  25. networks:
  26. - els
  27. els2:
  28. image: els:7.11.1
  29. container_name: els2
  30. environment:
  31. - node.name=els2
  32. - cluster.name=els
  33. - discovery.seed_hosts=els1,els3
  34. - cluster.initial_master_nodes=els1,els2,els3
  35. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  36. - bootstrap.memory_lock=true
  37. ulimits:
  38. memlock:
  39. soft: -1
  40. hard: -1
  41. volumes:
  42. - /els/data/els2:/usr/share/elasticsearch/data
  43. - /els/logs/els2:/usr/share/elasticsearch/logs
  44. - /els/config/els2/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
  45. - /els/config/els2/elastic-certificates.p12:/usr/share/elasticsearch/config/elastic-certificates.p12
  46. - /els/config/els2/http.p12:/usr/share/elasticsearch/config/http.p12
  47. networks:
  48. - els
  49. els3:
  50. image: els:7.11.1
  51. container_name: els3
  52. environment:
  53. - node.name=els3
  54. - cluster.name=els
  55. - discovery.seed_hosts=els1,els2
  56. - cluster.initial_master_nodes=els1,els2,els3
  57. - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
  58. - bootstrap.memory_lock=true
  59. ulimits:
  60. memlock:
  61. soft: -1
  62. hard: -1
  63. volumes:
  64. - /els/data/els3:/usr/share/elasticsearch/data
  65. - /els/logs/els3:/usr/share/elasticsearch/logs
  66. - /els/config/els3/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
  67. - /els/config/els3/elastic-certificates.p12:/usr/share/elasticsearch/config/elastic-certificates.p12
  68. - /els/config/els3/http.p12:/usr/share/elasticsearch/config/http.p12
  69. networks:
  70. - els
  71. kibana:
  72. image: docker.elastic.co/kibana/kibana:7.11.1
  73. container_name: kibana
  74. depends_on:
  75. - els1 #在elasticsearch后启动
  76. environment:
  77. SERVER_NAME: kibana
  78. ELASTICSEARCH_HOSTS: https://els1:9200
  79. volumes:
  80. - /els/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml
  81. - /els/kibana/elasticsearch-ca.pem:/usr/share/kibana/config/elasticsearch-ca.pem
  82. ports:
  83. - 5601:5601
  84. networks:
  85. - els
  86. logstash:
  87. image: docker.elastic.co/logstash/logstash:7.11.1
  88. container_name: logstash
  89. depends_on:
  90. - els1 #在elasticsearch后启动
  91. volumes:
  92. - /els/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
  93. - /els/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml
  94. - /els/logstash/elasticsearch-ca.pem:/usr/share/logstash/config/elasticsearch-ca.pem
  95. ports:
  96. - 5044:5044
  97. networks:
  98. - els
  99. networks:
  100. els:
  101. driver: bridge
network.host: 0.0.0.0
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: "http.p12"
server.host: "0.0.0.0"
xpack.encryptedSavedObjects.encryptionKey: "elasticstackpasswordforuserlamuier"
xpack.security.encryptionKey: "elasticstackpasswordforuserlamuier"
elasticsearch.ssl.certificateAuthorities: [ "config/elasticsearch-ca.pem" ]
elasticsearch.username: "elastic"
elasticsearch.password: "lam1020"
input {
  kafka {
    bootstrap_servers => ["192.168.68.213:9092,192.168.68.213:9093,192.168.68.213:9094"]
    topics => ["Ryzenlog","Ryzenmetric","elslog","elsmetric"]
    decorate_events => true
    codec => json
    consumer_threads => 12
    group_id => "logstash"
    auto_offset_reset => "latest"
  }
}

filter{
  date {
    match => ["access_time", "yyyy/MM/dd HH:mm:ss Z"]
    target => "@timestamp"
  }
  ruby{
    code => "event.set('index_day', (event.get('@timestamp').time.localtime + 8*60*60).strftime('%Y.%m.%d'))"
  }
}

output {
  elasticsearch {
    hosts => [ "https://els1:9200", "https://els2:9200", "https://els3:9200" ]
    # index => "%{[@metadata][beat]}-%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}"
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY-MM-dd}"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{index_day}"
    ssl => true
    cacert => 'config/elasticsearch-ca.pem'
    user => "elastic"
    password => "lam1020"
    template_overwrite => true
  }
}
xpack.monitoring.enabled: false


version: "2.2"
services:
  zoo1:
    image: zookeeper
    container_name: zoo1
    expose:
      - "2888"
      - "3888"
    ports:
      - "2181:2181"
    environment:
      - ZOO_MY_ID=1
    volumes:
      - /zoo-ka/zoo/data/zoo1:/data
      - /zoo-ka/zoo/logs/zoo1:/logs
      - /zoo-ka/zoo/conf/zoo1/zoo.cfg:/conf/zoo.cfg
    networks:
      - zoonet
  zoo2:
    image: zookeeper
    container_name: zoo2
    expose:
      - "2888"
      - "3888"
    ports:
      - "2182:2181"
    environment:
      - ZOO_MY_ID=2
    volumes:
      - /zoo-ka/zoo/data/zoo2:/data
      - /zoo-ka/zoo/logs/zoo2:/logs
      - /zoo-ka/zoo/conf/zoo2/zoo.cfg:/conf/zoo.cfg
    networks:
      - zoonet
  zoo3:
    image: zookeeper
    container_name: zoo3
    expose:
      - "2888"
      - "3888"
    ports:
      - "2183:2181"
    environment:
      - ZOO_MY_ID=3
    volumes:
      - /zoo-ka/zoo/data/zoo3:/data
      - /zoo-ka/zoo/logs/zoo3:/logs
      - /zoo-ka/zoo/conf/zoo3/zoo.cfg:/conf/zoo.cfg
    networks:
      - zoonet
  kafka1:
    image: wurstmeister/kafka
    container_name: kafka1
    environment:
      - TZ=CST-8  #时区
      - KAFKA_CREATE_TOPICS=zooka
      - KAFKA_BROKER_ID=1 #唯一的非负整数id标识
      - KAFKA_LISTENERS=PLAINTEXT://kafka1:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.68.213:9092
      - KAFKA_ZOOKEEPER_CONNECT=192.168.68.213:2181,192.168.68.213:2182,192.168.68.213:2183 #ZooKeeper连接字符串的格式为:hostname:port
      - KAFKA_MESSAGE_MAX_BYTES=2000000 #server可以接收的消息最大尺寸
      - KAFKA_NUM_NETWORK_THREADS=3 #server用来处理网络请求的网络线程数目
      - KAFKA_NUM_IO_THREADS=8 #server用来处理请求的I/O线程的数目
      - KAFKA_SOCKET_SEND_BUFFER_BYTES=1024000  #SO_SNDBUFF 缓存大小
      - KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=1024000 #SO_RCVBUFF缓存大小
      - KAFKA_SOCKET_REQUEST_MAX_BYTES+1048576000 #server允许的最大请求尺寸
      - KAFKA_LOG_RETENTION_HOURS=24  #每个日志文件删除之前保存的时间
      - KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1097  #JMX配置
      - JMX_PORT=1097 #JMX监控端口
    volumes:
      - /zoo-ka/kafka/data/kafka1:/kafka
    ports:
      - 9092:9092
      - 1097:1097
    depends_on:
      - zoo1
    networks:
      - zoonet
  kafka2:
    image: wurstmeister/kafka
    container_name: kafka2
    environment:
      - TZ=CST-8
      - KAFKA_BROKER_ID=2
      - KAFKA_LISTENERS=PLAINTEXT://kafka2:9093
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.68.213:9093
      - KAFKA_ZOOKEEPER_CONNECT=192.168.68.213:2181,192.168.68.213:2182,192.168.68.213:2183
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_NUM_NETWORK_THREADS=3
      - KAFKA_NUM_IO_THREADS=8
      - KAFKA_SOCKET_SEND_BUFFER_BYTES=1024000
      - KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=1024000
      - KAFKA_SOCKET_REQUEST_MAX_BYTES+1048576000
      - KAFKA_LOG_RETENTION_HOURS=24
      - KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1098
      - JMX_PORT=1098
    volumes:
      - /zoo-ka/kafka/data/kafka2:/kafka
    ports:
      - 9093:9093
      - 1098:1098
    depends_on:
      - zoo2
    networks:
      - zoonet
  kafka3:
    image: wurstmeister/kafka
    container_name: kafka3
    environment:
      - TZ=CST-8
      - KAFKA_BROKER_ID=3
      - KAFKA_LISTENERS=PLAINTEXT://kafka3:9094
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.68.213:9094
      -KAFKA_ZOOKEEPER_CONNECT=192.168.68.213:2181,192.168.68.213:2182,192.168.68.213:2183
      - KAFKA_MESSAGE_MAX_BYTES=2000000
      - KAFKA_NUM_NETWORK_THREADS=3
      - KAFKA_NUM_IO_THREADS=8
      - KAFKA_SOCKET_SEND_BUFFER_BYTES=1024000
      - KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=1024000
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=1048576000
      - KAFKA_LOG_RETENTION_HOURS=24
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms512m
      - KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1
-Dcom.sun.management.jmxremote.rmi.port=1099
      - JMX_PORT=1099
    volumes:
      - /zoo-ka/kafka/data/kafka3:/kafka
    ports:
      - 9094:9094
      - 1099:1099
    depends_on:
      - zoo3
    networks:
      - zoonet
  kafka_manager:
    container_name: kafka-manager
    mem_limit: 1024m
    image: hlebalbau/kafka-manager
    ports:
      - "9000:9000"
    depends_on:
      - zoo1
      - kafka1
    environment:
      ZK_HOSTS: "192.168.68.213:2181,192.168.68.213:2182,192.168.68.213:2183"
      APPLICATION_SECRET: "random-secret"
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: admin
      KAFKA_MANAGER_PASSWORD: 1234
      TZ: "Asia/Shanghai"
    networks:
      - zoonet
networks:
  zoonet:
    driver: bridge
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data
dataLogDir=/logs
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
maxClientCnxns=60
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
clientPort=2181
winlogbeat.event_logs:
- name: Application
ignore_older: 72h
provider:
- Application Error
- Application Hang
- Windows Error Reporting

- name: System
level: critical, error, warning

- name: Security
processors:
- script:
lang: javascript
id: security
file: ${path.home}/module/security/config/winlogbeat-security.js

output.kafka:
enabled: true
hosts: ["192.168.68.213:9092","192.168.68.213:9093","192.168.68.213:9094"]
topic: Ryzenlog
reachable_only: false
compression: gzip
max_message_bytes: 1000000
required_acks: 1

setup.kibana:
host: "192.168.68.212:5601"
username: "elastic"
  password: "lam1020"
# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s
filebeat.inputs:
- type: log
  enabled: true
  user: root
  paths:
    - /var/lib/docker/containers/*/*.log

  include_lines: ["(WARN|warn)", "(ERROR|error)"]

  multiline.pattern: '^([0-9]{4}|[0-9]{2})-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 500
  multiline.timeout: 5s

  fields:
    logtopic: containers-logs
  ignore_older: 24h

setup.kibana:
host: "localhost:5601"

output.kafka:
  enabled: true
  hosts: ["192.168.68.213:9092","192.168.68.213:9093","192.168.68.213:9094"]
  topic: elslog
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
=========================== Modules configuration ============================

metricbeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.kibana:
  host: "192.168.68.212:5601"
  # username: "elastic"
  # password: "lam1020"

output.kafka:
  enabled: true
  hosts: ["192.168.68.213:9092","192.168.68.213:9093","192.168.68.213:9094"]
  topic: Ryzenmetric
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~