官方下载地址 http://kafka.apache.org/downloads

jdk

  1. # yum install java-1.8.0-openjdk-devel -y

查看安装结果

  1. # java -version
  2. openjdk version "1.8.0_232"
  3. OpenJDK Runtime Environment (build 1.8.0_232-b09)
  4. OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)

zookeeper

下载地址 https://archive.apache.org/dist/zookeeper/ 下载的版本zookeeper-3.4.12

  1. wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
  2. tar -zxvf zookeeper-3.4.12.tar.gz -C /opt/
  3. cd /opt
  4. mv zookeeper-3.4.12/ zookeeper
  5. cd zookeeper/conf/
  6. cp zoo_sample.cfg zoo.cfg

修改配置文件vim zoo.cfg

创建数据存储文件 mkdir -p /data/zookeeper
创建日志存储文件 mkdir -p /log/zookeeper

  1. # The number of milliseconds of each tick
  2. tickTime=2000
  3. # The number of ticks that the initial
  4. # synchronization phase can take
  5. initLimit=10
  6. # The number of ticks that can pass between
  7. # sending a request and getting an acknowledgement
  8. syncLimit=5
  9. # the directory where the snapshot is stored.
  10. # do not use /tmp for storage, /tmp here is just
  11. # example sakes.
  12. dataDir=/data/zookeeper
  13. dataLogDir=/log/zookeeper
  14. # the port at which the clients will connect
  15. clientPort=2181

启动zk

  1. cd /opt/zookeeper/bin/
  2. ./zkServer.sh start
  3. ZooKeeper JMX enabled by default
  4. Using config: /opt/zookeeper/bin/../conf/zoo.cfg
  5. Starting zookeeper ... STARTED

建议设置环境变量 vim /etc/profile,并执行 source /etc/profile 命令使配置生效:

  1. export ZK_HOME=/opt/zookeeper
  2. export PATH=$PATH:$ZK_HOME/bin

查看zk 状态

  1. zkServer.sh status
  2. ZooKeeper JMX enabled by default
  3. Using config: /opt/zookeeper/bin/../conf/zoo.cfg
  4. Mode: standalone

kafka安装

解压下载包

  1. wget http://mirror.bit.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
  2. tar -zxvf kafka_2.12-2.1.1.tgz -C /opt/
  3. cd /opt/
  4. mv kafka_2.12-2.1.1/ kafka

配置环境变量 sudo vim /etc/profile 执行 source /etc/profile

  1. #KAFKA_HOME
  2. export KAFKA_HOME=/opt/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin

修改配置文件 vim /opt/kafka/config/server.properties
broker的全局唯一编号,不能重复

  1. # The id of the broker. This must be set to a unique integer for each broker.
  2. broker.id=0

删除topic功能使能

  1. # Switch to enable topic deletion or not, default value is false
  2. delete.topic.enable=true

kafka运行日志存放的路径 mkdir -p /log/kafka

  1. # A comma seperated list of directories under which to store log files
  2. log.dirs=/log/kafka

配置连接Zookeeper集群地址

  1. # Zookeeper connection string (see zookeeper docs for details).
  2. # This is a comma separated host:port pairs, each corresponding to a zk
  3. # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  4. # You can also append an optional chroot string to the urls to specify the
  5. # root directory for all kafka znodes.
  6. zookeeper.connect=localhost:2181

https://stackoverflow.com/questions/54989802/where-kafka-store-the-meta-data-on-zookeeper-which-path
image.png

设置远程访问vim server.properties

  1. [root@centos7 config]#
  2. #listeners=PLAINTEXT://:9092
  3. # 内网访问的端口
  4. listeners=PLAINTEXT://:9092
  5. # 远程访问的PLAINTEXT://IP地址:9092
  6. #advertised.listeners=PLAINTEXT://your.host.name:9092
  7. advertised.listeners=PLAINTEXT://192.168.196.129:9092

启动kafka

  1. kafka-server-start.sh -daemon /opt/kafka/config/server.properties

查看jps,QuorumPeerMain是zookeeper集群的启动入口类,是用来加载配置启动QuorumPeer线程的。

  1. # jps
  2. 2658 Jps
  3. 2596 Kafka
  4. 1290 QuorumPeerMain

官方安装

