1. acquire:获得、取到 Concurrent:并发

1. 主流的应用场景(其它MQ同理)

1.1 数据异步化

1.1.1 什么是异步化

  1. 异步化的原理同理为多线程中的异步,例如下面的例子
    1. 一个非常大的请求量,可能是几千万上亿次的请求,我们把这些请求封装成一条一条的message
    2. 然后把它投递到MQ,具体的消息可能就是一个一个指令,比如我想发邮件、计算、入库之类的操作
    3. 通过消息中间件,让我们的Consumer一条一条的消化

      1.1.2 架构分析

      image.png
  • 消息持久化:可以防止我们的消息发送失败,我们可以重推送,保证我们的消息触达率(这个一般是重要消息,具体参考Rabbit MQ的学习过程,因为Rabbit适合中小型公司做数据异步化消息中间件)

    1.2 服务解耦、削峰填谷

    1.2.1 概念理解

  1. 服务解耦:异步化了之后,就是不用再担心别的服务的问题
  2. 削峰填谷:MQ天然的就有这种缓存的机制,把消息缓存到 MQ borker 里面,然后我们去慢速的消费,高峰期的时候可以去做一个削峰,低谷期的时候做一个填谷,使我们的处理能力,相对来说更均匀一些

    1.2.2 架构分析

    image.png

    1.3 海量日志收集(kafka特有)

    image.png

  3. 海量日志收集:注意第阶段的日志收集实战,自己总结

  4. 上图的架构解析:
    1. application:是我们的服务、集群
    2. app.log:是一个全量的日志
    3. error.log:错误日志,单独拆开的原因是,要让业务任务清晰的看见问题原因
    4. filebeat:日志抓取文件,go语言写的,将我们需要的文件推送到kafka
    5. logstash:对我们的日志文件做解析,传递到ES

      1.4 数据同步

      1.5 实时计算分析(特有)

      2. 环境搭建

  • 下载kafka安装包:https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

    2.1 服务端配置说明

    参数文件:$KAFKA_HOME/config/server.properties
    zookeeper.connect:需要连接的zk集群地址,包含ip:port
    listeners:指明kafka需要监听的客户端
    broker.id:指定kafka集群中broker的唯一标志,默认值是-1,如果没有指定会默认生成一个
    log.dir、log.dirskafka所有的消息都是需要保存到磁盘上的,这两个就是指定存储文件目录
    message.max.bytesbroker所能接收的最大消息值,默认是1m

    2.2 搭建zookeeper集群

    zookeeper

    2.3 搭建kafka集群

    ```makefile

    可以直接在zk集群的三台服务器上搭建kafka集群

下载kafka安装包

https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

上传文件到服务器,或者上传一个之后使用copy命令

/home/software/kafka scp kafka_2.12-2.1.0.tgz 192.168.231.132:/home/software/kafka

解压到指定目录

tar -zxvf kafka_2.12-2.1.0.tgz.gz -C /usr/local/

修改解压后的文件名称

cd /usr/local mv kafka_2.12-2.1.0 kafka_2.12

进入配置文件目录

vim /usr/local/kafka_2.12/config/server.properties

修改server.properties文件,指定kafka集群中broker的唯一标志,默认值是-1,如果没有指定会默认生成一个

broker.id=0

修改server.properties文件,需要连接的zk集群地址,包含ip:port

zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181

修改server.properties文件,修改端口

port=9092

修改server.properties文件,指明kafka需要监听的客户端

listeners:

修改server.properties文件,一般写本机地址

host.name=192.168.11.221

修改server.properties文件,所有的消息都是需要保存到磁盘上的,这两个就是指定存储文件目录,配置的时候配置一个就行了,dirs如果有多个磁盘可以写多个文件分开存储

log.dir=/usr/local/kafka_2.12/kafka-logs log.dirs=/usr/local/kafka_2.12/kafka-logs

修改server.properties文件,broker所能接收的最大消息值,默认是1m

message.max.bytes=10302430

修改server.properties文件,新建一个topic默认的分区数,默认是1

num.partitions=5

创建kafka存储消息(log日志数据)的目录

mkdir /usr/local/kafka_2.12/kafka-log

到此为止,kafka已经配置成功,进入到kafka的bin目录,执行启动命令,启动kafka

/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

  1. ```makefile

