安装
kafka安装:https://blog.csdn.net/RenshenLi/article/details/118739136
kafaka下载地址:https://kafka.apache.org/downloads
# Kafka 依赖于 ZK,先启动 ZKbin/zookeeper-server-start.sh -daemon config/zookeeper.properties &# 启动 Kafka 服务器nohup bin/kafka-server-start.sh config/server.properties &#创建测试topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 查看 Topic 列表bin/kafka-topics.sh --list --zookeeper localhost:2181
修改kafka_2.13-2.7.0/config/server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.10:9092
Topics(主题)
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。
Partition(分区)

为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的磁盘上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
Consumer Group(消费者组)


这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
消费者组与分区
Group中的一个分区只能被一个消费者消费,一个消费者可以消费多个分区
有三种情况:
1.消费者实列数小于分区数:这种情况会下,一个消费者实列会消费多个分区
2.消费者实列数等于分区数: 一个消费者消费一个分区
3.消费者实列数大于分区数:会有几个消费者处于空闲的状态下。
理想情况下,Group中的Consumer实例的数量应该等于该Group订阅主题的总分区数。
offset(消费位移)
消费者在消费的过程中要记录自己消费了多少数据,即消费位置信息,在Kafka中叫:位移(offset)。
看上去该Offset就是一个数字而已,其实对于Consumer Group 而言,它是一组KV对,Key是分区,V对应Consumer 消费该分区的最新位移。
SpringBoot集成Kafka

https://gitee.com/zlintent/imooc-study-ecommerce.git
maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
yaml配置
spring:
# SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
kafka:
bootstrap-servers: 192.168.1.10:9092
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置类
/**
* <h1>通过代码自定义 Kafka 配置</h1>
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Kafka Producer 工厂类配置
* k,v序列号方式 String,String
*
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
* <h2>Kafka Producer 客户端</h2>
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* <h2>Kafka Consumer 工厂类配置</h2>
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);//最多拉取多少条数据
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* <h2>Kafka Consumer 监听器工厂类配置</h2>
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
生产者
/**
* <h1>kafka 生产者</h1>
*/
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送 kafka 消息
*
* @param key 消息分区,可为空
* @param value 消息内容
* @param topic 主题
*/
public void sendMessage(String key, String value, String topic) {
if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("value or topic is null or empty");
}
ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
// 异步回调的方式获取通知
future.addCallback(
success -> {
assert null != success && null != success.getRecordMetadata();
// 发送到 kafka 的 topic
String _topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = success.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);
},
failure -> {
log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
}
);
// 同步等待的方式获取通知
try {
// SendResult<String, String> sendResult = future.get();
SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);//5秒没返回,就抛出异常
// 发送到 kafka 的 topic
String _topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = sendResult.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);
} catch (Exception ex) {
log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
}
}
}
消费者
/**
* <h1>Kafka 消费者</h1>
*/
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
private ObjectMapper mapper;
/**
* <h2>监听 Kafka 消息并消费</h2>
*/
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
public void listener01(ConsumerRecord<String, String> record) throws Exception {
String key = record.key();
String value = record.value();
QinyiMessage kafkaMessage = mapper.readValue(value, QinyiMessage.class);
log.info("listener01: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));
}
/**
* <h2>监听 Kafka 消息并消费</h2>
* 不知道k,v类型的情况
*/
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
public void listener02(ConsumerRecord<?, ?> record) throws Exception {
Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
if (_kafkaMessage.isPresent()) {
Object message = _kafkaMessage.get();
QinyiMessage kafkaMessage = mapper.readValue(message.toString(), QinyiMessage.class);
log.info("listener02: [{}]", mapper.writeValueAsString(kafkaMessage));
}
}
}
测试
/**
* <h1>SpringBoot 集成 kafka 发送消息</h1>
*/
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private ObjectMapper mapper;
@Autowired
private KafkaProducer kafkaProducer;
/**
* <h2>发送 kafka 消息</h2>
*/
@GetMapping("/send-message")
public void sendMessage(@RequestParam(required = false) String key, @RequestParam String topic) throws Exception {
QinyiMessage message = new QinyiMessage(1, "Imooc-Study-Ecommerce");
kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
}
}
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?key=qinyi&topic=qinyi-springboot
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?topic=qinyi-springboot


