简易版:Filebeat —》ES 《—Kibana

ElasticSearch

环境

  1. 配置文件书写 ```bash

    改配置文件

    cp /etc/elasticsearch/elasticsearch.yml{,.bak} vim /etc/elasticsearch/elasticsearch.yml

cluster.name: wtwind # 集群名字 node.name: es01 # 节点名称 path.data: /var/lib/elasticsearch # 数据目录 path.logs: /var/log/elasticsearch # 日志目录 bootstrap.memory_lock: true # 锁定内存,防止别人占用属于es的内存。 network.host: 172.16.9.77,127.0.0.1 # 监听地址,本地内网ip,虚拟机填公网 http.port: 9200 # es端口(api接口端口) discovery.seed_hosts: [“172.16.9.77”] # 发现节点,写自己和一个已经在集群中的节点即可,初始节点只需要写自己,虚拟机填公网 cluster.initial_master_nodes: [“172.16.9.77”] # 集群初始化节点,这里可以使用域名,虚拟机填公网

解除安全限制,虚拟机不需要

http.cors.enabled: true http.cors.allow-origin: “*” http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE http.cors.allow-headers: “X-Requested-With, Content-Type, Content-Length, X-User”

  1. 2. 解决内存锁定error
  2. ```bash
  3. systemctl edit elasticsearch
  4. [Service]
  5. LimitMEMLOCK=infinity
  6. systemctl daemon-reload
  7. systemctl restart elasticsearch.service
  1. 选做 vim /etc/elasticsearch/jvm.options # 如果机器内存较小,可以改这个文件 700m
  2. 部署 es-head 浏览器拓展插件实现可视化访问
    1. http://121.196.148.70:9200/
  3. 开放9200、9300(通讯)端口
  4. 选做 /var/lib/elasticsearch /var/log/elasticsearch # 看数据,看日志
  5. 创建索引(库)、指定分片、副本数、插入数据 ```bash

    使用postapi网页版本

PUT /ku1/ { “settings”: { “number_of_shards”: 3, “number_of_replicas”: 0 } }

number_of_shards:分片,总数据会分成几片

number_of_replicas:副本,每个分片有几个备份

分片只能在创建索引时候指定,副本随时可以修改

PUT /ku1/_settings/ { “settings”: { “number_of_replicas”: 1 } }

插入指定索引指定表不指定id

POST /ku1/_doc/ { “name”: “li”, “age”: “18” }

清空数据,三台机器部署好之前不要创建任何内容

rm -rf /var/lib/elasticsearch/*

推荐配置#根据节点数设置 <br />2个节点: 默认就可以 1分片 1个副本 

3个节点: <br />重要的数据,2副本 <br />不重要的默认 1分片 1个副本 <br />#应用场景 <br />日志收集: 1副本3分片 <br />搜索功能: 2副本3分片
一些api的uri(不用看/_cat  # 显示所有接口<br />/  # 查看基本信息
<a name="YyPX8"></a>
## 其他机器
执行 1、2、3、5<br />配置文件修改主机名、监听地址、发现节点
<a name="M6nUW"></a>
# Filebeat/Kibana
> Filebeat --》ES 《--Kibana

[tuna-filebeat-7.93](https://mirrors.tuna.tsinghua.edu.cn/elasticstack/yum/elastic-7.x/7.9.3/)

1. 修改配置文件
```bash
vim /etc/filebeat/filebeat.yml
# 或者vim /etc/filebeat/filebeat-nginx.yml
# 配置文件官方参考 /etc/filebeat/filebeat.reference.yml

#输入 读取什么日志?
filebeat.inputs:
- type: log
  enabled: true
#指定文件名和路径 可以支持通配符 *
  paths:
    - /var/log/secure
    # 启动日志
    - /var/log/boot.log
    - /var/log/cron
    # 硬件信息
    - /var/log/dmesg
    - /var/log/messages
# json.keys_under_root: true
# json.overwrite_keys: true
  # exclude_lines
  include_lines:
    - "ERR|err|error"
    - "warn|WARN"
    - "[Ff]ailed|Failure"
    - "[Ii]nvalid"
       when.contains:
  tags: "nginx-access"

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: "nginx-error"


# 日志输出到哪些es主机
output.elasticsearch:
  hosts:
    - "ip/主机名:9200"
    - "ip/主机名:9200"
    - "ip/主机名:9200"
# 创建单个索引这样就行      filevbeat版本
# index: "创建索引的名字-%{[agent.version]}-%{+yyyy.MM.dd}"
indices:
  - index: "nginx-access-%{[agent.version]}-%{+yyyy.MM.dd}"
    when.contains:
      tags: "nginx-access"
  - index: "nginx-error-%{[agent.version]}-%{+yyyy.MM.dd}"
    when.contains:
      tags: "nginx-error"

