EFK描述

image.png
Elasticsearch
开源的分布式搜索和数据分析引擎,底层是开源库Apache Lucene。描述:

  • 一个分布式的实时文档存储,每个字段可以被索引与搜索;
  • 一个分布式实时分析搜索引擎;
  • 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据。

Kibana
开源的分析和可视化平台。可以通过Kibana来搜索,查看,并和存储在Elasticsearch索引中的数据进行交互。也可以执行高级数据分析,并且可视化数据。

Fluentd
针对日志的收集、处理、转发的系统。通过丰富的插件系统,可以收集来自于各种系统或应用的日志,转化为用户指定的格式后,转发到用户所指定的日志存储系统之中。
image.png

主要运行步骤如下:
1. 首先 Fluentd 从多个日志源获取数据
2. 结构化并且标记这些数据
3. 然后根据匹配的标签将数据发送到多个目标服务

Fluentd

Fluentd架构

image.png

为什么使用fluentd作为k8s体系的日志收集工具?

  1. 云原生
  2. 将日志json化;
  3. 可插拔架构设计;
  4. 极小的资源占用;
  5. 极强的可靠性
    1. 基于内存和本地文件的缓存
    2. 强大的故障转移

事件流生命周期和指令配置

#官档
https://docs.fluentd.org/v/0.12/quickstart/life-of-a-fluentd-event

#事件流程
Input -> filter 1 -> … -> filter N -> Buffer -> Output

#启动命令
$ fluentd -c fluent.conf

#指令介绍
source,数据源,对应Input
通过使用 source 指令,来选择和配置所需的输入插件来启用 Fluentd 输入源, source 把事件提交到 fluentd 的路由引擎中。使用type来区分不同类型的数据源。

监听指定文件的追加输入:

<source>
    @type tail
    path /var/log/httpd-access.log
    pos_file /var/log/td-agent/httpd-access.log.pos
    tag myapp.access
    format apache2
</source>

filter,Event processing pipeline(事件处理流)
filter 可以串联成 pipeline,对数据进行串行处理,最终再交给 match 输出。

对事件内容进行处理:

<source>
    @type http
    port 9880
  </source>

  <filter myapp.access>
    @type record_transformer
    <record>
      host_param “#{Socket.gethostname}”
    </record>
  </filter>

filter 获取数据后,调用内置 @type record_transformer 插件,在事件的 record 里插入了新的字段 host_param,然后再交给 match 输出。

label指令
source 里指定 @label,这个 source 所触发的事件就会被发送给指定的 label 所包含的任务,而不会被后续的其他任务获取到。

<source>
    @type forward
  </source>

  <source>
  ### 这个任务指定了 label 为 @SYSTEM
  ### 会被发送给 <label @SYSTEM>
  ### 而不会被发送给下面紧跟的 filter 和 match
    @type tail
    @label @SYSTEM
    path /var/log/httpd-access.log
    pos_file /var/log/td-agent/httpd-access.log.pos
    tag myapp.access
    format apache2
  </source>

  <filter access.**>
    @type record_transformer
    <record>
    # …
    </record>
  </filter>

  <match **>
    @type elasticsearch
    # …
  </match>

  <label @SYSTEM>
    ### 将会接收到上面 @type tail 的 source event
    <filter var.log.middleware.**>
      @type grep
      # …
    </filter>

    <match **>
      @type s3
      # …
    </match>
  </label>

match,匹配输出
查找匹配 ‘tags’ 的事件并处理。match 命令的最常见用法是将事件输出到其他系统(因此,与 match 命令对应的插件称为 “输出插件”)

<source>
    @type http
    port 9880
  </source>

  <filter myapp.access>
    @type record_transformer
    <record>
      host_param “#{Socket.gethostname}”
    </record>
  </filter>

  <match myapp.access>
    @type file
    path /var/log/fluent/access
  </match>

#事件的结构
time:事件的处理时间
tag:事件的来源,在fluentd.conf中配置
record:真实的日志内容,json对象

原始日志:

192.168.0.1 - - [16/07/2020:12:00:00 +0900] "GET / HTTP/1.1" 200 777

经过 fluentd 引擎处理完后的日志格式可能是:

2020-07-16 08:40:35 +0000 apache.access: {"user":"-","method":"GET","code":200,"size":777,"host":"192.168.0.1","path":"/"}

