Deploying EFK + Kafka on Kubernetes
Reference: by 阳明, blog: https://www.qikqiak.com/k8s-book/
The most popular log collection solution for Kubernetes is the Elasticsearch, Fluentd and Kibana (EFK) stack, which is also the approach currently recommended upstream.
- Elasticsearch is a real-time, distributed, scalable search engine that supports full-text and structured search. It is typically used to index and search large volumes of log data, but it can also be used to search many other kinds of documents.
- Kibana is a powerful data-visualization dashboard for Elasticsearch that lets you browse Elasticsearch log data through a web UI.
- Fluentd is a popular open-source data collector. We will install Fluentd on every Kubernetes node; it tails the container log files, filters and transforms the log records, and then ships them to the Elasticsearch cluster, where they are indexed and stored.
Normally this stack alone is enough. But when the cluster produces more logs than Elasticsearch can keep up with, we need a middleware layer to buffer the data, and Kafka and Redis are the usual choices for that. Here we use Kafka, and since we want everything containerized, all of these components are deployed inside Kubernetes.
Notes:
(1) All components are deployed in a dedicated namespace; here I created a new namespace called kube-ops.
(2) The cluster needs distributed storage such as Ceph or NFS. I use NFS here; if you also use NFS and are not sure how to set it up, see https://www.qikqiak.com/k8s-book/docs/35.StorageClass.html.
Create the Namespace
First create a namespace. You can do it with a single command:
kubectl create ns kube-ops
or with a YAML manifest like the following (efk-ns.yaml):
apiVersion: v1
kind: Namespace
metadata:
  name: kube-ops
If you use the manifest, apply it to create the namespace:
kubectl apply -f efk-ns.yaml
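Either way, you can confirm the namespace exists before moving on:
# kubectl get ns kube-ops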
Deploy Elasticsearch
First we deploy the Elasticsearch cluster.
We will run a 3-node Elasticsearch cluster. The key point is to set discovery.zen.minimum_master_nodes=N/2+1, where N is the number of master-eligible nodes in the Elasticsearch cluster. With our 3 nodes, this value should be 2. That way, if one node is temporarily disconnected from the cluster, the other two can still elect a new master and the cluster keeps running while the last node tries to rejoin. Keep this parameter in mind whenever you scale the Elasticsearch cluster.
(1) Create a headless Service for Elasticsearch (elasticsearch-svc.yaml):
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: kube-ops
  labels:
    app: elasticsearch
spec:
  selector:
    app: elasticsearch
  clusterIP: None
  ports:
  - name: rest
    port: 9200
  - name: inter-node
    port: 9300
We define a headless service because the Elasticsearch Pods will later be created by a StatefulSet, which needs to be associated with this Service. Port 9200 is the REST API port and 9300 is the inter-node communication port.
Now create this resource:
# kubectl apply -f elasticsearch-svc.yaml
service/elasticsearch created
# kubectl get svc -n kube-ops
NAME            TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
elasticsearch   ClusterIP   None         <none>        9200/TCP,9300/TCP   9s
(2) Deploy Elasticsearch with a StatefulSet; the manifest is as follows (elasticsearch-elasticsearch.yaml):
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: es-cluster
  namespace: kube-ops
spec:
  serviceName: elasticsearch
  replicas: 3
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.4.3
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 1000m
        ports:
        - containerPort: 9200
          name: rest
          protocol: TCP
        - containerPort: 9300
          name: inter-node
          protocol: TCP
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
        env:
        - name: cluster.name
          value: k8s-logs
        - name: node.name
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: discovery.zen.ping.unicast.hosts
          value: "es-cluster-0.elasticsearch,es-cluster-1.elasticsearch,es-cluster-2.elasticsearch"
        - name: discovery.zen.minimum_master_nodes
          value: "2"
        - name: ES_JAVA_OPTS
          value: "-Xms512m -Xmx512m"
      initContainers:
      - name: fix-permissions
        image: busybox
        command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
      - name: increase-vm-max-map
        image: busybox
        command: ["sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
      - name: increase-fd-ulimit
        image: busybox
        command: ["sh", "-c", "ulimit -n 65536"]
        securityContext:
          privileged: true
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: elasticsearch
      annotations:
        volume.beta.kubernetes.io/storage-class: es-data-db
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: es-data-db
      resources:
        requests:
          storage: 10Gi
Notes on the manifest:
The Pod defines two kinds of containers: regular containers and initContainers. There are three initContainers, and they run before any of the regular containers start.
- The fix-permissions container changes the owner and group of the Elasticsearch data directory to 1000:1000 (the Elasticsearch user's UID). By default Kubernetes mounts the data directory as root, which would make it inaccessible to Elasticsearch.
- The increase-vm-max-map container raises the operating system's limit on mmap counts, which is too low by default and would lead to out-of-memory errors.
- The increase-fd-ulimit container runs ulimit to raise the maximum number of open file descriptors.
Among the regular containers we define one named elasticsearch and expose ports 9200 and 9300; note that the port names must match those defined in the Service above. We declare the persistent data directory through volumeMounts, and the matching VolumeClaims are defined further down. Finally, these are the environment variables set in the container:
- cluster.name: the name of the Elasticsearch cluster, here k8s-logs;
- node.name: the node name, taken from metadata.name. It resolves to es-cluster-[0,1,2], depending on the ordinal assigned to the Pod;
- discovery.zen.ping.unicast.hosts: this field configures how the nodes of the Elasticsearch cluster discover each other. We use unicast discovery, which takes a static list of hosts. Thanks to the headless service configured earlier, each Pod has a unique DNS name of the form es-cluster-[0,1,2].elasticsearch.kube-ops.svc.cluster.local, so we set the variable accordingly. Since everything lives in the same namespace, we can shorten it to es-cluster-[0,1,2].elasticsearch;
- discovery.zen.minimum_master_nodes: set to (N/2) + 1, where N is the number of master-eligible nodes in the cluster. We have 3 Elasticsearch nodes, so this value is 2 (rounded down to the nearest integer);
- ES_JAVA_OPTS: set to -Xms512m -Xmx512m, telling the JVM to use a minimum and maximum heap of 512 MB. You should tune these values to the resources available in your cluster and your workload.
(3) Define a StorageClass (elasticsearch-storage.yaml):
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: es-data-db
provisioner: rookieops/nfs
Note: since we use NFS for storage here, the provisioner above must match the provisioner name configured in your nfs-client-provisioner.
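For reference, a minimal sketch of the relevant part of a typical nfs-client-provisioner Deployment (the value rookieops/nfs is simply what this article uses, and the server/path values are placeholders for your own environment):
env:
- name: PROVISIONER_NAME
  value: rookieops/nfs           # must equal the provisioner field of the StorageClass
- name: NFS_SERVER
  value: <your-nfs-server-ip>    # placeholder
- name: NFS_PATH
  value: <your-exported-path>    # placeholder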
Then create the resources:
# kubectl apply -f elasticsearch-storage.yaml
# kubectl apply -f elasticsearch-elasticsearch.yaml
# kubectl get pod -n kube-ops
NAME                             READY   STATUS    RESTARTS   AGE
dingtalk-hook-8497494dc6-s6qkh   1/1     Running   0          16m
es-cluster-0                     1/1     Running   0          10m
es-cluster-1                     1/1     Running   0          10m
es-cluster-2                     1/1     Running   0          9m20s
# kubectl get pvc -n kube-ops
NAME                STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
data-es-cluster-0   Bound    pvc-9f15c0f8-60a8-485d-b650-91fb8f5f8076   10Gi       RWO            es-data-db     18m
data-es-cluster-1   Bound    pvc-503828ec-d98e-4e94-9f00-eaf6c05f3afd   10Gi       RWO            es-data-db     11m
data-es-cluster-2   Bound    pvc-3d2eb82e-396a-4eb0-bb4e-2dd4fba8600e   10Gi       RWO            es-data-db     10m
# kubectl get svc -n kube-ops
NAME            TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
dingtalk-hook   ClusterIP   10.68.122.48   <none>        5000/TCP            18m
elasticsearch   ClusterIP   None           <none>        9200/TCP,9300/TCP   19m
Test:
# kubectl port-forward es-cluster-0 9200:9200 --namespace=kube-ops
Forwarding from 127.0.0.1:9200 -> 9200
Forwarding from [::1]:9200 -> 9200
Handling connection for 9200
If you see output like the following, the service is healthy:
# curl http://localhost:9200/_cluster/state?pretty
{
  "cluster_name" : "k8s-logs",
  "compressed_size_in_bytes" : 337,
  "cluster_uuid" : "nzc4y-eDSuSaYU1TigFAWw",
  "version" : 3,
  "state_uuid" : "6Mvd-WTPT0e7WMJV23Vdiw",
  "master_node" : "KRyMrbS0RXSfRkpS0ZaarQ",
  "blocks" : { },
  "nodes" : {
    "XGP4TrkrQ8KNMpH3pQlaEQ" : {
      "name" : "es-cluster-2",
      "ephemeral_id" : "f-R_IyfoSYGhY27FmA41Tg",
      "transport_address" : "172.20.1.104:9300",
      "attributes" : { }
    },
    "KRyMrbS0RXSfRkpS0ZaarQ" : {
      "name" : "es-cluster-0",
      "ephemeral_id" : "FpTnJTR8S3ysmoZlPPDnSg",
      "transport_address" : "172.20.1.102:9300",
      "attributes" : { }
    },
    "Xzjk2n3xQUutvbwx2h7f4g" : {
      "name" : "es-cluster-1",
      "ephemeral_id" : "FKjRuegwToe6Fz8vgPmSNw",
      "transport_address" : "172.20.1.103:9300",
      "attributes" : { }
    }
  },
  "metadata" : {
    "cluster_uuid" : "nzc4y-eDSuSaYU1TigFAWw",
    "templates" : { },
    "indices" : { },
    "index-graveyard" : {
      "tombstones" : [ ]
    }
  },
  "routing_table" : {
    "indices" : { }
  },
  "routing_nodes" : {
    "unassigned" : [ ],
    "nodes" : {
      "KRyMrbS0RXSfRkpS0ZaarQ" : [ ],
      "XGP4TrkrQ8KNMpH3pQlaEQ" : [ ],
      "Xzjk2n3xQUutvbwx2h7f4g" : [ ]
    }
  },
  "snapshots" : {
    "snapshots" : [ ]
  },
  "restore" : {
    "snapshots" : [ ]
  },
  "snapshot_deletions" : {
    "snapshot_deletions" : [ ]
  }
}
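A quicker sanity check is the cluster health endpoint, which should report three nodes and a green status:
# curl http://localhost:9200/_cluster/health?pretty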
At this point the Elasticsearch deployment is complete.
Deploy Kibana
Kibana is only a visualization front end, so a Deployment is sufficient.
(1) Define the Kibana Service manifest (kibana-svc.yaml):
apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: kube-ops
  labels:
    app: kibana
spec:
  ports:
  - port: 5601
  type: NodePort
  selector:
    app: kibana
The Service here is of type NodePort. You could also expose Kibana through an Ingress, which is the recommended approach.
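If you go the Ingress route, a minimal sketch (assuming an ingress controller such as ingress-nginx is already installed, and using kibana.example.com as a hypothetical hostname) could look like this:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: kibana
  namespace: kube-ops
spec:
  rules:
  - host: kibana.example.com        # hypothetical hostname
    http:
      paths:
      - path: /
        backend:
          serviceName: kibana       # the Service defined above
          servicePort: 5601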
(2) Define the Kibana Deployment manifest (kibana-deploy.yaml):
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: kube-ops
  labels:
    app: kibana
spec:
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      containers:
      - name: kibana
        image: docker.elastic.co/kibana/kibana-oss:6.4.3
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 100m
        env:
        - name: ELASTICSEARCH_URL
          value: http://elasticsearch:9200
        ports:
        - containerPort: 5601
Apply the manifests:
# kubectl apply -f kibana.yaml
service/kibana created
deployment.apps/kibana created
# kubectl get svc -n kube-ops
NAME            TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
dingtalk-hook   ClusterIP   10.68.122.48   <none>        5000/TCP            47m
elasticsearch   ClusterIP   None           <none>        9200/TCP,9300/TCP   48m
kibana          NodePort    10.68.221.60   <none>        5601:26575/TCP      7m29s
# kubectl get pod -n kube-ops
NAME                             READY   STATUS    RESTARTS   AGE
dingtalk-hook-8497494dc6-s6qkh   1/1     Running   0          47m
es-cluster-0                     1/1     Running   0          41m
es-cluster-1                     1/1     Running   0          41m
es-cluster-2                     1/1     Running   0          40m
kibana-7fc9f8c964-68xbh          1/1     Running   0          7m41s
If you see the following page, Kibana has been deployed successfully.
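Kibana is reachable through the NodePort shown in the Service output above (26575 here; yours will differ):
# kubectl get svc kibana -n kube-ops
# then open http://<node-ip>:26575 in a browser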

Deploy Kafka
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle large volumes of data and pass messages from one endpoint to another. Kafka works for both offline and online message consumption. Kafka messages are persisted on disk and replicated within the cluster to prevent data loss.
Some of Kafka's benefits:
- Reliability - Kafka is distributed, partitioned, replicated and fault tolerant.
- Scalability - the Kafka messaging system scales out easily, without downtime.
- Durability - Kafka uses a distributed commit log, which means messages are persisted to disk as fast as possible, so they are durable.
- Performance - Kafka has high throughput for both publishing and subscribing, and performance stays stable even with many terabytes of stored messages.
A key dependency of Kafka is ZooKeeper, a distributed configuration and synchronization service. ZooKeeper is the coordination interface between Kafka brokers and consumers, and Kafka servers share information through the ZooKeeper ensemble. Kafka stores basic metadata in ZooKeeper, such as information about topics, brokers and consumer offsets (queue readers).
Because all of this critical information is stored in ZooKeeper, which replicates it across its ensemble, the failure of a single Kafka broker or ZooKeeper node does not affect the state of the Kafka cluster. Kafka also relies on ZooKeeper for leader election among brokers when a leader fails.
Deploy ZooKeeper
(1) Define the ZooKeeper StorageClass (zookeeper-storage.yaml):
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: zk-data-db
provisioner: rookieops/nfs
(2) Define the ZooKeeper headless Service (zookeeper-svc.yaml):
apiVersion: v1
kind: Service
metadata:
  name: zk-svc
  namespace: kube-ops
  labels:
    app: zk-svc
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
(3) Define the ZooKeeper ConfigMap (zookeeper-config.yaml):
apiVersion: v1
kind: ConfigMap
metadata:
  name: zk-cm
  namespace: kube-ops
data:
  jvm.heap: "1G"
  tick: "2000"
  init: "10"
  sync: "5"
  client.cnxns: "60"
  snap.retain: "3"
  purge.interval: "0"
(4) Define the ZooKeeper PodDisruptionBudget (zookeeper-pdb.yaml):
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
  namespace: kube-ops
spec:
  selector:
    matchLabels:
      app: zk
  minAvailable: 2
Note: a PodDisruptionBudget exists to keep the service available (and its SLA from degrading) during voluntary disruptions. Through the PodDisruptionBudget controller you can set either the minimum number or the minimum percentage of an application's Pods that must stay running, so that when Pods are deliberately evicted, too many of them are never taken down at once.
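For example, with minAvailable: 2 and 3 replicas, a voluntary disruption such as draining a node (the node name below is a placeholder) can evict at most one zk Pod at a time:
# kubectl drain <node-name> --ignore-daemonsets --delete-local-data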
(5) Define the ZooKeeper StatefulSet (zookeeper-statefulset.yaml):
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zk
  namespace: kube-ops
spec:
  serviceName: zk-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
      - name: k8szk
        imagePullPolicy: Always
        image: registry.cn-hangzhou.aliyuncs.com/rookieops/zookeeper:3.4.10
        resources:
          requests:
            memory: "2Gi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: ZK_REPLICAS
          value: "3"
        - name: ZK_HEAP_SIZE
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: jvm.heap
        - name: ZK_TICK_TIME
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: tick
        - name: ZK_INIT_LIMIT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: init
        - name: ZK_SYNC_LIMIT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: sync
        - name: ZK_MAX_CLIENT_CNXNS
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: client.cnxns
        - name: ZK_SNAP_RETAIN_COUNT
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: snap.retain
        - name: ZK_PURGE_INTERVAL
          valueFrom:
            configMapKeyRef:
              name: zk-cm
              key: purge.interval
        - name: ZK_CLIENT_PORT
          value: "2181"
        - name: ZK_SERVER_PORT
          value: "2888"
        - name: ZK_ELECTION_PORT
          value: "3888"
        command:
        - sh
        - -c
        - zkGenConfig.sh && zkServer.sh start-foreground
        readinessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: zk-data-db
      resources:
        requests:
          storage: 1Gi
Then apply the manifests:
# kubectl apply -f zookeeper-storage.yaml
# kubectl apply -f zookeeper-svc.yaml
# kubectl apply -f zookeeper-config.yaml
# kubectl apply -f zookeeper-pdb.yaml
# kubectl apply -f zookeeper-statefulset.yaml
# kubectl get pod -n kube-ops
NAME   READY   STATUS    RESTARTS   AGE
zk-0   1/1     Running   0          12m
zk-1   1/1     Running   0          12m
zk-2   1/1     Running   0          11m
Then check the ensemble status:
# for i in 0 1 2; do kubectl exec -n kube-ops zk-$i zkServer.sh status; done
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader
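As an optional end-to-end check (a sketch, assuming zkCli.sh is on the PATH inside the image, as in a standard ZooKeeper 3.4 distribution), write a znode on one member and read it back from another:
# kubectl exec -n kube-ops zk-0 -- zkCli.sh create /hello world
# kubectl exec -n kube-ops zk-1 -- zkCli.sh get /hello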
Deploy Kafka
(1) Build the image; the Dockerfile is shown below.
Kafka download: wget https://www-us.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz (note that the Dockerfile below unpacks kafka_2.11-2.3.1.tgz, so download the release that matches the Dockerfile).
FROM centos:centos7
LABEL "auth"="rookieops" \
      "mail"="rookieops@163.com"
ENV TIME_ZONE Asia/Shanghai

# install JAVA
ADD jdk-8u131-linux-x64.tar.gz /opt/
ENV JAVA_HOME /opt/jdk1.8.0_131
ENV PATH ${JAVA_HOME}/bin:${PATH}

# install kafka
ADD kafka_2.11-2.3.1.tgz /opt/
RUN mv /opt/kafka_2.11-2.3.1 /opt/kafka

WORKDIR /opt/kafka
EXPOSE 9092
CMD ["./bin/kafka-server-start.sh", "config/server.properties"]
Then docker build the image and docker push it to your registry (steps omitted).
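For completeness, a sketch of those two steps, tagged with the image name the StatefulSet below pulls (adjust the registry path to your own):
# docker build -t registry.cn-hangzhou.aliyuncs.com/rookieops/kafka:2.3.1-beta .
# docker push registry.cn-hangzhou.aliyuncs.com/rookieops/kafka:2.3.1-beta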
(2) Define the Kafka StorageClass (kafka-storage.yaml):
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: kafka-data-db
provisioner: rookieops/nfs
(3) Define the Kafka headless Service (kafka-svc.yaml):
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: kube-ops
  labels:
    app: kafka
spec:
  selector:
    app: kafka
  clusterIP: None
  ports:
  - name: server
    port: 9092
(4) Define the Kafka ConfigMap (kafka-config.yaml):
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
  namespace: kube-ops
data:
  server.properties: |
    broker.id=${HOSTNAME##*-}
    listeners=PLAINTEXT://:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=zk-0.zk-svc.kube-ops.svc.cluster.local:2181,zk-1.zk-svc.kube-ops.svc.cluster.local:2181,zk-2.zk-svc.kube-ops.svc.cluster.local:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
(5) Define the Kafka StatefulSet manifest (kafka.yaml):
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kube-ops
spec:
  serviceName: kafka-svc
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: "app"
                  operator: In
                  values:
                  - zk
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: kafka
        image: registry.cn-hangzhou.aliyuncs.com/rookieops/kafka:2.3.1-beta
        imagePullPolicy: Always
        resources:
          requests:
            cpu: 500m
            memory: 1Gi
          limits:
            cpu: 500m
            memory: 1Gi
        command:
        - "/bin/sh"
        - "-c"
        - "./bin/kafka-server-start.sh config/server.properties --override broker.id=${HOSTNAME##*-}"
        ports:
        - name: server
          containerPort: 9092
        volumeMounts:
        - name: config
          mountPath: /opt/kafka/config/server.properties
          subPath: server.properties
        - name: data
          mountPath: /data/kafka/logs
      volumes:
      - name: config
        configMap:
          name: kafka-config
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: kafka-data-db
      resources:
        requests:
          storage: 10Gi
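Note how each broker gets a unique broker.id: the startup command overrides it with ${HOSTNAME##*-}, a shell parameter expansion that strips everything up to the last '-' in the Pod name, leaving the StatefulSet ordinal:
# inside pod kafka-2, for example:
HOSTNAME=kafka-2
echo ${HOSTNAME##*-}    # prints 2, which becomes broker.id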
Apply the manifests:
# kubectl apply -f kafka-storage.yaml
# kubectl apply -f kafka-svc.yaml
# kubectl apply -f kafka-config.yaml
# kubectl apply -f kafka.yaml
# kubectl get pod -n kube-ops
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          13m
kafka-1   1/1     Running   0          13m
kafka-2   1/1     Running   0          10m
zk-0      1/1     Running   0          77m
zk-1      1/1     Running   0          77m
zk-2      1/1     Running   0          76m
Test:
(1) Exec into one of the containers, create a topic, and start a consumer that waits for a producer to send data:
# kubectl exec -it -n kube-ops kafka-0 -- /bin/bash
$ cd /opt/kafka
$ ./bin/kafka-topics.sh --create --topic test --zookeeper zk-0.zk-svc.kube-ops.svc.cluster.local:2181,zk-1.zk-svc.kube-ops.svc.cluster.local:2181,zk-2.zk-svc.kube-ops.svc.cluster.local:2181 --partitions 3 --replication-factor 2
Created topic "test".
# consume
$ ./bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
(2) Exec into another container to act as the producer:
# kubectl exec -it -n kube-ops kafka-1 -- /bin/bash
$ cd /opt/kafka
$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
hello
nihao
The consumer then receives the messages:
$ ./bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
hello
nihao
At this point the Kafka cluster is up and running.
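Optionally, you can also inspect partition and replica placement for the test topic from inside any Kafka Pod:
$ ./bin/kafka-topics.sh --describe --topic test --zookeeper zk-0.zk-svc.kube-ops.svc.cluster.local:2181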
Deploy Logstash
Logstash's role here is to read messages from Kafka and forward them to our backend store, Elasticsearch. To keep things simple, I deploy it with a plain Deployment.
Build the image; the Dockerfile is as follows:
FROM centos:centos7
LABEL "auth"="rookieops" \
      "mail"="rookieops@163.com"
ENV TIME_ZONE Asia/Shanghai

# install JAVA
ADD jdk-8u131-linux-x64.tar.gz /opt/
ENV JAVA_HOME /opt/jdk1.8.0_131
ENV PATH ${JAVA_HOME}/bin:${PATH}

# install logstash
ADD logstash-7.1.1.tar.gz /opt/
RUN mv /opt/logstash-7.1.1 /opt/logstash
(1) Define the ConfigMap manifest (logstash-config.yaml):
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-k8s-config
  namespace: kube-ops
data:
  containers.conf: |
    input {
      kafka {
        codec => "json"
        topics => ["test"]
        bootstrap_servers => ["kafka-0.kafka-svc.kube-ops:9092, kafka-1.kafka-svc.kube-ops:9092, kafka-2.kafka-svc.kube-ops:9092"]
        group_id => "logstash-g1"
      }
    }
    output {
      elasticsearch {
        hosts => ["es-cluster-0.elasticsearch.kube-ops:9200", "es-cluster-1.elasticsearch.kube-ops:9200", "es-cluster-2.elasticsearch.kube-ops:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
      }
    }
(2) Define the Deployment manifest (logstash.yaml):
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
  namespace: kube-ops
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
      - name: logstash
        image: registry.cn-hangzhou.aliyuncs.com/rookieops/logstash-kubernetes:7.1.1
        volumeMounts:
        - name: config
          mountPath: /opt/logstash/config/containers.conf
          subPath: containers.conf
        command:
        - "/bin/sh"
        - "-c"
        - "/opt/logstash/bin/logstash -f /opt/logstash/config/containers.conf"
      volumes:
      - name: config
        configMap:
          name: logstash-k8s-config
Then apply the configuration:
# kubectl apply -f logstash-config.yaml
# kubectl apply -f logstash.yaml
Then check the status and the logs:
# kubectl get pod -n kube-ops
NAME                             READY   STATUS    RESTARTS   AGE
dingtalk-hook-856c5dbbc9-srcm6   1/1     Running   0          3d20h
es-cluster-0                     1/1     Running   0          22m
es-cluster-1                     1/1     Running   0          22m
es-cluster-2                     1/1     Running   0          22m
kafka-0                          1/1     Running   0          3h6m
kafka-1                          1/1     Running   0          3h6m
kafka-2                          1/1     Running   0          3h6m
kibana-7fc9f8c964-dqr68          1/1     Running   0          5d2h
logstash-678c945764-lkl2n        1/1     Running   0          10m
zk-0                             1/1     Running   0          3d21h
zk-1                             1/1     Running   0          3d21h
zk-2                             1/1     Running   0          3d21h
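To confirm Logstash has connected to Kafka and joined its consumer group, follow its logs (Pod name taken from the output above):
# kubectl logs -f logstash-678c945764-lkl2n -n kube-ops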

Deploy Fluentd
Fluentd is an efficient log aggregator written in Ruby that scales well. For most organizations Fluentd is efficient enough and uses relatively few resources. Another tool, Fluent Bit, is even lighter and uses fewer resources, but its plugin ecosystem is not as rich as Fluentd's. Overall, Fluentd is more mature and more widely used, so we use Fluentd as the log collector here as well.
(1) Install the fluent-plugin-kafka plugin.
My approach is to start a fluentd container, install the plugin inside it, commit the container as a new image, and push it to the registry. The steps are as follows:
a. Start a container with docker:
# docker run -it registry.cn-hangzhou.aliyuncs.com/rookieops/fluentd-elasticsearch:v2.0.4 /bin/bash
$ gem install fluent-plugin-kafka --no-document
b. Exit the container and commit it as a new image:
# docker commit c29b250d8df9 registry.cn-hangzhou.aliyuncs.com/rookieops/fluentd-elasticsearch:v2.0.4
c. Push the image with the plugin installed to the registry:
# docker push registry.cn-hangzhou.aliyuncs.com/rookieops/fluentd-elasticsearch:v2.0.4
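If you prefer a reproducible build over docker commit, the same result could be baked into an image with a two-line Dockerfile (a sketch using the same base image and gem command as above):
FROM registry.cn-hangzhou.aliyuncs.com/rookieops/fluentd-elasticsearch:v2.0.4
RUN gem install fluent-plugin-kafka --no-document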
(2) Define the Fluentd ConfigMap (fluentd-config.yaml):
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: kube-ops
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  system.input.conf: |-
    # Logs from systemd-journal for interesting services.
    <source>
      @id journald-docker
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "docker.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag docker
    </source>
    <source>
      @id journald-kubelet
      @type systemd
      filters [{ "_SYSTEMD_UNIT": "kubelet.service" }]
      <storage>
        @type local
        persistent true
      </storage>
      read_from_head true
      tag kubelet
    </source>
  forward.input.conf: |-
    # Takes the messages sent over TCP
    <source>
      @type forward
    </source>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id kafka
      @type kafka2
      @log_level info
      include_tag_key true
      brokers kafka-0.kafka-svc.kube-ops:9092,kafka-1.kafka-svc.kube-ops:9092,kafka-2.kafka-svc.kube-ops:9092
      logstash_format true
      request_timeout 30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
      # data type settings
      <format>
        @type json
      </format>
      # topic settings
      topic_key topic
      default_topic test
      # producer settings
      required_acks -1
      compression_codec gzip
    </match>
(3) Define the DaemonSet manifest (fluentd-daemonset.yaml):
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: kube-ops
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: kube-ops
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-es
  namespace: kube-ops
  labels:
    k8s-app: fluentd-es
    version: v2.0.4
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
      version: v2.0.4
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
        kubernetes.io/cluster-service: "true"
        version: v2.0.4
      # This annotation ensures that fluentd does not get evicted if the node
      # supports critical pod annotation based priority scheme.
      # Note that this does not guarantee admission on the nodes (#40573).
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd-es
        image: registry.cn-hangzhou.aliyuncs.com/rookieops/fluentd-elasticsearch:v2.0.4
        command:
        - "/bin/sh"
        - "-c"
        - "/run.sh $FLUENTD_ARGS"
        env:
        - name: FLUENTD_ARGS
          value: --no-supervisor -q
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: config-volume
          mountPath: /etc/fluent/config.d
      nodeSelector:
        beta.kubernetes.io/fluentd-ds-ready: "true"
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: config-volume
        configMap:
          name: fluentd-config
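Note the nodeSelector in the DaemonSet: Fluentd is only scheduled onto nodes carrying the beta.kubernetes.io/fluentd-ds-ready=true label, so label every node you want to collect logs from (the node name below is a placeholder):
# kubectl label nodes <node-name> beta.kubernetes.io/fluentd-ds-ready=true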
Apply the manifests:
# kubectl apply -f fluentd-daemonset.yaml
# kubectl apply -f fluentd-config.yaml
# kubectl get pod -n kube-ops
NAME                             READY   STATUS    RESTARTS   AGE
dingtalk-hook-856c5dbbc9-srcm6   1/1     Running   0          3d21h
es-cluster-0                     1/1     Running   0          112m
es-cluster-1                     1/1     Running   0          112m
es-cluster-2                     1/1     Running   0          112m
fluentd-es-jvhqv                 1/1     Running   0          4h29m
fluentd-es-s7v6m                 1/1     Running   0          4h29m
kafka-0                          1/1     Running   0          4h36m
kafka-1                          1/1     Running   0          4h36m
kafka-2                          1/1     Running   0          4h36m
kibana-7fc9f8c964-dqr68          1/1     Running   0          5d4h
logstash-678c945764-lkl2n        1/1     Running   0          100m
zk-0                             1/1     Running   0          3d23h
zk-1                             1/1     Running   0          3d23h
zk-2                             1/1     Running   0          3d23h
At this point the whole pipeline is in place. If we exec into one of the Kafka containers and watch the console consumer, we can see the log messages flowing through:

Then open Kibana and create an index pattern. Since the Logstash configuration defines the index as logstash-%{+YYYY.MM.dd}, create the index pattern accordingly (logstash-*):
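Before creating the index pattern, you can also confirm from Elasticsearch itself that Logstash is writing logstash-* indices (using the same port-forward as earlier):
# curl http://localhost:9200/_cat/indices?v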

After it is created successfully, it looks like this:
Then we can browse the log entries:
At this point, the entire log collection system is complete.
