一、概念详解
- Topic:可以理解为一个类。
- Partition:topic中的数据分割为一个或者多个partition,每个topic至少有一个partition,每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序,如果一个topic有多个partition,消费数据时就不能保证数据的顺序,在需要严格保证消息顺序的场景下,partition的数目设为1
- Broker:一个kafka集群可以有多个服务器(结点)brokers。其中有一个leader和多个follower,假如有N和结点的话,允许N-1个结点无法工作,broker存储的是topic的数据。
- Replica:副本,主要就是做partition的备份,不会被消费者消费,主要是防止数据的丢失。
- 建议:topic有N个partition,最好就有>=N个broker,这样可以避免broker上的partition分布不均。
- Leader:每个partition有多个副本,其中有且只有一个作为leader。leader是当前负责数据的读写的partition。
Zookeeper:负责维护和协调broker,当kafka系统中新增了broker或者某个broker发送故障失效时,由zookeeper通知生产者和消费者。生产者和消费者依据zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
二、Kafka安装
1、安装zookeeper
2、安装kafka
下载地址:http://kafka.apache.org/downloads
下载并解压之后进入server.properties进行参数配置broker.id=0:表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置不同
- listeners=PLAINTEXT://:9092:broker对外提供的服务入口地址
- log.dirs=/tmp/kafka/log 设置存放消息日志文件的地址(一般指定为kafka目录下的logs文件)
- zookeeper.connect=localhost:2181:kafka所需Zookeeper集群地址
2、启动kafka
需要指定server.properties后启动
superking@wangchaodeMacBook-Pro bin % kafka-server-start.sh ../config/server.properties
三、Kafka原生使用
1、创建主题
—zookeeper localhost:2181:指定当前zookeeper,多个Zookeeper用逗号分隔
—create —topic heima:指定创建主题hiema
—partition 2:指定分区个数 2
—replication-factor:1 :一个副本;注意:每个副本分布在不同的节点上,不能超过总结点数。假如只有一个节点,但是创建时指定副本数是2,则报错。
// bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic dev.smiler.king.user.index --partitions 2 --replication-factor 1
2、显示主题
# 显示主题
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 显示详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic supkingx
3、指定生产/消费者
# 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic supkingx
# 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic supkingx
四、Kafka使用java客户端进行连接
1、maven配置
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.1.10.RELEASE</version>
</parent>
<groupId>com.supkingx</groupId>
<artifactId>spring-boot-kafka-01</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.11</scala.version>
<kafka.version>2.3.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
2、java程序
2.1 生产者
public class ProducerFastStart {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "supkingx";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
// properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
// properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置集群地址
// properties.put("bootstrap.servers", BROKER_LIST);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// 定义一个生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 定义一个消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "kafka-demo", "hello,kafka! I am supkingx");
try {
// 发送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
2.2 消费者
public class ConsumerFastStart {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "supkingx";
private static final String GROUP_ID = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
// properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值序列化器
// properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置集群地址
// properties.put("bootstrap.servers", BROKER_LIST);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// properties.put("group.id", GROUP_ID);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 定义主题
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
// 开始接收消息,定义每1秒接收一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
五、生产者详解
5.1 消息发送
5.1.1 kafka java客户端数据生产流程解析
5.1.2 发送类型
发送即忘记
producer.send(record);
同步发送
try {
// 发送消息
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println("topic:" + recordMetadata.topic());
System.out.println("partition:" + recordMetadata.partition());
System.out.println("offset:" + recordMetadata.offset());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
异步发送
try {
// 异步发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("topic:" + recordMetadata.topic());
System.out.println("partition:" + recordMetadata.partition());
System.out.println("offset:" + recordMetadata.offset());
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
5.1.3 序列化器
org.apache.kafka.common.serialization.Serializer
略
5.1.4 分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
略
5.1.5 拦截器
Producer拦截器(interceptor),它和consumer端inteceptor是在kafka0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
生产者拦截器可以用在消息发送前做一些准备工作。
使用场景
1、按照某个规则过滤掉不符合要求的消息。
2、修改消息的内容。
3、统计类需求
实现
首先实现接口org.apache.kafka.clients.producer.ProducerInterceptor
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
/**
* 加前缀
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String newMessage = "prefix--" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
record.key(), newMessage, record.headers());
}
/**
* 统计成功率
*
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
/**
* 发送之后输出成功率
*/
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendSuccess + sendFailure);
System.out.println("消息发送成功率:" +
String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map<String, ?> configs) { }
}
自定义拦截器加入到配置中去
// 指定自定义拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());
5.2 发送原理剖析
消息发送的过程中,涉及到两个线程的协同工作,主线程首先将业务数据封装成ProducerRecord对象,之后调用send()方法将消息放入RecordAccumulator(消息收集器,也可以理解为主线程与Sender线程直接的缓冲区)中暂存,Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从RecordAccumulate中取出消息并批量发送出去,需要注意的是,KafkaProducer是线程安全的,多个线程之间可以共享使用同一个KafkaProducer对象。
5.3 其他生产者参数
5.3.1 acks
这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息写入成功的。acks是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之前的权衡。
- ack=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的。
- ack=1,只要集群的首节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首节点(比如首节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功的通知,此时首节点还没来的及同步数据到follower节点,首节点崩溃,就会导致数据丢失。
- ack=-1,只有当前所有参与复制的节点都收到消息时,生产者会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。
注意:acks是一个字符串类型。
定义acks到配置中去
// 定义ack
properties.put(ProducerConfig.ACKS_CONFIG, "0");
5.3.2 retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了retires
设置的次数,生产者会放弃重试并返回错误,默认情况下,生产者会在每次重试之间等待100ms,可以通过retires.backoff.ms
参数来修改这个时间间隔。
5.3.3 batch.size
当有多个消息要发送到同一分区时,生产者会把他们放到同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满了才发送,半满批次,甚至只包含一个消息的批次也可以发送。所以就算把batch.size
设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的过小,生产者会因为频繁发送消息而增加一些额外的开销。
5.3.4 linger.ms
上面比如我们设置batch size为32KB,但是比如有的时刻消息比较少,过了很久,比如5min也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去
5.3.5 max.request.size
该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指定单个请求里所有消息的总大小。broker
对可接收的消息最大值也有(massage.max.size
),所以两边的配置最好匹配,避免生产者发送的消息被broker
拒绝。
5.4.6 buffer.memory
Kafka的客户端发送数据到服务器,不是来一条就发一条,而是经过缓冲的,也就是说,通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,这样性能才可能高。
buffer.memory的本质就是用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
如果buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。
缓冲区。内存使用缓冲区。内存即限制总内存,可用来收集未发送的消息。当这个达到限制后,生产者将阻塞额外的发送作为max.block.ms引发异常。
- 注意
buffer.memory
与batch.size
参数的区别 - 如果要发送大文件的话,要同时提高
buffer.memory
和batch.size
的大小
5.4.7 message.max.bytes
(默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
5.4.8 replica.fetch.max.bytes
(默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
六、消费者详解
6.1 消费者和消费组
一个消费组里面不管有多少消费者,只能消费一个消息,消费者A消费完消息后,消费者B便消费不了这个消息了。
kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:
kafka一个很重要的特性就是,只需要写入一次消息,可以支持任意多的应用读取到这个消息。换句话说,每个应用都可以读取到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的的例子,假如我们新增了一个消费组G2,而这个消费组有两个消费者,那么会是这样的:
6.2 订阅主题和分区
创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 定义主题
consumer.subscribe(Collections.singletonList(TOPIC));
另外我们也可以使用正则表达式来匹配多个主题
consumer.subscribe(Pattern.compile("test"));
指定订阅的分区
// 指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition(TOPIC,1)));
6.3 序列化与反序列化
生产者与消费者的序列化一致
// 设置key序列化器
// properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值序列化器
// properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
6.4 位移提交
对于kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消费是否被消费者接收到;kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称之为提交(commit)。
kafka只能保证在一个分区里消息是顺序的。
重复提交
组G1里面的一个消费者消费到10的时候,还没有commit,这时由来一个消费者消费到10,这样就会重复消费。
消息丢失
一个消费者A消费到消息并commit后,突然宕机了,此时消费者B是消费不到这个消息的,就会导致这个消息丢失。
自动提交
这种方式让消费者来管理位移,应用本身不需要显示操作。当我们将enable.auto.commit
设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定),提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后 kafka进行了再均衡,那么由于没有更新位移导致再均衡后这部分消息重复消费。
同步提交
// 手动提交开启,关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
public class ConsumerFastStart {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "supkingx";
private static final String GROUP_ID = "group.demo";
public static Properties initConfig() {
Properties properties = new Properties();
// 设置key序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值序列化器
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置集群地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 指定kafkaConsumer对应的客户端Id,默认为空,如果不设置则会自动设置一个非空的字符串
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
// 手动提交开启,关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return properties;
}
public static void main(String[] args) {
Properties properties = initConfig();
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
final TopicPartition topicPartition = new TopicPartition(TOPIC, 0);
// 指定订阅的分区和主题
consumer.assign(Arrays.asList(topicPartition));
long lastConsumerOffSet = -1;
while (true) {
// 开始接收消息,定义每1秒接收一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
lastConsumerOffSet = partitionRecords.get(partitionRecords.size() - 1).offset();
// 同步提交消费位移
consumer.commitSync();
}
System.out.println("consumer offset is " + lastConsumerOffSet);
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
System.out.println("consumer offset is " + offsetAndMetadata.offset());
long position = consumer.position(topicPartition);
System.out.println("the offset of the next record is " + position);
}
}
异步提交
手动提交的缺点:当发起提交的时候应用会阻塞。当然我们也可以减少手动提交的次数,但这样会增加消息重复的概率(和自动提交一样)。另外一个解决办法就是异步提交,使用异步提交的API。
但是异步提交也有个缺点,就是失败后不会重试,而同步提交失败后会进行重试。异步提交没有重试是因为同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB成功,此时commitA进行重试并成功的话,会将实际上已经提交的位移从3000回滚到2000,导致消息重复消费。
public static void main(String[] args) {
Properties properties = initConfig();
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 定义主题
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// do some logical processing.
}
// 异步回调
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
System.out.println("fail to commit offsets " + offsets);
System.out.println(exception);
}
}
});
}
}finally {
consumer.close();
}
}
6.5 指定位移消费
到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。
seek()方法正好提供了这个功能,能够追踪以前的消费或者回溯消息。
// 参数topicPartition 表示分区,offset表示指定从分区的哪个位置开始消费。
consumer.seek(topicPartition, 10);
public static void main(String[] args) {
Properties properties = initConfig();
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 定义主题
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(Duration.ofMillis(1000));
// 获取消费者分配的分区
Set<TopicPartition> assignment = consumer.assignment();
System.out.println(assignment);
for (TopicPartition topicPartition : assignment) {
// 参数topicPartition 表示分区,offset表示指定从分区的哪个位置开始消费。
consumer.seek(topicPartition, 10);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
}
}
}
6.6 再均衡监听器
https://blog.csdn.net/ajianyingxiaoqinghan/article/details/107192256
1、再均衡介绍
再均衡是指分区的所有权从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使我们既方便有安全的删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取信息的。
2、再均衡策略
- RangeAssignor
原理是按照消费者总数和分区总数进行整除运算,获得一个跨度,然后将分区按照跨度进行平均分配。
对于分区数可以整除消费组内消费者数量的情况(比如一个消费组内有 2 个消费者,某个 Topic 中有 4 个分区),这种方法的分配特性较好。但如果分区数除以消费组的消费者数量有余数(比如一个消费组内有 2 个消费者,某个 Topic 有 3 个分区),则会分配不均。这种情况下,如果类似情形扩大,可能会出现消费者过载情况。
- RoundRobinAssignor
RoundRobinAssignor 分配策略,原理是对某个消费组的所有消费者订阅的所有 Topic 的所有分区进行字典排序,然后用轮询方式将分区逐个分配给各消费者。
合理使用这种分配策略,最主要的要求是:消费组内所有消费者都有相同的订阅 Topic 集合。如果消费组内消费者订阅信息不同,则执行分区分配的时候就不能实现完全的轮询,可能导致分区分配不均的情况。
即:所有消费者订阅的topic一致,则使用这种方案产生的效果就比较合理。
- StickyAssignor
3、再均衡过程
2、Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求(包含各自的分配策略和订阅信息),请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用
导致再均衡的操作
- 新的消费者加入消费组;
- 消费者宕机下线(不一定是真的下线,令消费组以为消费者宕机下线的本质原因是消费者长时间未向 GroupCoordinator 发送心跳包);
- 消费者主动退出消费组;
- 消费组对应的 GroupCoordinator 节点发生了变更;
- 任意主题或主题分区数量发生变化;
在拉取消息的时候记录下当前的位移,当监听到再均衡的时候就同步提交一次当前的位移,可以防止消息的丢失。
public static void main(String[] args) {
Properties properties = initConfig();
// 定义消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Map<TopicPartition, OffsetAndMetadata> currentOffSets = new HashMap<>();
// 定义主题
consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 尽量避免重复消费,当发生再均衡的时候立刻提交当前位移
consumer.commitSync(currentOffSets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do noting
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value());
// 异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的onPartitionsRevoked回调执行commitSync方法同步提交位移
currentOffSets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(currentOffSets, null);
}
}
4、再均衡导致的故障
下述记录一次 Kafka 的频繁再均衡故障。平均间隔 2 到 3 分钟就会触发一次再均衡,分析日志发现比较严重。主要日志内容如下:
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
这个错误意思是消费者在处理完一批 poll 的消息之后,同步提交偏移量给 Broker 时报错,主要原因是当前消费者线程消费的分区已经被 Broker 节点回收了,所以 Kafka 认为这个消费者已经死了,导致提交失败。
导致该问题的原因,主要涉及构建消费者的一个属性 max.poll.interval.ms。这个属性的意思是消费者两次 poll() 方法调用之间的最大延迟。如果超过这个时间 poll 方法没有被再次调用,则认为该消费者已经死亡,触发消费组的再平衡。该参数的默认值为 300s,但我们业务中设置了 5s。
查询 Kafka 拉取日志后,发现有几条日志由于逻辑问题,单条数据处理时间超过了一分钟,所以在处理一批消息之后,总时间超过了该参数的设置值 5s,导致消费者被踢出消费组,导致再均衡。
解决方法:
1、增加 max.poll.interval.ms 值的大小:将该参数调大至合理值,比如默认的 300s;
2、设置分区拉取阈值:通过用外部循环不断拉取的方式,实现客户端的持续拉取效果。消费者每次调用 poll 方法会拉取一批数据,可以通过设置 max.poll.records 消费者参数,控制每次拉取消息的数量,从而减少每两次 poll 方法之间的拉取时间。
此外,再均衡可能会导致消息的重复消费现象。消费者每次拉取消息之后,都需要将偏移量提交给消费组,如果设置了自动提交,则这个过程在消费完毕后自动执行偏移量的提交;如果设置手动提交,则需要在程序中调用 consumer.commitSync() 方法执行提交操作。
反过来,如果消费者没有将偏移量提交,那么下一次消费者重新与 Broker 相连之后,该消费者会从已提交偏移量处开始消费。问题就在这里,如果处理消息时间较长,消费者被消费组剔除,那么提交偏移量出错。消费者踢出消费组后触发了再均衡,分区被分配给其他消费者,其他消费者如果消费该分区的消息时,由于之前的消费者已经消费了该分区的部分消息,所以这里出现了重复消费的问题。
6.7 消费者拦截器
有生产者拦截器自然又会有消费者拦截器,消费者拦截器主要是在消费到消息的时候或者是在提交的消费位移的时候进行一些定制化的操作。
使用场景
对消费消息设置一个有效的属性,如某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
/**
* 小于一分钟的就放到 ConsumerRecords中继续消费,超时的就丢弃
*
* @param records
* @return
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("before:" + records);
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition topicPartition : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(topicPartition);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> tpRecord : tpRecords) {
// 在一分钟之内的就存起来并返回
if (now - tpRecord.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(tpRecord);
}
}
if (!newTpRecords.isEmpty()) {
newRecords.put(topicPartition, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() { }
@Override
public void configure(Map<String, ?> configs) {}
}
在配置中指定消费者拦截器
// 指定消费者拦截器
properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
6.8 消费者其他参数
- fetch.min.bytes
这个参数允许消费者指定从broker读取消息时最小的数据量。当消费者从broker读取消息时,如果数据量小于这个阀值,broker会等待指导有足够的数据,然后才会返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于大量消费者的主题来说,则可以明显减轻broker压力。
- fetch.max.wait.ms
消费者读取时最大等待时间。从而避免长时间阻塞,这个参数默认为500ms。
- max.partition.fetch.bytes
这个参数指定了每个分区返回的最多字节数,默认为1M,也就是说,kafkaConsumer.poll()返回记录时,每个分区的记录字节数最多为1M。如果一个主题有20个分区,同时有5个消费者,那么每个消费者需要4M的空间来处理消息。实际情况中,我们需要设置更多的空间,这样当存在消费者宕机时,其他消费者可以承担更多的分区。
- max.poll.records
这个参数控制一个poll()调用返回的记录数,这个可以用来控制应用在拉取循环中的处理数据量。
- auto.commit.interval.ms
消息自动提交的时间间隔,当开始消息自动提交后又用
- session.timeout.ms
kafka会有一个心跳线程来同步服务端,告诉服务端自己是正常可用的,默认是3秒发送一次心跳,超过session.timeout.ms(默认10秒)服务端没有收到心跳就会认为当前消费者失效
concurrency
container.setConcurrency(3)表示创建三个KafkaMessageListenerContainer实例。
一个KafkaMessageListenerContainer实例分配一个分区进行消费;
如果设置为1的情况下, 这一个实例消费Topic的所有分区;
如果设置多个,那么会平均分配所有分区;
如果实例>分区数; 那么空出来的实例会浪费掉;
如果实例<=分区数 那么会有一部分实例消费多个实例,但也是均衡分配的如果在分布式情况下, 那么总的KafkaMessageListenerContainer实例数= 服务器机器数量*concurrency ;
什么情况下设置concurrency,以及设置多少
这个得看我们给Topic设置的分区数量; 总的来说就是 机器数量*concurrency <= 分区数 例如分区=3; 而且同时有3台机器 ,那么concurrency=1就行了; 设置多了就会浪费资源;、 例如分区=9; 只有3台机器;那么可以concurrency=3 ; 每台机器3个消费者连接3个分区; 那么你可能会问我们concurrency=1不也可以吗; 反正都是一台机器消费3个分区;
话是没有错; 但是他们的差别在 一个线程消费3个分区和 3个线程消费3个分区 , 单线程和多线程你选哪个fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
调节消息大小:
https://www.cnblogs.com/xingfengzuolang/p/10762464.html
七、主题
7.1 主题管理
7.1.1 创建主题
创建主题在开题已经讲过,下面回顾 一下
// bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic supkingx --partitions 2 --replication-factor 1
使用Zookeeper查看topic
## 连接Zookeeper
./zkCli.sh -server localhost:2181
## 查看topic
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/supkingx
7.1.2 查看主题
7.1.3 修改主题
kafka-topics.sh --alter --zookeeper localhost:2181 --topic supkingx --config flush.messages=1
kafka-topics.sh --alter --zookeeper localhost:2181 --topic supkingx --delete-config flush.messages=1
7.1.4 删除主题
- 若 delete.topic.enable=true
- 直接彻底删除该topic
- 若delete.topic.enable=false
- 如果当前Topic没有使用过即没有传输过信息:可以彻底删除。
- 如果当前Topic有使用过即传输过信息:并不会真正删除Topic,只是把这个Topic标记为删除(marked for deletion),重启kafka server 后删除。
kafka-topics.sh --delete --zookeeper localhost:2181 --topic supkingx
# 提示已标记为删除
superking@wangchaodeMacBook-Pro bin % kafka-topics.sh --delete --zookeeper localhost:2181 --topic supkingx
Topic supkingx is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
7.1.5 修改分区
kafka-topics.sh --alter --zookeeper localhost:2181 --topic supkingx --partitions 3
7.2 KafkaAdminClient应用
将命令继承到Kafka Manage中,调用一些API来操作Kafka
见代码
public class AdminClientConfigTest {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "supkingx";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 增加分区
// addTopicPartitions();
// 查看主题配置
describeTopicConfig();
}
/**
* 给主题添加分区
*
* @throws ExecutionException
* @throws InterruptedException
*/
private static void addTopicPartitions() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient adminClient = AdminClient.create(properties);
// 当前主题的分区增加到三个
NewPartitions newPartitions = NewPartitions.increaseTo(3);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(TOPIC, newPartitions);
CreatePartitionsResult result = adminClient.createPartitions(newPartitionsMap);
result.all().get();
adminClient.close();
}
/**
* 查看主题配置
* @throws ExecutionException
* @throws InterruptedException
*/
private static void describeTopicConfig() throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient adminClient = AdminClient.create(properties);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);
DescribeConfigsResult result = adminClient.describeConfigs(Collections.singleton(configResource));
Config config = result.all().get().get(configResource);
System.out.println("主题配置:" + config);
adminClient.close();
}
}
八、分区
深入理解kafka分区的管理,包括副本的选举、分区的重新分配
Kafka可以将主题划分为多个分区(Partition),会根据分区规则选择吧消息存储到哪个分区中,只要如果分区规则设置的合理,那么所有的消息将会被均匀的分不到不同的分区中,这样就实现了负载均衡和水平扩展。另外,多个订阅者可以中一个或者多个分区中同时消费消息,以支持海量数据处理能力。
由于消息是追加到分区中的,多个分区顺序写磁盘的总效率要比随机写内存还要高,也是Kafka高吞吐率的重要保障。
思考:为什么顺序写磁盘的总效率要比随机写内存还要高
随机写会导致磁头不停地换道,造成效率的极大降低;顺序写磁头几乎不用换道,或者换道的时间很短。本文来讨论一下两者具体的差别以及相应的内核调用。
5.1 副本机制
由于Producer和Consumer都只会与Leader角色的分区副本相连,所以Kafka需要以集群的组织形式提供主题下的消息高可用。Kafka支持主备复制,所以消息具有高可用和持久性。
一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,在这台broker上的分区都会变得不可用,如果leader也在这台broker上,kafka会自动移除Leader,再其他副本中选出一个作为新的Leader。
在通常情况下,增加分区可以提高kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。
如上图说是,有一个主题Topic1,它有三个分区,part0、part1、part2,和三个副本(观察上图有有三个topic1-part0、三个topic1-part1、三个topic1-part2)即每个分区有三个副本。
红色的是Leader,当Leader接收到消息后,会同步到另外两个副本中去,提高了系统的高可用。
5.2 分区Leader选举
可以预见的是,如果某个分区的Leader挂了那么其它跟随者将会进行选举产生一个新的Leader,之后所有的读写就会转移到这个新的Leader上,在Kafka中,其不是采用常见的多数选举的方式进行副本的Leade选举,而是会在Zookeeper上针对每个Topic维护一个成为ISR(in-sync-replica,已同步的副本)的集合,显然还有一些副本没有来得及同步。只有这个ISR列表里面的才有资格成为leader(优先使用ISR里面的第一个,如果不行依次类推,因为ISR里面的是同步副本,消息是最完整且各个节点都是一样的)。
通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假如某个topic有f+1个副本,kafka可以容忍f个不可用,当然,如果全部ISR里面的副本都不可用,也可以选择其他可用的副本,只是存在数据的不一致。
关键词
- ISR:in-sync-replica,已同步的副本
OSR:out-of-sync-replicas:与leader副本同步滞后过多的副本。
5.3 分区重新分配及分配策略
会自动分配
按照kafka默认的消费逻辑设定,一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。假设目前某消费组内只有一个消费者C0,订阅了一个Topic,这个topic包含了7个分区,也就是说这个消费者C0订阅了7个分区,参考下图。
如果出现了消费者过多,出现了消费者的数量大于分区的数量的情况,就会有消费者分配不到任何分区。参考下图,一共有8个消费者,7个分区,那么最后的消费者C7由于分配不到任何分区进而就无法消费消息。
kafka默认的分配策略:org.apache,kafka.consumer.RangeAssignor,即采用RangeAssignor分配策略。初次之外kafka还提供了其他策略入,RoundRobinAssignor、StickyAssignor等。九、Kafka存储
9.1 存储结构概述
每个partition(文件夹)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件里。所以partition其实是一个逻辑分区,实际数据是存在segment里面的。但是每个segment file消息数据量不一定相等,这样的特性方便old segment file高速被删除。(默认情况下每个文件的大小为1G)
- 每个partition仅仅需要支持顺序读写即可了。segment文件生命周期由服务端配置参数决定。
9.2 日志索引
9.2.1 数据文件的分段
9.2.2 偏移量索引
一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这个几个手段达到了高效性。9.3 日志清理
9.3.1 日志删除
9.3.2 日志压缩
略9.3.3 磁盘存储优势
十、稳定性
了解kafka高性能、高吞吐的同时通过各种机制来保证高可用
Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到网络问题而造成通信中断,那么producer就无法判断该条消息是否已经提交(commit)。虽然kafka无法确定网络故障发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前kafka实现的是at least once。
10.1 幂等性
幂等性:接口多次调用锁所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用kafka的幂等性功能就可以避免这种情况。
幂等性是有条件的:
- 只能保证Producer在单个会话内不丢不重,如果producer出现意外挂掉则无法保证。(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重的)。
- 幂等性不能跨越多个Topic-Partition,只能保证单个partition内的幂等性,当涉及多个Topic-Partition时,这中间的状态并没有同步。
Producer使用幂等性示例非常简单,只需要在Producer的配置enable.idempotence=true即可。
// 开启幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
10.2 事务
场景
幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。
为了实现事务,应用程序必须提供唯一的transactionId,这个参数通过客户端程序来进行设定。
// 开始事务,并设置transactionId
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID);
前期准备
事务要求生产者开启幂等性特性,因此因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG 设置为true(默认为true),如果设置为false,则会抛出异常
public class ProducerTransactionSend {
private static final String BROKER_LIST = "localhost:9092";
private static final String TOPIC = "supkingx";
private static final String TRANSACTION_ID = "transactionId";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
// 开始事务,并设置transactionId
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID);
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
// 处理业务逻辑
ProducerRecord<String, String> record1 = new ProducerRecord<>(TOPIC, "message-supkingx-1");
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC, "message-supkingx-2");
producer.send(record2);
ProducerRecord<String, String> record3 = new ProducerRecord<>(TOPIC, "message-supkingx-3");
producer.send(record3);
// 事务提交
producer.commitTransaction();
} catch (Exception e) {
// 事务回滚
producer.abortTransaction();
}
producer.close();
}
}
10.3 控制器
zookeeper选取。
https://blog.csdn.net/chengyuqiang/article/details/79190061
10.4 可靠性保证
- 可靠性保证:确保系统在各种不同的环境下能够发生一致的行为。
- kafka的保证
- 保证分区消息的顺序
- 如果使用同一个生产者往同一个分区写入消息,而消息B在消息A之后写入
- 那么kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A在读取消息B
- 只有当消息被写入分区的所有同步副本时,它才被认为已提交。
- 生产者可以选择接受不同类型的确认,控制参数acks。
- 只要还有一个副本时活跃的,那么 已提交的消息就不会丢失
- 消费者只能读取已经提交的消息。
- 保证分区消息的顺序
失效的副本
副本的复制
10.5 一致性保证
简而言之就是HW就是 当broker宕机之后,会保留自己已有的数据(follower节点则是保留已经同步过来的数据)
数据丢失场景
此时 A同步到B,A接收消息到了1的位置,而B此时只同步到0的位置,在这时B做了一次重启的操作,保留了HW之前的数据,也就是0。此时A突然挂掉了,其保留1之前的数据。leader重新选举之后B做了leader,这是B的数据同步到A(恢复后),A也就丢失了1,只有数据0。
数据不一致场景
A(消息到0 1)B(消息到0)都宕机了,重启之后B成为了leader并且重新受到了一条消息,此时B的1位置是一条新的消息,然后A同步了B的消息之后,A在1位置的消息也就被覆盖了。
以上两个问题的解决方案 — leader epoch
A(消息到0 1)B(消息到0)都宕机了,重启之后B成为了leader并且重新受到了一条消息,此时B的1位置是一条新的消息,然后A同步了B的消息之后,A在1位置的消息也就被覆盖了。
10.6 消息重复的场景及解决方案
10.6.1 生产端重复
10.6.2 消费者重复
总结
十一、高级应用
11.1 命令行工具
11.1.1 消费组管理
- 查看消费组 ```shell superking@wangchaodeMacBook-Pro bin % kafka-consumer-groups.sh —bootstrap-server localhost:9092 —list
结果
group.demo console-consumer-14255
启动消费端后,就会出现消费组<br />消费端需要配置
```java
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
查看消费组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group.demo
查看消费组当前状态 ``` kafka-consumer-groups.sh —bootstrap-server localhost:9092 —describe —group group.demo —state
Consumer group ‘group.demo’ has no active members.
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS group.demo localhost:9092 (0) Empty 0
- 消费组内成员信息
bin % kafka-consumer-groups.sh —bootstrap-server localhost:9092 —describe —group group.demo —members
- 删除消费组
bin % kafka-consumer-groups.sh —bootstrap-server localhost:9092 —delete —group group.demo —members
<a name="e0a6c577"></a>
#### 11.1.2 消费位移管理
重置消费位移,前提是没有消费组在消费
kafka-consumer-groups.sh —bootstrap-server localhost:9092 —group group.demo —all-topics —reset-offsets —to-earliest —execute
--all-topics 指所有主题,可以修改为--topics,指定单个主题。<br />--to-earliest 移到最后
<a name="a952f791"></a>
### 11.2 数据管道(Connect)
<a name="d784097d"></a>
#### 11.2.1 概述
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1460038/1646968407294-d9fe457a-267d-468a-8197-71687f8bcb9a.png#clientId=u99d2b2ad-bbe7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=477&id=ub668208a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=954&originWidth=2082&originalType=binary&ratio=1&rotation=0&showTitle=false&size=1366611&status=done&style=none&taskId=ue39eed52-c826-45f7-8c49-0f466b34931&title=&width=1041)
在kafka connect 中还有两个重要的概念:task和worker。<br />Connect中一些概念<br />连接器:实现了Connect API,决定需要运行多少个任务,按照任务来进行数据复制,从work进程获取任务配置并将其传递下去。<br />任务:负责将数据移入或者移除kafka<br />work进程:相当于Connector和任务的容器,用于负责管理连接器的配置、启动连接器和连接器任务、提供REST API<br />转换器:kafka Connect 和其他存储系统直接发送或者接受数据之间转换数据。
<a name="410dd437"></a>
#### 11.2.2 独立模式-文件系统
<a name="c931653c-1"></a>
##### 场景
以下示例使用到了两个Connector,将文件source.txt中的内容通过Source连接器写入到kafka主题中,然后将内容写入source.sink.txt中。
- FileStreamSource:从source.txt中读取并发布到Broker中。
- FileStreamSink:从broker中读取数据并写入到source.sink.txt中。
<a name="d0569bec"></a>
##### 步骤详情
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1460038/1646968373692-1e6924d7-6591-4efc-ba3b-09f711f94bd8.png#clientId=u99d2b2ad-bbe7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=339&id=u39a12bd6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=678&originWidth=1320&originalType=binary&ratio=1&rotation=0&showTitle=false&size=469170&status=done&style=none&taskId=u3fffc08c-d4fb-426a-9ab1-68a226c025f&title=&width=660)<br />首先看下worker进场用到的配置文件${KAFKA_HOME}/config/connect-standalone.properties
// kafka集群连接地址 bootstrap.servers=localhost:9092 // 格式转化类 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter // json消息中包含的schema key.converter.schemas.enable=true value.converter.schemas.enable=true // 保存偏移量的文件路径 offset.storage.file.filename=/tmp/connect.offsets // 设定提交偏移量的频率 offset.flush.interval.ms=10000
Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
// 配置连接器的名称 name=local-file-source // 连接器的全限定名称,设置类名称也是可以的 connector.class=FileStreamSource // task数量 tasks.max=1 // 数据源的文件路径 file=test.txt // 主题名称 topic=connect-test%
启用slink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
启动source连接器
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
启用slink连接器
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
略
<a name="d4e8c84f"></a>
### 11.3 SpringBoot整合Kafka
<a name="eec03a64"></a>
#### 11.3.1 快速使用kafka
- 添加pom.xml
```xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
设置application.properties
logging.level.root=INFO
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092
生产者
@RestController
@RequestMapping("/hello")
public class Test {
private static final String TOPIC = "supkingx";
@Autowired
private KafkaTemplate<String, String> template;
@GetMapping("/index")
public String index() {
return "hello supkingx";
}
@GetMapping("/send")
public String sendMsg(@RequestParam("msg") String msg) {
this.template.send(TOPIC, msg);
return msg;
}
}
消费者
@Configuration
public class Listener {
private static final Logger LOGGER = LoggerFactory.getLogger(Test.class);
private static final String TOPIC = "supkingx";
private static final String GROUP_ID = "group.demo";
/**
* id 就是个标识
*
* @param msg
*/
@KafkaListener(id = "", topics = TOPIC, groupId = GROUP_ID)
public void listener(String msg) {
LOGGER.info("收到消息" + msg);
}
}
11.3.2 事务
# kafka事务的支持
spring.kafka.producer.transaction-id-prefix=kafka_tx.
- 方式一:开启事务之后需要使用 template.executeInTransaction 发送消息。
@GetMapping("/send")
public String sendMsg(@RequestParam("msg") String msg) {
// this.template.send(TOPIC, msg);
// 事务的支持
template.executeInTransaction(t -> {
t.send(TOPIC, msg);
if("error".equals(msg)){
throw new RuntimeException("msg is error");
}
t.send(TOPIC, msg + "another");
return true;
});
return msg;
}
- 方式二:使用@Transactional进行支持
@GetMapping("/sendMsgTransaction")
@Transactional(rollbackFor = RuntimeException.class)
public String sendMsgTransaction(@RequestParam("msg") String msg) {
// 事务的支持
template.send(TOPIC, msg);
template.send(TOPIC, msg + "another");
return msg;
}
十二、监控
知道kafka的监控体系 掌握JMX监控指标 数据异动实时提醒
12.1 监控度量指标
12.1.1 JMX
JMX(java managent extension)
在使用JMX之前要确保Kafka开启了JMX的功能(默认是关闭的),kafka启动时要添加JMX_PORT=9999这一项,也就是:
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
开启JMX之后会在Zookeeper的/brokers/ids/ 节点中有对应的呈现(jmx_port字段对应的值),如下: