消息总线Spring Cloud Bus


Spring cloud bus通过轻量消息代理连接各个分布的节点。这会用在广播状态的变化(例如配置变化)或者其他的消息指令。Spring bus的一个核心思想是通过分布式的启动器对spring boot应用进行扩展,也可以用来建立一个多个应用之间的通信频道。目前唯一实现的方式是用AMQP消息代理作为通道,同样特性的设置(有些取决于通道的设置)在更多通道的文档中。
SpringCloud Bus会向外提供一个http接口,即下图中的/ actuator / bus-refresh。我们将这个接口配置到gitwebhook上,当git上的内容发生改变时,就会自动调用/ actuator / bus-refresh接口。Bus就会通知ConfigServerConfigServer会发布更新消息到消息总线的消息队列,其他服务订阅到该消息就会信息刷新,从而实现整个微服务进行自动刷新。

MQ 中间件的作用

MQ中间件就是我们常说的消息代理,主要用于消息的接收和转发消息,可以将消息生产者和消息消费者完全解耦,不必直接调用对方的API。一般常用在邮件服务、短信服务、日志服务等。
消息总线Spring Cloud Bus - 图1
目前常用的MQ有:

  • Kafka
  • RabbitMQ
  • RocketMQ(阿里开源)
  • ActiveMQ等

Spring Cloud Bus 目前仅支持两款MQ中间件: RabbitMQKafka。后面讲主要介绍这两种中间件配合Spring Cloud Bus 的使用

RabbitMQ 介绍

1. 什么是RabbitMQ

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,是一个轻量级的消息中间件,为很多语言提供了调用工具,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

2. RabbitMQ 的特点

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability)
  2. 灵活的路由(Flexible Routing)
  3. 消息集群(Clustering)
  4. 高可用(Highly Available Queues)
  5. 多种协议(Multi-protocol)
  6. 多语言客户端(Many Clients)
  7. 管理界面(Management UI)
  8. 跟踪机制(Tracing)
  9. 插件机制(Plugin System)

    3. RabbitMQ 实现原理

    RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念
    消息总线Spring Cloud Bus - 图2

  10. Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  11. Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  12. Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  13. Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  14. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  15. Connection
    网络连接,比如一个TCP连接。
  16. Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  17. Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  18. Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  19. Broker
    表示消息队列服务器实体。

    4.RabbitMQ 工作模式

    官网介绍:https://www.rabbitmq.com/getstarted.html
    RabbitMQ 有6中工作模式,分别是:
  • simple模式(即最简单的收发模式):一个生产者,一个消费者
    消息总线Spring Cloud Bus - 图3
  • work工作模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
    消息总线Spring Cloud Bus - 图4
  • publish/subscribe发布订阅:一个生产者发送的消息会被多个消费者获取。
    消息总线Spring Cloud Bus - 图5
  • routing路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
    消息总线Spring Cloud Bus - 图6
  • topic 主题模式:路由模式的一种,将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
    消息总线Spring Cloud Bus - 图7
  • RPC模式: 当客户端启动时,创建一个匿名的回调队列。客户端为RPC请求设置2个属性:replyTo,设置回调队列名字;correlationId,标记request。请求被发送到rpc_queue队列中。RPC服务器端监听rpc_queue队列中的请求,当请求到来时,服务器端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。客户端监听回调队列,当有消息时,检查correlationId属性,如果与request中匹配,那就是结果了
    消息总线Spring Cloud Bus - 图8
  • 发布者确认:消费者确认解决的问题是确认消息是否被消费者”成功消费”,发布者确认是异步发出的,可以确认单个消息或一组消息,发出确认的确切时刻取决于消息的传递模式(持久性与瞬态)以及消息路由到的队列的属性

本文主要讲解的还是Spring Cloud 微服务,RabbitMQ 不做重点讲解

RabbitMQ 安装

本节只说明Windows版的安装(安装包我放置在交流群里面了),Linux 的安装,统一放在Linux 模块 官网地址:https://www.rabbitmq.com/#getstarted

