一、为什么要使用消息队列

1.1 同步通信

我们先举个电商相关的🌰进行说明,业务流程如下:

  1. 用户下单。
  2. 数据库创建一笔订单信息。
  3. 商品库存表进行减少一笔库存。
  4. 给用户添加积分。
  5. 给用户下发优惠券。

KafKa - 图1
假设服务于服务之间的通信时间需要耗费 500 ms,然后每个服务进行对业务的处理时间需要 200 ms,那么整个用户下单流程所需的业务时间为 2 - 5 s,这对于客户而已肯定是不行。
同时这同步的方式还存在以下问题:

  • 造成的系统开销,响应时间特别久 2 - 5 s
  • 在同步通信的过程中必须保证每个服务都顺利执行完,整个链路执行完才算下单成功。
  • 因为网络等其他问题,可能会导致用户下单成功率较低,对用户的体验较差。

    1.2 异步通信

    同样也是👆的需求,我们使用消息队列进行实现异步通信进行看看,有什么好处。
    KafKa - 图2
    根据上面的流程图,我们可以看出异步通信针对于同步通信来说,异步的方式,可以让上游快速成功,极大的提高了系统的业务吞吐量,而且在分布式的系统中,通过下游多个服务对分布式事务的保障,也能解决业务执行后的数据最终一致性。
    消息队列解决具体的问题是什么?
    回答:通信问题

二、消息队列的流派

目前市面上对于消息队列的选型有很多种:

  • RabbitMQ:内部的可玩性(功能性)十分强大。
  • RocketMQ:阿里内部大神,根据 Kafka 内部执行原理,手写的一个消息队列中间件。性能是可以做到于 Kafka 相比肩的,除此之外,功能上封装了更多的功能。
  • ActiveMQ
  • Kafka:全球消息处理性能最快的一款 MQ。
  • ZeroMQ

这些消息队列都有什么区别呢?

2.1 什么是 MQ

Message Queue(MQ),消息队列中间件。很多人都说:MQ 通过将消息都发送和接收分离来实现应用程序的异步和解耦,这个给人的直觉就是 — MQ 是异步的,用来解耦的,但是这个只是 MQ 的效果而不是目的。MQ 真正的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间的通讯要么是 HTTP,要么就是自己开发的 RPC,也称之为 TCP,但是这两种协议其实都是原始的协议。HTTP 协议很难实现两端通讯 — 模块 A 可以调用 B,B 也可以主动调用 A,如果要做到这个两端都要背上 Web Server,而且还不支持长连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、心跳、私有的协议,想一想头皮就发麻。MQ 所要做的就是在这些协议之上构建一个简单的“协议” — 生产者/消费者模型。MQ 带给我的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义了两个对象 — 发送数据的叫生产者;接收数据的叫消费者,提供一个 SDK 让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。

2.2 有 Broker 的 MQ

这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推送给消费者(或者消费者主动订阅)。

2.2.1 重 Topic

重 Topic 代表作品:Kafka、RocketMQ、ActiveMQ

Kafka、JMS(ActiveMQ)就属于这个流派,生产者会发送 key 和数据到 Broker,由 Broker 比较 key 之后决定给哪个消费者。这种模式是我们最常见的模式,是我们对 MQ 最多的印象。这这种模式下一个 topic 往往是一个比较大的概念,甚至一个系统中就可能只有一个 topic, topic 某种意义上就是 queue,生产者发送 key 相当于说:“hi,把数据放到 key 的队列中”。
image.png
如上图所示,Broker 定义了三个队列,key1,key2,key3,生产者发送数据的时候会发送 key1 和 data,Broker 这推送数据的时候则推送 data(也可能把 key 带上)。
虽然架构一样但是 Kafka 的性能要比 jms 的性能不知道高到多少倍,所以基本这种类型的 MQ 只有 Kafka 一种备选方案。如果你需要一条暴力的数据流(在乎性能而非灵活性)那么 Kafka 是最好的选择。

2.2.2 轻 Topic

这种的代表是 RabbitMQ(或者说是 AMQP)。生产者发送 key 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过一定的逻辑计算出 key 对应的队列,然后把数据交给队列。
image.png

2.3 无 Broker 的 MQ

无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到 — MQ 是更高级的 Socket,它是解决通讯问题的。所以 ZeroMQ 被设计成了一个 “库” 而不是一个中间件,这种实现也可以达到 — 没有 Broker 的目的。
image.png
节点之间通讯的信息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API 可以完成发送数据,读取数据。
ZeroMQ 其实就是一个跨语言、重量级的 Actor 模型邮箱库。你可以把自己的程序想象成一个 Actor,ZeroMQ 就是提供邮箱功能的库;ZeroMQ 可以实现同一台机器的 RPC 通讯也可以实现不同机器的 TCP、UDP 通讯,如果你需要一个强大的、灵活的、野蛮的通讯能力,别犹豫,那就是 ZeroMQ。


三、 Kafka 介绍

