Apache Kafka 是一款开源的消息引擎系统, 也是一个分布式流处理平台(Distributed Streaming Platform)

  • 处理实时数据提供一个统一、高吞吐、低延迟的平台。
  • 它使用的是纯二进制的字节序列, 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展。

高性能缘由

  • 并行处理
  • 批处理
  • 顺序 IO
  • 数据压缩
  • 零拷贝
  • Page Cache

Kafka功能

削峰填谷、异步通信、业务解耦、混沌磐涅

KafKa传输模型

Kfaka有两种传输模型,分别是基于一对一、多对多的思想。

  • 一对一:一般也称之为消息队列模型,系统 A 发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息。
  • 多对多:一般称之为发布订阅模型。与上面不同的是,它有一个主题(Topic)的概念,该模型也有发送方和接收方,只不过提法不同。发送方也称为发布者(Publisher)接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。

Kafka术语

客户端

  • 生产者(Producer): 向主题发布消息的客户端应用程序,生产者程序通常持续不断地向一个或多个主题发送消息。
  • 消费者(Consumer): 订阅这些主题消息的客户端应用程序。和生产者类似,消费者也能够同时订阅多个主题的消息。
  • 消费者组(Consumer Group):多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 消费者实例(Consumer Instance): 运行消费者应用的进程,也可以是一个线程。

message

  • 主题(Topic):在 Kafka 中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。
  • 分区(Partitioning):将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中
  • 消息(Record):Kafka 是消息引擎,这里的消息就是指 Kafka 处理的主要对象。

服务端

  • Broker: Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然多个 Broker进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。
  • Replication: 把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)副本的数量是可以配置的,这些副本保存着相同的数据,但却有不同的角色和作用。
    • Kafka 定义了两类副本:
      • 领导者副本(Leader Replica):对外提供服务,这里的对外指的是与客户端程序进行交互;
      • 追随者副本(Follower Replica):被动地追随领导者副本,不能与外界进行交互。
  • 重平衡(Rebalance): 消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
  • 消费者位移(Consumer Offset): 表征消费者消费进度,每个消费者都有自己的消费者位移。

Kafka - 图1

Kafka - 图2

消息系统必要性

  • 解耦:消息系统在处理过程中插入了一个隐含的,基于数据接口, 两边的处理过程都要实现这一接口.这允许独立拓展或修改两边的处理过程. 只要确保他们遵循同样的接口约束. 而基于消息发布订阅的机制, 可以联动多个业务下流子系统,能够不侵入的情况下的情况下分布编排和开发,来保证数据一致性
  • 冗余:有些情况下,处理数据的过程中会失败.除非数据被持久化,否则将造成丢失.消息队列吧数据进行持久化直到已经完全被处理, 通过这一方式可规避数据丢失,许多消息队列所采用的”插入-获取-删除”的范式,把一个消息从队列中删除之前,需要处理系统明确指出该消息已经被完全处理完毕, 从而确保你的数据被安全的保存直到使用完毕
  • 拓展:消息解耦了处理过程, 所以增大消息入队和处理的频率是简单的,只需要在对应的端加速处理即可. 无需修改代码,修改参数,扩展非常简单
  • 灵活 & 峰值处理能力: 在访问量剧增的情况下, 应用仍然需要继续发挥作用,但这样的突发流量并不常见; 如果对此特定时间为标准投入资源,无疑是巨大的浪费. 使用消息队列能使关键组件顶住突发的压力,而不是因为突发的超负荷的请求完全崩溃
  • 可恢复性:系统的一部分组件失效时,不会影响到整个系统. 消息队列降低了进程间的耦合度, 即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在恢复后被处理
  • 顺序:在大多使用厂家下,数据处理的顺序都很重要. 大部分消息队列本来就是排序的,并且能保证数据按照特定的顺序来处理. kafka保证一个Partition内的消息有序性
  • 缓冲:在任何重要的系统中,都会有需要不同的处理时间因素, 消息队列通过一个缓冲层来帮助业务最高效的执行-写入队列的处理尽可能的快速. 缓冲有助于控制和优化数据流和系统的速度
  • 异步:很多时候并不需要立即处理消息,而消息队列提供了异步处理机制, 允许将消息放入到队列,但不立即处理它. 只需要到一定的时间点处理即可

kafka发行版本与版本号

Apache Kafka

Apache Kafka 是最“正宗”的 Kafka,也应该最熟悉的发行版。

