- 1.名词解释
- 2.什么是单体架构?
- 3.什么是微服务架构?
- 4.微服务架构的优缺点有哪些?
- 5.什么叫做微服务治理?
- 6.微服务拆分原则有哪些?
- 7.微服务常用的组件有哪些
- 8.Eureka组件
- 9.Ribbon组件
- 10.Nacos组件
- 11.OpenFeign组件
- 12.Gateway组件
- 5 Gateway跨域配置设置
- RabbitMQ
- 10.常见的5种工作模式示例代码
- 11.SpringAMQP消息转换器的作用和使用步骤
- 12.SpringAMQP创建交换机和队列的两种方式
- ElasticSearch1
- 7.文档操作
- 8.java代码操作索引库
- 9.java代码操作文档
- ElasticSearch2
- 2.ES 5种检索类型请求方式
- 3.ES查询结果处理-排序
- 4.ES查询结果处理-分页
- 5.ES查询结果处理-高亮
- 6.ES相关的RESTClient代码实现
- 7.练习
- ElasticSearch3
- 10.ES集群搭建过程?
- 11.ES集群中各个节点的角色?
- 12.什么是ES的脑裂问题?
- 13.如何解决脑裂问题?
- 14.ES分布式集群环境下,添加文档数据的原理?
- 15.ES分布式集群环境下,查询文档数据的原理?
- Sentinel
- 7.线程隔离的两种方案?
- 8.熔断器的三种状态?
- 9.Sentinel支持的熔断规则有哪些?
- 10.授权规则是什么?如何设置授权规则?
- 11.自定义异常结果格式?
- 12.Sentinel规则管理模式有哪些?
- Seata
- 6.Seata的四种模式?
- 7.四种模式执行原理?
- 8.三种模式实现方式?
1.名词解释
1.什么是集群?
概念: 多台服务器做相同的事情
集群可以解决什么问题?
解决的问题:
提高并发量
解决单点故障(单点,单台指只有一台服务器,一旦宕机,整个项目将会瘫痪)
保证服务的高可用
2.什么是负载均衡? 为什么需要负载均衡?
负载均衡策略(Ribbon、Nacos、Eureka、Nginx)
将并发过来的请求,均匀的分发给不同的服务器
降低系统响应时间,提高系统资源利用率,使系统的性能得以提高
3.什么是分布式? 为什么需要分布式?
不同的服务器做不同的事情,协作起来完成一个完整的业务
解决的问题:
降低耦合,让服务器专职做一件事情
4.什么是反向代理? 为什么需要反向代理?
反向代理:
概述: 代理服务端接收来自客户端的请求
反向代理可以将发送到服务器的请求进行处理,决定这些请求是将其转到服务器还是阻止该请求。这就意味着反向代理可以避免任何人对服务器进行直接访问,通过负载均衡和缓存,使服务器免受web漏洞的攻击。
5.什么是正向代理? 为什么需要正向代理?
正向代理:
概述: 代理客户端,向服务端发送请求
(1)正向代理可以让客户端访问原本访问不了的资源。
(2)对外隐藏用户信息
6.正向代理和反向代理的区别是什么?
1、位置不同
2、代理对象不同
3、用途不同
4、安全性不同
2.什么是单体架构?
请描述你对单体架构的理解
单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。
优点:
架构简单
部署成本低
缺点:
耦合度高(维护困难、升级困难)
技术栈受限
3.什么是微服务架构?
请描述你对微服务架构的理解,以及为什么需要使用微服务架构?
分布式架构:根据业务功能对系统做拆分,每个业务功能模块作为独立项目开发,称为一个服务。
微服务:一种良好的分布式架构方案
4.微服务架构的优缺点有哪些?
优点:拆分粒度更小、服务更独立、耦合度更低
缺点:架构非常复杂,运维、监控、部署难度提高
5.什么叫做微服务治理?
服务治理就是提供了微服务架构中各微服务实例的快速上线或下线且保持各服务能正常通信的能⼒的⽅案总称
6.微服务拆分原则有哪些?
1、数据独立
2、不同微服务不重复开发相同业务
3、把业务暴露为接口供其他微服务调用
7.微服务常用的组件有哪些
1、Eureka 注册中心、配置中心
2、Nacos 注册中心、配置中心
3、Ribbon 负载均衡
4、OpenFegin 远程调用
5、GetWay 服务网关
6、Sentinel 服务保护
7、seata 分布式事务
8.Eureka组件
1 Eureka组件的作用
服务提供者启动时向Eureka注册自己的信息,Eureka保存这些信息,消费者根据服务名称向Eureka拉取提供者信息。
有多名服务提供者时,服务消费者利用负载均衡算法,从服务列表中挑选一个。
服务提供者会每隔30s向Eureka发送心跳请求,来报告健康状态,Eureka会更新记录服务列表信息,心跳不正常会被剔除。消费者就可以拉取到最新的信息
2 Eureka组件的应用场景
需要对微服务进行远程调用时
3 Eureka组件的使用步骤
创建eureka-server服务
引入eureka依赖
编写启动类
编写配置文件
启动服务
4 Eureka组件是如何进行服务状态监控的
记录服务信息
心跳监控
5 Eureka组件的使用步骤
1、Eureka服务提供者导入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
2、编写yml配置文件
server:
port: 10086
spring:
application:
name: eureka-server
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka
register-with-eureka: false # 不注册自己
fetch-registry: false #不拉取服务本eureka服务中的服务信息
3、启动类
@SpringBootApplication
// 配置注解表明当前模块是Eureka服务器
@EnableEurekaServer
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class, args);
}
}
4、其他微服务导入客户端并配置
<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
5、客户端配置文件
eureka:
client:
service-url:
# 服务器的ip地址
defaultZone: http://127.0.0.1:10086/eureka
instance: # 在Eureka中显示服务的ip地址
ip-address: 127.0.0.1 # 配置服务器ip地址
prefer-ip-address: true # 更倾向于使用ip,而不是host名
instance-id: ${eureka.instance.ip-address}:${server.port} # 自定义实例的id
6、修改业务中的远程调用相关代码
// 2.远程调用获取对应的用户信息
// String url = "http://localhost:8081/user/" + order.getUserId();
String url = "http://userservice/user/" + order.getUserId();
6 Eureka组件服务剔除的时机
服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
eureka会更新记录服务列表信息,心跳不正常会被剔除
消费者就可以拉取到最新的信息
当提供者连续三次没有发送心跳请求时,会认为服务提供者进入宕机状态,从而进行剔除
9.Ribbon组件
1 Ribbon组件的作用
从eureka注册中心中拿到服务列表serverList,然后根据当前的负载均衡算法,选择一个服务server进行http远程调用将结果返回
2 Ribbon组件的使用步骤
方式一:代码方式:在order-service中的OrderApplication类中,定义一个新的IRule:
@Bean
public IRule randomRule(){
return new RandomRule();
}
方式二: 配置文件方式:在order-service的application.yml文件中,添加新的配置也可以修改规则:
userservice:
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则
方式三:Ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:
ribbon:
eager-load:
enabled: true
clients: userservice
10.Nacos组件
Nacos组件-注册中心
a.作用?
这个组件能做什么? 能够解决什么问题?
管理服务提供者的地址信息
监控服务提供者的健康状态
b.问题的由来?
如何管理服务提供者的地址
多个服务提供者的地址如何管理
如何监控服务提供者的健康状态
c.实现步骤? 技术的使用方式?
启动nacos服务
nacos是一个外部独立的服务,直接解压启动即可.
start.cmd -m standalone
消费者方引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
添加配置信息
spring:
cloud:
nacos:
server-addr: localhost:8848
启动微服务
浏览器访问http://localhost:8848/nacos
Nacos组件-配置中心
a.作用?
这个组件能做什么? 能够解决什么问题?
将配置文件从微服务中分离,存放在nacos中,微服务启动时先从nacos中获取配置信息,在加载微服务程序
b.问题的由来?
在集群模式下,每一个微服务中存在相同的配置信息,维护起来不方便
c.实现步骤? 技术的使用方式?
启动nacos服务
nacos是一个外部独立的服务,直接解压启动即可.
start.cmd -m standalone
在nacos中编写外部配置
配置服务器地址(bootstrap.yml)
spring:
application:
name: userservice # 服务名称
profiles:
active: dev #开发环境,这里是dev
cloud:
nacos:
server-addr: localhost:8848 # Nacos地址
config: # 作为配置中心的相关配置
file-extension: yaml # 文件后缀名 必须与服务端配置相同
cluster-name: SH # 所属集群名称
namespace: SH # 所属名称空间
group: default # 所属组名
消费者方引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
添加配置信息
spring:
application:
# 给当前服务起名称(注册给Eureka时,Eureka识别服务使用)
# 给服务起名称时不能使用下划线
name: userservice
cloud:
nacos:
server-addr: localhost:8848
discovery:
cluster-name: SH # 集群名称
ephemeral: false # 设置为非临时实例
11.OpenFeign组件
1 Open组件作用
1、远程调用的代码不够简洁优雅。
2、有多个调用时代码需要重复多次
2 OpenFeign组件的应用场景
Feign在RestTemplate对http请求的封装处理基础上做了进一步封装,由他来帮助我们定义和实现依赖服务接口的定义。在Feign的实现下,我们只需创建一个接口并使用注解的方式来配置它(以前是Dao接口上面标注Mapper注解,现在是一个微服务接口上面标注一个Feign注解即可),即可完成对服务提供方的接口绑定,简化了使用Spring cloud Ribbon时,自动封装服务调用客户端的开发量。
3 OpenFeign组件的使用步骤
1、导入依赖
<!--openFeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
2、在使用远程调用的地方添加注解并改写代码
1、在Fegin的微服务上cn.itcast.feign包下加入以下代码:
@FeignClient("userservice")
public interface UserFeignClient {
@GetMapping("/user/{id}")
User queryById(@PathVariable("id") Long id);
}
2、在消费者(orderService)启动类上加@EnableFeignClients(basePackages = "cn.itcast.feign") //注解开启Feign
3、在消费者(orderService)中导入远程服务依赖
<dependency>
<groupId>cn.itcast.demo</groupId>
<artifactId>feign-api</artifactId>
<version>1.0</version>
</dependency>
4、 //利用feign发起http请求,查询用户信息
User user = userFeignClient.queryById(order.getUserId());
12.Gateway组件
1 Gateway组件的作用
它是微服务群的入口,一切请求都要从网关进入,经过网关过滤以后,如果网关放行,那么请求才会被路由到微服务上,保护微服务群的安全
2 为什么需要使用Gateway组件
为了保护微服务群的安全
如果没有网关的话,假如说前端工程师要直接连接成百上千的微服务,如果后端给ip端口改了,前端还得跟着动.这样极大增加了前端的工作量.
使用了网关的话,前端直接连接网关,然后根据网关的配置去Nacos里面获取对应的服务器地址去调用.
前端只需要在前端代码里面配置网关的地址就行了,不管你后端怎么改服务的ip和端口,这样前端的工作量就大大的降低了.
3 Gateway组件的使用步骤
1、在getWay微服务中导入依赖
<dependencies>
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
</dependencies>
2、添加yml配置文件
server:
port: 10010 # 网关端口
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: user-service # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求 filters: # 过滤器
4 Gateway路由规则的配置方式
server:
port: 10010 # 网关端口
spring:
application:
name: gateway # 服务名称
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: user-service # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求
filters: # 过滤器
- AddRequestHeader=myHeader, hello # 添加请求头
default-filters: //默认过滤器
- AddRequestHeader=origin,gateway
自定义过滤器规则:
//@Order(-1)//order值越小,优先级越高,执行顺序越靠前
//@Component
public class AuthorizeFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取请求参数
MultiValueMap<String, String> params = exchange.getRequest().getQueryParams();
// 2.获取authorization参数
String auth = params.getFirst("authorization");
// 3.校验
if ("admin".equals(auth)) {
// 放行
return chain.filter(exchange);
}
// 4.拦截
// 4.1.禁止访问,设置状态码
exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
// 4.2.结束处理
return exchange.getResponse().setComplete();
}
}
5 Gateway跨域配置设置
在网关微服务中配置
spring:
cloud:
gateway:
# 跨域配置
globalcors: # 全局的跨域处理
add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
corsConfigurations:
'[/**]':
allowedOrigins: # 允许哪些网站的跨域请求
- "http://localhost:80"
- "http://localhost"
- "http://127.0.0.1:80"
- "http://127.0.0.1"
allowedMethods: # 允许的跨域ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowCredentials: true # 是否允许携带cookie
maxAge: 360000 # 这次跨域检测的有效期
RabbitMQ
1.什么是同步请求?优缺点?
同一时间只能做一件事,就像打电话,需要实时响应。
优点:
时效性较强,可以立即得到结果
缺点:
耦合度高
性能和吞吐能力下降
有额外的资源消耗
有级联失败问题
2.什么是异步请求?优缺点?
1、一段时间可以同时做很多事,就像发邮件,不需要马上回复。
2、优点:
(1)吞吐量提升:无需等待订阅者处理完成,响应更快速
(2)故障隔离:服务没有直接调用,不存在级联失败问题
(3)调用间没有阻塞,不会造成无效的资源占用
(4)耦合度极低,每个服务都可以灵活插拔,可替换
(5)流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
3、缺点:
(1)架构复杂了,业务没有明显的流程线,不好管理
(2)需要依赖于Broker的可靠、安全、性能
3.同步请求和异步请求区别?
同步请求时效性较强,可以立即得到结果但是耦合度高,性能和吞吐能力下降,有额外的资源消耗,有级联失败问题
异步请求吞吐量提升:无需等待订阅者处理完成,响应更快速,故障隔离:服务没有直接调用,不存在级联失败问题,调用间没有阻塞,不会造成无效的资源占用,耦合度极低,每个服务都可以灵活插拔,可替换,流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件但是架构复杂了,业务没有明显的流程线,不好管理,需要依赖于Broker的可靠、安全、性能
4.RabbitMQ通讯模型架构是什么?
5.名词解释
Borker是什么: MQ 消息队列
Virtual Host是什么: 虚拟主机
Exchange 是什么什么作用: 交换机
作用:路由消息到队列
Queue是什么什么作用: 队列
作用:存储消息
6.RabbitMQ常见的5种工作模式
Helloworld: 入门/基本模式
单生产 ---> 队列(只有一个队列时,交换机省去不写,默认的交换机与队列名称一致) ---> 单消费
work: 工作模式
单生产 ---> 队列 ---> 多消费
发布/订阅:
广播:fanout
单生产-->交换机-->多个队列-->多个消费者
路由:direct
单生产-->交换机-->多个队列(routingKey)-->多个消费者
主题:topic
单生产-->交换机-->多个队列(routingKey)-->多个消费者
可以使使用通配符
通配符规则:
#:匹配一个或多个词=
*:匹配不多不少恰好1个词
7.简述RabbitMQ发送消息时的链路
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
8.简述RabbitMQ接收消息时的链路
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
9.SpringAMQP使用步骤
1、生产者导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、生产者配置文件
spring:
rabbitmq:
host: 192.168.248.222 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: mq # 用户名
password: 123321 # 密码
10.常见的5种工作模式示例代码
1.入门模式
/**
* 入门 helloworld
* @param msg
* @throws InterruptedException
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
2.工作模式
/**
* wordk 工作模式 按劳分配
* @param msg
* @throws InterruptedException
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage1(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】1");
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】2");
Thread.sleep(200);
}
3.发布、订阅模式(Fanout)
消费者
/**
* @author: hanyaning
* @date: 2022/06/25
* @description:
*/
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
/**
* 发布、订阅模式
* @param msg
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
发布者:
/**
* fanout 扇出
*/
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
4.路由(direct)模式
发布者
/**
* direct
*/
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "di.exchange";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
rabbitTemplate.convertAndSend(exchangeName, "green", message);
rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
}
消费者:
/**
* direct
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("di.queue"),
exchange = @Exchange("di.exchange"),
key = {
"red","blue"
}
))
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("di.queue1"),
exchange = @Exchange("di.exchange"),
key = {
"green","yellow"
}
))
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到direct消息:【" + msg + "】");
}
5.Topic模式
消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue("top.queue3"),
exchange = @Exchange("topic.exchange"),
key = {
"china.#"
}
))
public void listenFanoutQueue3(String msg) {
System.out.println("消费者3接收到Fanout消息:【" + msg + "】");
}
发布者:
/**
* Topic
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "topic.exchange";
// 消息
String message = "china红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.", message);
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
11.SpringAMQP消息转换器的作用和使用步骤
问题:当消息传递是对象时,会调用jdk的序列化,内存空间占用大
默认情况下Spring采用的序列化方式是JDK序列化,数据体积过大且可读性差,因此可以使用JSON方式来做序列化和反序列化
1、发布者消费者导入依赖
<!--json转换-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2、引导类中添加MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
12.SpringAMQP创建交换机和队列的两种方式
1、 /**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
2、注解
@RabbitListener(bindings = @QueueBinding(
value = @Queue("top.queue3"),//队列名称
exchange = @Exchange("topic.exchange"),//交换机名称
key = {
"china.#" //key
}
))
ElasticSearch1
1.ES是什么?有什么作用?
elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容
2.ES应用场景
在GitHub搜索代码
京东、淘宝商城搜索指定的内容
百度搜索
3.倒排索引过程?
1、将每一个文档的数据利用算法分词,得到一个个词条
2、创建表,每行数据包括词条、词条所在文档id、位置等信息
3、因为词条唯一性,可以给词条创建索引,例如hash表结构索引
4.名词解释
索引库: Index 类似于mysql中的table
mapping映射: Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema)
文档: 文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式
词条: 字段(Field),就是JSON文档中的字段,类似数据库中的列(Column)
5.Kibana作用?
Kibana 是一个开源的数据分析与可视化平台,与Elasticsearch搜索引擎一起使用
作用:Kibana搜索、查看、交互存放在Elasticsearch索引中的数据,也可以使用Kibana以图表、表格、地图等方式展示数据。
6.索引库操作
创建索引库: PUT /索引库名称
查看索引库:GET /索引库名
修改索引库:PUT /索引库名/_mapping
删除索引库:DELETE /索引库名
创建索引库
格式:
PUT /索引库名称
{
"mappings": {
"properties": {
"字段名":{
"type": "text",
"analyzer": "ik_smart"
},
"字段名2":{
"type": "keyword",
"index": "false"
},
"字段名3":{
"properties": {
"子字段": {
"type": "keyword"
}
}
},
// ...略
}
}
}
示例:
PUT /heima
{
"mappings": {
"properties": {
"info":{
"type": "text",
"analyzer": "ik_smart"
},
"email":{
"type": "keyword",
"index": "falsae"
},
"name":{
"properties": {
"firstName": {
"type": "keyword"
}
}
},
// ... 略
}
}
}
查询索引库
格式:
GET /索引库名
示例:
GET /test
修改索引库
倒排索引结构虽然不复杂,但是一旦数据结构改变(比如改变了分词器),就需要重新创建倒排索引,这简直是灾难。因此索引库一旦创建,无法修改mapping。
虽然无法修改mapping中已有的字段,但是却允许添加新的字段到mapping中,因为不会对倒排索引产生影响。
格式:
PUT /索引库名/_mapping
{
"properties": {
"新字段名":{
"type": "integer"
}
}
}
示例:
PUT /heima/_mapping
{
"properties": {
"age":{
"type": "integer"
}
}
}
删除索引库
格式:
DELETE /索引库名
示例:
DELETE /heima
7.文档操作
添加文档数据:POST /索引库名/_doc/文档id(不给会随机生成id)
修改文档数据:PUT /索引库名/_doc/文档id
删除文档数据:DELETE /索引库名/_doc/文档id
查询文档数据:GET /索引库名/_doc/文档id
添加文档数据
格式:
POST /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
"字段3": {
"子属性1": "值3",
"子属性2": "值4"
},
// ...
}
示例:
POST /heima/_doc/1
{
"info": "黑马程序员Java讲师",
"email": "zy@itcast.cn",
"name": {
"firstName": "云",
"lastName": "赵"
}
}
修改文档数据
修改有两种方式:
- 全量修改:直接覆盖原来的文档
- 增量修改:修改文档中的部分字段
1. 全量修改
全量修改是覆盖原来的文档,其本质是:
- 根据指定的id删除文档
- 新增一个相同id的文档
注意:如果根据id删除时,id不存在,第二步的新增也会执行,也就从修改变成了新增操作了。
格式:
PUT /{索引库名}/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
// ... 略
}
示例:
PUT /heima/_doc/1
{
"info": "黑马程序员高级Java讲师",
"email": "zy@itcast.cn",
"name": {
"firstName": "云",
"lastName": "赵"
}
}
2. 增量修改
增量修改是只修改指定id匹配的文档中的部分字段。
格式:
POST /{索引库名}/_update/文档id
{
"doc": {
"字段名": "新的值",
}
}
示例:
POST /heima/_update/1
{
"doc": {
"email": "ZhaoYun@itcast.cn"
}
}
删除文档数据
格式:
DELETE /{索引库名}/_doc/id值
示例:
# 根据id删除数据
DELETE /heima/_doc/1
查询文档数据
格式:
GET /{索引库名称}/_doc/{id}
示例:
GET /heima/_doc/1
8.java代码操作索引库
创建索引库:
查看索引库:
删除索引库:
判断索引库是否存在:
初始化
- 引入es的RestHighLevelClient依赖:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
- 因为SpringBoot默认的ES版本是7.6.2,所以我们需要覆盖默认的ES版本:
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
- 初始化RestHighLevelClient:
private RestHighLevelClient client;
// 创建连接
@BeforeEach
void setUp() {
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.248.222:9200")
));
}
// 关闭连接
@AfterEach
void tearDown() throws IOException {
this.client.close();
}
创建索引库:
@Test
void createHotelIndex() throws IOException {
// 1.创建请求语义对象
CreateIndexRequest request = new CreateIndexRequest("hotel");
// 2.准备请求的参数:DSL语句
request.source(MAPPING_TEMPLATE, XContentType.JSON);
// 3.发送请求
client.indices().create(request, RequestOptions.DEFAULT);
}
查看索引库:
与判断索引库是否存在原理相同,对应的DSL是:
GET /hotel
删除索引库:
@Test
void testDeleteHotelIndex() throws IOException {
// 1.创建请求语义对象
DeleteIndexRequest request = new DeleteIndexRequest("hotel");
// 2.发送请求
client.indices().delete(request, RequestOptions.DEFAULT);
}
判断索引库是否存在:
@Test
void testExistsHotelIndex() throws IOException {
// 1.创建请求语义对象
GetIndexRequest request = new GetIndexRequest("hotel");
// 2.发送请求
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
// 3.输出
System.err.println(exists ? "索引库已经存在!" : "索引库不存在!");
}
9.java代码操作文档
添加文档数据
修改文档数据
删除文档数据
查看文档数据
批量添加文档数据
package cn.itcast.hotel;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
/**
* 文档操作
*/
@SpringBootTest
public class RestHighLeveClient2Test {
private RestHighLevelClient client = null;
@Autowired
private IHotelService hotelService;
/**
* 从Mysql中查询数据,将查询到的数据保存到ES的索引库中
*/
@Test
public void addDoc() throws IOException {
// 从Mysql中根据id查询酒店数据
Hotel hotel = hotelService.getById(36934);
System.out.println(hotel);
//Mysql中查询到的数据与ES索引库中的数据格式不匹配,需要预处理
HotelDoc hotelDoc = new HotelDoc(hotel);
//============操作ES
//1.创建请求语义对象
IndexRequest request = new IndexRequest("hotel");
request.id(hotelDoc.getId() + "");
// 向ES中添加数据时,必须将数据转成json格式
String hotelDocJson = JSON.toJSONString(hotelDoc);
System.out.println(hotelDocJson);
// 设置请求参数
request.source(hotelDocJson, XContentType.JSON);
//2.发送请求给ES,并接收响应结果
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
//3.处理响应结果
DocWriteResponse.Result result = response.getResult();
RestStatus status = response.status();
System.out.println(result);
System.out.println(status);
}
/**
* 从ES中查询文档数据信息
*/
@Test
public void findDoc() throws IOException {
//1.创建请求语义对象
GetRequest request = new GetRequest("hotel", "36934");
//2.发送请求给ES,并接收响应结果
GetResponse response = client.get(request, RequestOptions.DEFAULT);
//3.处理响应结果
String sourceStr = response.getSourceAsString();
System.out.println(sourceStr);
// 将查询到的结果转成java对象
HotelDoc hotelDoc = JSON.parseObject(sourceStr, HotelDoc.class);
System.out.println(hotelDoc);
}
/**
* 从ES中删除文档数据信息
*/
@Test
public void deleteDoc() throws IOException {
//1.创建请求语义对象
DeleteRequest request = new DeleteRequest("hotel", "36934");
//2.发送请求给ES,并接收响应结果
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
//3.处理响应结果
RestStatus status = response.status();
System.out.println(status);
}
/**
* 修改ES中的文档数据信息
*/
@Test
public void updateDoc() throws IOException {
//1.创建请求语义对象
UpdateRequest request = new UpdateRequest("hotel", "36934");
// 设置修改的内容
request.doc(
"name", "7天酒店(航头店)",
"address", "航都路xxx号"
);
//2.发送请求给ES,并接收响应结果
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
//3.处理响应结果
RestStatus status = response.status();
System.out.println(status);
}
/**
* 在单元测试方法执行前执行
*/
@BeforeEach
public void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.200.130", 9200, "http")
)
);
}
/**
* 在单元测试类执行后执行
*/
@AfterEach
public void destory() throws IOException {
if (client != null) {
client.close();
}
}
}
批量添加文档数据
package cn.itcast.hotel;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.List;
/**
* 文档操作
*/
@SpringBootTest
public class RestHighLeveClient3Test {
private RestHighLevelClient client = null;
@Autowired
private IHotelService hotelService;
/**
* 批量添加文档数据
*/
@Test
public void addDoc() throws IOException {
// 从Mysql中查询所有酒店数据
List<Hotel> list = hotelService.list();
// 1.创建请求语义对象
BulkRequest request = new BulkRequest();
// 封装请求参数
for (Hotel hotel : list) {
// 转换对象
HotelDoc hotelDoc = new HotelDoc(hotel);
// 单个请求对象
IndexRequest indexRequest = new IndexRequest("hotel");
// 设置文档id
indexRequest.id(hotelDoc.getId()+"");
// 转json字符串
String hotelDocJson = JSON.toJSONString(hotelDoc);
// 封装单个文档数据
indexRequest.source(hotelDocJson,XContentType.JSON);
// 封装每一个文档请求对象
request.add(indexRequest);
}
//2.发送请求给ES,并接收结果
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
//3.处理响应结果
RestStatus status = response.status();
System.out.println(status);
}
/**
* 在单元测试方法执行前执行
*/
@BeforeEach
public void init(){
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.200.130", 9200, "http")
)
);
}
/**
* 在单元测试类执行后执行
*/
@AfterEach
public void destory() throws IOException {
if (client!=null){
client.close();
}
}
}
ElasticSearch2
1.ES 5种检索类型
查询所有
全文检索:
精确查询:
地理坐标:
复合查询:
查询所有
# 查询所有
GET /hotel/_search
GET /hotel/_search
{
"query": {
"match_all": {}
}
}
全文检索:
# 全文检索-查询单个字段
GET /hotel/_search
{
"query": {
"match": {
"name":"如家"
}
}
}
# 全文检索-查询多个字段
GET /hotel/_search
{
"query": {
"multi_match": {
"query": "外滩如家",
"fields": ["name","business"]
}
}
}
精确查询:
# 精准查询-term(词条查询)
GET /hotel/_search
{
"query": {
"term": {
"city":{
"value": "上海"
}
}
}
}
GET /hotel/_search
{
"query": {
"term": {
"brand":"如家"
}
}
}
# 精准查询-range(范围查询)
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 3000,
"lte": 8000
}
}
}
}
地理坐标:
# 地址坐标查询 - 矩形范围查询
GET /hotel/_search
{
"query": {
"geo_bounding_box":{
"location":{
"top_left":{
"lat":"31.24",
"lon":"121.4"
},
"bottom_right":{
"lat":"31.19",
"lon":"121.55"
}
}
}
}
}
# 地址坐标查询 - 方圆多少里 搜附近
GET /hotel/_search
{
"query": {
"geo_distance":{
"distance":"15km",
"location":"31.03,121.6"
}
}
}
复合查询:
# 复合查询---算分函数
GET /hotel/_search
{
"query": {
"match": {
"name":"虹桥如家"
}
}
}
GET /hotel/_search
{
"query": {
"function_score": {
"query": {
"match": {
"name": "虹桥如家"
}
},
"functions": [
{
"filter": {
"term": {
"brand": "希尔顿"
}
},
"weight": 10000
}
],
"boost_mode": "multiply"
}
}
}
GET /hotel/_search
{
"query": {
"function_score": {
"query": {
"match": {
"name": "如家酒店"
}
},
"functions": [
{
"filter": {
"term": {
"brand": "希尔顿"
}
},
"weight": 1000000
}
],
"boost_mode": "multiply"
}
}
}
# 复合查询-bool查询
# must: 必须满足的条件,参与打分
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"name": "如家酒店"
}
},
{
"term": {
"city": "上海"
}
}
]
}
}
}
# should: 或的关系 参与打分
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"city": "上海"
}
}
],
"should": [
{
"term": {
"brand": "皇冠假日"
}
},
{
"term": {
"brand": "7天酒店"
}
}
]
}
}
}
# must_not: 必须不满足,不参与打分
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"city": "上海"
}
}
],
"must_not": [
{
"range": {
"price": {
"lte": 500
}
}
}
]
}
}
}
# filter★: 对查询结果进行过滤,不参与打分
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"city": "上海"
}
}
],
"filter": [
{
"range": {
"price": {
"lte": 500
}
}
}
]
}
}
}
2.ES 5种检索类型请求方式
GET /索引库/_search
{
"query":{
"查询类型":{}
}
}
3.ES查询结果处理-排序
# 普通字段排序
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"price":"asc"
},
{
"score":"asc"
}
]
}
# 经纬度距离排序
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location": {
"lat": 31.034,
"lon": 121.612
},
"order": "asc",
"unit": "km"
}
}
]
}
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location":"31.034,121.612",
"order": "asc",
"unit": "km"
}
}
]
}
4.ES查询结果处理-分页
# 基本分页
# form: 当前页的起始索引
# form = (当前页-1)*每页显示条数
# size: 每页显示条数
GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 9999,
"size": 2
}
5.ES查询结果处理-高亮
# 高亮查询
GET /hotel/_search
{
"query": {
"match": {
"all": "如家酒店"
}
},
"highlight": {
"fields": {
"name":{
"pre_tags": "<em>",
"post_tags": "</em>",
"require_field_match": "false"
},
"brand":{
"pre_tags": "<em>",
"post_tags": "</em>",
"require_field_match": "false"
}
}
}
}
6.ES相关的RESTClient代码实现
package cn.itcast.hotel.find;
import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.Map;
/**
* 索引库操作
*/
@SpringBootTest
public class RestHighLeveClientFind1Test {
private RestHighLevelClient client = null;
/**
* 基本查询:
* 查询所有
* 分词查询
* 词条查询
* 范围查询
* @throws IOException
*/
@Test
public void find1() throws IOException {
// 1.创建请求语义对象
SearchRequest request = new SearchRequest("hotel");
// 描述请求类型
// 查询所有
request.source().query(QueryBuilders.matchAllQuery());
// 分词查询一个字段
// request.source().query(QueryBuilders.matchQuery("name", "如家酒店"));
//分词查询多个字段
// request.source().query(QueryBuilders.multiMatchQuery("如家酒店", "name", "brand"));
// 词条查询
// request.source().query(QueryBuilders.termQuery("brand", "如家"));
// 范围查询
// request.source().query(QueryBuilders.rangeQuery("price").gte(100).lte(500));
// 缺少:复合查询(bool查询和算分函数查询),地理坐标查询
// 2.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 3.解析响应结果
handleResponse(response);
}
/**
* bool查询
* @throws IOException
*/
@Test
public void find2() throws IOException {
// 1.创建请求语义对象
SearchRequest request = new SearchRequest("hotel");
// 描述请求类型
// ------------------
// bool查询——构建bool查询类型
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 设置bool查询条件
// must必须满足的条件
boolQuery.must(QueryBuilders.termQuery("city","上海"));
// filter过滤条件
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(200));
// ------------------
// 将请求类型设置到请求语义对象中
request.source().query(boolQuery);
// 2.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 3.解析响应结果
handleResponse(response);
}
/**
* 分页和排序
* @throws IOException
*/
@Test
public void find3() throws IOException {
// 1.创建请求语义对象
SearchRequest request = new SearchRequest("hotel");
// 描述请求类型
// 查询所有
request.source().query(QueryBuilders.matchAllQuery());
// 分页
// 开始索引 = (当前页-1)*每页显示条数
request.source().from(0);
request.source().size(5);
// 排序
request.source().sort("price", SortOrder.ASC);
// 2.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 3.解析响应结果
handleResponse(response);
}
/**
* 高亮查询
* @throws IOException
*/
@Test
public void find4() throws IOException {
// 1.创建请求语义对象
SearchRequest request = new SearchRequest("hotel");
// 描述请求类型
// 分词查询
request.source().query(QueryBuilders.matchQuery("all","如家外滩"));
// 高亮查询
request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
// 2.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 3.解析响应结果
handleResponse(response);
}
/**
* 处理响应结果
* @param response
*/
public static void handleResponse(SearchResponse response) {
// 获取命中的所有数据(总条数和文档数据)
SearchHits hits = response.getHits();
// 获取总条数信息
long total = hits.getTotalHits().value;
System.out.println("命中的总条数为:" + total);
// 获取命中的文档数据
SearchHit[] docHits = hits.getHits();
// 遍历数据
for (SearchHit docHit : docHits) {
// 解析文档数据
String docJson = docHit.getSourceAsString();
// System.out.println(docJson);
// 将文档数据转成java对象
HotelDoc hotelDoc = JSON.parseObject(docJson, HotelDoc.class);
// 解析高亮结果数据
Map<String, HighlightField> highlightFields = docHit.getHighlightFields();
// 获取指定字段你对应的高亮结果
HighlightField highlightField = highlightFields.get("name");
if (highlightField !=null){
String hlname = highlightField.getFragments()[0].string();
// System.out.println(hlname);
hotelDoc.setName(hlname);
}
System.out.println(hotelDoc);
}
}
/**
* 在单元测试方法执行前执行
*/
@BeforeEach
public void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.200.130", 9200, "http")
)
);
}
/**
* 在单元测试类执行后执行
*/
@AfterEach
public void destroy() throws IOException {
if (client != null) {
client.close();
}
}
}
7.练习
ElasticSearch3
1.数据聚合:
聚合分桶:用来对文档做分组
聚合度量: 用以计算一些值,比如:最大值、最小值、平均值等
管道: 其它聚合的结果为基础做聚合
2.数据聚合DSL格式
语法如下:
GET /hotel/_search
{
"size": 0, // 设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { // 定义聚合
"brandAgg": { //给聚合起个名字
"terms": { // 聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", // 参与聚合的字段
"size": 20 // 希望获取的聚合结果数量
}
}
}
}
示例:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
3.RestAPI实现数据聚合:
package cn.itcast.hotel.find;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.List;
/**
* 聚合操作
*/
@SpringBootTest
public class RestHighLeveClientAggsTest {
private RestHighLevelClient client = null;
/**
* @Description
*/
@Test
public void test01() throws IOException {
// 1.创建请求语义对象
SearchRequest request = new SearchRequest("hotel");
// 不需要展示任何文档数据
request.source().size(0);
String brandAgg="brandAgg";
// 设置聚合分桶
request.source().aggregation(
AggregationBuilders
.terms(brandAgg)
.field("brand")
.size(20)
);
String cityAgg="cityAgg";
request.source().aggregation(
AggregationBuilders
.terms(cityAgg)
.field("city")
.size(5)
);
// 2.发送请求给ES
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 3.解析响应结果
// 获取品牌桶结果数据
handleResponse(response,brandAgg);
// 获取城市桶结果数据
handleResponse(response,cityAgg);
}
/**
* 解析响应结果数据
*
* @param response
*/
public void handleResponse(SearchResponse response, String name) {
//1.获取所有的聚合结果
Aggregations aggregations = response.getAggregations();
//2.根据聚合名称获取对应的聚合结果
Terms brandAgg = aggregations.get(name);
//3.获取桶数组
List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
if (buckets != null && buckets.size() > 0) {
for (Terms.Bucket bucket : buckets) {
Object key = bucket.getKey();
long count = bucket.getDocCount();
System.out.println(key + " : " + count);
}
}
System.out.println("=========================================");
}
/**
* 在单元测试方法执行前执行
*/
@BeforeEach
public void init() {
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.200.130", 9200, "http")
)
);
}
/**
* 在单元测试类执行后执行
*/
@AfterEach
public void destroy() throws IOException {
if (client != null) {
client.close();
}
}
}
4.什么是自动补全?
当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,根据用户输入的字母,提示完整词条的功能,就是自动补全了。
5.拼音分词器作用?
要实现根据字母做补全,就必须对文档按照拼音分词
6.如何使用拼音分词器?
默认的拼音分词器会将每个汉字单独分为拼音,我们需要对拼音分词器做个性化定制,形成自定义分词器
- character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
- tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
- tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
声明自定义分词器的语法如下:
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "pinyin"
}
}
}
}
}
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { // 自定义tokenizer filter
"py": { // 过滤器名称
"type": "pinyin", // 过滤器类型,这里是pinyin
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
示例:
# 自定义拼音分词器
# 注意事项,在哪个索引库中自定义分词器,分词器就在那个索引库中使用
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "pinyin"
}
}
}
}
}
POST /test/_analyze
{
"text": "如家酒店还不错",
"analyzer": "my_analyzer"
}
PUT /test1
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
}
}
POST /test1/_analyze
{
"text": "如家酒店还不错",
"analyzer": "my_analyzer"
}
PUT /test2
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer"
}
}
}
}
POST /test2/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test2/_doc/2
{
"id": 2,
"name": "虱子"
}
GET /test2/_search
GET /test2/_search
{
"query": {
"match": {
"name": "虱子"
}
}
}
# 创建索引库时使用的分词器和搜索时使用的分词器不能是同一个
PUT /test3
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
POST /test3/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test3/_doc/2
{
"id": 2,
"name": "虱子"
}
GET /test3/_search
GET /test3/_search
{
"query": {
"match": {
"name": "sz"
}
}
}
注意: 构建索引库和搜索搜因库使用的不是同一个分词器
7.DSL如何实现自动补全功能?
# 自动补全查询
GET /test/_search
{
"suggest": {
"title_suggest": {
"text": "n", // 关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
8.什么是ES的数据同步?
elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。
9.ES数据同步过程?
引入依赖
在hotel-admin、hotel-demo中引入rabbitmq的依赖:
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.200.130
port: 5672
virtual-host: /
username: itcast
password: 123321
声明队列交换机名称
在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts
包下新建一个类MqConstants
:
package cn.itcast.hotel.constants;
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
声明队列交换机
在hotel-demo中,定义配置类,声明队列、交换机:
package cn.itcast.hotel.config;
import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
发送MQ消息
在hotel-admin中的增、删、改业务中分别发送MQ消息:
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
接收MQ消息
hotel-demo接收到MQ消息要做的事情包括:
- 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
- 删除消息:根据传递的hotel的id删除索引库中的一条数据
1)首先在hotel-demo的cn.itcast.hotel.service
包下的IHotelService
中新增新增、删除业务
void deleteById(Long id);
void insertById(Long id);
2)给hotel-demo中的cn.itcast.hotel.service.impl
包下的HotelService中实现业务:
@Override
public void deleteById(Long id) {
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
// 0.根据id查询酒店数据
Hotel hotel = getById(id);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.准备Json文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
3)编写监听器
在hotel-demo中的cn.itcast.hotel.mq
包新增一个类:
package cn.itcast.hotel.mq;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 监听酒店新增或修改的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 监听酒店删除的业务
* @param id 酒店id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
10.ES集群搭建过程?
创建es集群
首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9202:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vi /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
通过docker-compose启动集群:
docker-compose up -d
集群状态监控
使用cerebro来监控es集群状态,解压即可使用,进入对应的bin目录,双击其中的cerebro.bat文件即可启动服务
.创建索引库
利用kibana的DevTools创建索引库
在DevTools中输入指令:
PUT /itcast
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
// mapping映射定义 ...
}
}
}
利用cerebro创建索引库
利用cerebro还可以创建索引库
查看分片效果
回到首页,即可查看索引库分片效果
11.ES集群中各个节点的角色?
ES节点有如下角色:
master 主节点 对CPU要求高,但是内存要求低
data 数据节点 对CPU和内存要求都高
data_content 内容数据节点
data_hot 热点数据节点
data_warm 暖数据节点
data_cold 冷数据节点
data_frozen 冻结数据节点
ingest 摄取节点
ml 机器学习节点
remote_cluster_client 远程集群客户端节点
transform 转换节点
voting_only 仅投票节点
coordinating 仅协调节点 对网络带宽、CPU要求高
12.什么是ES的脑裂问题?
ES的脑裂问:脑裂是因为集群中的节点失联导致的。出现多个主节点
13.如何解决脑裂问题?
解决脑裂的方案是:要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
例如:3个节点形成的集群,选票必须超过 (3 + 1) / 2 ,也就是2票。node3得到node2和node3的选票,当选为主。node1只有自己1票,没有当选。集群中依然只有1个主节点,没有出现脑裂。
14.ES分布式集群环境下,添加文档数据的原理?
分片存储:elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
15.ES分布式集群环境下,查询文档数据的原理?
scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
Sentinel
1.Sentinel是什么?能做什么?
Sentinel是阿里巴巴开源的一款微服务流量控制组件。
1、丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。
2、完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。
3、广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。
4、完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
2.描述雪崩问题?解决方案有哪些?
雪崩问题:微服务之间相互调用,因为调用链中的一个服务故障,引起整个链路都无法访问的情况。
解决方案:
(1)超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待
(2)仓壁模式:我们可以限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离。
(3)断路器模式:由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务的一切请求。
(4)流量控制:限制业务访问的QPS,避免服务因流量的突增而故障。
3.什么是Sentinel的资源?
资源是Sentinel中的一个关键概念,它可以是任何东西,比如服务、方法甚至是代码片段。
4.流控模式有哪些?
(1)直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式
(2)关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
(3)链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流
5.流控效果有哪些?
(1)快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。是默认的处理方式。
(2)warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值。
(3)排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长
6.Feign整合Sentinel步骤?
1.修改配置,开启sentinel功能
修改OrderService的application.yml文件,开启Feign的Sentinel功能:
feign:
sentinel:
enabled: true # 开启feign对sentinel的支持
2.编写失败降级逻辑
①方式一:FallbackClass,无法对远程调用的异常做处理
②方式二:FallbackFactory,可以对远程调用的异常做处理,我们选择这种
方式二:
步骤一:在feing-api项目中定义类,实现FallbackFactory:
package cn.itcast.feign.clients.fallback;
@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
@Override
public UserClient create(Throwable throwable) {
return new UserClient() {
@Override
public User findById(Long id) {
log.error("查询用户异常", throwable);
return new User();
}
};
}
}
步骤二:在feing-api项目中的DefaultFeignConfiguration类中将UserClientFallbackFactory注册为一个Bean:
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
return new UserClientFallbackFactory();
}
步骤三:在feing-api项目中的UserClient接口中使用UserClientFallbackFactory:
import cn.itcast.feign.clients.fallback.UserClientFallbackFactory;
import cn.itcast.feign.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {
@GetMapping("/user/{id}")
User findById(@PathVariable("id") Long id);
}
7.线程隔离的两种方案?
(1)线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果
(2)信号量隔离(Sentinel默认采用):不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。
8.熔断器的三种状态?
(1)closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
(2)open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态5秒后会进入half-open状态
(3)half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
请求成功:则切换到closed状态
请求失败:则切换到open状态
9.Sentinel支持的熔断规则有哪些?
(1)慢调用
(2)异常比例
(3)异常数
10.授权规则是什么?如何设置授权规则?
授权规则是对请求方来源做判断和控制。
1、给网关添加请求头
spring:
cloud:
gateway:
default-filters:
- AddRequestHeader=origin,gateway
2、添加一个授权规则,放行origin值为gateway的请
11.自定义异常结果格式?
public interface BlockExceptionHandler {
/**
* 处理请求被限流、降级、授权拦截时抛出的异常:BlockException
*/
void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}
2、实现BlockExceptionHandler
@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
String msg = "未知异常";
int status = 429;
if (e instanceof FlowException) {
msg = "请求被限流了";
} else if (e instanceof ParamFlowException) {
msg = "请求被热点参数限流";
} else if (e instanceof DegradeException) {
msg = "请求被降级了";
} else if (e instanceof AuthorityException) {
msg = "没有权限访问";
status = 401;
}
response.setContentType("application/json;charset=utf-8");
response.setStatus(status);
response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
}
}
12.Sentinel规则管理模式有哪些?
(1)原始模式
(2)pull模式:控制台将配置的规则推送到Sentinel客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。
(3)push模式:控制台将配置规则推送到远程配置中心,例如Nacos。Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新。
Seata
1.CPA定理是什么?
C:一致性
在微服务架构中,从任何节点读取到的数据应当是一致的
A:可用性
在服务不宕机的情况下,必须返回预期的结果
p:分区容错性
在微服务架构中不可避免的
2.Base理论是什么?
Basically Avaliable 基本可用
Soft State 软状态
EventUallt Consistent 最终一致性
3.CP和AP指导思想是什么?
CP:一致性
所有的分支事务执行完毕后,等待协调事务发送最终提交回滚事务的通知
AP:可用性
软状态:允许不同事务短时间内状态不一致
但状态过后要保证最终一致
4.Seata结构?
TC 事务协调者
连接mysql数据库:将事务的各种状态信息保存到mysql数据表中
记录全局事务
记录全局事务包含的分支事务
TM 事务管理者
开启全局事务
提交/回滚事务
RM 资源管理者
分支事务管理
5.Seata搭建过程?
1.服务中导入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--版本较低,1.3.0,因此排除-->
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<!--seata starter 采用1.4.2版本-->
<version>${seata.version}</version>
</dependency>
2、配置TC地址
seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
type: nacos # 注册中心类型 nacos
nacos:
server-addr: 127.0.0.1:8848 # nacos地址
namespace: "" # namespace,默认为空
group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
application: seata-tc-server # seata服务名称
username: nacos
password: nacos
tx-service-group: seata-demo # 事务组名称
service:
vgroup-mapping: # 事务组与cluster的映射关系
seata-demo: SH
data-source-proxy-mode: XA #默认AT
6.Seata的四种模式?
(1)XA 模式: XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持
(2)AT 模式: AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷。
(3)TCC 模式:每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复
(4)saga 模式:长事务解决方案
7.四种模式执行原理?
1.XA 模式
RM一阶段的工作:
① 注册分支事务到TC
② 执行分支业务sql但不提交
③ 报告执行状态到TC
TC二阶段的工作:
• TC检测各分支事务执行状态
a.如果都成功,通知所有RM提交事务
b.如果有失败,通知所有RM回滚事务
RM二阶段的工作:
• 接收TC指令,提交或回滚事务
2.AT 模
一阶段:
1)TM发起并注册全局事务到TC
2)TM调用分支事务
3)分支事务准备执行业务SQL
4)RM拦截业务SQL,根据where条件查询原始数据,形成快照。
5)RM执行业务SQL,提交本地事务,释放数据库锁。此时 money = 90
6)RM报告本地事务状态给TC
二阶段:
1)TM通知TC事务结束
2)TC检查分支事务状态
a)如果都成功,则立即删除快照
b)如果有分支事务失败,需要回滚。读取快照数据
3.TCC模式
4.Saga模式
8.三种模式实现方式?
1.XA 模式
1)修改application.yml文件(==每个参与事务的微服务==),开启XA模式
seata:
data-source-proxy-mode: XA
2) 给发起全局事务的入口方法添加@GlobalTransactional注解.
2.AT模式
1)创建个一个数据库表用来记录全局锁
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (
`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`transaction_id` bigint(20) NULL DEFAULT NULL,
`branch_id` bigint(20) NOT NULL,
`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`gmt_create` datetime NULL DEFAULT NULL,
`gmt_modified` datetime NULL DEFAULT NULL,
PRIMARY KEY (`row_key`) USING BTREE,
INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
SET FOREIGN_KEY_CHECKS = 1;
2)修改application.yml文件,将事务模式修改为AT模式即可:
seata:
data-source-proxy-mode: AT # 默认就是AT
3.TCC模式
1) 创建一个表用来冻结数据
CREATE TABLE `account_freeze_tbl` (
`xid` varchar(128) NOT NULL,
`user_id` varchar(255) DEFAULT NULL COMMENT '用户id',
`freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
`state` int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
PRIMARY KEY (`xid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
- xid:是全局事务id
- freeze_money:用来记录用户冻结金额
- state:用来记录事务状态
2) 声明TCC接口
1.TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明
2.account-service项目中的cn.itcast.account.service包中新建一个接口
@LocalTCC // 设置TCC模式
public interface TCCAccountService {
/**
* 从用户账户中扣款
*/
// 此注解使用在try方法上,方法名称可以自定义
// 预留冻结
@TwoPhaseBusinessAction(name = "deduct",commitMethod = "confirm",rollbackMethod = "cancel")
void deduct(
@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
// confirm: 提交
// BusinessActionContext: 业务上下文对象
void confirm(BusinessActionContext context);
// cancel: 事务补偿
void cancel(BusinessActionContext context);
}
3) 编写实现类
1.cotroller层调用业务方法
@RestController
@RequestMapping("account")
public class AccountController {
//@Autowired
//private AccountService accountService;
@Autowired
private TCCAccountService accountService;
@RequestMapping("/{userId}/{money}")
public ResponseEntity<Void> deduct(@PathVariable("userId") String userId, @PathVariable("money") Integer money){
accountService.deduct(userId, money);
return ResponseEntity.noContent().build();
}
}
2.account-service服务中的cn.itcast.account.service.impl包下新建一个类,编写业务逻辑
package cn.itcast.account.service.impl;
@Service
public class TCCAccountServiceImpl implements TCCAccountService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountFreezeMapper freezeMapper;
// try阶段
@Override
@Transactional
public void deduct(String userId, int money) {
// 0.获取全局事务ID
String xid = RootContext.getXID();
// 1.查询account_freeze_tbl表,判断cancel是否执行
AccountFreeze accountFreeze = freezeMapper.selectById(xid);
if (accountFreeze!=null){
// cancel执行过了.不再执行try
if (accountFreeze.getState()==2){
freezeMapper.deleteById(xid);
}
return;
}
// 1.冻结金额 -- 向冻结表中添加一条数据
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setFreezeMoney(money);
freeze.setState(AccountFreeze.State.TRY);
freeze.setXid(xid);
freezeMapper.insert(freeze);
// 2.从account表中减去冻结金额
accountMapper.deduct(userId, money);
}
//confirm
@Override
@Transactional
public void confirm(BusinessActionContext context) {
// 删除冻结数据
freezeMapper.deleteById(context.getXid());
}
//cancel
@Override
@Transactional
public void cancel(BusinessActionContext context) {
// 查看冻结数据状态,并执行补偿
// 1.通过业务上下文对象获取
String userId = context.getActionContext("userId").toString();
Integer money = (Integer) context.getActionContext("money");
String xid = context.getXid();
// 2.判断try是否执行,如果未执行,则进行空回滚
AccountFreeze accountFreeze = freezeMapper.selectById(xid);
if (accountFreeze==null){
// 添加一条记录,标准cancel执行过
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setFreezeMoney(money);
freeze.setState(AccountFreeze.State.CANCEL);
freeze.setXid(xid);
freezeMapper.insert(freeze);
return;
}
// 3.恢复可用金额
accountMapper.refund(userId, money);
// 4.移除冻结金额,修改事务状态
freezeMapper.deleteById(context.getXid());
}
}