buffer事件缓冲模型

image.png
因为每个事件数据量通常很小,考虑数据传输效率、稳定性等方面的原因,所以基本不会每条事件处理完后都会立马写入到output端,因此fluentd建立了缓冲模型,模型中主要有两个概念:

  • buffer_chunk:事件缓冲块,用来存储本地已经处理完待发送至目的端的事件,可以设置每个块的大小。
    - buffer_queue:存储chunk的队列,可以设置长度。

可以设置的参数,主要有:
- buffer_type,缓冲类型,可以设置file或者memory
- buffer_chunk_limit,每个chunk块的大小,默认8MB
- buffer_queue_limit ,chunk块队列的最大长度,默认256
- flush_interval ,flush一个chunk的时间间隔
- retry_limit ,chunk块发送失败重试次数,默认17次,之后就丢弃该chunk数据
- retry_wait ,重试发送chunk数据的时间间隔,默认1s,第2次失败再发送的话,间隔2s,下次4秒,以此类推。

大致的过程为:
随着fluentd事件的不断生成并写入chunk,缓存块变大,当缓存块满足buffer_chunk_limit大小或者新的缓存块诞生超过flush_interval时间间隔后,会推入缓存queue队列尾部,该队列大小由buffer_queue_limit决定。

每次有新的chunk入列,位于队列最前部的chunk块会立即写入配置的存储后端,比如配置的是kafka,则立即把数据推入kafka中。

比较理想的情况是每次有新的缓存块进入缓存队列,则立马会被写入到后端,同时,新缓存块也持续入列。入列的速度不会快于出列的速度,这样基本上缓存队列处于空的状态,队列中最多只有一个缓存块。

但是实际情况考虑网络等因素,往往缓存块被写入后端存储的时候会出现延迟或者写入失败的情况,当缓存块写入后端失败时,该缓存块还会留在队列中,等retry_wait时间后重试发送,当retry的次数达到retry_limit后,该缓存块被销毁(数据被丢弃)。

此时缓存队列持续有新的缓存块进来,如果队列中存在很多未及时写入到后端存储的缓存块的话,当队列长度达到buffer_queue_limit大小,则新的事件被拒绝,fluentd报错,error_class=Fluent::Plugin::Buffer::BufferOverflowError error=”buffer space has too many data”。

还有一种情况是网络传输缓慢的情况,若每3秒钟会产生一个新块,但是写入到后端时间却达到了30s钟,队列长度为100,那么每个块出列的时间内,又有新的10个块进来,那么队列很快就会被占满,导致异常出现。

实践一:实现业务应用日志的收集及字段解析

#目标
收集容器内的 nginx 应用的access.log日志,并解析日志字段为 JSON 格式,原始日志格式:

$ tail -f access.log
...
53.49.146.149 1561620585.973 0.005 502 [27/Jun/2019:15:29:45 +0800] 178.73.215.171 33337 GET https

收集并处理成:

{
    "serverIp": "53.49.146.149",
    "timestamp": "1561620585.973",
    "respondTime": "0.005",
    "httpCode": "502",
    "eventTime": "27/Jun/2019:15:29:45 +0800",
    "clientIp": "178.73.215.171",
    "clientPort": "33337",
    "method": "GET",
    "protocol": "https"
}

#思路

    • 配置fluent.conf
        • 使用@tail插件通过监听access.log文件
        • 用filter实现对nginx日志格式解析
    • 启动fluentd服务
    • 手动追加内容至access.log文件
    • 观察本地输出内容是否符合预期

fluent.conf

<source>
    @type tail
    @label @nginx_access
    path /fluentd/access.log
    pos_file /fluentd/nginx_access.posg
    tag nginx_access
    format none
    @log_level trace
</source>
<label @nginx_access>
   <filter  nginx_access>
       @type parser
    key_name message
    format  /(?<serverIp>[^ ]*) (?<timestamp>[^ ]*) (?<respondTime>[^ ]*) (?<httpCode>[^ ]*) \[(?<eventTime>[^\]]*)\] (?<clientIp>[^ ]*) (?<clientPort>[^ ]*) (?<method>[^ ]*) (?<protocol>[^ ]*)/
   </filter>
   <match  nginx_access>
     @type stdout
   </match>
</label>

