RabbitMQ
工作流程
生产者生产过程:
- 生产者连接到RabbitMQ Broker建立一个连接(Connection),然后开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型,是否持久化等
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
- 相应的交换器根据接收到的路由键查找想匹配的队列,如果找到了,则将从生产者发送过来的消息存入到相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道
- 关闭连接
消费者消费过程:
- 消费者连接到RabbitMQ Broker,建立一个连接(Connection,开启一个信道(Channel))
- 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及准备工作
- 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息
- 消费者确认ack接收到的消息
- RabbitMQ从对垒中删除响应已经被确认的消息
- 关闭信道
- 关闭连接
spring+rabbitmq的相关配置
生产者的配置
spring:
application:
name: spring-boot -rabbi tmq
rabbitmq :
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
#确认机制
publisher-confirms: true #消息有没有到达MQ,会返回一个ack缺人吗
publisher-returns: true #消息有没有找到合适的队列
#mandatory参数设为true时
#默认为false,直接丢弃
template:
mandatory: true #为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么rabbitmq会调用Basic.Return命令将消息返回给生产者,默认为false,直接丢弃
确实机制主要是为了生产者和mq之间的一个确认机制,当消息到没到mq,会提供相应的回调,在项目中RabbitSender这个类中进行了相应的配置
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, s) -> {
if (ack) {
System.out.println(correlationData.getId());
} else {
log.error("ConfirmCallback消息发送失败: {}", s);
}
};
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey)
-> log.error("ReturnCallback消息发送失败: {}", new String(message.getBody(), StandardCharsets.UTF_8));
public <T> void sendMsg(String exchangeName, String routingKeyName, T content) {
// 设置每个消息都返回一个确认消息
this.rabbitTemplate.setMandatory(true);
// 消息确认机制
this.rabbitTemplate.setConfirmCallback(confirmCallback);
// 消息发送失败机制
this.rabbitTemplate.setReturnCallback(returnCallback);
// 异步发送消息
CorrelationData correlationData = new CorrelationData();
correlationData.setId("123");
this.rabbitTemplate.convertAndSend(exchangeName, routingKeyName, content, correlationData);
}
消费者的配置
rabbitmq :
host: 127.0.0. 1
port: 5672
username: admin
password: admin
virtual-host: /
#listener:
#simple:
#acknowledge-mode: manual
#prefetch: 1
listener:
direct:
acknowledge-mode: manual
prefetch: 1
consumers-per-queue: 2
#missing-queues-fatal. true
#default-requeue-rejected: false
#idle-event-interval: 1000
#auto-startup:false
listerner
simple
acknowledge-mode: manual 手动确认模式。推荐使用这种,就是说当消息被消费者消费时,徐还要手动返回信息告诉mq。如果是自动的话,mq会自动确认,不管你消费者是否完成消费,比如抛出异常
prefetch: 1 一个消费者一次拉取几条信息
consumers-per-queue: 2 一个队列可以被多少消费者消费
一个生产者,一个消费者
/**简单模式*/
@Test
public void senderSimple() throws Exception {
String context = "simple---> " + new Date();
this.rabbitTemplate.convertAndSend("simple", context);
}
@RabbitListener(queues = "simple")
public void simple(Message message, Channel channel){
String messageRec = new String(message.getBody());
System.out.println("simple模式接收到了消息:"+messageRec);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
System.out.println("报错了------------------"+e.getMessage());
}
}
work
一个生产者,多个消费者
private static final List<Integer> ints = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
/**work模式*/
@Test
public void senderWork() throws Exception {
ints.forEach((i)->{
String context = "work---> " + i;
this.rabbitTemplate.convertAndSend("work", context);
});
}
@RabbitListener(queues = "work")
public void work1(Message message, Channel channel){
try{
Thread.sleep(500);
}catch (Exception e){
System.out.println(e.getMessage());
}
String messageRec = new String(message.getBody());
System.out.println("work1接收到了消息:"+messageRec);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
System.out.println("work1报错了------------------"+e.getMessage());
}
}
@RabbitListener(queues = "work")
public void work2(Message message, Channel channel){
try{
Thread.sleep(1000);
}catch (Exception e){
System.out.println(e.getMessage());
}
String messageRec = new String(message.getBody());
System.out.println("work2接收到了消息:"+messageRec);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
System.out.println("work2报错了------------------"+e.getMessage());
}
}
direct交换机
生产者发送消息给指定交换机,绑定某个队列
消费者通过监听某交换机绑定的某个队列接收消息
/**direct交换机*/
@Test
public void senderDirect() throws Exception {
rabbitSender.sendMsg("direct","directKey1","directContent1");
rabbitSender.sendMsg("direct","directKey2","directContent2");
}
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey1"
, value = @Queue(value = "directQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
public void direct1(String str, Channel channel, Message message) throws IOException {
try {
System.out.println("directQueue1接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey2"
, value = @Queue(value = "directQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
public void direct2(String str, Channel channel, Message message) throws IOException {
try {
System.out.println("directQueue2接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
topic交换机
指定主题
:匹配一个或者多级路径
*:匹配一级路径
@Test
public void senderTopic() throws Exception {
String contexta = "topic.a";
rabbitSender.sendMsg("topic","topicKey.a",contexta);
String contextb = "topic.b";
rabbitSender.sendMsg("topic","topicKey.b",contextb);
String contextc = "topic.c";
rabbitSender.sendMsg("topic","topicKey.c",contextc);
String contextz = "topic.z";
rabbitSender.sendMsg("topic","topicKey.c.z",contextz);
}
/**
* topic交换机
* */
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "topic",type = "topic"), key = "topicKey.#"
, value = @Queue(value = "topicQueue", durable = "true", exclusive = "false", autoDelete = "false")))
public void topicQueue(String str, Channel channel, Message message) throws Exception {
try {
System.out.println("topicQueue接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
fanout交换机
/**
* topic交换机
* */
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "topic",type = "topic"), key = "topicKey.#"
, value = @Queue(value = "topicQueue", durable = "true", exclusive = "false", autoDelete = "false")))
public void topicQueue(String str, Channel channel, Message message) throws Exception {
try {
System.out.println("topicQueue接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
/**
* Fanout 交换机
* */
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey1"
, value = @Queue(value = "fanoutQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
public void fanoutQueue1(String str, Channel channel, Message message) throws Exception {
try {
System.out.println("fanoutQueue1接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey2"
, value = @Queue(value = "fanoutQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
public void fanoutQueue2(String str, Channel channel, Message message) throws Exception {
try {
System.out.println("fanoutQueue2接收到了:"+str);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
Headers 交换机
常见问题及其解决方案
如何保证消息不丢失
ACK确认机制+持久化
如何处理重复消息
场景:
broker的响应没有发送到生产者中,生产者又重发了一次,此时消息是重复的
消费者没有更新consumer offset(挂了),其他消费者重复消息
处理:
保证幂等性(就是用户对于统一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生的副作用),通过代码逻辑判断实现(比如加锁),或者使用token机制实现,利用分布式锁的概念,使用redis校验唯一索引
如何处理消息堆积
产生原因:生产者的生产速度与消费者的消费速度不一致导致的
如果不是代码问题,就得从架构上进行解决,比如进行水平扩容处理,增加topic的队列数和消费者数量
如何保证消息的有序性
全局有序
只能有一个生产者往topic发送消息,并且一个topic内部只能有一个队列/分区
消费者也必须是单线程消费这个队列
部分有序
可以加你个topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个对哦对应一个单线程处理的消费者
kafka
是一个分布式的、可分区的、可复制的消息系统,依赖于Zookeeper
消息系统:由生产者、消费者、消息队列三部分组成
主要设计目标
- 以时间福在读O(1)的方式提供消息持久化能力,即使对GTB级以上数据也能保证常数时间的访问性能;
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100k条消息的传输;
- 支持kafka server间的消息分区,以及分布式消费,同时保证每个partition内的消息顺序传输;
- 同时支持离线数据处理和实时数据处理;
- scale out,支持在线水平扩展。
引入kafka的背景
当大量写请求,或者具体的说应该是瞬时大量的写请求到来时,既要保证系统的可用性还要保证尽量全部处理掉请求的情况下,我们在不引入消息队列的请求下,我们应该怎么处理呢?无非就是引入以下几种方式:
- 线程池:是用池化技术也在我们日常开发中缓解系统压力的一个重要利器,但是对于线程池而言很难根据系统的瞬时流量来自动伸缩容量,而且自动伸缩流量并不是根治大流量到来办法。当池子满了以后也会影响部分情况的丢弃,最重要的是一旦服务宕机,那么所有的请求将化为乌有,除非你将线程池序列化,自己想想复杂度跟性能吧。
- 队列/缓存:队列一般场景下都是使用空间来换时间的另一种常用办法,但是它的弊病也存在需要自己开发一套序列化方案,保证服务异常恢复能正常处理后续任务。Redis的发布订阅者同样存在这些问题。
在传统的点对点消息系统中,消息持久化到一个队列中,此时,将由一个或多个消费者消费队列中的数据,但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同事消费数据,也能保证数据处理的顺序
而在发布-订阅消息系统中,消息被持久化到一个topic中,与点对点消息不同的是,消费者可以订阅一个或多个topic,消费者可以消费订阅后的topic中的所有消息,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
使用消息队列能解决哪些问题
- 异步化:我们举个常见的电商下单例子:在没有异步化的步骤大致是用户提交订单->预扣库存->生成订单->消费成功消息推送->通知仓库发货->数据计入财务几个串行化步骤;在串行的模式下对于整条业务链路等待时间较长,而且浪费了接口的并发带来的性能提高。如果我们从[生成订单]后就直接返回主流程,使用消息中间件后续同时处理剩余流程。既不影响整个业务的体验,而且从性能跟响应速度上有了大幅提升。
- 缓解大流量的冲击:我们在提高 了系统的并行速度后还得注意一件最重要的问题就是防止系统被大流量压垮。前面我们说到使用池化技术来削峰,引入消息中间件后就能使系统平稳的处理大流量,当然这里的处理技术没有这么简单,同样需要考虑例如消息堆积与消息丢弃等问题
- 服务解耦:还是拿我上面说到的下单流程,随着业务逐渐发展,应用也变得复杂起来,原来一个下单流程可能就负责在一个人手里,现在可能分别在不同的人或者团队中。那么还是使用直接相互调用的话,每个子系统在开发升级的过程中为要为了上下游系统而做出妥协与让步,或者牵扯到一条业务线的多处改动适应某一个子系统的变动,每一次上线要拉着一大帮人上线,测试整条链路,这样导致系统到最后简直忍不惨睹。在引入消息队列后通过消息队列的各种特性可以让变化透明化,不在影响上下游系统,实现了系统间的解耦。
以上就是我们选择使用消息队列能帮我们解决的问题,当然消息队列还有其他的功能:作为发布 / 订阅系统实现一个微服务级系统间的观察者模式;连接流计算任务和数据;用于将消息广播给大量接收者。我们不难归纳出这样一个结果:在单应用模式下使用队列的场景,在分布式集群环境下大多都能使用消息队列来解决,当然使用消息队列给我我们带来这么多好处的同时也要做好它带来的常见性问题:消息延迟问题;系统复杂度;数据不一致;诸如这些问题。后面几节我们将展开详细说明。
kafka的特性
消息持久化,基于本间系统来存储和缓存消息
高吞吐量,kafka将数据写到磁盘,但是在底层采用了零拷贝技术,所以速度比较快
高扩展,kafka依赖Zookeeper来对集群进行协调管理,同时子机器扩展时无需将整个集群停机
多客户端支持,kafka核心模块用scala语言开发但提供多种开发语言接入,包括java、python等
安全机制,支持代理与Zookeeper连接身份验证,客户端读、写权限认证
数据备份,kafka可以为每个主题指定副本数,对数据进行持久化备份
轻量级,kafka的实例是无状态的,同事集群本身几乎不需要生产者和消费者的状态信息
消息压缩,kafka支持gzip,snappy,la4三种压缩方式,把多条消息压缩成messageset
kafka效率高的原因
顺序写入磁盘
kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能达到600M/s,而随机写只有100K/s。这与磁盘的机械机构无关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
零拷贝技术
指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于网络传输文件时节省CPU周期和内存带宽。
零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,从而有效的提高数据传输效率,减少了用户进程地址空间和内核地址空间之间因为上下文切换而带来的开销。
需要注意的是零拷贝技术并不是不需要拷贝,而只是减少不必要的拷贝次数与线程上下文切换次数。
在早期的计算机中,用户进程需要读取磁盘数据时,需要CPU终端和CPU参数,因此效率比较低,发起IO请求,每次的IO中断,都带了CPU的上下文切换,因此出现了DMA
DMA,直接内存存取,是所有现代电脑的重要特色,它允许不同速度的硬件装置来沟通,而不需要依赖于CPU的大量中断负载。DMA控制器,接管了数据的读写请求,减少了CPU的负担,这样一来,CPU就能高校工作了,现代硬盘基本都支持DMA
kafka术语解释
broker
kafka集群包含一个或多个服务器,每个服务器节点称为broker。
broker负责存储topic的数据。如果某个topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某个topic有N个partition,集群有N+M个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某个topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况溶体导致kafka集群数据不均衡。
topic
每条发布到kafka集群的消息都有一个类别,这个类别被称为topic。
物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或者消费数据,而不必关心数据存于何处。
partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition,每个partition中的数据使用多个segment文件存储。
partition中的数据是有序的,不同partition间的数据解释了数据的顺序。
如果topic有多个partition,消费数据时旧不能保证数据的顺序,在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
producer
生产者即数据的发布者,该角色将消息发布到kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据
consumer group
每个consumer属于一个特定的consumer group。可为每个consumer指定group name,若不指定,则group name属于默认的group
leader
每个partition有多个副本,其中有且仅有一个作为leader。leader是当前负责数据的读写的partition
follower
follower跟随leader,所有写请求都通过leader路由,数据变更会广播给所有的follower,follower与leader保持数据同步,如果leader失效,则从follower中选举出一个新的leader。当follower与leader挂掉、卡主或者同步太慢,leader会把follower从in sync replicas列表中删除,重新创建一个follower。
kafka工作流程
kafka工作原理
kafka将消息以topic为单位进行归纳。向kafka topic发布消息的程序为producers,将预定topic并消费消息的程序称为consumer。
kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker
客户端和服务端通过TCP协议通信
topics和logs
一个topic是对自足消息的归纳。每个topic,kafka对其日志进行了分区:
每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加的分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
将日志分区的目的是为了使每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能
每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset。一般情况下锁着consumer不断的读取消息,offset的值会不断增加,但其实consumer可以以任意顺序读取消息,比如它可以将offset设置为一个旧的值来重读之前的消息。
分布式
每个分区在kafka集群的若干服务中否有副本,这样这些持有副本的服务可以共同出路数据和请求,副本数量是可以配置的。副本使得kafka具备了容错能力。
每个分区都由一个服务器作为leader,0个或者多个服务器作为followers,leader负责处理消息的读和写,followers则负责复制leader。如果leader宕机了,kafka会从followers中选择一台服务作为leader
producers
将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,也可以用过特定的分区函数选择分区
consumers
发布消息通常有两种模式:队列模式和发布-订阅模式
队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中
consumers可以加入一个consumer组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就称为了传统的队列模式,在各个consumer中实现负载均衡。
发布-订阅模式:
分片与副本机制
分区:当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上,每个服务器上的数据叫做一个分区
副本:当数据只保存一份时,有丢失风险。为了更好的容错和容灾,将数据拷贝几份,保存到其他机器上
设置分区和副本的方法:
控制台上:—replication-factor 1 —partition 3
API代码:AdminUtils.createTopic(zkUtils,topicName,3,1,new Properties(),AdminUtils.createTopic$default$6());
消息不丢失机制
生产者消息不丢失机制:
发送消息的同步和异步模式:
同步模式:生产者重试3次,如果还没有相应就报错。生产者等待10s,如果broker没有给出ack响应,就认为失败。
异步模式:先将数据保存在生产者端的buffer中。buffer大小是两万条。发送一批数据的大小是500条。满足数据阈值或者数量阈值其中一个条件就可以发送数据
消息确认的三个状态:
0:生产者值负责发送数据,不管kafka的broker是否接收到数据;
1:某个partition的leader收到数据给出响应;
-1:某个partition的所有副本都收到数据后给出响应
broker端消息不丢失机制:
broker端的消息不丢失,其实就是用partition副本机制来保证。producer ack -1,能够保证所有的副本都同步好了数据。其中一台机器挂了,并不影响数据的完整性
消费者端消息不丢失:
如果有一个外部存储能够记录每个consumer消费partition的offset值,就不会造成数据丢失,只会有重复消费的可能。而kafka0.8以后,offset值可以存放到kafka内置的topic中
消息存储和查询机制
消息存储机制
kafka作为消息中间件,只负责消息的临沭存储,并不是永久存储,需要删除过期的数据
如果一个partition中有10t数据,是如何存放的?是存放在一个文件中,还是存放在多个文件中?
kafka采用存储到多个文件中的方式。因为如果将所有数据都存放在一个文件中,需要删除过期数据的时候,就比较麻烦。因为文件有日期属性,所以当删除过期数据时,只需要根据文件的时期属性删除就可以了
kafka的数据时存储在/export/data/kafka(可以自己设置)目录下,存储时是将数据划分为一个个的segment段,在segment段中有两个核心的文件,一个是log,一个是index。当log文件等于1G时,新的数据会写入到下一个segment中。
消息查询机制
在kafka中进行消息查询时,首先会查找segment中的index索引文件,index索引文件时以起始来命名的,根据查询索引文件能很快的定位到具体文件
当根据index索引文件定位到需要查询的具体文件时,就会去查找log文件,在该文件中按顺序查找到目标文件
生产者数据分发策略
kafka在数据生产的时候,有一个数据分发策略。默认的情况下使用DefaultPartitioner.class类。如果用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法。
当用户指定key,就会使用hash算法来确定发往哪个partition。如果key一直不变,同一个key算出来的hash值是个固定值。如果是固定值,这就hash取模就没有意义。例如:Utils.toPositive(Utils.murmur2(keyBytes))%numPartitions
还可以指定将数据发往哪个partition。当ProducerRecord的构造参数中有partition的时候,就可以发送到对应的partition上
如果既没有指定partition,也没有key的情况下,那么就是用轮询的方式发送数据
消费者的负载均衡机制
一个partition只能被一个组中的消费者消费。所以如果消费组中有多余partition数量的消费者,那么一定会有消费者无法消费数据。如果消费者组中的消费者小于partition,那么消费的数据就不完整,会造成错误
操作kafka集群的两种方式
使用控制台运行kafka
创建一个topic(主题):
bin/kafka-topics.sh —create —zookeeper zk01:2181 —replication-factor 1 —partition 1 —topic order
启动一个生产者,用来生产数据:
bin/kafka-console-producer.sh —broker -list kafka01:9092 —topic order
启动一个消费者,消费数据:
bin/kafka-console-consumer.sh —zookeeper zk01:2181 —from-beginning —topic order
使用javaAPI操作kafka
引入kafka依赖
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
生产者相关操作
//创建Properties配置参数对象,并设置参数
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("acks", "all");
//创建一个KafkaProducer,Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 ,需要一个producerRecord对象,最少参数 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息!"+i));
Thread.sleep(100);
}
}
消费者相关操作:
// 1、创建配置参数对象,并连接集群
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//2、创建Kafka的消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//3、订阅一个主题,订阅主题需传入List格式
kafkaConsumer.subscribe(Arrays.asList("order"));
//4、使用死循环不停拉取数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("消费的数据为:" + record.value());
}
}
topic相关操作:
由于topic的元数据信息是注册在zookeeper相应节点之中,所以对topic的操作实质是对zookeeper中记录topic元数据信息相关路径的操作。kafka将对zookeeper的相关操作封装成一个ZkUtils类,并封装了一个AdrninUtils类调用ZkClient类的相关方法以实现对kafka元数据的操作,包括对主题、代理、消费者等相关元数据的操作。对主题操作的相关API嗲用较简单,相应操作都是通过AdminUtils类的方法来完成的
创建topic
方法一:
//参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
方法二:
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
删除topic
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.deleteTopic(zkUtils, topicName);
判断topic是否存在
AdminUtils.topicExists(zkUtils, topicName);
数据持久化
kafka大量依赖文件系统去存储和缓存消息。硬盘的快慢完全取决于它的方式,设计良好的硬盘架构可以和内存一样快。
与传统的将数据缓存在内存中然后刷到磁盘的设计不同,kafka是直接将数据写到文件系统的日志中
在大多数的消息系统中,数据持久化的机制往往是为每个consumer提供一个B树或者其它的随机读写的数据结构。B树虽然有优势,但也会带来一些代价,比如B树的时间复杂度是O(logN),虽然被认为是常量级复杂度,但对于磁盘来说并非如此。比如磁盘进行一次搜索需要10ms的话,每个磁盘在同一时间只能进行一次搜索,这样并发处理就成了问题。虽然存储系统使用缓存进行了大量优化,但是对于树结构的性能的观察结果却表明它的性能往往随着数据的正常而线性下降,数据增长一倍,速度就会降低一倍。
消息传输的事务定义
通常有三种级别:
- 最多一次。消息不会被重复发送,最多被传输一次,但也有可能一次不传输;
- 最少一次。消息不会被漏发送,做少被传输一次,但也有可能被重复传输;
- 精确一次。不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅传输一次,这是我们所期望的。
大多数消息系统并不能精确一次传输,因为有些并没有说明consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入银盘的数据丢失时又会怎么样。而kafka在这方面做的比较好,kafka有一个committed的概念,一旦消息被提交了,只要消息被写入的分区所在的副本broker是活动的,数据就不会丢失。
随谈kafka有commit的概念,但如果producer发布消息时发生了网络错误,但又不确定是在提交之前发生的还是提交之后发生的,
案例
案例1
引入kafaka
<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.10</artifactId >
<version> 0.8.0</ version>
</dependency>
通过接口来配置kafka的连接参数
public interface KafkaProperties
{
final static String zkConnect = "10.22.10.139:2181";
final static String groupId = "group1";
final static String topic = "topic1";
final static String kafkaServerURL = "10.22.10.139";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
final static String topic2 = "topic2";
final static String topic3 = "topic3";
final static String clientId = "SimpleConsumerDemoClient";
}
创建一个producer
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaProducer extends Thread
{
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public KafkaProducer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "10.22.10.139:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
while (true)
{
String messageStr = new String("Message_" + messageNo);
System.out.println("Send:" + messageStr);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
创建一个consumer
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public class KafkaConsumer extends Thread
{
private final ConsumerConnector consumer;
private final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig()
{
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive:" + new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
简单的发送和消费
public class KafkaConsumerProducerDemo
{
public static void main(String[] args)
{
KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
producerThread.start();
KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}