image.png

1. 简介

Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。

Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka 借鉴了 JMS 规范的思想,但是确并没有完全遵循 JMS 规范。

kafka 是一个分布式的,分区的消息(官方称之为 commit log )服务。让我们来看一下基础的消息(Message)相关术语:

  • Topic: Kafka 按照 Topic 分类来维护消息
  • Producer: 我们将发布(publish)消息到 Topic 的进程称之为生产者(producer)
  • Consumer: 我们将订阅(subscribe) Topic 并且处理 Topic 中消息的进程称之为消费者(consumer)
  • Broker: Kafka 以集群的方式运行,集群中的每一台服务器称之为一个代理(broker)。

image.png

服务端(brokers)和客户端(producer、consumer)之间通信通过 TCP 协议来完成。Kafka 提供了一个 Java 客户端,但是也可以使用其他语言编写的客户端。

kafka也是主从式的架构,主节点就叫controller,其余的为从节点,controller是需要和zookeeper进行配合管理整个kafka集群。

1.1 Kafka 的特性

(1)高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个 topic 可以分多个partition, consumer group 对 partition 进行 consume 操作;
(2)可扩展性:kafka 集群支持热扩展;
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
(5)高并发:支持数千个客户端同时读写;
(6)支持实时在线处理和离线处理:可以使用 Storm 这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理;

kafka 是一个天然的分布式消息队列,一个 topic 的数据,是分散到多个机器上的,每个机器就放一部分数据。
image.png

kafka、activemq、rabbitmq、rocketmq 的优缺点

特性 ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级 万级 10万级,高吞吐量 10万级,高吞吐量
topic 数量对吞吐量的影响 topic 可以达到几百几千个级别,吞吐量会有小幅度的下降;在同等机器下,可以支持大量的 topic topic 到达几千个的时候,吞吐量会有大幅度下降;支持大量 topic,需要增加更多的机器资源
时效性 ms级 微妙级,延迟是最低的 ms级 ms级以内
可用性 高,基于主从架构实现高可用性 高,基于主从架构实现高可用性 非常高,分布式架构 非常高,分布式的,可以做到0丢失
核心特点 MQ 领域功能完备 基于 erlang 开发,所以并发能力强,性能极好,延迟低 MQ功能完备,分布式,扩展性好 功能简单,支持简单的 MQ 功能。用于大数据领域的实时计算和数据采集
优势 成熟;功能强大,之前很多项目都在用 erlang 语言开发,性能极好。开源提供的管理界面很棒;最近几年中小型互联网用的比较多 接口简单,阿里品牌保障;吞吐性能好;支持大规模的topic数量;支持复杂的 MQ 业务场景 ms级延迟,极高的可用性和可靠性;分布式可以任意扩展
缺点 社区和国内应用的越来越少,吞吐量小 吞吐量底;erlang 语言懂得少,做定制难 技术有可能被抛弃 消息可能重复消费

1.2 Kafka 的使用场景

(1)日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种consumer,例如 Hadoop、Hbase、Solr 等;
(2)消息系统:解耦和生产者和消费者、缓存消息等;
(3)用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 Hadoop、数据仓库中做离线分析和挖掘;
(4)运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
(5)流式处理:比如 Spark streaming 和 storm;
(6)事件源;

和其他队列比,kafka 的优势?

  1. 极致的性能 :基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
  2. 生态系统兼容性无可匹敌 :Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。


2. kafka 的一些概念

2.1 Topic、Partition 和 Log

Topic(主题) 是一个类别的名称,所有的 message 发送到 Topic 下面(这个东西类似于关系型数据库的表)。对于每一个 Topic,kafka 集群按照如下方式维护一个分区 (Partition,可以就理解为一个队列 Queue)日志文件:
image.png
partition 是一个有序的 message 序列,这些 message 按顺序添加到一个叫做 commit log 的文件中。每个 partition 中的消息都有一个唯一的编号,称之为 offset,用来唯一标示某个分区中的 message。

提示:每个 partition,都对应一个 commit-log。一个 partition 中的 message 的 offset 都是唯一的,但是不同的 partition 中的 message 的 offset 可能是相同的。
image.png
Topic 和 partition 像是 HBASE 里的 table 和 region 的概念,table 只是一个逻辑上的概念,真正存储数据的是region,这些 region 会分布式地存储在各个服务器上面,对应于 kafka,也是一样,Topic 也是逻辑概念 ,而partition 就是分布式存储单元。这个设计是保证了海量数据处理的基础。