#启动服务
$ docker run -u root —rm -ti 192.168.136.10:5000/fluentd_elasticsearch/fluentd:v2.5.2 sh
/ # cd /fluentd/
/ # touch access.log
/ # fluentd -c /fluentd/etc/fluent.conf
/ # echo ‘53.49.146.149 1561620585.973 0.005 502 [27/Jun/2019:15:29:45 +0800] 178.73.215.171 33337 GET https’ >>/fluentd/access.log

#正则校验
使用该网站进行正则校验: http://fluentular.herokuapp.com/

实践二:使用ruby实现日志字段的转换及自定义处理

<source>
    @type tail
    @label @nginx_access
    path /fluentd/access.log
    pos_file /fluentd/nginx_access.posg
    tag nginx_access
    format none
    @log_level trace
</source>
<label @nginx_access>
   <filter  nginx_access>
       @type parser
       key_name message
       format  /(?<serverIp>[^ ]*) (?<timestamp>[^ ]*) (?<respondTime>[^ ]*) (?<httpCode>[^ ]*) \[(?<eventTime>[^\]]*)\] (?<clientIp>[^ ]*) (?<clientPort>[^ ]*) (?<method>[^ ]*) (?<protocol>[^ ]*)/
   </filter>
   <filter  nginx_access>   
       @type record_transformer
       enable_ruby
       <record>
        host_name "#{Socket.gethostname}"
        my_key  "my_val"
        tls ${record["protocol"].index("https") ? "true" : "false"}
       </record>
   </filter>
   <match  nginx_access>
     @type stdout
   </match>
</label>

部署es服务

