一、消息队列简介
1.1 传统消息队列的应用场景
1.2 使用消息队列的好处
1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
3)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1.3 消息队列的两种模式
Kafka 通过一个 Topic 只能被一个消费组消费来实现类似于点对点模式。通过多个消费组消费同一个 Topic 来实现发布订阅模式。 单播消息 —> 队列
多播消息 —> 发布订阅
点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除。消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
发布/订阅模式
一对多 ,消费者数据之后不会清除 。消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
Kafka 由消费者主动拉取消费消息。
二、Kafka 概述
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
Apache Kafka® 是 一个分布式流处理平台。它可以用于两大类别的应用:
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于 message queue)。
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过 kafka stream topic 和 topic 之间内部进行变化)。
流处理平台有以下三种特性:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
2.1 Kafka 基本概念

1)Producer:消息生产者,就是向 kafka broker 发消息的客户端;
2)Consumer:消息消费者,向 kafka broker 拉取消息的客户端;
3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic;
6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。
Topic 是逻辑上的概念,而 Partition 是物理上的概念。 Partition 是针对一个主题在同一个 Broker 上进行拆分(分而治之);Replica 指的是主题的副本(备份)的数量,既然是备份肯定是得在不同的 Broker 上才有意义。 Partition 数量是可以任意指定的,而 Replica 的数量是不能大于集群的 Broker 的数量的。
8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Follower。
2.2 Kafka 特点
1、Consumer 启动时,默认消费从启动这一刻之后的消息。可以通过 from-begining 参数来指定从头开始消费消息。from-begining 的作用是每次消费者启动的时候都会从头开始消费消息。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test
Kafka 还有一个配置参数 auto.offset.reset,用于指定当消费者没有 offset 的时候消费消息的策略。
当消费主题的是一个新的消费组,或者指定 offset 的消费方式,offset 不存在,那么应该如何消费?
latest(默认) :只消费自己启动之后发送到主题的消息。
earliest:第一次从头开始消费,以后按照消费 offset 记录继续消费,这个需要区别于 consumer.seekToBeginning (每次都从头开始消费)
2、Kafka 消息消费后,消息在 Producer 端还是存在的,不会被删除,Consumer 是可以再次消费的。
默认保存在磁盘一周的时间。
log.retetion.hours=168
Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。
3、消费者的消费信息是由消费者自己维护的。
4、Kafka 保证分区内的顺序性,但是 Topic 下是不保证顺序。
可以通过一个 Topic 单个分区来实现顺序,但是性能会下降。
5、一个分区只能被一个消费者消费,可以被多个消费组消费。因此一个消费组里的消费组个数一般不超过分区数,多了也没用,没有分区可以消费。
6、Kafka 的消息消费模式为 poll,客户端主动去拉取。
7、Kafka 日志与索引
Kafak 消息日志文件主要存放在分区文件夹里的以 log 结尾的日志文件里,如下是 test1 主题对应的分区 0 的消息日志:
其中 .log 结尾的为消息日志文件,Kafka 消息日志文件是分段存储的,日志默认大小为 1 G,名称是以当前日志最小的 offset 命名的。
2.3 Kafka 使用场景
- 日志收集:可以用 Kafka 收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
三、Kafka 快速入门
Kafka 的安装
安装前的环境准备
由于 Kafka 是用 Scala 语言开发的,运行在 JVM 上,因此在安装 Kafka 之前需要先安装 JDK。
Kafka 依赖 Zookeeper,所以需要先安装 Zookeeper。yum install java-1.8.0-openjdk* -y
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gztar -zxvf apache-zookeeper-3.5.8-bin.tar.gzcd apache-zookeeper-3.5.8-bincp conf/zoo_sample.cfg conf/zoo.cfg# 启动zookeeperbin/zkServer.sh start# 查看zk的根目录相关节点bin/zkCli.sh ls /
安装
下载、解压
# 2.11是 scala 的版本,2.4.1 是 kafka 的版本wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz# 解压tar -xzf kafka_2.11-2.4.1.tgzcd kafka_2.11-2.4.1
修改配置
修改配置文件 config/server.properties:# broker.id 属性在 kafka 集群中必须要是唯一broker.id=0# kafka 部署的机器 ip 和提供服务的端口号listeners=PLAINTEXT://127.0.0.1:9092# kafka 的消息存储文件log.dir=/usr/local/data/kafka-logs# kafka 连接 zookeeper 的地址zookeeper.connect=127.0.0.1:2181
启动服务
现在来启动 kafka 服务,启动脚本语法:
可以看到,server.properties 配置路径是一个强制的参数,-daemon 表示以后台进程运行,否则 ssh 客户端退出后,就会停止服务。./kafka-server-start.sh -daemon ../config/server.properties # 或者使用 bin/kafka-server-start.sh config/server.properties &注意,在启动 kafka 时会使用 linux 主机名关联的 ip 地址,所以需要把主机名和 linux 的 ip 映射配置到本地 host里,用 vim /etc/hosts. 这里是指 kafka 客户端在连接到 broker 时,会使用 broker 的 hostName 去获取对应 IP,因此需要在客户端的 host 里配置对应 ip。
启动后可以查看下 ZooKeeper 的节点情况:
# 进入 ZooKeeper 目录通过 ZooKeeper 客户端查看下 ZooKeeper 的目录树
bin/zkCli.sh
# 查看 zk 的根目录 kafka 相关节点
ls /
# 查看 kafka 节点
ls /brokers/ids
Kafka 集群
对于 kafka 来说,一个单独的 broker 意味着 kafka 集群中只有一个节点。Kafka 的节点都是无状态的,因此要想增加 kafka 集群中的节点数量,只需要多启动几个 broker 实例即可。即修改 server.properties 配置文件,保证 broker id 的唯一,并且所有的节点连接到同一个 ZooKeeper 集群即可。
# broker.id 属性在 kafka 集群中必须要是唯一
broker.id=1
# kafka 部署的机器 ip 和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
# kafka 连接 zookeeper的地址,要把多个 kafka 实例组成集群,对应连接的 zookeeper 必须相同
zookeeper.connect=127.0.0.1:2181
Kafka 的基本操作
主题
创建主题
—topic:定义 topic 名。
—replication-factor:定义副本数。
—partitions:定义分区数。
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
# 创建多分区的主题
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic test1
# 增加 topic 的分区数量(目前kafka不支持减少分区)
bin/kafka-topics.sh -alter --partitions 3 --zookeeper 127.0.0.1:2181 --topic test
除了我们通过手工的方式创建 Topic,当 producer 发布一个消息到某个指定的 Topic,这个 Topic 如果不存在,就自动创建。
创建主题时指定了 —replication-factor,该值不能大于 Kafka 集群的 broker 的数量! Replication factor: 2 larger than available brokers: 1.
查看主题
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
查看主题详情
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1

