kafka入门
第1集 什么是MQ消息中间件和应用场景
简介:介绍什么是MQ消息中间件和应用场景
- 什么是MQ消息中间件
- 全称MessageQueue,主要是用于程序和程序直接通信,异步+解耦
- 使用场景:
- 核心应用
- 解耦:订单系统-》物流系统
- 异步:用户注册-》发送邮件,初始化信息
- 削峰:秒杀、日志处理
- 跨平台 、多语言
- 分布式事务、最终一致性
- RPC调用上下游对接,数据源变动->通知下属
- 核心应用
第2集 JMS消息服务和和常见核心概念介绍
简介:讲解什么是AMQP和JMS消息服务
- 什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
- JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
- 是由Sun公司早期提出的消息标准,旨在为java应用提供统一的消息操作,包括create、send、receive
- JMS是针对java的,那微软开发了NMS(.NET消息传递服务)
- 特性
- 面向Java平台的标准消息传递API
- 在Java或JVM语言比如Scala、Groovy中具有互用性
- 无需担心底层协议
- 有queues和topics两种消息传递模型
- 支持事务、能够定义消息格式(消息头、属性和内容)
- 常见概念
- JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
- JMS生产者(Message Producer):生产消息的服务
- JMS消费者(Message Consumer):消费消息的服务
- JMS消息:数据对象
- JMS队列:存储待消费消息的区域
- JMS主题:一种支持发送消息给多个订阅者的机制
- JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
基础编程模型
Kafka
- Kafka是最初由Linkedin公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由Scala和Java编写,(也当做MQ系统,但不是纯粹的消息系统)
- open-source distributed event streaming platform
- 核心:一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。
- 比如 网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域
- Kafka是最初由Linkedin公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目,也是一个开源【分布式流处理平台】,由Scala和Java编写,(也当做MQ系统,但不是纯粹的消息系统)
- 官网:http://kafka.apache.org/
- 快速开始:http://kafka.apache.org/quickstart
- 快速认识概念
- Broker
- Kafka的服务端程序,可以认为一个mq节点就是一个broker
- broker存储topic的数据
- Producer生产者
- 创建消息Message,然后发布到MQ中
- 该角色将消息发布到Kafka的topic中
- Consumer消费者:
- 消费队列里面的消息
第2集 【重要】分布式流处理平台Kafka核心概念介绍
简介:介绍分布式流处理平台kafka核心概念解释
- 核心概念
- Broker
- Kafka的服务端程序,可以认为一个mq节点就是一个broker
- broker存储topic的数据
- Producer生产者
- 创建消息Message,然后发布到MQ中
- 该角色将消息发布到Kafka的topic中
- Consumer消费者:
- 消费队列里面的消息
- ConsumerGroup消费者组
- 同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息
- Topic
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
- Partition分区
- kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的
- 一个Topic的多个partitions, 被分布在kafka集群中的多个server上
- 消费者数量 <=小于或者等于Partition数量
- Replication 副本(备胎)
- 同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
- 默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
- 如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错
- ReplicationLeader、ReplicationFollower
- Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
- ReplicationFollower只是做一个备份,从replicationLeader进行同步
- ReplicationManager
- 负责Broker所有分区副本信息,Replication 副本状态切换
- offset
- 每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
- kafka把offset保存在消费端的消费者组里
- Broker
- 特点总结
- 多订阅者
- 一个topic可以有一个或者多个订阅者
- 每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
- 高吞吐量、低延迟: 每秒可以处理几十万条消息
- 高并发:几千个客户端同时读写
- 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
- 扩展性强:支持热扩展
- 多订阅者
- 基于消费者组可以实现:
- 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
- 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样kafka消息就能广播到所有消费者实例上
kafka启动
关闭防火墙#关闭防火墙
systemctl stop firewalld.service
#查看防火墙状态
systemctl status firewalld.service
#关闭开机启动防火墙
systemctl disable firewalld.service
启动zookpeer
#启动zookpeer
bin/zkServer.sh start
#查看节点状态
./zkServer.sh status
#停止节点
./zkServer.sh stop
启动kafka
#启动
./kafka-server-start.sh ../config/server.properties &
#守护进程方式启动
./kafka-server-start.sh -daemon ../config/server.properties &
#停止
./kafka-server-stop.sh
kafka命令
#创建topic
./kafka-topics.sh --create --zookeeper 192.168.200.200:2181 --replication-factor 1 --partitions 1 --topic topic-name
#查看topic
./kafka-topics.sh --list --zookeeper 192.168.200.200:2181
#生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.200.200:9092 --topic topic-name
#消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费)
./kafka-console-consumer.sh --bootstrap-server 192.168.200.200:9092 --from-beginning --topic topic-name
#删除topic
./kafka-topics.sh --zookeeper 192.168.200.200:2181 --delete --topic topic-name
#查看broker节点topic状态信息
./kafka-topics.sh --describe --zookeeper 192.168.200.200:2181 --topic topic-name
点对点模式
订阅消费模型#创建topic, 1个分区
./kafka-topics.sh --create --zookeeper 192.168.200.200:2181 --replication-factor 1 --partitions 2 --topic topic-name
#指定配置文件启动 两个消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.200.200:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties
#创建topic, 2个分区
./kafka-topics.sh --create --zookeeper 192.168.200.200:2181 --replication-factor 1 --partitions 2 --topic topic-name
#指定配置文件启动 两个消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.200.200:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties
./kafka-console-consumer.sh --bootstrap-server 192.168.200.200:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties
现象
两个不同消费者组的节点,都可以消费到消息,实现发布订阅模型
kafka数据存储流程和原理概述和LEO+HW讲解
简介: Kafka数据存储流程、原理、LEO+HW讲解
- Partition
- topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- 是以文件夹的形式存储在具体Broker本机上
- LEO(LogEndOffset)
表示每个partition的log最后一条Message的位置。
- HW(HighWatermark)
- 表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置
- HW之前的数据才是Commit后的,对消费者才可见
- ISR集合里面最小leo
- offset
- 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
- partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
- 可以认为offset是partition中Message的id
- Segment:每个partition又由多个segment file组成;
- segment file 由2部分组成,分别为index file和data file(log file),
- 两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件
- 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
kakfa的api有AdminClient和springboot整合的KafkaTemplate这里实现在我的gitee上
kafka数据文件存储-可靠性保证原理(副本)
Kafka数据存储流程和log日志讲解
简介: Kafka数据存储流程和log日志讲解
- Kafka 采取了分片和索引机制,将每个partition分为多个segment,每个segment对应2个文件 log 和 index
- 新增备注
index文件中并没有为每一条message建立索引,采用了稀疏存储的方式
每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中
缺点是没有建立索引的数据在查询的过程中需要小范围内的顺序扫描操作。
配置文件 server.properties
The maximum size of a log segment file. When this size is reached a new log segment will be created. 默认是1G,当log数据文件大于1g后,会创建一个新的log文件(即segment,包括index和log)
log.segment.bytes=1073741824
- 例子
#分段一
00000000000000000000.index 00000000000000000000.log
#分段二 数字 1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息的offset+1
00000000000000001234.index 00000000000000001234.log
#分段三
00000000000000088888.index 00000000000000088888.log
Kafka数据可靠性保证原理之副本Replica+ACK
简介: Kafka数据可靠性保证原理之副本机制Replica介绍《上》
- 背景
- Kafka之间副本数据同步是怎样的?一致性怎么保证,数据怎样保证不丢失呢
- kafka的副本(replica)
- topic可以设置有N个副本, 副本数最好要小于broker的数量
- 每个分区有1个leader和0到多个follower,我们把多个replica分为Learder replica和follower replica
- 生产者发送数据流程
- 保证producer 发送到指定的 topic, topic 的每个 partition 收到producer 发送的数据后
- 需要向 producer 发送 ack 确认收到,如果producer 收到 ack, 就会进行下一轮的发送否则重新发送数据
- 副本数据同步机制
- 当producer在向partition中写数据时,根据ack机制,默认ack=1,只会向leader中写入数据
- 然后leader中的数据会复制到其他的replica中,follower会周期性的从leader中pull数据,
- 对于数据的读写操作都在leader replica中,follower副本只是当leader副本挂了后才重新选取leader,follower并不向外提供服务,假如还没同步完成,leader副本就宕机了,怎么办?
简介: Kafka数据可靠性保证原理之副本机制Replica介绍《下》
- 问题点:Partition什么时间发送ack确认机制(要追求高吞吐量,那么就要放弃可靠性)
- 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别
- 副本数据同步策略 , ack有3个可选值,分别是0, 1,all。
- ack=0
- producer发送一次就不再发送了,不管是否发送成功
- 发送出去的消息还在半路,或者还没写入磁盘, Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失
- ack=1(默认)
- 只要Partition Leader接收到消息而且写入【本地磁盘】,就认为成功了,不管他其他的Follower有没有同步过去这条消息了
- 问题:万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了
- ack= all(即-1)
- producer只有收到分区内所有副本的成功写入全部落盘的通知才认为推送消息成功
- 备注:leader会维持一个与其保持同步的replica集合,该集合就是ISR,leader副本也在isr里面
- 问题一:如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复
- 数据发送到leader后 ,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader, producer端会得到返回异常,producer端会重新发送数据,数据可能会重复
- 问题二:acks=all 就可以代表数据一定不会丢失了吗
- Partition只有一个副本,也就是一个Leader,任何Follower都没有
- 接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用
- 在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数设定 ISR中的最小副本数是多少,默认值为1,改为 >=2,如果ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常
- ack=0
简介: Kafka数据可靠性保证原理之ISR机制讲解
- 什么是ISR (in-sync replica set )
- leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护, 要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功
- Partition leader 保持同步的 Partition Follower 集合, 当 ISR 中的Partition Follower 完成数据的同步之后,就会给 leader 发送 ack
- 如果Partition follower长时间(replica.lag.time.max.ms) 未向leader同步数据,则该Partition Follower将被踢出ISR
- Partition Leader 发生故障之后,就会从 ISR 中选举新的 Partition Leader。
- OSR (out-of-sync-replica set)
- 与leader副本分区 同步滞后过多的副本集合
- AR(Assign Replicas)
- 分区中所有副本统称为AR
- 背景 broker故障后
- ACK保障了【生产者】的投递可靠性
- partition的多副本保障了【消息存储】的可靠性
- hw的作用是啥?
- 备注:重复消费问题需要消费者自己处理
- HW作用:保证消费数据的一致性和副本数据的一致性 ``` 假设没有HW,消费者消费leader到15,下面消费者应该消费16。
此时leader挂掉,选下面某个follower为leader,此时消费者找新leader消费数据,发现新Leader没有16数据,报错。
HW(High Watermark)是所有副本中最小的LEO。 ```
- Follower故障
- Follower发生故障后会被临时踢出ISR(动态变化),待该follower恢复后,follower会读取本地的磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的hw,即follower追上leader后,就可以重新加入ISR
- Leader故障
- Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉(新leader自己不会截掉),然后从新的leader同步数据
高级篇-kafka高可用集群和高性能讲解
简介: Kafka的高性能原理分析归纳ZeroCopy
- 零拷贝ZeroCopy(SendFile)
- 例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)
- File文件的经历了4次copy
- 调用read,将文件拷贝到了kernel内核态
- CPU控制 kernel态的数据copy到用户态
- 调用write时,user态下的内容会copy到内核态的socket的buffer中
- 最后将内核态socket buffer的数据copy到网卡设备中传送
- 缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)
- File文件的经历了4次copy
- 例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)
- ZeroCopy:
- 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升
- 对应零拷贝技术有mmap及sendfile
- mmap:小文件传输快
- sendfile:大文件传输比mmap快
- 应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术
简介: Kafka的高性能原理分析归纳总结
- kafka高性能
- 存储模型,topic多分区,每个分区多segment段
- index索引文件查找,利用分段和稀疏索引
- 磁盘顺序写入
- 异步操作少阻塞sender和main线程,批量操作(batch)
- 页缓存Page cache,没利用JVM内存,因为容易GC影响性能
- 零拷贝ZeroCopy(SendFile)