Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。
    如果打算新建 Kafka 系统的,请参考 Kafka 官方入门文档:http://kafka.apache.org/documentation.html#quickstart
    kafka 基本概念

    以下仅对相关基本概念说明,更多概念见官方文档:
    Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费
    Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费。
    Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。
    simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。
    high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:
    partition1 —-> A进程consumerid1
    partition2 —-> A进程consumerid1
    partition3 —-> A进程consumerid2
    partition4 —-> B进程consumer1
    partition5 —-> B进程consumer2
    Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。
    由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。
    小提示

    以上概念是 logstash 的 kafka 插件的必要参数,请理解阅读,对后续使用 kafka 插件有重要作用。logstash-kafka-input 插件使用的是 High-level-consumer API。


    对日志处理流程如下:

    [input] -> [splitter] -> [decoder] -> [filter] -> [encoder] -> [output]

    [input]: 输入插件从外部获得数据,并将其读入Heka管道进行处理,支持数据读取方式包含:网络传输(tcp/udp),消息队列支持(AMQP/KAFKA),文件监听等多种方式.
    [splitter]: 对数据流进行分流,以区分不同来源日志.
    [decoder]: 对数据流进行编码,把 message 的结构各个属性解析出来,支持 Lua 对数据进行编码处理.
    [filter]: Heka的处理引擎,可以对数据流进行过滤逻辑,包含统计/聚合甚至做些统计等操作.
    [encoder] 相对[decoder]来说是互逆操作,重新把消息内容编码为一定的格式,格式化输出.
    [output] 根据指定的匹配规则把某类型的消息输出到外部系统,支持多种协议和方式.
    以上是整个流程各个步骤实现的功能,当然实际使用的话,这几步不是必须都要有的,如果接收到的数据,没有加密或其他特殊格式需要单独编码,可以直接处理,那Decoder就不需要. 甚至不想输出任何东西,不提供Outputs都可以.