- 1. Create the ELK working directory.
- 2. Create the Logstash configuration file logstash.conf and place it in the logstash folder.
- 3. Create the docker-compose.yml file and place it in the root directory.
- 4. Logstash.
- 4.1 Logstash collects data through a variety of plugins.
- 4.2 Use the -h flag to unlock more usage options.
- 5. With the preparation done, move on to the hands-on part.
- 6. Configure Kibana.
- 7. Spring project integration.
- 8. Appendix.
- 8.1 Elasticsearch cluster solution.
- 8.2 logstash-plugins-inputs-kafka documentation.
- 8.3 Elastic Stack and Product Documentation.
- 8.4 Beats.
- 8.5 Spring integrate Elasticsearch High Level REST Client.
- 8.6 Java High Level REST Client QueryBuilders.
- 8.7 High Level REST Client JavaDoc.
- 8.8 Rest APIs Doc.
- 8.9 dejavu visualization UI
ELK is a complete solution for centralized log processing: the three open-source tools Elasticsearch, Logstash, and Kibana work together to cover log querying, sorting, and statistics needs.
The environment here is built on Docker and Docker Compose.
1. Create the ELK working directory.
1.1 Single-node mode:
```sh
mkdir -p elk/logstash/library
mkdir -p elk/logstash/sql
mkdir -p elk/elasticsearch/data
mkdir -p elk/elasticsearch/plugins
```
The single-node ELK directory layout:
```
.
├── docker-compose.yml
├── elasticsearch
│   ├── data
│   └── plugins
└── logstash
    ├── library
    ├── sql
    └── logstash.conf
```
1.2 Cluster mode:
```sh
mkdir -p elk/es01/data
mkdir -p elk/es01/plugins
mkdir -p elk/es02/data
mkdir -p elk/es02/plugins
mkdir -p elk/es03/data
mkdir -p elk/es03/plugins
mkdir -p elk/logstash/library
mkdir -p elk/logstash/sql
```
The cluster-mode ELK directory layout:
```
.
├── docker-compose.yml
├── es01
│   ├── data
│   └── plugins
├── es02
│   ├── data
│   └── plugins
├── es03
│   ├── data
│   └── plugins
└── logstash
    ├── library
    ├── sql
    └── logstash.conf
```
2. Create the Logstash configuration file logstash.conf and place it in the logstash folder.
A reference logstash.conf:
```conf
input {
  jdbc {
    type => "your_project_order"
    jdbc_driver_library => "/your/path/to/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://{ip}:{port}/{database}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
    jdbc_user => "your-username"
    jdbc_password => "your-password"
    schedule => "*/5 * * * *"
    statement_filepath => "/your/path/to/sql/your_project_order.sql"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
  }
  tcp {
    type => "your_project_system_log"
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    codec => json_lines
  }
  tcp {
    type => "your_project_business_log"
    mode => "server"
    host => "0.0.0.0"
    port => 4561
    codec => json_lines
  }
  kafka {
    type => "your_project_api_log"
    bootstrap_servers => "192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093"
    group_id => "logstash-group"
    client_id => "logstash-client"
    auto_offset_reset => "latest"
    topics => ["your-project-log-topic"]
    codec => json { charset => "UTF-8" }
  }
}
output {
  if [type] == "your_project_order" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_order"
      document_id => "%{your_project_order_id}"
    }
  }
  if [type] == "your_project_system_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_system_log"
    }
  }
  if [type] == "your_project_business_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_business_log"
    }
  }
  if [type] == "your_project_api_log" {
    elasticsearch {
      hosts => ["http://192.168.3.77:9200"]
      index => "your_project_api_log"
    }
  }
}
```
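The jdbc input above reads its query from statement_filepath and tracks update_time between runs. The SQL file itself is not shown in this article; a minimal sketch of your_project_order.sql, assuming the table name matches the index name, could look like this:

```sql
-- :sql_last_value is filled in by the logstash jdbc input
-- with the last tracked update_time
SELECT *
FROM your_project_order
WHERE update_time > :sql_last_value
```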
3. Create the docker-compose.yml file and place it in the root directory.
3.1 Single-node mode
```yaml
version: '3.8'
networks:
  elastic:
    driver: bridge
services:
  elasticsearch:
    image: elasticsearch:7.14.2
    container_name: elasticsearch
    restart: always
    environment:
      - 'cluster.name=elasticsearch'
      - 'discovery.type=single-node'
      - 'ES_JAVA_OPTS=-Xms512m -Xmx512m'
    volumes:
      - ./elasticsearch/plugins:/usr/share/elasticsearch/plugins
      - ./elasticsearch/data:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9200:9200
  logstash:
    image: logstash:7.14.2
    container_name: logstash
    restart: always
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    networks:
      - elastic
    ports:
      - 4560:4560
      - 4561:4561
    depends_on:
      - elasticsearch
  kibana:
    image: kibana:7.14.2
    container_name: kibana
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
    networks:
      - elastic
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
```
3.2 Cluster mode
```yaml
version: '3.8'
networks:
  elastic:
    driver: bridge
services:
  es01:
    image: elasticsearch:7.14.2
    container_name: es01
    restart: always
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es01/plugins:/usr/share/elasticsearch/plugins
      - ./es01/data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.14.2
    container_name: es02
    restart: always
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es02/plugins:/usr/share/elasticsearch/plugins
      - ./es02/data:/usr/share/elasticsearch/data
    networks:
      - elastic
  es03:
    image: elasticsearch:7.14.2
    container_name: es03
    restart: always
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - 'ES_JAVA_OPTS=-Xms1024m -Xmx1024m'
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es03/plugins:/usr/share/elasticsearch/plugins
      - ./es03/data:/usr/share/elasticsearch/data
    networks:
      - elastic
  logstash:
    image: logstash:7.14.2
    container_name: logstash-muti
    restart: always
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    networks:
      - elastic
    depends_on:
      - es01
      - es02
      - es03
    ports:
      - 4560:4560
      - 4561:4561
  kibana:
    image: kibana:7.14.2
    container_name: kibana-muti
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200"]'
    networks:
      - elastic
    depends_on:
      - es01
      - es02
      - es03
    ports:
      - 5601:5601
```
3.3 Notes
- In production, do not size the heap via ES_JAVA_OPTS; set the heap size explicitly.
- vm.max_map_count must be at least 262144; see the [official documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/docker.html#_set_vm_max_map_count_to_at_least_262144) for how to set it, or the sketch below.
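On a Linux host, the second note can be applied with standard sysctl commands (general Linux practice, not specific to this setup):

```sh
# take effect immediately
sysctl -w vm.max_map_count=262144
# persist across reboots
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
```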
4. Logstash.
Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite "stash". For known Logstash pitfalls, see the companion note: Logstash问题汇总 (a roundup of Logstash issues).
4.1 Logstash collects data through a variety of plugins.
4.1.1 Install a plugin as follows:
```sh
# enter the logstash container
docker exec -it logstash /bin/bash
# enter the bin directory
cd /bin/
# install the plugins you need
logstash-plugin install logstash-codec-json_lines
# leave the logstash container
exit
```
4.1.2 The Logstash plugin list:
```
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-elastic_enterprise_search
├── logstash-output-elastic_app_search
└── logstash-output-elastic_workplace_search
logstash-integration-jdbc
├── logstash-input-jdbc
├── logstash-filter-jdbc_streaming
└── logstash-filter-jdbc_static
logstash-integration-kafka
├── logstash-input-kafka
└── logstash-output-kafka
logstash-integration-rabbitmq
├── logstash-input-rabbitmq
└── logstash-output-rabbitmq
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core
```
4.2 Use the -h flag to unlock more usage options.
Run:
```sh
# show help for more commands and options
logstash-plugin -h
```
5. With the preparation done, move on to the hands-on part.
5.1 Start ELK.
```sh
# runs in the foreground; add -d to start detached
docker-compose up
```
5.2 Check Elasticsearch's status.
Open a browser and visit http://192.168.3.77:9200/. If the JSON below comes back, the startup succeeded.
{"name" : "ea7ce77db80e","cluster_name" : "elasticsearch","cluster_uuid" : "jeJqTZyyS9OC4RgM9sV3ww","version" : {"number" : "7.14.2","build_flavor" : "default","build_type" : "docker","build_hash" : "6bc13727ce758c0e943c3c21653b3da82f627f75","build_date" : "2021-09-15T10:18:09.722761972Z","build_snapshot" : false,"lucene_version" : "8.9.0","minimum_wire_compatibility_version" : "6.8.0","minimum_index_compatibility_version" : "6.0.0-beta1"},"tagline" : "You Know, for Search"}
5.3 Check Kibana's status.
Open a browser and visit http://192.168.3.77:5601/
6. Configure Kibana.
6.1 View the indices
Go to Stack Management, the bottom-most menu entry.
6.2 Create an index pattern
6.3 Query logs by index pattern
Go to the Discover menu to view the logs; a KQL filter example follows below.
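In the Discover search bar, KQL expressions narrow the results; for example, matching on the type field that the logstash configuration above attaches to every event (the value is one of this article's example types):

```
type : "your_project_api_log"
```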
7. Spring project integration.
7.1 log4j2
7.1.1 pom.xml dependencies
```xml
<log4j.version>1.2.17</log4j.version>
<disruptor.version>3.4.4</disruptor.version>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>${disruptor.version}</version>
</dependency>
```
7.1.2 log4j2.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="DEBUG" monitorInterval="1800">
    <Properties>
        <Property name="FILE_PATH">logs</Property>
        <Property name="FILE_NAME">your-project-name</Property>
        <Property name="LOG_PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] [%p] [%c:%L] --- %m%n</Property>
        <Property name="LOGSTASH_HOST">192.168.3.77</Property>
        <Property name="LOGSTASH_PORT">4560</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%date{yyyy-MM-dd HH:mm:ss.SSS} [%file] [%thread] %n%level : %msg%n"/>
        </Console>
        <RollingRandomAccessFile name="HourLogFile" fileName="${FILE_PATH}/${FILE_NAME}-hour.log"
                                 filePattern="${FILE_PATH}/${FILE_NAME}-hour.%d{yyyy-MM-dd HH}.log">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
            </Policies>
        </RollingRandomAccessFile>
        <!-- An appender takes a single layout; JsonLayout matches the json_lines codec
             on the logstash tcp input (compact + eventEol = one JSON document per line) -->
        <Socket name="LOGSTASH" host="${LOGSTASH_HOST}" port="${LOGSTASH_PORT}" protocol="TCP">
            <JsonLayout properties="true" compact="true" eventEol="true"/>
        </Socket>
    </Appenders>
    <Loggers>
        <AsyncLogger name="com.alibaba.druid" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <AsyncLogger name="org.springframework" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <AsyncLogger name="org.apache.ibatis" level="DEBUG" additivity="false">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </AsyncLogger>
        <Root level="DEBUG">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="HourLogFile"/>
            <AppenderRef ref="LOGSTASH"/>
        </Root>
    </Loggers>
</Configuration>
```
7.2 logback
7.2.1 pom.xml dependencies
```xml
<logstash-logback-encoder.version>5.3</logstash-logback-encoder.version>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>${logstash-logback-encoder.version}</version>
</dependency>
```
7.2.2 logback-spring.xml
```xml
<configuration debug="false" scan="false">
    <property name="LOGSTASH_DESTINATION" value="192.168.3.77:4561"/>
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>${LOGSTASH_DESTINATION}</destination>
        <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>
    </appender>
    <root level="DEBUG">
        <appender-ref ref="LOGSTASH"/>
    </root>
</configuration>
```
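With that appender in place, any SLF4J call is encoded as one JSON line and shipped to the logstash tcp input on port 4561. A minimal usage sketch (class name and message are illustrative only):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    public void placeOrder(String orderId) {
        // Encoded by LogstashEncoder and sent to 192.168.3.77:4561 by the LOGSTASH appender
        log.info("order placed, orderId={}", orderId);
    }
}
```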
7.3 kafka
7.3.1 pom.xml dependencies
```xml
<kafka.version>2.4.1</kafka.version>
<jackson.version>2.12.5</jackson.version>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
```
7.3.2 application.properties
```properties
# kafka
spring.kafka.bootstrap-servers=192.168.3.77:9091,192.168.3.77:9092,192.168.3.77:9093
## Producer configuration
### Kafka's built-in key serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
### Custom Kafka value serializer
spring.kafka.producer.value-serializer=com.your.project.kafka.KafkaJsonSerializer
```
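The properties reference com.your.project.kafka.KafkaJsonSerializer, but the class itself is not shown here; a minimal sketch, assuming Jackson (already on the classpath via jackson-databind above):

```java
package com.your.project.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Serializes any producer value to JSON bytes so logstash's kafka input can decode it with codec => json
public class KafkaJsonSerializer implements Serializer<Object> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Object data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize value for topic " + topic, e);
        }
    }
}
```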
7.3.3 SpringBootApplication (optional)
```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
public class YourProjectApplication {

    public static void main(String[] args) {
        SpringApplication springApplication = new SpringApplication(YourProjectApplication.class);
        springApplication.run(args);
    }

    // Create the topic on startup if it does not exist yet
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("your-project-log-topic")
                .partitions(10)
                .replicas(1)
                .build();
    }
}
```
7.3.4 KafkaTemplate
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

// ... inside a method: send one log object to the topic consumed by logstash
YourProjectLog yourProjectLog = new YourProjectLog();
ListenableFuture<SendResult<String, Object>> listenableFuture =
        kafkaTemplate.send("your-project-log-topic", yourProjectLog);
```
7.4 Elasticsearch High Level REST Client
7.4.1 pom.xml dependencies
```xml
<spring-data-elasticsearch.version>4.3.0</spring-data-elasticsearch.version>
<jackson.version>2.12.5</jackson.version>
<elasticsearch.version>7.15.2</elasticsearch.version>

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
    <version>${spring-data-elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
</dependency>
```
7.4.2 Let Spring manage the RestHighLevelClient
```java
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

@Configuration
public class EsRestClientConfig extends AbstractElasticsearchConfiguration {

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        // connectedTo expects "host:port" without a scheme
        final ClientConfiguration clientConfiguration =
                ClientConfiguration.builder().connectedTo("IP:PORT").build();
        return RestClients.create(clientConfiguration).rest();
    }
}
```
7.4.3 RestHighLevelClient
```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;

@Autowired
private RestHighLevelClient restHighLevelClient;

// ... inside a method: bool query with two should clauses, first 10 hits, newest first
SearchRequest searchRequest = new SearchRequest(EsIndex.SKL_ADMIN_OPT_LOG_INDEX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
        .should(QueryBuilders.matchQuery("methodNo", "login_v1"))
        .should(QueryBuilders.matchQuery("description", "登录"));
searchSourceBuilder.query(boolQueryBuilder)
        .from(0)
        .size(10)
        .sort("requestTime", SortOrder.DESC);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
```
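The snippet stops at the search call; reading the hits back uses the standard SearchHit API, for example:

```java
import org.elasticsearch.search.SearchHit;

// Each hit's _source document is available as a JSON string (or a Map via getSourceAsMap)
for (SearchHit hit : searchResponse.getHits().getHits()) {
    String sourceJson = hit.getSourceAsString();
    System.out.println(sourceJson);
}
```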
