Kafka Java代码编程

先创建maven项目,db_kafka
添加kafka的maven依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.4.1</version>
  5. </dependency>

生产者代码

 /**
 * 需求:Java代码实现生产者代码
 */
public class ProducerDemo {

   public static void main(String[] args) {

      Properties prop = new Properties();
      //指定kafka的broker地址
      prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
      //指定key-value数据的序列化格式
      prop.put("key.serializer", StringSerializer.class.getName());
      prop.put("value.serializer", StringSerializer.class.getName());

      //指定topic
      String topic = "hello"; 

      //创建kafka生产者
      KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);

      //向topic中生产数据
      producer.send(new ProducerRecord<String, String>(topic, "hello kafka"));

      //关闭链接
      producer.close();

   }

}

等一会我们把消费者代码实现好了以后一起验证

消费者代码

/**
 * 需求:Java代码实现消费者代码
 */
public class ConsumerDemo {

   public static void main(String[] args) {

      Properties prop = new Properties();
      //指定kafka的broker地址
      prop.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
      //指定key-value的反序列化类型
      prop.put("key.deserializer", StringDeserializer.class.getName());
      prop.put("value.deserializer", StringDeserializer.class.getName());
      //指定消费者组
      prop.put("group.id", "con-1");

      //创建消费者
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);
      Collection<String> topics = new ArrayList<String>();
      topics.add("hello");
      //订阅指定的topic
      consumer.subscribe(topics);

      while(true) {
         //消费数据【注意:需要修改jdk编译级别为1.8,否则Duration.ofSeconds(1)会语法报错】
         ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String,String> consumerRecord : poll) {
            System.out.println(consumerRecord);
         }
      }

   }

}

注意:

  1. 关闭kafka服务器的防火墙
  2. 配置windows的hosts文件 添加kafka节点的hostname和ip的映射关系。[如果我们的hosts文件中没有对kafka节点的 hostnam和ip的映射关系做配置,在这经过多次尝试连接不上就会报错]

先开启消费者。
发现没有消费到数据,这个topic中是有数据的,为什么之前的数据没有消费出来呢?不要着急,先带着这个问题往下面看
image.png
再开启生产者,生产者会生产一条数据,然后就结束
image.png
此时回到kafka的消费者端就可以看到消费出来的数据了 :::info ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka) ::: 所以这个时候我们发现,新产生的数据我们是可以消费到的,但是之前的数据我们就无法消费了,那下面我们来分析一下这个问题

消费者代码扩展

        //开启自动提交offset功能,默认就是开启的
        prop.put("enable.auto.commit", "true");
        //自动提交offset的时间间隔,单位是毫秒
        prop.put("auto.commit.interval.ms", "5000");
        /*
        注意:正常情况下,kafka消费数据的流程是这样的
        先根据group.id指定的消费者组到kafka中查找之前保存的offset信息
        如果查找到了,说明之前使用这个消费者组消费过数据,则根据之前保存的offset继续进行消费
        如果没查找到(说明第一次消费),或者查找到了,但是查找到的那个offset对应的数据已经不存在了
        这个时候消费者该如何消费数据?
        (因为kafak默认只会保存7天的数据,超过时间数据会被删除)

        此时会根据auto.offset.reset的值执行不同的消费逻辑

        这个参数的值有三种:[earliest,latest,none]
        earliest:表示从最早的数据开始消费(从头消费)
        latest【默认】: 表示从最新的数据开始消费
        none:如果根据指定的group.id没有找到之前消费的offset信息,就会抛异常

        解释:【查找到了,但是查找到的那个offset对应的数据已经不存在了】
        假设你第一天使用一个消费者去消费了一条数据,然后就把消费者停掉了
        等了7天之后,你又使用这个消费者去消费数据
        这个时候,这个消费者启动的时候会到kafka里面查询它之前保存的offset信息
        但是那个offset对应的数据已经被删了,所以此时再根据这个offset去消费是消费不到数据的。


        总结:一般在实时计算的场景下,这个参数的值建议设置为latest,消费最新的数据

        这个参数只有在消费者第一次消费数据,或者之前保存的offset信息已过期的情况下才会生效
         */
        prop.put("auto.offset.reset", "latest");

此时我们来验证一下,
先启动一次生产者
再启动一次消费者,看看消费者能不能消费到这条数据,如果能消费到,就说明此时是根据上次保存的offset信息进行消费了。
结果发现是可以消费到的。

注意:消费者消费到数据之后,不要立刻关闭程序,要至少等5秒,因为自动提交offset的时机是5秒提交一次

auto.offset.reset置为earliest,修改一下group.id的值,相当于使用一个新的消费者,验证一下,看是否能把这个topic中的所有数据都取出来,因为新的消费者第一次肯定是获取不到offset信息的,所以就会根据auto.offset.reset的值来消费数据

