3.1 消息的发送与接收
生产者主要的对象有: **KafkaProducer , ProducerRecord 。
其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。
KafkaProducer **的创建需要指定的参数和含义:
参数 | 说明 |
---|---|
bootstrap.servers | 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。 |
key.serializer | 要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。 |
value.serializer | 要发送消息的alue数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。 |
acks | 默认值:all。 acks=0: 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。 该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。 |
acks=1
表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。
在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
acks=all
首领分区会等待所有的ISR副本分区确认记录。
该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
这是Kafka最强的可靠性保证,等效于 acks=-1 |
| retries | retries重试次数
当消息发送出现错误的时候,系统会重发消息。
跟客户端收到错误时重发一样。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了 |
其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到。我们后面的内容会介绍到。
消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。
同步确认效率低,异步确认效率高,但是需要设置回调对象。
生产者:
package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class MyProducer1 {
public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接用到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put("bootstrap.servers", "node1:9092");
// 设置key的序列化器
configs.put("key.serializer",
"org.apache.kafka.common.serialization.IntegerSerializer");
// 设置value的序列化器
configs.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
configs.put("acks", "1");
KafkaProducer<Integer, String> producer =
new KafkaProducer<Integer, String>(configs);
// 用于封装Producer的消息
ProducerRecord<Integer, String> record =
new ProducerRecord<Integer, String>("topic_1", // 主题名称
0, // 分区编号,现在只有一个分区,所以是0
0, // 数字作为key
"message 0" // 字符串作为value);
// 发送消息,同步等待消息的确认
producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
// 关闭生产者
producer.close();
}
}
生产者2:
package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
public class MyProducer2 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("key.serializer",
"org.apache.kafka.common.serialization.IntegerSerializer");
configs.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer =
new KafkaProducer<Integer, String>(configs);
ProducerRecord<Integer, String> record =
new ProducerRecord<Integer, String>( "topic_1", 0,1,"lagou message 2" );
// 使用回调异步等待消息的确认
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println( "主题:" + metadata.topic() + "\n"
+ "分区:" + metadata.partition() + "\n"
+ "偏移量:" + metadata.offset() + "\n"
+ "序列化的key字节:" + metadata.serializedKeySize() + "\n"
+ "序列化的value字节:" + metadata.serializedValueSize() + "\n"
+ "时间戳:" + metadata.timestamp() );
} else {
System.out.println("有异常:" + exception.getMessage());
}
}
});
// 关闭连接
producer.close();
}
}
生产者3:
package com.lagou.kafka.demo.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
public class MyProducer3 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "node1:9092");
configs.put("key.serializer",
"org.apache.kafka.common.serialization.IntegerSerializer");
configs.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer =
new KafkaProducer<Integer, String>(configs);
for (int i = 100; i < 200; i++) {
ProducerRecord<Integer, String> record =
new ProducerRecord<Integer, String>( "topic_1", 0,i,"lagou message " + i );
// 使用回调异步等待消息的确认
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("主题:" + metadata.topic() + "\n" +
"分区:" + metadata.partition() + "\n" +
"偏移量:" + metadata.offset() + "\n" +
"序列化的key字节:" + metadata.serializedKeySize() + "\n" +
"序列化的value字节:" + metadata.serializedValueSize() + "\n"+
"时间戳:" + metadata.timestamp() );
} else {
System.out.println("有异常:" + exception.getMessage());
}
}
});
}
// 关闭连接
producer.close();
}
}
消息消费流程:
消费者:
package com.lagou.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.lang.reflect.Array;
import java.util.*;
import java.util.regex.Pattern;
public class MyConsumer1 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
configs.put("bootstrap.servers", "node1:9092");
// key的反序列化器
configs.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
// value的反序列化器
configs.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
configs.put("group.id", "consumer.demo");
// 创建消费者对象
KafkaConsumer<Integer, String> consumer =
new KafkaConsumer<Integer, String>(configs);
// final Pattern pattern = Pattern.compile("topic_\\d");
final Pattern pattern = Pattern.compile("topic_[0-9]");
// 消费者订阅主题或分区
// consumer.subscribe(pattern);
// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
final List<String> topics = Arrays.asList("topic_1");
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println("剥夺的分区:" + tp.partition());
});
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
System.out.println(tp.partition()); });
}
});
// 拉取订阅主题的消息
final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
// 获取topic_1主题的消息
final Iterable<ConsumerRecord<Integer, String>> topic1Iterable =
records.records("topic_1");
// 遍历topic_1主题的消息
topic1Iterable.forEach(record -> {
System.out.println("========================================");
System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
System.out.println("消息的key:" + record.key());
System.out.println("消息的偏移量:" + record.offset());
System.out.println("消息的分区号:" + record.partition());
System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
System.out.println("消息的时间戳:" + record.timestamp());
System.out.println("消息的时间戳类型:" + record.timestampType());
System.out.println("消息的主题:" + record.topic());
System.out.println("消息的值:" + record.value()); });
// 关闭消费者
consumer.close();
}
}
3.2 SpringBoot Kafka
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.yue.kafka.demo</groupId> <artifactId>demo-02-springboot</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo-02-springboot</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
- application.properties ``` spring.application.name=springboot-kafka-02 server.port=8080
用于建立初始连接的broker地址
spring.kafka.bootstrap-servers=node1:9092
producer用到的key和value的序列化类
spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer =org.apache.kafka.common.serialization.StringSerializer
默认的批处理记录数
spring.kafka.producer.batch-size=16384
32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
consumer用到的key和value的反序列化类
spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer =org.apache.kafka.common.serialization.StringDeserializer
consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-02-consumer
是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
如果该消费者的偏移量不存在,则自动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest
3. Demo02SpringbootApplication.java
```java
package com.yue.kafka.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Demo02SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(Demo02SpringbootApplication.class, args);
}
}
- KafkaConfig.java ```java package com.yue.kafka.demo.config;
import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class KafkaConfig { @Bean public NewTopic topic1() { return new NewTopic(“ntp-01”, 5, (short) 1); } @Bean public NewTopic topic2() { return new NewTopic(“ntp-02”, 3, (short) 1); } }
5. KafkaSyncProducerController.java
```java
package com.yue.kafka.demo.controller;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
@RestController
public class KafkaSyncProducerController {
@Autowired
private KafkaTemplate template;
@RequestMapping("send/sync/{message}")
public String sendSync(@PathVariable String message) {
ListenableFuture future = template.send(
new ProducerRecord<Integer, String>( "topic-spring-02", 0,1,message));
try {
// 同步等待broker的响应
Object o = future.get();
SendResult<Integer, String> result = (SendResult<Integer, String>) o;
System.out.println(result.getRecordMetadata().topic() +
result.getRecordMetadata().partition() +
result.getRecordMetadata().offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}
}
- KafkaAsyncProducerController ```java package com.yue.kafka.demo.controller;
import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaAsyncProducerController {
@Autowired
private KafkaTemplate
@Override public void onSuccess(SendResult<Integer, String> result) {
System.out.println("发送成功:" + result.getRecordMetadata().topic()
+ "\t" + result.getRecordMetadata().partition() +
"\t" + result.getRecordMetadata().offset());
}
});
return "success";
}
}
7. MyConsumer.java
```java
package com.yue.kafka.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class MyConsumer {
@KafkaListener(topics = "topic-spring-02")
public void onMessage(ConsumerRecord<Integer, String> record) {
Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
if (optional.isPresent()) {
System.out.println(record.topic() + "\t" +
record.partition() + "\t" +
record.offset() + "\t" +
record.key() + "\t" +
record.value());
}
}
}
4 服务端参数配置
$KAFKA_HOME/config/server.properties文件中的配置。
4.1 zookeeper.connect
该参数用于配置Kafka要连接的Zookeeper/集群的地址。
它的值是一个字符串,使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是 host:port形式的,可以在最后添加Kafka在Zookeeper中的根节点路径。
如:
zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka
4.2 listeners
用于指定当前Broker向外发布服务的地址和端口。
与 **advertised.listeners 配合,用于做内外网隔离。
内外网隔离配置:
listener.security.protocol.map
监听器名称和安全协议的映射配置。
比如,可以将内外网隔离,即使它们都使用SSL。
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
每个监听器的名称只能在map中出现一次。
inter.broker.listener.name
用于配置broker之间通信使用的监听器名称,该名称必须在advertised.listeners列表中。
inter.broker.listener.name=EXTERNAL
listeners
用于配置broker监听的URI以及监听器名称列表,使用逗号隔开多个URI及监听器名称。
如果监听器名称代表的不是安全协议,必须配置listener.security.protocol.map。
每个监听器必须使用不同的网络端口。
advertised.listeners
需要将该地址发布到zookeeper供客户端使用,如果客户端使用的地址与listeners配置不同。
可以在zookeeper的 get /myKafka/brokers/ids/
在IaaS环境,该条目的网络接口得与broker绑定的网络接口不同。
如果不设置此条目,就使用listeners的配置。跟listeners不同,该条目不能使用0.0.0.0网络端口。
advertised.listeners的地址必须是listeners中配置的或配置的一部分。
4.3 broker.id
该属性用于唯一标记一个Kafka的Broker,它的值是一个任意integer值。
当Kafka以分布式集群运行的时候,尤为重要。
最好该值跟该Broker所在的物理主机有关的,如主机名为 host1.lagou.com ,则 broker.id=1 ,如果主机名为 192.168.100.101 ,则 broker.id=101 等等。
4.4 log.dir
通过该属性的值,指定Kafka在磁盘上保存消息的日志片段的目录。
它是一组用逗号分隔的本地文件系统路径。
如果指定了多个路径,那么broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。
broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。