自 Kafka 开源伊始,它便在 Apache 基金会孵化并最终毕业成为顶级项目,它也被称为社区版 Kafka。咱们专栏就是以这个版本的 Kafka作为模板来学习的。更重要的是,它是后面其他所有发行版的基础。也就是说,后面提到的发行版要么是原封不动地继承了 Apache Kafka,要么是在此之上扩展了新功能,总之 Apache Kafka 是我们学习和使用 Kafka 的基础

  • 优: 对 Apache Kafka 而言,它现在依然是开发人数最多、版本迭代速度最快的 Kafka。在 2018 年度 Apache 基金会邮件列表开发者数量最多的 Top 5 排行榜中,Kafka 社区邮件组排名第二位。如果你使用 ApacheKafka 碰到任何问题并提交问题到社区,社区都会比较及时地响应你。这对于我们 Kafka 普通使用者来说无疑是非常友好的。
  • 劣: Apache Kafka 的劣势在于它仅仅提供最最基础的组件,特别是对于前面提到的 Kafka Connect 而言,社区版 Kafka只提供一种连接器,即读写磁盘文件的连接器,而没有与其他外部系统交互的连接器,在实际使用过程中需要自行编写代码实现,这是它的一个劣势。

另外 Apache Kafka 没有提供任何监控框架或工具。显然在线上环境不加监控肯定是不可行的,你必然需要借助第三方的监控框架实现对 Kafka 的监控。好消息是目前有一些开源的监控框架可以帮助用于监控 Kafka(比如 Kafka manager)。

总而言之,如果仅仅需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度,那么推荐你使用 Apache Kafka。

Confluent Kafka

它主要从事商业化 Kafka 工具开发,并在此基础上发布了 Confluent Kafka。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,

比如跨数据中心备份、Schema 注册中心以及集群监控工具等。

  • 优: Confluent Kafka 目前分为免费版和企业版两种。前者和 Apache Kafka 非常相像,除了常规的组件之外,免费版还包含 Schema 注册中心和 REST proxy 两大功能。前者是帮助你集中管理 Kafka 消息格式以实现数据前向 / 后向兼容;后者用开放 HTTP 接口的方式允许你通过网络访问 Kafka 的各种功能,这两个都是 Apache Kafka 所没有的。除此之外,免费版包含了更多的连接器,它们都是 Confluent 公司开发并认证过的,可以免费使用它们。至于企业版,它提供的功能就更多了。最有用的当属跨数据中心备份和集群监控两大功能了。多个数据中心之间数据的同步以及对集群的监控历来是 Kafka 的痛点,Confluent Kafka 企业版提供了强大的解决方案帮助你“干掉”它们
  • 劣: Confluent 公司暂时没有发展国内业务的计划,相关的资料以及技术支持都很欠缺,很多国内 Confluent Kafka 使用者甚至无法找到对应的中文文档,因此目前 Confluent Kafka 在国内的普及率是比较低的。

如果需要用到 Kafka 的一些高级特性,那么推荐使用 Confluent Kafka。

Cloudera/Hortonworks Kafka

Cloudera 提供的 CDH 和 Hortonworks 提供的 HDP 是非常著名的大数据平台,里面集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。

很多创业公司在搭建数据平台时首选就是这两个产品。不管是 CDH 还是 HDP 里面都集成了 Apache Kafka,因此把这两款产品中的 Kafka 称为 CDH Kafka 和 HDP Kafka。

  • 优: 大数据平台天然集成了 Apache Kafka,通过便捷化的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中。所有的操作都可以在前端 UI 界面上完成,而不必去执行复杂的 Kafka 命令。
  • 劣: 这样做的结果是直接降低了你对 Kafka 集群的掌控程度。另一个弊端在于它的滞后性。由于它有自己的发布周期,因此是否能及时地包含最新版本的 Kafka 就成为了一个问题。比如 CDH 6.1.0 版本发布时 Apache Kafka 已经演进到了 2.1.0 版本,但 CDH 中的 Kafka 依然是 2.0.0版本,显然那些在 Kafka 2.1.0 中修复的 Bug 只能等到 CDH 下次版本更新时才有可能被真正修复。
kafka版本 优势 不足
Apache Kafka,也称社区版 Kafka 优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度 缺陷在于仅提供基础核心组件,缺失一些高级的特性。
Confluent Kafka,Confluent 公司提供的 Kafka 优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量上有保证 缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。
CDH/HDP Kafka,大数据云公司提供的 Kafka,内嵌 Apache Kafka - 优势在于操作简单,节省运维成本 缺陷在于把控度低,演进速度较慢

kafka 版本号

Kafka 版本命名

Kafka - 图3

其中前半部分为 Scala语言版本,后才为kafka版本,如上图所示

他们均符合x.y.z 命名规范

Kafka 版本演进

Kafka 目前总共演进了 7 个大版本,分别是 0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x 、 2.x,3.x 其中的小版本和 Patch 版本很

多。

本文书写 时最新版本为 3.10(2022-03-29)

0.7.x版本

很老的Kafka版本,它只有基本的消息队列功能,连消息副本机制都没有,不建议使用。