注意:

  1. 分区会有单点故障,所以我们会为每个分区设置副本
  2. 分区的编号从 0 开始

kafka 集群,在配置的时间范围内,维护所有的由 producer 生成的消息,而不管这些消息有没有被消费。例如日志保留( log retention )时间默认被设置为7天。kafka 会维护最近 7 天生产的所有消息,而 7 天前的消息会被丢弃。kafka 的性能与保留的数据量的大小没有关系,因此保存大量的数据(日志信息)不会有什么影响。

每个 consumer 是基于自己在 commit log 中的消费进度(offset)来进行工作的。在 kafka 中,offset 由consumer 来维护:一般情况下我们按照顺序逐条消费 commit log 中的消息,当然我可以通过指定 offset 来重复消费某些消息,或者跳过某些消息。

这意味 kafka 中的 consumer 对集群的影响是非常小的,添加一个或者减少一个 consumer,对于集群或者其他consumer 来说,都是没有影响的,因为每个 consumer 维护各自的 offset。

对 log 进行分区(partitioned),有以下目的:

  • 当 log 文件大小超过系统文件系统的限制时,可以自动拆分。
  • 每个 partition 对应的 log 都受到所在机器的文件系统大小的限制,但是一个 Topic 中是可以有很多分区的,因此可以处理任意数量的数据。
  • 另一个方面,是为了提高并行度。


2.2 Distribution

log 的 partitions 分布在 kafka 集群中不同的 broker上,每个 broker 可以请求备份其他 broker 上 partition 上的数据。kafka 集群支持配置一个或多个 partition 备份的数量。

针对每个 partition,都有一个 broker 起到「leader」的作用,0 个或多个其他的 broker 作为「follwers」。leader 处理所有的针对这个 partition 的读写请求,而 followers 被动复制 leader 的结果。如果这个 leader 失效了,其中的一个 follower 将会自动的变成新的 leader。每个 broker 都是自己所管理的 partition 的 leader,同时又是其他 broker 所管理 partitions 的 followers,kafka 通过这种方式来达到负载均衡

生产者在发送数据的时候,是直接发送到 leader partition 里面 ,然后 follower partition 会去 leader 那里自行同步数据,消费者消费数据的时候,也是从 leader 那去消费数据的 。

image.png

2.3 Producers 生产者

生产者将消息发送到 topic 中去,同时负责选择将 message 发送到 topic 的哪一个 partition 中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

2.4 Consumers 消费者

传统的消息传递模式有2种:队列( queuing)和订阅广播( publish-subscribe)。

在 queuing 模式中,多个 consumer 从服务器中读取数据,消息只会到达一个 consumer。
在 publish-subscribe 模型中,消息会被广播给所有的 consumer。

Kafka 基于这 2 种模式提供了一种 consumer 的抽象概念:consumer group。

每个 consumer 都要标记自己属于哪一个 consumer group。发布到 topic 中的 message 会被传递到 consumer group 中的一个 consumer 实例。consumer 实例可以运行在不同的进程上,也可以在不同的物理机器上。

如果所有的 consumer 都位于同一个 consumer group 下,这就类似于传统的 queue 模式,并在众多的consumer instance 之间进行负载均衡。

如果所有的 consumer 都有着自己唯一的 consumer group,这就类似于传统的 publish-subscribe 模型。
更一般的情况是,通常一个 topic 会有几个 consumer group,每个 consumer group 都是一个逻辑上的订阅者( logical subscriber )。每个 consumer group 由多个 consumer instance(消费者实例) 组成,从而达到可扩展和容灾的功能。这并没有什么特殊的地方,仅仅是将 publish-subscribe 模型中的运行在单个进程上的 consumers 中的 consumer 替换成一个 consumer group。如下图所示:
image.png

说明:由2个 broker 组成的 kafka 集群,总共有4个 Parition(P0-P3)。这个集群由 2 个 Consumer Group, A 有 2 个 consumer instances ,而 B 有 4 个。同一个 consumer group 中,不同的 consumer 不能消费相同的 parition。

Consumer Group - 消费者组

我们在消费数据时会在代码里面指定一个 group.id,这个 id 代表的是消费组的名字,而且这个 group.id 就算不设置,系统也会默认设置

  1. conf.setProperty("group.id","tellYourDream")

比如现在consumerA去消费了一个topicA里面的数据。

  1. consumerA:
  2. group.id = a
  3. consumerB:
  4. group.id = a
  5. consumerC:
  6. group.id = b
  7. consumerD:
  8. group.id = b