2.4 搭建kafka manager可视化控制台

  1. ## 这个在一台服务器上安装就行,因为集群信息已经互通,假设
  2. 192.168.11.222
  3. ## 把jar包(kafka manager)上传到 服务器
  4. /home/software/kafka/kafka-manager-2.0.0.2.zip
  5. ## 解压到指定目录
  6. unzip kafka-manager-2.0.0.2.zip -d /usr/local/
  7. ## 修改配置文件
  8. vim /usr/local/kafka-manager-2.0.0.2/conf/application.conf
  9. ## 配置zk的地址
  10. kafka-manager.zkhosts="192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181"
  11. ## 启动
  12. /usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &
  1. ## 6.4 浏览器访问控制台:默认端口号是9000
  2. http://192.168.11.222:9000/
  3. ## 6.5 添加Cluster集群
  4. ## 7 集群验证:
  5. ## 7.1 通过控制台创建了一个topic为"test" 2个分区 1个副本
  6. ## 7.2 消费发送与接收验证
  7. cd /usr/local/kafka_2.12/bin
  8. ## 启动发送消息的脚本
  9. ## --broker-list 192.168.11.221 指的是kafka broker的地址列表
  10. ## --topic test 指的是把消息发送到test主题
  11. kafka-console-producer.sh --broker-list 192.168.11.221:2181 --topic test
  12. ## 启动接收消息的脚本
  13. kafka-console-consumer.sh --bootstrap-server 192.168.11.221:9092 --topic test

3. 核心概念讲解

3.1 集群架构解析

