- ElasticSearch
- 改配置文件
- 解除安全限制,虚拟机不需要
- 使用postapi网页版本
- number_of_shards:分片,总数据会分成几片
- number_of_replicas:副本,每个分片有几个备份
- 分片只能在创建索引时候指定,副本随时可以修改
- 插入指定索引指定表不指定id
- 清空数据,三台机器部署好之前不要创建任何内容
- 其他格式都和上面一样,应用日志增加下面这三行
- 还会使用默认filebeat索引,如果要自定义索引,则需要ilm
- ——————————————-复杂版架构——————————————-
- Logstash
- kafka
- 二进制包放到/app/tools # 需要jdk
- 服务器之间或客户端与服务器之间维持心跳的时间间隔
- tickTime以毫秒为单位。
- 集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数
- 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数
- 数据保存目录
- dataDir=/app/tools/zookeeper/data/
- 日志保存目录
- /app/tools/zookeeper/logs/
- 客户端连接的端口
- 客户端最大连接数。# 根据自己实际情况设置,默认为60个
- 三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口
- data/myid文件 存放 数字1
- 二进制包放到/app/tools
- broker的id,值为整数,且必须唯一,在一个集群中不能重复
- kafka默认监听的端口为9092 (默认与主机名进行连接)
- 处理网络请求的线程数量,默认为3个
- 执行磁盘IO操作的线程数量,默认为8个
- socket服务发送数据的缓冲区大小,默认100KB
- socket服务接受数据的缓冲区大小,默认100KB
- socket服务所能接受的一个请求的最大大小,默认为100M
- kafka存储消息数据的目录
- 每个topic默认的partition数量
- 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
- 消息刷新到磁盘中的消息条数阈值
- log.flush.interval.messages=10000
- 消息刷新到磁盘中的最大时间间隔,1s
- log.flush.interval.ms=1000
- 日志保留小时数,超时会自动删除,默认为7天
- 日志保留大小,超出大小会自动删除,默认为1G
- log.retention.bytes=1073741824
- 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
- 每隔多长时间检测数据是否达到删除条件,300s
- Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
- 连接zookeeper的超时时间,6s
- 使用kafka创建一个topic
- 生产消息
- 订阅与查看消息
简易版:Filebeat —》ES 《—Kibana
ElasticSearch
环境
- jdk(7.0之后的版本内置
- tuna Elasticsearch-7.9.3-x86_64.rpm
- 三台机器组成集群
主机器
- 配置文件书写
```bash
改配置文件
cp /etc/elasticsearch/elasticsearch.yml{,.bak} vim /etc/elasticsearch/elasticsearch.yml
cluster.name: wtwind # 集群名字 node.name: es01 # 节点名称 path.data: /var/lib/elasticsearch # 数据目录 path.logs: /var/log/elasticsearch # 日志目录 bootstrap.memory_lock: true # 锁定内存,防止别人占用属于es的内存。 network.host: 172.16.9.77,127.0.0.1 # 监听地址,本地内网ip,虚拟机填公网 http.port: 9200 # es端口(api接口端口) discovery.seed_hosts: [“172.16.9.77”] # 发现节点,写自己和一个已经在集群中的节点即可,初始节点只需要写自己,虚拟机填公网 cluster.initial_master_nodes: [“172.16.9.77”] # 集群初始化节点,这里可以使用域名,虚拟机填公网
解除安全限制,虚拟机不需要
http.cors.enabled: true http.cors.allow-origin: “*” http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE http.cors.allow-headers: “X-Requested-With, Content-Type, Content-Length, X-User”
2. 解决内存锁定error```bashsystemctl edit elasticsearch[Service]LimitMEMLOCK=infinitysystemctl daemon-reloadsystemctl restart elasticsearch.service
- 选做
vim /etc/elasticsearch/jvm.options# 如果机器内存较小,可以改这个文件 700m - 部署 es-head 浏览器拓展插件实现可视化访问
- 开放9200、9300(通讯)端口
- 选做
/var/lib/elasticsearch /var/log/elasticsearch# 看数据,看日志 - 创建索引(库)、指定分片、副本数、插入数据
```bash
使用postapi网页版本
PUT /ku1/ { “settings”: { “number_of_shards”: 3, “number_of_replicas”: 0 } }
number_of_shards:分片,总数据会分成几片
number_of_replicas:副本,每个分片有几个备份
分片只能在创建索引时候指定,副本随时可以修改
PUT /ku1/_settings/ { “settings”: { “number_of_replicas”: 1 } }
插入指定索引指定表不指定id
POST /ku1/_doc/ { “name”: “li”, “age”: “18” }
清空数据,三台机器部署好之前不要创建任何内容
rm -rf /var/lib/elasticsearch/*
推荐配置#根据节点数设置 <br />2个节点: 默认就可以 1分片 1个副本
3个节点: <br />重要的数据,2副本 <br />不重要的默认 1分片 1个副本 <br />#应用场景 <br />日志收集: 1副本3分片 <br />搜索功能: 2副本3分片
一些api的uri(不用看/_cat # 显示所有接口<br />/ # 查看基本信息
<a name="YyPX8"></a>
## 其他机器
执行 1、2、3、5<br />配置文件修改主机名、监听地址、发现节点
<a name="M6nUW"></a>
# Filebeat/Kibana
> Filebeat --》ES 《--Kibana
[tuna-filebeat-7.93](https://mirrors.tuna.tsinghua.edu.cn/elasticstack/yum/elastic-7.x/7.9.3/)
1. 修改配置文件
```bash
vim /etc/filebeat/filebeat.yml
# 或者vim /etc/filebeat/filebeat-nginx.yml
# 配置文件官方参考 /etc/filebeat/filebeat.reference.yml
#输入 读取什么日志?
filebeat.inputs:
- type: log
enabled: true
#指定文件名和路径 可以支持通配符 *
paths:
- /var/log/secure
# 启动日志
- /var/log/boot.log
- /var/log/cron
# 硬件信息
- /var/log/dmesg
- /var/log/messages
# json.keys_under_root: true
# json.overwrite_keys: true
# exclude_lines
include_lines:
- "ERR|err|error"
- "warn|WARN"
- "[Ff]ailed|Failure"
- "[Ii]nvalid"
when.contains:
tags: "nginx-access"
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: "nginx-error"
# 日志输出到哪些es主机
output.elasticsearch:
hosts:
- "ip/主机名:9200"
- "ip/主机名:9200"
- "ip/主机名:9200"
# 创建单个索引这样就行 filevbeat版本
# index: "创建索引的名字-%{[agent.version]}-%{+yyyy.MM.dd}"
indices:
- index: "nginx-access-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "nginx-access"
- index: "nginx-error-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "nginx-error"
setup.ilm.enabled: false
setup.template.name: "模板名字"
setup.template.pattern: "模板匹配的索引名字" # 例子nginx-*
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 1
systemctl enable filebeat.service
systemctl start filebeat.service
ps -ef |grep filebeat
ss -lntup|grep filebeat
# filebeat -c /etc/filebeat/filebeat-nginx.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-nginx/ &
tomcat 访问日志、应用日志参考```bash vim /etc/filebeat/filebeat-tomcat.yml
filebeat.inputs:
- type: log enabled: true paths: -/app/tools/tomcat/logs/localhost_access_log*.txt json.keys_under_root: true json.overwrite_keys: true tags: “tomcat-access”
type: log enabled: true
paths:
- /app/tools/tomcat/logs/catalina.out
#
其他格式都和上面一样,应用日志增加下面这三行
#
multiline.pattern: ‘^[0-9]{2}-‘ multiline.negate: true multiline.match: after tags: “tomcat-error”
output.elasticsearch: hosts: [ “es01.oldboylinux.cn:9200”, “es02.oldboylinux.cn:9200”, “es03.oldboylinux.cn:9200” ] indices:
- index: "tomcat-access-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "tomcat-access"
- index: "tomcat-error-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "tomcat-error"
还会使用默认filebeat索引,如果要自定义索引,则需要ilm
改为false即可。 setup.ilm.enabled: false setup.template.name: “tomcat-all” setup.template.pattern: “tomcat-*” setup.template.settings: index.number_of_shards: 3 index.number_of_replicas: 1
```bash
# tomcat json格式的访问日志.
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log" suffix=".txt"
pattern="{"clientip":"%h","ClientUser":"%l","authenticated":"%u","AccessTime":"%t","method":"%r","status":"%s","SendBytes":"%b","Query?string":"%q","partner":"%{Referer}i","AgentVersion":"%{User-Agent}i"}"/>
filebeat -c /etc/filebeat/filebeat-tomcat.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-tomcat/ &
- 在三台es机器中选一台性能好的部署Kibana
# 创建日志
mkdir -p /var/log/kibana/
touch /var/log/kibana/kibana.log
chown kibana.kibana /var/log/kibana/kibana.log
vim /etc/kibana/kibana.yml
server.port: 5601
server.host: "本机ip"
server.name: "展示的名字--等风来~"
elasticsearch.hosts: [ "http://指定一台或多台es节点:9200" ]
# 创建索引
kibana.index: ".kibana"
logging.dest: /var/log/kibana/kibana.log
# true为只记录错误日志
logging.quiet: false
i18n.locale: "zh-CN"
systemctl enable kibana.service
systemctl start kibana.service
访问页面 ip:5601
日志拓展内容(不用看将所有日志都扔到一个文件下yum install -y rsyslog
vim /etc/rsyslog.conf
# Provides UDP syslog reception 打开注释开启日志的udp功能.
$ModLoad imudp
$UDPServerRun 514
#############RULES###########日志规则部分,添加下面两行
# lidao996 system all log to one file
*.* /var/log/sysall.log
——————————————-复杂版架构——————————————-
Filebeat 分布式收集数据—>Kafka 缓存(消息对列)—>Logstash 处理数据—>ElasticSearch 数据库 <—Kibana 展示
Logstash
tuna-Logstash-7.9.3
需要jdk
内置插件
grok 格式化数据(日志
geoip 分析ip来源
user_agent 分析用户代理
date 格式化时间
beats 读取filebeats数据
mute 创建/删除字段
自定义正则,并用标准输入输出测试
# 自定义正则
# 这个文件的上级目录放着大量的正则模板
mkdir -p /etc/logstash/grok/patterns/
vim /etc/logstash/grok/patterns/user_lidao_define
USER_LIDAO [a-zA-Z0-9._-]+
USER_AGE [0-9]+
USER_WEIGHT [0-9]+
USER_INFO %{USER_LIDAO:username} %{USER_AGE:userage} %{USER_WEIGHT:userweight}
NGUSERNAME [a-zA-Z\.\@\-\+_%]+ # 111
NGUSER %{NGUSERNAME} # 111
NGINXACCESSV2 %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor}
vim /etc/logstash/conf.d/logstash-user-grok.conf
input {
stdin {
}
}
filter {
grok {
patterns_dir => ["/etc/logstash/grok/patterns/"]
match => {
message => "%{NGINXACCESSV2}"
}
}
geoip {
#clientip是 grok分析访问日志获取到的客户端ip地址
source => "clientip"
}
useragent {
#数据源 来自于上面agent字段
source => "agent"
#分析后结果输出到useragent字段中
target => "useragent"
}
date {
#timestamp是上面访问日志过滤出来的时间字段, 这个字段时间的格式
#"timestamp" => "22/Nov/2015:11:02:00 +0800"
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-user-grok.conf
debug用的docker-compose.ymlversion: “3”
services:
grok:
image: epurs/grokdebugger
ports:
- “80:80”
连接Filebeat/ES
vim /etc/logstash/conf.d/all.conf
input {
beats {
port => 4401
}
}
filter {
if "nginx-access" in [tags] {
grok {
patterns_dir => ["/etc/logstash/grok/patterns/"]
match => {
message => "%{NGINXACCESSV2}"
}
}
geoip {
# clientip是 grok分析访问日志获取到的客户端ip地址
source => "clientip"
}
useragent {
#数据源 来自于上面agent字段
source => "agent"
#分析后结果输出到useragent字段中
target => "useragent"
}
date {
#timestamp是上面访问日志过滤出来的时间字段, 这个字段时间的格式
#"timestamp" => "22/Nov/2015:11:02:00 +0800"
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
}
output {
if "nginx-access" in [tags] {
elasticsearch {
# 修改es
hosts => ["http://es01:9200","http://es02:9200","http://es03:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["http://es01:9200","http://es02:9200","http://es03:9200"]
index => "nginx-error-%{+YYYY.MM.dd}"
}
}
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/all.conf
vim /etc/filebeat/filebeat-nginx.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: "nginx-access"
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: "nginx-error"
output.logstash:
hosts: ["logstash:4401"]
worker: 1
indices:
- index: "nginx-access-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "nginx-access"
- index: "nginx-error-%{[agent.version]}-%{+yyyy.MM.dd}"
when.contains:
tags: "nginx-error"
#还会使用默认filebeat索引,如果要自定义索引,则需要ilm改为false即可。
setup.ilm.enabled: false
setup.template.name: "nginx-all"
setup.template.pattern: "nginx-*"
setup.template.settings:
index.number_of_shards: 4
index.number_of_replicas: 1
filebeat -c /etc/filebeat/filebeat-nginx.yml --path.logs /var/log/filebeat/ --path.data /var/lib/filebeat-nginx/ &
kafka
消息队列
部署 zookeeper```bash
二进制包放到/app/tools # 需要jdk
vim /app/tools/zookeeper/conf/zoo_sample.cfg
服务器之间或客户端与服务器之间维持心跳的时间间隔
tickTime以毫秒为单位。
tickTime=2000
集群中的follower服务器(F)与leader服务器(L)之间的初始连接心跳数
initLimit=10
集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数
syncLimit=5
数据保存目录
dataDir=../data
dataDir=/app/tools/zookeeper/data/
日志保存目录
dataLogDir=../logs
/app/tools/zookeeper/logs/
客户端连接的端口
clientPort=2181
客户端最大连接数。# 根据自己实际情况设置,默认为60个
maxClientCnxns=60
三个接点配置,格式为: server.服务编号=服务地址、LF通信端口、选举端口
server.1=10.0.0.90:2888:3888
data/myid文件 存放 数字1
server.2=10.0.0.91:2888:3888 server.3=10.0.0.92:2888:3888
```bash
cd /app/tools/zookeeper/
mkdir data logs
# 把zookeeper这个目录发送到其他2台机器上
# scp -rp /app/tools/zookeeper/ root@ip:/app/tools/
#es01
echo "1" >/app/tools/zookeeper/data/myid
#es02
echo "2" >/app/tools/zookeeper/data/myid
#es03
echo "3" >/app/tools/zookeeper/data/myid
用相对路径启动不然配置文件中的相对路径会找不到文件然后报错./zkServer.sh start
- status
kafka```bash
二进制包放到/app/tools
vim /app/tools/kafka/config/server.properties
####################### Server Basics
broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=666
####################### Socket Server Settings
kafka默认监听的端口为9092 (默认与主机名进行连接)
listeners=PLAINTEXT://:9092
处理网络请求的线程数量,默认为3个
num.network.threads=3
执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8
socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400
socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400
socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600
####################### Log Basics
kafka存储消息数据的目录
log.dirs=../data
每个topic默认的partition数量
num.partitions=3
在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1
####################### Log Flush Policy
消息刷新到磁盘中的消息条数阈值
log.flush.interval.messages=10000
消息刷新到磁盘中的最大时间间隔,1s
log.flush.interval.ms=1000
####################### Log Retention Policy
日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168
日志保留大小,超出大小会自动删除,默认为1G
log.retention.bytes=1073741824
日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824
每隔多长时间检测数据是否达到删除条件,300s
log.retention.check.interval.ms=300000
####################### Zookeeper
Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=10.0.0.90:2181,10.0.0.91:2181,10.0.0.92:2181
连接zookeeper的超时时间,6s
zookeeper.connection.timeout.ms=6000
每个节点的id不一样即可<br />创建数据目录<br />`mkdir /app/tools/kafka/data`
- 启动
```bash
#export KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" 永久生效写入/etc/profile即可
cd /app/tools/kafka/bin/
./kafka-server-start.sh --/config/server.properties #启动测试
./kafka-server-start.sh -daemon --/config/server.properties #前台没有报错则放入后台
- 创建topic(主题)测试
```bash
使用kafka创建一个topic
cd /app/tools/kafka/bin/ ./kafka-topics.sh \ —create \ —zookeeper 10.0.0.90:2181,10.0.0.91:2181,10.0.0.92:2181 \ —partitions 3 \ —replication-factor 1 \ —topic lidao996-kafka
生产消息
cd /app/tools/kafka/bin/ ./kafka-console-producer.sh \ —broker-list 10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092 \ —topic lidao996-kafka
订阅与查看消息
cd /app/tools/kafka/bin/ ./kafka-console-consumer.sh \ —bootstrap-server 10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092 \ —topic lidao996-kafka \ —from-beginning
接入filebeat/logstash```bash
# 不输出到logstash而是kafka
output.kafka:
hosts: ["10.0.0.90:9092", "10.0.0.91:9092", "10.0.0.92:9092"] #kafka集群地址
topics:
- topic: "kafka-nginx-access" #topic名称
when.contains:
tags: "access" #当时tags是acceess时,则写入kafka-nginx-access topic中
- topic: "kafka-nginx-error" #topic名称
when.contains:
tags: "error" #当时tags是error时,则写入kafka-nginx-error topic中
vim /etc/logstash/conf.d/input_kafka_nginx_1w_output_es.conf
input {
kafka {
bootstrap_servers => "10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092"
topics => ["kafka-nginx-access"]
group_id => "logstash" #消费组名称
consumer_threads => 2 #消费者启动的线程,尽量与分区一致
codec => "json" #指定格式为json
}
kafka {
bootstrap_servers => "10.0.0.90:9092,10.0.0.91:9092,10.0.0.92:9092"
topics => ["kafka-nginx-error"]
group_id => "logstash" #消费组名称
consumer_threads => 2 #消费者启动的线程,尽量与分区一致
codec => "json" #指定格式为json
}
}
filter {
if "nginx-access" in [tags][0] {
grok {
patterns_dir => ["/etc/logstash/grok/patterns/"]
match => {
message => "%{NGINXACCESSV2}"
}
}
useragent {
source => "useragent"
target => "useragent"
}
geoip {
source => "clientip"
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
convert => ["bytes","integer"]
convert => ["response_time", "float"]
#删除一些字段.
remove_field => ["message","agent","tags"]
add_field => { "target_index" => "logstash-kafka-nginx-access-%{+YYYY.MM}" }
#这里通过mutate创建了变量target_index 存放索引
}
}
else if "nginx-error" in [tags][0] {
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
add_field => { "target_index" => "logstash-kafka-nginx-error-%{+YYYY.MM}" }
#这里通过mutate创建了变量target_index 存放索引
}
}
}
output {
elasticsearch {
hosts => ["10.0.0.90:9200","10.0.0.91:9200","10.0.0.92:9200"]
index => "%{[target_index]}"
template_overwrite => true
}
}
- 启动
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-kafka-nginx-to-es.conf