prop.put("group.id", "con-2");

prop.put("auto.offset.reset","earliest");

结果发现确实把之前的所有数据都消费过来了

ConsumerRecord(topic = hello, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1591672800863, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hehe)
ConsumerRecord(topic = hello, partition = 3, leaderEpoch = 3, offset = 0, CreateTime = 1591687499753, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
ConsumerRecord(topic = hello, partition = 4, leaderEpoch = 5, offset = 0, CreateTime = 1591687864482, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)

此时,关闭消费者(需要等待5秒,这样才会提交offset),再重新启动,发现没有消费到数据,说明此时就根据上次保存的offset来消费数据了,因为没有新数据产生,所以就消费不到了。

qmq的offset

image.png

Consumer消费offset查询

kafka0.9版本以前,消费者的offset信息保存在zookeeper中 从kafka0.9开始,使用了新的消费API,消费者的信息会保存在kafka里面的__consumer_offsets这个topic中 因为频繁操作zookeeper性能不高,所以kafka在自己的topic中负责维护消费者的offset信息。

#server.properties的log.dirs
cd /data/soft/kafka_2.12-2.4.1/kafka-logs

image.png

如何查询保存在kafka中的Consumer的offset信息呢?

使用kafka-consumer-groups.sh这个脚本可以查看
查看目前所有的consumer group

bin/kafka-consumer-groups.sh --list  --bootstrap-server localhost:9092

image.png
具体查看某一个consumer group的信息

[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe  --bootstrap-server localhost:9092 --group con-1

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
con-1           hello           2          26              26              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           3          26              26              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           1          28              28              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           0          12              12              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           4          17              17              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1

GROUP:当前消费者组,通过group.id指定的值 TOPIC:当前消费的topic PARTITION:消费的分区 CURRENT-OFFSET:消费者消费到这个分区的offset LOG-END-OFFSET:当前分区中数据的最大offset LAG:当前分区未消费数据量

再执行一次生产者代码,生产一条数据,重新查看一下这个消费者的offset情况

[root@bigdata1 kafka_2.12-2.4.1]# bin/kafka-consumer-groups.sh --describe  --bootstrap-server localhost:9092 --group con-1

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
con-1           hello           2          26              26              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           3          27              27              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           1          28              28              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           0          12              12              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1
con-1           hello           4          17              17              0               consumer-con-1-1-cf7ea462-9133-42b9-8068-2e7f7591570b /192.168.1.4    consumer-con-1-1

可以看到PARTITION-3,OFFSET由26变27

Consumer消费顺序

当一个消费者消费一个partition时候,消费的数据顺序和此partition数据的生产顺序是一致的
当一个消费者消费多个partition时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition中的数据 :::info topic:hello

1 2 3 4 5 7

partition-1:2 4 7
partition-2:1 3 5

消费者组A
consumer-1
consumer-2

consumer-1消费partition-1
2 4 7
consumer-2消费partition-2
1 3 5

消费者组B
consumer-3
2 1 4 3 7 5 :::

总之:如果一个消费者消费多个partiton,只能保证消费的数据顺序在一个partition内是有序的

也就是说消费kafka中的数据只能保证消费partition内的数据是有序的,多个partition之间是无序的。

Kafka的三种语义

kafka可以实现以下三种语义,这三种语义是针对消费者而言的:

至少一次:at-least-once

这种语义有可能会对数据重复处理
实现至少一次消费语义的消费者也很简单。
1: 设置enable.auto.commit为false,禁用自动提交offset
2: 消息处理完之后手动调用consumer.commitSync()提交offset
这种方式是在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次

至多一次:at-most-once

这种语义有可能会丢失数据
至多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1: enable.auto.commit设置为true。
2: auto.commit.interval.ms设置为一个较低的时间范围。
由于上面的配置,此时kafka会有一个独立的线程负责按照指定间隔提交offset。
消费者的offset已经提交,但是消息还在处理中(还没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset处消费,导致上次没有被成功处理的消息就丢失了。

仅一次:exactly-once

这种语义可以保证数据只被消费处理一次。 实现仅一次语义的思路如下:
1: 将enable.auto.commit设置为false,禁用自动提交offset
2: 使用consumer.seek(topicPartition,offset)来指定offset
3: 在处理消息的时候,要同时保存住每个消息的offset。以原子事务的方式保存offset和处理的消息结果,这个时候相当于自己保存offset信息了,把offset和具体的数据绑定到一块,数据真正处理成功的时候才会保存offset信息
这样就可以保证数据仅被处理一次了。

幂等性

  1. 乐观锁
  2. 数据库主键
  3. Redis的原子性