Installation

  brew install kafka

In practice this was very, very slow…

Be patient.

The install output is shown below:

Warning: You are using macOS 10.15.
We do not provide support for this pre-release version.
You will encounter build failures with some formulae.
Please create pull requests instead of asking for help on Homebrew’s GitHub,
Discourse, Twitter or IRC. You are responsible for resolving any issues you
experience while you are running this pre-release version.
==> Installing dependencies for kafka: zookeeper
==> Installing kafka dependency: zookeeper
==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.13.mojave.bottle.tar.gz
==> Downloading from https://akamai.bintray.com/d1/d1e4e7738cd147dceb3d91b32480c20ac5da27d129905f336ba51c0c01b8a476?__gda
######################################################################## 100.0%
==> Pouring zookeeper-3.4.13.mojave.bottle.tar.gz
==> Caveats
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don’t want/need a background service you can just run:
zkServer start
==> Summary
🍺 /usr/local/Cellar/zookeeper/3.4.13: 244 files, 33.4MB
==> Installing kafka
==> Downloading https://homebrew.bintray.com/bottles/kafka-2.2.1.mojave.bottle.tar.gz
==> Downloading from https://akamai.bintray.com/51/518f131edae4443dc664b4f4775ab82cba4b929ac6f2120cdf250898e35fa0db?__gda
################################################### 71.4%
curl: (18) transfer closed with 15373632 bytes remaining to read
Error: Failed to download resource “kafka”
Download failed: https://homebrew.bintray.com/bottles/kafka-2.2.1.mojave.bottle.tar.gz
Warning: Bottle installation failed: building from source.
==> Downloading https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.1/kafka_2.12-2.2.1.tgz
==> Downloading from http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
######################################################################## 100.0%
==> Caveats
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don’t want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
==> Summary
🍺 /usr/local/Cellar/kafka/2.2.1: 149 files, 54.4MB, built in 2 minutes 23 seconds
==> brew cleanup has not been run in 30 days, running now…
Removing: /Users/taoshilei/Library/Logs/Homebrew/redis… (64B)
==> Caveats
==> zookeeper
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don’t want/need a background service you can just run:
zkServer start
==> kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don’t want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

From the install output above, we can extract the command that starts kafka:

  brew services start kafka

Note: I had forgotten that I previously started kafka in the background with this command, so after ps -ef | grep kafka, kill -9 pid simply would not kill the process.
It should be shut down with brew services stop kafka instead!
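
If you are not sure whether the services are already running in the background, brew services can tell you; a small sketch using standard Homebrew commands (my own addition, not from the original notes):

  # show the status of all Homebrew-managed background services
  brew services list

  # stop the background services cleanly instead of using kill -9
  brew services stop kafka
  brew services stop zookeeper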

The command above runs kafka as a background service. If we don't need the two services running in the background, we can start them in the foreground with the command below.

  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

Configuration files

Let's look at the two default configuration files.

ZooKeeper's: cat /usr/local/etc/kafka/zookeeper.properties

  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements. See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  # the directory where the snapshot is stored.
  dataDir=/usr/local/var/lib/zookeeper
  # the port at which the clients will connect
  clientPort=2181
  # disable the per-ip limit on the number of connections since this is a non-production config
  maxClientCnxns=0

Kafka's: cat /usr/local/etc/kafka/server.properties

  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements. See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  # see kafka.server.KafkaConfig for additional details and defaults
  ############################# Server Basics #############################
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=0
  ############################# Socket Server Settings #############################
  # The address the socket server listens on. It will get the value returned from
  # java.net.InetAddress.getCanonicalHostName() if not configured.
  # FORMAT:
  # listeners = listener_name://host_name:port
  # EXAMPLE:
  # listeners = PLAINTEXT://your.host.name:9092
  #listeners=PLAINTEXT://:9092
  # Hostname and port the broker will advertise to producers and consumers. If not set,
  # it uses the value for "listeners" if configured. Otherwise, it will use the value
  # returned from java.net.InetAddress.getCanonicalHostName().
  #advertised.listeners=PLAINTEXT://your.host.name:9092
  # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
  #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  num.network.threads=3
  # The number of threads that the server uses for processing requests, which may include disk I/O
  num.io.threads=8
  # The send buffer (SO_SNDBUF) used by the socket server
  socket.send.buffer.bytes=102400
  # The receive buffer (SO_RCVBUF) used by the socket server
  socket.receive.buffer.bytes=102400
  # The maximum size of a request that the socket server will accept (protection against OOM)
  socket.request.max.bytes=104857600
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/usr/local/var/lib/kafka-logs
  # The default number of log partitions per topic. More partitions allow greater
  # parallelism for consumption, but this will also result in more files across
  # the brokers.
  num.partitions=1
  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  # This value is recommended to be increased for installations with data dirs located in RAID array.
  num.recovery.threads.per.data.dir=1
  ############################# Internal Topic Settings #############################
  # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
  # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
  offsets.topic.replication.factor=1
  transaction.state.log.replication.factor=1
  transaction.state.log.min.isr=1
  ############################# Log Flush Policy #############################
  # Messages are immediately written to the filesystem but by default we only fsync() to sync
  # the OS cache lazily. The following configurations control the flush of data to disk.
  # There are a few important trade-offs here:
  # 1. Durability: Unflushed data may be lost if you are not using replication.
  # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
  # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
  # The settings below allow one to configure the flush policy to flush data after a period of time or
  # every N messages (or both). This can be done globally and overridden on a per-topic basis.
  # The number of messages to accept before forcing a flush of data to disk
  #log.flush.interval.messages=10000
  # The maximum amount of time a message can sit in a log before we force a flush
  #log.flush.interval.ms=1000
  ############################# Log Retention Policy #############################
  # The following configurations control the disposal of log segments. The policy can
  # be set to delete segments after a period of time, or after a given size has accumulated.
  # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
  # from the end of the log.
  # The minimum age of a log file to be eligible for deletion due to age
  log.retention.hours=168
  # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
  # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
  #log.retention.bytes=1073741824
  # The maximum size of a log segment file. When this size is reached a new log segment will be created.
  log.segment.bytes=1073741824
  # The interval at which log segments are checked to see if they can be deleted according
  # to the retention policies
  log.retention.check.interval.ms=300000
  ############################# Zookeeper #############################
  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  zookeeper.connect=localhost:2181
  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=6000
  ############################# Group Coordinator Settings #############################
  # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
  # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
  # The default value for this is 3 seconds.
  # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
  # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
  group.initial.rebalance.delay.ms=0

Start zk + kafka

  zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
  kafka-server-start /usr/local/etc/kafka/server.properties

The console prints a lot of output; when you finally see:

[2019-12-09 19:55:22,317] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

kafka has started successfully.
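
As an optional sanity check (my own addition, using one of the tools listed later under "Other commands"), you can ask the broker which API versions it supports; if this prints a list of APIs instead of timing out, the broker is reachable on port 9092:

  kafka-broker-api-versions --bootstrap-server localhost:9092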

Stop zk + kafka

Stop kafka first, then zookeeper (the stop scripts locate the running processes themselves and do not need the properties files):

  kafka-server-stop
  zookeeper-server-stop

List topics

  kafka-topics --list --bootstrap-server localhost:9092

Create a topic

  kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

This command simply creates a topic named test, with both the partition count and the replication factor set to 1, which is plenty for a local test machine. You can verify the result with the command shown below.

Note: if kafka-topics.sh is reported as command not found, use kafka-topics instead (the Homebrew install drops the .sh suffix).
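
To double-check that the topic exists and see its partition/replica layout, the same tool's --describe option works (a quick verification step, not part of the original notes):

  kafka-topics --describe --bootstrap-server localhost:9092 --topic test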

Consume data

  kafka-console-consumer --bootstrap-server localhost:9092 --topic test_flink_cdc_01 --from-beginning

Send data

  echo "hello" | kafka-console-producer --broker-list localhost:9092 --sync --topic test
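
Alternatively (my own sketch, using the same console tools), the producer can be run interactively: every line you type becomes one message, and a consumer in a second terminal prints it:

  # terminal 1: consume the test topic from the beginning
  kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

  # terminal 2: type messages, one per line; Ctrl+C to quit
  kafka-console-producer --broker-list localhost:9092 --topic test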

Other commands

KAFKA_MANAGER
kafka-delegation-tokens
kafka-mirror-maker
kafka-streams-application-reset
kafka_manager_utils_zero-10_log-config
kafka-acls
kafka-delete-records
kafka-preferred-replica-election
kafka-tools
kafka_manager_utils_zero-10_log-config.bat
kafka-broker-api-versions
kafka-dump-log
kafka-producer-perf-test
kafka-topics
kafka_manager_utils_zero-11_log-config
kafka-cluster
kafka-features
kafka-reassign-partitions
kafka-verifiable-consumer
kafka_manager_utils_zero-11_log-config.bat
kafka-configs
kafka-leader-election
kafka-replica-verification
kafka-verifiable-producer
kafka_manager_utils_zero-90_log-config
kafka-console-consumer
kafka-log-dirs
kafka-run-class
kafka_manager_utils_one-10_log-config
kafka_manager_utils_zero-90_log-config.bat
kafka-console-producer
kafka-manager
kafka-server-start
kafka_manager_utils_one-10_log-config.bat
kafka-consumer-groups
kafka-manager.bat
kafka-server-stop
kafka_manager_utils_two-00_log-config
kafka-consumer-perf-test
kafka-metadata-shell
kafka-storage
kafka_manager_utils_two-00_log-config.bat

Write a test program

  package core;

  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  import java.util.Arrays;
  import java.util.Properties;

  /**
   * @program: kafka_learning
   * @description: this kafka test throws: Exception in thread "main" java.lang.IllegalStateException: No current assignment
   * @author: TSL
   * @create: 2019-12-08 23:14
   **/
  public class KafkaConsumerSimple {

      // Run against a simple local kafka setup. http://kafka.apache.org/quickstart
      public static void main(String[] args) {
          // configuration
          Properties properties = new Properties();
          properties.put("bootstrap.servers", "localhost:9092");
          properties.put("group.id", "test-0");
          properties.put("enable.auto.commit", "true");
          properties.put("auto.commit.interval.ms", "1000");
          properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          // create the consumer instance
          KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
          kafkaConsumer.subscribe(Arrays.asList("test"));

          // try to start consuming from a specific partition and offset of the topic
          // (this is the call that triggers the exception discussed in problem 3 below)
          kafkaConsumer.seek(new TopicPartition("test", 0), 1L);
      }
  }

Problems encountered

1. key.deserializer

Problem:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.

Solution:

  // specify the key/value deserializers
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2. bootstrap.servers

Problem:
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

Solution:

  // point the consumer at the kafka server address
  properties.put("bootstrap.servers", "localhost:9092");

3. No current assignment for partition test-0

Problem:
java.lang.IllegalStateException: No current assignment for partition test-0

Stepping through the code with a debugger shows what this error means; the consumer keeps its current assignment in the following internal field:

  /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
  private final PartitionStates<TopicPartitionState> assignment;
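
seek() is only valid for partitions that are currently in this assignment. With subscribe(), partitions are only assigned during poll(), when the consumer group rebalance happens, so calling seek() immediately after subscribe() fails with this exception. A minimal sketch of one way around it (my own example, reusing the properties from the test program above): assign the partition manually with assign() instead of subscribe(), so no rebalance is needed before seek(). Calling poll() once after subscribe() and before seek() also works.

  package core;

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  public class KafkaConsumerSeekFix {

      public static void main(String[] args) {
          Properties properties = new Properties();
          properties.put("bootstrap.servers", "localhost:9092");
          properties.put("group.id", "test-0");
          properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
              // assign() hands the partition to this consumer directly, without a group rebalance,
              // so the partition is already "currently assigned" when seek() is called
              TopicPartition partition = new TopicPartition("test", 0);
              consumer.assign(Collections.singletonList(partition));
              consumer.seek(partition, 1L);

              // read whatever is available after the requested offset
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
              records.forEach(r -> System.out.printf("offset=%d, value=%s%n", r.offset(), r.value()));
          }
      }
  }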

4. java.io.IOException: No snapshot found, but there are log entries. Something is broken!

Clear out ZooKeeper's data directory (the dataDir=/usr/local/var/lib/zookeeper configured above), as sketched below.
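
One way to do that on a throw-away dev setup (my own sketch; the paths are the dataDir and log.dirs values from the config files shown earlier, and this deletes all local ZooKeeper/Kafka state):

  # stop both services first
  brew services stop kafka
  brew services stop zookeeper

  # wipe zookeeper's data directory (dataDir in zookeeper.properties)
  rm -rf /usr/local/var/lib/zookeeper/*

  # optionally wipe kafka's log directory too, so broker and zookeeper state stay consistent
  rm -rf /usr/local/var/lib/kafka-logs/*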

5. kill -9 pid (found via ps -ef | grep kafka) cannot kill the kafka process

Use jps to check whether a Kafka main process is still running. If kafka was started with brew services, launchd keeps restarting the process after it is killed, which is why kill -9 seems to have no effect; stop it with brew services stop kafka instead (see the note in the installation section).
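
A quick sketch of that check (jps ships with the JDK):

  # list running JVM processes; a broker typically shows up as "Kafka", standalone zookeeper as "QuorumPeerMain"
  jps

  # stop the launchd-managed service instead of killing the process
  brew services stop kafka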