http://kafka.apache.org/quickstart
下载

  1. $ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
  2. $ tar -xzf kafka_2.13-2.8.0.tgz
  3. cd kafka_2.13-2.8.0

后台启动

  1. bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  2. bin/kafka-server-start.sh -daemon config/server.properties

查看

  1. bin/kafka-topics.sh --zookeeper localhost:2181 --list


08版本的安装

  1. tar -zxvf kafka_2.8.0-0.8.0.tar.gz

docker安装

https://hub.docker.com/r/wurstmeister/kafka

  1. version: '2'
  2. services:
  3. zookeeper:
  4. image: wurstmeister/zookeeper
  5. ports:
  6. - "2181:2181"
  7. kafka:
  8. image: wurstmeister/kafka
  9. ports:
  10. - "9092"
  11. environment:
  12. KAFKA_ADVERTISED_HOST_NAME: 10.0.3.5
  13. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  14. volumes:
  15. - /var/run/docker.sock:/var/run/docker.sock
  16. [注] 10.0.3.5 是本机(vm)地址。
  1. docker-compose up --scale kafka=3 -d
  1. kafka-topics.sh --create --topic passbook --partitions 3 --zookeeper zookeeper:2181 --replication-factor 2
  1. kafka-topics.sh --zookeeper zookeeper:2181 --list
  2. kafka-topics.sh --list --bootstrap-server localhost:9092

启动kafaka服务器

  1. ./kafka-server-start.sh -daemon ../config/server.properties

启动消费者

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic merchants --from-beginning

创建主题

  1. kafka-topics.sh --create --bootstrap-server localhost:9092 --topic passbook --replication-factor 1 --partitions 1

mac os安装

使用brew安装kafka

  1. ==> Installing dependencies for kafka: zookeeper
  2. ==> Installing kafka dependency: zookeeper
  3. ==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.12.high_sierra.bottle.tar.gz
  4. ######################################################################## 100.0%
  5. ==> Pouring zookeeper-3.4.12.high_sierra.bottle.tar.gz
  6. ==> Caveats
  7. To have launchd start zookeeper now and restart at login:
  8. brew services start zookeeper
  9. Or, if you don't want/need a background service you can just run:
  10. zkServer start
  11. ==> Summary
  12. 🍺 /usr/local/Cellar/zookeeper/3.4.12: 242 files, 32.9MB
  13. ==> Installing kafka
  14. ==> Downloading https://homebrew.bintray.com/bottles/kafka-1.1.0.high_sierra.bottle.tar.gz
  15. ######################################################################## 100.0%
  16. ==> Pouring kafka-1.1.0.high_sierra.bottle.tar.gz
  17. ==> Caveats
  18. To have launchd start kafka now and restart at login:
  19. brew services start kafka
  20. Or, if you don't want/need a background service you can just run:
  21. zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
  22. ==> Summary
  23. 🍺 /usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB
  24. ==> Caveats
  25. ==> zookeeper
  26. To have launchd start zookeeper now and restart at login:
  27. brew services start zookeeper
  28. Or, if you don't want/need a background service you can just run:
  29. zkServer start
  30. ==> kafka
  31. To have launchd start kafka now and restart at login:
  32. brew services start kafka
  33. Or, if you don't want/need a background service you can just run:
  34. zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

启动kafka

  1. zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

JAVA

