一:kafka

1:集群规划

  1. hadoop102
  2. hadoop103
  3. hadoop104

2:安装部署

前提安装ZK
http://kafka.apache.org/downloads

3:解压

  1. tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
  2. cd /opt/module/kafka_2.11-2.4.1
  3. mv kafka_2.11-2.4.1 kafka
  4. cd kafka
  5. mkdir logs
  6. vi server.properties
  1. 输入以下内容:
  2. #broker的全局唯一编号,不能重复
  3. broker.id=0
  4. #删除topic功能使能
  5. delete.topic.enable=true
  6. #处理网络请求的线程数量
  7. num.network.threads=3
  8. #用来处理磁盘IO的现成数量
  9. num.io.threads=8
  10. #发送套接字的缓冲区大小
  11. socket.send.buffer.bytes=102400
  12. #接收套接字的缓冲区大小
  13. socket.receive.buffer.bytes=102400
  14. #请求套接字的缓冲区大小
  15. socket.request.max.bytes=104857600
  16. #kafka运行日志存放的路径
  17. log.dirs=/opt/module/kafka/logs
  18. #topic在当前broker上的分区个数
  19. num.partitions=1
  20. #用来恢复和清理data下数据的线程数量
  21. num.recovery.threads.per.data.dir=1
  22. #segment文件保留的最长时间,超时将被删除
  23. log.retention.hours=168
  24. #配置连接Zookeeper集群地址
  25. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

4:配置环境变量

  1. sudo vi /etc/profile
  2. 添加
  3. #KAFKA_HOME
  4. export KAFKA_HOME=/opt/module/kafka
  5. export PATH=$PATH:$KAFKA_HOME/bin
  6. [atguigu@hadoop102 module]$ source /etc/profile

5:分发安装包

  1. xsync kafka/
  2. 分别在hadoop103hadoop104上修改配置文件
  3. /opt/module/kafka/config/server.properties中的broker.id=1broker.id=2
  4. 注:broker.id不得重复

6:集群启停

  1. kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  2. kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  3. kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  4. kafka-server-stop.sh
  5. kafka-server-stop.sh
  6. kafka-server-stop.sh

二:术语

1:Topic

发布订阅的对象是主题(Topic)

2:Producer

向主题发布消息的客户端应用程序称为生产者(Producer)

3:Consumer

而订阅这些主题消息的客户端应用程序就被称为消费者(Consumer)

4:Clients

我们把生产者和消费者统称为客户端(Clients)

5:Broker

Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一

6:Replica

就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在 Kafka 中被称为副本(Replica),生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步

7:Partitioning

Kafka 中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。如你所见,Kafka 的分区编号是从 0 开始的,如果 Topic 有 100 个分区,那么它们的分区号就是从 0 到 99。

8:持久化数据

Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段

9:Consumer Group

所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。为什么要引入消费者组呢?主要是为了提升消费者端的吞吐量。多个消费者实例同时消费,加速整个消费端的吞吐量(TPS)。我会在专栏的后面详细介绍消费者组机制,所以现在你只需要了解消费者组是做什么的即可。另外这里的消费者实例可以是运行消费者应用的进程,也可以是一个线程,它们都称为一个消费者实例(Consumer Instance)