再让 consumerB 也去消费 TopicA 的数据,它是消费不到了,但是我们在 consumerC 中重新指定一个另外的group.id,consumerC 是可以消费到topicA的数据的。而 consumerD 也是消费不到的,所以在 kafka 中,不同组可有唯一的一个消费者去消费同一主题的数据 。
image.png
如图,因为前面提到过了消费者会直接和leader建立联系,所以它们分别消费了三个leader,所以一个分区不会让消费者组里面的多个消费者去消费 ,但是在消费者不饱和的情况下,一个消费者是可以去消费多个分区的数据的 。
**

2.5 消费顺序

Kafka 比传统的消息系统有着更强的顺序保证。在传统的情况下,服务器按照顺序保留消息到队列,如果有多个consumer 来消费队列中的消息,服务器会按照接受消息的顺序向外提供消息。但是,尽管服务器是按照顺序提供消息,但是消息传递到每一个 consumer 是异步的,这可能会导致先消费的 consumer 获取到消息时间可能比后消费的 consumer 获取到消息的时间长,导致最终不能保证顺序性。(即当进行并行的消费的时候,消息在多个 consumer之间可能会失去顺序性)。此时消息系统通常会采取一种「exclusive consumer(独家消费)」的概念,来确保同一时间内只有一个 consumer 能够从队列中进行消费,但是这实际上意味着在消息处理的过程中是不支持并行的。

Kafka 在这方面做的更好。通过 Topic 中并行度的概念,即 partition。Kafka 可以同时提供顺序性保证和多个consumer 同时消费时的负载均衡。实现的原理是通过将一个 topic 中的 partition 分配给一个 consumer group中的不同 consumer instance。通过这种方式,我们可以保证一个 partition 在同一个时刻只有一个 consumer instance 在消费消息,从而保证顺序。虽然一个 topic 中有多个 partition,但是一个 consumer group 中同时也有多个 consumer instance,通过合理的分配依然能够保证负载均衡。需要注意的是,一个 consumer group 中的 consumer instance 的数量不能比一个 Topic 中的 partition 的数量多。
**
Kafka 只在 partition 的范围内保证消息消费的局部顺序性,不能在同一个 topic 中的多个 partition 中保证总的消费顺序性。通常来说,这已经可以满足大部分应用的需求。但是,如果的确有在总体上保证消费的顺序的需求的话,那么我们可以通过将 topic 的 partition 数量设置为 1,将 consumer group 中的 consumer instance 数量也设置为1.

2.6 Guarantees

从较高的层面上来说的话,Kafka 提供了以下的保证:

  • 发送到一个 Topic 中的 message 会按照发送的顺序添加到 commit log 中。意思是,如果消息 M1,M2由同一个 producer 发送,M1 比 M2 发送的早的话,那么在 commit log 中,M1 的 offset 就会比 commit 2 的 offset 小。
  • 一个 consumer 在 commit log 中可以按照发送顺序来消费 message
  • 如果一个 topic 的备份因子( replication factor )设置为N,那么 Kafka 可以容忍 N-1 个服务器的失败,而存储在 commit log 中的消息不会丢失。


3. kafka 集群搭建和使用

安装前的环境准备
由于 Kafka 是用 Scala 语言开发的,运行在 JVM上,因此在安装 Kafka 之前需要先安装 JDK。

  1. # yum install java-1.8.0-openjdk* -y

kafka依赖zookeeper,所以需要先安装zookeeper

  1. # wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
  2. # tar -zxvf zookeeper-3.4.12.tar.gz
  3. # cd zookeeper-3.4.12
  4. # cp conf/zoo_sample.cfg conf/zoo.cfg

启动zookeeper

  1. # bin/zkServer.sh start
  2. # bin/zkCli.sh
  3. # ls / #查看zk的根目录相关节点


第一步:下载安装包
下载1.1.0 release版本,并解压:

  1. # wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
  2. # tar -xzf kafka_2.11-1.1.0.tgz
  3. # cd kafka_2.11-1.1.0


第二步:启动服务

现在来启动 kafka 服务:
启动脚本语法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties 的配置路径是一个强制的参数,-daemon 表示以后台进程运行,否则 ssh 客户端退出后,就会停止服务。(注意,在启动 kafka 时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地 host 里,用vim /etc/hosts)

  1. # bin/kafka-server-start.sh -daemon config/server.properties

我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树

  1. # bin/zkCli.sh
  2. # ls / #查看zk的根目录kafka相关节点
  3. # ls /brokers/ids #查看kafka节点


