3.1 消息的发送与接收

image.png
生产者主要的对象有: **KafkaProducerProducerRecord
其中
KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。
KafkaProducer **的创建需要指定的参数和含义:

参数 说明
bootstrap.servers 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。
key.serializer 要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
value.serializer 要发送消息的alue数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
acks 默认值:all
acks=0:
生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。

acks=1
表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。
在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。

acks=all
首领分区会等待所有的ISR副本分区确认记录。
该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
这是Kafka最强的可靠性保证,等效于 acks=-1 | | retries | retries重试次数
当消息发送出现错误的时候,系统会重发消息。
跟客户端收到错误时重发一样。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了 |

其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到。我们后面的内容会介绍到。
消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。
同步确认效率低,异步确认效率高,但是需要设置回调对象。

生产者:

  1. package com.lagou.kafka.demo.producer;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.TimeoutException;
  9. public class MyProducer1 {
  10. public static void main(String[] args)
  11. throws InterruptedException, ExecutionException, TimeoutException {
  12. Map<String, Object> configs = new HashMap<>();
  13. // 设置连接Kafka的初始连接用到的服务器地址
  14. // 如果是集群,则可以通过此初始连接发现集群中的其他broker
  15. configs.put("bootstrap.servers", "node1:9092");
  16. // 设置key的序列化器
  17. configs.put("key.serializer",
  18. "org.apache.kafka.common.serialization.IntegerSerializer");
  19. // 设置value的序列化器
  20. configs.put("value.serializer",
  21. "org.apache.kafka.common.serialization.StringSerializer");
  22. configs.put("acks", "1");
  23. KafkaProducer<Integer, String> producer =
  24. new KafkaProducer<Integer, String>(configs);
  25. // 用于封装Producer的消息
  26. ProducerRecord<Integer, String> record =
  27. new ProducerRecord<Integer, String>("topic_1", // 主题名称
  28. 0, // 分区编号,现在只有一个分区,所以是0
  29. 0, // 数字作为key
  30. "message 0" // 字符串作为value);
  31. // 发送消息,同步等待消息的确认
  32. producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
  33. // 关闭生产者
  34. producer.close();
  35. }
  36. }

生产者2:

package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;

public class MyProducer2 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        configs.put("key.serializer",
                    "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", 
                    "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer =
            new KafkaProducer<Integer, String>(configs);
        ProducerRecord<Integer, String> record =
            new ProducerRecord<Integer, String>( "topic_1", 0,1,"lagou message 2" );
        // 使用回调异步等待消息的确认
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) { 
                    System.out.println( "主题:" + metadata.topic() + "\n"
                                       + "分区:" + metadata.partition() + "\n"
                                       + "偏移量:" + metadata.offset() + "\n"
                                       + "序列化的key字节:" + metadata.serializedKeySize() + "\n"
                                       + "序列化的value字节:" + metadata.serializedValueSize() + "\n"
                                       + "时间戳:" + metadata.timestamp() );
                } else {
                    System.out.println("有异常:" + exception.getMessage()); 
                }
            }
        });
        // 关闭连接
        producer.close();
    } 
}

生产者3:

package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;

public class MyProducer3 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        configs.put("key.serializer", 
                    "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<Integer, String> producer =
            new KafkaProducer<Integer, String>(configs); 

        for (int i = 100; i < 200; i++) {
            ProducerRecord<Integer, String> record =
                new ProducerRecord<Integer, String>( "topic_1", 0,i,"lagou message " + i );
            // 使用回调异步等待消息的确认
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("主题:" + metadata.topic() + "\n" + 
                                           "分区:" + metadata.partition() + "\n" + 
                                           "偏移量:" + metadata.offset() + "\n" + 
                                           "序列化的key字节:" + metadata.serializedKeySize() + "\n" + 
                                           "序列化的value字节:" + metadata.serializedValueSize() + "\n"+ 
                                           "时间戳:" + metadata.timestamp() );
                    } else {
                        System.out.println("有异常:" + exception.getMessage());
                    } 
                }
            });
        }
        // 关闭连接
        producer.close();
    }
}

