消息总线Spring Cloud Bus
Spring cloud bus通过轻量消息代理连接各个分布的节点。这会用在广播状态的变化(例如配置变化)或者其他的消息指令。Spring bus的一个核心思想是通过分布式的启动器对spring boot应用进行扩展,也可以用来建立一个多个应用之间的通信频道。目前唯一实现的方式是用AMQP消息代理作为通道,同样特性的设置(有些取决于通道的设置)在更多通道的文档中。SpringCloud Bus
会向外提供一个http接口,即下图中的/ actuator / bus-refresh
。我们将这个接口配置到git
的webhook
上,当git上的内容发生改变时,就会自动调用/ actuator / bus-refresh
接口。Bus
就会通知ConfigServer
,ConfigServer
会发布更新消息到消息总线的消息队列,其他服务订阅到该消息就会信息刷新,从而实现整个微服务进行自动刷新。
MQ 中间件的作用
MQ中间件就是我们常说的消息代理,主要用于消息的接收和转发消息,可以将消息生产者和消息消费者完全解耦,不必直接调用对方的API。一般常用在邮件服务、短信服务、日志服务等。
目前常用的MQ有:
- Kafka
- RabbitMQ
- RocketMQ(阿里开源)
- ActiveMQ等
Spring Cloud Bus
目前仅支持两款MQ中间件: RabbitMQ
和Kafka
。后面讲主要介绍这两种中间件配合Spring Cloud Bus
的使用
RabbitMQ 介绍
1. 什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,是一个轻量级的消息中间件,为很多语言提供了调用工具,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
2. RabbitMQ 的特点
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
- 可靠性(Reliability)
- 灵活的路由(Flexible Routing)
- 消息集群(Clustering)
- 高可用(Highly Available Queues)
- 多种协议(Multi-protocol)
- 多语言客户端(Many Clients)
- 管理界面(Management UI)
- 跟踪机制(Tracing)
-
3. RabbitMQ 实现原理
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。- Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection
网络连接,比如一个TCP连接。 - Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker
表示消息队列服务器实体。4.RabbitMQ 工作模式
官网介绍:https://www.rabbitmq.com/getstarted.html
RabbitMQ 有6中工作模式,分别是:
- simple模式(即最简单的收发模式):一个生产者,一个消费者
- work工作模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
- publish/subscribe发布订阅:一个生产者发送的消息会被多个消费者获取。
- routing路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
- topic 主题模式:路由模式的一种,将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
- RPC模式: 当客户端启动时,创建一个匿名的回调队列。客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。请求被发送到rpc_queue队列中。RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了
- 发布者确认:消费者确认解决的问题是确认消息是否被消费者”成功消费”,发布者确认是异步发出的,可以确认单个消息或一组消息,发出确认的确切时刻取决于消息的传递模式(持久性与瞬态)以及消息路由到的队列的属性
本文主要讲解的还是Spring Cloud 微服务,RabbitMQ 不做重点讲解
RabbitMQ 安装
本节只说明Windows版的安装(安装包我放置在交流群里面了),Linux 的安装,统一放在Linux 模块 官网地址:https://www.rabbitmq.com/#getstarted
1. 点击下载
2. 选择Windows
3.选择并点击
4. Erlang 下载安装
安装RabbitMQ之前需要装Erlang
https://www.erlang-solutions.com/resources/download.html
双击esl-erlang_22.1_windows_amd64.exe
,一路next
安装完成之后设置环境变量
在添加path属性,在path后面添加%ERLANG\_HOME%\bin
然后在安装目录下,进入dos,并输入erl,如果版本出现,就代表安装成功了。
C:\Program Files\erl10.5\bin>erl
Eshell V10.5 (abort with ^G)
1>
5. RabbitMQ 安装
双击rabbitmq-server-3.8.2.exe
,一路next,安装完成后,在用命令安装RabbitMq-Plugins,进入/sbin
rabbitmq-plugins enable rabbitmq\_management
如果执行失败,执行rabbitmq-service stop
,在输入rqbbitmq-service remove
,在输入rabbitmq-service install
,在输入rabbit-service start
,最后重新输入rabbitmq-plugins enable rabbitmq\_management
6. 安装完之后浏览器访问
http://localhost:15672/#/
默认账号密码guest\guest
登录进来后,可以看到Connections、Channels、Exchanges、Queues等功能
7. 服务关闭
Windows
下 快捷键win+R
弹出小窗口,输入services.msc
可以点击关闭
关闭和重启以后都可以从这里操作
RabbitMQ 使用
本节代码地址
GitHub: https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-mq/fw-cloud-mq-rabbitmq
浏览器打开RabbitMQ 客户端,可以自己先熟悉熟悉使用
尝试建一个fwcloud
用户
点击fwcloud用户设置权限
下面通过一个SpringBoot 整合RabbitMQ 的例子熟悉一下RabbitMQ的功能
1. 新建项目
1.1 maven配置
需要引入spring-boot-starter-amqp
的包,封装了RabbitMQ 的API。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
1.2 新建启动类
@SpringBootApplication
public class FwRabbitMqApplication {
public static void main(String[] args) {
SpringApplication.run(FwRabbitMqApplication.class, args);
}
}
1.3 新建RabbitMQ配置
使用MQ前,我们需要设置一个Topic ,还可以配置交换器、路由等信息,我们这里以简单使用为主。
/**
* @author xuyisu
* @description RabbitMq配置
* @date 2019/12/18
*/
@Configuration
public class RabbitMqConfig {
/**
* 注册一个名为hello的队列
* @return
*/
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
1.4 应用配置信息
设置RabbitMQ 连接配置信息,这里设置的用户是我们刚刚创建的fwcloud用户
server:
port: 8781
spring:
application:
name: fw-cloud-mq-rabbitmq
rabbitmq:
host: localhost
port: 5672
username: fwcloud
password: fwcloud
1.5 创建消息生产者
通过注入AmqpTemplate 来发送我们自定义产生的消息,AmqpTemplate 已经为我们定义了一套AMQP协议的基本操作。我们也将信息通过log日志打印出来
/**
* @author xuyisu
* @description 发送方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String message="Hello World:"+ DateUtil.now();
log.info("FwSender:"+message);
//第一个参数是topic,第二个参数是内容
amqpTemplate.convertAndSend("hello",message);
}
}
1.6 创建消息消费者
通过定义@RabbitListener
来监听topic为hello的消息,并且通过@RabbitHandler
来指定消息的处理。
/**
* @author xuyisu
* @description 接收方
* @date 2019/12/18
*/
@Component
@RabbitListener(queues = "hello")
@Slf4j
public class FwReceiver {
@RabbitHandler
public void process(String msg){
log.info("FwReceiver:{}",msg);
}
}
1.7 启动项目
启动之后,程序创建了一个和127.0.0.1:5672
的连接,用户名是fwcloud
2019-12-18 14:56:16 INFO main org.springframework.amqp.rabbit.connection.CachingConnectionFactory Attempting to connect to: [localhost:5672]
2019-12-18 14:56:16 INFO main org.springframework.amqp.rabbit.connection.CachingConnectionFactory Created new connection: rabbitConnectionFactory#7a583586:0/SimpleConnection@33f17289 [delegate=amqp://fwcloud@127.0.0.1:5672/, localPort= 51546]
通过RabbitMQ 的web 管理可以看到Connections和Channels连接信息
1.7.1. Connections
1.7.2. Channels
1.8. 单元测试 发送数据sender()
2019-12-18 14:49:37.501 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.513 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.514 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.514 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.516 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.516 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
2019-12-18 14:49:37.517 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
1.9. 队列已经存在
1.10. 接收端数据
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:43 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
上面只是演示了最简单的收发模式,RabbitMQ还有跟多特性(订阅模式、路由模式、RPC模式等),读者可以到官网查看:https://www.rabbitmq.com,也可以购买书籍阅读。
Spring Cloud Config 改造(RabbitMQ)
本节代码地址
在config配置那一节我们提过,如果一个ConfigServer 有多个客户端,ConfigServer 修改了数据,一个个刷新客户端的/actuator/refresh 对开发很不友好,下面我们通过引入Spring Cloud Bus 总线来解决问题,ConfigServer 变更后推送给总线,由总线来向各个客户端刷新配置。过程如下图:
下面我们新建一个项目来演示
1.新建项目fw-cloud-config-amqp-client
1.1 maven 配置
比fw-cloud-config-native-client
多加了一个spring-cloud-starter-bus-amqp
包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependencies>
1.2 新建FwConfigAmqpClientApplication启动类
@EnableDiscoveryClient
@SpringBootApplication
public class FwConfigAmqpClientApplication {
public static void main(String[] args) {
SpringApplication.run(FwConfigAmqpClientApplication.class, args);
}
}
1.3 配置文件bootstrap.yml
这里我们选用基于git 且使用服务发现的方式(先把Config Server
启动起来)
较fw-cloud-config-native-client
我们吧添加了rabbitmq
相关的配置信息
并且把bus-refresh接口暴漏出来
server:
port: 8779
spring:
application:
name: fw-register-eureka-client
cloud:
config:
profile: dev
label: master
discovery: #基于服务发现的
enabled: true
service-id: fw-config-server
rabbitmq:
host: localhost
port: 5672
username: fwcloud
password: fwcloud
management:
endpoints:
web:
exposure:
include: refresh,health,info,bus-refresh
注意:Spring boot 2.0的改动较大,
/ bus / refresh
全部整合到执行器里面了,变成了/ actuator / bus-refresh
,所以之前1.x的management.security.enabled
全部失效,不适用于2.0 ,2.0的性能配置是这样的:
management:
endpoints:
web:
exposure:
include:*
1.4 启动项目并postman 测试
localhost:8779/api/version get请求
返回结果
eureka-2.0
修改version的值为3.0,提交git并postman测试
localhost:8779/actuator/bus-refresh post请求
返回结果
eureka-3.0
1.5设置自动触发
配置一个可以直接访问的地址,不要是localhost,可以再每次变更之后动态刷新配置
2.小结
我们已经通过Spring Cloud Bus 与 Spring Cloud Config的整合,并已RabbitMQ作为消息代理,实现了配置的动态更新
Kafka 介绍
1. 什么是Kafka
Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
2. Kafka 的特点
- 高吞吐量(顺序读写)
- 高可靠性(数据备份)
- 可持久化(本次磁盘持久化)
- 可扩展性(集群热扩(缩)容)
- 分区内有序
-
3. Kafka的基本概念
Producer
消息生产者,就是向kafka broker发消息的客户端。- Consumer
消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。 - Topic
主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。 - Partition
消息分区,一个topic可以分为多个 partition,每个
partition是一个有序的队列。partition中的每条消息都会被分配一个有序的
id(offset)。 - Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 - Consumer Group
消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。 Offset
消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。4.Kafka具有四个核心API
使用 Producer API 发布消息到kafka集群中一个或多个topic。
- 使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
- 使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产输出流到1个或多个输出topic,有效地将输入流转换到输出流。
- 使用Connector API可以构建和运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,针对关系型数据库的连接器可以捕获到表的每个变化。
kafka 官网https://kafka.apache.org/
Kafka 安装
本节只说明Windows版的安装(安装包我放置在交流群里面了),Linux 的安装,统一放在Linux 模块
1. 下载
- 打开官网 下载地址
- 我这里已经提供了
kafka_2.11-2.4.0.tgz
,已经传到群里面
解压并进入到 kafka_2.11-2.4.0\bin\windows2. 启动ZooKeeper
zookeeper-server-start.bat ..\..\config\zookeeper.properties
3. 启动Kafka
kafka-server-start.bat ..\..\config\server.properties
4. 创建一个主题
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fwcloud
5. 查看创建的主题列表
kafka-topics.bat --list --zookeeper localhost:2181
6. 启动生产者
kafka-console-producer.bat --broker-list localhost:9092 --topic fwcloud
7. 启动消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic fwcloud --from-beginning
8.验证
在服务端输入hello world、nihao
客户端就会把hello world、nihao
输出出来启动kafka保错:命令过长语法不正确 解决方法就是:把kafka的目录从桌面移到别处,因为目录层级太深或者是目录名字太长导致的。比如放到D盘根目录
9.zkui 安装
9.1 下载源码包
地址:https://github.com/DeemOpen/zkui
cmd 到pom.xml 所在的文件夹
mvn clean install
9.2 启动
将config.cfg copy到 target包内
java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar
9.3 浏览器验证
默认地址http://localhost:9090
默认账号密码:admin/manager
9.4 配置修改
Kafka 使用
本节代码地址
GitHub:https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-mq/fw-cloud-mq-kafka
前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。
1.新建项目
新建项目模块fw-cloud-mq-kafka
用来测试Kafka 的生产和消息,本文并不会测试Kafka 的海量数据,仅仅为了Kafka的使用。
2.maven 配置
配置中我们主要引入了spring-kafka
,这是Spring 为Kafka 提供的工具包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
3.新建启动类
本项目并没有加入到微服务中,如果有需要,读者可以自行加入,当然如果加入了可以从RESTFUL接口接收数据,再推送到对应的Topic 中。
@SpringBootApplication
public class FwKafkaMqApplication {
public static void main(String[] args) {
SpringApplication.run(FwKafkaMqApplication.class, args);
}
}
4.应用配置
server:
port: 8781
spring:
application:
name: fw-cloud-mq-kafka
kafka:
bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 0
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: testGroup
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka 每个配置对应的意思如下:
- kafka.bootstrap-servers
指定kafka server的地址,集群配多个,中间,逗号隔开 - kafka.producer.asks
可以设置的值为:all, -1, 0, 1
procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 - kafka.producer.retries
写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。 - kafka.producer.batch-size
每次批量发送消息的数量,produce积累到一定数据,一次发送 - kafka.producer.key-serializer
指定消息key编解码方式 - kafka.producer.value-serializer
指定消息体的编解码方式 - kafka.consumer.group-id
指定默认消费者group id —> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名 - kafka.consumer.auto-offset-reset
smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest - kafka.consumer.enable-auto-commit
设置自动提交offset - kafka.consumer.auto-commit-interval
如果’enable.auto.commit’为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 - kafka.consumer.key-deserializer
指定消息key编解码方式 - kafka.consumer.value-deserializer
指定消息体编解码方式
当然kafka 还有很多配置,我们简单使用这些已经够了5.新建发送方
发送方需要引入KafkaTemplate,用来发送消息,发送的时候我们指定消息的topic 和内容。并把消息内容打印出来。并把发送类注册为组件。/**
* @author xuyisu
* @description 发送方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwSender {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
public boolean send(){
String message="Hello World:"+ DateUtil.now();
log.info("FwSender:"+message);
//第一个参数是topic,第二个参数是内容
kafkaTemplate.send("fwcloud",message);
return true;
}
}
6.新建消费方
消费放需要添加一个公共方法并设置@KafkaListener和需要监听的Topic,就可以实现消息的监听并消费了。/**
* @author xuyisu
* @description 接收方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwReceiver {
@KafkaListener(topics = "fwcloud")
public void onMessage(String message){
log.info(message);
}
}
7.启动项目
启动项目前,需要先启动kafka
然后启动单元测试FwSenderTest发送消息
通过控制台我们看推送的消息
然后我们再看一下消费的信息2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
Spring Cloud Config 改造(Kafka)
本节代码地址
前面我们使用了RabbitMQ 来改造Spring Cloud Config,这节我们使用Kafka 来改造Spring Cloud Config。在RabbitMQ 那一节使用的是spring-cloud-starter-bus-amqp
包,和RabbitMQ 那一节集成 Spring Cloud Bus 集成的区别是包和连接配置有区别,下面我们看下Kafka的依赖包。
1.新建项目
我们新建一个客户端项目用来演示Spring Cloud Bus、Kafka 的示例
2.maven 配置
需要引入spring-cloud-starter-bus-kafka
包,这个Spring Cloud Bus 和Kafka 集成的工具包,未开发节省了很多操作。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
</dependencies>
3.新建启动类
这里和RabbitMQ 集成Sring Cloud Bus 是一样的
@EnableDiscoveryClient
@SpringBootApplication
public class FwConfigKafkaClientApplication {
public static void main(String[] args) {
SpringApplication.run(FwConfigKafkaClientApplication.class, args);
}
}
4. 应用配置
配置中和RabbitMQ 集成的区别主要是讲RabbitMQ 的连接信息换成Kafka的连接信息,确保Kafka 是启动的。同时我们将应用的健康信息和bus-refresh接口暴露出去
server:
port: 8779
spring:
application:
name: fw-register-eureka-client
cloud:
config: #自己指定的和服务发现的2选1
# uri: http://localhost:8778/ 自己指定的
profile: dev
label: master
discovery: #基于服务发现的
enabled: true
service-id: fw-config-server
# kafka
stream:
kafka:
binder:
brokers: localhost:9092
bus:
trace:
enabled: true
management:
endpoints:
web:
exposure:
include: refresh,health,info,bus-refresh
5. 启动项目
先将fw-cloud-config-native-server
启动起来,然后启动本项目,否则会报错,获取不到配置信息
浏览器或者Postman 测试localhost:8779/api/version
如果修改了服务的配置,例如git 仓库修改的配置,通过localhost:8779/actuator/bus-refresh即可给全部客户端刷新配置,这里和RabbitMQ 集成Spring Cloud Bus那一节一样的,可以回头再看一下。