1 概述

1.1 Spring Cloud Bus是什么?

  • Spring Cloud Bus配置Spring Cloud Config使用可以实现配置的动态刷新。

Spring Cloud Bus是什么.jpg

  • Spring Cloud Bus是用来分布式系统的节点和轻量级消息系统连接起来的框架,它整合了Java事件处理机制和消息中间件的功能。
  • Spring Cloud Bus目前支持RabbitMQ和Kafka。

1.2 Spring Cloud Bus能干嘛?

  • Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当做微服务间的通信通道。

Spring Cloud Bus能干嘛.png

1.3 为什么称为总线?

1.3.1 什么是总线

  • 在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称其为消息总线。
  • 在总线上的各个实例,都可以方便的传播一些需要让其他连接在该主题上的实例都知道的消息。

1.3.2 基本原理

  • Config Client实例都监听MQ中的同一个Topic(默认是SpringCloudBus)。当一个服务刷新数据的时候,其他监听到这个主题的服务就会得到通知,然后去更新自身的配置。

2 Spring Cloud Bus动态刷新全局广播

2.1 设计思想

消息总线.png

  • 根据上图我们可以看出Spring Cloud Bus做配置更新的步骤:
  • Spring Cloud Bus(不推荐) - 图4提交代码触发POST请求给bus/refresh。
  • Spring Cloud Bus(不推荐) - 图5server端接收到请求并发送给Spring Cloud Bus。
  • Spring Cloud Bus(不推荐) - 图6Spring Cloud Bus接收到消息并通知给其他客户端。
  • Spring Cloud Bus(不推荐) - 图7其他客户端接收到通知,请求Server端获取最新配置。
  • Spring Cloud Bus(不推荐) - 图8全部客户端获取到最新的配置。

2.2 服务端修改

2.2.1 导入相关依赖

  • 修改部分:
  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  4. </dependency>
  • 完整部分:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns="http://maven.apache.org/POM/4.0.0"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>spring_cloud_demo</artifactId>
  7. <groupId>org.sunxiaping</groupId>
  8. <version>1.0</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>config_server9010</artifactId>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.cloud</groupId>
  15. <artifactId>spring-cloud-config-server</artifactId>
  16. </dependency>
  17. <!-- 导入Eureka Client对应的坐标 -->
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework.cloud</groupId>
  24. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.boot</groupId>
  28. <artifactId>spring-boot-starter-actuator</artifactId>
  29. </dependency>
  30. </dependencies>
  31. </project>

2.2.2 修改配置文件

  • application.yml
  1. server:
  2. port: 9010
  3. spring:
  4. application:
  5. name: config-server
  6. # ----修改部分------
  7. # 配置rabbitmq
  8. rabbitmq:
  9. host: 192.168.1.57
  10. port: 5672
  11. username: guest
  12. password: guest
  13. # ----修改部分------
  14. cloud:
  15. config:
  16. server:
  17. git:
  18. # git服务地址
  19. uri: https://gitee.com/AncientFairy/config-repo.git
  20. # 配置git的用户名
  21. username:
  22. # 配置git的密码
  23. password:
  24. search-paths:
  25. - config-repo
  26. # 分支
  27. label: master
  28. # 配置 eureka
  29. eureka:
  30. instance:
  31. # 主机名称:服务名称修改,其实就是向eureka server中注册的实例id
  32. instance-id: config-server:${server.port}
  33. # 显示IP信息
  34. prefer-ip-address: true
  35. client:
  36. service-url: # 此处修改为 Eureka Server的集群地址
  37. defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/
  38. # ----修改部分------
  39. # 配置端点
  40. management:
  41. endpoints:
  42. web:
  43. exposure:
  44. include: 'bus-refresh'
  45. # ----修改部分------

2.2.3 启动类

  1. package com.sunxiaping.config;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.config.server.EnableConfigServer;
  5. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  6. /**
  7. * @author 许大仙
  8. * @version 1.0
  9. * @since 2020-10-09 16:48
  10. */
  11. @SpringBootApplication
  12. @EnableConfigServer //开启配置中心功能
  13. @EnableEurekaClient
  14. public class ConfigServer9010Application {
  15. public static void main(String[] args) {
  16. SpringApplication.run(ConfigServer9010Application.class, args);
  17. }
  18. }

2.3 客户端修改

2.3.1 导入相关依赖

  • 修改部分:
  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  4. </dependency>
  • 完整部分:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns="http://maven.apache.org/POM/4.0.0"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>spring_cloud_demo</artifactId>
  7. <groupId>org.sunxiaping</groupId>
  8. <version>1.0</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>config_client9011</artifactId>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.springframework.cloud</groupId>
  15. <artifactId>spring-cloud-starter-bus-amqp</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework.cloud</groupId>
  19. <artifactId>spring-cloud-starter-config</artifactId>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.cloud</groupId>
  23. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-web</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-data-jpa</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>mysql</groupId>
  35. <artifactId>mysql-connector-java</artifactId>
  36. </dependency>
  37. </dependencies>
  38. </project>