消息消费流程:

消费者:

package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.lang.reflect.Array;
import java.util.*;
import java.util.regex.Pattern;

public class MyConsumer1 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
        // 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
        configs.put("bootstrap.servers", "node1:9092"); 
        // key的反序列化器
        configs.put("key.deserializer",
                    "org.apache.kafka.common.serialization.IntegerDeserializer");
        // value的反序列化器
        configs.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "consumer.demo");
        // 创建消费者对象
        KafkaConsumer<Integer, String> consumer =
            new KafkaConsumer<Integer, String>(configs);
        // final Pattern pattern = Pattern.compile("topic_\\d");
        final Pattern pattern = Pattern.compile("topic_[0-9]");
        // 消费者订阅主题或分区
        // consumer.subscribe(pattern);
        // consumer.subscribe(pattern, new ConsumerRebalanceListener() {

        final List<String> topics = Arrays.asList("topic_1");
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(tp -> {
                    System.out.println("剥夺的分区:" + tp.partition());
                });
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(tp -> {
                    System.out.println(tp.partition()); });
            }
        });

        // 拉取订阅主题的消息
        final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
        // 获取topic_1主题的消息
        final Iterable<ConsumerRecord<Integer, String>> topic1Iterable =
            records.records("topic_1");
        // 遍历topic_1主题的消息
        topic1Iterable.forEach(record -> {
            System.out.println("========================================");
            System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
            System.out.println("消息的key:" + record.key());
            System.out.println("消息的偏移量:" + record.offset());
            System.out.println("消息的分区号:" + record.partition());
            System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
            System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
            System.out.println("消息的时间戳:" + record.timestamp());
            System.out.println("消息的时间戳类型:" + record.timestampType());
            System.out.println("消息的主题:" + record.topic());
            System.out.println("消息的值:" + record.value()); });
        // 关闭消费者
        consumer.close();
    }
}