setup.ilm.enabled: false
setup.template.name: "模板名字"
setup.template.pattern: "模板匹配的索引名字"  # 例子nginx-*
setup.template.settings:
  index.number_of_shards: 3
  index.number_of_replicas: 1


systemctl enable filebeat.service
systemctl start filebeat.service
ps -ef |grep filebeat
ss -lntup|grep filebeat

# filebeat -c /etc/filebeat/filebeat-nginx.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-nginx/ &

tomcat 访问日志、应用日志参考```bash vim /etc/filebeat/filebeat-tomcat.yml

filebeat.inputs:

  • type: log enabled: true paths: -/app/tools/tomcat/logs/localhost_access_log*.txt json.keys_under_root: true json.overwrite_keys: true tags: “tomcat-access”
  • type: log enabled: true

    paths:

    • /app/tools/tomcat/logs/catalina.out
    #

    其他格式都和上面一样,应用日志增加下面这三行

    #

    multiline.pattern: ‘^[0-9]{2}-‘ multiline.negate: true multiline.match: after tags: “tomcat-error”

output.elasticsearch: hosts: [ “es01.oldboylinux.cn:9200”, “es02.oldboylinux.cn:9200”, “es03.oldboylinux.cn:9200” ] indices:

- index: "tomcat-access-%{[agent.version]}-%{+yyyy.MM.dd}"
 when.contains:
   tags: "tomcat-access"
- index: "tomcat-error-%{[agent.version]}-%{+yyyy.MM.dd}"
 when.contains:
   tags: "tomcat-error"

还会使用默认filebeat索引,如果要自定义索引,则需要ilm

改为false即可。 setup.ilm.enabled: false setup.template.name: “tomcat-all” setup.template.pattern: “tomcat-*” setup.template.settings: index.number_of_shards: 3 index.number_of_replicas: 1


```bash
# tomcat json格式的访问日志.
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
            prefix="localhost_access_log" suffix=".txt"
pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>

filebeat -c /etc/filebeat/filebeat-tomcat.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-tomcat/ &

  1. 在三台es机器中选一台性能好的部署Kibana

tuna-Kibana-7.9.3

# 创建日志  
mkdir -p /var/log/kibana/
touch /var/log/kibana/kibana.log
chown kibana.kibana /var/log/kibana/kibana.log


vim /etc/kibana/kibana.yml

server.port: 5601
server.host: "本机ip"
server.name: "展示的名字--等风来~"

elasticsearch.hosts: [ "http://指定一台或多台es节点:9200" ]
# 创建索引
kibana.index: ".kibana"
logging.dest: /var/log/kibana/kibana.log
# true为只记录错误日志
logging.quiet: false
i18n.locale: "zh-CN"


systemctl enable kibana.service
systemctl start kibana.service