Kafka 是最初由 Linkedin 公司开发,是一个分布式支持分区的(Partition)多副本的(replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎,web/nginx 日志、访问日志、消息服务等,用 Scala 语言编写,Linkedin 于 2010 年贡献给了 Apache 基金会并成为了顶级开源项目。

3.1 Kafka 的使用场景

  • 日志收集:一个公司可以使用 Kafka 收集各种服务的 log,通过 Kafka 以统一接口服务的方式开放给各种 consumer,例如 Hadoop、Hbase、Solr 等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka 经常被用来记录 Web 用户或者 App 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息都被各个服务器发布到 Kafka 的 Topic 中,然后订阅者通过订阅这些 Topic 来做实时的监控分析,或者装载到 Hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

    3.2 Kafka 基本概念

    Kafka 是一个分布式的、分区的消息(官方称之为 commit log)服务。它提供一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka 借鉴了 JMS 规范的思想,但是却并没有完成遵循 JMS 规范
    首先,让我们来看一下基础的消息(Message)相关术语:
名称 解释
Broker 消息中间件处理节点,一个 Kafka 节点就是一个 Broker,一个或多个 Broker 可以组成一个 Kafka 集群。
Topic Kafka 根据 Topic 对消息进行归类,发布到 Kafka 集群的每条信息都需要指定一个 Topic。
Producer 消息生产者,向 Broker 发送信息的客户端
Consumer 消息消费者,从 Broker 读取消息的客户端


ConsumerGroup
每个 Consumer 属于一个特定的 Consumer Group,一条消息可以被多个不同的 Consumer Group 消费,但是一个 Consumer Group 中只能由一个 Consumer 能够消费该信息。
Partition 物理上的概念,一个 Topic 可以分为多个 Partition,每个 Partition 内部消息是有序的。

因此,从⼀个较⾼的层⾯上来看,producer 通过⽹络发送消息到 Kafka 集群,然后 consumer
来进⾏消费,如下图:
image.png
服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。


四、Kafka 基础使用

4.1 安装 Kafka

  • 安装 JDK 1.8+
  • 安装 ZK ```markdown
  1. 下载 ZK https://zookeeper.apache.org/releases.html
  2. 目前使用版本为 apache-zookeeper-3.6.3-bin
  3. 解压缩 tar -zxvf apache-zookeeper-3.6.3-bin
  4. 复制 zoo.cfg 进入到 conf 目录 cp zoo_sample.cfg zoo.cfg
  5. 新建 zk 日志目录 mkdir zkdata
  6. vim zoo.cfg ``` image.png
  • 官网下载 Kafka 压缩包 http://kafka.apache.org/downloads 使用版本为 kafka-2.11-2.4.0.tgz
  • 进入到 conf 修改 server.properties

    1. #broker.id属性在kafka集群中必须要是唯⼀
    2. broker.id=0
    3. #kafka部署的机器ip和提供服务的端⼝号
    4. listeners=PLAINTEXT://192.168.65.60:9092
    5. #kafka的消息存储⽂件
    6. log.dir=/usr/local/data/kafka-logs
    7. #kafka连接zookeeper的地址
    8. zookeeper.connect=192.168.65.60:2181
  • 进入 bin 目录,使用以下命令进行启动 Kafka 服务器(携带配置文件)

    1. ./kafka-server-start.sh -daemon ../config/server.properties
  • 校验 Kafka 是否启动成功,进入到 zk 安装目录,查看是否把 Kafka 对应的 broker 注册上来,目录为 /brokers/ids/0

zk 客户端连接命令

  1. ./zkCli.sh -server ip:2181

image.png

4.2 server.properties 核心配置文件解析

property default description
broker.id 0 每个 broker 都可以使用一个唯一的非负整数 ID 进行标识。这个 ID 可以作为 broker 的名称,你可以选择任意的数字作为 broker 的 ID,只需要保持唯一即可。
log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每
当创建新 partition 时,都会选择在包含最少 partitions 的路径下进⾏。
listeners PLAINTEXT://ip:9092 sever 接受客户端连接的的端口,ip 配置为安装 Kafka 的本机 IP 即可
zookeeper.connect lolachost:2181 zooKeeper连接字符串的格式为: hostname:port,此处 hostname和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port; zookeeper 如果是集群,连接⽅式为 hostname1:port1, hostname2:port2,
hostname3:port3
log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存的时间对所有的 topic 都一样。
num.partitions 1 创建 topic 的默认分区数
default.replication.factor 1 自动创建 topic 的默认副本数量,建议设置为大于等于 2
min.insync.replicas 1 当 producer 设置 acks 为-1时,min.insync.replicas 指定 replicas的最⼩数⽬(必须确认每⼀个repica的 写数据都是成功的),如果这个数⽬没有达到, producer发送消息会产⽣异常
delete.topic.enable false 是否允许删除主题

4.3 创建主题 topic

topic 是什么概念?topic 可以实现消息的分类,不同的消费者消费不同的 topic。
image.png
执行以下的命令进行创建一个名称为 test 的主题 topic,这个 topic 只有一个 partition 并且备份因子也设置为 1。

  1. ./kafka-topics.sh --create --zookeeper 172.17.222.14:2181 --replication-factor 1 --partitions 1 --topic test

image.png
查看当前 Kafka 内有哪些 topic

  1. ./kafka-topics.sh --list --zookeeper 172.17.222.14:2181

image.png

4.4 发送消息

Kafka 自带一个 producer 命令行的客户端,可以从本地文件中读取内容,或者我们也可以通过命令行直接输入内容,并将这些内容以消息的形式发送到 Kafka 集群中。在默认情况下,每一行独立的内容都会被当作是一条独立的消息。
使用以下命令使用 Kafka 的发送消息客户端指定发送到 Kafka 到服务器地址和 topic

  1. ./kafka-console-producer.sh --broker-list 172.17.222.14:9092 --topic test

4.5 消费消息

对于consumer,kafka同样也携带了⼀个命令⾏客户端,会将获取到内容在命令中进⾏输 出,默认是消费最新的消息。使⽤ kafka 的消费者消息的客户端,从指定 kafka 服务器的指定 topic 中消费消息。

  • 方式一:从最后一条消息到偏移量 + 1 开始消费

    1. ./kafka-console-consumer.sh --bootstrap-server 172.17.222.14:9092 --topic test
  • 方式二:从头开始消费

    1. ./kafka-console-consumer.sh --bootstrap-server 172.17.222.14:9092 --from-beginning --topic test

    image.png
    几个注意点:

  • 消息会被存储

  • 消息是顺序消费
  • 消息是有偏移量
  • 消费时可以指明偏移量进行消费

    4.6 消息的细节

    image.png
    两个消费者同时订阅一个主题,一个是从头开始消费,一个是从最新的消息 + 1 开始消费,底层是怎么实现的呢?

  • 生产者 producer 将信息发送给 broker 之后,broker 会把消息保存到本地到日志文件中

    1. /usr/softwar/kafka/kafka-logs/主题-分区/00000000000.log
  • 消息的保存是有序的,底层使用的是队列,先进先出。通过 offset 偏移量来描述消息的有序性。

  • 消费者消息消息时也是通过 offset 偏移量来描述当前需要消费的那条消息对应的位置。

    4.7 单播消息

    在一个 Kafka 的 topic 中,启动两个消费者,一个生产者,问:生产者发送一条消息,这两个消费者都能消费都能消费到这一条消息吗?
    如果是多个消费者在同一个消费组,那么只有一个消费者可以收到订阅的 topic 中的消息。换言之,在同一个消费组中只有一个消费者能消费到消息。
    1. ./kafka-console-consumer.sh --bootstrap-server 172.17.222.14:9092 --consumer-property group.id=testGroup --topic test

    4.8 多播消息

    不同的消费组订阅同一个 topic,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到同一条消息。
    1. ./kafka-console-consumer.sh --bootstrap-server 172.17.222.14:9092 --consumer-property group.id=testGroup --topic test
    2. ./kafka-console-consumer.sh --bootstrap-server 172.17.222.14:9092 --consumer-property group.id=testGroup1 --topic test
    下图就是单播和多播的区别:
    image.png

    4.9 查询消费组的详细信息

    通过以下命令可以查看到消费组的详细信息: ```bash

    查看当前主题下有哪些消费组

    ./kafka-consumer-groups.sh —bootstrap-server 172.17.222.14:9092 —list

查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量

./kafka-consumer-groups.sh —bootstrap-server 172.17.222.14:9092 — describe —group testGroup

  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/8434501/1647791200494-c05e0a9c-5257-4506-9b89-f202e6c60c94.png#clientId=u988bc880-3952-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=130&id=u3a9e0e1c&margin=%5Bobject%20Object%5D&name=image.png&originHeight=260&originWidth=2444&originalType=binary&ratio=1&rotation=0&showTitle=false&size=217647&status=done&style=shadow&taskId=u449abd1f-a871-4ebb-8709-cca00c80a9f&title=&width=1222)
  2. - current-offset:当前消费组已消费的偏移量
  3. - log-end-offset:当前订阅的 topic 消息对应的结束偏移量
  4. - lag:当前消费组未消费的消息
  5. ---
  6. <a name="KXyMc"></a>
  7. # 五、主题和分区的概念
  8. <a name="PX4uA"></a>
  9. ## 5.1 主题 topic
  10. 主题 topic kafka 中是⼀个逻辑的概念,kafka 通过 topic 将消息进⾏分类。不同的 topic 会被 订阅该 topic 的消费者消费。 <br />但是有⼀个问题,如果说这个 topic 中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被保存到 log ⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka 提出了 partition 分区的概念。
  11. <a name="lOAPb"></a>
  12. ## 5.2 分区 partition
  13. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/8434501/1647791714286-da176a82-b758-4c47-9ed0-646becff9bf3.png#clientId=u988bc880-3952-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=252&id=u4d0c25c7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=504&originWidth=1000&originalType=binary&ratio=1&rotation=0&showTitle=false&size=73455&status=done&style=shadow&taskId=u56bde7af-7f24-424f-9568-ecc26bdd50e&title=&width=500)<br />⼀个主题中的消息量是⾮常⼤的,因此可以通过分区的设置,来分布式存储这些消息。⽐如⼀个 topic 创建了 3个分区。那么 topic 中的消息就会分别存放在这三个分区中。
  14. **创建多个分区的主题**
  15. ```bash
  16. ./kafka-topics.sh --create --zookeeper 172.17.222.14:2181 --replication-factor 1 --partitions 2 --topic test1

可以通过以下命令查看主题的分区

  1. ./kafka-topics.sh --describe --zookeeper 172.17.222.14:2181 --topic test1

分区相关细节:

  • 生产者 producer 发送的消息是保存到 /usr/kafka/kafka-logs/主题名称-分区数/0000.log 文件中。
  • Kafka 默认生成了50个 __comsumer_offsets分区数,我们可以通过修改配置文件 offsets.topic.num.partitions进行配置默认产生的分区数。
  • __comsumer_offsets作用:存放消费者消费某个主题对应的偏移量。因为每个消费者都会维护自己所消费主题对应的偏移量,换句话说就是每个消费者都会自己当前消费的主题对应的偏移量(offset)提交到 Kafka 默认主题中。
    • 提交哪个分区:通过 hash 函数进行计算。相关公式为:hash(comsumerGroupId)%__comsumer_offsets 主题的分区数。
    • 提交到该主题的内容为:key 是 comsumerGroupId + topic + 分区号,value 就是当前 offset 的值。
  • 文件中保存的消息,默认保存 7 天。7 天后消息会被默认删除。

六、Kafka 集群操作

6.1 搭建集群

需要注意是本处集群搭建都是在一个机器上的,读者可以根据自己需要搭建在不同的机器上,步骤都是一致的。
搭建三个 broker。
前置条件:准备三个 server.properties 文件,这三个配置文件都需要修改以下内容。

  • server.properties ```bash broker.id=0

    允许外部端口连接

    listeners=PLAINTEXT://172.17.222.14:9092

外部代理地址 如果是部署在云服务上需要注意这点,否则会连接不上 advertised.listeners=PLAINTEXT://公网IP:9092

log.dir=/usr/software/kafka_2.11-2.4.0/kafka-logs

  1. - server1.properties
  2. ```bash
  3. broker.id=1
  4. # 允许外部端口连接
  5. listeners=PLAINTEXT://172.17.222.14:9093
  6. # 外部代理地址 如果是部署在云服务上需要注意这点,否则会连接不上 advertised.listeners=PLAINTEXT://公网IP:9093
  7. log.dir=/usr/software/kafka_2.11-2.4.0/kafka-logs-1
  • server2.properties ```bash broker.id=2

    允许外部端口连接

    listeners=PLAINTEXT://172.17.222.14:9094

外部代理地址 如果是部署在云服务上需要注意这点,否则会连接不上 advertised.listeners=PLAINTEXT://公网IP:9094

log.dir=/usr/software/kafka_2.11-2.4.0/kafka-logs-2

  1. - 注意点:如果 Kafka 机器是部署在云服务上,需要在 server.properties 配置下面的命令,并开启相关的安全组,否则程序会连接不上 Kafka,导致无法正常发送消息和消费消息
  2. ```powershell
  3. # 外部代理地址 如果是部署在云服务上需要注意这点,否则会连接不上 advertised.listeners=PLAINTEXT://公网IP:9092

启动 Kafka 集群

  1. ./kafka-server-start.sh -daemon ../config/server.properties
  2. ./kafka-server-start.sh -daemon ../config/server1.properties
  3. ./kafka-server-start.sh -daemon ../config/server2.properties

验证集群是否启动成功,到对应 zk 服务器进行查看
image.png

6.2 副本的概念

副本是对分区的备份。在集群中,不同的副本会被部署在不同的 broker 上。
下面例子:创建一个主题、三个副本、两个分区

  1. ./kafka-topics.sh --create --zookeeper 172.17.222.14:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看 topic 相关信息

  1. ./kafka-topics.sh --describe --zookeeper 172.17.222.14:2181 --topic my-replicated-topic

image.png
副本是为主题中的多个分区创建多个备份。多个副本在 Kafka 集群中的多个 broker 中,其中会有一个作为 leader,其他的作为 flower。
image.png
相关概念解析:

  • leader:每个 partition 都有一个 broker 作为 leader,Kafka 的读写操作都发生在 leader 上。leader 负责把数据同步到 flower 上。当 leader 挂掉之后,通过主从选举的方式,从 flower 中选择一个作为 leader 进行工作。
  • flower:leader 处理所有针对这个 partition 的读写请求,而 flower 则被动的复制 leader(不提供任何的读写操作,主要是为了保证多个副本数据与消费的一致性)。
  • replicas:当前副本中存在的 broker ID。
  • Isr:可以同步的 broker 节点和已进行同步的 broker 节点都存放在 Isr 上。

    6.3 Kafka集群消息的发送

    1. ./kafka-console-producer.sh --broker-list
    2. 172.17.222.14:9092,172.17.222.14:9093,172.17.222.14:9094 --topic my-replicated-topic

    6.4 Kafka集群消息的消费

    1. ./kafka-console-consumer.sh --bootstrap-server
    2. 172.17.222.14:9092,172.17.222.14:9093,172.17.222.14:9094 --from-beginning --topic my-replicated-topic

    6.5 分区分组消费组的概念

    image.png

  • ⼀个partition只能被⼀个消费组中的⼀个消费者消费,⽬的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?

  • partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息。
  • 如果消费者挂了,那么会触发rebalance机制(后⾯介绍),会让其他消费者来消费该分区。

七、Java 客户端操作 Kafka

7.1 引入依赖

这里建议引入的 maven 依赖需和部署的 Kafka 版本保持一致,否则会出现一些问题。

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.4.0</version>
  5. </dependency>

7.2 生产者

本处的示例 Kafka 客户端使用同步的方式进行发送消息。

  1. package com.baiyi.basic.producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.producer.*;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. import java.util.concurrent.ExecutionException;
  7. /**
  8. * @author: BaiYi
  9. * @description: kafka 生产者
  10. * @date: 2022/3/27 16:03
  11. */
  12. @Slf4j
  13. public class KafkaProducer {
  14. private static final String TOPIC_NAME = "my-replicated-topic";
  15. public static void main(String[] args) {
  16. // 1. 设置相关参数
  17. Properties properties = new Properties();
  18. // Kafka 集群相关配置
  19. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.107.73.103:9092,47.107.73.103:9093,47.107.73.103:9094");
  20. // 把发送消息的 key 从字符串序列化为字节数组
  21. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  22. // 将发送消息的 value 从字符串序列化为字节数组
  23. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  24. /*
  25. Kafka 使用同步发送消息时,ack 相关配置:
  26. ack=0: 生产者往 Kafka clutter 发送消息,无需等待分区对应副本的 leader 接收到消息,broker 直接返回 ack 给生产者,
  27. 这种方式效率是最高的,但是容易丢失消息
  28. ack=1: 生产者往 Kafka clutter 发送消息,需要等待分区对应的副本的 leader 接收到消息,并把消息写到本地的 log 中,才会返回 ack 给生产者,
  29. 这种方式在性能和安全上是最均衡的。但是如果这时候 leader 还没来得及把消息同步到 flower 就已经挂掉了,也会丢失消息。
  30. ack=-1/all: 需要等待 min.insync.replicas(默认为1,推荐配置⼤于等于2) 这个参数配置的副本个数都成功写⼊⽇志,这种策略会保证只要有⼀个备份存活
  31. 就不会丢失数据。这是最强的数据保证。⼀般除⾮是⾦融级别,或跟钱打交道的场景才会使⽤这种配置。
  32. */
  33. properties.put(ProducerConfig.ACKS_CONFIG, 1);
  34. /*
  35. 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,
  36. ⽐如⽹络抖动,所以需要在接收者那边做好消息接收的幂等性处理
  37. */
  38. properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  39. // 重试间隔设置
  40. properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
  41. // 设置发送消息的本地缓冲区,如果设置了缓冲区,那么消息会先发到缓冲区中,可以提高消息的发送性能,默认值是 32M
  42. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  43. /*
  44. kafka 本地线程会从本地缓冲区中读取数据,批量发送到 broker
  45. 设置读取数据的大小 16384,默认即为 16K
  46. */
  47. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  48. /*
  49. 默认值是0,意思就是消息必须立即被发送,但这样会影响性能
  50. 一般设置 10ms 左右,就是说这个消息发送完会进入到本地到一个 batch,如果 10 毫秒内,这个 batch 满了 16KB 就会随 batch 一起被发送出去
  51. 如果 10毫秒内,batch 没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
  52. */
  53. properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
  54. // 2. 创建生产者的客户端,传入相关参数
  55. Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
  56. // 3. 创建消息 key:作用是指定往哪个分区上发 value:具体需要发送的内容
  57. ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykey", "baiyi");
  58. // 4. 发送消息,将得到的元数据进行输出
  59. try {
  60. RecordMetadata recordMetadata = producer.send(producerRecord).get();
  61. log.info("同步方式发送消息结果: topic: {}, partition: {}, offset: {}", recordMetadata.topic(),
  62. recordMetadata.partition(), recordMetadata.offset());
  63. } catch (InterruptedException | ExecutionException e) {
  64. log.error("同步方式发送消息失败,topic: {}, e: {}", TOPIC_NAME, e.getMessage(), e);
  65. }
  66. // 异步发送消息
  67. producer.send(producerRecord, (recordMetadata, e) -> {
  68. if (e != null) {
  69. log.error("异步方式发送消息失败,topic: {}, e: {}", TOPIC_NAME, e.getMessage(), e);
  70. }
  71. if (recordMetadata != null) {
  72. log.info("异步方式发送消息结果: topic: {}, partition: {}, offset: {}", recordMetadata.topic(),
  73. recordMetadata.partition(), recordMetadata.offset());
  74. }
  75. });
  76. }
  77. }

7.2.1 同步发送与异步发送消息

我们在使用 Kafka 客户端进行发送消息时,可以选择使用同步的方式或者是异步的方式进行发送消息,两者的区别是什么呢?

1. 同步发送消息

Kafka 客户端使用同步的方式进行发送消息,生产者把消息往 Kafka 的 broker 发送时,broker 接收到消息之后,需要返回一个 ask 给生产者 producer。
如果生产者在发送消息后没有接收到 ack,生产者会进行阻塞,阻塞 3s 时间,如果还是没有收到消息,会进行重试。重试的次数为 3 次。如果还是没有接收到相关 ACK 响应,会抛出异常信息。

相关逻辑图如下所示:
image.png

  1. RecordMetadata metadata = producer.send(producerRecord).get();
  2. System.out.println("同步⽅式发送消息结果:" + "topic-" +
  3. metadata.topic() + "|partition-"
  4. + metadata.partition() + "|offset-" + metadata.offset());

这里会有一些关于 ACK 相关的概念会在后面进行分析。

2. 异步发送消息

Kafka 客户端使用异步的方式进行发送消息,生产者把消息往 Kafka 到 broker 发送时,broker 无需向 生产者 producer 进行返回 ACK。这时候我们可以通过一个 callback 回调函数进行判断消息是否发送成功,如果发送失败,可以进行相关到业务处理(写操作日志等)。

相关逻辑图如下所示:
image.png

  1. //5.异步发送消息
  2. producer.send(producerRecord, new Callback() {
  3. public void onCompletion(RecordMetadata metadata, Exception
  4. exception) {
  5. if (exception != null) {
  6. System.err.println("发送消息失败:" +
  7. exception.getStackTrace());
  8. }
  9. if (metadata != null) {
  10. System.out.println("异步⽅式发送消息结果:" + "topic-" +
  11. metadata.topic() + "|partition-"
  12. + metadata.partition() + "|offset-" + metadata.offset());
  13. }
  14. }
  15. });

7.2.2 生产者 ACK 机制

Kafka 客户端在使用同步方式进行发送消息时,生产者在获得 Kafka 集群返回的 ack 之前会一直处于阻塞状态。那么 Kafka 集群什么时候会进行返回 ack 呢?集群 ack 相关的概念在官网上如下所示:
image.png
ACK 概念解析:

  • acks = 0:Kafka cluster 不需要任何 broker 接收到生产者发送的消息,就立刻返回 ack 给生产者。这种方式效率是最高的,但是最容易丢消息。
  • acks = 1:这种方式是 Kafka 集群默认使用的 ack 配置。多副本之间的 leader 已经接收到信息,并把消息写入到本地的 log 文件中,才会返回 ack 给生产者。这种方式在性能和安全上是最均衡的。但是如果这时候 leader 还没来得及把消息同步到 flower 就已经挂掉了,也会丢失消息。�
  • acks = -1/all。需要等待 min.insync.replicas(默认为1,推荐配置⼤于等于2) 这个参数配置的副本个数都成功写⼊⽇志,这种策略会保证只要有⼀个备份存活就不会丢失数据。这是最强的数据保证。⼀般除⾮是⾦融级别,或跟钱打交道的场景才会使⽤这种配置。

关于 ack 和重试(如果没有收到 ack,则开启重试)代码如下:

  1. props.put(ProducerConfig.ACKS_CONFIG, "1");
  2. /*
  3. 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
  4. 成消息重复发送,⽐如⽹络抖动,所以需要在接收者那边做好消息接收的幂等性处理
  5. */
  6. props.put(ProducerConfig.RETRIES_CONFIG, 3);
  7. //重试间隔设置
  8. props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

7.2.3 消息发送缓冲区

在 Kafka 客户端进行发送消息时,不会每次都建立一个新会话,你可以假设想一下如果需要发送 10 万条消息,需要建立 10 万次会话,那这资源得多大,多浪费呢?
Kafka 在发送消息时默认会有一个 32M 大小的缓冲区,消息会先往缓冲区中进行存储,Kafka 生产者本地会有一个线程对缓冲区进行扫描,每次拉取 16 K 数据往 broker 中发送,如果数据未达到 16K,10 毫秒之后也会将当前拉取到的数据进行发送给 broker。
image.png

  • Kafka 默认会创建一个本地缓冲区进行存放要发送的消息,大小为 32M。

    1. // 设置发送消息的本地缓冲区,如果设置了缓冲区,那么消息会先发到缓冲区中,可以提高消息的发送性能,默认值是 32M
    2. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • Kafka 本地线程往本地缓冲区中一次拉取 16KB 数据,发送到 broker

    1. /*
    2. kafka 本地线程会从本地缓冲区中读取数据,批量发送到 broker
    3. 设置读取数据的大小 16384,默认即为 16K
    4. */
    5. properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • Kafka 本地线程如果拉取不到 16KB 数据,间隔 10ms 也会将消息发送到 broker

    1. /*
    2. 默认值是0,意思就是消息必须立即被发送,但这样会影响性能
    3. 一般设置 10ms 左右,就是说这个消息发送完会进入到本地到一个 batch,如果 10 毫秒内,这个 batch 满了 16KB 就会随 batch 一起被发送出去
    4. 如果 10毫秒内,batch 没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
    5. */
    6. properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

    7.3 消费者

    7.3.1 基本实现

    ```java package com.baiyi.basic.producer;

import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Properties; import java.util.function.Consumer;

/**

  • @author: BaiYi
  • @description: kafka consumer
  • @date: 2022/3/27 23:20 */ @Slf4j public class KafkaConsumer{ private static final String TOPIC_NAME = “my-replicated-topic”; private static final String CONSUMER_GROUP_NAME = “testGroup”;

    public static void main(String[] args) {

    1. Properties properties = new Properties();
    2. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.107.73.103:9093,47.107.73.103:9094");
    3. // 设置消费组
    4. properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    5. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    6. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    7. // 创建一个消费者客户端
    8. org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
    9. new org.apache.kafka.clients.consumer.KafkaConsumer<>(properties);
    10. // 消费者订阅主题列表
    11. consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    12. while (true) {
    13. // 每 1000 s 拉取一次消息
    14. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
    15. records.forEach(record -> {
    16. log.info("消费者消费到消息, topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(),
    17. record.partition(), record.offset(), record.key(), record.value());
    18. });
    19. }

    } }

    1. <a name="sXbIE"></a>
    2. ### 7.3.2 自动提交和手动提交 offset
    3. <a name="qeaLP"></a>
    4. #### 1. 提交的内容
    5. 消费者无论是使用自动提交或手动提交 offset,都需要把它所属的消费组 + 消费的某个主题 + 消费的某个分区以及消费的偏移量,这样的信息提交到Kafka 集群的 __consumer_offsets 主题中。
    6. <a name="gACF0"></a>
    7. #### 2. 自动提交
    8. 消费者 poll 消息下来之后就自动提交了 offset,在 Kafka 中默认使用的就是自动提交 offset。
    9. ```java
    10. // 自动提交 offset 默认就是自动提交 offset
    11. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    12. // ⾃动提交offset的间隔时间
    13. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

    注意:自动提交 offset 会丢失消息,因为消费者刚把消息 poll 下来就直接把 offset 提交到 Kafka 集群的 __consumer_offsets 主题中,如果消费者这时候挂掉了,poll 下来的消息还未来得及消费,就会丢失消息。

    3. 手动提交

    需要把以下自动提交参数改为 false。

    1. // 自动提交 offset 默认就是自动提交 offset
    2. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    手动提交 offset 的方式分为两种:手动同步提交、手动异步提交。

  • 手动同步提交

在消费者消费完消息之后调用同步提交的方法,当集群返回 ack 之前一直处于阻塞状态,返回 ack 后表示提交成功,执行之后的逻辑。

  1. while (true) {
  2. // 每 1000 ms 拉取一次消息
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. records.forEach(record -> {
  5. log.info("消费者消费到消息, topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(),
  6. record.partition(), record.offset(), record.key(), record.value());
  7. });
  8. //所有的消息已消费完
  9. if (records.count() > 0) {//有消息
  10. // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
  11. // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
  12. consumer.commitSync();//=======阻塞=== 提交成功
  13. }
  14. }
  • 手动异步提交

在消息消费完之后调用异步提交方法,无需等待集群返回 ack ,直接执行之后的逻辑,可以设置一个回调函数,供集群使用。

  1. while (true) {
  2. // 每 1000 ms 拉取一次消息
  3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  4. records.forEach(record -> {
  5. log.info("消费者消费到消息, topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(),
  6. record.partition(), record.offset(), record.key(), record.value());
  7. });
  8. //所有的消息已消费完
  9. if (records.count() > 0) {//有消息
  10. // consumer.commitSync();//=======阻塞=== 提交成功
  11. consumer.commitAsync((map, e) -> {
  12. if (e != null) {
  13. log.error("消费者手动异步提交offset失败, offsets: {}, e: {}", map, e.getMessage(), e);
  14. // 执行相关操作 写异常日志....
  15. }
  16. });
  17. }
  18. }

7.3.3 长轮询 poll 消息

消费者和 broker 建立连接之后,开始 poll 消息。

  • 默认一次 poll 500 条消息。

    1. // ⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
    2. properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  • 代码中设置了长轮询的时间为 1000 ms

    1. while (true) {
    2. // 每 1000 ms 拉取一次消息
    3. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    4. records.forEach(record -> {
    5. log.info("消费者消费到消息, topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(),
    6. record.partition(), record.offset(), record.key(), record.value());
    7. });
    8. //所有的消息已消费完
    9. if (records.count() > 0) {//有消息
    10. // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
    11. // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
    12. consumer.commitSync();//=======阻塞=== 提交成功
    13. }
    14. }
  • 消费者 poll 消息的三种情况:

    1. 如果一次性 poll 到 500 条消息,直接进行 for 循环消费消息。
    2. 如果这一次没有 poll 到 500 条消息,且时间在 1秒内,那么继续进行 poll 消息,要么到 500 条,要么时间到 1秒,才会进行 for 循环消费消息。
    3. 如果多次 poll 不到 500 条消息,且时间已经达到 1秒,那么直接进行 for 循环进行消费消息。
  • 如果两次 poll 的时间超过了 30 秒,集群会认为该消费者消费能力弱,将该消费者踢出消费组,触发 rebalance 机制,让该消费组的其他消费者进行消费该分区消息。rebalance 机制会造成性能开销,可以通过设置以下参数,让一次 poll 的消息尽量少一些。 ```java // ⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

  1. <a name="zus1D"></a>
  2. ### 7.3.4 消费者的健康状态检查
  3. 消费者每隔 1s 向 kafka 集群发送⼼跳,集群发现如果有超过 10s 没有续约的消费者,将被踢出消费组,触发该消费组的 rebalance 机制,将该分区交给消费组⾥的其他消费者进⾏消费。
  4. ```java
  5. //consumer给broker发送⼼跳的间隔时间
  6. props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  7. //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
  8. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

7.3.5 指定分区、偏移量、时间消费

  • 指定分区消费

    1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 从头消费

    1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
    2. consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • 指定 offset 消费

    1. consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
    2. consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
  • 指定时间消费

根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费。

  1. // 从指定时间开始消费
  2. List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); // 获取当前主题的所有 partition
  3. // 从一个小时前开始消费
  4. long fetchTime = new Date().getTime() - 1000 * 60 * 60;
  5. Map<TopicPartition, Long> topicPartitionMap = new HashMap<>();
  6. for (PartitionInfo partitionInfo : topicPartitions) {
  7. topicPartitionMap.put(new TopicPartition(TOPIC_NAME, partitionInfo.partition()), fetchTime);
  8. }
  9. Map<TopicPartition, OffsetAndTimestamp> partitionMap = consumer.offsetsForTimes(topicPartitionMap);
  10. for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : partitionMap.entrySet()) {
  11. TopicPartition key = entry.getKey();
  12. OffsetAndTimestamp value = entry.getValue();
  13. if(key == null || value == null) continue;
  14. long offset = value.offset();
  15. log.info("topic: {}, partition: {}, offset: {}\n", TOPIC_NAME, key.partition(), offset);
  16. // 开始根据 partition 和 offset 进行消费
  17. consumer.assign(Collections.singleton(key));
  18. consumer.seek(key, offset);
  19. }

