从搭建到配置流程(基于单体的架构,集群在配置文件要修改的地方会说清楚)

未命名文件.png

搭建应用服务 :

日志收集采用log4j收集项目日志
项目地址 简单明了的项目地址

Kafka的安装

从Kafka的官网下载Kafka的压缩包到/opt 目录

tar -zxvf kafka_2.12-0.11.0.0.tgz -C /usr/local/kafka-2.12 cd /usr/local/kafka-2.12 vi config/server.properties

需要修改的地方

  1. port=9092
  2. host.name=IP地址
  3. advertised.host.name=IP地址
  4. #我指定了logs地址
  5. log.dirs=/usr/local/kafka_2.12/kafka-logs
  6. zookeeper.connect=101.132.35.66:2181

启动

./kafka-server-start.sh -f /usr/local/kafka_2.12/config/server.properties 看到最后停止就有start的或者success就表示启动成功 后台启动 nohup ./kafka-server-start.sh -f /usr/local/kafka_2.12/config/server.properties &

filebeat的安装

去Elasticsearch官网下载时可以找到filebeat的版本下载,最好跟EKL版本一样

把下载的压缩包上传到服务器上的/opt目录下 解压到 /usr/local/ tar -zxvf filebeat-6.8.4-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local/filebeat-6.8.4-linux-x86_64/ 看到filebeat.yml
添加下面这段配置,如果粘贴过去格式乱了最好先在本地环境写好yml然后上传到filebeat目录下面

###################### Filebeat Configuration Example #########################
filebeat.prospectors:

- input_type: log   #名字可以随意,标识

  paths:
    ## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据
    #我们的应用服务会放在 /usr/local/应用程序,他会在同级生成logs目录,就是该目录
    - /usr/local/logs/app-collector.log
  #定义写入 ES 时的 _type 值
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 1000                             # 最大的行数 合并的最大行数 (一般是合并异常信息)
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: app-log-collector   ## 按服务划分用作kafka topic
    evn: dev

- input_type: log # 也可以随意

  paths:
   # 跟上面一样的解释
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
    pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)
    negate: true                                # 是否匹配到
    match: after                                # 合并到上一行的末尾
    max_lines: 2000                             # 最大的行数
    timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## 按服务划分用作kafka topic
    evn: dev

output.kafka:
  enabled: true
  #Kafka地址
  hosts: ["192.168.11.51:9092"]l
  topic: '%{[fields.logtopic]}'  #写入到的topic名称 上面的属性定义了
  partition.hash:
    reachable_only: true
  compression: gzip #压缩日志上传
  max_message_bytes: 1000000 
  required_acks: 1
logging.to_files: true

然后就可以可以启动filebeat了

可以先检查配置文件是否正确 ./filebeat -c filebeat.yml -configtest ## Config OK 表示没有问题 _ 后台启动 nohup ./filebeat &

前台启动 ./filebeat &

怎么测试filebeat是否能成功读取呢,把应用程序在/usr/local/目录下面启动

然后访问地址的 项目的地址 ip:9099/index 或者 ip:9099/err 然后去Kafka的 自己配置的Kafka的log目录去看就会发现有 error-log-collector-0 或者app-log-collector-0会有字节大小就说明配置生效了filebeat能读取日志文件输出到Kafka上面了

Logstash安装

从Elasticsearch 官网上面下载 6.8.4的版本

把下载来的Logstash 解压到 /usr/local/logstash-6.8.4 进入 logstash-6.8.4

创建一个自己配置文件的目录 mkdir sync-kafka vim logstash-kafka.conf

## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {
  kafka {
    ## app-log-服务名称
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.8.170:9092"  #kafka地址
    codec => json #输出格式json
    consumer_threads => 1    ## 增加consumer的并行消费线程数 优化之一 Logstash开启多少个线程消费
    decorate_events => true
    #auto_offset_rest => "latest"
    group_id => "app-log-group" #kafka的分组
   }

   kafka {
    ## error-log-服务名称
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.8.170:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_rest => "latest"
    group_id => "error-log-group"
   }

}

filter {

  ## 时区转换 es的时间问题所以我们得时间转换不然日志输出会有问题,我们比es的时间慢半个小时 东八区
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
#filebeat 中定义的 [fields][logtopic] 字段属性
  if "app-log" in [fields][logtopic]{
    grok {
        ## 表达式,这里对应的是Springboot输出的日志格式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }

  if "error-log" in [fields][logtopic]{
    grok {
        ## 表达式
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }

}

## 测试输出到控制台:
output {
  stdout { codec => rubydebug }
}


## elasticsearch:
output {

  if "app-log" in [fields][logtopic]{
    ## es插件
    elasticsearch {
          # es服务地址
        hosts => ["192.168.8.170:9200"]
        # 用户名密码      
        # user => "elastic"
        # password => "123456"
        ## 索引名,+ 号开头的,就会自动认为后面是时间格式:
        ## javalog-app-service-2019.01.23 
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        # 是否嗅探集群ip:一般设置true;http://192.168.8.170:9200/_nodes/http?pretty
        # 通过嗅探机制进行es集群负载均衡发日志消息
        sniffing => true
        # logstash默认自带一个mapping模板,进行模板覆盖
        template_overwrite => true
    } 
  }

  if "error-log" in [fields][logtopic]{
    elasticsearch {
        hosts => ["192.168.8.170:9200"]    
        user => "elastic"
        password => "123456"
        index => "error-log-%{[fields][logbiz]}-%{index_time}"
        sniffing => true
        template_overwrite => true
    } 
  }


}

配置完成,启动Logstash

到 logstash的bin目录下面启动 ./logstash -f /usr/local/logstash-6.8.4/sync-kafka-logs/logstash-kafka.conf &

等logstash启动就可以看到控制台输出 收集到Kafka的日志输出了

Elasticsearch安装

Elasticsearch安装地址

Kibana安装

从es的官网下载对应的6.8.4的版本下载 Kibana-6.8.4链接下载
把下载的压缩包上传到 /opt目录下面

tar -zxvf kibana-6.8.4-linux-x86_64.tar.gz -C /usr/local/ cd /usr/local/kibana/config 修改下面的 kibana.yml文件

配置文件添加该配置

server.host: "192.168.8.170"
elasticsearch.hosts: ["http://192.168.8.170:9200"]
#elasticsearch.username: "user" 有账号就添加账号
#elasticsearch.password: "pass" 有密码就添加密码

到bin 目录启动 nohup ./kibana

到kibana页面的 management的页面绑定索引管理
就可以看到日志数据在kibana上面展示了

image.png

我们之前的索引操作是 err-log-服务名 所以我们匹配的就是 err-log-*下的所有匹配了