访问页面 ip:5601 日志拓展内容(不用看将所有日志都扔到一个文件下
yum install -y rsyslog

vim /etc/rsyslog.conf

# Provides UDP syslog reception 打开注释开启日志的udp功能.
$ModLoad imudp
$UDPServerRun 514
#############RULES###########日志规则部分,添加下面两行
# lidao996 system all log to one file
*.* /var/log/sysall.log

——————————————-复杂版架构——————————————-

Filebeat 分布式收集数据—>Kafka 缓存(消息对列)—>Logstash 处理数据—>ElasticSearch 数据库 <—Kibana 展示

Logstash

tuna-Logstash-7.9.3
需要jdk

内置插件

grok 格式化数据(日志
geoip 分析ip来源
user_agent 分析用户代理
date 格式化时间
beats 读取filebeats数据
mute 创建/删除字段

自定义正则,并用标准输入输出测试

# 自定义正则
# 这个文件的上级目录放着大量的正则模板
mkdir -p /etc/logstash/grok/patterns/
vim /etc/logstash/grok/patterns/user_lidao_define

USER_LIDAO [a-zA-Z0-9._-]+
USER_AGE   [0-9]+
USER_WEIGHT [0-9]+
USER_INFO %{USER_LIDAO:username} %{USER_AGE:userage} %{USER_WEIGHT:userweight}
NGUSERNAME [a-zA-Z\.\@\-\+_%]+  # 111
NGUSER %{NGUSERNAME}   # 111
NGINXACCESSV2 %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor}
vim /etc/logstash/conf.d/logstash-user-grok.conf

input {
  stdin {
  }
}

filter {
  grok {
       patterns_dir => ["/etc/logstash/grok/patterns/"]
       match => {
         message => "%{NGINXACCESSV2}"
       }
  }
  geoip {
      #clientip是 grok分析访问日志获取到的客户端ip地址
      source => "clientip"
  }
  useragent {
      #数据源 来自于上面agent字段
      source => "agent"
      #分析后结果输出到useragent字段中
      target => "useragent"

  }
  date {
       #timestamp是上面访问日志过滤出来的时间字段, 这个字段时间的格式
       #"timestamp" => "22/Nov/2015:11:02:00 +0800"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-user-grok.conf debug用的docker-compose.ymlversion: “3”
services:
grok:
image: epurs/grokdebugger
ports:
- “80:80”

连接Filebeat/ES

vim /etc/logstash/conf.d/all.conf

input {
  beats {
    port => 4401
  }
}

filter {
  if "nginx-access" in [tags] {
    grok {
      patterns_dir => ["/etc/logstash/grok/patterns/"]
      match => {
        message => "%{NGINXACCESSV2}"
      }
    }
    geoip {
    # clientip是 grok分析访问日志获取到的客户端ip地址
      source => "clientip"
    }
    useragent {
      #数据源 来自于上面agent字段
      source => "agent"
      #分析后结果输出到useragent字段中
      target => "useragent"
    }
    date {
      #timestamp是上面访问日志过滤出来的时间字段, 这个字段时间的格式
      #"timestamp" => "22/Nov/2015:11:02:00 +0800"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  }
}

output {
  if "nginx-access" in [tags] {
    elasticsearch {
    # 修改es
      hosts => ["http://es01:9200","http://es02:9200","http://es03:9200"]
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://es01:9200","http://es02:9200","http://es03:9200"]
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }

  stdout {
    codec => rubydebug
  }
}

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/all.conf

vim /etc/filebeat/filebeat-nginx.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  tags: "nginx-access"

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: "nginx-error"

output.logstash:
  hosts: ["logstash:4401"]
  worker: 1
  indices:
    - index: "nginx-access-%{[agent.version]}-%{+yyyy.MM.dd}"
      when.contains:
        tags: "nginx-access"

    - index: "nginx-error-%{[agent.version]}-%{+yyyy.MM.dd}"
      when.contains:
        tags: "nginx-error"


#还会使用默认filebeat索引,如果要自定义索引,则需要ilm改为false即可。
setup.ilm.enabled: false

setup.template.name: "nginx-all"
setup.template.pattern: "nginx-*"
setup.template.settings:
  index.number_of_shards: 4
  index.number_of_replicas: 1

filebeat -c /etc/filebeat/filebeat-nginx.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-nginx/ &

kafka

消息队列

部署 zookeeper```bash

二进制包放到/app/tools # 需要jdk

vim /app/tools/zookeeper/conf/zoo_sample.cfg

服务器之间或客户端与服务器之间维持心跳的时间间隔

tickTime以毫秒为单位。

tickTime=2000

集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数

initLimit=10

集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数

syncLimit=5

数据保存目录

dataDir=../data

dataDir=/app/tools/zookeeper/data/

日志保存目录

dataLogDir=../logs

/app/tools/zookeeper/logs/

客户端连接的端口

clientPort=2181

客户端最大连接数。# 根据自己实际情况设置,默认为60个

maxClientCnxns=60

三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口

server.1=10.0.0.90:2888:3888

data/myid文件 存放 数字1

server.2=10.0.0.91:2888:3888 server.3=10.0.0.92:2888:3888


```bash
cd /app/tools/zookeeper/
mkdir data logs

# 把zookeeper这个目录发送到其他2台机器上
# scp -rp /app/tools/zookeeper/ root@ip:/app/tools/

#es01 
echo "1" >/app/tools/zookeeper/data/myid
#es02 
echo "2" >/app/tools/zookeeper/data/myid
#es03
echo "3" >/app/tools/zookeeper/data/myid

用相对路径启动不然配置文件中的相对路径会找不到文件然后报错
./zkServer.sh start

  • status

kafka```bash

二进制包放到/app/tools

vim /app/tools/kafka/config/server.properties

####################### Server Basics

broker的id,值为整数,且必须唯一,在一个集群中不能重复

broker.id=666

####################### Socket Server Settings

kafka默认监听的端口为9092 (默认与主机名进行连接)

listeners=PLAINTEXT://:9092

处理网络请求的线程数量,默认为3个

num.network.threads=3

执行磁盘IO操作的线程数量,默认为8个

num.io.threads=8

socket服务发送数据的缓冲区大小,默认100KB

socket.send.buffer.bytes=102400

socket服务接受数据的缓冲区大小,默认100KB

socket.receive.buffer.bytes=102400

socket服务所能接受的一个请求的最大大小,默认为100M

socket.request.max.bytes=104857600

####################### Log Basics

kafka存储消息数据的目录

log.dirs=../data

每个topic默认的partition数量

num.partitions=3

在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量

num.recovery.threads.per.data.dir=1

####################### Log Flush Policy

消息刷新到磁盘中的消息条数阈值

log.flush.interval.messages=10000

消息刷新到磁盘中的最大时间间隔,1s

log.flush.interval.ms=1000

####################### Log Retention Policy

日志保留小时数,超时会自动删除,默认为7天

log.retention.hours=168

日志保留大小,超出大小会自动删除,默认为1G

log.retention.bytes=1073741824

日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件

log.segment.bytes=1073741824

每隔多长时间检测数据是否达到删除条件,300s

log.retention.check.interval.ms=300000

####################### Zookeeper

Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开

zookeeper.connect=10.0.0.90:2181,10.0.0.91:2181,10.0.0.92:2181

连接zookeeper的超时时间,6s

zookeeper.connection.timeout.ms=6000

每个节点的id不一样即可<br />创建数据目录<br />`mkdir /app/tools/kafka/data`

- 启动
```bash
#export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" 永久生效写入/etc/profile即可
cd /app/tools/kafka/bin/
./kafka-server-start.sh --/config/server.properties #启动测试
./kafka-server-start.sh -daemon --/config/server.properties #前台没有报错则放入后台
  • 创建topic(主题)测试 ```bash

    使用kafka创建一个topic

    cd /app/tools/kafka/bin/ ./kafka-topics.sh \ —create \ —zookeeper 10.0.0.90:2181,10.0.0.91:2181,10.0.0.92:2181 \ —partitions 3 \ —replication-factor 1 \ —topic lidao996-kafka

生产消息

cd /app/tools/kafka/bin/ ./kafka-console-producer.sh \ —broker-list 10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092 \ —topic lidao996-kafka

订阅与查看消息

cd /app/tools/kafka/bin/ ./kafka-console-consumer.sh \ —bootstrap-server 10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092 \ —topic lidao996-kafka \ —from-beginning

接入filebeat/logstash```bash
# 不输出到logstash而是kafka

output.kafka:
  hosts: ["10.0.0.90:9092", "10.0.0.91:9092", "10.0.0.92:9092"] #kafka集群地址
  topics:
    - topic: "kafka-nginx-access" #topic名称
      when.contains:
        tags: "access" #当时tags是acceess时,则写入kafka-nginx-access topic中
    - topic: "kafka-nginx-error" #topic名称
      when.contains:
        tags: "error" #当时tags是error时,则写入kafka-nginx-error topic中

vim /etc/logstash/conf.d/input_kafka_nginx_1w_output_es.conf 

input {
  kafka {
  bootstrap_servers => "10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092"
  topics => ["kafka-nginx-access"]
  group_id => "logstash" #消费组名称
  consumer_threads => 2 #消费者启动的线程,尽量与分区一致
  codec => "json"     #指定格式为json
  }

  kafka {
  bootstrap_servers => "10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092"
  topics => ["kafka-nginx-error"]
  group_id => "logstash" #消费组名称

  consumer_threads => 2 #消费者启动的线程,尽量与分区一致
  codec => "json"     #指定格式为json
  }
}


filter {
  if "nginx-access" in [tags][0] {
    grok {
      patterns_dir => ["/etc/logstash/grok/patterns/"]
      match => {
        message => "%{NGINXACCESSV2}"
      }
    }
    useragent {
      source => "useragent"
      target => "useragent"
    }
    geoip {
      source => "clientip"
    }
    date {
      match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
    mutate {
      convert => ["bytes","integer"]
      convert => ["response_time", "float"]
      #删除一些字段.
      remove_field => ["message","agent","tags"]
      add_field => { "target_index" => "logstash-kafka-nginx-access-%{+YYYY.MM}" }
      #这里通过mutate创建了变量target_index 存放索引
     }
  } 
  else if "nginx-error" in [tags][0] {
    date {
      match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
    mutate {
      add_field => { "target_index" => "logstash-kafka-nginx-error-%{+YYYY.MM}" }
      #这里通过mutate创建了变量target_index 存放索引
    }
  }

}

output {
  elasticsearch {
    hosts => ["10.0.0.90:9200","10.0.0.91:9200","10.0.0.92:9200"]
    index => "%{[target_index]}"
    template_overwrite => true
  }
}
  • 启动

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-kafka-nginx-to-es.conf