JAVA 代码

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.util.Properties;
  4. public class ProducerQuickStart {
  5. public static final String brokerList = "ip地址:9092";
  6. public static final String topic = "topic-test";
  7. public static void main(String[] args) {
  8. Properties properties = new Properties();
  9. properties.put("key.serializer",
  10. "org.apache.kafka.common.serialization.StringSerializer");
  11. properties.put("value.serializer",
  12. "org.apache.kafka.common.serialization.StringSerializer");
  13. properties.put("bootstrap.servers", brokerList);
  14. KafkaProducer<String, String> producer =
  15. new KafkaProducer<String, String>(properties);
  16. ProducerRecord<String, String> record =
  17. new ProducerRecord<String, String>(topic, "hi, Kafka!");
  18. try {
  19. producer.send(record);
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. producer.close();
  24. }
  25. }

客户端

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.util.Properties;
  4. public class ProducerQuickStart {
  5. public static final String brokerList = "ip地址:9092";
  6. public static final String topic = "topic-test";
  7. public static void main(String[] args) {
  8. Properties properties = new Properties();
  9. properties.put("key.serializer",
  10. "org.apache.kafka.common.serialization.StringSerializer");
  11. properties.put("value.serializer",
  12. "org.apache.kafka.common.serialization.StringSerializer");
  13. properties.put("bootstrap.servers", brokerList);
  14. KafkaProducer<String, String> producer =
  15. new KafkaProducer<String, String>(properties);
  16. ProducerRecord<String, String> record =
  17. new ProducerRecord<String, String>(topic, "hello, Kafka!");
  18. try {
  19. producer.send(record);
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. producer.close();
  24. }
  25. }

重要配置

broker 端配置

broker.id

每个 kafka broker 都有一个唯一的标识来表示,这个唯一的标识符即是 broker.id,它的默认值是 0。这个值在 kafka 集群中必须是唯一的,这个值可以任意设定,

port

如果使用配置样本来启动 kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置成任意的端口。要注意,如果使用 1024 以下的端口,需要使用 root 权限启动 kakfa。

zookeeper.connect

用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。比如我可以这么指定 localhost:2181 表示这个 Zookeeper 是运行在本地 2181 端口上的。我们也可以通过 比如我们可以通过 zk1:2181,zk2:2181,zk3:2181 来指定 zookeeper.connect 的多个参数值。该配置参数是用冒号分割的一组 hostname:port/path 列表,其含义如下
hostname 是 Zookeeper 服务器的机器名或者 ip 地址。
port 是 Zookeeper 客户端的端口号
/path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。

如果你有两套 Kafka 集群,假设分别叫它们 kafka1 和 kafka2,那么两套集群的zookeeper.connect 参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

log.dirs

Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来制定的,它是用一组逗号来分割的本地系统路径,log.dirs 是没有默认值的,你必须手动指定他的默认值。其实还有一个参数是 log.dir,如你所知,这个配置是没有 s 的,默认情况下只用配置 log.dirs 就好了,比如你可以通过 /home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值。

num.recovery.threads.per.data.dir

对于如下3种情况,Kafka 会使用可配置的线程池来处理日志片段。
服务器正常启动,用于打开每个分区的日志片段;
服务器崩溃后重启,用于检查和截断每个分区的日志片段;
服务器正常关闭,用于关闭日志片段。
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

auto.create.topics.enable

默认情况下,kafka 会使用三种方式来自动创建主题,下面是三种情况:
当一个生产者开始往主题写入消息时
当一个消费者开始从主题读取消息时
当任意一个客户端向主题发送元数据请求时
auto.create.topics.enable参数设置成 false,即不允许自动创建 Topic。

unclean.leader.election.enable

是否允许落后进度太多的副本成为leader,建议设置成false

主题默认配置

Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数

num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。

default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。

log.retention.ms

Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。

log.retention.bytes

另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。

log.segment.ms

上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。

  • message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值
这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

retention.ms

规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

retention.bytes

retention.bytes:规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。

JVM 参数配置

JDK 版本一般推荐直接使用 JDK1.8,这个版本也是现在中国大部分程序员的首选版本。
说到 JVM 端设置,就绕不开这个话题,业界最推崇的一种设置方式就是直接将 JVM 堆大小设置为 6GB,这样会避免很多 Bug 出现。
JVM 端配置的另一个重要参数就是垃圾回收器的设置,也就是平时常说的 GC 设置。如果你依然在使用 Java 7,那么可以根据以下法则选择合适的垃圾回收器:

  • 如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS 收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC
  • 否则,使用吞吐量收集器。开启方法是指定-XX:+UseParallelGC

当然了,如果你已经在使用 Java 8 了,那么就用默认的 G1 收集器就好了。在没有任何调优的情况下,G1 表现得要比 CMS 出色,主要体现在更少的 Full GC,需要调整的参数更少等,所以使用 G1 就好了。
一般 G1 的调整只需要这两个参数即可

  • MaxGCPauseMillis

该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1可以根据需要使用更长的时间。它的默认值是 200ms,也就是说,每一轮垃圾回收大概需要200 ms 的时间。

  • InitiatingHeapOccupancyPercent

该参数指定了 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45,这就表明G1在堆使用率到达45之前不会启用垃圾回收。这个百分比包括新生代和老年代。

参考

带你涨姿势的认识一下kafka