第三步:创建主题
现在我们来创建一个名字为「test」的Topic,这个 topic 只有一个 partition,并且备份因子也设置为 1:

  1. # bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在我们可以通过以下命令来查看kafka中目前存在的topic

  1. # bin/kafka-topics.sh --list --zookeeper localhost:2181

除了我们通过手工的方式创建 Topic,我们可以配置 broker,当 producer 发布一个消息某个指定的 Topic,但是这个 Topic 并不存在时,就自动创建。

第四步:发送消息

kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。
首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:

  1. # bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  2. >this is a msg
  3. >this is a another msg


第五步:消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出:

  1. # bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #老版本
  2. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test #新版本

如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。

以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。
还有一些其他命令如下:

查看组名

  1. # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

查看消费者的消费偏移量

  1. # bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

消费多主题

  1. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"


单播消费
一条消息只能被某一个消费者消费的模式,类似queue模式,只需让所有消费者在同一个消费组里即可
分别在两个客户端执行如下消费命令,然后往主题里发送消息,结果只有一个客户端能收到消息

  1. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

多播消费

一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。我们再增加一个消费者,该消费者属于testGroup-2消费组,结果两个客户端都能收到消息

  1. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic test


第六步:kafka集群配置

到目前为止,我们都是在一个单节点上运行 broker,这并没有什么意思。对于kafka来说,一个单独的broker意味着kafka集群中只有一个接点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。

首先,我们需要建立好其他2个broker的配置文件:

  1. # cp config/server.properties config/server-1.properties
  2. # cp config/server.properties config/server-2.properties

配置文件的内容分别如下:

  1. config/server-1.properties:
  2. broker.id=1
  3. listeners=PLAINTEXT://:9093
  4. log.dir=/tmp/kafka-logs-1
  5. config/server-2.properties:
  6. broker.id=2
  7. listeners=PLAINTEXT://:9094
  8. log.dir=/tmp/kafka-logs-2


broker.id 属性在 kafka 集群中必须要是唯一的。我们需要重新指定 port 和 log 目录,因为我们是在同一台机器上运行多个实例。如果不进行修改的话,consumer 只能获取到一个 instance 实例的信息,或者是相互之间的数据会被影响。

目前我们已经有一个 zookeeper 实例和一个 broker实 例在运行了,现在我们只需要在启动2个 broker 实例即可:

  1. # bin/kafka-server-start.sh -daemon config/server-1.properties
  2. # bin/kafka-server-start.sh -daemon config/server-2.properties

现在我们创建一个新的topic,备份因子设置为3:

  1. # bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们已经有了集群,并且创建了一个3个备份因子的topic,但是到底是哪一个broker在为这个topic提供服务呢(因为我们只有一个分区,所以肯定同时只有一个broker在处理这个topic)?

  1. # bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

image.png

以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个 partition 的信息。因为目前我们只有一个 partition,因此关于 partition 的信息只有一行。

  • leader 节点负责给定 partition 的所有读写请求。
  • replicas 表示某个 partition 在哪几个 broker 上存在备份。不管这个几点是不是「leader」,甚至这个节点挂了,也会列出。
  • isr 是 replicas 的一个子集,它只列出当前还存活着的,并且备份了该 partition 的节点。

现在我们的案例中,0 号节点是leader,即使用 server.properties 启动的那个进程。
我们可以运行相同的命令查看之前创建的名称为「test」的 topic

  1. # bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

image.png

现在我们向新建的topic中发送一些 message:

  1. # bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  2. >my test msg 1
  3. >my test msg 2

现在开始消费:

  1. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
  2. my test msg 1
  3. my test msg 2

现在我们来测试我们容错性,因为 broker0 目前是 leader,所以我们要将其 kill

  1. # ps -ef | grep server.properties
  2. # kill -9 1177

现在再执行命令:

  1. # bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic my-replicated-topic

image.png

我们可以看到,leader 节点已经变成了 broker 2。要注意的是,在 Isr 中,已经没有了 0 号节点。
leader 的选举也是从 ISR(in-sync replica) 中进行的。

此时,我们依然可以 消费新消息:

  1. # bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
  2. my test msg 1
  3. my test msg 2

查看主题分区对应的leader信息:
image.png

4. kafka 原理分析

image.png

image.png

4.1 producer 发布消息

1.写入方式
producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

2.消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
  3. patition 和 key 都未指定,使用轮询选出一个 patition。