1. 点击下载

消息总线Spring Cloud Bus - 图9

2. 选择Windows

消息总线Spring Cloud Bus - 图10

3.选择并点击

消息总线Spring Cloud Bus - 图11
我们选择这个推荐的下载,目前的最新版本
消息总线Spring Cloud Bus - 图12

4. Erlang 下载安装

安装RabbitMQ之前需要装Erlang
https://www.erlang-solutions.com/resources/download.html
双击esl-erlang_22.1_windows_amd64.exe,一路next
安装完成之后设置环境变量
消息总线Spring Cloud Bus - 图13
在添加path属性,在path后面添加%ERLANG\_HOME%\bin
消息总线Spring Cloud Bus - 图14
然后在安装目录下,进入dos,并输入erl,如果版本出现,就代表安装成功了。

  1. C:\Program Files\erl10.5\bin>erl
  2. Eshell V10.5 (abort with ^G)
  3. 1>

5. RabbitMQ 安装

双击rabbitmq-server-3.8.2.exe,一路next,安装完成后,在用命令安装RabbitMq-Plugins,进入/sbin

  1. rabbitmq-plugins enable rabbitmq\_management

消息总线Spring Cloud Bus - 图15
如果执行失败,执行rabbitmq-service stop,在输入rqbbitmq-service remove,在输入rabbitmq-service install,在输入rabbit-service start,最后重新输入rabbitmq-plugins enable rabbitmq\_management

6. 安装完之后浏览器访问

http://localhost:15672/#/
默认账号密码guest\guest
消息总线Spring Cloud Bus - 图16
登录进来后,可以看到Connections、Channels、Exchanges、Queues等功能
消息总线Spring Cloud Bus - 图17

7. 服务关闭

Windows 下 快捷键win+R 弹出小窗口,输入services.msc
消息总线Spring Cloud Bus - 图18
可以点击关闭
消息总线Spring Cloud Bus - 图19
关闭和重启以后都可以从这里操作

RabbitMQ 使用

本节代码地址

GitHub: https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-mq/fw-cloud-mq-rabbitmq


浏览器打开RabbitMQ 客户端,可以自己先熟悉熟悉使用
尝试建一个fwcloud用户
消息总线Spring Cloud Bus - 图20
点击fwcloud用户设置权限
消息总线Spring Cloud Bus - 图21
下面通过一个SpringBoot 整合RabbitMQ 的例子熟悉一下RabbitMQ的功能

1. 新建项目

用于测试Rabbit 的基本功能
消息总线Spring Cloud Bus - 图22

1.1 maven配置

需要引入spring-boot-starter-amqp的包,封装了RabbitMQ 的API。

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. </dependency>
  14. </dependencies>

1.2 新建启动类

  1. @SpringBootApplication
  2. public class FwRabbitMqApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(FwRabbitMqApplication.class, args);
  5. }
  6. }

1.3 新建RabbitMQ配置

使用MQ前,我们需要设置一个Topic ,还可以配置交换器、路由等信息,我们这里以简单使用为主。

  1. /**
  2. * @author xuyisu
  3. * @description RabbitMq配置
  4. * @date 2019/12/18
  5. */
  6. @Configuration
  7. public class RabbitMqConfig {
  8. /**
  9. * 注册一个名为hello的队列
  10. * @return
  11. */
  12. @Bean
  13. public Queue helloQueue(){
  14. return new Queue("hello");
  15. }
  16. }

1.4 应用配置信息

设置RabbitMQ 连接配置信息,这里设置的用户是我们刚刚创建的fwcloud用户

  1. server:
  2. port: 8781
  3. spring:
  4. application:
  5. name: fw-cloud-mq-rabbitmq
  6. rabbitmq:
  7. host: localhost
  8. port: 5672
  9. username: fwcloud
  10. password: fwcloud

1.5 创建消息生产者