0.8.x版本

两个重要特性,

一个是Kafka 0.8.0增加了副本机制,

另一个是Kafka 0.8.2.0引入了新版本Producer API。

新旧版本Producer API如下:

  1. 1
  2. //旧版本Producer
  3. 2
  4. kafka.javaapi.producer.Producer<K,V>
  5. 3
  6. 4
  7. //新版本Producer
  8. 5
  9. org.apache.kafka.clients.producer.KafkaProducer<K,V>

与旧版本相比,新版本ProducerAPI有点不同,一是连接Kafka方式上,旧版本的生产者及消费者API连接的是Zookeeper,而新版本则连接的是Broker;二是新版Producer采用异步方式发送消息,比之前同步发送消息的性能有所提升。但此时的新版ProducerAPI尚不稳定,不建议生产使用。

0.9.x版本

Kafka 0.9 是一个重大的版本迭代,增加了非常多的新特性,主要体现在三个方面:

  • 安全方面:在0.9.0之前,Kafka安全方面的考虑几乎为0。Kafka 0.9.0 在安全认证、授权管理、数据加密等方面都得到了支持,包括支持Kerberos等。
  • 新版本Consumer API:Kafka 0.9.0 重写并提供了新版消费端API,使用方式也是从连接Zookeeper切到了连接Broker,但是此时新版ConsumerAPI也不太稳定、存在不少Bug,生产使用可能会比较痛苦;而0.9.0版本的Producer API已经比较稳定了,生产使用问题不大。
  • Kafka Connect:Kafka 0.9.0 引入了新的组件 Kafka Connect ,用于实现Kafka与其他外部系统之间的数据抽取。

0.10.x版本

Kafka 0.10 是一个重要的大版本,因为Kafka 0.10.0.0 引入了 Kafka Streams,使得Kafka不再仅是一个消息引擎,而是往一个分布式流处理平台方向发展。0.10 大版本包含两个小版本:0.10.1 和0.10.2,它们的主要功能变更都是在 Kafka Streams 组件上。

值得一提的是,自 0.10.2.2 版本起,新版本 Consumer API 已经比较稳定了,而且 Producer API 的性能也得到了提升,因此对于使用 0.10.x 大版本的用户,建议使用或升级到 Kafka 0.10.2.2版本。

0.11.x版本

Kafka 0.11 是一个里程碑式的大版本,主要有两个大的变更,一是Kafka从这个版本开始支持 Exactly-Once 语义即精准一次语义,主要是实现了Producer端的消息幂等性,以及事务特性,这对于Kafka流式处理具有非常大的意义。

另一个重大变更是Kafka消息格式的重构,Kafka 0.11主要为了实现Producer幂等性与事务特性,重构了投递消息的数据结构。这一点非常值得关注,因为Kafka0.11之后的消息格式发生了变化,所以我们要特别注意Kafka不同版本间消息格式不兼容的问题。

1.x版本

Kafka 1.x 更多的是Kafka Streams方面的改进,以及Kafka Connect的改进与功能完善等。但仍有两个重要特性,一是Kafka 1.0.0实现了磁盘的故障转移,当Broker的某一块磁盘损坏时数据会自动转移到其他正常的磁盘上,Broker还会正常工作,这在之前版本中则会直接导致Broker宕机,因此Kafka的可用性与可靠性得到了提升;

二是Kafka 1.1.0开始支持副本跨路径迁移,分区副本可以在同一Broker不同磁盘目录间进行移动,这对于磁盘的负载均衡非常有意义。

2.x版本

Kafka 2.x 更多的也是Kafka Streams、Connect方面的性能提升与功能完善,以及安全方面的增强等。一个使用特性,Kafka2.1.0开始支持ZStandard的压缩方式,提升了消息的压缩比,显著减少了磁盘空间与网络io消耗。

3.x版本

  • 不再支持 Java 8 和 Scala 2.12
  • Kafka Raft 支持元数据主题的快照以及自我管理的仲裁中的其他改进
  • 为默认启用的 Kafka 生产者提供更强的交付保证
  • 弃用消息格式 v0 和 v1
  • OffsetFetch 和 FindCoordinator 请求的优化
  • 更灵活的 Mirror Maker 2 配置和 Mirror Maker 1 的弃用
  • 能够在 Kafka Connect 中的一次调用中重新启动连接器的任务
  • 现在默认启用连接器日志上下文和连接器客户端覆盖
  • Kafka Streams 中时间戳同步的增强语义
  • 改进了 Stream 的 TaskId 的公共 API
  • Kafka 中的默认 serde 变为 null