7.3.6 新消费组的消费 offset 规则

新消费组中的消费者启动之后,默认会从当前分区的 offset + 1 的位置开始消费消息。如果想该消费者从头开始消费这主题的消息,可以通过修改以下这个配置实现,但是需要注意的是,只有在第一次启动的时候才会从头开始消费,后面会从最后消费消息的 offset + 1 位置进行消费。

  • latest:这是 Kafka 默认的配置。从当前分区的 offset + 1 位置进行消费。
  • earliest:第一次从头开始消费,之后消费新消息(从最后消费消费的 offset + 1 位置开始)。
    1. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

八、Spring Boot 操作 Kafka

8.1 引入依赖

  1. <!-- 使用 Spring boot 操作 Kafka -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-web</artifactId>
  9. </dependency>

8.2 编写配置文件

  1. server:
  2. port: 8080
  3. spring:
  4. application:
  5. name: kafka
  6. kafka:
  7. bootstrap-servers: 47.107.73.103:9093,47.107.73.103:9094
  8. producer: # ⽣产者
  9. retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
  10. batch-size: 16384 # 每次拉取 16KB 数据
  11. buffer-memory: 33554432 # 本地缓冲区 32M
  12. acks: 1
  13. # 指定消息key和消息体的编解码⽅式
  14. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  16. consumer:
  17. group-id: default-group
  18. enable-auto-commit: false
  19. auto-offset-reset: earliest # 新消费组消费者启动时默认从第一个位置开始消费 默认为 latest
  20. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  21. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  22. max-poll-records: 500
  23. listener:
  24. # 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
  25. # RECORD
  26. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  27. # BATCH
  28. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
  29. # TIME
  30. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
  31. # COUNT
  32. # TIME | COUNT 有⼀个条件满⾜时提交
  33. # COUNT_TIME
  34. # 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
  35. # MANUAL
  36. # ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
  37. # MANUAL_IMMEDIATE
  38. ack-mode: MANUAL_IMMEDIATE
  39. kafka:
  40. topic: my-replicated-topic
  41. group: baiyi