通过注入AmqpTemplate 来发送我们自定义产生的消息,AmqpTemplate 已经为我们定义了一套AMQP协议的基本操作。我们也将信息通过log日志打印出来

  1. /**
  2. * @author xuyisu
  3. * @description 发送方
  4. * @date 2019/12/18
  5. */
  6. @Component
  7. @Slf4j
  8. public class FwSender {
  9. @Autowired
  10. private AmqpTemplate amqpTemplate;
  11. public void send(){
  12. String message="Hello World:"+ DateUtil.now();
  13. log.info("FwSender:"+message);
  14. //第一个参数是topic,第二个参数是内容
  15. amqpTemplate.convertAndSend("hello",message);
  16. }
  17. }

1.6 创建消息消费者

通过定义@RabbitListener来监听topic为hello的消息,并且通过@RabbitHandler来指定消息的处理。

  1. /**
  2. * @author xuyisu
  3. * @description 接收方
  4. * @date 2019/12/18
  5. */
  6. @Component
  7. @RabbitListener(queues = "hello")
  8. @Slf4j
  9. public class FwReceiver {
  10. @RabbitHandler
  11. public void process(String msg){
  12. log.info("FwReceiver:{}",msg);
  13. }
  14. }

1.7 启动项目

启动之后,程序创建了一个和127.0.0.1:5672的连接,用户名是fwcloud

  1. 2019-12-18 14:56:16 INFO main org.springframework.amqp.rabbit.connection.CachingConnectionFactory Attempting to connect to: [localhost:5672]
  2. 2019-12-18 14:56:16 INFO main org.springframework.amqp.rabbit.connection.CachingConnectionFactory Created new connection: rabbitConnectionFactory#7a583586:0/SimpleConnection@33f17289 [delegate=amqp://fwcloud@127.0.0.1:5672/, localPort= 51546]

通过RabbitMQ 的web 管理可以看到Connections和Channels连接信息

1.7.1. Connections

消息总线Spring Cloud Bus - 图23

1.7.2. Channels

消息总线Spring Cloud Bus - 图24

1.8. 单元测试 发送数据sender()

  1. 2019-12-18 14:49:37.501 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  2. 2019-12-18 14:49:37.513 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  3. 2019-12-18 14:49:37.514 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  4. 2019-12-18 14:49:37.514 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  5. 2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  6. 2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  7. 2019-12-18 14:49:37.515 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  8. 2019-12-18 14:49:37.516 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  9. 2019-12-18 14:49:37.516 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37
  10. 2019-12-18 14:49:37.517 INFO 28284 --- [ main] com.yisu.mq.rabbitmq.sender.FwSender : FwSender:Hello World:2019-12-18 14:49:37

1.9. 队列已经存在

消息总线Spring Cloud Bus - 图25

1.10. 接收端数据

  1. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  2. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  3. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  4. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  5. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  6. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  7. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  8. 2019-12-18 15:00:43 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  9. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43
  10. 2019-12-18 15:00:44 INFO org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1 com.yisu.mq.rabbitmq.consumer.FwReceiver FwReceiver:Hello World:2019-12-18 15:00:43

上面只是演示了最简单的收发模式,RabbitMQ还有跟多特性(订阅模式、路由模式、RPC模式等),读者可以到官网查看:https://www.rabbitmq.com,也可以购买书籍阅读。

Spring Cloud Config 改造(RabbitMQ)

本节代码地址

GitHub: https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-config-center/fw-cloud-config-amqp-client


在config配置那一节我们提过,如果一个ConfigServer 有多个客户端,ConfigServer 修改了数据,一个个刷新客户端的/actuator/refresh 对开发很不友好,下面我们通过引入Spring Cloud Bus 总线来解决问题,ConfigServer 变更后推送给总线,由总线来向各个客户端刷新配置。过程如下图:
消息总线Spring Cloud Bus - 图26
下面我们新建一个项目来演示

1.新建项目fw-cloud-config-amqp-client

消息总线Spring Cloud Bus - 图27

1.1 maven 配置

fw-cloud-config-native-client多加了一个spring-cloud-starter-bus-amqp

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.cloud</groupId>
  8. <artifactId>spring-cloud-starter-config</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-actuator</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.cloud</groupId>
  16. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  21. </dependency>
  22. </dependencies>

1.2 新建FwConfigAmqpClientApplication启动类

  1. @EnableDiscoveryClient
  2. @SpringBootApplication
  3. public class FwConfigAmqpClientApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(FwConfigAmqpClientApplication.class, args);
  6. }
  7. }

1.3 配置文件bootstrap.yml

这里我们选用基于git 且使用服务发现的方式(先把Config Server启动起来)
fw-cloud-config-native-client我们吧添加了rabbitmq相关的配置信息
并且把bus-refresh接口暴漏出来

  1. server:
  2. port: 8779
  3. spring:
  4. application:
  5. name: fw-register-eureka-client
  6. cloud:
  7. config:
  8. profile: dev
  9. label: master
  10. discovery: #基于服务发现的
  11. enabled: true
  12. service-id: fw-config-server
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: fwcloud
  17. password: fwcloud
  18. management:
  19. endpoints:
  20. web:
  21. exposure:
  22. include: refresh,health,info,bus-refresh

注意:Spring boot 2.0的改动较大,/ bus / refresh全部整合到执行器里面了,变成了/ actuator / bus-refresh,所以之前1.x的management.security.enabled全部失效,不适用于2.0 ,2.0的性能配置是这样的:

  1. management
  2. endpoints
  3. web
  4. exposure
  5. include:*

1.4 启动项目并postman 测试

localhost:8779/api/version get请求
返回结果
eureka-2.0
修改version的值为3.0,提交git并postman测试
localhost:8779/actuator/bus-refresh post请求
返回结果
eureka-3.0

1.5设置自动触发

配置一个可以直接访问的地址,不要是localhost,可以再每次变更之后动态刷新配置
消息总线Spring Cloud Bus - 图28

2.小结

我们已经通过Spring Cloud Bus 与 Spring Cloud Config的整合,并已RabbitMQ作为消息代理,实现了配置的动态更新

Kafka 介绍

1. 什么是Kafka

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

2. Kafka 的特点

  • 高吞吐量(顺序读写)
  • 高可靠性(数据备份)
  • 可持久化(本次磁盘持久化)
  • 可扩展性(集群热扩(缩)容)
  • 分区内有序
  • 生产者与消费者多样性(支持多语言)

    3. Kafka的基本概念

  • Producer
    消息生产者,就是向kafka broker发消息的客户端。

  • Consumer
    消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic
    主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition
    消息分区,一个topic可以分为多个 partition,每个
    partition是一个有序的队列。partition中的每条消息都会被分配一个有序的
    id(offset)。
  • Broker
    一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group
    消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset
    消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

    4.Kafka具有四个核心API

    消息总线Spring Cloud Bus - 图29

  • 使用 Producer API 发布消息到kafka集群中一个或多个topic。

  • 使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
  • 使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产输出流到1个或多个输出topic,有效地将输入流转换到输出流。
  • 使用Connector API可以构建和运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,针对关系型数据库的连接器可以捕获到表的每个变化。

kafka 官网https://kafka.apache.org/

Kafka 安装

本节只说明Windows版的安装(安装包我放置在交流群里面了),Linux 的安装,统一放在Linux 模块

1. 下载

  1. 打开官网 下载地址
    消息总线Spring Cloud Bus - 图30
  2. 我这里已经提供了kafka_2.11-2.4.0.tgz,已经传到群里面
    解压并进入到 kafka_2.11-2.4.0\bin\windows

    2. 启动ZooKeeper

    1. zookeeper-server-start.bat ..\..\config\zookeeper.properties
    消息总线Spring Cloud Bus - 图31

    3. 启动Kafka

    1. kafka-server-start.bat ..\..\config\server.properties
    消息总线Spring Cloud Bus - 图32

    4. 创建一个主题

    1. kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fwcloud
    消息总线Spring Cloud Bus - 图33

    5. 查看创建的主题列表

    1. kafka-topics.bat --list --zookeeper localhost:2181
    消息总线Spring Cloud Bus - 图34

    6. 启动生产者

    1. kafka-console-producer.bat --broker-list localhost:9092 --topic fwcloud

    7. 启动消费者

    1. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic fwcloud --from-beginning
    消息总线Spring Cloud Bus - 图35

    8.验证

    在服务端输入hello world、nihao 客户端就会把hello world、nihao输出出来
    消息总线Spring Cloud Bus - 图36

    启动kafka保错:命令过长语法不正确 解决方法就是:把kafka的目录从桌面移到别处,因为目录层级太深或者是目录名字太长导致的。比如放到D盘根目录