2.3.2 修改配置文件

  • bootstrap.yml:
  1. spring:
  2. # ----修改部分------
  3. # 配置rabbitmq
  4. rabbitmq:
  5. host: 192.168.1.57
  6. port: 5672
  7. username: guest
  8. password: guest
  9. # ----修改部分------
  10. cloud:
  11. config:
  12. name: product # 应用名称,需要对应git中配置文件名称的前半部分
  13. profile: dev # 开发环境,需要对应git中配置文件名称的后半部分
  14. label: master # 分支名称
  15. # uri: http://localhost:9010 # config-server的请求地址
  16. discovery: # 服务发现
  17. service-id: config-server
  18. enabled: true # 从Eureka中获取配置信息
  19. # 配置 eureka
  20. eureka:
  21. instance:
  22. # 主机名称:服务名称修改,其实就是向eureka server中注册的实例id
  23. instance-id: service-product-dev:${server.port}
  24. # 显示IP信息
  25. prefer-ip-address: true
  26. client:
  27. service-url: # 此处修改为 Eureka Server的集群地址
  28. defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/

2.3.3 启动类

  1. package com.sunxiaping.product;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  5. @SpringBootApplication
  6. @EnableEurekaClient //开启Eureka Client
  7. public class ProductApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(ProductApplication.class, args);
  10. }
  11. }

2.3.4 业务逻辑

  • Product.java
  1. package com.sunxiaping.product.domain;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Getter;
  4. import lombok.NoArgsConstructor;
  5. import lombok.Setter;
  6. import javax.persistence.*;
  7. import java.io.Serializable;
  8. import java.math.BigDecimal;
  9. @Setter
  10. @Getter
  11. @AllArgsConstructor
  12. @NoArgsConstructor
  13. @Entity
  14. @Table(name = "tb_product")
  15. public class Product implements Serializable {
  16. @Id
  17. @GeneratedValue(strategy = GenerationType.IDENTITY)
  18. private Long id;
  19. @Column(name = "product_name")
  20. private String productName;
  21. @Column(name = "status")
  22. private Integer status;
  23. @Column(name = "price")
  24. private BigDecimal price;
  25. @Column(name = "product_desc")
  26. private String productDesc;
  27. @Column(name = "caption")
  28. private String caption;
  29. @Column(name = "inventory")
  30. private String inventory;
  31. }
  • ProductRepository.java
  1. package com.sunxiaping.product.dao;
  2. import com.sunxiaping.product.domain.Product;
  3. import org.springframework.data.jpa.repository.JpaRepository;
  4. import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
  5. import org.springframework.stereotype.Repository;
  6. @Repository
  7. public interface ProductRepository extends JpaRepository<Product, Long>, JpaSpecificationExecutor<Product> {
  8. }
  • ProductService.java
  1. package com.sunxiaping.product.service;
  2. import com.sunxiaping.product.domain.Product;
  3. public interface ProductService {
  4. /**
  5. * 根据id查询
  6. *
  7. * @param id
  8. * @return
  9. */
  10. Product findById(Long id);
  11. /**
  12. * 保存
  13. *
  14. * @param product
  15. */
  16. void save(Product product);
  17. /**
  18. * 更新
  19. *
  20. * @param product
  21. */
  22. void update(Product product);
  23. /**
  24. * 删除
  25. *
  26. * @param id
  27. */
  28. void delete(Long id);
  29. }
  • ProductServiceImpl.java
  1. package com.sunxiaping.product.service.impl;
  2. import com.sunxiaping.product.dao.ProductRepository;
  3. import com.sunxiaping.product.domain.Product;
  4. import com.sunxiaping.product.service.ProductService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. import javax.transaction.Transactional;
  8. @Service
  9. @Transactional
  10. public class ProductServiceImpl implements ProductService {
  11. @Autowired
  12. private ProductRepository productRepository;
  13. @Override
  14. public Product findById(Long id) {
  15. return this.productRepository.findById(id).orElse(new Product());
  16. }
  17. @Override
  18. public void save(Product product) {
  19. this.productRepository.save(product);
  20. }
  21. @Override
  22. public void update(Product product) {
  23. this.productRepository.save(product);
  24. }
  25. @Override
  26. public void delete(Long id) {
  27. this.productRepository.deleteById(id);
  28. }
  29. }
  • ProductController.java
  1. package com.sunxiaping.product.controller;
  2. import com.sunxiaping.product.domain.Product;
  3. import com.sunxiaping.product.service.ProductService;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.cloud.context.config.annotation.RefreshScope;
  7. import org.springframework.web.bind.annotation.*;
  8. @RestController
  9. @RequestMapping(value = "/product")
  10. @RefreshScope //开启动态刷新
  11. public class ProductController {
  12. @Value("${server.port}")
  13. private String port;
  14. @Value("${spring.cloud.client.ip-address}")
  15. private String ip;
  16. @Autowired
  17. private ProductService productService;
  18. @PostMapping(value = "/save")
  19. public String save(@RequestBody Product product) {
  20. this.productService.save(product);
  21. return "新增成功";
  22. }
  23. @GetMapping(value = "/findById/{id}")
  24. public Product findById(@PathVariable(value = "id") Long id) {
  25. Product product = this.productService.findById(id);
  26. product.setProductName("访问的地址是:" + this.ip + ":" + this.port);
  27. return product;
  28. }
  29. }

2.4 测试

3 Spring Cloud Bus动态刷新定点通知

3.1 概述

  • 有的时候,不需要在刷新服务端的时候,将与之对应的所有客户端都动态刷新,而是只需要刷新具体的某个客户端即可。

3.2 方法

  • 发送POST请求到http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}即可。