输出内容的解释:第一行是所有分区的概要信息,之后的每一行表示每一个 partition 的信息。
- leader 节点负责给定 partition 的所有读写请求。
- replicas 表示某个 partition 在哪几个 broker 上存在备份。不管这个节点是不是 Leader,甚至这个节点挂了,也会列出。
- isr 是 replicas 的一个子集,它只列出当前还存活着的,并且已同步备份了该 partition 的节点。
删除主题
Kafka 默认是不会删除主题的,需要在 server.properties 里设置 delete.topic.enable=true 否则只是标记删除。
bin/kafka-topics.sh --delete --topic test --zookeeper 127.0.0.1:2181
修改分区数
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test --partitions 6
消息
发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>this is a msg
>this is a another msg
在默认情况下,每一个行会被当做成一个独立的消息。
消费消息
默认是消费最新的消息,如果想要消费之前的消息可以通过 —from-beginning 参数指定。
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
# 每次启动消费者时,从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
消费多主题
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --whitelist "test|test2"
消费组
可以通过 group.id=groupId 来指定消费者所属的的消费组。
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup-2 --topic test
查看消费组名
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
查看消费组的消费偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup

current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数
四、Kafka 核心参数
server.properties 核心配置
| Property | Default | Description |
|---|---|---|
| broker.id | 0 | 每个 broker 都可以用一个唯一的非负整数 id 进行标识;这个 id 可以作为broker 的“名字”,你可以选择任意你喜欢的数字作为 id,只要 id 是唯一的即可。 |
| log.dirs | /tmp/kafka-logs | kafka 存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行。 |
| listeners | PLAINTEXT://192.168.65.60:9092 | server 接受客户端连接的端口,ip 配置 kafka 本机 ip 即可。 |
| zookeeper.connect | localhost:2181 | zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3 |
| log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有 topic 都一样。 |
| num.partitions | 1 | 创建 topic 的默认分区数。 |
| default.replication.factor | 1 | 自动创建 topic 的默认副本数量,建议设置为大于等于 2。但是不能大于集群的 broker 的数量。 |
| min.insync.replicas | 1 | 当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每一个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常。 |
| delete.topic.enable | false | 是否允许删除主题。 |
发送核心参数
ack 持久化机制
props.put(ProducerConfig.ACKS_CONFIG, "1");
ack:
发送消息持久化策略参数。
(1)acks=0: 表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2)acks=1: 至少要等待 leader 已经成功将数据写入本地 log,但是不需要等待所有 follower 是否成功写入。就可以继续发送下一条消息。这种情况下,如果 follower 没有成功备份数据,而此时 leader 又挂掉,则消息会丢失。
(3)acks=-1 或 all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于 2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
消息重试机制
发送失败会重试,默认重试间隔 100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理。
// 重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
批量发送
// 设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,
// 可以提高消息发送性能,默认值是 33554432,即 32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// kafka 本地线程会从缓冲区取数据,批量发送到 broker
// 设置批量发送消息的大小,默认值是 16384,即 16 kb,就是说一个 batch 满了 16kb 就发送出去
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// 批量发送的间隔时间(延迟时间),默认值是 0,意思就是消息必须立即被发送,但这样会影响性能。
// 一般设置 10 毫秒左右,就是说这个消息发送完后会进入本地的一个 batch,
// 如果 10 毫秒内,这个 batch 满了16kb 就会随 batch 一起被发送出去。
// 如果 10 毫秒内,batch 没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长。
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
消费核心参数
元信息
- bootstrap.servers
在启动 consumer 时配置的 broker 地址的。不需要将 cluster 中所有的 broker 都配置上,因为启动后会自动的发现 cluster 所有的broker。它配置的格式是:host1:port1;host2:port2…
- key.descrializer、value.descrializer
Message record 的 key, value 的反序列化类。
- group.id
用于表示该 consumer 想要加入到哪个 group 中。默认值是 “”。
- client.id
Consumer 进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。
- interceptor.classes
用户自定义 interceptor。
- metadata.max.age.ms
Metadata 数据的刷新间隔。即便没有任何的 partition 订阅关系变更也执行。范围是:[0, Integer.MAX],默认值是:300000 (5 min)
- group.instance.id
A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability (e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.
offset
- enable.auto.commit
Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。默认值是true。
- auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
- auto.offset.reset
这个配置项,是告诉 Kafka Broker 在发现 kafka 在没有初始 offset,或者当前的 offset 是一个不存在的值(如果一个 record 被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
1) earliest:自动重置到最早的 offset。
2) latest:看上去重置到最晚的 offset。
3) none:如果更早的 offset 也没有的话,就抛出异常给 consumer,告诉 consumer 在整个 consumer group 中都没有发现有这样的offset。
4) 如果不是上述3种,只抛出异常给 consumer。
默认值是latest。
timeout
- max.poll.interval.ms
Consumer 需要不间断的调用 poll()。如果长时间没有调用 poll,且间隔超过这个值时,就会认为这个 consumer 失败了。
- heartbeat.interval.ms
心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。这个值必须设置的小于 session.timeout.ms,因为当 Consumer 由于某种原因不能发 Heartbeat 到 coordinator 时,并且时间超过session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。通常设置的值要低于 session.timeout.ms 的 1/3。默认值是:3000 (3s)
- session.timeout.ms
Consumer session 过期时间。这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。其默认值是:10000 (10 s)
- connections.max.idle.ms
连接空闲超时时间。因为 consumer 只与 broker 有连接(coordinator也是一个broker),所以这个配置的是 consumer 到 broker 之间的。默认值是:540000 (9 min)
- request.timeout.ms
请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)
fetch
- fetch.max.wait.ms
Fetch 请求发给 broker 后,在 broker 中可能会被阻塞的(当 topic 中 records 的总 size 小于 fetch.min.bytes 时),此时这个 fetch请求耗时就会比较长。这个配置就是来配置 consumer 最多等待 response 多久。
- fetch.min.bytes
当 consumer 向一个 broker 发起 fetch 请求时,broker 返回的 records 的大小最小值。如果 broker 中数据量不够的话会 wait,直到数据大小满足这个条件。取值范围是:[0, Integer.Max],默认值是 1。默认值设置为 1 的目的是:使得 consumer 的请求能够尽快的返回。
- fetch.max.bytes
一次 fetch 请求,从一个 broker 中取得的 records 最大大小。如果在从 topic 中第一个非空的 partition 取消息时,如果取到的第一个record 的大小就超过这个配置时,仍然会读取这个 record,也就是说在这种情况下,只会返回这一条 record。
broker、topic 都会对 producer 发给它的 message size 做限制。所以在配置这值时,可以参考 broker 的 message.max.bytes 和 topic 的 max.message.bytes 的配置。取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)
- max.partition.fetch.bytes
一次 fetch 请求,从一个 partition 中取得的 records 最大大小。如果在从 topic 中第一个非空的 partition 取消息时,如果取到的第一个 record 的大小就超过这个配置时,仍然会读取这个 record,也就是说在这种情况下,只会返回这一条 record。
broker、topic 都会对 producer 发给它的 message size 做限制。所以在配置这值时,可以参考 broker 的 message.max.bytes 和 topic 的 max.message.bytes 的配置。
- max.poll.records
Consumer 每次调用 poll() 时取到的 records 的最大数。
- receive.buffer.byte
Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建 Socket 连接时会用到。取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)如果值设置为-1,则会使用操作系统默认的值。

