go-stash是一个高效的从Kafka获取,根据配置的规则进行处理,然后发送到ElasticSearch集群的工具。
go-stash有大概logstash 5倍的吞吐性能,并且部署简单,一个可执行文件即可。

go-stash旨在一些场景替换logstash使用,部署和使用更轻量,效率也相对更高。 另外在一定程度上与filebeat相似。

每日一库之109:go-stash - 图1

安装

  1. cd stash && go build stash.go

Quick Start

  • 可执行文件方式

    1. ./stash -f etc/config.yaml
  • docker 方式,确保配置文件路径正确

    1. docker run -d -v `pwd`/etc:/app/etc kevinwan/go-stash

    config.yaml示例如下: ```yaml Clusters:

  • Input: Kafka:
    1. Name: go-stash
    2. Log:
    3. Mode: file
    4. Brokers:
    5. - "172.16.48.41:9092"
    6. - "172.16.48.42:9092"
    7. - "172.16.48.43:9092"
    8. Topic: ngapplog
    9. Group: stash
    10. Conns: 3
    11. Consumers: 10
    12. Processors: 60
    13. MinBytes: 1048576
    14. MaxBytes: 10485760
    15. Offset: first
    Filters:
    • Action: drop Conditions:
      • Key: status Value: 503 Type: contains
      • Key: type Value: “app” Type: match Op: and
    • Action: remove_field Fields:
      • message
      • source
      • beat
      • fields
      • input_type
      • offset
      • “@version”
      • _score
      • _type
      • clientip
      • http_host
      • request_time Output: ElasticSearch: Hosts:
        • http://172.16.188.73:9200
        • http://172.16.188.74:9200
        • http://172.16.188.75:9200“ Index: “go-stash-{{yyyy.MM.dd}}” MaxChunkBytes: 5242880 GracePeriod: 10s Compress: false TimeZone: UTC
          1. <a name="iWiSM"></a>
          2. ## 详细参数说明
          3. <a name="NPrLl"></a>
          4. ### input
          5. ```bash
          6. Conns: 3
          7. Consumers: 10
          8. Processors: 60
          9. MinBytes: 1048576
          10. MaxBytes: 10485760
          11. Offset: first

          Conns

          链接kafka的链接数,链接数依据cpu的核数,一般<= CPU的核数;

          Consumers

          每个连接数打开的线程数,计算规则为Conns Consumers,不建议超过分片总数,比如topic分片为30,Conns Consumers <= 30

          Processors

          处理数据的线程数量,依据CPU的核数,可以适当增加,建议配置:Conns Consumers 2 或 Conns Consumers 3,例如:60 或 90

          MinBytes MaxBytes

          每次从kafka获取数据块的区间大小,默认为1M~10M,网络和IO较好的情况下,可以适当调高

          Offset

          可选last和false,默认为last,表示从头从kafka开始读取数据

          Filters

          ```yaml
  • Action: drop Conditions:
    • Key: k8s_container_name Value: “-rpc” Type: contains
    • Key: level Value: info Type: match Op: and
  • Action: remove_field Fields:
    • message
    • _source
    • _type
    • _score
    • _id
    • “@version”
    • topic
    • index
    • beat
    • docker_container
    • offset
    • prospector
    • source
    • stream
  • Action: transfer Field: message Target: data ```

    - Action: drop

  • 删除标识:满足此条件的数据,在处理时将被移除,不进入es

  • 按照删除条件,指定key字段及Value的值,Type字段可选contains(包含)或match(匹配)
  • 拼接条件Op: and,也可写or

    - Action: remove_field

    移除字段标识:需要移除的字段,在下面列出即可

    - Action: transfer

    转移字段标识:例如可以将message字段,重新定义为data字段

    Output

    Index

    索引名称,indexname-{{yyyy.MM.dd}}表示年.月.日,也可以用{{yyyy-MM-dd}},格式自己定义

    MaxChunkBytes

    每次往ES提交的bulk大小,默认是5M,可依据ES的io情况,适当的调整

    GracePeriod

    默认为10s,在程序关闭后,在10s内用于处理余下的消费和数据,优雅退出

    Compress

    数据压缩,压缩会减少传输的数据量,但会增加一定的处理性能,可选值true/false,默认为false

    TimeZone

    默认值为UTC,世界标准时间

    ES性能写入测试

    测试环境

  • stash服务器:3台 4核 8G

  • es服务器: 15台 16核 64G

    关键配置

    ```yaml
  • Input:
    1. Conns: 3
    2. Consumers: 10
    3. Processors: 60
    4. MinBytes: 1048576
    5. MaxBytes: 10485760
    Filters:
    • Action: remove_field Fields:
      • message
      • source
      • beat
      • fields
      • input_type
      • offset
      • request_time Output: Index: “nginx_pro-{{yyyy.MM.d}}” Compress: false MaxChunkBytes: 5242880 TimeZone: UTC ```

写入速度平均在15W/S以上

每日一库之109:go-stash - 图2