分析

  1. es生产环境通常使用statefulset进行集群环境部署,此为实验环境,资源有限,故单点。
    2. 数据存储挂载主机路径
    3. es默认使用elasticsearch用户启动进程,es的数据目录是通过宿主机的路径挂载,因此目录权限被主机的目录权限覆盖,可以利用initContainer容器在es进程启动之前把目录的权限修改掉,注意init container要用特权模式启动。

    部署

    efk/elasticsearch.yaml ```yaml apiVersion: apps/v1 kind: StatefulSet metadata: labels: addonmanager.kubernetes.io/mode: Reconcile k8s-app: elasticsearch-logging version: v7.4.2 name: elasticsearch-logging namespace: logging spec: replicas: 1 revisionHistoryLimit: 10 selector: matchLabels: k8s-app: elasticsearch-logging version: v7.4.2 serviceName: elasticsearch-logging template: metadata: labels:
     k8s-app: elasticsearch-logging
     version: v7.4.2
    
    spec: nodeSelector:
     es: "true"  ## 指定部署在哪个节点。需根据环境来修改
    
    containers:
    • env:
      • name: NAMESPACE valueFrom: fieldRef:
        apiVersion: v1
        fieldPath: metadata.namespace
        
      • name: cluster.initial_master_nodes value: elasticsearch-logging-0
      • name: ES_JAVA_OPTS value: “-Xms512m -Xmx512m” image: harbor.od.com/efk/elasticsearch:7.4.2 name: elasticsearch-logging ports:
      • containerPort: 9200 name: db protocol: TCP
      • containerPort: 9300 name: transport protocol: TCP volumeMounts:
      • mountPath: /usr/share/elasticsearch/data name: elasticsearch-logging dnsConfig: options:
      • name: single-request-reopen initContainers:
    • command:
      • /sbin/sysctl
      • -w
      • vm.max_map_count=262144 image: alpine:3.6 imagePullPolicy: IfNotPresent name: elasticsearch-logging-init resources: {} securityContext: privileged: true
    • name: fix-permissions image: alpine:3.6 command: [“sh”, “-c”, “chown -R 1000:1000 /usr/share/elasticsearch/data”] securityContext: privileged: true volumeMounts:
      • name: elasticsearch-logging mountPath: /usr/share/elasticsearch/data volumes:
    • name: elasticsearch-logging hostPath: path: /esdata

apiVersion: v1 kind: Service metadata: labels: k8s-app: elasticsearch-logging name: elasticsearch namespace: logging spec: ports:

  • port: 9200 protocol: TCP targetPort: db selector: k8s-app: elasticsearch-logging type: ClusterIP bash $ kubectl create namespace logging

给slave1节点打上label,将es服务调度到slave1节点

$ kubectl label node k8s-slave1 es=true

部署服务,可以先去部署es的节点把镜像下载到本地

$ kubectl create -f elasticsearch.yaml

$ kubectl -n logging get po -o wide
NAME READY STATUS RESTARTS AGE IP NODE
elasticsearch-logging-0 1/1 Running 0 69m 10.244.1.104 k8s-slave1

验证es是否部署成功

$ kubectl -n logging get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
elasticsearch ClusterIP 10.109.174.58 9200/TCP 71m

$ curl 10.109.174.58:9200
{
“name” : “elasticsearch-logging-0”,
“cluster_name” : “docker-cluster”,
“cluster_uuid” : “uic8xOyNSlGwvoY9DIBT1g”,
“version” : {
“number” : “7.4.2”,
“build_flavor” : “default”,
“build_type” : “docker”,
“build_hash” : “2f90bbf7b93631e52bafb59b3b049cb44ec25e96”,
“build_date” : “2019-10-28T20:40:44.881551Z”,
“build_snapshot” : false,
“lucene_version” : “8.2.0”,
“minimum_wire_compatibility_version” : “6.8.0”,
“minimum_index_compatibility_version” : “6.0.0-beta1”
},
“tagline” : “You Know, for Search”
}


<a name="u6ssS"></a>
## 部署kibana服务
<a name="Yr1kL"></a>
### 分析

1. kibana需要暴露web页面给前端使用,因此使用ingress配置域名来实现对kibana的访问<br />2. kibana为无状态应用,直接使用Deployment来启动<br />3. kibana需要访问es,直接利用k8s服务发现访问此地址即可,http://elasticsearch:9200
<a name="kJKR1"></a>
### 部署
 efk/kibana.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: logging
  labels:
    app: kibana
spec:
  selector:
    matchLabels:
      app: "kibana"
  template:
    metadata:
      labels:
        app: kibana
    spec:
      nodeSelector:
        kibana: "true"  ## 指定部署在哪个节点。需根据环境来修改
      containers:
      - name: kibana
        #image: 192.168.136.10:5000/kibana/kibana:7.4.2
        image: harbor.od.com/efk/kibana:7.4.2
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 100m
        env:
          - name: ELASTICSEARCH_URL
            value: http://elasticsearch:9200
        ports:
        - containerPort: 5601
---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: logging
  labels:
    app: kibana
spec:
  ports:
  - port: 5601
    protocol: TCP
    targetPort: 5601
  type: ClusterIP
  selector:
    app: kibana
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: kibana
  namespace: logging
spec:
  rules:
  - host: kibana.crab.com
    http:
      paths:
      - path: /
        backend:
          serviceName: kibana
          servicePort: 5601
$ kubectl label node k8s-slave2 kibana=true
$ kubectl create -f kibana.yaml  
$ kubectl -n logging get po  
NAME                      READY   STATUS    RESTARTS   AGE  
elasticsearch-logging-0   1/1     Running   0          88m  
kibana-944c57766-ftlcw    1/1     Running   0          15m

## 配置域名解析 kibana.crab.com,并访问服务进行验证,若可以访问,说明连接es成功

部署fluentd服务

分析

  1. fluentd为日志采集服务,kubernetes集群的每个业务节点都有日志产生,因此需要使用daemonset的模式进行部署
    2. 为进一步控制资源,会指定标签 fluentd=true 来做进一步过滤,只有带有此标签的节点才会部署fluentd
    3. 日志采集配置涉及到采集目录、发送到的目的地es端等信息,使用configmap的方式把配置文件挂载出来

部署

说明:

  1. 因本次实验环境修改了docker的默认存储目录,故涉及到docker相关目录挂载需进行替换
  2. 因为所有node节点都需要收集日志,故将所有node节点都打标签 fluentd=true

efk/fluentd-es-config-main.yaml

apiVersion: v1
data:
  fluent.conf: |-
    # This is the root config file, which only includes components of the actual configuration
    #
    #  Do not collect fluentd's own logs to avoid infinite loops.
    <match fluent.**>
    @type null
    </match>

    @include /fluentd/etc/config.d/*.conf
kind: ConfigMap
metadata:
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
  name: fluentd-es-config-main
  namespace: logging

配置文件,fluentd-config.yaml,注意点:
1. 数据源source的配置,k8s会默认把容器的标准和错误输出日志重定向到宿主机中
2. 默认集成了 kubernetes_metadata_filter 插件,来解析日志格式,得到k8s相关的元数据,raw.kubernetes
3. match输出到es端的flush配置

efk/fluentd-configmap.yaml

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      localtime
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    # https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions 
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host elasticsearch
      port 9200
      logstash_format true
      request_timeout    30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

fluentd.yaml内容注意点:
1、需要配置rbac规则,因为需要访问k8s api去根据日志查询元数据
2、需要将/var/log/containers/目录挂载到容器中
3、需要将fluentd的configmap中的配置文件挂载到容器内
4、想要部署fluentd的节点,需要添加fluentd=true的标签
5、因为docker默认目录被修改,故在目录挂载时也需要对应变更

docker info |grep Dir
Docker Root Dir: /srv/docker

efk/fluentd.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
  - ""
  resources:
  - "namespaces"
  - "pods"
  verbs:
  - "get"
  - "watch"
  - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
  name: fluentd-es
  namespace: logging
  apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
    k8s-app: fluentd-es
  name: fluentd-es
  namespace: logging
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-es
  template:
    metadata:
      labels:
        k8s-app: fluentd-es
    spec:
      containers:
      - env:
        - name: FLUENTD_ARGS
          #value: --no-supervisor -q
          value: "-c /fluentd/etc/fluent.conf"
        image: quay.mirrors.ustc.edu.cn/fluentd_elasticsearch/fluentd:v2.5.2
        #image: quay.io/fluentd_elasticsearch/fluentd:v3.1.0
        #image: registry.cn-hangzhou.aliyuncs.com/google_containers/fluentd_elasticsearch/fluentd:v2.5.2
        #image: 192.168.136.10:5000/fluentd_elasticsearch/fluentd:v2.5.2
        imagePullPolicy: IfNotPresent
        name: fluentd-es
        resources:
          limits:
            memory: 500Mi
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - mountPath: /var/log
          name: varlog
        - mountPath: /srv/docker/containers
        #- mountPath: /var/lib/docker/containers
          name: varlibdockercontainers
          readOnly: true
        - mountPath: /fluentd/etc/config.d
          name: config-volume
        - mountPath: /fluentd/etc/fluent.conf
          name: config-volume-main
          subPath: fluent.conf
      nodeSelector:
        fluentd: "true"
      securityContext: {}
      serviceAccount: fluentd-es
      serviceAccountName: fluentd-es
      volumes:
      - hostPath:
          path: /var/log
          type: ""
        name: varlog
      - hostPath:
          #path: /var/lib/docker/containers
          path: /srv/docker/containers
          type: ""
        name: varlibdockercontainers
      - configMap:
          defaultMode: 420
          name: fluentd-config
        name: config-volume
      - configMap:
          defaultMode: 420
          items:
          - key: fluent.conf
            path: fluent.conf
          name: fluentd-es-config-main
        name: config-volume-main

给所有node节点打上标签,进行部署fluentd日志采集服务
$ kubectl label node node1 fluentd=true
$ kubectl label node node2 fluentd=true

创建服务
$ kubectl create -f fluentd-es-config-main.yaml
$ kubectl create -f fluentd-configmap.yaml
$ kubectl create -f fluentd.yaml

查看
$ kubectl -n logging get po -o wide

EFK功能验证

在kibana查看收集日志时,集群共有4个node节点,但是有一两个node节点日志没显示,可能是集群原有日志较大,等个十几分钟或者一段时间后,正常显示所有node节点日志。

验证思路

在node节点中启动服务,同时往标准输出中打印测试日志,到kibana中查看是否可以收集。

创建测试容器

efk/test-pod.yaml

apiVersion: v1
kind: Pod
metadata:
  name: counter
spec:
  nodeSelector:
    fluentd: "true"
  containers:
  - name: count
    image: alpine:3.6
    args: [/bin/sh, -c,
            'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done']

配置kibana

kibana-op1.png
kibana-op2.png
kibana-op3.png
kibana-op4.png

也可以通过其他元数据来过滤日志数据,比如可以单击任何日志条目以查看其他元数据,如容器名称,Kubernetes 节点,命名空间等,比如 kubernetes.pod_name : counter 。

Kibana 用户指南文档:https://www.elastic.co/guide/en/kibana/current/index.html