Elasticsearch Cluster
Install Elasticsearch
Install Elasticsearch on every server that will join the cluster.
Configuration file (on package installs: /etc/elasticsearch/elasticsearch.yml)
cluster.name: elk-cluster
node.name: ela01
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
# Bind to this machine's own IP (plus loopback)
network.host: 10.0.0.74,127.0.0.1
http.port: 9200
# Seed hosts for discovery: list the other cluster members (they must be reachable from this node)
discovery.seed_hosts: ["10.0.0.75", "10.0.0.76"]
# Hold off cluster-state recovery until at least this many nodes have joined
gateway.recover_after_nodes: 2
http.cors.enabled: true
http.cors.allow-origin: "*"
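The other two nodes use the same file with node.name, network.host, and discovery.seed_hosts adjusted; the node names ela02/ela03 below are assumptions for illustration. Note that on Elasticsearch 7.x a brand-new cluster also needs cluster.initial_master_nodes for the very first bootstrap, along these lines:
# On 10.0.0.75 (assumed node name)
node.name: ela02
network.host: 10.0.0.75,127.0.0.1
# On 10.0.0.76 (assumed node name)
node.name: ela03
network.host: 10.0.0.76,127.0.0.1
# Needed once on 7.x for the initial cluster bootstrap
cluster.initial_master_nodes: ["ela01", "ela02", "ela03"]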
Start
systemctl restart elasticsearch
Check cluster status
Method 1: from the command line
curl '10.0.0.74:9200/_cluster/health?pretty'
Method 2: connect with elasticsearch-head
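If the cluster formed correctly, the health check on a three-node cluster should report something like this (values illustrative, real output has more fields):
{
  "cluster_name" : "elk-cluster",
  "status" : "green",
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_shards_percent_as_number" : 100.0
}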
Kafka Cluster
Zookeeper
Environment
# Every node in the Kafka cluster needs a Java runtime
yum install -y java
Install
cd /opt/elk
# Download
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
# Extract
tar xf apache-zookeeper-3.6.0-bin.tar.gz
# Copy into /opt
cp -r apache-zookeeper-3.6.0-bin /opt/
# Symlink for a stable path
ln -s /opt/apache-zookeeper-3.6.0-bin /opt/zookeeper
Configure
cd /opt/zookeeper/conf
# Create the data directory
mkdir -p /data/zookeeper
# Copy the sample config
cp zoo_sample.cfg zoo.cfg
# Edit
vim /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
# server.N=host:peer-port:election-port; N must match that node's myid
server.1=10.0.0.74:2888:3888
server.2=10.0.0.75:2888:3888
server.3=10.0.0.76:2888:3888
# Give every node in the cluster a unique ID (it must match its server.N line)
echo "1" > /data/zookeeper/myid
# Verify this node's ID
cat /data/zookeeper/myid
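On the other two nodes (after creating /data/zookeeper there as well), the ID must match their server.N lines in zoo.cfg:
# On 10.0.0.75
echo "2" > /data/zookeeper/myid
# On 10.0.0.76
echo "3" > /data/zookeeper/myid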
Test
# Start (do this on every node)
/opt/zookeeper/bin/zkServer.sh start
# Check status
/opt/zookeeper/bin/zkServer.sh status
# Test: create a znode from one node
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.74:2181
create /test "hello"
# Test: read it back from another node
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.75:2181
get /test
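Once all three nodes are running, zkServer.sh status should report exactly one leader and two followers, roughly:
Mode: leader     # on one node
Mode: follower   # on the other two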
Kafka
Install
cd /opt/elk
# Download
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
# Extract to /opt
tar xf kafka_2.11-2.4.0.tgz -C /opt/
# Symlink for a stable path
ln -s /opt/kafka_2.11-2.4.0 /opt/kafka
Configure
# Edit Kafka's main config file
vi /opt/kafka/config/server.properties
#--------------------------------------
# broker.id must be unique per node; keeping it equal to the Zookeeper myid is a convenient convention
broker.id=1
# Advertise this machine's own address
listeners=PLAINTEXT://10.0.0.74:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Where Kafka stores its data (topic log segments), not just text logs
log.dirs=/opt/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# All Zookeeper addresses in the cluster
zookeeper.connect=10.0.0.74:2181,10.0.0.75:2181,10.0.0.76:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
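Only two lines change on the other brokers:
# On 10.0.0.75
broker.id=2
listeners=PLAINTEXT://10.0.0.75:9092
# On 10.0.0.76
broker.id=3
listeners=PLAINTEXT://10.0.0.76:9092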
Test
# Start in the foreground (on every node in the cluster)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
# Open a new terminal for the tests below
# Create a topic
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# Describe the topic
/opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic
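A healthy topic shows its single partition replicated to all three brokers, along these lines (leader and ISR ordering will vary):
Topic: my-replicated-topic	PartitionCount: 1	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3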
# Produce test messages
/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic
# Consume on another node
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic --from-beginning
# List all topics
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092
Run
# Run in the background (daemon mode)
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
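A quick way to confirm the broker came up in daemon mode (the log path assumes Kafka's default LOG_DIR under the install directory):
# Is the broker listening?
ss -lntp | grep 9092
# Check the server log for "started"
tail -n 20 /opt/kafka/logs/server.log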
Filebeat Configuration
vi /etc/filebeat/filebeat.yml
filebeat.inputs:
# Collect nginx access logs (expected to be in JSON format; see the nginx snippet below)
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]
# Collect nginx error logs
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
# Send events to Kafka instead of directly to Elasticsearch
output.kafka:
  enabled: true
  hosts: ["10.0.0.74:9092","10.0.0.75:9092","10.0.0.76:9092"]
  topic: 'filebeat'
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
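The json.* settings above assume nginx writes its access log as JSON. A minimal log_format sketch for nginx.conf; the field names (including request_time and upstream_time, which the Logstash filter below converts to floats) are assumptions to adapt to your own format:
log_format json escape=json '{"time_local":"$time_local",'
    '"remote_addr":"$remote_addr",'
    '"request":"$request",'
    '"status":"$status",'
    '"request_time":"$request_time",'
    '"upstream_time":"$upstream_response_time"}';
access_log /var/log/nginx/access.log json;
After restarting Filebeat (systemctl restart filebeat), events can be verified on any broker:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.74:9092 --topic filebeat --from-beginning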
Logstash Configuration
vi /etc/logstash/conf.d/nginx.conf
input {
  kafka {
    bootstrap_servers => "10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092"
    topics => ["filebeat"]
    group_id => "logstash"
    codec => "json"
  }
}
filter {
  # Cast nginx timing fields to numbers so they can be aggregated on
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}
output {
  # Console echo, useful while debugging; remove in production
  stdout {}
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.74:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM.dd}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://10.0.0.74:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM.dd}"
    }
  }
}
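To validate and run the pipeline (paths assume a package install of Logstash):
# Test the config syntax
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf -t
# Start the service
systemctl start logstash
# Once traffic flows, the indices should appear in Elasticsearch
curl '10.0.0.74:9200/_cat/indices?v'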