Kafka版本建议

  1. 遵循一个基本原则,Kafka客户端版本和服务端版本应该保持一致,否则可能会遇到一些问题。
  2. 根据是否用到了Kafka的一些新特性来选择,假如要用到Kafka生产端的消息幂等性,那么建议选择Kafka 0.11 或之后的版本。
  3. 选择一个自己熟悉且稳定的版本,如果说没有比较熟悉的版本,建议选择一个较新且稳定、使用比较广泛的版本。

Kafka 部署

资源规划

Kafka - 图4

磁盘容量

磁盘容量需要考虑几个因素:新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩

$ 计算公式: 新增消息数 信息留存实践 平均消息大小 备份数 压缩率 * 110\% (索引及其他数据) $

假设有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?

每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于$ 10 ^ 8 * 1024 KB = 0.2 TB $

加上索引以及其他类型数据 在原有基础上增加 10%,那就是0.22TB

保留两周:$ 0.22TB * 14 = 3.08 TB $

压缩率为80%: $ 3.08 * 0.8 = 2.464 TB ≈ 2.5 TB $

保险起见建议预留3TB的存储空间

网络带宽

对于 Kafka 这种通过网络大量进行数据传输的框架而言,带宽特别容易成为瓶颈。事实上,在接触的真实案例当中,带宽资源不足导致 Kafka 出现性能问题的比例至少占 60% 以上。

当规划带宽时到不如说是部署kafka服务器数量

通常情况下只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,也就是说单台 Kafka
服务器最多也就能使用大约 700Mb 的带宽资源。

稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是,这里的 2/3
其实是相当保守的,你可以结合你自己机器的使用情况酌情减少此值。

好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb 的数据,除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以
3,即 30 台。

配置

  1. config
  2. ├── connect-console-sink.properties
  3. ├── connect-console-source.properties
  4. ├── connect-distributed.properties
  5. ├── connect-file-sink.properties
  6. ├── connect-file-source.properties
  7. ├── connect-log4j.properties
  8. ├── connect-mirror-maker.properties
  9. ├── connect-standalone.properties
  10. ├── consumer.properties
  11. ├── kraft
  12. ├── README.md
  13. ├── broker.properties
  14. ├── controller.properties
  15. └── server.properties
  16. ├── log4j.properties
  17. ├── producer.properties
  18. ├── server.properties
  19. ├── tools-log4j.properties
  20. ├── trogdor.conf
  21. └── zookeeper.properties

JVM 参数与垃圾回收算法

Kafka 服务器端代码是用 Scala 语言编写的,但终归还是会编译成 .Class 文件在 JVM 上运行,因此 JVM 参数设置对于 Kafka 集群的重要性不言而喻。

JVM 端设置,堆大小这个参数至关重要,无脑通用的建议:将 JVM 堆大小设置成 6GB

垃圾回收器的设置,也就是平时常说的 GC 设置。

手动设置使用 G1 收集器。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
  1. export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
  2. export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
  3. kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

操作系统参数

通常情况下,Kafka 并不需要设置太多的 系统参数.下面这几个在此较为重要:

  • 文件描述符限制:比如ulimit -n 1000000
  • 文件系统类型: 根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS甚至是ZFS。
  • swap:建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。
  • 提交时间:适当的增加提交间隔来降低物理磁盘的写操作

部署

  1. # 安装jdk
  2. sudo yum group install -y "development tools"
  3. sudo yum install -y java-1.8.0-openjdk.x86_64
  4. # 下载kafka
  5. wget -c https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.1.0/kafka_2.13-3.1.0.tgz
  6. # 解压缩
  7. tar -zxf kafka_2.13-3.1.0.tgz -C /usr/local
  8. # 配置环境变量
  9. # kafka env config
  10. export KAFKA_HOME=/usr/local/kafka_2.13-3.1.0
  11. export KAFKA_BIN=${KAFKA_HOME}/bin
  12. export PATH=${KAFKA_BIN}:PATH
  13. source /etc/profile
  14. # 初始化
  15. kafka-storage.sh format -c ${KAFKA_HOME}/config/kraft/server.properties -t `kafka-storage.sh random-uuid`
  16. # 启动服务
  17. kafka-server-start.sh ${KAFKA_HOME}/config/kraft/server.properties

验证

  1. # 创建quickstart-events主题
  2. kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
  3. kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
  4. kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

Kafka监控工具

GitHub - didi/KnowStreaming: 一站式云原生实时流数据平台,通过0侵入、插件化构建企业级Kafka服务,极大降低操作、存储和管理实时流数据门槛

GitHub - jmxtrans/jmxtrans: jmxtrans

GitHub - smartloli/EFAK: A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.

Offset Explorer

GitHub - yahoo/CMAK: CMAK is a tool for managing Apache Kafka clusters

GitHub - provectus/kafka-ui: Open-Source Web UI for Apache Kafka Management