Elasticsearch集群

安装Elasticsearch

在需要做集群的服务器上安装Elasticsearch

配置文件

cluster.name: elk-cluster
node.name: ela01
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
# 对应本机IP
network.host: 10.0.0.74,127.0.0.1
http.port: 9200
# 集群中的主机一定要与其他机器联通
discovery.seed_hosts: ["10.0.0.75", "10.0.0.76"]
# 规定了集群最少机器,少于这个数量会无法启动
gateway.recover_after_nodes: 2
http.cors.enabled: true
http.cors.allow-origin: "*"

启动

systemctl restart elasticsearch

查看集群状态

  1. 方法一,使用命令行查看

    curl '10.0.0.74:9200/_cluster/health?pretty'
    
  2. 方法二,连接elasticsearch-head查看

Kafka集群

Zookeeper

环境

# 每个kafka集群都需要安装Java环境
yum install -y java

安装

cd /opt/elk
# 获取
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
# 解压
tar xf apache-zookeeper-3.6.0-bin.tar.gz
# 移动
cp -r apache-zookeeper-3.6.0-bin /opt/
# 软链接
ln -s /opt/apache-zookeeper-3.6.0-bin /opt/zookeeper

配置

cd /opt/zookeeper/conf
# 创建数据存储目录
mkdir -p /data/zookeeper
# 拷贝配置文件
cp zoo_sample.cfg zoo.cfg
# 编辑
vim /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=10.0.0.74:2888:3888
server.2=10.0.0.75:2888:3888
server.3=10.0.0.76:2888:3888

# 为集群的每台机器配置ID,每台机器ID都不一样
echo "1" > /data/zookeeper/myid
# 查看当前机器ID
cat /data/zookeeper/myid

测试

# 启动
/opt/zookeeper/bin/zkServer.sh start
# 查看状态
/opt/zookeeper/bin/zkServer.sh status

# 测试发送消息
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.74:2181
create /test "hello"
# 测试接收消息
/opt/zookeeper/bin/zkCli.sh -server 10.0.0.75:2181
get /test

Kafka

安装

cd /opt/elk
# 获取
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
# 解压
tar xf kafka_2.11-2.4.0.tgz -C /opt/
# 软链接
ln -s /opt/kafka_2.11-2.4.0 /opt/kafka

配置

# 配置kafka主配置文件
vi /opt/kafka/config/server.properties
#--------------------------------------
# id需要与zookeeper的myid一致
broker.id=1
# 设置为本机地址
listeners=PLAINTEXT://10.0.0.74:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 日志存放地址
log.dirs=/opt/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# 集群中所有zookeeper地址
zookeeper.connect=10.0.0.74:2181,10.0.0.75:2181,10.0.0.76:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

测试

# 前台启动(集群中所有机器都启动)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

# 开启新的窗口开始测试
# 创建命令
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# 查看主题
/opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic
# 测试发送消息
/opt/kafka/bin/kafka-console-producer.sh -broker-list 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic
# 其他节点测试接收
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.74:9092,10.0.0.75:9092,10.0.0.76:9092 --topic my-replicated-topic --from-beginning
# 测试获取所有的频道
/opt/kafka/bin/kafka-topics.sh  --list --zookeeper 10.0.0.74:2181,10.0.0.75:2181,10.0.0.76:2181

运行

# 后台运行
/opt/kafka/bin/kafka-server-start.sh  -daemon /opt/kafka/config/server.properties

Filebeat配置

vi /etc/filebeat/filebeat.yml

filebeat.inputs:
# 收集nginx的访问日志
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]
  # nginx的错误日志
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
# 输出到kafka
output.kafka:
  enabled: true
  hosts: ["10.0.0.74:9092","10.0.0.75:9092","10.0.0.76:9092"]
  topic: 'filebeat'
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

Logstash配置

vi /etc/logstash/conf.d/nginx.conf

input {
  kafka{
    bootstrap_servers=>"10.0.0.74:9092,10.0.0.75:9902,10.0.0.76:9092"
    topics=>["filebeat"]
    group_id=>"logstash"
    codec => "json"
  }
}

filter {
  mutate {
    convert => ["upstream_time", "float"]
    convert => ["request_time", "float"]
  }
}

output {
   stdout {}
   if "access" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.74:9200"
        manage_template => false
        index => "nginx_access-%{+yyyy.MM.dd}"
      }
    }
    if "error" in [tags] {
      elasticsearch {
        hosts => "http://10.0.0.74:9200"
        manage_template => false
        index => "nginx_error-%{+yyyy.MM.dd}"
      }
    }
}