1. Kafka API

2. Producer API

Kafka's Producer sends messages asynchronously. Two threads take part in the send path, the main thread and the Sender thread, plus a buffer shared by both of them, the RecordAccumulator. The main thread writes messages into the RecordAccumulator, and the Sender thread continuously pulls batches from the RecordAccumulator and sends them to the Kafka brokers.

[Figure 1: Producer send flow: main thread writes to the RecordAccumulator, the Sender thread drains it to the Kafka broker]

2.1. Asynchronous send API

Add the dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

Using the API:

package com.atguigu.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all"); // ack level
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432"); // RecordAccumulator buffer size
        properties.setProperty("retries", "1"); // number of retries
        properties.setProperty("batch.size", "16384"); // batch size in bytes
        properties.setProperty("linger.ms", "1"); // linger time in ms
        // A batch is sent to the broker once it reaches 16384 bytes, or after 1 ms even if it is not full
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Send data with the producer
        for (int i = 0; i < 100; i++) {
            Future<RecordMetadata> future = producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback invoked when the send completes
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            System.out.println("Sent record " + i);
        }
        // Close the producer and release resources
        producer.close();
    }
}

The callback is optional; send can also be called without one.

The callback is invoked when the producer receives the ack. It runs asynchronously and takes two parameters, RecordMetadata and Exception: if the Exception is null, the message was sent successfully; if it is not null, the send failed.

Note: failed sends are retried automatically, so there is no need to retry manually inside the callback.
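
A minimal sketch (not part of the original example) of the same send written with a lambda instead of an anonymous Callback class, assuming the producer and topic from the code above:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LambdaCallbackSketch {
    static void sendOne(KafkaProducer<String, String> producer) {
        // The callback is optional: producer.send(record) alone is also valid
        producer.send(new ProducerRecord<>("first", "key", "value"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
            } else {
                exception.printStackTrace(); // delivery failed even after the configured retries
            }
        });
    }
}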

2.2. Synchronous send API

Synchronous sending means that after a message is sent, the current thread blocks until the ack comes back.

Since the send method returns a Future, we can achieve the same synchronous effect simply by calling get() on the returned Future object.
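
A minimal sketch of the pattern (assuming a producer and the "first" topic as above): get() blocks until the broker's response arrives, and an ExecutionException wraps the actual send failure.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.ExecutionException;

public class SyncSendSketch {
    static RecordMetadata sendAndWait(KafkaProducer<String, String> producer, String key, String value)
            throws InterruptedException {
        try {
            // send() is still asynchronous; get() turns it into a blocking call
            return producer.send(new ProducerRecord<>("first", key, value)).get();
        } catch (ExecutionException e) {
            throw new RuntimeException("send failed", e.getCause()); // unwrap the real cause
        }
    }
}

The complete example: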

package com.atguigu.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all"); // ack level
//        properties.put(ProducerConfig.ACKS_CONFIG, "1"); // ProducerConfig holds constants for all config keys
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432"); // RecordAccumulator buffer size
        properties.setProperty("retries", "1"); // number of retries
        properties.setProperty("batch.size", "16384"); // batch size in bytes
        properties.setProperty("linger.ms", "1"); // linger time in ms
        // A batch is sent to the broker once it reaches 16384 bytes, or after 1 ms even if it is not full
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//        producer.beginTransaction(); // begin a transaction (requires transactional.id and a prior initTransactions() call)
        // Send data with the producer
        for (int i = 0; i < 100; i++) {
            Future<RecordMetadata> future = producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback invoked when the send completes
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            RecordMetadata recordMetadata = future.get(); // block until the ack returns before sending the next record
            System.out.println("Sent record " + i);
        }
        // Close the producer and release resources
        producer.close();
    }
}

3. Consumer API

3.1. Automatic offset commit

The properties file loaded by the consumer:

bootstrap.servers=hadoop102:9092
group.id=test
enable.auto.commit=true
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
# The default is latest (consume only records produced after the consumer joins); earliest starts from the beginning of the log
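
The same settings can also be built in code with the ConsumerConfig constants, analogous to the ProducerConfig constants shown earlier; a minimal sketch using the values above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerProps {
    static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // default is "latest"
        return props;
    }
}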

The consumer class:

package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Configure and instantiate a consumer
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Receive messages
        consumer.subscribe(Collections.singleton("first")); // subscribe to the topic
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(2000); // pull records, waiting up to 2000 ms
            if (poll.count() == 0){
               Thread.sleep(100);
            }
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
        }
        // Close the consumer
//        consumer.close();
    }
}

3.2. Manual offset commit

Although automatic offset commit is simple and convenient, it is time-driven, which makes it hard for developers to control exactly when offsets are committed. Kafka therefore also provides an API for committing offsets manually.

The properties file:

bootstrap.servers=hadoop102:9092
group.id=test
enable.auto.commit=false
# Whether to commit offsets automatically (default true); committed offsets are stored on the broker
auto.commit.interval.ms=5000
# How often offsets are auto-committed (default 5000 ms); has no effect here because auto commit is disabled
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
# The default is latest (consume only records produced after the consumer joins); earliest starts from the beginning of the log

The consumer class:

package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Configure and instantiate a consumer
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Receive messages
        consumer.subscribe(Collections.singleton("first")); // subscribe to the topic
        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(2000); // pull records, waiting up to 2000 ms
            //ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(2000)); // non-deprecated overload (java.time.Duration)
            if (poll.count() == 0) {
                Thread.sleep(100);
            }
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record);
            }
//            consumer.commitSync(); // manual commit, synchronous
            consumer.commitAsync(new OffsetCommitCallback() {
                // Callback invoked when the commit completes
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if (e != null) {
                        System.out.println("Commit failed for " + map);
                    }
                }
            }); // manual commit, asynchronous
        }
        // Close the consumer
//        consumer.close();
    }
}

There are two ways to commit offsets manually: commitSync (synchronous commit) and commitAsync (asynchronous commit). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (although uncontrollable factors can still make it fail), while commitAsync has no retry mechanism, so the commit may fail.
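
A minimal sketch of the synchronous variant (assuming a KafkaConsumer like the one above): commitSync() blocks the polling thread and retries internally, and throws a CommitFailedException when the commit cannot succeed, for example after a rebalance.

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncCommitSketch {
    static void commitOrLog(KafkaConsumer<String, String> consumer) {
        try {
            consumer.commitSync(); // blocks until the brokers acknowledge the offsets
        } catch (CommitFailedException e) {
            e.printStackTrace(); // the current batch may be consumed again after the next rebalance
        }
    }
}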

3.2.1. Analysis of missed and duplicate consumption

Whether offsets are committed synchronously or asynchronously, data can still be missed or consumed more than once. Committing the offset before processing the data can cause missed data: the offset advances even if processing then fails. Processing the data before committing the offset can cause duplicate consumption: if the consumer crashes before the commit, the batch is read again.

[Figure 2: Offset commit timing: committing before consuming risks missed data, consuming before committing risks duplicates]

Solution: the only real fix is to bind consuming the data and committing the offset into a single atomic operation.
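
A minimal sketch of this idea, assuming the processing result and the offset are written to the same (hypothetical) relational database in one transaction, so either both are persisted or neither is. The table names and the MySQL-style REPLACE INTO are illustrative only:

import java.sql.Connection;
import java.sql.PreparedStatement;

public class TransactionalSinkSketch {
    static void save(Connection conn, String value, String topic, int partition, long offset) throws Exception {
        conn.setAutoCommit(false); // one database transaction per record (or batch)
        try (PreparedStatement data = conn.prepareStatement(
                     "INSERT INTO result(value) VALUES (?)");                     // hypothetical result table
             PreparedStatement off = conn.prepareStatement(
                     "REPLACE INTO offsets(topic, part, off) VALUES (?, ?, ?)")) { // hypothetical offset table
            data.setString(1, value);
            data.executeUpdate();
            off.setString(1, topic);
            off.setInt(2, partition);
            off.setLong(3, offset + 1); // next offset to consume
            off.executeUpdate();
            conn.commit();   // the result and the offset are committed together
        } catch (Exception e) {
            conn.rollback(); // on failure neither is persisted, so the record is re-consumed rather than lost
            throw e;
        }
    }
}

On startup the consumer would then seek() to the offsets read back from the database, just as the file-based example in the next section does.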

3.3. Custom offset storage

The example below stores offsets in a local file (d:/offset) and uses a ConsumerRebalanceListener to persist and reload them around partition rebalances.

package com.atguigu.consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.*;
import java.util.*;

/*
Custom offset storage: offsets are kept in a local file instead of Kafka.
 */
public class ConsumerManual {
    // In-memory offsets, keyed by topic partition
    private static Map<TopicPartition, Long> offset = new HashMap<>();
    private static String file = "d:/offset";

    public static void main(String[] args) throws IOException {
        // Configure and instantiate a consumer
        Properties properties = new Properties();
        properties.load(Consumer.class.getClassLoader().getResourceAsStream("conusumer1.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to the topic and register a rebalance listener
        consumer.subscribe(Collections.singleton("first"), new ConsumerRebalanceListener() {
            // Called before a rebalance takes this consumer's partitions away
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // Persist the old offsets
                commit();
            }

            // Called after partitions have been assigned to this consumer
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Load the offsets for the newly assigned partitions
                readOffset(collection);
                for (TopicPartition partition : collection) {
                    Long os = offset.get(partition);
                    if (os == null) {
                        consumer.seek(partition, 0);
                    } else {
                        consumer.seek(partition, os);
                    }
                }
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2000);
            // Consume and record offsets together ("atomic binding")
            for (ConsumerRecord<String, String> record : records) {
                // Consume the record
                System.out.println(record);
                // After consuming, record its offset in the map
                offset.put(
                        new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commit();
        }
    }

    /**
     * Read offsets from the custom storage medium into the in-memory cache
     *
     * @param collection the partitions assigned to this consumer
     */
    private static void readOffset(Collection<TopicPartition> collection) {
        ObjectInputStream objectInputStream = null;
        Map<TopicPartition, Long> temp;
        try {
            objectInputStream = new ObjectInputStream(new FileInputStream(file));
            temp = (Map<TopicPartition, Long>) objectInputStream.readObject();
        } catch (Exception e) {
            temp = new HashMap<>();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Keep only the offsets of the partitions assigned to us
        for (TopicPartition partition : collection) {
            offset.put(partition, temp.get(partition));
        }
    }

    /**
     * Commit the cached offsets to the custom storage medium
     */
    private static void commit() {
        // First read all previously stored offsets from the file
        ObjectInputStream objectInputStream = null;
        Map<TopicPartition, Long> temp;
        try {
            objectInputStream = new ObjectInputStream(new FileInputStream(file));
            temp = (Map<TopicPartition, Long>) objectInputStream.readObject();
        } catch (Exception e) {
            temp = new HashMap<>();
        } finally {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Merge in the offsets from this session
        temp.putAll(offset);
        // Write the merged offsets back out
        ObjectOutputStream objectOutputStream = null;
        try {
            objectOutputStream = new ObjectOutputStream(new FileOutputStream(file));
            objectOutputStream.writeObject(temp);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4. Custom Interceptors

Producer interceptors implement the ProducerInterceptor interface.

4.1. Using an interceptor to count successful and failed sends

package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Counts how many sends succeeded and how many failed
 */
public class CountInterceptor implements ProducerInterceptor<String, String> {

    private long success = 0;
    private long fail = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    /**
     * Called after an ack is received; updates the counters
     *
     * @param recordMetadata metadata of the acknowledged record
     * @param e              null on success, otherwise the failure cause
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            success++;
        } else {
            fail++;
        }
    }

    @Override
    public void close() {
        System.out.println("成功了" + success + "条");
        System.out.println("失败了" + fail + "条");
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

4.2. Using an interceptor to rewrite the value as custom prefix + timestamp + original value

package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    // Prefix prepended to every value
    private String prefix;

    /**
     * Builds the customized record; the timestamp could also be modified here
     *
     * @param producerRecord the original record
     * @return the modified record
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // A ProducerRecord cannot be modified, so we create a new one and copy the fields over
        return new ProducerRecord<String, String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                prefix + System.currentTimeMillis() + producerRecord.value(),
                producerRecord.headers()
        );
    }

    /**
     * Called after an ack is received
     *
     * @param recordMetadata metadata of the acknowledged record
     * @param e              null on success, otherwise the failure cause
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    /**
     * Called when the producer is closed
     */
    @Override
    public void close() {

    }

    /**
     * Configures the interceptor
     *
     * @param map the producer configuration
     */
    @Override
    public void configure(Map<String, ?> map) {
        // Read the prefix value from the producer configuration
        prefix = (String) map.get("prefix");
    }
}

4.3. Using the custom interceptors from a producer

package com.atguigu.interceptor;

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Producer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Producer configuration
        Properties properties = new Properties();
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer class
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer class
        properties.setProperty("acks", "all"); // ack level
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("buffer.memory", "33554432"); // RecordAccumulator buffer size
        properties.setProperty("retries", "1"); // number of retries
        properties.setProperty("batch.size", "16384"); // batch size in bytes
        properties.setProperty("linger.ms", "1"); // linger time in ms

        // Custom interceptor list
        ArrayList<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.interceptor.TimeInterceptor"); // interceptors run in the order they are added
        interceptors.add("com.atguigu.interceptor.CountInterceptor"); // values are fully qualified class names
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); // register them with the producer
        // Custom prefix read by TimeInterceptor.configure()
        properties.setProperty("prefix", "custom-prefix-test");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Send data with the producer
        for (int i = 0; i < 10; i++) {
            Future<RecordMetadata> future = producer.send(
                    // Wrap the message in a ProducerRecord
                    new ProducerRecord<>("first", Integer.toString(i), "Value" + i), new Callback() {
                        // Callback invoked when the send completes
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e == null) {
                                System.out.println(recordMetadata);
                            }
                        }
                    });
            RecordMetadata recordMetadata = future.get(); // block until the ack returns before sending the next record
            System.out.println("Sent record " + i);
        }
        // Close the producer and release resources; close() also invokes each interceptor's close()
        producer.close();
    }
}