9.zkui 安装

zookeeper节点的可视化操作界面,比较方便

9.1 下载源码包

地址:https://github.com/DeemOpen/zkui
cmd 到pom.xml 所在的文件夹

  1. mvn clean install

消息总线Spring Cloud Bus - 图37

9.2 启动

将config.cfg copy到 target包内

  1. java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar

消息总线Spring Cloud Bus - 图38

9.3 浏览器验证

默认地址http://localhost:9090
默认账号密码:admin/manager
消息总线Spring Cloud Bus - 图39

9.4 配置修改

修改config.cfg,并和编译好的jar包放在一起
消息总线Spring Cloud Bus - 图40

Kafka 使用


本节代码地址

GitHub:https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-mq/fw-cloud-mq-kafka


前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。

1.新建项目

新建项目模块fw-cloud-mq-kafka用来测试Kafka 的生产和消息,本文并不会测试Kafka 的海量数据,仅仅为了Kafka的使用。
消息总线Spring Cloud Bus - 图41

2.maven 配置

配置中我们主要引入了spring-kafka,这是Spring 为Kafka 提供的工具包

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. </dependency>
  14. </dependencies>

3.新建启动类

本项目并没有加入到微服务中,如果有需要,读者可以自行加入,当然如果加入了可以从RESTFUL接口接收数据,再推送到对应的Topic 中。

  1. @SpringBootApplication
  2. public class FwKafkaMqApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(FwKafkaMqApplication.class, args);
  5. }
  6. }

4.应用配置

  1. server:
  2. port: 8781
  3. spring:
  4. application:
  5. name: fw-cloud-mq-kafka
  6. kafka:
  7. bootstrap-servers: localhost:9092
  8. producer:
  9. acks: 1
  10. retries: 0
  11. batch-size: 16384
  12. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. consumer:
  15. group-id: testGroup
  16. auto-offset-reset: earliest
  17. enable-auto-commit: true
  18. auto-commit-interval: 100
  19. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  20. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

kafka 每个配置对应的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多个,中间,逗号隔开
  • kafka.producer.asks
    可以设置的值为:all, -1, 0, 1
    procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
  • kafka.producer.retries
    写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
    当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
  • kafka.producer.batch-size
    每次批量发送消息的数量,produce积累到一定数据,一次发送
  • kafka.producer.key-serializer
    指定消息key编解码方式
  • kafka.producer.value-serializer
    指定消息体的编解码方式
  • kafka.consumer.group-id
    指定默认消费者group id —> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
  • kafka.consumer.auto-offset-reset
    smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
  • kafka.consumer.enable-auto-commit
    设置自动提交offset
  • kafka.consumer.auto-commit-interval
    如果’enable.auto.commit’为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  • kafka.consumer.key-deserializer
    指定消息key编解码方式
  • kafka.consumer.value-deserializer
    指定消息体编解码方式
    当然kafka 还有很多配置,我们简单使用这些已经够了

    5.新建发送方

    发送方需要引入KafkaTemplate,用来发送消息,发送的时候我们指定消息的topic 和内容。并把消息内容打印出来。并把发送类注册为组件。
    1. /**
    2. * @author xuyisu
    3. * @description 发送方
    4. * @date 2019/12/18
    5. */
    6. @Component
    7. @Slf4j
    8. public class FwSender {
    9. @Autowired
    10. private KafkaTemplate<String,Object> kafkaTemplate;
    11. public boolean send(){
    12. String message="Hello World:"+ DateUtil.now();
    13. log.info("FwSender:"+message);
    14. //第一个参数是topic,第二个参数是内容
    15. kafkaTemplate.send("fwcloud",message);
    16. return true;
    17. }
    18. }

    6.新建消费方

    消费放需要添加一个公共方法并设置@KafkaListener和需要监听的Topic,就可以实现消息的监听并消费了。
    1. /**
    2. * @author xuyisu
    3. * @description 接收方
    4. * @date 2019/12/18
    5. */
    6. @Component
    7. @Slf4j
    8. public class FwReceiver {
    9. @KafkaListener(topics = "fwcloud")
    10. public void onMessage(String message){
    11. log.info(message);
    12. }
    13. }

    7.启动项目

    启动项目前,需要先启动kafka
    然后启动单元测试FwSenderTest发送消息
    通过控制台我们看推送的消息
    1. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    2. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    3. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    4. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    5. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    6. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    7. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    8. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    9. 2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
    然后我们再看一下消费的信息
    1. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    2. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    3. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    4. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    5. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    6. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    7. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    8. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    9. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
    10. 2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34

    Spring Cloud Config 改造(Kafka)


