简介
Apache Kafka 是一款开源的消息引擎系统。
消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递。
消息引擎系统还要设定具体的传输协议,即我用什么方法把消息 传输出去。常见的有两种方法:
点对点模型:也叫消息队列模型。如果拿上面那个“民间版”的定义来说,那么系统 A发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息。日常生活的例子比如电话客服就属于这种模型:同一个客户呼入电话只能被一位客服人员处理,第二个客服人员不能为该客户服务。
发布 / 订阅模型:与上面不同的是,它有一个主题(Topic)的概念,你可以理解成逻辑语义相近的消息容器。该模型也有发送方和接收方,只不过提法不同。发送方也称为发布者(Publisher),接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布 / 订阅模型。
为什么系统 A 不能直接发送消息给系统 B?中间还要隔一个消息引擎呢?
答案:就是“削峰填谷”。这四个字简直比消息引擎本身还要有名气。所谓的“削峰填谷”就是指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。
Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。
高可用手段:
- 将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面 运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。
- 备份机制(Replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica)。
Kafka 的三层消息架构:
- 第一层是主题层,每个主题可以配置 M 个分区,而每个分区又可以配置 N 个副本。
- 第二层是分区层,每个分区的 N 个副本中只能有一个充当领导者角色,对外提供服务;其他 N-1 个副本是追随者副本,只是提供数据冗余之用。
- 第三层是消息层,分区中包含若干条消息,每条消息的位移从 0 开始,依次递增。
- 最后,客户端程序只能与分区的领导者副本进行交互。
术语
消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
Kafka 是消息引擎系统,也是分布式流处理平台。
特性:
提供一套 API 实现生产者和消费者;
降低网络传输和磁盘存储开销;
实现高伸缩性架构。
版本
Apache Kafka
Apache Kafka 是最“正宗”的 Kafka,也应该是你最熟悉的发行版了。自 Kafka 开源伊始,它便在 Apache 基金会孵化并最终毕业成为顶级项目,它也被称为社区版 Kafka。
劣势:在于它仅仅提供最最基础的组件,特别是对于前面提到的Kafka Connect 而言,社区版 Kafka 只提供一种连接器,即读写磁盘文件的连接器,而没有与其他外部系统交互的连接器,在实际使用过程中需要自行编写代码实现。
总而言之,如果你仅仅需要一个消息引擎系统亦或是简单的流处理应用场景,同时需要对系统有较大把控度,那么我推荐你使用 Apache Kafka。
Confluent Kafka
Confluent Kafka 目前分为免费版和企业版两种。前者和Apache Kafka 非常相像,除了常规的组件之外,免费版还包含 Schema 注册中心和 RESTproxy 两大功能。前者是帮助你集中管理 Kafka 消息格式以实现数据前向 / 后向兼容;后者用开放 HTTP 接口的方式允许你通过网络访问 Kafka 的各种功能,这两个都是 ApacheKafka 所没有的。
除此之外,免费版包含了更多的连接器,它们都是 Confluent 公司开发并认证过的,你可以免费使用它们。至于企业版,它提供的功能就更多了。在我看来,最有用的当属跨数据中心备份和集群监控两大功能了。多个数据中心之间数据的同步以及对集群的监控历来是Kafka 的痛点,Confluent Kafka 企业版提供了强大的解决方案帮助你“干掉”它们。
Cloudera/Hortonworks Kafka
便捷化的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中。如果你是这些平台的用户一定觉得非常方便,因为所有的操作都可以在前端 UI 界面上完成,而不必去执行复杂的 Kafka 命令。另外这些平台提供的监控界面也非常友好,你通常不需要进行任何配置就能有效地监控 Kafka。
如果你需要快速地搭建消息引擎系统,或者你需要搭建的是多框架构成的数据平台且 Kafka 只是其中一个组件,那么我推荐你使用这些大数据云公司提供的 Kafka。
总结
Apache Kafka,也称社区版 Kafka。优势在于迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅提供基础核心组件,缺失一些高级的特性。
Confluent Kafka,Confluent 公司提供的 Kafka。优势在于集成了很多高级特性且由Kafka 原班人马打造,质量上有保证;缺陷在于相关文档资料不全,普及率较低,没有太多可供参考的范例。
CDH/HDP Kafka,大数据云公司提供的 Kafka,内嵌 Apache Kafka。优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度较慢。
kafka-manager最近的演进速度不及Kafka本身。目前我了解到用的比较多的是
JMXTrans + InfluxDB + Grafana这种组合
版本号
Kafka 版本命名
当前 Apache Kafka 已经迭代到 2.2 版本,社区正在为 2.3.0 发版日期进行投票.
于是有些同学就会纳闷,难道 Kafka 版本号不是 2.11 或 2.12 吗?其实不然,前面的版本号是编译 Kafka 源代码的 Scala 编译器版本。
真正的 Kafka版本号实际上是 2.1.1。那么这个 2.1.1 又表示什么呢?前面的 2 表示大版本号,即 Major Version;中间的 1 表示小版本号或次版本号,即 Minor Version;最后的 1 表示修订版本号,也就是 Patch 号。
Kafka 版本演进
Kafka 目前总共演进了 7 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0,其中的小版本和 Patch 版本很多。
Kafka 从 0.7 时代演进到 0.8 之后正式引入了副本机制,至此 Kafka 成为了一个真正意义上完备的分布式高可靠消息队列解决方案。有了副本备份机制,Kafka 就能够比较好地做到消息无丢失。那时候生产和消费消息使用的还是老版本的客户端 API,所谓的老版本是指当你用它们的 API 开发生产者和消费者应用时,你需要指定 ZooKeeper 的地址而非 Broker的地址。
2015 年 11 月,社区正式发布了 0.9.0.0 版本。在我看来这是一个重量级的大版本更迭,0.9 大版本增加了基础的安全认证 / 权限功能,同时使用 Java 重写了新版本消费者 API,另外还引入了 Kafka Connect 组件用于实现高性能的数据抽取。如果这么多眼
花缭乱的功能你一时无暇顾及,那么我希望你记住这个版本的另一个好处,那就是新版本Producer API 在这个版本中算比较稳定了。
0.10.0.0 是里程碑式的大版本,因为该版本引入了 Kafka Streams。从这个版本起,Kafka 正式升级成分布式流处理平台,虽然此时的 Kafka Streams 还基本不能线上部署使用。0.10 大版本包含两个小版本:0.10.1 和 0.10.2,它们的主要功能变更都是在 KafkaStreams 组件上。如果你把 Kafka 用作消息引擎,实际上该版本并没有太多的功能提升。
如果你依然在使用 0.10 大版本,我强烈建议你至少升级到 0.10.2.2 然后使用新版本 ConsumerAPI。还有个事情不得不提,0.10.2.2 修复了一个可能导致 Producer 性能降低的 Bug。基于性能的缘故你也应该升级到 0.10.2.2。
在 2017 年 6 月,社区发布了 0.11.0.0 版本,引入了两个重量级的功能变更:一个是提供幂等性 Producer API 以及事务(Transaction) API;另一个是对 Kafka 消息格式做了重构。
部署
操作系统
主流的 I/O 模型通常有 5 种类型:阻塞式 I/O、非阻塞式 I/O、I/O 多路复用、信号驱动I/O 和异步 I/O。每种 I/O 模型都有各自典型的使用场景,比如 Java 中 Socket 对象的阻塞模式和非阻塞模式就对应于前两种模型;而 Linux 中的系统调用 select 函数就属于 I/O多路复用模型;大名鼎鼎的 epoll 系统调用则介于第三种和第四种模型之间;至于第五种模型,其实很少有 Linux 系统支持,反而是 Windows 系统提供了一个叫 IOCP 线程模型属于这一种。
实际上 Kafka 客户端底层使用了 Java的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制
是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的I/O 性能。
磁盘
在对 Kafka 集群进行磁盘规划时经常面对的问题是,我应该选择普通的机械磁盘还是固态硬盘?
前者成本低且容量大,但易损坏;后者性能优势大,不过单价高。我给出的建议是使用普通机械硬盘即可。
Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。
磁盘容量
Kafka 集群到底需要多大的存储空间?
我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 1KB 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 3 = 2.25TB。
带宽
假设你公司的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。稍等,这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少此值。
好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb(10241024/36008,带宽资源一般用Mbps而不是MBps衡量) 的数据,除以 240,约等于 10 台服务器。如果
消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。
参数配置(重点)
Broker
首先 Broker 是需要配置存储信息的,即 Broker 使用哪些磁盘。那么针对存储信息的重要参数有以下这么几个:
log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个 CSV 格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理
磁盘上。这样做有两个好处:提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。能够实现故障转移:即Failover。这是 Kafka 1.1 版本新引入的强大功能。
log.dir:注意这是 dir,结尾没有 s,说明它只能表示单个路径,它是补充上一个参数用的。(一版不用设置)
ZooKeeper
首先 ZooKeeper 是做什么的呢?
它是一个分布式协调框架,负责协调管理并保存 Kafka 集群的所有元数据信息,比如集群都有哪些Broker 在运行、创建了哪些 Topic,每个 Topic 都有多少分区以及这些分区的 Leader 副本都在哪些机器上等信息。
Kafka 与 ZooKeeper 相关的最重要的参数当属zookeeper.connect。这也是一个 CSV 格式的参数,比如我可以指定它的值为zk1:2181,zk2:2181,zk3:2181。2181 是 ZooKeeper的默认端口。
现在问题来了,如果我让多个 Kafka 集群使用同一套ZooKeeper 集群,那么这个参数应该怎么设置呢?
如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2。切记 chroot只需要写一次,而且是加到最后的。我经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的。
Broker 连接
即客户端程序或其他Broker 如何与该 Broker 进行通信的设置。有以下三个参数:
listeners:学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
advertised.listeners:和 listeners 相比多了个advertised。Advertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的。
host.name/port:列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了。
我们具体说说监听器的概念,从构成上来说,它是若干个逗号分隔的三元组,每个三元组的格式为<协议名称,主机名,端口号>。这里的协议名称可能是标准的名字,比如PLAINTEXT 表示明文传输、SSL 表示使用 SSL 或 TLS 加密传输等;也可能是你自己定义的协议名字,比如CONTROLLER: //localhost:9092。一旦你自己定义了协议名称,你必须还要指定listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定listener.security.protocol.map=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据。
至于三元组中的主机名和端口号则比较直观,不需要做过多解释。不过有个事情你还是要注意一下,经常有人会问主机名这个设置中我到底使用 IP 地址还是主机名。这里我给出统一的建议:最好全部使用主机名,即 Broker 端和 Client端应用配置中全部填写主机名。 Broker 源代码中也使用的是主机名,如果你在某些地方使用了 IP 地址进行连接,可能会发生无法连接的问题。
Topic 管理
auto.create.topics.enable:是否允许自动创建Topic。参数我建议最好设置成false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。
unclean.leader.election.enable:是否允许Unclean Leader 选举。只有保存数据比较多的那些副本才有资格竞选,那些落
后进度太多的副本没资格做这件事。默认false但是还是建议你还是显式地把它设置成 false
auto.leader.rebalance.enable:是否允许定期进行 Leader 选举。设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中Leader 选举的最大不同在于,它不是选 Leader,而是换Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。建议你在生产环境中把这个参数设置成 false。
数据留存方面
log.retention.{hour|minutes|ms}:这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低。虽然 ms 设置有最高的优先级,但是通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大。
log.retention.bytes:这是指定 Broker 为消息保存的总磁盘容量大小。这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了。
message.max.bytes:控制 Broker 能够接收的最大消息大小。这个参数也是一样,默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的。
Topic 级别参数
retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。
retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。
max.message.bytes。决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。
问题
为什么kafka不支持主从分离?
https://www.zhihu.com/question/327925275/answer/705690755