微服务架构与Spring Cloud简介
1.什么是微服务架构
微服务架构是一种架构模式,它提倡将单一的应用程序划分成一组一组小的服务,服务之间互相协调、互相配合。
每个服务运行在其独立的进程中,服务与服务采用轻量级的通信机制互相协调(通常是基于HTTP协议的RESTful API)。
每个服务都围绕着具体的业务进行构建,并且能够被独立的部署到生产环境、类生产环境中等等。
另外,应当尽量避免统一的、集中式的服务管理机制,对具体的一个服务而言,应根据上下文,选择合适的语言、工具对其进行构建。
2. 分布式服务架构的理解

一整套体系与体系之间的相互联系,落地的技术与维度构成了分布式微服务架构的体系,强在一个整体,而不是个体
3.什么是Spring Cloud
是提供分布式服务架构的一站式解决方案,是多种微服务架构落地技术的集合体,俗称微服务全家桶。
协调与集合

EUREKA已停止更新,后面使用springcloud alibaba
京东
阿里


4.主要核心服务

springboot与springcloud的版本选择
springboot的git源码地址:https://github.com/spring-projects/spring-boot/releases/
springboot2.0新特性:https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.0-Release-Notes
springcloud的git源码地址:https://github.com/spring-projects/spring-cloud/wiki
SpringCloud版本采用了名称命名而非版本号,每当解决一个重大BUG后,都会发布一个service releases版本简称SRX版本
1.SpringCloud和SpringBoot之间的依赖关系

更详细的可查看:https://start.spring.io/actuator/info

为什么springboot选择2.2.2还不是最新的2.2.4?
为了照顾springbcloud,cloud推荐使用2.2.2
SpringCloud Netflix
服务调用(Ribbon负载均衡服务调用)
LoadBalancer以后会慢慢替换Ribbon,虽然Ribbon也已经停止更新了,但还能使用
Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端,即负载均衡的工具。
简单的说,Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法和服务调用。Ribbon客户端组件提供一系列完善的配置项,如连接超时,重试等。简单的说,就是在配置文件中列出Load Balancer(简称LB)后面所有的机器,Ribbon会自动的帮助你基于某种规则(如简单轮询、随机连接等)去连接这些机器。我们很容易使用Ribbon实现自定义的负载均衡算法。
负载均衡是什么
简单的说就是将用户的请求平摊的分配到多个服务上,从而达到系统的HA,即高可用。常见的负载均衡软件有Nginx,LVS,硬件F5等。
Ribbon本地负载均衡客户端 VS Nginx服务端负载均衡的区别
Nginx是服务器负载均衡,客户端所有的请求都会交给Nginx,然后有Nginx实现转发请求。即负载均衡是由服务端实现的。
Ribbon本地负载均衡,在调用微服务接口的时候,会在注册中心上获取注册信息服务列表之后缓存到JVM本地,从而在本地实现RPC远程服务调用技术。
集中式负载均衡,即在服务的消费方和提供方之间使用独立的负载均衡设施(可以是硬件,如F5,也可以是软件,如Nginx),由该设施负载把访问请求通过某种策略转发至服务的提供方
进程内负载均衡,将负载均衡逻辑集成到消费方,消费方从服务注册中心获知有哪些地址可用,然后自己再从这些地址中选择出一台合适的服务器。Ribbon就属于进程内LB,它只是一个类库,集成于消费进程,消费方通过它来获取到服务提供方的地址。
Ribbon:负载均衡+RestTemplate调用
Ribbon-Eureka架构

Ribbon在工作是分成两步
第一步先选择EurekaServer,它优先选择在同一个区域内负载较少的Server
第二步再根据用户指定的策略,再从Server取到的服务注册列表中选择一个地址。
其中Ribbon提供了多种策略:比如轮询、随机和根据响应时间加权
spring-cloud-starter-netflix-eureka-client已经整合了RIbbon
<!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>


替换:
配置不要放在@ComponentScan包或去子包下(即不要放在主启动类的包中),否则这个配置类就会被所有的Ribbon客户端所共享,达不到特殊化定制的目的了。