本节代码地址

GitHub:https://github.com/xuyisu/fw-sping-cloud/tree/master/fw-cloud-config-center/fw-cloud-config-kafka-client


前面我们使用了RabbitMQ 来改造Spring Cloud Config,这节我们使用Kafka 来改造Spring Cloud Config。在RabbitMQ 那一节使用的是spring-cloud-starter-bus-amqp包,和RabbitMQ 那一节集成 Spring Cloud Bus 集成的区别是包和连接配置有区别,下面我们看下Kafka的依赖包。

1.新建项目

我们新建一个客户端项目用来演示Spring Cloud Bus、Kafka 的示例
消息总线Spring Cloud Bus - 图42

2.maven 配置

需要引入spring-cloud-starter-bus-kafka包,这个Spring Cloud Bus 和Kafka 集成的工具包,未开发节省了很多操作。

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.cloud</groupId>
  8. <artifactId>spring-cloud-starter-config</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-actuator</artifactId>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.cloud</groupId>
  16. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-bus-kafka</artifactId>
  21. </dependency>
  22. </dependencies>

3.新建启动类

这里和RabbitMQ 集成Sring Cloud Bus 是一样的

  1. @EnableDiscoveryClient
  2. @SpringBootApplication
  3. public class FwConfigKafkaClientApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(FwConfigKafkaClientApplication.class, args);
  6. }
  7. }

4. 应用配置

配置中和RabbitMQ 集成的区别主要是讲RabbitMQ 的连接信息换成Kafka的连接信息,确保Kafka 是启动的。同时我们将应用的健康信息和bus-refresh接口暴露出去

  1. server:
  2. port: 8779
  3. spring:
  4. application:
  5. name: fw-register-eureka-client
  6. cloud:
  7. config: #自己指定的和服务发现的2选1
  8. # uri: http://localhost:8778/ 自己指定的
  9. profile: dev
  10. label: master
  11. discovery: #基于服务发现的
  12. enabled: true
  13. service-id: fw-config-server
  14. # kafka
  15. stream:
  16. kafka:
  17. binder:
  18. brokers: localhost:9092
  19. bus:
  20. trace:
  21. enabled: true
  22. management:
  23. endpoints:
  24. web:
  25. exposure:
  26. include: refresh,health,info,bus-refresh

5. 启动项目

先将fw-cloud-config-native-server启动起来,然后启动本项目,否则会报错,获取不到配置信息
浏览器或者Postman 测试localhost:8779/api/version
消息总线Spring Cloud Bus - 图43
如果修改了服务的配置,例如git 仓库修改的配置,通过localhost:8779/actuator/bus-refresh即可给全部客户端刷新配置,这里和RabbitMQ 集成Spring Cloud Bus那一节一样的,可以回头再看一下。