8.3 编写生产者

  1. import com.baiyi.config.KafkaConfig;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. /**
  7. * @author: BaiYi
  8. * @description:
  9. * @date: 2022/3/29 23:55
  10. */
  11. @RestController
  12. public class KafkaController {
  13. @Autowired
  14. private KafkaConfig kafkaConfig;
  15. @Autowired
  16. private KafkaTemplate<String, String> kafkaTemplate;
  17. @GetMapping("/send")
  18. public String sendMessage() {
  19. kafkaTemplate.send(kafkaConfig.getTopic(), 0, "baiyi", "this is message");
  20. return "success";
  21. }
  22. }

8.4 编写消费者

  1. @Slf4j
  2. @Component
  3. public class KafkaConsumer {
  4. /**
  5. * 根据主题进行消费
  6. * @param records 拉取的消费
  7. * @param ack
  8. */
  9. @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group}")
  10. public void listenTopic(ConsumerRecords<String, String> records, Acknowledgment ack) {
  11. records.forEach(record -> {
  12. log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(), record.partition(),
  13. record.offset(), record.key(), record.value());
  14. });
  15. // 手动提交 offset
  16. ack.acknowledge();
  17. }
  18. }

8.5 消费者中配置消费主题、分区和偏移量

  1. /**
  2. * 指定消费组以及分区、主题、offset 进行消费
  3. * concurrency: 代表消费者的数量,一般设置小于等于分区数
  4. * @param records
  5. * @param ack
  6. */
  7. @KafkaListener(groupId = "${kafka.group}", topicPartitions = {
  8. @TopicPartition(topic = "test", partitions = {"0", "1"}),
  9. @TopicPartition(topic = "test1", partitions = {"1"}, partitionOffsets = @PartitionOffset(partition = "1",
  10. initialOffset = "100"))
  11. }, concurrency = "3")
  12. public void listenTopicPro(ConsumerRecords<String, String> records, Acknowledgment ack) {
  13. records.forEach(record -> {
  14. log.info("topic: {}, partition: {}, offset: {}, key: {}, value: {}", record.topic(), record.partition(),
  15. record.offset(), record.key(), record.value());
  16. });
  17. // 手动提交 offset
  18. ack.acknowledge();
  19. }

九、Kafka 集群中 Controller、Rebalance 和 HW

9.1 Controller