1 Apache Kafka Basic Operations

1.1 Preparation

Local machine IP: 192.168.135.145

Turn off the firewall: systemctl stop firewalld

1.2 Start ZooKeeper

    zookeeper-server-start.sh config/zookeeper.properties

1.3 Start the Kafka Broker

    kafka-server-start.sh config/server.properties

(Both start scripts also accept a -daemon flag to run in the background.)

1.4 Create a Topic

    kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Output

    Created topic test.

1.5 List Topics

    kafka-topics.sh --list --zookeeper localhost:2181

Output

    test
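
Both operations are also available programmatically through the AdminClient API in kafka-clients (the dependency used in section 3 below). A minimal sketch, assuming a broker reachable at 192.168.135.145:9092; the class name is illustrative:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class TopicAdmin {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.135.145:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Create "test" with 1 partition and replication factor 1 (as in 1.4)
                admin.createTopics(Collections.singleton(
                        new NewTopic("test", 1, (short) 1))).all().get();
                // List all topic names (as in 1.5)
                Set<String> names = admin.listTopics().names().get();
                names.forEach(System.out::println);
            }
        }
    }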

1.6 Modify a Topic

1.6.1 Change the Number of Partitions

    kafka-topics.sh --zookeeper localhost:2181 --alter --topic topicName --partitions 8

Note that the partition count can only be increased, never reduced.

1.6.2 Add a Topic-Level Config Override

    kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --config flush.messages=1

(On recent Kafka versions, kafka-configs.sh --alter --entity-type topics is the preferred tool for changing topic configs.)

1.6.3 Remove a Config Override

    kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --delete-config flush.messages

1.6.4 Delete a Topic

    kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName

Deletion only takes effect when the brokers run with delete.topic.enable=true (the default in recent releases).
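
The modifications in 1.6 also map onto AdminClient calls. A minimal sketch, under the same assumptions as the sketch in 1.5 (createPartitions and incrementalAlterConfigs are both available in kafka-clients 2.4; the class name is illustrative):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.NewPartitions;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicModify {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.135.145:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 1.6.1: grow the topic to 8 partitions (counts can only increase)
                admin.createPartitions(Collections.singletonMap(
                        "test", NewPartitions.increaseTo(8))).all().get();
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
                // 1.6.2: set flush.messages=1 on the topic
                admin.incrementalAlterConfigs(Collections.singletonMap(topic,
                        Collections.singletonList(new AlterConfigOp(
                                new ConfigEntry("flush.messages", "1"), AlterConfigOp.OpType.SET))))
                        .all().get();
                // 1.6.3: remove the override again (the value is ignored for DELETE)
                admin.incrementalAlterConfigs(Collections.singletonMap(topic,
                        Collections.singletonList(new AlterConfigOp(
                                new ConfigEntry("flush.messages", ""), AlterConfigOp.OpType.DELETE))))
                        .all().get();
                // 1.6.4: delete the topic
                admin.deleteTopics(Collections.singleton("test")).all().get();
            }
        }
    }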

1.7 Console Producer

    kafka-console-producer.sh --broker-list 192.168.135.145:9092 --topic test

1.8 Console Consumer

    kafka-console-consumer.sh --bootstrap-server 192.168.135.145:9092 --topic test --from-beginning

Example:

[Figure 1 and Figure 2: screenshots of the console producer and consumer session; omitted.]
Note: the IP address must be typed correctly, otherwise the connection cannot be established.

2 Cluster Mode

Clone two more virtual machines (192.168.135.146 and 192.168.135.147), then make two copies of the broker configuration file:

    cd /opt/kafka_2.11-2.4.0/
    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties

vim config/server.properties (broker 0, runs on 192.168.135.145):

    broker.id=0
    listeners=PLAINTEXT://192.168.135.145:9092
    log.dirs=/tmp/kafka-logs-0
    zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
    host.name=192.168.135.145

vim config/server-1.properties (broker 1, runs on 192.168.135.146):

    broker.id=1
    listeners=PLAINTEXT://192.168.135.146:9092
    log.dirs=/tmp/kafka-logs-0
    zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
    host.name=192.168.135.146

vim config/server-2.properties (broker 2, runs on 192.168.135.147):

    broker.id=2
    listeners=PLAINTEXT://192.168.135.147:9092
    log.dirs=/tmp/kafka-logs-0
    zookeeper.connect=192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181
    host.name=192.168.135.147

Each broker.id must be unique across the cluster.

Open three terminal windows (one per machine) and start the brokers:

    # Broker 1 (192.168.135.145)
    kafka-server-start.sh config/server.properties
    # Broker 2 (192.168.135.146)
    kafka-server-start.sh config/server-1.properties
    # Broker 3 (192.168.135.147)
    kafka-server-start.sh config/server-2.properties

2.1 Create a Topic

Create a topic with three partitions and a replication factor of two:

    kafka-topics.sh --create --zookeeper 192.168.135.145:2181 --replication-factor 2 --partitions 3 --topic test

2.2 List Topics

    kafka-topics.sh --list --zookeeper 192.168.135.145:2181,192.168.135.146:2181,192.168.135.147:2181

2.3 Producer

    kafka-console-producer.sh --broker-list 192.168.135.145:9092,192.168.135.146:9092,192.168.135.147:9092 --topic test

2.4 Consumer

    kafka-console-consumer.sh --from-beginning --topic test --bootstrap-server 192.168.135.145:9092,192.168.135.146:9092,192.168.135.147:9092

(Since Kafka 2.0 the console consumer connects to the brokers via --bootstrap-server; the old --zookeeper option no longer exists.)

2.5 Describe the Topic Across the Cluster

    kafka-topics.sh --describe --zookeeper 192.168.135.145:2181 --topic test

Output

    Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:
        Topic: test Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

Leader is the broker currently serving reads and writes for the partition, Replicas lists the brokers holding a copy of it, and Isr is the subset of replicas that are fully in sync with the leader.
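
The same per-partition detail is available in code via AdminClient.describeTopics; a minimal sketch, under the same assumptions as the earlier AdminClient sketches:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartitionInfo;

    public class DescribeTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.135.145:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription desc = admin.describeTopics(Collections.singleton("test"))
                        .all().get().get("test");
                for (TopicPartitionInfo p : desc.partitions()) {
                    // leader: broker serving the partition; isr: replicas in sync
                    System.out.printf("partition=%d leader=%d replicas=%s isr=%s%n",
                            p.partition(), p.leader().id(), p.replicas(), p.isr());
                }
            }
        }
    }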

2.6 Check Consumer Group Progress

    kafka-consumer-groups.sh --bootstrap-server 192.168.135.145:9092 --describe --group pv

(The kafka.tools.ConsumerOffsetChecker class used in older tutorials was removed well before Kafka 2.4; kafka-consumer-groups.sh is its replacement.)

Output
    GROUP  TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
    pv     page_visits  0          21              21              0    -
    pv     page_visits  1          19              19              0    -
    pv     page_visits  2          20              20              0    -

TOPIC: the topic name given at creation time.

PARTITION: the partition number.

CURRENT-OFFSET: how many messages of this partition the group has consumed (its committed offset).

LOG-END-OFFSET: how many messages have been written to this partition.

LAG: how many messages have not yet been consumed.

CONSUMER-ID: the consumer currently assigned to the partition ("-" if none is active).
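
The same lag figures can be computed programmatically: AdminClient.listConsumerGroupOffsets returns the group's committed offsets, and KafkaConsumer.endOffsets returns the matching log end offsets. A minimal sketch, assuming group pv and the broker address used above; the class name is illustrative:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class GroupLag {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.135.145:9092");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            try (AdminClient admin = AdminClient.create(props);
                 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Committed offsets for every partition the group has consumed
                Map<TopicPartition, OffsetAndMetadata> committed =
                        admin.listConsumerGroupOffsets("pv").partitionsToOffsetAndMetadata().get();
                // Log end offsets for those partitions; lag is the difference
                Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
                committed.forEach((tp, om) ->
                        System.out.printf("%s offset=%d logEnd=%d lag=%d%n",
                                tp, om.offset(), end.get(tp), end.get(tp) - om.offset()));
            }
        }
    }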

3 Java API

3.1 pom.xml

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

3.2 Producer

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleProducer {
        public static void main(String[] args) {
            // Topic to send to
            String topicName = "my-topic";
            // Producer configuration
            Properties props = new Properties();
            // Broker address
            props.put("bootstrap.servers", "192.168.135.145:9092");
            // Wait for acknowledgement from all in-sync replicas
            props.put("acks", "all");
            // 0 means a failed request is not retried automatically
            props.put("retries", 0);
            // Batch size in bytes per partition
            props.put("batch.size", 16384);
            // Wait up to 1 ms so records can be batched into fewer requests
            props.put("linger.ms", 1);
            // Total memory (bytes) the producer may use for buffering
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                // Send the message
                producer.send(new ProducerRecord<>(topicName, "hello world->" + i));
            }
            System.out.println("Message sent successfully");
            producer.close();
        }
    }
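
Note that send() is asynchronous: it buffers the record and returns a Future immediately, so the "Message sent successfully" line above does not prove the broker received anything. A variant of the send call inside the loop above, using the callback overload of send() plus flush(); the log format is illustrative:

    // Inside the loop: send with a completion callback so errors surface
    producer.send(new ProducerRecord<>(topicName, "hello world->" + i),
            (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();   // send failed after retries
                } else {
                    System.out.printf("sent: partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            });

    // After the loop: block until every buffered record has been transmitted
    producer.flush();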

3.3 Consumer

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            String topic = "my-topic";
            String group = "test";
            Properties props = new Properties();
            // Broker address
            props.put("bootstrap.servers", "192.168.135.145:9092");
            // Consumer group id
            props.put("group.id", group);
            // Commit offsets automatically every second
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Print each received message
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
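
With enable.auto.commit=true, offsets are committed on a timer, so a crash between commits can cause records to be re-delivered. A common at-least-once variant is to disable auto-commit and commit only after each batch has been processed; a sketch against the consumer configured above:

    // Assumes the consumer above was created with:
    // props.put("enable.auto.commit", "false");
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        }
        // Commit the batch's offsets only after processing finished
        if (!records.isEmpty()) {
            consumer.commitSync();
        }
    }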

Note: make sure the firewall is turned off before testing, otherwise the connection cannot be established.

Output

    offset = 16, key = null, value = hello world->0
    offset = 17, key = null, value = hello world->1
    offset = 18, key = null, value = hello world->2
    offset = 19, key = null, value = hello world->3
    offset = 20, key = null, value = hello world->4
    offset = 21, key = null, value = hello world->5
    offset = 22, key = null, value = hello world->6
    offset = 23, key = null, value = hello world->7
    offset = 24, key = null, value = hello world->8
    offset = 25, key = null, value = hello world->9