- 1. Create the ELK working directory.
- 2. Create the Logstash configuration file logstash.conf and place it in the logstash folder.
- 3. Create the docker-compose.yml file and place it in the root directory.
- 4. Logstash.
- 4.1 Logstash collects data through a variety of different plugins.
- 4.2 Use the -h flag to unlock more usage options.
- 5. Once the preparation above is done, move on to the hands-on part.
- 6. Configure Kibana.
- 7. Spring project integration.
- 8. Appendix.
- 8.1 Elasticsearch cluster solution.
- 8.2 logstash-plugins-inputs-kafka documentation.
- 8.3 Elastic Stack and Product Documentation.
- 8.4 Beats.
- 8.5 Spring integrate Elasticsearch High Level REST Client.
- 8.6 Java High Level REST Client QueryBuilders.
- 8.7 High Level REST Client JavaDoc.
- 8.8 Rest APIs Doc.
- 8.9 dejavu visual UI
ELK is a complete centralized log-processing solution that combines three open-source tools, Elasticsearch, Logstash, and Kibana, to support powerful log querying, sorting, and statistics.
The environment is built on Docker and Docker Compose.
1. Create the ELK working directory.
1.1 Standalone mode:
mkdir -p elk/logstash/library
mkdir -p elk/logstash/sql
mkdir -p elk/elasticsearch/data
mkdir -p elk/elasticsearch/plugins
The standalone-mode ELK directory layout:
.
├── docker-compose.yml
├── elasticsearch
│   ├── data
│   └── plugins
└── logstash
    ├── library
    ├── sql
    └── logstash.conf
1.2 Cluster mode:
mkdir -p elk/es01/data
mkdir -p elk/es01/plugins
mkdir -p elk/es02/data
mkdir -p elk/es02/plugins
mkdir -p elk/es03/data
mkdir -p elk/es03/plugins
mkdir -p elk/logstash/library
mkdir -p elk/logstash/sql
The cluster-mode ELK directory layout:
.
├── docker-compose.yml
├── es01
│   ├── data
│   └── plugins
├── es02
│   ├── data
│   └── plugins
├── es03
│   ├── data
│   └── plugins
└── logstash
    ├── library
    ├── sql
    └── logstash.conf
2. Create the Logstash configuration file logstash.conf and place it in the logstash folder.
A reference logstash.conf:
input {
  # pull orders from MySQL every 5 minutes, tracking the last seen update_time
  jdbc {
    type => "your_project_order"
    jdbc_driver_library => "/your/path/to/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{ip}:{port}/{database}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
    jdbc_user => "your-username"
    jdbc_password => "your-password"
    schedule => "*/5 * * * *"
    statement_filepath => "/your/path/to/sql/your_project_order.sql"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
  }
  # system logs arrive over TCP as newline-delimited JSON (see section 7.1)
  tcp {
    type => "your_project_system_log"
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    codec => json_lines
  }
  # business logs arrive over TCP as newline-delimited JSON (see section 7.2)
  tcp {
    type => "your_project_business_log"
    mode => "server"
    host => "0.0.0.0"
    port => 4561
    codec => json_lines
  }
  # API logs are consumed from Kafka (see section 7.3)
  kafka {
    type => "your_project_api_log"
    bootstrap_servers => "192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093"
    group_id => "logstash-group"
    client_id => "logstash-client"
    auto_offset_reset => "latest"
    topics => ["your-project-log-topic"]
    codec => json { charset => "UTF-8" }
  }
}
output {
  # route each event to its own index based on the type set by the input
  if [type] == "your_project_order" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_order"
      document_id => "%{your_project_order_id}"
    }
  }
  if [type] == "your_project_system_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_system_log"
    }
  }
  if [type] == "your_project_business_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_business_log"
    }
  }
  if [type] == "your_project_api_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_api_log"
    }
  }
}
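The statement file referenced by statement_filepath is not shown above; here is a minimal sketch of what your_project_order.sql could look like. The table and column names are assumptions; only update_time (the tracking_column) and your_project_order_id (used as document_id) must actually exist in your schema. :sql_last_value is substituted by the jdbc input with the last tracked update_time.

-- hypothetical statement file for the jdbc input above
SELECT
  order_id AS your_project_order_id,
  order_no,
  amount,
  update_time
FROM your_project_order
WHERE update_time > :sql_last_value
ORDER BY update_time ASC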
3. Create the docker-compose.yml file and place it in the root directory.
3.1 Standalone mode
version: '3.8'
networks:
  elastic:
    driver: bridge
services:
  elasticsearch:
    image: elasticsearch:7.14.2
    container_name: elasticsearch
    restart: always
    environment:
      - 'cluster.name=elasticsearch'
      - 'discovery.type=single-node'
      - 'ES_JAVA_OPTS=-Xms512m -Xmx512m'
    volumes:
      - ./elasticsearch/plugins:/usr/share/elasticsearch/plugins
      - ./elasticsearch/data:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9200:9200
  logstash:
    image: logstash:7.14.2
    container_name: logstash
    restart: always
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      # mount the JDBC driver and SQL files referenced in logstash.conf
      # (the container paths here are an assumption; keep them in sync with the conf)
      - ./logstash/library:/usr/share/logstash/library
      - ./logstash/sql:/usr/share/logstash/sql
    networks:
      - elastic
    ports:
      - 4560:4560
      - 4561:4561
    depends_on:
      - elasticsearch
  kibana:
    image: kibana:7.14.2
    container_name: kibana
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
    networks:
      - elastic
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
3.2 Cluster mode
version: '3.8'
networks:
  elastic:
    driver: bridge
services:
  es01:
    image: elasticsearch:7.14.2
    container_name: es01
    restart: always
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es01/plugins:/usr/share/elasticsearch/plugins
      - ./es01/data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.14.2
    container_name: es02
    restart: always
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es02/plugins:/usr/share/elasticsearch/plugins
      - ./es02/data:/usr/share/elasticsearch/data
    networks:
      - elastic
  es03:
    image: elasticsearch:7.14.2
    container_name: es03
    restart: always
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es03/plugins:/usr/share/elasticsearch/plugins
      - ./es03/data:/usr/share/elasticsearch/data
    networks:
      - elastic
  logstash:
    image: logstash:7.14.2
    container_name: logstash-multi
    restart: always
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      # mount the JDBC driver and SQL files referenced in logstash.conf
      # (the container paths here are an assumption; keep them in sync with the conf)
      - ./logstash/library:/usr/share/logstash/library
      - ./logstash/sql:/usr/share/logstash/sql
    networks:
      - elastic
    depends_on:
      - es01
      - es02
      - es03
    ports:
      - 4560:4560
      - 4561:4561
  kibana:
    image: kibana:7.14.2
    container_name: kibana-multi
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200"]'
    networks:
      - elastic
    depends_on:
      - es01
      - es02
      - es03
    ports:
      - 5601:5601
3.3 Caveats
- Do not use the ES_JAVA_OPTS parameter in production; set the heap size manually instead.
- vm.max_map_count must be at least 262144; see the [official documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/docker.html#_set_vm_max_map_count_to_at_least_262144) for how to set it, or the commands below.
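On a typical Linux Docker host the setting can be applied as follows (run as root; the persistence mechanism may differ per distribution):

# apply immediately (lost on reboot)
sysctl -w vm.max_map_count=262144
# persist across reboots
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf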
4. Logstash.
Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite "stash". For some Logstash caveats, see the companion note: Logstash问题汇总.
4.1 Logstash collects data through a variety of different plugins.
4.1.1 Install plugins with the following commands:
# enter the logstash container
docker exec -it logstash /bin/bash
# enter the bin directory
cd /usr/share/logstash/bin
# install plugins; choose the ones you need
logstash-plugin install logstash-codec-json_lines
# exit the logstash container
exit
4.1.2 The Logstash plugin list:
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-elastic_enterprise_search
├── logstash-output-elastic_app_search
└── logstash-output-elastic_workplace_search
logstash-integration-jdbc
├── logstash-input-jdbc
├── logstash-filter-jdbc_streaming
└── logstash-filter-jdbc_static
logstash-integration-kafka
├── logstash-input-kafka
└── logstash-output-kafka
logstash-integration-rabbitmq
├── logstash-input-rabbitmq
└── logstash-output-rabbitmq
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core
4.2 Use the -h flag to unlock more usage options.
The command:
# print the help to discover more subcommands
logstash-plugin -h
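For example, the list subcommand shows which plugins are already installed:

# list the installed plugins and their versions
logstash-plugin list --verbose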
5. Once the preparation above is done, move on to the hands-on part.
5.1 Start ELK.
docker-compose up
5.2 Check the Elasticsearch status.
Open a browser and visit http://192.168.3.77:9200/. If it returns the JSON below, the startup succeeded.
{
  "name" : "ea7ce77db80e",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "jeJqTZyyS9OC4RgM9sV3ww",
  "version" : {
    "number" : "7.14.2",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "6bc13727ce758c0e943c3c21653b3da82f627f75",
    "build_date" : "2021-09-15T10:18:09.722761972Z",
    "build_snapshot" : false,
    "lucene_version" : "8.9.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
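The same check can be done from the terminal; the cluster health endpoint works for both modes (host IP as assumed above):

# a "green" or "yellow" status means the node is serving requests
curl -s 'http://192.168.3.77:9200/_cluster/health?pretty'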
5.3 Check the Kibana status.
Open a browser and visit http://192.168.3.77:5601/
6. Configure Kibana.
6.1 View the indices
Open the Stack Management menu at the bottom.
6.2 Create an index pattern
6.3 Query logs by index pattern
Open the Discover menu to view the logs.
7. Spring project integration.
7.1 log4j2
7.1.1 pom.xml dependencies
<disruptor.version>3.4.4</disruptor.version>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- disruptor is required by Log4j2's AsyncLogger -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>${disruptor.version}</version>
</dependency>
7.1.2 log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="DEBUG" monitorInterval="1800">
    <Properties>
        <Property name="FILE_PATH">logs</Property>
        <Property name="FILE_NAME">your-project-name</Property>
        <Property name="LOG_PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%p] [%c:%L] --- %m%n</Property>
        <Property name="LOGSTASH_HOST">192.168.3.77</Property>
        <Property name="LOGSTASH_PORT">4560</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%date{yyyy-MM-dd HH:mm:ss.SSS} [%file] [%thread] %n%level : %msg%n"/>
        </Console>
        <RollingRandomAccessFile name="HourLogFile" fileName="${FILE_PATH}/${FILE_NAME}-hour.log"
                                 filePattern="${FILE_PATH}/${FILE_NAME}-hour.%d{yyyy-MM-dd HH}.log">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
            </Policies>
        </RollingRandomAccessFile>
        <!-- an appender takes a single layout; JsonLayout with eventEol="true"
             emits newline-delimited JSON, matching the tcp input's json_lines codec -->
        <Socket name="LOGSTASH" host="${LOGSTASH_HOST}" port="${LOGSTASH_PORT}" protocol="TCP">
            <JsonLayout properties="true" compact="true" eventEol="true"/>
        </Socket>
    </Appenders>
    <Loggers>
        <AsyncLogger name="com.alibaba.druid" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <AsyncLogger name="org.springframework" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <AsyncLogger name="org.apache.ibatis" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <Root level="DEBUG">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </Root>
    </Loggers>
</Configuration>
7.2 logback
7.2.1 pom.xml dependencies
<logstash-logback-encoder.version>5.3</logstash-logback-encoder.version>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>${logstash-logback-encoder.version}</version>
</dependency>
7.2.2 logback-spring.xml
<configuration debug="false" scan="false">
    <property name="LOGSTASH_DESTINATION" value="192.168.3.77:4561"/>
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${LOGSTASH_DESTINATION}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="DEBUG">
        <appender-ref ref="LOGSTASH"/>
    </root>
</configuration>
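With this appender in place, ordinary SLF4J calls are shipped to Logstash as JSON. As a small illustration (the class and field names below are ours, not from the project), logstash-logback-encoder's StructuredArguments can add fields that become individually searchable in the your_project_business_log index:

import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    public void createOrder(String orderNo) {
        // kv() adds an "orderNo" key/value pair to the JSON event
        log.info("order created, {}", StructuredArguments.kv("orderNo", orderNo));
    }
}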
7.3 Kafka
7.3.1 pom.xml dependencies
<kafka.version>2.4.1</kafka.version>
<jackson.version>2.12.5</jackson.version>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
7.3.2 application.properties
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
## producer settings
### Kafka's built-in key serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
### custom Kafka value serializer
spring.kafka.producer.value-serializer=com.your.project.kafka.KafkaJsonSerializer
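The value serializer referenced above is a project-local class that is not shown in the original; here is a minimal sketch of what com.your.project.kafka.KafkaJsonSerializer could look like, using Jackson (already on the classpath via jackson-databind):

package com.your.project.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical implementation; the project's actual class may differ
public class KafkaJsonSerializer implements Serializer<Object> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null;
        }
        try {
            // UTF-8 JSON bytes, matching the json codec of the Logstash kafka input
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize value for topic " + topic, e);
        }
    }
}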
7.3.3 SpringBootApplication (optional)
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
public class YourProjectApplication {

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(YourProjectApplication.class);
        springApplication.run(args);
    }

    // create the topic at startup if it does not exist yet
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("your-project-log-topic").partitions(10).replicas(1).build();
    }
}
7.3.4 KafkaTemplate
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@Autowired private KafkaTemplate<String, YourProjectLog> kafkaTemplate;

// publish a log event to the topic consumed by the Logstash kafka input
YourProjectLog yourProjectLog = new YourProjectLog();
ListenableFuture<SendResult<String, YourProjectLog>> listenableFuture =
        kafkaTemplate.send("your-project-log-topic", yourProjectLog);
7.4 Elasticsearch High Level REST Client
7.4.1 pom.xml dependencies
<spring-data-elasticsearch.version>4.3.0</spring-data-elasticsearch.version>
<jackson.version>2.12.5</jackson.version>
<elasticsearch.version>7.15.2</elasticsearch.version>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
    <version>${spring-data-elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
7.4.2 Expose RestHighLevelClient as a Spring bean
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

@Configuration
public class EsRestClientConfig extends AbstractElasticsearchConfiguration {

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final ClientConfiguration clientConfiguration =
                ClientConfiguration.builder().connectedTo("IP:PORT").build();
        return RestClients.create(clientConfiguration).rest();
    }
}
7.4.3 RestHighLevelClient
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

@Autowired private RestHighLevelClient restHighLevelClient;

// bool query: match "login_v1" in methodNo OR "登录" ("login") in description
SearchRequest searchRequest = new SearchRequest(EsIndex.SKL_ADMIN_OPT_LOG_INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
        .should(QueryBuilders.matchQuery("methodNo", "login_v1"))
        .should(QueryBuilders.matchQuery("description", "登录"));
searchSourceBuilder.query(boolQueryBuilder).from(0).size(10).sort("requestTime", SortOrder.DESC);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
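The hits can then be read back as maps; a brief sketch, assuming the index stores the fields queried above:

import java.util.Map;
import org.elasticsearch.search.SearchHit;

for (SearchHit hit : searchResponse.getHits().getHits()) {
    // _source is the original JSON document indexed by Logstash
    Map<String, Object> source = hit.getSourceAsMap();
    System.out.println(source.get("methodNo") + " @ " + source.get("requestTime"));
}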