3.2 SpringBoot Kafka

  1. pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                              https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>2.2.8.RELEASE</version>
     <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yue.kafka.demo</groupId>
    <artifactId>demo-02-springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-02-springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
     <java.version>1.8</java.version>
    </properties>
    
    <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>org.junit.vintage</groupId>
           <artifactId>junit-vintage-engine</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>
    </dependencies>
    
    <build>
     <plugins>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
     </plugins>
    </build>
    </project>
    
  2. application.properties ``` spring.application.name=springboot-kafka-02 server.port=8080

用于建立初始连接的broker地址

spring.kafka.bootstrap-servers=node1:9092

producer用到的key和value的序列化类

spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer =org.apache.kafka.common.serialization.StringSerializer

默认的批处理记录数

spring.kafka.producer.batch-size=16384

32MB的总发送缓存

spring.kafka.producer.buffer-memory=33554432

consumer用到的key和value的反序列化类

spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer =org.apache.kafka.common.serialization.StringDeserializer

consumer的消费组id

spring.kafka.consumer.group-id=spring-kafka-02-consumer

是否自动提交消费者偏移量

spring.kafka.consumer.enable-auto-commit=true

每隔100ms向broker提交一次偏移量

spring.kafka.consumer.auto-commit-interval=100

如果该消费者的偏移量不存在,则自动设置为最早的偏移量

spring.kafka.consumer.auto-offset-reset=earliest

3. Demo02SpringbootApplication.java
```java
package com.yue.kafka.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Demo02SpringbootApplication {
    public static void main(String[] args) {
        SpringApplication.run(Demo02SpringbootApplication.class, args);
    }
}
  1. KafkaConfig.java ```java package com.yue.kafka.demo.config;

import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class KafkaConfig { @Bean public NewTopic topic1() { return new NewTopic(“ntp-01”, 5, (short) 1); } @Bean public NewTopic topic2() { return new NewTopic(“ntp-02”, 3, (short) 1); } }

5. KafkaSyncProducerController.java
```java
package com.yue.kafka.demo.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncProducerController {
    @Autowired
    private KafkaTemplate template;
    @RequestMapping("send/sync/{message}")
    public String sendSync(@PathVariable String message) {
        ListenableFuture future = template.send(
            new ProducerRecord<Integer, String>( "topic-spring-02", 0,1,message));
        try {
            // 同步等待broker的响应
            Object o = future.get();
            SendResult<Integer, String> result = (SendResult<Integer, String>) o;
            System.out.println(result.getRecordMetadata().topic() +
                               result.getRecordMetadata().partition() +
                               result.getRecordMetadata().offset()); 
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
  1. KafkaAsyncProducerController ```java package com.yue.kafka.demo.controller;

import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;

@RestController public class KafkaAsyncProducerController { @Autowired private KafkaTemplate template; @RequestMapping(“send/async/{message}”) public String asyncSend(@PathVariable String message) { ProducerRecord record = new ProducerRecord( “topic-spring-02”, 0,3,message ); ListenableFuture> future = template.send(record); // 添加回调,异步等待响应 future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { System.out.println(“发送失败: “ + throwable.getMessage()); }

        @Override public void onSuccess(SendResult<Integer, String> result) {
            System.out.println("发送成功:" + result.getRecordMetadata().topic()
                               + "\t" + result.getRecordMetadata().partition() + 
                               "\t" + result.getRecordMetadata().offset());
        }
    });
    return "success";
}

}

7. MyConsumer.java
```java
package com.yue.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class MyConsumer {
    @KafkaListener(topics = "topic-spring-02")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
        if (optional.isPresent()) {
            System.out.println(record.topic() + "\t" +
                               record.partition() + "\t" +
                               record.offset() + "\t" +
                               record.key() + "\t" +
                               record.value());
        }
    }
}

4 服务端参数配置

$KAFKA_HOME/config/server.properties文件中的配置。

4.1 zookeeper.connect

该参数用于配置Kafka要连接的Zookeeper/集群的地址。
它的值是一个字符串,使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是 host:port形式的,可以在最后添加Kafka在Zookeeper中的根节点路径。
如:

zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka

image.png

4.2 listeners

用于指定当前Broker向外发布服务的地址和端口。
**advertised.listeners 配合,用于做内外网隔离。
image.png
内外网隔离配置:
listener.security.protocol.map
监听器名称和安全协议的映射配置。
比如,可以将内外网隔离,即使它们都使用SSL。
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
每个监听器的名称只能在map中出现一次。
inter.broker.listener.name
用于配置broker之间通信使用的监听器名称,该名称必须在advertised.listeners列表中。
inter.broker.listener.name=EXTERNAL
listeners
用于配置broker监听的URI以及监听器名称列表,使用逗号隔开多个URI及监听器名称。
如果监听器名称代表的不是安全协议,必须配置listener.security.protocol.map。
每个监听器必须使用不同的网络端口。
advertised.listeners
需要将该地址发布到zookeeper供客户端使用,如果客户端使用的地址与listeners配置不同。
可以在zookeeper的
get /myKafka/brokers/ids/** 中找到。
在IaaS环境,该条目的网络接口得与broker绑定的网络接口不同。
如果不设置此条目,就使用listeners的配置。跟listeners不同,该条目不能使用0.0.0.0网络端口。
advertised.listeners的地址必须是listeners中配置的或配置的一部分。
image.png

4.3 broker.id

该属性用于唯一标记一个Kafka的Broker,它的值是一个任意integer值。
当Kafka以分布式集群运行的时候,尤为重要。
最好该值跟该Broker所在的物理主机有关的,如主机名为 host1.lagou.com ,则 broker.id=1 ,如果主机名为 192.168.100.101 ,则 broker.id=101 等等。
image.png

4.4 log.dir

通过该属性的值,指定Kafka在磁盘上保存消息的日志片段的目录。
它是一组用逗号分隔的本地文件系统路径。
如果指定了多个路径,那么broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。
broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。
image.png