源码如下:

  1. private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
  2. Integer partition = record.partition();
  3. // 如果没有指定分区,默认选择自己的分区,否则的话再进行判断
  4. return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
  5. }
  6. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  7. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  8. int numPartitions = partitions.size();
  9. if (keyBytes == null) {
  10. // patition 和 key 都未指定,使用轮询选出一个 patition。
  11. int nextValue = this.nextValue(topic);
  12. List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  13. if (availablePartitions.size() > 0) {
  14. int part = Utils.toPositive(nextValue) % availablePartitions.size();
  15. return ((PartitionInfo)availablePartitions.get(part)).partition();
  16. } else {
  17. return Utils.toPositive(nextValue) % numPartitions;
  18. }
  19. } else {
  20. // key 指定,partition 未指定,通过对 key 的 value 进行 hash 选出一个 patition
  21. return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  22. }
  23. }

3.写入流程

image.png

  1. producer 先从 zookeeper 的 /brokers/.../state 节点找到该 partition 的 leader

  2. producer 将消息发送给该 leader

  3. leader 将消息写入本地 log

  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK

  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK


    4.2 分区 leader 选举原理

    image.png

    4.3 zookeeper 在 kafka 中的作用

    zookeeper 中存储了 kafka 的一些元数据(topic、分区、broker 等)
    (1)无论是kafka集群,还是producer和consumer都依赖于 zookeeper 来保证系统可用性集群保存一些meta信息。
    (2)Kafka 使用 zookeeper 作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。
    (3)同时借助 zookeeper,kafka 能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。

image.png

我们看上面的图,我们把 broker 的数量减少,叧有一台。现在假设我们按照上图进行部署:
(1)Server-1 broker 其实就是 kafka 的 server,因为 producer 和 consumer 都要去还它。 Broker 主要还是做存储用。
(2)Server-2 是 zookeeper 的 server 端,它维持了一张表,记录了各个节点的 IP、端口等信息。
(3)Server-3、 4、 5 他们的共同之处就是都配置了 zkClient,更明确的说,就是运行前必须配置 zookeeper的地址,道理也很简单,这之间的连接都是需要 zookeeper 来进行分发的。
(4)Server-1 和 Server-2 的关系,他们可以放在一台机器上,也可以分开放,zookeeper 也可以配集群。目的是防止某一台挂了。

所有的 broker 在启动的时候都会往 zookeeper 进行注册,目的就是选举出一个 controller,这个选举过程非常简单粗暴,就是一个谁先谁当的过程,不涉及什么算法问题。

那成为 controller 之后要做啥呢,它会监听 zookeeper 里面的多个目录

例如有一个目录/brokers/,其他从节点往这个目录上注册(就是往这个目录上创建属于自己的子目录而已) 自己,这时命名规则一般是它们的id编号,比如/brokers/0,1,2

注册时各个节点必定会暴露自己的主机名,端口号等等的信息,此时controller就要去读取注册上来的从节点的数据(通过监听机制),生成集群的元数据信息,之后把这些信息都分发给其他的服务器,让其他服务器能感知到集群中其它成员的存在 。

此时模拟一个场景,我们创建一个主题(其实就是在zookeeper上/topics/topicA这样创建一个目录而已),kafka会把分区方案生成在这个目录中,此时controller就监听到了这一改变,它会去同步这个目录的元信息,然后同样下放给它的从节点,通过这个方法让整个集群都得知这个分区方案,此时从节点就各自创建好目录等待创建分区副本即可。这也是整个集群的管理机制。

5. Java 中 kafka 使用

1.增加依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>1.1.0</version>
  5. </dependency>

2.生产者

  1. public class MsgProducer {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "192.168.0.60:9092,192.168.0.60:9093");
  5. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. Producer<String, String> producer = new KafkaProducer<>(props);
  8. for (int i = 0; i < 5; i++) {
  9. //同步方式发送消息
  10. ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("topic-replica-zhuge2", 0, Integer.toString(i), Integer.toString(i));
  11. /*Future<RecordMetadata> result = producer.send(producerRecord);
  12. //等待消息发送成功的同步阻塞方法
  13. RecordMetadata metadata = result.get();
  14. System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
  15. + metadata.partition() + "|offset-" + metadata.offset());*/
  16. //异步方式发送消息
  17. producer.send(producerRecord, new Callback() {
  18. @Override
  19. public void onCompletion(RecordMetadata metadata, Exception exception) {
  20. if (exception != null) {
  21. System.err.println("发送消息失败:" + exception.getStackTrace());
  22. }
  23. if (metadata != null) {
  24. System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
  25. + metadata.partition() + "|offset-" + metadata.offset());
  26. }
  27. }
  28. });
  29. //其他操作
  30. //doSomeThing();
  31. }
  32. producer.close();
  33. }
  34. }

