- 微服务
- SpringCloud技术栈
- 技术及版本选型
- 项目搭建
- 注册中心
- http://eureka7002.com:7002/eureka/">defaultZone: http://eureka7002.com:7002/eureka/
- server:
- enable-self-preservation: false
- eviction-interval-timer-in-ms: 2000 #2秒钟
- 集群版Eureka构建
- 服务发现Discovery
- Eureka自我保护机制
- Eureka停更
- zookeeper服务注册中心
- Eureka和zookeeper的异同
- Ribbon负载均衡服务调用
- OpenFeign服务调用
- Histrix断路器
- 服务网关 zuul zuul2 gateway
- http://localhost:8001 #匹配后提供服务的路由地址">uri: http://localhost:8001 #匹配后提供服务的路由地址
- - After=2020-09-30T16:06:22.476+08:00[GMT+08:00]
- http://localhost:8001">uri: http://localhost:8001
- - After=2020-09-30T15:06:22.476+08:00[GMT+08:00] #时间
- - Cookie=username,zzyy #cookie
- - Header=X-Request-Id, \d+ #请求要求要有:X-Request-Id属性并且值为整数的正则表达式
- 服务配置 config Nacos
- SpringCloud Bus消息总线
- Config+Bus 项目整改
- rabbitmq相关配置
- SpringCloud Stream 消息驱动
- SpringCloud Sleuth 分布式请求链路追踪
微服务
将单一应用程序划分成一组小的服务,服务之间互相协调、相互配合,为用户提供最终价值。每个服务运行在其独立的进程中,服务与服务之间采用轻量级的通信机制相互沟通(通常是基于HTTP的RESTful API)。
SpringCloud技术栈
技术及版本选型
版本选型 https://spring.io/projects/spring-cloud
详细选型:https://start.spring.io/actuator/info
使用版本
组件升级更新说明
SpringCloud中文翻译 https://www.bookstack.cn/read/spring-cloud-docs/docs-index.md
项目搭建
创建过程
创建聚合父工程、建Module,改Pom,写yml,主启动类,业务类
父工程pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.sengg</groupId>
<artifactId>springcloud</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
</modules>
<!--统一管理jar包版本-->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.18.10</lombok.version>
<mysql.version>8.0.18</mysql.version>
<druid.version>1.1.16</druid.version>
<mybatis.spring.boot.version>2.1.1</mybatis.spring.boot.version>
</properties>
<!--子模块继承后,提供作用:锁定版本+子modlue不用写groupId和version-->
<!--只约定版本,并不会引进来jar包-->
<dependencyManagement>
<dependencies>
<!--spring boot 2.2.2.RELEASE-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud Hoxton.SR1-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--spring cloud alibaba 2.1.0.RELEASE-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.spring.boot.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
</plugins>
</build>
</project>
构建步骤
1.cloud-provider-payment8001 微服务提供者支付module模块
2.cloud-consumer-order80 微服务消费者订module单模块
3.上两个端口之间使用RestTemplate方式调用。 之后使用RPC远程调用
@Configuration //将restTemplateConfig加入bean工厂
public class RestTemplateConfig {
@Bean
public RestTemplate getRestTemplate(){
return new RestTemplate();
}
}
///////// 业务接口
@RestController
@RequestMapping("consumer")
public class Ordercontroller {
public static final String PAYMENT_URL="http://localhost:8001";
@Autowired
private RestTemplate restTemplate;
@PostMapping("/payment/create")
public R create(@RequestBody Payment payment){
R r = restTemplate.postForObject(PAYMENT_URL + "/payment/create", payment, R.class);
return r;
}
@GetMapping("/getPaymentById/{id}")
public R getPaymentById(@PathVariable("id")Long id){
return restTemplate.getForObject(PAYMENT_URL+"/payment/get/"+id,R.class);
}
}
注册中心
单机版Eureka构建
Eureka两大组件,服务注册和服务发现
服务注册:将服务信息注册进注册中心
服务发现:从注册中心上获取服务信息
实质:存key服务名,去value调动地址
创建Eureka注册中心 cloud-eureka-server7001
pom依赖
<!-- eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
yml ```yaml server: port: 7001
eureka: instance: hostname: eureka7001 #eureka服务端的实例名称 client: register-with-eureka: false #false表示不向注册中心注册自己 fetch-registry: false #false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务 service-url:
#设置与eureka server交互的地址查询服务和注册服务都需要这个地址
defaultZone: http://eureka7002.com:7002/eureka/
#单机指向的就是自己
defaultZone: http://localhost:7001/eureka/
server:
#关闭自我保护机制,保证不可用服务时及时剔除
enable-self-preservation: false
eviction-interval-timer-in-ms: 2000 #2秒钟
- 主启动
```java
@SpringBootApplication
@EnableEurekaServer //表示eureka服务端
public class EurekaMain7001 {
public static void main(String[] args) {
SpringApplication.run(EurekaMain7001.class,args);
}
}
将订单服务8001加入eureka
pom加入依赖、yml添加配置 、添加主启动类注解@EnableEurekaClient
eureka:
client:
register-with-eureka: true #表示自己进入EurekaServer
fetch-registry: true #是否从EurekaServer抓取已有的注册信息
service-url:
defaultZone: http://localhost:7001/eureka #单机版
# defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
instance:
instance-id: payment8001
prefer-ip-address: true #访问路径可以显示id地址
#Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认为30秒
# lease-renewal-interval-in-seconds: 1
#Eurka服务端在收到最后一次心跳后等待的时间上限,单位为秒(默认为90秒,超时将剔除服务
# lease-expiration-duration-in-seconds: 2
pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
将客户端80加入eureka
pom
<!-- eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
yml
spring: application: name: cloud-order-server eureka: client: register-with-eureka: true #表示自己进入EurekaServer fetch-registry: true #是否从EurekaServer抓取已有的注册信息 service-url: defaultZone: http://localhost:7001/eureka #单机版 # defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
主启动 @EnableEurekaClient
集群版Eureka构建
微服务RPC远程服务调用最核心的是:高可用。所以搭建Eureka注册中心集群。实现负载均衡+故障容错。
实现原理: 互相注册,相互守望
集群搭建
对照上述Eureka7001注册中心,创建Eureka7002modle。pom、主启动不变,更改yml。
更改环境配置
- 在路径
C:\Windows\System32\drivers\etc
中添加
- hosts 文件中添加
127.0.0.1 eureka7001.com & 127.0.0.1 eureka7002.com
- 更改7001和7002的yaml文件
- 7001将
eureka.client.server-url.defaultZone
指向7002的地址http://eureka7002.com:7002/eureka/ - 7002将
eureka.client.server-url.defaultZone
指向7001的地址http://eureka7001.com:7001/eureka/
- 7001将
- 测试结果
二者相互指向、即集群成功部署
将payment8001和cloud-order-server:81加入集群
只需更改yaml,同时指向7001和7002
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
- 测试结果
同时在7001和7002注册中心页面可以看到cloud-order-server:81和payment8001的注册信息。
支付微服务cloud-order-server:800*集群配置
- 参考8001新建8002
修改controller接口,调用接口时输出当前端口信息。
@Value("${server.port}") private String serverPort; @PostMapping("/create") public R create(@RequestBody Payment payment){ int i = paymentService.create(payment); log.info("添加结果:",i); if (i>0){ return R.ok("添加成功"+serverPort); }else { return R.errer("添加失败"+serverPort); } }
修改81controller中订单服务调用地址 http://CLOUD-PAYMENT-SERVICE
注意:此时还需要修改RestTemplateConfig,不然多个微服务提供者会报错。
@Configuration //将restTemplateConfig加入bean工厂 public class RestTemplateConfig { @Bean @LoadBalanced //负载均衡,轮询 public RestTemplate getRestTemplate(){ return new RestTemplate(); } }
测试结果:此时使用81客户端调用接口,可以轮流访问不同的服务端。实现了负载均衡功能。
其它功能配置
yaml中添加如下
eureka:
instance:
instance-id: cloud-order-server #微服务名称,
prefer-ip-address: true #访问路径可以显示id地址
- instance-id:定义微服务名称
- prefer-ip-address: 鼠标单击微服务名称,可以显示ip地址等。
- 访问端口微服务端口8001或8002 路径actuator/health检查服务器是否正常。(访问ureka客户端需要添加web和actuator依赖)
服务发现Discovery
对于注册进eureka里面的微服务,可以通过服务发现来获取该服务信息。
修改controller
@RestController @RequestMapping("payment") @Slf4j public class PaymentController { @Autowired private DiscoveryClient discoveryClient; @GetMapping("/discovery") public Object discovery(){ List<String> services = discoveryClient.getServices(); for (String service : services) { System.out.println("element:"+service); } List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE"); for (ServiceInstance instance : instances) { System.out.println(instance.getInstanceId()+"\t" +instance.getHost()+"\t"+instance.getPort() +"\t"+instance.getUri()); } return this.discoveryClient; } }
修改主启动类 添加注解@EnableDiscoveryClient 开启服务发现
Eureka自我保护机制
某时刻一个微服务不可用了,Eureka不会立即清理,依旧会对该微服务的信息进行保存。(属于CAP中的AP分支)
产生原因:
为了防止EurekaClient本可以正常运行,但是由于EurekaServer网络不通的情况下,EurekaServer不会立刻将EurekaClient服务剔除。
什么是自我保护机制:
默认情况下,如果EurekaServer在一定时间内没有接收到某个微服务实例的心跳,Eureka会将其注销(默认90秒)。但是如果因为网络原因导致无法正常通信,以上行为将变得非常危险(因为微服务本身是健康的,不应该注销)。那么通过Eureka自我保护机制来解决这个问题。
- 出厂默认开启自我保护机制:
eureka.server.enable-self-preservation:true
关闭自我保护机制只需要将true改为false。
### 7001 eureka注册中心 eureka: instance: hostname: eureka7001 #eureka服务端的实例名称 client: register-with-eureka: false #false表示不向注册中心注册自己 fetch-registry: false #false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要去检索服务 service-url: #设置与eureka server交互的地址查询服务和注册服务都需要这个地址 # defaultZone: http://eureka7002.com:7002/eureka/ #单机指向的就是自己 defaultZone: http://localhost:7001/eureka/ server: #关闭自我保护机制,保证不可用服务时及时剔除 enable-self-preservation: false eviction-interval-timer-in-ms: 2000 #2秒钟
eviction-interval-timer-in-ms: 2000 #2秒钟
设置剔除时间### 8001微服务模块 eureka: client: register-with-eureka: true #表示自己进入EurekaServer fetch-registry: true #是否从EurekaServer抓取已有的注册信息 service-url: defaultZone: http://localhost:7001/eureka #单机版 # defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版 instance: instance-id: payment8001 prefer-ip-address: true #访问路径可以显示id地址 # Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认为30秒 lease-renewal-interval-in-seconds: 1 # Eurka服务端在收到最后一次心跳后等待的时间上限,单位为秒(默认为90秒,超时将剔除服务 lease-expiration-duration-in-seconds: 2
Eureka停更
zookeeper服务注册中心
项目搭建
快速创建服务端
cloud-provider-payment8004
pom
<!--SpringBoot整合Zookeeper客户端--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId> <exclusions> <!--先排除自带的zookeeper3.5.3--> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <!--添加zookeeper3.4.14版本--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency>
yml
server: port: 8004 spring: application: name: cloud-provider-payment cloud: zookeeper: connect-string: 119.3.160.59:2181
主启动
@SpringBootApplication @EnableDiscoveryClient //该注解用于使用consul或者zookeeper作为注册中心时注册服务 public class paymentMain8004 { public static void main(String[] args) { SpringApplication.run(paymentMain8004.class,args); } }
测试结果: 8004成功注册进zookeeper
注: 注册进zookeeper的服务是临时性的,当服务断了之后,一段时间后就会删除注册信息。Consul服务注册中心(停止使用)
Eureka和zookeeper的异同
组件名称 | 语言 | CAP | 服务健康检查 | 对外暴露接口 |
---|---|---|---|---|
Eureka | java | AP | 可配置支持 | HTTP |
zookeeper | java | CP | 支持 | 客户端 |
CAP原理
C: Consistency(强一致性)
A: Availability (可用性)
P: Partition tolerance(分区容错性) 分布式基本上都要保证P所以分布式都为AP/CP
CAP理论关注粒度是数据,而不是整体系统设计的策略
Ribbon负载均衡服务调用
Spirng Cloud Ribbon是基于Netflix Ribbon实现的一套客户端。(实现负载均衡)
LB负载均衡(Load Balance)是什么
简单的说就是将用户的请求平摊的分配到多个服务上,从而达到系统的HA(高可用)
常见的负载均衡有软件Nginx、LVS,硬件F5等。
Ribbon本地负载均衡客户端 VS Nginx服务端负载均衡区别
Nginx(集中式)是服务器负载均衡,客户端所有请求都会交给nginx,然后由nginx实现转发请求。即负载均衡是由服务端实现的。
Ribbon(进程内)本地负载均衡,在调用微服务接口的时候,会在注册中心上获取注册信息服务列表,之后缓存到本地JVM。从而在本地实现RPC远程服务调用技术。
Ribbon就是: RestTemplate调用+负载均衡
Ribbon的工作原理
第一步:先选择EurekaServer,它优先选择在同一个区域内负载较少的server。
第二步:再根据用户指定的策略,在从server取到的服务注册列表中选择一个地址。
其中Ribbon提供了多种策略:比如轮询,随机和根据相应时间加权。
项目案例
pom:之前引入的Eureka中已经集成了Ribbon相应的jar包。
Ribbon核心组件IRule
IRle:根据特定的算法中从服务列表中选取一个要访问的服务。7种算法:轮询、随机、权重等。
注意:
官方文档给出警告:Ribbon配置类不能放在@ComponentScan所扫描的当前包及子包下。
主启动添加
@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyselfRule.class)
测试结果:
80客户端访问服务端从轮询变为随机。
负载均衡轮询算法
- 原理:
rest接口第几次请求数%服务器集群总数=实际服务器位置下标。每次服务重启后rest接口计数从0开始。
源码
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } else { Server server = null; int count = 0; while(true) { if (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers();//所有服务list List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size();//可用服务count int serverCount = allServers.size(); if (upCount != 0 && serverCount != 0) { int nextServerIndex = this.incrementAndGetModulo(serverCount);//@ server = (Server)allServers.get(nextServerIndex); if (server == null) { Thread.yield(); } else { if (server.isAlive() && server.isReadyToServe()) { return server; } server = null; } continue; } log.warn("No up servers available from load balancer: " + lb); return null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } } }
private int incrementAndGetModulo(int modulo) {//@增量与取模 int current; int next; do { current = this.nextServerCyclicCounter.get(); next = (current + 1) % modulo;//获取到调用服务器下标 } while(!this.nextServerCyclicCounter.compareAndSet(current, next)); return next; }
//自旋锁+CAS获取到服务器地址 public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
手写负载均衡算法
//接口 public interface LoadBalancer { ServiceInstance INSTANCE(List<ServiceInstance> serviceInstances); }
//具体实现 @Component public class MyLB implements LoadBalancer { private AtomicInteger atomicInteger =new AtomicInteger(0); public final int getAndIncerment(){ int current; int next; do { current=this.atomicInteger.get(); next= current>= 2147483647 ? 0: current+1; }while (!this.atomicInteger.compareAndSet(current,next)); System.out.println("第几次访问次数next:"+next); return next; } //负载均衡算法 接口第几次请求数%服务器集群总数=实际服务器位置下标 @Override public ServiceInstance INSTANCE(List<ServiceInstance> serviceInstances) { int index=getAndIncerment()%serviceInstances.size();//第几次访问%服务集群数量 return serviceInstances.get(index); } }
//调用接口 @GetMapping("/payment/lb") public R getPayemtLB(){ List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE"); if (instances==null|| instances.size()<=0){ return null; } ServiceInstance instance = loadBalancer.INSTANCE(instances); URI uri = instance.getUri(); String forObject = restTemplate.getForObject(uri + "/payment/lb", String.class); return R.ok().put("data",forObject); }
测试结果
OpenFeign服务调用
OpenFeign是什么?
Feign是一个声明式的web服务客户端,让编写web服务客户端变得更加容易,只需要创建一个接口并在接口上添加注解即可。简化Ribbon调用接口的操作。
项目搭建
- 新建cloud-consumer-feign-order80
pom
<!-- openfeign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>
yaml
server: port: 80 eureka: client: register-with-eureka: true #表示自己进入EurekaServer fetch-registry: true #是否从EurekaServer抓取已有的注册信息 service-url: # defaultZone: http://localhost:7001/eureka #单机版 defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
主启动 添加注解@EnableFeignClients
业务类
- 业务逻辑接口+@FeignClient配置调用provider服务
新建PaymentFeignService接口并新增注解@FeignClient ```java @Component @FeignClient(value = “CLOUD-PAYMENT-SERVICE”) public interface PaymentFeignService {
@GetMapping(“/payment/get/{id}”) public R getPaymentById(@PathVariable(“id”) Long id);
}
- 控制层controller
```java
@RestController("feign")
public class FeignController {
@Autowired
private PaymentFeignService paymentFeignService;
@GetMapping("/consumer/get/{id}")
public R get(@PathVariable("id")Long id){
return paymentFeignService.getPaymentById(id);
}
}
OpenFeign日志增强
可以通过配置来调整日志级别,从而了解Feign中Http请求的细节。对Feign接口的调用情况进行监控和输出。
添加config
日志级别:NONE,BASIC,HEADERS,FULL;
@Configuration
public class FeignConfig {
@Bean
Logger.Level feignLoggerLevel(){
return Logger.Level.FULL;//详细日志
}
}
yml
logging:
level:
#feign日志以什么级别监控哪个接口
com.sengg.springcloud.service.paymentFeignService: debug
Histrix断路器
Histrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等。Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性。
能干啥
- 服务降级:服务器忙,请稍后再试,不让客户等待并立即返回一个友好提示,fallback;
- 服务熔断:服务器达到最大负载的时候,直接拒绝访问,然后调用服务降级的方式返回友好提示。break
服务限流:秒杀高并发等操作,严禁都直接访问,进行限流一秒n个。flowlimit
停更进维
替代方案官方推荐resilience4j,国内推荐使用阿里巴巴的Sentinel
项目搭建
修改服务注册中心7001为单机版,创建服务提供者
cloud-provider-hystrix-payment8001
新建两个接口,一个直接返回ok,另外一个模拟延时2秒钟
pom<!-- hystrix--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
@Service public class PaymentService { //1正常访问,肯定ok的方法 public String paymentInfo_OK(Integer id){ return "线程池:"+Thread.currentThread().getName()+" paymentInfo_OK,id: "+id+"\t"+"O(∩_∩)O哈哈~"; } public String paymentInfo_timeout(Integer id){ int time=10/0; try { TimeUnit.SECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace(); } return "线程池:"+Thread.currentThread().getName()+" paymentInfo_timeout,id: "+id+"\t"+"(*^▽^*)"+"耗时"+time+"秒钟"; } public String paymentInfo_timeoutHandler(Integer id) { return "线程池:"+Thread.currentThread().getName()+"系统繁忙,请稍后再试!!!"+"\t"+"o(╥﹏╥)o"; } //=========访问熔断=========================== @HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback",commandProperties = { @HystrixProperty(name = "circuitBreaker.enabled",value = "true"),//是否开启断路器 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"),//请求次数 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"),//实践窗口期 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60")//失败率达到多少后跳闸 }) public String paymentCircuitBreaker(@PathVariable("id") Integer id){ if (id<0){ throw new RuntimeException("******id不能为负数"); } String serialNumber= IdUtil.simpleUUID(); return Thread.currentThread().getName()+"调用成功,流水号: "+serialNumber; } public String paymentCircuitBreaker_fallback(@PathVariable("id")Integer id){ return "id 不能为负数,请稍后再试,╮(╯▽╰)╭ id:"+id; } }
@RestController @Slf4j public class PaymentController { @Resource private PaymentService paymentService; @Value("${server.port}") private String serverPort; @GetMapping("/payment/hystrix/ok/{id}") public String paymentInfo_OK(@PathVariable("id") Integer id){ String result= paymentService.paymentInfo_OK(id); log.info("result:"+result); return result; } @GetMapping("/payment/hystrix/timeout/{id}") public String paymentInfo_timeout(@PathVariable("id") Integer id){ String result= paymentService.paymentInfo_timeout(id); log.info("result:"+result); return result; } //=======服务熔断======== @GetMapping("/payment/circuit/{id}") public String paymentCircuiBreaker(@PathVariable("id")Integer id){ String result = paymentService.paymentCircuitBreaker(id); log.info("*****result:"+result); return result; } }
添加服务hystrix服务熔断之前使用JMeter压力测试延时接口200*100次。
结论: 正常ok接口在压力测试期间,访问速度也会下降。即当一个接口出现问题时,其他接口也会受到影响,影响正常的访问速度。将消费端80加入测试
新建消费端
cloud-consumer-feign-hystrix-order80
pom<!-- openfeign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <!--监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
yml 注册进Eureka
server: port: 80 eureka: client: fetch-registry: true service-url: defaultZone: http://eureka7001.com:7001/eureka/ feign: hystrix: enabled: true
主启动类 ```java @SpringBootApplication @EnableFeignClients @EnableHystrix public class OrderHystrixMain80 { public static void main(String[] args) {
SpringApplication.run(OrderHystrixMain80.class,args);
} }
service
```java
//使用OpenFeign调用服务端接口
@Component
@FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT",fallback = PaymentFallbackService.class)
public interface PaymentHystrixService {
@GetMapping("/payment/hystrix/ok/{id}")
public String paymentInfo_OK(@PathVariable("id") Integer id);
@GetMapping("/payment/hystrix/timeout/{id}")
public String paymentInfo_timeout(@PathVariable("id") Integer id);
}
Controller
@RestController
@Slf4j
@DefaultProperties(defaultFallback = "payment_Global_FallbackMethod")
public class OrderHystrixController {
@Resource
private PaymentHystrixService paymentHystrixService;
@GetMapping("/consumer/hystrix/ok/{id}")
public String paymentInfo_OK(@PathVariable("id") Integer id){
String result= paymentHystrixService.paymentInfo_OK(id);
return result;
}
@HystrixCommand //该方法可以解决代码膨胀
public String paymentInfo_timeout(@PathVariable("id") Integer id){
int age=10/0;
String result= paymentHystrixService.paymentInfo_timeout(id);
return result ;
}
}
故障现象和导致原因
- 8001同一层次的其它接口服务被困死,因为tomcat线程池里面的线程被占用。
因为有了上述故障或不佳的表现才有我们的降级/容错/限流等技术的产生。
解决方案
超时导致服务器变慢(转圈)—-超时不要再等待
- 出错(宕机或程序运行出错)—-出错要有兜底
解决:
降级配置
@HystrixCommand
- 8001服务降级:
- 主启动启用
@EnableCircuitBreaker
- 业务类启用
- 主启动启用
此时8001正常启动,但是处理请求需要三秒,80设置只等待1.5秒,会返回报错页面,请求将进行降级。
@GetMapping("/consumer/hystrix/timeout/{id}")
@HystrixCommand(fallbackMethod = "paymentInfo_timeoutHandler",commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "1500")
})
public String paymentInfo_timeout(@PathVariable("id") Integer id){
String result= paymentHystrixService.paymentInfo_timeout(id);
return result ;
}
public String paymentInfo_timeoutHandler(Integer id) {
return "我是消费者80,对方系统繁忙请稍后再试"+"\t"+"o(╥﹏╥)o";
}
客戶端80的降级
- 降级配置
@HystrixCommand
8001服务降级:
- 主启动启用
@EnableCircuitBreaker
业务类启用
@GetMapping("/consumer/hystrix/timeout/{id}") @HystrixCommand(fallbackMethod = "paymentInfo_timeoutHandler",commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "3000") }) // @HystrixCommand //该方法可以解决代码膨胀 public String paymentInfo_timeout(@PathVariable("id") Integer id){ int age=10/0; // int age=5; try { TimeUnit.SECONDS.sleep(age); } catch (InterruptedException e) { e.printStackTrace(); } String result= paymentHystrixService.paymentInfo_timeout(id); return result ; } public String paymentInfo_timeoutHandler(Integer id) { return "我是消费者80,对方系统繁忙请稍后再试"+"\t"+"o(╥﹏╥)o"; }
全局服务降级 DefaultProperties
上述代码存在的问题
- 主启动启用
每个业务方法对应一个兜底的方法,代码膨胀。
- 和业务逻辑混在一起比较混乱。
解决方案1 配置全局默认容错方法
- 添加全局配置注解 @DefaultProperties(defaultFallback = “payment_Global_FallbackMethod”)
- 使用默认容错注解 @HystrixCommand
- 定义全局容错方法 payment_Global_FallbackMethod()
@RestController
@Slf4j
@DefaultProperties(defaultFallback = "payment_Global_FallbackMethod")
public class OrderHystrixController {
@Resource
private PaymentHystrixService paymentHystrixService;
@GetMapping("/consumer/hystrix/timeout/{id}")
@HystrixCommand //该方法可以解决代码膨胀
public String paymentInfo_timeout(@PathVariable("id") Integer id){
int age=10/0;
// int age=5;
try { TimeUnit.SECONDS.sleep(age); } catch (InterruptedException e) { e.printStackTrace(); }
String result= paymentHystrixService.paymentInfo_timeout(id);
return result ;
}
//全局fallback方法
public String payment_Global_FallbackMethod(){
return "Global异常信息处理,请稍后";
}
}
解决方法2 通配服务降级
使用上述全局降级配置,存在兜底方法和业务方法混淆的问题。使用通配服务降级解决。
解决以下异常:运行时异常、超时、宕机
@Component
@FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT",fallback = PaymentFallbackService.class)
public interface PaymentHystrixService {
@GetMapping("/payment/hystrix/ok/{id}")
public String paymentInfo_OK(@PathVariable("id") Integer id);
@GetMapping("/payment/hystrix/timeout/{id}")
public String paymentInfo_timeout(@PathVariable("id") Integer id);
}
@Component
public class PaymentFallbackService implements PaymentHystrixService {
@Override
public String paymentInfo_OK(Integer id) {
return "-------PaymentFallbackService fall back,o(╥﹏╥)o";
}
@Override
public String paymentInfo_timeout(Integer id) {
return "-------PaymentFallbackService fall back,o(╥﹏╥)o";
}
}
通过此方法可以避免兜底代码和业务逻辑混淆,但是配置在客户端80,不能兜底80接口本身的报错。
服务熔断
类比保险丝达到最大服务访问后,直接拒绝访问,拉闸限电,然后调用服务降级的方法并返回友好提示。
熔断机制是应对雪崩效应的一种微服务链路保护机制。当扇出链路的某个微服务出错不可用或者响应时间太长时,会进行服务降级,进而熔断该节点微服务调用,快速返回错误的响应信息。
当检测到该节点微服务调用响应正常后,恢复调用链路。
代码测试
代码为10秒窗口期,错误率达到60%打开熔断器。
//=======controller 服务熔断========
@GetMapping("/payment/circuit/{id}")
public String paymentCircuiBreaker(@PathVariable("id")Integer id){
String result = paymentService.paymentCircuitBreaker(id);
log.info("*****result:"+result);
return result;
}
//=========service 访问熔断===========================
@HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback",commandProperties = {
@HystrixProperty(name = "circuitBreaker.enabled",value = "true"),//是否开启断路器
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"),//请求次数
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"),//时间窗口期
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60")//失败率达到多少后跳闸
})
public String paymentCircuitBreaker(@PathVariable("id") Integer id){
if (id<0){
throw new RuntimeException("******id不能为负数");
}
String serialNumber= IdUtil.simpleUUID();
return Thread.currentThread().getName()+"调用成功,流水号: "+serialNumber;
}
自测:
正确:http://localhost:8001/payment/circuit/1
错误:http://localhost:8001/payment/circuit/-1
该方法传参为正数正常返回,负数返回报错。尝试10次以上,错误率60%。结果正确传参也返回兜底信息,当正确率回升之后,服务可正常调用。
熔断类型
- 熔断打开:请求不再进行调用当前服务,内部设置时钟一般为MTTR(平均故障处理时间),当打开时长达到所设时钟时间,则进入半熔断状态。
- 熔断关闭:熔断关闭不会对服务进行熔断。
- 熔断半开:部分请求根据规则调用当前服务,如果请求成功且符合规则认为当前服务恢复正常,关闭熔断。
断路器开启或者关闭的条件
断路器开启或者关闭的条件:
- 当满足一定的阀值的时候(默认十秒内超过20个请求次数)
- 当失败率达到一定的时候(默认10内超过50%的请求失败)
- 到达以上阀值,断路器将会开启
- 当开启的时候,所有请求都不会进行转发
- 一段时间后(默认5秒),这时候断路器是半开状态,会让其中一个请求进行转发。如果成功,断路器会关闭,若失败,继续开启。重复4和5
服务限流
之后alibaba sentinel中介绍仪表盘
创建新工程
cloud-consumer-hystrix-dashboard9001
pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency> <!--监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
yml
server.port: 9001
- 主启动 添加
@EnableHystrixDashboard
测试
- 使用监控平台9001监控8001
http://localhost:8001/hystrix.stream
打开后报错,需要在8001主启动类添加:
/** *此配置是为了服务监控面配置,与服务容错本身无关,springcloud升级后的坑 * ServletRegistraBean因为springboot的默认路径不是“/hystrix.stream", * 是要在自己的项目配置上下面的servlet就可以了 */ @Bean public ServletRegistrationBean getServlet() { HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet(); ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet); registrationBean.setLoadOnStartup(1); registrationBean.addUrlMappings("/hystrix.stream"); registrationBean.setName("HystrixMetricsStreamServlet"); return registrationBean; }
使用8001对应接口测试
- 折线图代表访问频率,circuit代表断路器是否打开
服务网关 zuul zuul2 gateway
SpringCloud Gateway 是zuul1.x的替代
什么是:
SpringCloud Gateway使用的Webflux中的reactor-netty响应式编程组件,底层使用了Netty通讯框架。
能干啥:
三大核心概念:
- 路由Route:路由是构建网关的基本模块,它由ID,目标URI,一系列的断言和过滤器组成,如果断言为true则匹配该路由。
- 断言Predicate:开发人员可以匹配HTTP请求中的所有内容,如果请求与断言相匹配则进行路由。(匹配条件)
Filter过滤:spring框架中GatewayFilter的实例,使用过滤器,可以在请求被路由前或者之后对请求进行修改。(拦截器)
核心逻辑:
GateWay9527搭建
pom
<!--gateway--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
yml routes为网关路由配置
```yaml server: port: 9527 spring: application: name: cloud-gateway cloud: gateway:
discovery: locator: enabled: true #开启从注册中心动态创建路由的功能,利用微服务名进行路由 routes: - id: payment_routh #路由的id,没有固定规律但是要求唯一,建议配合服务名
uri: http://localhost:8001 #匹配后提供服务的路由地址
uri: lb://cloud-payment-service #匹配后提供服务的路由地址 predicates: - Path=/payment/gets/** #断言,路径相匹配的进行路由
- After=2020-09-30T16:06:22.476+08:00[GMT+08:00]
- id: payment_routh2 #payment_route
uri: http://localhost:8001
uri: lb://cloud-payment-service #匹配后提供服务的路由地址 predicates: - Path=/payment/lb/** #地址
- After=2020-09-30T15:06:22.476+08:00[GMT+08:00] #时间
- Cookie=username,zzyy #cookie
- Header=X-Request-Id, \d+ #请求要求要有:X-Request-Id属性并且值为整数的正则表达式
<a name="ItwC1"></a>
#### routes配置规则
![image.png](https://cdn.nlark.com/yuque/0/2021/png/12860789/1638444164457-22704da4-0fc1-4996-9cff-65039f8efa29.png#clientId=ub31e9d1d-f7a1-4&from=paste&height=247&id=u5777c0ef&margin=%5Bobject%20Object%5D&name=image.png&originHeight=493&originWidth=1338&originalType=binary&ratio=1&size=466862&status=done&style=none&taskId=uca9ad8f6-fe97-443d-9770-03f152c4565&width=669)
<a name="EKBdQ"></a>
#### getway配置方法两种:
第一种使用yaml配置,同上<br />第二种添加config
```yaml
@Configuration
public class GateWayConfig {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder routeLocatorBuilder){
RouteLocatorBuilder.Builder routes = routeLocatorBuilder.routes();
//http://news.baidu.com/guonei
routes.route("path_route_sengg",
r ->r.path("/guonei")
.uri("http://news.baidu.com/guonei"))
.build();
return routes.build();
}
}
getway动态路由配置
默认情况下Gateway会根据注册中心注册的服务列表,以注册中心上微服务名为路径创建动态路由进行转发,从而实现动态路由功能。
Predicate断言的使用
Predicate就是为了实现一组匹配规则,让请求过来找到对应的Route进行处理。
Filter的使用 通常使用自定义过滤器
常用订单过滤器使用
自定义全局过滤器 需要实现GlobalFilter, Ordered
两个接口
@Component
@Slf4j
public class MyLogGateWayFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("***************come in MyLogGateWayFilter"+new Date());
String uname = exchange.getRequest().getQueryParams().getFirst("uname");
if (uname==null){
log.info("***用户名为null");
exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
@Override
public int getOrder() {
return 0;
}
}
服务配置 config Nacos
分布式系统面临的问题:
微服务每个服务粒度相对较小,因此系统中会出现大量的服务,由于每个服务都需要必要的配置信息才能运行,所以一套集中式的、动态的配置管理设备是必不可少的。
是什么?
SpringCloud Config为微服务架构中的微服务提供集中化的外部配置支持,配置服务器为各个不同微服务应用的所有环境提供了一个中心化的外部配置。
能干嘛?
- 集中管理配置文件
- 不同环境不同配置,动态化的配置更新,分环境部署比如dev/test/prod/beta/release。
- 运行期间动态调整配置,不再需要在每个服务部署的机器上编写配置文件,服务会向配置中心统一拉取配置自己的信息,当配置发生变动时,服务不需要重启即可感知到配置的改变并应用新的配置。
- 将配置信息以REST接口的形式暴露。
Config使用
SpirngCloud Config分为服务端和客户端两部分。
项目搭建
在gitHub或者gitee中创建配置中心仓库。
创建config服务端 cloud-config-center3344
pom
<!--config server--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency>
yml
server: port: 3344 spring: application: name: cloud-config-center #注册进eureka服务器的服务名 cloud: config: server: git: uri: https://github.com/tuyar/springcloud-config.git #gitee上面的git仓库名字 #搜索目录 search-paths: - springcloud-config force-pull: true username: 1127752003@qq.com #登录名 password: 225211zll #登录密码 #读取分支 label: master eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka
主启动
@SpringBootApplication @EnableConfigServer //激活配置中心 public class ConfigCenterMain3344 { public static void main(String[] args) { SpringApplication.run(ConfigCenterMain3344.class,args); } }
测试
访问localhost:3344/master/config-prod.yml
得到git上的配置信息
创建config客户端 cloud-config-client-3355
pom 依赖为config客户端
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency>
yml
** **bootstrap.yml
server: port: 3355 spring: application: name: config-client cloud: #config客户端配置 config: label: master #分支名称 name: config #配置文件名称 profile: dev #读取后缀名称 #上述三个综合:master分支上的config-dev.yml配置文件被读取 uri: http://localhost:3344 #配置中心地址 #服务注册到eureka eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka
主启动
@EnableEurekaClient @SpringBootApplication public class ConfigClinetMain3355 { public static void main(String[] args) { SpringApplication.run(ConfigClinetMain3355.class,args); } }
业务类
@RestController @RefreshScope public class ConfigClientControlelr { @Value("${server.port}") private String serverPort; @Value("${config.info}") private String configInfo; @GetMapping("/configInfo") public String getConfigInfo(){ return "serverPort:"+serverPort+"\t\n\n configInfo:"+configInfo; } }
测试
访问localhost:3355,此方法通过3344config配置中心
获取到配置信息并返回
注:
application.yml 是用户级的资源配置项
bootstrap.yml是系统级的,优先级更高。
Config客户端动态刷新
分布式配置刷新问题
客户端3355从配置中心3344获取到git上的配置信息,但是如果配置信息修改,需要客户端3355重启才能刷新。
动态刷新(手动)
修改3355模块
- pom:引入actuator监控
修改yml:暴露监控端口
management: endpoints: web: exposure: include: "bus-refresh"
-
测试:
发现3355配置并没有生效
github修改之后需要请运维人员发送post请求刷新3355Curl -X POST “http://localhost:3355/actuator/refresh”
SpringCloud Bus消息总线
用来实现分布式自动刷新配置功能,是config的加强。Bus配合config使用可实现配置的动态刷新。
是什么
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个公用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。
Bus支持两种消息代理:RabbitMQ和Kafka
安装Rabbitmq环境
服务器地址
[http://47.108.210.231:15672/](http://47.108.210.231:15672/)
根据3355新建3366
设计思想:
1.利用消息总线触发一个客户端/bus/refresh,从而刷新所有客户端配置。
2.利用一个消息总线触发一个服务端ConfigServer的/bus/refresh端点,从而刷新所有客户端配置。
3.由以上两图可知,图二更合适,图一不合适原因: 打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
- 打破了微服务各节点的对等性。
有一定的局限性,例如,微服务在迁移的时,地址常常会发生改变,此时如果想要做到自动刷新,那就会增加更多的修改。
Config+Bus 项目整改
给3344配置中心服务端配置消息总线支持
Pom
<!-- 添加消息总线RabbitMQ支持 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
yaml ```yaml
rabbitmq相关配置
rabbitmq: host: 47.108.210.231 port: 5672 username: guest password: guest
rabbitmq相关配置,暴露bus刷新配置的端点
management: endpoints: web: exposure: include: ‘bus-refresh’
<a name="ccqMo"></a>
#### 给3355和3366客户端配置消息总线支持
- pom
```xml
<!-- 添加消息总线RabbitMQ支持 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
yaml
#rabbitmq相关配置 rabbitmq: host: 47.108.210.231 port: 5672 username: guest password: guest #暴露监控端点 management: endpoints: web: exposure: include: "*"
测试 启动7001,3344,3355,3366
修改github中配置文件,发现只有3344发生改变,此时给3344发送post请求刷新/bus/refresh。
curl -X POST “http://localhost:3344/actuator/bus-refresh”
结论:只需要刷新3344服务端,3355和3366客户端都实现了刷新基本原理
所有的ConfigClient实例都监听MQ中同一个Topic(默认SpringCloudBus)。当一个服务刷新数据的时候,它会把这个消息放到Topic中,这样其他监听同一个Topic的服务就能得到通知,然后去更新自身的配置。
定点通知
指定通知3355不通知3366,即指定具体某个实例生效而不是全部。
- 公式:
curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
SpringCloud Stream 消息驱动
是什么
屏蔽底层消息中间件的差异,降低切换成本,同意消息的编程模型。
官方定义:是一个构件消息驱动的微服务框架
仅仅支持:Rabbitmq和kafka
Binder
通过定义绑定器(Binder)作为中间层,完美的实现了应用程序与消息中间件细节之间的隔离。
案例
创建三个子模块
cloud-stream-rabbitmq-provider8801 作为生产者进行发消息模块。
pom
<!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
yml
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息中间件类型 environment: #设置rabbitmq的相关配置 spring: rabbitmq: host: 47.108.210.231 port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的exchange名称定义 content-type: application/json #设置消息类型,本次为json 文本则设置为“text/plain" binder: defaultRabbit #设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #设置心跳时间的间隔(默认30秒 lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认90秒 instance-id: send-8801.com #在信息列表时显示主机名称 prefer-ip-address: true #访问的路径变为ip地址
业务方法 ```java public interface IMessageProvider { public String send(); }
@EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider {
@Resource private MessageChannel output;//消息发送管道 @Override public String send() { String s = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(s).build()); System.out.println(“**s” +s); return null; } }
测试结果:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/12860789/1638951350295-c8d5a75d-ca25-451b-8424-dc0ffc604334.png#clientId=udbf08beb-0760-4&from=paste&height=263&id=u790130ca&margin=%5Bobject%20Object%5D&name=image.png&originHeight=525&originWidth=950&originalType=binary&ratio=1&size=220450&status=done&style=stroke&taskId=ufa2fdbb3-35fd-414b-8fcf-c6e95a09033&width=475)<br />向`localhost:8801/sendMessage`发送请求<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/12860789/1638951469835-72ea8a2a-2cc9-4bc7-bd23-3478bd7d19b4.png#clientId=udbf08beb-0760-4&from=paste&height=88&id=ud84097c7&margin=%5Bobject%20Object%5D&name=image.png&originHeight=175&originWidth=1139&originalType=binary&ratio=1&size=197090&status=done&style=none&taskId=uea2be3fe-3391-41e0-92bc-c75085fb1ec&width=569.5)
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- cloud-stream-rabbitmq-consumer8803,作为消息接收模块
- pom
```java
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
yml(只列关键配置,其他同8801)
bindings: #服务的整合处理 input: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的exchange名称定义 content-type: application/json #设置消息类型,本次为json 文本则设置为“text/plain" binder: defaultRabbit #设置要绑定的消息服务的具体设置
controller
@RestController @EnableBinding(Sink.class) //消息绑定 public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort;//端口号 @StreamListener(Sink.INPUT)//监听输入流数据 public void input(Message<String> message){ System.out.println("消费者1号,------>收到的消息:"+message.getPayload()+"\t port:"+serverPort); } }
测试:
调用8001发送消息接口,8002控制台展示接收到的消息。分组消费与持久化
微服务模块:
- 7001:服务注册
- 8801:消息生产
- 8802:消息消费
- 8803:消息消费
运行后两个问题:
- 重复消费: 8801发送消息,8802和8803都收到消息,存在重复消费问题
- 导致原因:
默认分组group是不同的,组流水号不一样,被认为不同组,可以消费。
- 解决方法:使用stream中的消息分组来解决
- 原理:不同的组是可以消费的,同一个组会发生竞争关系,只有其中一个可以消费。
- 测试结果:8802和8803轮询得到结果,成功解决重复消费问题
- 消息持久化问题
加入分组之后,测试得到,有分组的8802任务重启之后可以获取到消息,未分组的8803将丢失消息。
SpringCloud Sleuth 分布式请求链路追踪
是什么
SpringCloud Sleuth 提供了一套完整的服务跟踪解决方案。并兼容支持了zipkin。
下载
https://zipkin.io/quickstart.sh
启动:
java -jar zipkin-server-2.12.9-exec.jar
访问:localhost9411
测试:
测试使用7001,8001,80
- 8001,80pom添加: ```xml
- 8001和80yaml中添加
```yaml
spring:
application:
name: cloud-payment-service
zipkin:
base-url: http://localhost:9411
sleuth:
sampler:
#采样率值介于0到1之间,1则全部采集
probability: 1
8001Controller中添加
@GetMapping("/zipkin") public String paymentZipkin(){ return "hi,i`am paymentzipkin server fall back,welcome,O(∩_∩)O哈哈~"; }
80Controller中添加
@GetMapping("/payment/zipkin") public String paymentZipkin(){ String result = restTemplate.getForObject("http://localhost:8001" + "/payment/zipkin", String.class); return result; }
测试:
浏览器访问:http://localhost/consumer/payment/zipkin
打开:localhost:9411
名称解释:
Trace:类似于树结构的Span集合,表示一条调用链路,存在唯一标识。
span:标识调用链路来源,通俗的理解span就是一次请求信息。