配置类
package com.atguigu.myrule;import com.netflix.loadbalancer.IRule;import com.netflix.loadbalancer.RandomRule;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class MyselfRule {@Beanpublic IRule myRule(){return new RandomRule();//定义为随机}}
主启动类上
/*** 消费者客户端*/@SpringBootApplication@EnableEurekaClient@RibbonClient(name = "CLOUD-PAYMENT-SERVICE",configuration = MyselfRule.class)//name是服务提供者别名,一定要大写public class OderMain80 {public static void main(String[] args) {SpringApplication.run(OderMain80.class,args);}}
负载均衡轮询算法原理
rest接口第几次请求数%服务器集群总数=实际调用服务器位置下标,每次服务重启后rest接口计数从1开始。
自旋锁加CAS
手写:
在服务提供者的controller中添加
@GetMapping(value = "payment/lb")public String getPaymentLB(){return serverPort;}
自己的myLB类
package com.atguigu.springcloud.lb.impl;import com.atguigu.springcloud.lb.LoadBalancer;import org.springframework.cloud.client.ServiceInstance;import org.springframework.stereotype.Component;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;@Componentpublic class MyLb implements LoadBalancer {private AtomicInteger atomicInteger = new AtomicInteger(0);public final int getAndIncrement(){int current;int next;do{current = this.atomicInteger.get();//为什么是2147483647,2147483647是atomicInteger的最大值next = current >= 2147483647 ? 0 : current+1;}while (!this.atomicInteger.compareAndSet(current,next));System.out.println(next);return next;}@Overridepublic ServiceInstance instance(List<ServiceInstance> serviceInstances) {int index = getAndIncrement() % serviceInstances.size();return serviceInstances.get(index);}}
消费者Controller
@Resourceprivate LoadBalancer loadBalancer;@Resourceprivate DiscoveryClient discoveryClient;@GetMapping(value = "/consumer/payment/lb")public String getPaymentLB(){List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");if(instances == null || instances.size() <=0){return null;}ServiceInstance serviceInstance = loadBalancer.instance(instances);URI uri = serviceInstance.getUri();return restTemplate.getForObject(uri+"/payment/lb",String.class);}
服务降级(Hystrix断路器)
Hystrix也已经停止更新,但国内大多数都在使用,官网已不推荐使用,国外比较推荐使用resilience4j,但国内用的很少,国内强烈推荐使用springcloud alibaba的sentinel
分布式系统面临的问题
复杂的分布式体系系统中的应用程序有数十个依赖关系,每个依赖关系在某些时候将不可避免地失败,如网络卡顿,网络调用超时、程序出错,甚至机房断电,这时当请求需要调用服务,当某个服务超时,则有可能造成服务雪崩,导致整个系统瘫痪
服务雪崩,当多个微服务之间互相调用的时候。假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其他的微服务,这就是所谓的“扇出”。如果扇出的链路上某个微服务的调用响应时间过程或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,也即所谓的“雪崩效应’’(服务的高可用遭到破坏)。
Hystrix简介
Hystrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避免额会调用失败,比如超时、异常等,Hystrix能够保证在一个依赖出问题的情况下,不会导致整体服务失败,避免联级故障,以提高分布式系统的弹性。
断路器本身是一种开关装置,当某个服务单元发生故障之后,通过断路器的故障监控(类似熔断保险丝),向调用放返回一个服务预期的、可处理的备选响应(FallBack),而不是长时间的等待或者抛出调用放无法处理的异常,这样就保证了服务调用方的线程不会被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩。
Hystrix能够做到服务降级、服务熔断、接近实时的监控、限流、隔离等等
服务降级
当某个服务不可用(程序运行异常、超时、服务熔断触发服务降级、线程池/信号量打满也会导致服务降级)了,向调用放返回一个服务预期的、可处理的备选响应(FallBack)
服务降级测试
建module,改pom,建yml,主启动,业务。。。
pom
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
yml
server:port: 8001spring:application:name: cloud-provider-hystrix-paymentdatasource:type: com.alibaba.druid.pool.DruidDataSource #当前数据源操作类型'driver-class-name: org.gjt.mm.mysql.Driver # mysql驱动包url: jdbc:mysql://localhost:3306/db2020?useUnicode-true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: aeureka:client:register-with-eureka: truefetch-registry: trueservice-url:#defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版defaultZone: http://eureka7001.com:7001/eureka #单机版mybatis :mapperLocations: classpath:mapper/*.xmltype aliases-package: com.atguigu.springcloud.entities #所有Entity别名类所在包
主启动类
@SpringBootApplication@EnableEurekaClientpublic class PaymentHystrixMain8001 {public static void main(String[] args) {SpringApplication.run(PaymentHystrixMain8001.class,args);}}
使用Jmeter进行高并发压力测试,发送N个请求,会出现卡顿现象
原因是因为Tomcat的默认的工作线程数被打满了,没有多余的线程来分解压力和处理
现在还只是服务提供者自测,当消费者也请求进来,那么将有可能导致消费者在那干等。
为避免这种情况的发生,只能做服务降级处理
解决
场景:
1.超时导致服务器变慢:超时不再等待
2.错误(服务器宕机或者程序运行错误):出错又有兜底,有好的返回结果
3.消费者自我故障或有自我要求,需自己处理降级
服务降级配置
先从服务提供者找出问题,给定一个时间,超出为异常,给出兜底方案,做服务交集FallBack,没超出为正常
服务提供者业务方法上启用@HystrixCommand(fallbackMethod = “paymentInfo_TimeHandler”) fallbackMethod,提供一个兜底的方法
@HystrixCommand(fallbackMethod = "paymentInfo_TimeHandler",commandProperties = {@HystrixProperty(name = "execution.isolation.timeoutInMilliseconds",value = "3000")})//commandProperties = {@HystrixProperty(name = "execution.isolation.timeoutInMilliseconds",value = "3000")},给定一个时间峰值,没超出则正常,没超出则异常,调用兜底方法public String paymentInfo_TimeOut(Integer id) {int timeNumber = 5;//模拟超时//int age = 10/0;//模拟程序运行错误try {TimeUnit.SECONDS.sleep(timeNumber);} catch (InterruptedException e) {e.printStackTrace();}return "线程池:"+Thread.currentThread().getName()+"paymentInfo_TimeOut,id:"+id+"超时了"+timeNumber+"秒";}public String paymentInfo_TimeHandler(Integer id) {return "线程池:"+Thread.currentThread().getName()+"paymentInfo_TimeHandler,id:"+id+"o(╥﹏╥)o";}
主启动类激活,添加下面的注解到主启动类中
@EnableCircuitBreaker

消费者的自我降级
1)开启feign,修改yml
feign:hystrix:enabled: true
2)在主启动类上添加@EnableHystrix注解
问题
1)每个方法都需要一个兜底的方法,导致代码膨胀(一对一)
定义一个全局的fallback方法处理相同的业务场景,一对多
在业务类上添加,统一跳转到统一的处理结果页面
@RestController@Slf4j@DefaultProperties(defaultFallback = "payment_Global_FallBack")public class PaymentHystrixController {@HystrixCommandpublic String paymentInfo_TimeOut(@PathVariable("id") Integer id){int age = 10/0;String resule = paymentHystrixService.paymentInfo_TimeOut(id);return resule;}public String payment_Global_FallBack(){return "线程池:"+Thread.currentThread().getName()+"消费者80:超时或运行错误+o(╥﹏╥)o";}}
2)业务逻辑的方法和处理业务逻辑的方法糅合在一块,导致代码耦合度高
@Component@FeignClient(value = "CLOUD-PROVIDER-HYSTRIX-PAYMENT",fallback = PaymentFallbackServiceImpl.class)public interface PaymentHystrixService {@GetMapping(value = "/payment/hystrix/ok/{id}")public String paymentInfo_OK(@PathVariable("id") Integer id);@GetMapping(value = "/payment/hystrix/timeout/{id}")public String paymentInfo_TimeOut(@PathVariable("id") Integer id);}
package com.atguigu.springcloud.service.impl;import com.atguigu.springcloud.service.PaymentHystrixService;import org.springframework.stereotype.Component;@Componentpublic class PaymentFallbackServiceImpl implements PaymentHystrixService {@Overridepublic String paymentInfo_OK(Integer id) {return "o(╥﹏╥)o";}@Overridepublic String paymentInfo_TimeOut(Integer id) {return "o(╥﹏╥)o";}}
当服务提供者宕机后,即使成功的方法也会返回rollback方法

一般在客户端,即消费者中使用服务降级
服务熔断
类似保险丝达到最大服务访问后,直接拒绝访问,拉闸限电,然后调用服务降级的方法并返回友好提示
熔断机制是应对雪崩效应的一种微服务链路保护机制。当扇出链路的某个微服务出错不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路
在Spring Cloud框架里,熔断机制通过Hystrix实现,Hystrix会监控微服务间调用的状况,当失败的调用到一定的阈值,缺省是5秒内20次调用失败,就会启动熔断机制,熔断机制的注解是@HystrixCommand
服务的降级->进而熔断->恢复调用链路
三种状态 :关、开、半开
service层
//服务熔断@HystrixCommand(fallbackMethod = "paymentCircuitBreak_fallback",commandProperties = {@HystrixProperty(name = "circuitBreaker.enabled",value = "true"),//是否开启断路器@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"),//请求次数@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"),//时间窗口期/时间范围@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60")//失败率达到百分之60后跳闸 十次请求在10000毫秒内达到百分之六十失败(即十次访问六次失败),则启动断路器})//参数的选择在HystrixCommandProperties这个类上,全局搜索可以找到public String paymentCircuitBreaker(@PathVariable("id") Integer id){if (id<0){throw new RuntimeException("id不能为负数");}String serialNumber = IdUtil.simpleUUID();return Thread.currentThread().getName()+"\t"+"调用成功,流水号:"+serialNumber;}public String paymentCircuitBreak_fallback(@PathVariable("id") Integer id){return "id不能为负数,请重新输入o(╥﹏╥)o";}
controller层
//服务熔断@GetMapping(value = "/payment/circuit/{id}")public String paymentCircuitBreaker(@PathVariable("id") Integer id) {String result = paymentService.paymentCircuitBreaker(id);log.info(result);return result;}
十次请求在10000毫秒内达到百分之六十失败(即十次访问六次失败),则启动断路器,所有请求将会在10000毫秒内都失败,知道过了10000毫秒,断路器才会进入一个半开状态,当请求正常返回,则会关闭断路器,主逻辑恢复,如果请求还是失败,断路器则会持续打开10000毫秒,休眠时间窗口重新计时。
服务限流
秒杀高并发等操作,严谨一窝蜂的过来拥挤,大家排队,一秒钟N个,有序进行,保证服务器不会被打满
服务监控
Hystrix提供了准实时的调用监控,Hystrix会持续地记录所有通过Hystrix发起的请求的执行信息,并以统计报表和图形的形式展示给用户,包括每秒执行多少请求成功,多少是失败等。Netflix通过Hystrix-metrics-stream项目实现了多以上指标的监控。Spring CLoud也提供了Hystrix DashBoard的整合,对监控内容转化成可视化界面。
pom
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
主启动类添加@EnableHystrixDashboard
被监控的主启动类上加bean
@SpringBootApplication@EnableEurekaClient@EnableCircuitBreakerpublic class PaymentHystrixMain8001 {public static void main(String[] args) {SpringApplication.run(PaymentHystrixMain8001.class,args);}/*** 此配置是为了微服务监控而配置,与服务容错本身无关,springcloud升级后的坑* ServletRegistrationBean因为springboot的默认路径不是/hystrix.stream* 只要在自己的项目里配置下面的servlet就可以了* @return*/@Beanpublic ServletRegistrationBean getServlet(){HystrixMetricsStreamServlet streamServlet = new HystrixMetricsStreamServlet();ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet);registrationBean.setLoadOnStartup(1);registrationBean.addUrlMappings("/hystrix.stream");registrationBean.setName("HystrixMetricsStreamServlet");return registrationBean;}}


实心圆:共有两种含义。它通过颜色的变化代表了实例的健康程度,它的健康度从绿色<黄色<橙色<红色递减。
该实心圆除了颜色的变化之外,它的大小也会根据实例的请求流量发生变化,流量越大该实心圆就越大。所以通过该实心圆的展示,就可以在大量的实例中快速的发现故障实例和高压力实例。
曲线:用来记录2分钟内流量的相对变化,可以通过它来观察到流量的上升和下降趋势。
服务配置Config
Config不在使用,主流使用alibaba的Nacos
为什么要服务配置
微服务意味着要讲单体应用中的业务拆分成一个个自服务,每个服务的粒度相对较小,因此系统中会出现大量的服务。由于每个服务都需要必要的配置信息才能运行,所以一套集中式的、动态的配置管理设施是必不可少的。
SpringCloud提供了ConfigServer(配置中心)来解决这个问题,我们每一个微服务自己带着一个application.yml,上百个配置文件的管理维护麻烦成本高,解决一处配置,处处生效

SpringCloud Config为微服务架构中的微服务提供集中化的我不配置支持,配置服务器为各个不同微服务应用的所有环境提供了一个中心化的外部配置
SpringCloud Config分为服务端和客户端两部分
服务端也称为分布式配置中心,它是一个独立的微服务应用,用来连接配置服务器并为客户端提供获取配置信息,加密/加密信息等访问接口
客户端则是通过制定的配置中心来管理应用资源,以及与业务相关的配置内容,并在启动的时候从配置中心获取和加载配置行行行配置服务器默认采用GIT来存储配置信息,这样有助于对环境配置进行版本管理,并且通过GIT客户端工具来方便的管理和访问配置内容
系统运行期间动态的调整配置,不再需要每个服务部署的机器上编写配置文件,服务会向配置中心统一拉取配置自己的信息,当配置发生变动是,服务不需要重启即可感知到配置的变化并应用新的配置,将配置信息以REST接口的形式暴露
配置
服务端
pom
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-server</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--引入自己定义的api通用包,可以使用payment支付entities--><dependency><groupId>com.atguigu.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
yml
server:port: 3344spring:application:name: cloud-config-center #注册进Eureka服务器的微服务名cloud:config:server:git:uri: git@githun.com:chuangweiRong/springcloud-config.git #github上面的git仓库名字search-paths:- springcloud-config #搜索目录label: master #读取分支eureka:client:service-url:defaultZone: http://localhost:7001/eureka
主启动类
package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.config.server.EnableConfigServer;@SpringBootApplication@EnableConfigServerpublic class ConfigCenterMain3344 {public static void main(String[] args) {SpringApplication.run(ConfigCenterMain3344.class,args);}}
客户端
bootstrap.yml的优先加载级别比application.yml高,bootstrap是系统及的,application是用户级的资源配置项
pom
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-client</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
yml
server:port: 3355spring:cloud:config:label: master #分支名称name: config #配置文件名称profile: dev #读取文件后缀 config-devuri: http://localhost:3344 #配置中心地址application:name: config-clienteureka:client:service-url:defaultZone: http://localhost:7001/eureka
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.netflix.eureka.EnableEurekaClient;@SpringBootApplication@EnableEurekaClientpublic class ConfigClientMain3355 {public static void main(String[] args) {SpringApplication.run(ConfigClientMain3355.class,args);}}
成功

分布式配置的动态刷新问题
当git上的配置中心文件内容被修改了,刷新Config服务端页面会立刻响应,但Config客户端我响应,需要重启Config客户端
解决(Config客户端动态刷新)
pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
yml
#暴露监控端点management:endpoint:web:exposure:include: "*"
业务类上添加@RefreshScope注解
import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RefreshScope@Slf4jpublic class ConfigClientController {@Value("${config.info}")private String configInfo;@GetMapping("/configInfo")public String getConfigInfo(){return configInfo;}}
每次修改配置中心需要发送post请求curl -X POST “http://localhost:3355/actuator/refresh‘
上述解决方法虽然解决了服务重启的问题,但每次需要都需要发送post请求,还是有点麻烦
广播,一次通知,处处生效,也可定点通知
使用的是消息总线
服务注册中心(Eureka)
Eureka已经停止更新了,可用Zookeeper、Consul、Nacos替代,其中Zookeeper比较稳定,不建议使用Consul,强烈推荐使用Nacos,可完美替代Eureka
为什么要使用注册中心
有使用过ip:port地址直接调用服务的开发经历么?该段痛苦的经历在此处省略500字……,该种方式的缺点:
需要手动的维护所有的服务访问ip地址列表。
单个服务实现负载均衡需要自己搭建,例如使用nginx负载均衡策略,或者基于容器化多实例部署单个服务,在实例之间做负载均衡。
使用注册中心能够实现服务治理,服务动态扩容,以及服务调用的负载均衡,完整调用链路示例如下:

服务提供者:向注册中心根据服务名称提供服务访问的ip:port以及其他信息。
注册中心:根据服务名称,存储对应的ip:port以及其他信息。
服务消费者:根据服务名向注册中心获取调用服务的ip:port以及其他相关的信息集合,然后根据负载均衡策略获取最终的服务器ip:port访问地址。
1.Eureka
什么是服务治理
在传统的rpc(一个节点请求另一个节点提供的服务)远程调用框架中,管理每个服务与服务之间依赖关系比较复杂,所以为了解决这个问题,需要服务治理,管理服务与服务之间的依赖关系,可以实现服务调用负载均衡、容错等,实现服务发现与注册。
什么是服务注册与发现
Eureka采用了CS的设计架构,Eureka Server作为服务注册功能的服务器,它是服务注册中心,而系统中的其他微服务,使用Eureka的客户端连接到Eureka Server并维持心跳,这样系统的维护人员就可以通过Eureka Server来监控系统中各个微服务是否正常运行。
在服务注册与发现中,有一个注册中心,当服务器启动的时候,会把当前自己服务器的信息,比如服务地址、通信地址等以别名的方式注册到注册中心上。另一方(消费者或者服务器提供者),以该别名的方式去注册中心上获取到实际的服务通讯地址,然后再实现笨的RPC调用RPC远程调用架构核心,其设计思想在于注册中心,因为使用注册中心管理每个服务与服务之间的一个依赖关系(服务治理概念),在任何rpc远程框架中,都会有一个注册中心

配置集群的最主要的原因是为了防止单点故障
Eureka包含两个组件
Eureka Server提供服务注册服务
各个微服务节点通过配置启动后,会在Eureka Server中进行注册,这样EurekaServer中的服务注册表将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到。
EurekaClient通过注册中心进行访问
EurekaClient是一个JAVA客户端,用于简化EurekaServer的交互,客户端同时也具备一个内置的、使用轮询(round-round)负载算法的负载均衡器。在应用启动后,将会想Eureka Server发送心跳(默认周期为30秒)。如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,EurekaServer将会从服务注册表中吧这个服务节点移出(默认90秒)。
服务发现Discovery
一个新的注解标签,主要作用是对于注册进Eureka里面的微服务,可以通过服务发现来获得该信息
Eureka的自我保护机制

保护模式主要用于一组客户端和Eureka Server之间存在网络分区场景下的保护,一旦进入保护模式,Eureka Server将会尝试保护器服务注册表中的信息,不再删除服务注册表中的数据,也就是不会注销任何微服务。
上面的红字出现,表示Eurek已经进入了保护模式。
导致原因,是因为某一个微服务不可用了(即默认90内,如果Eureka收不到某个微服务的心跳了),Eureka不会立刻清理(即注销服务),依旧会对该微服务的信息进行保存(有些服务本身是健康的,可能发生了网络分区故障)。属于CAP里面的AP分支
怎么禁止自我保护:Eureka Server的yml配置
服务提供者的yml配置:

拓展点
C/S和B/S架构简介
C/S(Client/Server)架构是客户端和服务器架构,通过充分利用两端硬件环境的优势,将任务合理分配到Client端和Server端来实现。
B/S(Browser/Server)架构是浏览器和服务器架构,用户工作界面是通过浏览器来实现,极少部分事务逻辑在前端(Browser)实现,但是主要事务逻辑在服务器端(Server)实现。
负载均衡
CAP理论
- C(一致性):所有的节点上的数据时刻保持同步
- A(可用性):每个请求都能接受到一个响应,无论响应成功或失败
- P(分区容错):系统应该能持续提供服务,即使系统内部有消息丢失(分区)
高可用、数据一致是很多系统设计的目标,但是分区又是不可避免的事情:
CA without P:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但其实分区不是你想不想的问题,而是始终会存在,因此CA的系统更多的是允许分区后各子系统依然保持CA。
CP without A:如果不要求A(可用),相当于每个请求都需要在Server之间强一致,而P(分区)会导致同步时间无限延长,如此CP也是可以保证的。很多传统的数据库分布式事务都属于这种模式。
AP wihtout C:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。现在众多的NoSQL都属于此类。
Zuul(服务路由)
Springcloud Alibaba
官方博客:https://github.com/alibaba/spring-cloud-alibaba/blob/master/README-zh.md
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.1.0.RELEASE</version><type>pom</type><scope>import</scope></dependency>
服务注册和配置中心Nacos
一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
Nacos=Eureka+Config+Bus
下载地址:https://github.com/alibaba/nacos/releases/tag/2.0.3
我下载的是2.0.3版本
解压到本地后进入bin目录双击startup.cmd
报错
ERROR Nacos failed to start, please see D:\DevelopmentTools\nacos\nacos\logs\nacos.log for more details.
nacos默认启动的是集群模式,windows以单机模式启动
cmd进入bin目录运行startup.cmd -m standalone
也可在startup.cmd以记事本方式打开,然后我们将原来的set MODE=”cluster”集群改为set MODE=”standalone”单机,然后在直接鼠标点击startup.cmd启动即可。
访问:http://localhost:8848/nacos

默认账号和密码 nacos nacos

nacos作为注册中心配置
父pom
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.1.0.RELEASE</version><type>pom</type><scope>import</scope></dependency>
子pom
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
yml
server:port: 9001spring:application:name: nacos-payment-providercloud:nacos:discovery:server-addr: localhost:8848 #配置nacos地址#暴露服务management:endpoints:web:exposure:include: '*'
主启动类
@SpringBootApplication@EnableDiscoveryClient //能够让注册中心能够发现,扫描到该服务。public class PaymentMain9001 {public static void main(String[] args) {SpringApplication.run(PaymentMain9001.class,args);}}
业务类
import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RestController;@RestController@Slf4jpublic class PaymentController {@Value("${server.port}")private String serverPort;@GetMapping("/payment/nacos/{id}")public String getPayment(@PathVariable("id") Long id){return "nacos registry,serverPort:"+serverPort+"\t id:"+id;}}
成功

新建module9002和消费者83,测试nacos自带负载均衡功能
9002和上面一样步骤,接下来是83
pom和主启动类一样
yml
server:port: 83spring:application:name: nacos-order-consumercloud:nacos:discovery:server-addr: localhost:8848#消费者将要去访问的微服务名称(成功注册进nacos的服务)service-url:nacos-user-service: http://nacos-payment-provider
业务类
package com.atguigu.springcloud.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.client.RestTemplate;import javax.annotation.Resource;@RestController@Slf4jpublic class OrderNacosController {@Resourceprivate RestTemplate restTemplate;@Value("${service-url.nacos-user-service}")private String serverUrl;@GetMapping(value = "/consumer/payment/nacos/{id}")public String paymentInfo(@PathVariable("id") Long id){log.info(serverUrl);return restTemplate.getForObject(serverUrl+"/payment/nacos/"+id,String.class);}}
config类
package com.atguigu.springcloud.config;import org.springframework.cloud.client.loadbalancer.LoadBalanced;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.client.RestTemplate;@Configurationpublic class ApplicationContextConfig {@Bean@LoadBalancedpublic RestTemplate getRestTemplate(){return new RestTemplate();}}
nacos作为配置中心配置
基础配置(类似config+bus的配置)
pom
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
bootstrap.yml
server:port: 3377spring:application:name: nacos-config-clientcloud:nacos:discovery:server-addr: localhost:8848 #nacos作为注册中心地址config:server-addr: localhost:8848 #nacos作为配置中心地址file-extension: yaml #指定yaml格式的配置
application.yml
spring:profiles:active: dev #表示开发环境
为什么要两个:因为nacos同springcloud config一样,在项目初始化是,要保障先从配置中心进行配置拉取,拉取配置之后,才能保证项目的正常启动。
springboot中配置文件的加载时存在优先级顺序的,bootstrap的优先级高于application
controller类
import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RefreshScope //支持Nacos的动态刷新功能public class ConfigClientController {@Value("${config.info}")private String configInfo;@GetMapping("/config/info")public String getConfigInfo(){return configInfo;}}
在配置列表中创建名为nacos-config-client-dev.yaml的文件(即data id)进行配置

命名公式如下:
{spring.profile.active}.${spring.cloud.nacos.config.file-extension}
prefix:默认为spring.application.name的值,也可以通过配置项spring. cloud . nacos. config. prefix来配置。
spring.profile.active:即为当前环境对应的profile, 详情可以参考Spring
Boot文档。注意:当spring. profile. active为空时,对应的连接符-也将不存在,datald的拼接格式变成{file-extension}
file-exetension:为配置内容的数据格式,可以通过配置项spring. cloud.nacos.config. file-extension来配置。目前只支持properties和yaml类型。
最后通过Spring Cloud原生注解@RefreshScope 实现配置自动更新:
分类配置
多环境多项目管理,实际开发中,通常一个系统会准备dev开发环境、test测试环境、prod生产环境,要如何保证指定环境启动时服务能正确读取到nacos上相应环境的配置文件呢?还有就是一个大型分布式微服务系统会有很多微服务子项目,每个微服务项目有都会有相应的开发环境、测试环境、预发环境、正式环境…,要怎么对这些微服务进行管理呢?
命名空间用于区分部署环境的,Group和Data ID逻辑上区分两个目标对象

Group可以把不同的微服务划分到同一个分组里去
service就是微服务,一个service可包含多个cluster集群
instance就是微服务实例
Data Id方案
指定spring.profile.active和配置文件的DataID来使不同环境下读取不同的配置
默认空间+默认分组+新建dev和test两个DataID
通过spring.profile.active属性就能进行多环境下配置文件的读取
spring:profiles:#active: dev #表示开发环境active: test #表示开发环境

Group方案

application.yml
spring:profiles:#active: dev #表示开发环境#active: test #表示开发环境active: info
bootstrap.yml添加group属性
spring:application:name: nacos-config-clientcloud:nacos:discovery:server-addr: localhost:8848 #nacos作为注册中心地址config:server-addr: localhost:8848 #nacos作为配置中心地址file-extension: yaml #指定yaml格式的配置group: TEST_GROUP
命名空间方案


bootstrap.yml添加namespace属性
server:port: 3377spring:application:name: nacos-config-clientcloud:nacos:discovery:server-addr: localhost:8848 #nacos作为注册中心地址config:server-addr: localhost:8848 #nacos作为配置中心地址file-extension: yaml #指定yaml格式的配置group: TEST_GROUP #分组名namespace: ea231ed7-e021-4e57-b82e-9404353f7d50 #命名空间ID#需要在nacos创建名为nacos-config-client-dev.yaml的文件进行配置,有公式
拓展
nacos能够在AP和CP之间进行切换,CAP理论
一般来说,如果不需要存储服务级别的信息且服务实例是通过nacos-client注册,并能够保持心跳上报,那么久可以选择AP模式,当前主流的服务如Springcloud和Dubbo服务,都适用于AP模式,AP模式为了服务的可能性而减弱一致性,因此AP模式下只支持注册临时实例。
如果需要在服务级别编辑或者存储配置信息,那么CP是必须的,KS服务和DNS服务则适用于CP模式,CP模式下则支持注册持久化实例,此时则是以Raft协议为集群运行模式,该模式下注册实例之间必须先注册服务,如果服务不存在,则会返回错误信息。
模式切换命令:Curl -X PUT SNACOS SERVER:8848/nacos/v1/ns/oper ator/switches?entry-serverMode8value=CP


Nacos集群和持久化配置
集群架构图

请求->nginx集群->nacos集群->高可用mysql集群
nacos默认内嵌了数据库(derby)实现数据的存储,所以,如果启动多个默认配置下的nacos节点,数据存储存在一致性问题。为了解决这个问题,nacos采用了集中式存储的方式来支持集群化部署,只不过nacos目前只支持mysql的存储
支持三种集群部署模式:
单机模式:用于测试和单机试用
集群模式:用于生产环境、确保高可用
多集群模式:用于多数据中心场景
配置mysql数据源
- 1.安装数据库,版本要求:5.6.5+
- 2.初始化mysql数据库,数据库初始化文件:nacos-mysql.sql
- 3.修改conf/application.properties文件,增加支持mysql数据源配置(目前只支持mysql),添加mysql数据源的url、用户名和密码。
spring.datasource.platform=mysqldb.num=1 db.url.0=jdbc:mysql://11.162.196.16:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=truedb.user=rootdb.password=a
在控制台添加配置文件后

在mysql数据库nacos_config下的config_info表中能看到记录,表明成功

集群配置
预计需要1个nginx、3个nacos、1个mysql
去官网下载linux版nacos进入linux解压:tar -zxvf 压缩包
导入数据库,修改properties文件,按照官网上的该,改成自己的
修改cluster.conf文件
添加3台主机ip和端口号
修改nacos解压目录下bin下的startup.sh脚本 ,让其启动不同的端口


然后安装nginx,并修改配置文件
添加
upstream cluster{
server 127.0.0.1:8846;
server 127.0.0.1:8847;
server 127.0.0.1:8848;
}
server {
location /nacos {
proxy_pass http://cluster;
}
}
Seata处理分布式事务
分布式事务问题
单体应用被拆分成多个微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但全局的数据一致性问题没法保证。
为了解决全局数据一致性问题,seate由此而来
Seata简介
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
一个典型的分布式事务过程:
分布式事务处理过程的唯一事务ID+三组件模型:TC(事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚) TM(事务管理器,控制全局事务的便捷,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议) RM(资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚)

- TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID
- XID在微服务调用链路的上下文中传播;
- RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;
- TM向TC发起针对XID的全局提交或回滚协议;
- TC调度XID下管辖的全部分支事务完成提交或回滚请求。
配置
本地事务控制@Transaction
全局事务控制@GlobalTransaction

下载地址:https://seata.io/zh-cn/blog/download.html
下载解压后进入conf目录备份修改file.conf文件
主要修改:自定义事务组名称+事务日志存储模式为db+数据库连接信息


新建seata数据库,导入sql文件,sql文件在conf目录下的db_store.sql

先启动nacos,在启动seata
三个服务
订单、库存、账户
当用户下单是,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为完成
该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题
新建三个库,在对应的建立响应表和回滚日志表,日志表语句在seata解压目录下的conf目录中db_undo_log.sql

订单服务模块
pom
<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>0.9.0</version></dependency><!-- 后续做持久化用到 --><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.atguigu.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.22</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
yml
server:port: 2001spring:application:name: seata-order-servicecloud:alibaba:seata:tx-service-group: fsp_tx_groupnacos:discovery:server-addr: localhost:80datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/seata_orderusername: rootpassword: afeign:hystrix:enabled: falselogging:level:io:seata: infomybatis:mapper-locations: classpath:mapper/*.xml

mapper.xml
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.atguigu.springcloud.alibaba.dao.OrderDao"><resultMap id="BaseResultMap" type="com.atguigu.springcloud.alibaba.domain.Order"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="count" property="count" jdbcType="INTEGER"/><result column="money" property="money" jdbcType="DECIMAL"/><result column="status" property="status" jdbcType="INTEGER"/></resultMap><insert id="createOrder">insert into t_order(id,user_id,product_id,count,money,status) value (null,#{userId},#{productId},#{count},#{money},0)</insert><update id="updateOrderStatus">update t_order set status = 1<where>user_id=#{userId} and status=#{status}</where></update></mapper>
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;import org.springframework.cloud.openfeign.EnableFeignClients;@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建@EnableFeignClients@EnableDiscoveryClientpublic class SeataOrderMainApp2001 {public static void main(String[] args) {SpringApplication.run(SeataOrderMainApp2001.class,args);}}

import com.alibaba.druid.pool.DruidDataSource;import io.seata.rm.datasource.DataSourceProxy;import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.SqlSessionFactoryBean;import org.mybatis.spring.transaction.SpringManagedTransactionFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import javax.sql.DataSource;@Configurationpublic class DataSourceProxyConfig {@Value("${mybatis.mapper-locations}")private String mapperLocation;@Bean@ConfigurationProperties(prefix = "spring.datasource")public DataSource druidDataSource(){return new DruidDataSource();}@Beanpublic DataSourceProxy dataSourceProxy(DataSource dataSource){return new DataSourceProxy(dataSource);}@Beanpublic SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception{SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();sqlSessionFactoryBean.setDataSource(dataSourceProxy);sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocation));sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}
import org.mybatis.spring.annotation.MapperScan;import org.springframework.context.annotation.Configuration;@Configuration@MapperScan("com.atguigu.springcloud.alibaba.dao")public class MybatisConfig {}
实体属性类
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import java.math.BigDecimal;@Data@AllArgsConstructor@NoArgsConstructorpublic class Order {private Long id;private Long userId;private Long productId;private Integer count;private BigDecimal money;private Integer status; //订单状态 0:创建中 1:已完结}
service类
import com.atguigu.springcloud.alibaba.domain.Order;public interface OrderService {void createOrder(Order order);}
import com.atguigu.springcloud.alibaba.entities.CommonResult;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestParam;import java.math.BigDecimal;@FeignClient(value = "seata-account-service")public interface AccountService {@PostMapping("/account/decrease")CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);}
import com.atguigu.springcloud.alibaba.entities.CommonResult;import org.springframework.cloud.openfeign.FeignClient;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestParam;@FeignClient(value = "seata-storage-service")public interface StorageService {@PostMapping(value = "/storage/decrease")CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);}
service实现类
import com.atguigu.springcloud.alibaba.dao.OrderDao;import com.atguigu.springcloud.alibaba.domain.Order;import com.atguigu.springcloud.alibaba.service.AccountService;import com.atguigu.springcloud.alibaba.service.OrderService;import com.atguigu.springcloud.alibaba.service.StorageService;import io.seata.spring.annotation.GlobalTransactional;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@Slf4jpublic class OrderServiceImpl implements OrderService {@Resourceprivate OrderDao orderDao;@Resourceprivate StorageService storageService;@Resourceprivate AccountService accountService;@Override//毫秒级,遇到异常可回滚,可提供兜底方法@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)//name是唯一的,rollbackFor = Exception.class遇到任意异常都要回滚public void createOrder(Order order) {log.info("------>开始新建订单");orderDao.createOrder(order);log.info("------>订单微服务开始调用库存,进行扣减");storageService.decrease(order.getProductId(),order.getCount());log.info("------>订单微服务开始调用库存,扣减成功");log.info("------>订单微服务开始调用账户,进行金额扣减");accountService.decrease(order.getUserId(),order.getMoney());log.info("------>订单微服务开始调用账户,扣减成功");log.info("------>开始修改订单状态");orderDao.updateOrderStatus(order.getUserId(),0);log.info("------>end");}}
dao类
import com.atguigu.springcloud.alibaba.domain.Order;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;@Mapperpublic interface OrderDao {//新建订单void createOrder(Order order);//修改订单状态void updateOrderStatus(@Param("userId") Long userId,@Param("status") Integer status);}
controller类
import com.atguigu.springcloud.alibaba.domain.Order;import com.atguigu.springcloud.alibaba.entities.CommonResult;import com.atguigu.springcloud.alibaba.service.OrderService;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestControllerpublic class OrderController {@Resourceprivate OrderService orderService;@GetMapping("/order/create")public CommonResult createOrder(Order order){orderService.createOrder(order);return new CommonResult(200,"订单创建成功");}}
库存模块
pom和yml几乎一样,yml只需改端口后和对应的数据库,也要添加seata配置文件file和register
xml
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.atguigu.springcloud.alibaba.dao.StorageDao"><resultMap id="BaseResultMap" type="com.atguigu.springcloud.alibaba.domain.Storage"><id column="id" property="id" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="total" property="total" jdbcType="INTEGER"/><result column="used" property="used" jdbcType="INTEGER"/><result column="residue" property="residue" jdbcType="INTEGER"/></resultMap><update id="decrease">update t_storage set used = used+#{count},residue=residue-#{count}<where>product_id=#{productId}</where></update></mapper>
主启动
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;import org.springframework.cloud.openfeign.EnableFeignClients;@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建@EnableFeignClients@EnableDiscoveryClientpublic class SeataStorageMainApp2002 {public static void main(String[] args) {SpringApplication.run(SeataStorageMainApp2002.class,args);}}
实体属性类
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data@AllArgsConstructor@NoArgsConstructorpublic class Storage {private Long id;private Long productId;private Integer total; //总库存private Integer used; //已用库存private Integer residue; //剩余库存}
config和订单服务一样
service类
public interface StorageService {//扣减库存void decrease(Long productId,Integer coung);}
service实现类
import com.atguigu.springcloud.alibaba.dao.StorageDao;import com.atguigu.springcloud.alibaba.service.StorageService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Servicepublic class StorageServiceImpl implements StorageService {private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);@Resourceprivate StorageDao storageDao;@Overridepublic void decrease(Long productId, Integer count) {LOGGER.info("----->storage-service开始扣减库存");storageDao.decrease(productId,count);LOGGER.info("----->storage-service完成扣减");}}
dao接口
import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;@Mapperpublic interface StorageDao {//扣减库存void decrease(@Param("productId") Long productId, @Param("count") Integer count);}
controller类
import com.atguigu.springcloud.alibaba.entities.CommonResult;import com.atguigu.springcloud.alibaba.service.StorageService;import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;@RestControllerpublic class StorageController {@Resourceprivate StorageService storageService;@RequestMapping(value = "/storage/decrease",method = RequestMethod.POST)public CommonResult decrease(Long productId, Integer count){storageService.decrease(productId, count);return new CommonResult(200,"扣减成功");}}
账户模块
pom和yml几乎一样,yml只需改端口后和对应的数据库,也要添加seata配置文件file和register
mapper.xml
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.atguigu.springcloud.alibaba.dao.AccountDao"><resultMap id="BaseResultMap" type="com.atguigu.springcloud.alibaba.domain.Account"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="total" property="total" jdbcType="DECIMAL"/><result column="used" property="used" jdbcType="DECIMAL"/><result column="residue" property="residue" jdbcType="DECIMAL"/></resultMap><update id="decrease">update t_account set residue = residue-#{money},used=used+#{money}<where>user_id=#{userId}</where></update></mapper>
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;import org.springframework.cloud.openfeign.EnableFeignClients;@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建@EnableFeignClients@EnableDiscoveryClientpublic class AccountServiceMainApp2003 {public static void main(String[] args) {SpringApplication.run(AccountServiceMainApp2003.class,args);}}
config一样
实体属性类
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import java.math.BigDecimal;@Data@AllArgsConstructor@NoArgsConstructorpublic class Account {private Long id;private Long userId;private BigDecimal total; //总额度private BigDecimal used; //已用额度private BigDecimal residue; //剩余额度}
service类
import java.math.BigDecimal;public interface AccountService {//扣减余额void decrease(Long userId, BigDecimal money);}
service实现类
import com.atguigu.springcloud.alibaba.dao.AccountDao;import com.atguigu.springcloud.alibaba.service.AccountService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.math.BigDecimal;import java.util.concurrent.TimeUnit;@Service@Slf4jpublic class AccountServiceImpl implements AccountService {@Resourceprivate AccountDao accountDao;@Overridepublic void decrease(Long userId, BigDecimal money) {log.info("----->account-service开始扣减账户余额");/*模拟超时已超,由于使用的是OpenFeign远程调用服务,其默认超时时间为一秒,此时过了一秒后还没处理完,此处将会报异常,订单状态修改失败,即前面的业务即使付款成功,订单状态却是未完成的*///解决办法,在业务的最初入口添加@GlobalTransactional注解,即order业务类上try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}accountDao.decrease(userId, money);log.info("----->account-service完成扣减");}}
dao接口
import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;import java.math.BigDecimal;@Mapperpublic interface AccountDao {//扣减余额void decrease(@Param("userId") Long userId, @Param("money")BigDecimal money);}
controller类
import com.atguigu.springcloud.alibaba.entities.CommonResult;import com.atguigu.springcloud.alibaba.service.AccountService;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.math.BigDecimal;@RestControllerpublic class AccountController {@Resourceprivate AccountService accountService;@RequestMapping("/account/decrease")public CommonResult decrease(Long userId,BigDecimal money){accountService.decrease(userId, money);return new CommonResult(200,"扣减成功");}}
超时异常
由于我们使用的是OpenFeign,其默认超时的时间为一秒,远程调用处理超过一秒则报错
import com.atguigu.springcloud.alibaba.dao.AccountDao;import com.atguigu.springcloud.alibaba.service.AccountService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.math.BigDecimal;import java.util.concurrent.TimeUnit;@Service@Slf4jpublic class AccountServiceImpl implements AccountService {@Resourceprivate AccountDao accountDao;@Overridepublic void decrease(Long userId, BigDecimal money) {log.info("----->account-service开始扣减账户余额");/*模拟超时已超,由于使用的是OpenFeign远程调用服务,其默认超时时间为一秒,此时过了一秒后还没处理完,此处将会报异常,订单状态修改失败,即前面的业务即使付款成功,订单状态却是未完成的*///解决办法,在业务的最初入口添加@GlobalTransactional注解,即order业务类上try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}accountDao.decrease(userId, money);log.info("----->account-service完成扣减");}}
import com.atguigu.springcloud.alibaba.dao.OrderDao;import com.atguigu.springcloud.alibaba.domain.Order;import com.atguigu.springcloud.alibaba.service.AccountService;import com.atguigu.springcloud.alibaba.service.OrderService;import com.atguigu.springcloud.alibaba.service.StorageService;import io.seata.spring.annotation.GlobalTransactional;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service@Slf4jpublic class OrderServiceImpl implements OrderService {@Resourceprivate OrderDao orderDao;@Resourceprivate StorageService storageService;@Resourceprivate AccountService accountService;@Override//毫秒级,遇到异常可回滚,可提供兜底方法@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)//name是唯一的,rollbackFor = Exception.class遇到任意异常都要回滚,就不会提交写的操作,即不会添加记录public void createOrder(Order order) {log.info("------>开始新建订单");orderDao.createOrder(order);log.info("------>订单微服务开始调用库存,进行扣减");storageService.decrease(order.getProductId(),order.getCount());log.info("------>订单微服务开始调用库存,扣减成功");log.info("------>订单微服务开始调用账户,进行金额扣减");accountService.decrease(order.getUserId(),order.getMoney());log.info("------>订单微服务开始调用账户,扣减成功");log.info("------>开始修改订单状态");orderDao.updateOrderStatus(order.getUserId(),0);log.info("------>end");}}
Sentinel实现熔断与限流
分布式系统的流量防卫兵,替代Hystrix
官方中文文档:https://github.com/alibaba/Sentinel/wiki/介绍
下载地址:https://github.com/alibaba/Sentinel/releases/
下载完成后使用 java -jar命令运行jar包,然后访问http://localhost:8080/

配置
pom
<dependencies><!-- 后续做持久化用到 --><dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.atguigu.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
yml
server:port: 8401spring:application:name: cloudalibaba-sentinel-servicecloud:nacos:discovery:server-addr: localhost:80sentinel:transport:dashboard: localhost:8080port: 8719 #默认8719,加入被占用会自动从8719+1扫码,知道找到未占用的端口management:endpoints:web:exposure:include: '*'
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;@SpringBootApplication@EnableDiscoveryClientpublic class SentinelServiceMain8401 {public static void main(String[] args) {SpringApplication.run(SentinelServiceMain8401.class,args);}}
controller
import lombok.extern.slf4j.Slf4j;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestController@Slf4jpublic class FlowLimitController {@GetMapping("/testA")public String testA(){return "testA";}@GetMapping("/testB")public String testB(){return "testB";}}
启动后,访问http://localhost:8080/,页面是空的,
是因为采用的是懒加载,需要发送一次请求

流控规则

资源名:唯一名称,默认为请求路径
针对来源:Sentinel可以针对调用者进行限流,填写微服务名,默认default(不区分来源)
阈值类型/单机阈值:
QPS(每秒的请求数量):当调用该api的QPS达到阈值的时候,进行限流
线程数:当调用该api的线程数达到阈值的时候,进行限流
是否集群:不需要集群
流控模式
直接:api达到限流条件时,直接限流
关联:当关联的资源达到阈值时,就限流自己
链路:只记录指定链路上的流量(指定资源从入口资源进来的流量,如果达到阈值,就进行限流)api级别的针对来源
流控效果
快速失败:直接失败,抛异常

上图表示,当/testA的每秒的请求超过1次,将直接在前台抛出错误信息


上图表示当/testB每秒的请求超过一次,限流/testA请求
postman模拟多线程请求


Warm Up(预热):根据codeFactor(冷加载因子,默认3)的值,从阈值/codeFactor,经过预热时长,才达到设置的QPS阈值
当每秒请求数超过从阈值/codeFactor时,就会报错,经过设置的时长后才慢慢增长到设置的阈值,防止某一瞬间大量请求访问系统,造成系统崩溃
秒杀系统在开启的瞬间,会有很多流量上来,很有可能把系统打死,预热方式就是为了保护系统,可慢慢的把流量放进来,慢慢的把阈值增长到设置的阈值
排队等待:均匀排队,让请求以均匀的速度通过,阈值类型必须为QPS,否则失败
熔断降级规则

RT(平均响应时间,秒级):
平均响应时间超过阈值且在时间窗口内通过的请求>=5,两个条件同时满足后触发降级
窗口期过后关闭断路器
RT最大4900(更大的需要通过-Dcsp.sentinel.max.rt=XXXX才能生效)
异常比例(秒级):QPS>=5且异常比例超过阈值时,触发降级,时间窗口结束后,关闭降级
异常数(分钟级):异常数超过阈值时,触发降级,时间窗口结束后,关闭降级
Sentinel的断路器是没有半开状态的
热点规则
仅支持QPS
热点即经常访问的数据,很多时候我们希望统计某个热点数据中范文频次最高的Top k数据,并对其访问进行限制。比如:
- 商品ID为参数,统计一段时间内最常购买的商品IP并进行限制
- 用户ID为参数,统计一段时间内频繁访问的用户ID进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看作是一种特殊的流量控制,仅对包含热点参数的资源生效。
测试代码
@GetMapping("/testHotKey")@SentinelResource(value = "testHotKey",blockHandler = "deal_testHotKey") //提供兜底的方法,只管sentinel控制台配置的限流规则,不管java程序运行时的错,需要添加fallbake属性public String testHotKey(@RequestParam(value = "p1" , required = false) String p1,@RequestParam(value = "p2",required = false) String p2){return "testHotKey";}/**** 兜底方法* @param p1* @param p2* @param blockException* @return*/public String deal_testHotKey(String p1, String p2, BlockException blockException){return "testHotKey,o(╥﹏╥)o";}


参数例外项

期望参数是某个特殊值时,它的限流规则和平时的不一样
系统规则
系统自适应限流,从整体纬度对应用入口流量进行控制,结合应用的Load、RT、线程数、入口QPS、CPU是用来等几个纬度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性

Load 自适应(进队Linux/Unix-like机器生效):系统的Load1作为启发指标,进行自适应系统保护,当系统Load1超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才回触发系统保护(BBR阶段)。系统容量由系统的maxQps minRt估算得出。设定参考值一般是 CPU cores 2.5
CPU使用率:当系统CPU使用率超过阈值即触发系统保护(取值范围0.0~1.0),比较灵敏
平均RT:当单台机器上所有入口流量的平均RT达到阈值即触发系统保护,单位是毫秒
并发线程数:当单台机器上所有入口流量的并发线程数达到阈值时触发系统保护
入口QPS:当单台机器上所有入口流量的QPS达到阈值时触发系统保护
@SentinelResource
按资源名称限流+兜底方法
@SentinelResource(value = "testHotKey",blockHandler = "deal_testHotKey")

按Url地址限流+兜底方法

上述兜底方法面临的问题
代码耦合,膨胀,每次处理就要写一个兜底方法
系统默认的又不能体现我们的业务
解决,自定义全局统一的处理方法
客户自定义限流处理逻辑
新建一个类用于自定义限流处理逻辑
import com.alibaba.csp.sentinel.slots.block.BlockException;import com.atguigu.springcloud.entities.CommonResult;public class CustomerBlockHandler {public static CommonResult handlerException(BlockException blockException){return new CommonResult(444,"自定义处理");}public static CommonResult handlerException2(BlockException blockException){return new CommonResult(300,"自定义处理");}}
然后
@SentinelResource(value = "testHotKey",blockHandlerClass = CustomerBlockHandler.class,blockHandler = "handlerException")
更多注解属性说明
SphU:定义资源
Tracer:定义统计
ContextUtil:定义上下文
fallback服务熔断功能
sentinel整合Ribbon/OpenFeign+fallback

@SentinelResource(value = “”,fallback=”兜底方法”)
fallback只负责业务异常处理方法
fallback和blockHandler可协调使用
规则持久化
当关闭sentinel上的微服务时,每次重启该微服务时,sentinel上的规则则无了
将规则持久化到nacos
pom
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId></dependency>
yml,添加nacos数据源配置
server:port: 8401spring:application:name: cloudalibaba-sentinel-servicecloud:nacos:discovery:server-addr: localhost:80sentinel:transport:dashboard: localhost:8080port: 8719 #默认8719,加入被占用会自动从8719+1扫码,知道找到未占用的端口datasource:ds1:nacos:server-addr: localhost:80dataId: ${spring.application.name}groupId: DEFAULT_GROUPdata-type: jsonrule-type: flowmanagement:endpoints:web:exposure:include: '*'
在nacos上添加配置

其他组件
服务注册中心(Consul)
Consul是一套开源的分布式服务发现好配置管理系统,有HashCorp公司用Go语言开发。
提供了微服务系统中的服务治理、配置中心、控制总线等功能。这些功能中的每一个都可以根据需要单独使用,也可以一起使用构建全方位的服务网络,总之Consul提供了一种完整的服务网格解决方案。
它具有很多有点,包括:基于raft协议,比较简洁;支持健康检查,同时支持HTTP和DNS协议;支持跨数据中心的WAN集群,提供图形界面跨平台;支持Linux、Mac、Windows、K-V存储
下载网址:https://www.consul.io/downloads.html
中文文档:https://www.springcloud.cc/spring-cloud-consul.html
安装教程:https://learn.hashicorp.com/consul/getting-started/install.html
下载完成后:


springcloud整合consul
pom
<!-- springcloud consul server --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-consul-discovery</artifactId></dependency>
yml
server:port: 8006spring:application:name: consul-provider-paymentcloud:consul:discovery:#hostname:127.0.0.1service-name: ${spring.application.name}
消费者yml
server:port: 80spring:application:name: cloud-consumer-ordercloud:consul:host: localhostport: 8500discovery:service-name: ${spring.application.name}
三种服务注册中心的区别

服务注册中心(Zookeeper)
Zookeeper是一个分布式协调工具,可以实现注册中心功能,比如kafka就依赖于Zookeeper
docker安装Zookeeper:Docker安装Zookeeper并进行操作 - 疯子110 - 博客园 (cnblogs.com)
Zookeeper服务器取代Eureka服务器,zk作为服务注册中心
服务总线Bus
原生使用Bus,被alibaba的Nacos替换
结合springcloud config实现分布式自动刷新配置功能
简介
Bus支持两种消息代理:RabbitMQ和kafka
Bus是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能
使用消息中间件推送post请求,然后消费进行动态地刷新

Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可当做微服务间的通信通道。

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以称它为总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息
基本原理
ConfigClient实例都监听MQ中同一个topic(默认是springcloudbus)。当一个服务刷新数据的时候,他会把这个信息放入到topic中,这样其它监听同一个topic的服务就能得到通知,然后去更新自身的配置。
设计思想
1)利用消息总线触发一个客户端/bus/refresh,从而刷新所有客户端的配置

2)利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,从而刷新所有客户端的配置

显然第二种更好,原因:
第一种打破了微服务的职责单一性,因为微服务本身是业务模块,他本不应该承担配置刷新的职责;破坏了微服务各个节点的对等性;有一定的局限性,例如,微服务在迁移是,他的网络地址常常或发生变化,此时如果向左做到自动刷新,那就会增加更多的修改
配置(添加总线支持)
配置中心(Server)中的pom
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency>
配置中心的yml
spring:cloud:config:label: master #分支名称name: config #配置文件名称profile: dev #读取文件后缀 config-devuri: http://localhost:3344 #配置中心地址application:name: config-clientrabbitmq:host: localhostport: 5672username: rootpassword: a#暴露监控端点management:endpoints:web:exposure:include: 'bus-refresh'
客户端中的pom
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency>
客户端中的yml
spring:cloud:config:label: master #分支名称name: config #配置文件名称profile: dev #读取文件后缀 config-devuri: http://localhost:3344 #配置中心地址application:name: config-clientrabbitmq:host: localhostport: 5672username: rootpassword: a#暴露监控端点management:endpoints:web:exposure:include: '*'
然后由服务器发送post请求http://localhost:3344/actuator/bus-refresh
定点通知
http://localhost:3344/actuator/bus-refresh/{目的地:微服务名:端口号}
总结

消息驱动(Stream)
是一个构建消息驱动微服务的框架
应用程序通过inputs(消费者)或者outputs(生产者)来与SpringCloud Stream中的binder(绑定器)对象交互。通过我们配置来binding(绑定),而SpringCloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与SpringCloud Stream交互就可以方便使用消息驱动的方式。
SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费者组、分区的三个核心概念。
目前仅支持RabbitMq,kafka
为什么要用SpringCloud Steam
java开发和大数据开发可能所用的消息中间件不同,从而导致切换、维护、开发困难,Stream能够帮助我们屏蔽底层消息中间的差异,降低切换成本,统一消息的编程模型,我们只需要用一种适配绑定的方式,自动的给我们在各种消息中间件内切换

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件之间的隔离,即解耦。
通过想应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

标准流程

Binder:绑定器,很方便的连接中间件,屏蔽差异
Channel:通道,类似于队列,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
Source和Sink:简单的可理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息就是输入,也就是生产者和消费者
常用注解:
@Input :标识输入通道,通过该通道接收到的消息进入应用程序
@Output:标识输出通道,发布的消息通过该通道离开应用程序
@StreamListener:监听队列,用于消费者的队列的消息接收
@EnableBinding:值信道Channel和Exchange绑定在一起
配置
生产者
pom
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--引入自己定义的api通用包,可以使用payment支付entities--><dependency><groupId>com.atguigu.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
yml
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称,用于binding整合type: rabbit #消息组件类型# environment: #设置rabbitmq的相关的环境配置# spring:# rabbitmq:# host: localhost# port: 5672# username: root# password: a# virtual-host: my_vhostbindings: #服务的整合处理output: #这个名字是一个通道的名称destination: studyExchange #表示要使用的exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消息鼓舞的具体设置rabbitmq:host: localhostport: 5672username: rootpassword: avirtual-host: my_vhosteureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com #在信息列表时显示主机名称prefer-ip-address: true
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class StreamMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMain8801.class,args);}}
业务类接口
public interface IMessageProvider {public String send();}
接口实现类
import cn.hutool.core.lang.UUID;import com.atguigu.springcloud.service.IMessageProvider;import lombok.extern.slf4j.Slf4j;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.integration.support.MessageBuilder;import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;/*** Source.class定义消息的推送管道*/@EnableBinding(Source.class)@Slf4jpublic class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output; //消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());//MessageBuilder消息构建其log.info("发送:"+serial);return serial;}}
controller
import com.atguigu.springcloud.service.IMessageProvider;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestControllerpublic class SendMessageController {@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/senMessage")public String send(){return messageProvider.send();}}
消费者
pom
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--引入自己定义的api通用包,可以使用payment支付entities--><dependency><groupId>com.atguigu.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
yml
server:port: 8802spring:application:name: cloud-stream-providercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称,用于binding整合type: rabbit #消息组件类型bindings: #服务的整合处理input: #这个名字是一个通道的名称destination: studyExchange #表示要使用的exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消息鼓舞的具体设置rabbitmq:host: localhostport: 5672username: rootpassword: avirtual-host: my_vhosteureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com #在信息列表时显示主机名称prefer-ip-address: true
主启动类
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class StreamMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMain8802.class,args);}}
controller
import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;@Component@EnableBinding(Sink.class)@Slf4jpublic class ReceiveMessageController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){log.info("消费者1号,接收到:"+message.getPayload()+"\t port:"+serverPort);}}
clone消费者8802,生成另外一个消费者8803
重复消费问题
8802和8803同时都收到了8801发送的消息,就会造成数据错误
解决
分组和持久化属性group
不同group可以消费同一数据,同一group存在竞争
Stream默认将不同的微服务分为不同的组
在yml中添加group: atguiguA #自定义分组,将两个服务分成同一个组,默认是轮询分组,将两个服务放在同一组,这样每次发送的消息只能被一个服务消费到,避免重复消费
如果要不同组的话,给定每个服务不同的group
stream:binders: #在此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称,用于binding整合type: rabbit #消息组件类型bindings: #服务的整合处理input: #这个名字是一个通道的名称destination: studyExchange #表示要使用的exchange名称定义content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit #设置要绑定的消息鼓舞的具体设置group: atguiguA #自定义分组
消息持久化问题
当消费者微服务宕机了,且其中几个微服务没有配置组,那么在重启之前,生产者发送消息,只会被重启后配置了组的消费者,没配置group的不会消费消息,造成消息丢失问题
服务网关gateway
Zuuk已停止更新,Zuul2出不来了,主流使用gateway
Gateway是在Spring生态系统纸上够贱的API网关服务,基于spring5、springboot和project reactor等技术。
Gateway旨在提供一种简单而有效的方式来对API进行路由,以及提供一些强大的过滤器功能,列入:熔断、限流、重试等。
为了提升网关的性能,springcloud Gateway是基于WebFlux框架实现的,使用了reactor-netty响应式(非阻塞)编程组件,而Webflux框架底层则使用了高性能的Reactor模式通信框架Netty
Route(路由)
路由是构建网关的基本模块,他由ID、目标URI,一系列的断言额过滤器阻证,如果断言为true则匹配该路由
Predicate(断言)
参考的是Java8的java.util.function.Predicate,开发人员可以匹配HTTP请求中的所有内容(例如请求头或者请求参数)如果请求与断言相匹配,则进行路由
Filter(过滤)
指的是Spring框架中GatewayFilter的实例,使用过滤器,可以在请求被路由前或者之后对请求进行修改
工作流程
客户端想SpringCloudGateway发出请求,然后在GatewayHandlerMapping中找到与请求想相匹配的路由,将其发送到GatewayWebHandler。
Handler再通过指定的过滤器链来将请求发送到我们实际的服务执行业务逻辑,然后返回。
过滤器之间用虚线分开是因为过滤器可能会在发送代理请求之前(”pre“)或之后(”post“)执行业务逻辑
过滤器在”pre“类型的过滤器科技做参数校验、权限校验、流量监控、日志输出、协议转换等,在”post“类型的过滤器可以做响应内容、响应头的修改,日志的输出、流量监控等有着非常重要的作用。
入门的两种配置
yml配置
pom,不需要引入web
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency><!--EurekaClient--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
yml
server:port: 9527spring:application:name: cloud-gatewaycloud:gateway:routes:- id: payment_rout #路由器的ID,没有固定规则,但要求唯一,建议配合服务名uri: http://localhost:8001 #匹配后提供服务的路由地址predicates:- Path=/payment/get/** #断言,路由相匹配的进行路由- id: payment_rout2uri: http://localhost:8001predicates:- Path=/payment/lb/**eureka:client:service-url:register-with-eureka: truefetch-registry: truedefaultZone: http://localhost:7001/eureka #单节点配置#defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版instance:instance-id: gateway9527prefer-ip-address: true #访问路径可以显示IP地址hostname: cloud-gateway-service
主启动类,不需要业务
package com.atguigu.springcloud;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.netflix.eureka.EnableEurekaClient;@SpringBootApplication@EnableEurekaClientpublic class GatewayMain9527 {public static void main(String[] args) {SpringApplication.run(GatewayMain9527.class,args);}}
成功

