客户端开发:必要的三个参数设置,消息的发送,序列化,分区器,生产者拦截器,生产者客户端的整体架构一个正常的生产逻辑需要具备以下几个步骤:

    (1)配置生产者客户端参数及创建相应的生产者实例。
    (2)构建待发送的消息。
    (3)发送消息。
    (4)关闭生产者实例。

    2.
    image.png
    如上图所示ProducterRecord类有众多属性,topic,partition,headers,key,value,timestamp.

    topic 和partition指定消息发送的主体和分区,headers指定头部信息,可以不用设置,key指定消息的键,,value指消息体一般不为空,为空表示墓碑消息,timestamp只消息时间戳,有两种类型,一种是创建的时间,一种表示消息追加到日志文件的时间

    3.必要的参数设置

    kafkaProducer有三个必填的参数:

    bootstrap.servers:该参数表示生产者连接kafka集群所需的broker地址清单,建议最少设置两个broker,当其中一个宕机,生产者还能连接到集群

    key.serializer和value.serializer.生产者需要将消息中的key,value做响应的序列化操作来转换成字节数组,注意必须是全限定名

    client.id不设置也会有默认设置

    为了方便和写入错误,可以直接使用ProducerConfig类,此类中有很多config常量

    kafkaProducer是线程安全,可以字啊多个线程中共享这个kafkaProducer实例.

    4.消息的发送

    ProducerRecord有多个构造,如上图展示的这个类,有多个属性,topic和value是必填项

    发送消息有三种模式:同步sync,异步async,发后即忘fire-and forget

    send(record)为发后即忘,在有些情况下可能会造成消息的丢失,其性能最高,可靠性最差

    可以使用send().get()获取发送消息的结果,此方法可以阻塞kafka的响应,知道消息发送成功或异常.如下图,启动后控制台没有继续打印,因为被阻塞了
    image.png
    也可以使用下面的方法,获取更多的信息
    image.png
    future表示提供了一个任务的生命周期,使用send().get(Long time ,TimeUit) 可以实现自定义超时阻塞
    KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常
    对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0,其配置方式如下:
    image.png
    同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条

    异步发送方式:

    在send(),方法加入newCallBack{}接口参数,作为消费响应会回调,确认等.

    示例代码中遇到异常时(exception!=null)只是做了简单的打印操作,在实际应用中应该使用更加稳妥的方式来处理,比如可以将异常记录以便日后分析,也可以做一定的处理来进行消息重发。onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。
    image.png

    close()方法用于发送结束后关闭资源,其有两个重载方法,另外一个如下图,可以设置超时执行关闭.在实际应用中都使用无参的方法
    image.png
    5.序列化

    除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口

    如下图自定义序列化器,company对象

    如何使用自定义的序列化器CompanySerializer呢?只需将KafkaProducer的value.serializer参数设置为CompanySerializer类的全限定名即可。
    image.png
    6.分区器

    消息的send()发送broker节点时,需要经过拦截器,序列化器分区器,当消息经过序列化器后就要知道分区器了,这样消息才会知道发送那个分区
    如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
    如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区

    默认使用的DefaultPartitioner,

    注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

    还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可
    image.png
    7.生产者拦截器

    ,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。

    生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

    生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口
    image.png
    KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。

    KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。

    在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
    image.png

    image.png
    KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。

    如果拦截链中的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

    8.原理分析

    生产者客户端的整体架构

    整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。

    RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
    image.png

    9.重要的生产者参数,acks,max.request.size,retries和retry.backoff.ms,compression.type,connections.max.idle.ms,linger.ms,receive.buffer.bytes,send.buffer.bytes,request.timeout.ms
    acks:这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡

    ①:acks参数有3种类型的值:

    · acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应acks设置为1,是消息可靠性和吞吐量之间的折中方案。

    · acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

    · acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。

    注意:acks设置的值是字符型,”1”,”0””-1”

    ②:max.request.size

    这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB。一般情况下,这个默认值就可以满足大多数的应用场景了

    注意:不建议读者盲目地增大这个参数的配置值,尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如broker端的message.max.bytes参数,如果配置错误可能会引起一些不必要的异常。比如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条大小为15B的消息时

    ③:retries和retry.backoff.ms

    retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。

    重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试

    ④:compression.type

    这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

    ⑤:.connections.max.idle.ms

    这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

    ⑥:.linger.ms

    这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。

    ⑦:receive.buffer.bytes

    这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。

    ⑧send.buffer.bytes

    这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

    ⑨:.request.timeout.ms

    这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
    image.pngimage.png

    ————————————————
    版权声明:本文为CSDN博主「sunhyly」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/ashylya/article/details/105076727