安装

kafka安装:https://blog.csdn.net/RenshenLi/article/details/118739136

kafaka下载地址:https://kafka.apache.org/downloads
image.png

  1. # Kafka 依赖于 ZK,先启动 ZK
  2. bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
  3. # 启动 Kafka 服务器
  4. nohup bin/kafka-server-start.sh config/server.properties &
  5. #创建测试topic
  6. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  7. # 查看 Topic 列表
  8. bin/kafka-topics.sh --list --zookeeper localhost:2181

修改kafka_2.13-2.7.0/config/server.properties

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.1.10:9092

Topics(主题)

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。

Partition(分区)

image.png
为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

由于不同 Partition 可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于 Partition 在物理上对应一个文件夹,即使多个 Partition 位于同一个节点,也可通过配置让同一节点上的不同 Partition 置于不同的磁盘上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

Consumer Group(消费者组)

image.png
image.png
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

同一个topic,每个消费者组都可以拿到相同的全部数据。

消费者组与分区

Group中的一个分区只能被一个消费者消费,一个消费者可以消费多个分区
有三种情况:
1.消费者实列数小于分区数:这种情况会下,一个消费者实列会消费多个分区
2.消费者实列数等于分区数: 一个消费者消费一个分区
3.消费者实列数大于分区数:会有几个消费者处于空闲的状态下。

理想情况下,Group中的Consumer实例的数量应该等于该Group订阅主题的总分区数。

offset(消费位移)

消费者在消费的过程中要记录自己消费了多少数据,即消费位置信息,在Kafka中叫:位移(offset)。
看上去该Offset就是一个数字而已,其实对于Consumer Group 而言,它是一组KV对,Key是分区,V对应Consumer 消费该分区的最新位移。
image.png

SpringBoot集成Kafka

image.png
https://gitee.com/zlintent/imooc-study-ecommerce.git
maven依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>

yaml配置

spring:
  # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
  kafka:
    bootstrap-servers: 192.168.1.10:9092

#    consumer:
      # 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
#      group-id: imooc-study-ecommerce
#      auto-offset-reset: latest
#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#    producer:
#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.apache.kafka.common.serialization.StringSerializer

配置类

/**
 * <h1>通过代码自定义 Kafka 配置</h1>
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Kafka Producer 工厂类配置
     * k,v序列号方式 String,String
     *
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * <h2>Kafka Producer 客户端</h2>
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * <h2>Kafka Consumer 工厂类配置</h2>
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);//最多拉取多少条数据
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * <h2>Kafka Consumer 监听器工厂类配置</h2>
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 并发数就是一个消费者实例起几个线程
        factory.setConcurrency(3);
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

生产者

/**
 * <h1>kafka 生产者</h1>
 */
@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送 kafka 消息
     *
     * @param key   消息分区,可为空
     * @param value 消息内容
     * @param topic 主题
     */
    public void sendMessage(String key, String value, String topic) {

        if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
            throw new IllegalArgumentException("value or topic is null or empty");
        }

        ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
                kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);

        // 异步回调的方式获取通知
        future.addCallback(
                success -> {
                    assert null != success && null != success.getRecordMetadata();
                    // 发送到 kafka 的 topic
                    String _topic = success.getRecordMetadata().topic();
                    // 消息发送到的分区
                    int partition = success.getRecordMetadata().partition();
                    // 消息在分区内的 offset
                    long offset = success.getRecordMetadata().offset();

                    log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);
                },
                failure -> {
                    log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
                }
        );

        // 同步等待的方式获取通知
        try {
//            SendResult<String, String> sendResult = future.get();
            SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);//5秒没返回,就抛出异常

            // 发送到 kafka 的 topic
            String _topic = sendResult.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = sendResult.getRecordMetadata().partition();
            // 消息在分区内的 offset
            long offset = sendResult.getRecordMetadata().offset();

            log.info("send kafka message success: [{}], [{}], [{}]", _topic, partition, offset);
        } catch (Exception ex) {
            log.error("send kafka message failure: [{}], [{}], [{}]", key, value, topic);
        }
    }
}

消费者

/**
 * <h1>Kafka 消费者</h1>
 */
@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private ObjectMapper mapper;

    /**
     * <h2>监听 Kafka 消息并消费</h2>
     */
    @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
    public void listener01(ConsumerRecord<String, String> record) throws Exception {

        String key = record.key();
        String value = record.value();

        QinyiMessage kafkaMessage = mapper.readValue(value, QinyiMessage.class);
        log.info("listener01: [{}], [{}]", key, mapper.writeValueAsString(kafkaMessage));
    }

    /**
     * <h2>监听 Kafka 消息并消费</h2>
     * 不知道k,v类型的情况
     */
    @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
    public void listener02(ConsumerRecord<?, ?> record) throws Exception {

        Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
        if (_kafkaMessage.isPresent()) {
            Object message = _kafkaMessage.get();
            QinyiMessage kafkaMessage = mapper.readValue(message.toString(), QinyiMessage.class);
            log.info("listener02: [{}]", mapper.writeValueAsString(kafkaMessage));
        }
    }
}

测试

/**
 * <h1>SpringBoot 集成 kafka 发送消息</h1>
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private ObjectMapper mapper;
    @Autowired
    private KafkaProducer kafkaProducer;


    /**
     * <h2>发送 kafka 消息</h2>
     */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam(required = false) String key, @RequestParam String topic) throws Exception {

        QinyiMessage message = new QinyiMessage(1, "Imooc-Study-Ecommerce");
        kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
    }
}
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?key=qinyi&topic=qinyi-springboot

GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/kafka/send-message?topic=qinyi-springboot

image.png
image.png