image.png

  1. 上图是kafka最基本的全局上的一个架构模型

    1. 生产者把消息发送到kafka集群中,消费者从集群中拉取消息进行消费
    2. kafka中的元数据通过zookeeper进行关系

      3.2 topicpartitionoffset

      image.png
  2. 上图是一个kafka broker的详情图

  3. 对比Rabbit MQ的个人理解
    1. topic就是exchangepartition就是queue
    2. 生产者把消息发送到topicA中某个partition,消费者订阅topicA从而拉取其中的消息进行消费
  4. 主题topic
    1. 是一个逻辑上的概念不是一个物理存储的概念,这样一解释更像exchange
  5. 分区partition
    1. 起始位置的offset0
    2. 暂时定位一个物理存储概念
    3. 一个topic对应多个partition,每个分区里面包含不同的消息
    4. 在存储的结构上可以理解为一个可以追加日志的log文件
    5. topic如何将消息投递到对应的partition
      1. partition策略,例如hash策略
  6. 偏移量(offset
    1. 偏移量,就是上图的0,1,2,3.......
    2. 在消息的partition中是一个唯一的标识
    3. kafka通过offset保证消息在partition内是顺序的
  7. 消息的填充过程(发送消息)
    1. 消息都会被一个一个填充到我们的partition里面,也可以理解为追加到一个一个日志文件里面
    2. 这时候都会分配一个偏移量offset,对于这个offset它是一个顺序写入的过程 0,1,2...
  8. 消息的找到过程(拉取消息)
    1. 订阅topic,在确定是哪个partition
    2. 在消息的partitionoffset是一个唯一的标识,通过offset找到对应的消息
  9. 如何保证消息的有序
    1. 我们消费的时候,一个消费者、一个线程、一个进程只连接一个分区
  10. 其它的相关逻辑

    1. kafka可以保证分区是有序的,但是没法确定主题是有序的,因为主题就是一个逻辑的概念
    2. 每一条消息在发送到我们的kafka broker 之前,会根据一个分区的规则(例如hsah规则),或者分区的选择来确认被发送到那个分区上(这个就是分区器的作用,kafka自己也有一个默认的分区器,可以通过你路由规则变成一个0,1,2...分区号,把消息打到指定的分区上)(这一点非常像Rabbit MQ

      3.3 副本replica

      image.png
  11. broker1broker2:指的是一个一个物理机器集群

  12. borker1-p1broker2-p1:相互的数据是完全一样的,防止某台机器挂掉了,数据仍然保证完整性(类似于es的副本分片)
  13. 同一时刻分片数据不一致,因为数据同步需要时间(虽然很短)
  14. 绿色的是leader分片会对外提供读写功能,紫色的follower分区,会不断的向leader拉取数据,只有当leader故障了,follower才会对外提供服务
  15. 总结:空间(存储空间)换取数据的高可靠性:不管是kafka还是es,只要是分布式的架构,都会使用这种方式来保证数据高可靠性,是一个常规的解决方式

    3.4 HwLEO

    image.png

  16. High Watermark:消费者最多拉取到高水位线的消息(拉取 < hw,注意不能等于)

  17. Log End Offset:日志文件最后一条记录的offset,例如:一个partition100条数据,最后一条的偏移量就是99,这个99就是LEO,也就是说leo = offset

    3.5 同步副本机制ISR

    image.png
    image.png

  18. ISR机制保证主从分区的数据一致性

  19. 两个时间概念
    1. 拉取时间:followerleader拉取数据花费的时间
    2. 容忍时间:followerleader拉取数据需要一定的时间,kafka可以设置一个容忍时间
  20. 上图的流程机制
    1. p1收到写入请求 - s1 s2会从p1拉取数据
    2. 如果拉取时间 < 容忍时间,则副本会进入到ISR集合
    3. 否则会进入out sync replicas OSR集合,这时候leader已经放弃s2这个副本
    4. 后续看图二,如果s2追上进度,就会被从新拉进ISR集合
  21. leader挂掉的后续

    1. 只有ISR集合的副本才能竞争成为leader的机会

      3.6 同步副本的时候HwLEO的变化

      | image.png | image.png | image.png | | —- | —- | —- |
  22. 拉取策略(注意是默认策略)

    1. 图一:当前一段时间没有数据写入hw = leo
    2. 图二:maste写入消息后,slave正在拉取数据,这时候可以看出因为其它副本没有同步完成数据,那么Hw就只能等于原来的最大数据量
    3. 图三:只有当所有的数据在每个分片上同步完成,hw才会增加
  23. 拉取策略是可以设置的(默认是全部副本)

    1. 比如设置半数以上的副本成功同步了数据,Hw就等于一次同步的发生时候的LEO

      3.7 leader选举机制

      3.7.1 核心控制器controller

  24. kafka的所有Broker都会注册到kafka集群中去,kafka集群会选举一个Broker作为leader作为kafka七群的总控制器controller,负责管理整个集群所有分区partition和副本follower的状态

    leader和follower数据同步

    如果leader写入成功follwer还没写入时,leader就挂掉了,此时重新选举leader,新的leader自然没有这条信息。这种情况下,如果原有leader恢复正常,成为follwer节点,会把之前写入的那条信息同步给新leader

    4. 生产者/消费者快速入门

    Tips:结合代码 + 笔记 + 源码学习生产者和消费者的内容

4.1 生产者

4.1.1 简单流程

  1. 配置生产者启动的关键参数
  2. 创建kafka生产者对象
  3. 发送消息
  4. 关闭资源

    4.1.2 关键对象解析

    (1) KafkaProducer<String, String>

  • KafkaProducer是线程安全的,KafkaConsumer不是线程安全的
  • 看代码quickstart + 源码

    (2) ProducerRecord<K, V>

    1. public class ProducerRecord<K, V> {
    2. private final String topic; // 对应的主题
    3. private final Integer partition; // 分区的数量
    4. private final Headers headers; // 自定义的一些属性
    5. private final K k; // 在关键参数中设置的,可以用来判断进入哪个分区
    6. private final V v; // 具体的消息内容
    7. private final Long timestamp; // 消息具体发送的时间
    8. }

    5.1.3 消息重试机制

  1. 配置:查看代码,默认不重试
  2. 可重试异常:网络抖动
  3. 不可重试异常:宕机、消息格式不正确…..

    4.1.4 生产者重要参数

    Tips:其它参数,直接项目代码

(1) acks

  1. 含义:发送消息后,broker端至少多少个副本接受到消息(这里的副本包含leader``follower)才给生产端回应消息
  2. acks = 1:默认情况,leader副本收到消息,就给生产端回应成功
  3. acks = 0:生产者发送完成消息后,不需要收到任何消息(不需要任何可靠性消息投递保证)
  4. acks = -1、acks = a:生产者在消息发送之后,需要ISR中所有副本都成功写入消息之后,才能收到来之broker端的相应()。
    1. 这种情况不能保证消息百分之百的投递成功
    2. 因为可能ISR可能也会出现问题,导致信息没有同步,无法返回响应,可以配合最小副本同步数min.insync.replicas = 1,比如同步一个副本就可以

      (2) max.request.size

  • 用来限制生产者客户端能发送的消息的最大值

    (3) retriesretry.backoff.msretries

  • 重试次数和重试间隔

    (4) compression.type

  • 指定消息的压缩方式,默认值为none,可选择gizp``snappy``lz4

    (5) connections.max.idle.ms

  • 指定多久之后关闭限制的连接

    (6) linger.ms、batch.size、buffer.memory

  1. 这三个是调优相关的
  2. linger.ms:例如设置100毫秒(100是经验设置,要记住),这100毫秒之内的消息都是在内存中的,等到100毫秒的时候一起发送;也可以成为延迟发送、批量发送
  3. batch.size:累计多少条消息,则进行一次批量发送
  4. buffer.memory:缓存提升性能参数,默认32M
  5. 三个参数的配合情况
    1. 前两条的存储的信息都是buffer.memory
    2. 背景:linger.ms = 100msbatch.size = 100
      1. 100ms到了,累计了30条消息,也是要发送一次
      2. 刚到50ms,累计了100条,也要发送一次
      3. 结论:只要满足一条就发送

下面的前两条:如果消息确实大,调整下一
image.png

4.1.4 拦截器

Tips:生产者和消费者都有拦截器

  1. 看生产者源码就知道在发送消息(doSend)之前有一个拦截器,我们就是需要实现这个拦截器image.png
  2. 实现这个org.apache.kafka.clients.producer.ProducerInterceptor接口,具体看项目代码
  3. 配置实现的拦截器properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName())

    4.1.5 序列化问题

    broker只会接收二进制数据,所以消息的发送和接收需要做序列化和反序列化

  4. 一些基本的、简单的数据类型kafka已经提供了序列化和反序列化机制,所以前面所有的demo演示中,我们将对象转换为JSON数据其实也就是字符串,kafka自带的序列化工具序列化字符串

  5. 特殊对象,例如自定义的对象,需要我们自己实现序列化/反序列化机制
  6. 具体实现查看代码即可(注意:直接参考已经实现的StringSerializer
  7. 理解之后其实通过转JSON然后使用默认String序列化工具也是ok

    4.1.6 分区器

  8. 作用:就是对消息进行分区的,消息到底是落到那个分区他来决定

  9. 如果消息的partition属性带有值,则按照指定的分区,没有的话需要通过分区器来进行分区
  10. 有一个默认的分区器DefaultPartitioner
  11. 自定义分区器,只需要实现Partitioner接口即可,其中的partition方法的参数介绍
    1. @param topic 主题名称
    2. @param key 要分区的键(如果没有键,则为 null)
    3. @param keyBytes 序列化的要分区的键(如果没有键,则为 null)
    4. @param value 要分区的值或 null
    5. @param valueBytes 序列化要分区的值或 null
    6. @param cluster 当前集群元数据
  12. 实际工作中分区器如何使用
    1. 例如想要不同的消息进入不同的分区
      1. 例如商品topic,有4个分区(鞋子、裤子、上衣、内衣),根据@param value或者其他任意参数,让他们进入属于自己的分区
      2. 例如我们理算接收不同的调用方,核心进入a分区,中台进入b分区….
    2. 顺序消费问题
      1. 按照a例子属于自己的进入自己的分区,一个消费者只能消费一个分区,实现顺序消费
      2. 问题:消费者如何只接受某一个分区的消息?可以在拦截器进行拦截

4.1.7 生产者发消息到broker的流程

image.png

  1. ProducerRecord把信息包装成消息,由KafkaProducer.send()发送出去
  2. 经过拦截器也就是onSend()方法,拦截器可能自由的做一些业务
  3. serializer、partition
    1. 经过序列化,key、value都需要序列化
    2. 有key通过key进行hash计算出一个分区,没有指定key也会通过算法计算出分区
      1. 如果带有partition的属性值,就需要走分区器进行分区
      2. 如果指定的partition的值,则按照指定的来

4.2 消费者流程

4.2.1 简单流程

  1. 配置消费者启动关键参数
  2. 创建kafka消费者对象
  3. 订阅相关主题
  4. 拉取消息,并进行消费处理
  5. 提交消费偏移量,关闭消费者

    4.2.2 消费者、消费者组的概念

    image.png

  6. 几个消费者联合起来就是消费者组

  7. 一个消费者组内的消费者同时订阅一个topic
    1. 消费者的数量 = 分区的数量
      1. 一个分区只能被一个消费组中的一个消费者消费,例如:不能说coc1同时消费p0(这时kafka的规定)
    2. 消费者的数量 > 分区的数量
      1. 例如有4个分区,但是一个消费者组有5个消费者,那么安装规定其中一个消费者什么都不敢,因为如果干了就违反一对一的消费
    3. 消费者的数量 < 分区的数量
      1. 比如2个消费者,4个分区,那么可能1个消费者分到3个分区,一个分到1个分区,总之是把分区分配完毕,不能违反规定
  8. 不同消费组的消费者订阅同一topic,相当于广播模式
  9. 一个消费者最内的消费者都是点对点的意思
  10. 一个消费者组看成一个整体,然后不同的消费者组就是发布订阅模型

    4.2.3 消费者重要参数

    Tips:其它参数,查看项目代码

(1) assign、subscribe

  • assign:只订阅主题下的一个、多个分区
  • subscribe:订阅主题,支持支持集合、正则表达式 ```java // 对于consume消息的订阅 subscribe方法 :可以订阅一个 或者 多个 topic consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE)); // 也可以支持正则表达式方式的订阅 consumer.subscribe(Pattern.compile(“topic-.*”));

// 可以指定订阅某个主题下的某一个、多个 partition TopicPartition tp1 = new TopicPartition(Const.TOPIC_CORE, 0); TopicPartition tp2 = new TopicPartition(Const.TOPIC_CORE, 2); consumer.assign(Arrays.asList(tp1, tp2);

// 如何拉取主题下的所有partition,然后订阅相关的partition List tpList = new ArrayList(); List tpinfoList = consumer.partitionsFor(Const.TOPIC_CORE); for(PartitionInfo pi : tpinfoList) { System.err.println(“主题:”+ pi.topic() +”, 分区: “ + pi.partition()); tpList.add(new TopicPartition(pi.topic(), pi.partition())); } consumer.assign(tpList); ```

(2) auto_offset_reset_conifg

  1. AUTO_OFFSET_RESET_CONFIG 有三种方式: "latest", "earliest", "none"
  2. none:不设置,存储在kafka的本地文件的信息
  3. latest:默认,从一个分区的最后提交的offset开始拉取消息
  4. earliest:从最开始的起始位置拉取消息 0
  5. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");

    (3) fetch.min.bytes

  • 一次拉取的最小数据量,默认1B

    (4) fetch.max.bytes

  • 一次拉取的最大数据量,默认50M

    (5) max.partition.fetch.bytes

  • 一次fetch请求,从分区中取取得的record最大值,默认1M

    (6) fetch.max.wait.ms

  • fetch请求发送给broker之后,在broker中可能被阻塞的时长,默认500ms

    (7) max.pull.records

  • consumer每次调用pull时候,最大拉取的records数量,默认500

    4.2.4 消费者的手工提交

  1. 设置这个参数:ConsumerConfig.enable.auto.commit.config = false
  2. 提交方式:
    1. 消息级别:同步提交、异步提交
    2. 分区级别:同步提交、异步提交
    3. 整体(代表一次拉取的所有消息):整体提交、分区提交
  3. offset不提交会有什么现象?

    1. kafka没有识别到offset位移,所以认为消息没有被消费,消费者能一直获取到相同的消息

      4.2.5 在均衡

      一个消费组新增消费者的时候(前提是消费者 < 分区数),其它消费者会分配一些任务给新的消费者

      4.2.6 消费者多线程

  4. KafkaProducer是线程安全的,KafkaConsumer不是线程安全的

  5. KafkaConsumer中定义了一个acquire()方法用来检测是否只有一个线程在操作,如果有其它线程操作则会抛出ConcurrentModificationException(并发修改异常)
  6. KafkaConsumer在执行所有的动作的时候都会先执行acquire()方法检测是否线程安全

    思考:消费者线程不安全,那么单线程如何解决消息堆积问题呢? 解析:实现消费者的多线程,这个也是解决消息堆积的一种方式:生产者消息发送太快了,消费者又是单线程 的可能会造成消息堆积�

  7. 既然消费者是单线程的,那么消费者如何实现多线程呢?

借用消费者组内的规则:一个partition只能被一个Consumer消费,那我就把partition和consumer的数量对其
image.png

5. 与SpringBoot整合使用

生产者
消费者
界面命令使用

6. 海量日志搜集ELK