10:总结

  • 消息:Record。Kafka 是消息引擎嘛,这里的消息就是指 Kafka 处理的主要对象。
  • 主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
  • 分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
  • 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • 副本:Replica。Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  • 生产者:Producer。向主题发布新消息的应用程序。
  • 消费者:Consumer。从主题订阅新消息的应用程序。
  • 消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
  • 消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

    三:重要参数

    1:Broker 端参数

    1):log.dirs=/home/kafka1,/home/kafka2,/home/kafka

    1. 日志文件目录,线上生产环境中一定要为log.dirs配置多个路径,如:/home/kafka1,/home/kafka2,/home/kafka3。好处:
    2. 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
    3. 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。还记得上一期我们关于 Kafka 是否需要使用 RAID 的讨论吗?这个改进正是我们舍弃 RAID 方案的基础:没有这种 Failover 的话,我们只能依靠 RAID 来提供保障。

    2):zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

    1. zk1:2181,zk2:2181,zk3:2181
    2. #如果是两套kafka集群用一套zk,zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2

    3):listeners

    1. 学名叫监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务,例如:CONTROLLER: //localhost:9092

    4):advertised.listeners

    1. listeners 相比多了个 advertisedAdvertised 的含义表示宣称的、公布的,就是说这组监听器是 Broker 用于对外发布的

    5):host.name/port=不指定

    1. 列出这两个参数就是想说你把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了

    6):auto.create.topics.enable=false

    1. 是否允许自动创建 Topic,参数我建议最好设置成 false,即不允许自动创建 Topic。在我们的线上环境里面有很多名字稀奇古怪的 Topic,我想大概都是因为该参数被设置成了 true 的缘故。

    7):unclean.leader.election.enable=false

    1. 是关闭 Unclean Leader 选举的。何谓 Unclean?还记得 Kafka 有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副本中只能有一个副本对外提供服务,即所谓的 Leader 副本,那么问题来了,这些副本都有资格竞争 Leader 吗?显然不是,只有保存数据比较多的那些副本才有资格竞选,那些落后进度太多的副本没资格做这件事。如果设置成 false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选 Leader。这样做的后果是这个分区就不可用了,因为没有 Leader 了。反之如果是 true,那么 Kafka 允许你从那些“跑得慢”的副本中选一个出来当 Leader。这样做的后果是数据有可能就丢失了,因为这些副本保存的数据本来就不全,当了 Leader 之后它本人就变得膨胀了,认为自己的数据才是权威的。

    8):auto.leader.rebalance.enable=false

    ``` 设置它的值为 true 表示允许 Kafka 定期地对一些 Topic 分区进行 Leader 重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。严格来说它与上一个参数中 Leader 选举的最大不同在于,它不是选 Leader,而是换 Leader!比如 Leader A 一直表现得很好,但若auto.leader.rebalance.enable=true,那么有可能一段时间后 Leader A 就要被强行卸任换成 Leader B。

你要知道换一次 Leader 代价很高的,原本向 A 发送请求的所有客户端都要切换成向 B 发送请求,而且这种换 Leader 本质上没有任何性能收益,因此我建议你在生产环境中把这个参数设置成 false。

  1. <a name="n8cpw"></a>
  2. ### 9):log.retention.{hour|minutes|ms} log.retention.hour=168

都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低,但是通常情况下我们还是设置 hour 级别的多一些,比如log.retention.hour=168表示默认保存 7 天的数据,自动删除 7 天前的数据。很多公司把 Kafka 当做存储来使用,那么这个值就要相应地调大

  1. <a name="I1lNN"></a>
  2. ### 10):log.retention.bytes

这是指定 Broker 为消息保存的总磁盘容量大小,这个值默认是 -1,表明你想在这台 Broker 上保存多少数据都可以,至少在容量方面 Broker 绝对为你开绿灯,不会做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的 Kafka 集群:设想你要做一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得至关重要了

<a name="wSzZV"></a>
### 11):message.max.bytes

控制 Broker 能够接收的最大消息大小,这个参数也是一样,默认的 1000012 太少了,还不到 1MB。实际场景中突破 1MB 的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。毕竟它只是一个标尺而已,仅仅衡量 Broker 能够处理的最大消息大小,即使设置大一点也不会耗费什么磁盘空间的

<a name="UnPrL"></a>
## 2:Topic 级别参数
<a name="bAgc6"></a>
### 1):retention.ms

规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

<a name="oUXDW"></a>
### 2):retention.bytes

规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

<a name="E7Wob"></a>
### 3):max.message.bytes

设想你的部门需要将交易数据发送到 Kafka 进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几 MB,但一般不会超过 5MB。现在让我们用以下命令来创建 Topic

bin/kafka-topics.sh—bootstrap-serverlocalhost:9092—create—topictransaction—partitions1—replication-factor1—configretention.ms=15552000000—configmax.message.bytes=5242880

我们只需要知道 Kafka 开放了kafka-topics命令供我们来创建 Topic 即可。对于上面这样一条命令,请注意结尾处的--config设置,我们就是在 config 后面指定了想要设置的 Topic 级别参数。<br />下面看看使用另一个自带的命令kafka-configs来修改 Topic 级别参数。假设我们现在要发送最大值是 10MB 的消息,该如何修改呢?命令如下:

bin/kafka-configs.sh—zookeeperlocalhost:2181—entity-typetopics—entity-nametransaction—alter—add-configmax.message.bytes=10485760

<a name="QFVr9"></a>
### 4):JVM 参数

如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC。

KAFKA_HEAP_OPTS:指定堆大小。 KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

$> export KAFKA_HEAP_OPTS=—Xms6g —Xmx6g $> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true $> bin/kafka-server-start.sh config/server.properties

<a name="q7faB"></a>
### 5):操作系统参数

- 文件描述符限制
- 文件系统类型
- Swappiness
- 提交时间

首先是ulimit -n。我觉得任何一个 Java 项目最好都调整下这个值。实际上,文件描述符系统资源并不像我们想象的那样昂贵,你不用太担心调大此值会有什么不利的影响。通常情况下将它设置成一个超大的值是合理的做法,比如ulimit -n 1000000。还记得电影《让子弹飞》里的对话吗:“你和钱,谁对我更重要?都不重要,没有你对我很重要!”。这个参数也有点这么个意思。其实设置这个参数一点都不重要,但不设置的话后果很严重,比如你会经常看到“Too many open files”的错误。

其次是文件系统类型的选择。这里所说的文件系统指的是如 ext3、ext4 或 XFS 这样的日志型文件系统。根据官网的测试报告,XFS 的性能要强于 ext4,所以生产环境最好还是使用 XFS。对了,最近有个 Kafka 使用 ZFS 的数据报告,貌似性能更加强劲,有条件的话不妨一试。

第三是 swap 的调优。网上很多文章都提到设置其为 0,将 swap 完全禁掉以防止 Kafka 进程使用 swap 空间。我个人反倒觉得还是不要设置成 0 比较好,我们可以设置成一个较小的值。为什么呢?因为一旦设置成 0,当物理内存耗尽时,操作系统会触发 OOM killer 这个组件,它会随机挑选一个进程然后 kill 掉,即根本不给用户任何的预警。但如果设置成一个比较小的值,当开始使用 swap 空间时,你至少能够观测到 Broker 性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间。基于这个考虑,我个人建议将 swappniess 配置成一个接近 0 但不为 0 的值,比如 1。

最后是提交时间或者说是 Flush 落盘时间。向 Kafka 发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就可以了,随后操作系统根据 LRU 算法会定期将页缓存上的“脏”数据落盘到物理磁盘上。这个定期就是由提交时间来确定的,默认是 5 秒。一般情况下我们会认为这个时间太频繁了,可以适当地增加提交间隔来降低物理磁盘的写操作。当然你可能会有这样的疑问:如果在页缓存中的数据在写入到磁盘前机器宕机了,那岂不是数据就丢失了。的确,这种情况数据确实就丢失了,但鉴于 Kafka 在软件层面已经提供了多副本的冗余机制,因此这里稍微拉大提交间隔去换取性能还是一个合理的做法。

四:常用命令

kafka-topics.sh —bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 —create —topic topic01 —partitions 3 —replication-factor 2

kafka-topics.sh —bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 —list

kafka-topics.sh —bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 —describe —topic topic02

kafka-console-producer.sh —broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 —topic topic01

kafka-console-consumer.sh —bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 —topic topic01 —group g1 —property print.key=true —property print.value=true —property key.separator=, ```