代码中注入RouteLocator的Bean
package com.atguigu.springcloud.config;import org.springframework.cloud.gateway.route.RouteLocator;import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class GatewayConfig {@Beanpublic RouteLocator customRuteLocator(RouteLocatorBuilder routeLocatorBuilder){RouteLocatorBuilder.Builder routes = routeLocatorBuilder.routes();routes.route("path_route_atguigu",r -> r.path("/guonei").uri("http://news.baidu.com/guonei")).build();return routes.build();}}

通过微服务名实现动态路由
yml文件中
spring:application:name: cloud-gatewaycloud:gateway:discovery:locator:enabled: true # 开启从注册中心动态撞见路由的功能,利用微服务名进行路由routes:- id: payment_rout #路由器的ID,没有固定规则,但要求唯一,建议配合服务名#uri: http://localhost:8001 #匹配后提供服务的路由地址uri: lb://cloud-payment-servicepredicates:- Path=/payment/get/** #断言,路由相匹配的进行路由- id: payment_rout2#uri: http://localhost:8001uri: lb://cloud-payment-servicepredicates:- Path=/payment/lb/**
Predicate的使用
Loaded RoutePredicateFactory [After] 在某个时间之后 通过ZonedDateTime now = ZonedDateTime.now();获取时间格式
Loaded RoutePredicateFactory [Before] 在某个时间之前
Loaded RoutePredicateFactory [Between] 两个时间之间
Loaded RoutePredicateFactory [Cookie] 带cookies访问 两个参数,一个cookies名,一个cookies的键值对(正则表达式)
Loaded RoutePredicateFactory [Header] 带请求头
Loaded RoutePredicateFactory [Host] 带主机名
Loaded RoutePredicateFactory [Method] 带请求方法
Loaded RoutePredicateFactory [Path] 匹配路径
Loaded RoutePredicateFactory [Query] 带参数名
Loaded RoutePredicateFactory [ReadBodyPredicateFactory]
Loaded RoutePredicateFactory [RemoteAddr]
Loaded RoutePredicateFactory [Weight]
Loaded RoutePredicateFactory [CloudFoundryRouteService]
Filter的使用
路由过滤器可用于修改进入的HTTP请求和返回的HTTP响应,路由过滤器只能指定路由进行使用,由GatewayFilter的工厂类来产生过滤器
yml配置
gateway:discovery:locator:enabled: true # 开启从注册中心动态撞见路由的功能,利用微服务名进行路由routes:- id: payment_rout #路由器的ID,没有固定规则,但要求唯一,建议配合服务名#uri: http://localhost:8001 #匹配后提供服务的路由地址uri: lb://cloud-payment-servicefilters:- AddRequestParameter=X-Request-Id,1024 #过滤器工厂会在匹配的请求头上加上一堆请求头,名称为X-Request-Id,值为1024predicates:- Path=/payment/get/** #断言,路由相匹配的进行路由
自定义过滤器
package com.atguigu.springcloud.filter;import lombok.extern.slf4j.Slf4j;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.core.Ordered;import org.springframework.http.HttpStatus;import org.springframework.stereotype.Component;import org.springframework.web.server.ServerWebExchange;import reactor.core.publisher.Mono;import java.util.Date;@Component@Slf4jpublic class MyLogGatewayFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {log.info("come in MyLogGatewayFilter"+new Date());String uname = exchange.getRequest().getQueryParams().getFirst("uname");if(uname == null){log.info("用户名为null,非法用户");exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE);return exchange.getResponse().setComplete();}return chain.filter(exchange);}@Overridepublic int getOrder() {return 0;}}
成功