3.消费者

  1. public class MsgConsumer {
  2. public static void main(String[] args) {
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "192.168.0.60:9092,192.168.0.60:9093");
  5. // 消费分组名
  6. props.put("group.id", "testGroup");
  7. // 是否自动提交offset
  8. /*props.put("enable.auto.commit", "true");
  9. // 自动提交offset的间隔时间
  10. props.put("auto.commit.interval.ms", "1000");*/
  11. //props.put("enable.auto.commit", "false");
  12. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  15. // 消费主题
  16. consumer.subscribe(Arrays.asList("topic-replica-zhuge2"));
  17. // 消费指定分区
  18. //consumer.assign(Arrays.asList(new TopicPartition("topic-replica-zhuge2", 0)));
  19. while (true) {
  20. /*
  21. * poll() API 是拉取消息的长轮询,主要是判断consumer是否还活着,只要我们持续调用poll(),消费者就会存活在自己所在的group中,
  22. * 并且持续的消费指定partition的消息。底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session.
  23. * timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了,
  24. * 这个Partition也会被重新分配给其他consumer
  25. */
  26. ConsumerRecords<String, String> records = consumer.poll(1000);
  27. for (ConsumerRecord<String, String> record : records) {
  28. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  29. }
  30. if (records.count() > 0) {
  31. // 提交offset
  32. consumer.commitSync();
  33. }
  34. }
  35. }
  36. }

6. kafka 性能好在什么地方

① 顺序写

操作系统每次从磁盘读写数据的时候,需要先寻址,也就是先要找到数据在磁盘上的物理位置,然后再进行数据读写,如果是机械硬盘,寻址就需要较长的时间。

kafka的设计中,数据其实是存储在磁盘上面,一般来说,会把数据存储在内存上面性能才会好。但是kafka用的是顺序写,追加数据是追加到末尾,磁盘顺序写的性能极高,在磁盘个数一定,转数达到一定的情况下,基本和内存速度一致

随机写的话是在文件的某个位置修改数据,性能会较低。

② 零拷贝

先来看看非零拷贝的情况
01 集群搭建和原理分析 - 图19

可以看到数据的拷贝从内存拷贝到kafka服务进程那块,又拷贝到socket缓存那块,整个过程耗费的时间比较高,kafka利用了 Linux 的 sendFile 技术(NIO),省去了进程切换和一次数据拷贝,让性能变得更好。
image.png

③ 日志分段存储


Kafka规定了一个分区内的.log文件最大为1G,做这个限制目的是为了方便把.log加载到内存去操作

  1. 00000000000000000000.index
  2. 00000000000000000000.log
  3. 00000000000000000000.timeindex
  4. 00000000000005367851.index
  5. 00000000000005367851.log
  6. 00000000000005367851.timeindex
  7. 00000000000009936472.index
  8. 00000000000009936472.log
  9. 00000000000009936472.timeindex

这个9936472之类的数字,就是代表了这个日志段文件里包含的起始offset,也就说明这个分区里至少都写入了接近1000万条数据了。

Kafka broker有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是1GB,一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做log rolling,正在被写入的那个日志段文件,叫做 active log segment。

④ kafka 的网络设计

image.png

首先客户端发送请求全部会先发送给一个 Acceptor,broker 里面会存在3个线程(默认是3个),这3个线程都是叫做 processor,Acceptor 不会对客户端的请求做任何的处理,直接封装成一个个 socketChannel 发送给这些processor 形成一个队列,发送的方式是轮询,就是先给第一个 processor 发送,然后再给第二个,第三个,然后又回到第一个。消费者线程去消费这些 socketChannel 时,会获取一个个 request 请求,这些 request 请求中就会伴随着数据。

线程池里面默认有8个线程,这些线程是用来处理request的,解析请求,如果request是写请求,就写到磁盘里。读的话返回结果。processor 会从 response中读取响应数据,然后再返回给客户端。这就是Kafka的网络三层架构。

所以如果我们需要对 kafka 进行增强调优,增加 processor 并增加线程池里面的处理线程,就可以达到效果。request 和 response 那一块部分其实就是起到了一个缓存的效果,是考虑到 processor 们生成请求太快,线程数不够不能及时处理的问题。

所以这就是一个加强版的 reactor 网络线程模型。

资料