失败

分布式请求链路跟踪(Sleuth)
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的服务节点调节来协同产生最后的请求结果,每一个前端请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。
Sleuth+Zipkin协调使用,Sleuth负责收集数据,Zipkin负责展现
下载Zipkin的jar:
https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec
cmd运行java -jar zipkin.jar

完整的调用链路的工作原理


Trace:类似于树结构的Span集合,表示一条调用链路,存在唯一标识
Span:标识调用链路来源,通俗的理解Span就是一次请求信息
简单配置
服务提供者pom
<!-- 包含了sleuth-zipkin --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</artifactId></dependency>
服务提供者yml,主要是zipkin和sleuth
spring:application:name: cloud-payment-servicedatasource :type: com.alibaba.druid.pool.DruidDataSource #当前数据源操作类型'driver-class-name: org.gjt.mm.mysql.Driver # mysql驱动包url: jdbc:mysql://localhost:3306/db2020?useUnicode-true&characterEncoding=utf-8&useSSL=falseusername: rootpassword: azipkin:base-url: http://localhost:9411sleuth:sampler:probability: 1 #采样率介于0和1之间,1则表示全部采集
消费者端一样
成功
服务调用2(OpenFeign)
Feign已停止更新维护,推荐使用OpenFeign
Feign是一个声明式的WebService客户端。它的使用只需创建一个接口,并在接口上添加一个注解就行了
Feign与Ribbon
Ribbon+RestTemplate,利用RestTemplate对http请求的封装处理,形成了一套模板化的调用方法。但是在实际开发中,由于对服务依赖的调用可能不止一处,往往一个接口会被多出调用,所以通常都会针对每个微服务自行封装一些客户端类来包装这些依赖服务的调用。所以,Feign在此基础上做了进一步封装,有它来帮助我们定义和实现依赖服务接口的定义。在Feign的实现下,我们只需创建一个接口并使用注解的方式来配置它(以前是Dao加快上标注Mapper注解,现在是一个微服务接口上面标注一个Feign注解即可),既可以完成对服务提供方的接口绑定,简化了使用springcloud Ribbon时,自动封装服务调用客户端的开发量。
Feign集成了Ribbon,利用Ribbon维护了服务提供者的服务列表信息,并且通过轮询实现了客户端的负载均衡。
Feign和OpenFeign的区别
| Feign | OpenFeign |
|---|---|
| Feign是spring cloud组件中的一个轻量级RESTful的HTTP服务客户端,Feign内置了Ribbon,用来做客户端负载均衡,去调用服务注册中心的服务,Feign的使用方式是:使用Feign的注解定义接口,调用这个接口,就可以调用服务注册中心的服务 | OpenFeign是spring cloud在Feign的基础上支持了Spring Mvc的注解,如@RequestMapping等等,OpenFeign的@FeignClient可以解析SpringMVC的@RequestMapping注解小的接口,并通过动态代理的方式产生实现类,实现类中做负载均衡调用其他服务。 |
| org.springframework.cloud</ groupId> spring-cloud-starter-feign |
org.springframework.cloud</ groupId> spring-cloud-starter-openfeign </ dependency> |
自定义接口
@Component@FeignClient(value = "CLOUD-PAYMENT-SERVICE")//服务提供者名称public interface PaymentFeignService {@GetMapping(value = "payment/get/{id}")//服务提供者Controllerpublic CommonResult<Payment> getPaymentById(@PathVariable("id") Long id);}
启动类
@SpringBootApplication@EnableFeignClientspublic class OrderFeignMain80 {public static void main(String[] args) {SpringApplication.run(OrderFeignMain80.class,args);}}
OpenFeign的超时控制
OpenFeign默认等待一秒钟,若服务提供者的业务处理超过一秒中,就会报错

为了避免这种情况,我们需要设置Feign客户端的超时时间,需在yml配置,由底层Ribbon控制
#设置Feign客户端超时时间(OpenFeign默认支持Ribbon)ribbon:#指的是建立连接使用的时间,适用于网络状况正常的情况下,两端连接所用的时间ReadTimeout: 5000#指的是建立连接后从服务器读取到可用资源所用的时间ConnectTimeout: 5000
OpenFeign日志打印功能
OpenFeign提供了日志打印功能,我们可以通过配置来调整日志级别,从而了解Feign中Http请求的细节。说白了就是对Feign接口的调用情况进行监控和输出。
日志级别
NONE:默认的,不显示任何日志;
BASIC:仅记录请求方法、URL、 响应状态码及执行时间;
HEADERS:除了BASIC中定义的信息之外,还有请求和响应的头信息;
FULL:除了HEADERS中定义的信息之外,还有请求和响应的正文及元数据
配置bean
package com.atguigu.springcloud.config;import feign.Logger;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FeignConfig {@BeanLogger.Level feignLoggerLevel(){return Logger.Level.FULL;}}
yml中开启
logging:level:#feign以什么级别进行监控com.atguigu.springcloud.service.PaymentFeignService: debug
