SpringCloud Stream消息驱动

一、Stream消息驱动概述

1.1 为什么引入cloud stream?解决的痛点是什么?

  1. 市面上存在着多种消息中间件技术
    ActiveMQ,RabbitMQ,RocketMQ,Kafka
    那么每多出来一种新的技术,就要付出响应的学习成本
    消息中间件技术的多样导致开发者的学习成本很大

  2. 不同的系统中会用到不同的消息中间件,那么当需要系统进行整合时,或者系统进行切换时
    由于用的是不同的中间件技术,该怎么整合切换。
    存在多种MQ的情况时,如何进行切换、维护和开发?
    具体的实现,需要的成本很大。

  3. 那么有没有一种新的技术,让我们不再关注具体的MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。

    4. 引出了SpringCloud Stream
    屏蔽底层的细节差异,让我只需要操作一个Cloud Stream,就可以操作底层下面各种各样不同的MQ。达到我们以更小的代价实现切换,维护,开发。

    1.2 概述

    什么是SpringCloudStream?

    官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

    应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
    通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互
    所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

    通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
    Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

    目前仅支持RabbitMQ、Kafka

一句话:SpringCloud Stream 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

官网

官网 文档 中文指导手册
版本要求
image.png
绑定器对象:Binder Implementations
就是靠它屏蔽了我们底层的MQ的差异。

1.3 设计思想

1.3.1 传统的消息中间件的流程

image.png
image.png

1.3.2 为什么用Cloud Stream

比方说我们同时用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,整合和切换就会有很大的成本。像RabbitMQ有exchange,kafka有Topic和Partitions分区。
image.png
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

1.3.3 stream是怎么统一底层差异的?

image.png

1.3.4 Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
image.png
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,input对应于消费者(消费者从Stream接收消息),output对应于生产者(生产者从Stream发布消息)。

Stream中的消息通信方式遵循了发布-订阅模式。Topic主题进行广播:在RabbitMQ就是Exchange;在Kakfa中就是Topic

1.4 Spring Cloud Stream标准流程套路

image.png

  • Binder:很方便的连接中间件,屏蔽差异(用于连接中间件与生产/消费者)
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出(output),接受消息就是输入(input)。(简单的理解为输出/输如)

    1.5 编码API和常用注解

    image.png
    image.png

    二、入门案例

    2.1 案例说明

    RabbitMQ环境已经OK;
    工程中新建三个子模块:

  • cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块

  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803 作为消息接收模块

    2.2 消息驱动之生产者8801

    2.2.1 新建module

    cloud-stream-rabbitmq-provider8801:作为生产者发送消息模块

    2.2.2 pom

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>jdk8cloud2021</artifactId>
    7. <groupId>com.atguigu.springcloud</groupId>
    8. <version>1.0-SNAPSHOT</version>
    9. </parent>
    10. <modelVersion>4.0.0</modelVersion>
    11. <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    12. <properties>
    13. <maven.compiler.source>8</maven.compiler.source>
    14. <maven.compiler.target>8</maven.compiler.target>
    15. </properties>
    16. <dependencies>
    17. <!--rabbitMQ-->
    18. <dependency>
    19. <groupId>org.springframework.cloud</groupId>
    20. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    21. </dependency>
    22. <!--eureka client-->
    23. <dependency>
    24. <groupId>org.springframework.cloud</groupId>
    25. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    26. </dependency>
    27. <!--web/actuator这两个一般一起使用,写在一起-->
    28. <dependency>
    29. <groupId>org.springframework.boot</groupId>
    30. <artifactId>spring-boot-starter-web</artifactId>
    31. </dependency>
    32. <dependency>
    33. <groupId>org.springframework.boot</groupId>
    34. <artifactId>spring-boot-starter-actuator</artifactId>
    35. </dependency>
    36. <!--基础配置-->
    37. <dependency>
    38. <groupId>org.springframework.boot</groupId>
    39. <artifactId>spring-boot-devtools</artifactId>
    40. <scope>runtime</scope>
    41. <optional>true</optional>
    42. </dependency>
    43. <dependency>
    44. <groupId>org.projectlombok</groupId>
    45. <artifactId>lombok</artifactId>
    46. <optional>true</optional>
    47. </dependency>
    48. <dependency>
    49. <groupId>org.springframework.boot</groupId>
    50. <artifactId>spring-boot-starter-test</artifactId>
    51. <scope>test</scope>
    52. </dependency>
    53. </dependencies>
    54. </project>

    2.2.3 application.yml

    这里有个报错,但是正常使用
    image.png
    报红解决方法:

  • output前面加”- “:

  • defaultRabbit用大括号{}括起来: ```yaml server: port: 8801

spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

  1. <a name="r3RFP"></a>
  2. #### 2.2.4 主启动类
  3. ```java
  4. package com.atguigu.springcloud;
  5. import org.springframework.boot.SpringApplication;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  8. @SpringBootApplication
  9. @EnableEurekaClient
  10. public class StreamMQMain8801 {
  11. public static void main(String[] args) {
  12. SpringApplication.run(StreamMQMain8801.class, args);
  13. }
  14. }

2.2.5 业务类

我们此时要写的代码,要注意是和MQ交互,而不是传统的controller调用service。
image.png
我们现在写的代码是基于SpringCloudStream,然后做output指定通道,开启交互绑定器,再和中间件进行交互。

发送消息接口:
  1. package com.atguigu.springcloud.service;
  2. public interface IMessageProvider {
  3. public String send() ;
  4. }

发送消息接口实现类:
  1. Source 定义消息的发送管道:

image.png
这个Source哪来的呢? 简单的可理解为参照对象是SpringCloudStream自身, 从Stream发出消息就是输出,接收消息就是输入。 这里我们可以理解为我们定义一个消息生产者的发送管道:消息源。
image.png

  1. 创建并发送消息

这里MessageChannel的实例名必须是output,要不然无法启动
image.png

  1. package com.atguigu.springcloud.service.impl;
  2. import com.atguigu.springcloud.service.IMessageProvider;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.messaging.Source;
  5. import org.springframework.integration.support.MessageBuilder;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.messaging.MessageChannel;
  8. import javax.annotation.Resource;
  9. import java.util.UUID;
  10. //可以理解为定义消息的发送管道Source对应output(生产者),Sink对应input(消费者)
  11. @EnableBinding(Source.class)
  12. //@Service:这里不需要了,这里不是传统的controller调用service。这个service是和rabbitMQ打交道的
  13. public class MessageProviderImpl implements IMessageProvider {
  14. @Resource
  15. private MessageChannel output; // 消息的发送管道,MessageChannel对象的实例名必须是output
  16. @Override
  17. public String send() {
  18. String serial = UUID.randomUUID().toString();
  19. //创建消息,通过output这个管道向消息中间件发消息
  20. // Message<String> message = MessageBuilder.withPayload(serial).build();
  21. // output.send(message);
  22. this.output.send(MessageBuilder.withPayload(serial).build()); // 创建并发送消息
  23. System.out.println("***serial: "+serial);
  24. return serial;
  25. }
  26. }

Controller:

  1. package com.atguigu.springcloud.controller;
  2. import com.atguigu.springcloud.service.IMessageProvider;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import javax.annotation.Resource;
  6. @RestController
  7. public class SendMessageController {
  8. @Resource
  9. private IMessageProvider messageProvider;
  10. @GetMapping(value = "/sendMessage")
  11. public String sendMessage()
  12. {
  13. return messageProvider.send();
  14. }
  15. }

2.2.6 测试

启动7001、7002、8801
image.png
多次访问http://localhost:8801/sendMessage
image.png
image.png

2.3 消息驱动之消费者8802

2.3.1 新建cloud-stream-rabbitmq-consumer8802

2.3.2 pom

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>jdk8cloud2021</artifactId>
  7. <groupId>com.atguigu.springcloud</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <!--rabbitMQ-->
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  21. </dependency>
  22. <!--eureka client-->
  23. <dependency>
  24. <groupId>org.springframework.cloud</groupId>
  25. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  26. </dependency>
  27. <!--web/actuator这两个一般一起使用,写在一起-->
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-actuator</artifactId>
  35. </dependency>
  36. <!--基础配置-->
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-devtools</artifactId>
  40. <scope>runtime</scope>
  41. <optional>true</optional>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.projectlombok</groupId>
  45. <artifactId>lombok</artifactId>
  46. <optional>true</optional>
  47. </dependency>
  48. <dependency>
  49. <groupId>org.springframework.boot</groupId>
  50. <artifactId>spring-boot-starter-test</artifactId>
  51. <scope>test</scope>
  52. </dependency>
  53. </dependencies>
  54. </project>

2.3.3 application.yml

8801是生产者是output,8802是消费者是input
image.png

  1. server:
  2. port: 8802
  3. spring:
  4. application:
  5. name: cloud-stream-consumer
  6. cloud:
  7. stream:
  8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  9. defaultRabbit: # 表示定义的名称,用于于binding整合
  10. type: rabbit # 消息组件类型
  11. environment: # 设置rabbitmq的相关的环境配置
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 服务的整合处理
  19. input: # 这个名字是一个通道的名称
  20. destination: studyExchange # 表示要使用的Exchange名称定义
  21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
  22. binder: { defaultRabbit } # 设置要绑定的消息服务的具体设置
  23. eureka:
  24. client: # 客户端进行Eureka注册的配置
  25. service-url:
  26. defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
  27. instance:
  28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  30. instance-id: receive-8802.com # 在信息列表时显示主机名称
  31. prefer-ip-address: true # 访问的路径变为IP地址

2.3.4 主启动类

  1. package com.atguigu.springcloud;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  5. @SpringBootApplication
  6. @EnableEurekaClient
  7. public class StreamMQMain8802 {
  8. public static void main(String[] args) {
  9. SpringApplication.run(StreamMQMain8802.class, args);
  10. }
  11. }

2.3.5 业务类

  1. package com.atguigu.springcloud.controller;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.cloud.stream.annotation.EnableBinding;
  4. import org.springframework.cloud.stream.annotation.StreamListener;
  5. import org.springframework.cloud.stream.messaging.Sink;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. @EnableBinding(Sink.class) //可以理解为我们定义一个消息消费者的接收管道
  10. public class ReceiveMessageListener {
  11. @Value("${server.port}")
  12. private String serverPort;
  13. @StreamListener(Sink.INPUT) //输入源:作为一个消息监听者
  14. public void input(Message<String> message) {
  15. //获取到消息
  16. String messageStr = message.getPayload();
  17. System.out.println("消费者1号,------->接收到的消息:" + messageStr + "\t port: "+serverPort);
  18. }
  19. }

2.3.6 测试

启动7001、7002、8801、8802
image.png
image.png
image.png
image.png
测试成功

三、高级特性:分组消费与持久化

3.1 依照8802,clone出一份cloud-stream-rabbitmq-consumer8803

3.1.1 pom

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>jdk8cloud2021</artifactId>
  7. <groupId>com.atguigu.springcloud</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>cloud-stream-rabbitmq-consumer8803</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <!--rabbitMQ-->
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  21. </dependency>
  22. <!--eureka client-->
  23. <dependency>
  24. <groupId>org.springframework.cloud</groupId>
  25. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  26. </dependency>
  27. <!--web/actuator这两个一般一起使用,写在一起-->
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-actuator</artifactId>
  35. </dependency>
  36. <!--基础配置-->
  37. <dependency>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-devtools</artifactId>
  40. <scope>runtime</scope>
  41. <optional>true</optional>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.projectlombok</groupId>
  45. <artifactId>lombok</artifactId>
  46. <optional>true</optional>
  47. </dependency>
  48. <dependency>
  49. <groupId>org.springframework.boot</groupId>
  50. <artifactId>spring-boot-starter-test</artifactId>
  51. <scope>test</scope>
  52. </dependency>
  53. </dependencies>
  54. </project>

3.1.2 application.yml

  1. server:
  2. port: 8803
  3. spring:
  4. application:
  5. name: cloud-stream-consumer
  6. cloud:
  7. stream:
  8. binders: # 在此处配置要绑定的rabbitmq的服务信息;
  9. defaultRabbit: # 表示定义的名称,用于于binding整合
  10. type: rabbit # 消息组件类型
  11. environment: # 设置rabbitmq的相关的环境配置
  12. spring:
  13. rabbitmq:
  14. host: localhost
  15. port: 5672
  16. username: guest
  17. password: guest
  18. bindings: # 服务的整合处理
  19. input: # 这个名字是一个通道的名称
  20. destination: studyExchange # 表示要使用的Exchange名称定义
  21. content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
  22. binder: { defaultRabbit } # 设置要绑定的消息服务的具体设置
  23. eureka:
  24. client: # 客户端进行Eureka注册的配置
  25. service-url:
  26. defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
  27. instance:
  28. lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
  29. lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
  30. instance-id: receive-8803.com # 在信息列表时显示主机名称
  31. prefer-ip-address: true # 访问的路径变为IP地址

3.1.3 主启动类

  1. package com.atguigu.springcloud;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
  5. @SpringBootApplication
  6. @EnableEurekaClient
  7. public class StreamMQMain8803 {
  8. public static void main(String[] args) {
  9. SpringApplication.run(StreamMQMain8803.class, args);
  10. }
  11. }

3.1.4 业务类

  1. package com.atguigu.springcloud.service;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.cloud.stream.annotation.StreamListener;
  4. import org.springframework.cloud.stream.messaging.Sink;
  5. import org.springframework.messaging.Message;
  6. public class ReceiveMessageListener {
  7. @Value("${server.port}")
  8. private String serverPort;
  9. @StreamListener(Sink.INPUT)
  10. public void input(Message<String> message)
  11. {
  12. System.out.println("消费者2号,------->接收到的消息:" + message.getPayload()+"\t port: "+serverPort);
  13. }
  14. }

3.2 启动测试

启动7001、7002、8801(消息生产)、8802(消息消费)、8803(消息消费)
image.png
此时studyexchange有两个订阅者:8802、8803
image.png
image.png
测试成功

3.3 运行后的问题1:重复消费问题

目前8801发送一条消息后,8802和8803会同时收到8801的消息,存在重复消费问题。

3.3.1 为什么要解决重复消费问题

比如8801下一个订单,但是被两个服务获取消费,会多扣一次款。
image.png
默认分组:流水号
image.png
8802和8803默认是两个不同的分组。不同的微服务,默认分组是不同的,不同的组可以消费同一个消息

3.3.2 解决:消息分组

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以全面消费的(重复消费),同一个组内的多个消费者会发生竞争关系,只有其中一个可以消费。
先自定义分组,然后自定义配置分为同一个组,解决重复消费问题。

3.3.3 自定义分组

将8802和8803分为两个不同的组,atguiguA和atguiguB
修改8802和8803的yml
image.png
可以看到,分组已经变成我们自定义的atguiguA和atguiguB了
image.png
image.png
虽然实现了自定义分组,但是重复消费的问题依然存在。
分布式微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例,本例阳哥启动了两个消费微服务(8802/8803)
多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,按照上面我们启动两个应用的例子,虽然它们同属一个应用,但是这个消息出现了被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。

3.3.4 消费组

将8802和8803分为同一个组atguiguA。
image.png
测试,现在没有出现重复消费问题。
image.png
8802/8803实现了轮询分组,每次只有一个消费者接收消息,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

3.4 运行后的问题2:消息持久化问题

通过上述,解决了重复消费问题,再看看持久化

  1. 停止8802/8803并去除掉8802的分组group: atguiguA(8803的分组group: atguiguA没有去掉)
  2. 8801先发送4条消息到rabbitmq

image.png

  1. 先启动8802,无分组属性配置,后台没有打出来消息

发现8802没有收到消息,消息丢失。。。。

  1. 再启动8803,有分组属性配置,后台打印出来了MQ上的消息

image.png

SpringCloud Sleuth 分布式请求链路跟踪

一、概述

分布式请求链路跟踪:
为什么会出现这个技术,要解决哪些问题。
在微服务框架中,一个客户端发起的请求在后端系统中会经过多次不同的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。
SpringCloudSleuth提供了一套完整的服务跟踪的解决方案,在分布式系统中提供乐追踪解决方案并且兼容支持了zipkin。
官网

二、搭建链路监控步骤

2.1 zipkin

Sleuth:负责跟踪整理,zipkin:负责展现

2.1.1 下载

SpringCloud从F版起已不需要自己构建Zipkin Server了,只需调用jar包即可
下载地址
zipkin-server-2.12.9-exec.jar

2.1.2 运行jar&运行控制台

cd到zipkin-server-2.12.9-exec.jar的下载目录
直接在cmd中运行java -jar zipkin-server-2.12.9-exec.jar
image.png
访问http://localhost:9411/zipkin/ web交互界面
image.png
zipkin启动成功

2.1.3 完整的调用链路

表示一请求链路,一条链路通过Trace Id唯一标识,Span标识发起的请求信息,各span通过parent id 关联起来
image.png
Trace:类似于树结构的Span集合,表示一条调用链路,存在唯一标识。
Span:表示调用链路来源,通俗的理解Span就是一次请求信息。各个Span通过parentID关联起来。
image.png
image.png

2.2 服务提供者cloud-provider-payment8001

2.2.1 pom

引入spring-cloud-starter-zipkin依赖,其包含了sleuth+zipkin

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>jdk8cloud2021</artifactId>
  7. <groupId>com.atguigu.springcloud</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>cloud-provider-payment8001</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <!--包含了sleuth+zipkin-->
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-zipkin</artifactId>
  21. </dependency>
  22. <!--eureka-client-->
  23. <dependency>
  24. <groupId>org.springframework.cloud</groupId>
  25. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  26. </dependency>
  27. <dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
  28. <groupId>com.atguigu.springcloud</groupId>
  29. <artifactId>cloud-api-commons</artifactId>
  30. <version>${project.version}</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-web</artifactId>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-starter-actuator</artifactId>
  39. </dependency>
  40. <dependency>
  41. <groupId>org.mybatis.spring.boot</groupId>
  42. <artifactId>mybatis-spring-boot-starter</artifactId>
  43. </dependency>
  44. <dependency>
  45. <groupId>com.alibaba</groupId>
  46. <artifactId>druid-spring-boot-starter</artifactId>
  47. <version>1.1.10</version>
  48. </dependency>
  49. <!--mysql-connector-java-->
  50. <dependency>
  51. <groupId>mysql</groupId>
  52. <artifactId>mysql-connector-java</artifactId>
  53. </dependency>
  54. <!--jdbc-->
  55. <dependency>
  56. <groupId>org.springframework.boot</groupId>
  57. <artifactId>spring-boot-starter-jdbc</artifactId>
  58. </dependency>
  59. <dependency>
  60. <groupId>org.springframework.boot</groupId>
  61. <artifactId>spring-boot-devtools</artifactId>
  62. <scope>runtime</scope>
  63. <optional>true</optional>
  64. </dependency>
  65. <dependency>
  66. <groupId>org.projectlombok</groupId>
  67. <artifactId>lombok</artifactId>
  68. <optional>true</optional>
  69. </dependency>
  70. <dependency>
  71. <groupId>org.springframework.boot</groupId>
  72. <artifactId>spring-boot-starter-test</artifactId>
  73. <scope>test</scope>
  74. </dependency>
  75. </dependencies>
  76. </project>

2.2.2 yml

image.png
注意缩进!

  1. server:
  2. port: 8001
  3. spring:
  4. application:
  5. name: cloud-payment-service
  6. datasource:
  7. type: com.alibaba.druid.pool.DruidDataSource # 当前数据源操作类型
  8. driver-class-name: com.mysql.cj.jdbc.Driver
  9. url: jdbc:mysql://localhost:3306/dbCloud?useUnicode=true&characterEncoding=utf-8&useSSL=false
  10. username: root
  11. password: 10086
  12. zipkin:
  13. base-url: http://localhost:9411
  14. sleuth:
  15. sampler:
  16. #采样率值介于 0 到 1 之间,1 则表示全部采集
  17. probability: 1
  18. eureka:
  19. instance:
  20. instance-id: payment8001
  21. prefer-ip-address: true #访问路径可以显示IP地址
  22. #心跳检测与续约时间
  23. #开发时设置小些,保证服务关闭后注册中心能及时剔除服务
  24. #Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
  25. lease-renewal-interval-in-seconds: 1
  26. #Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
  27. lease-expiration-duration-in-seconds: 2
  28. client:
  29. #表示是否将自己注册进EurekaServer默认为true。
  30. register-with-eureka: true
  31. #是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
  32. fetchRegistry: true
  33. service-url:
  34. # defaultZone: http://localhost:7001/eureka # 单机版
  35. defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
  36. mybatis:
  37. mapperLocations: classpath:mapper/*.xml
  38. type-aliases-package: com.atguigu.springcloud.entities # 所有Entity别名类所在包

2.2.3 业务类PaymentController

  1. @GetMapping("/payment/zipkin")
  2. public String paymentZipkin() {
  3. return "hi ,i'am paymentzipkin server fall back,welcome to atguigu,O(∩_∩)O哈哈~";
  4. }

2.3 服务消费者cloud-consumer-order80

2.3.1 pom

引入spring-cloud-starter-zipkin依赖,其包含了sleuth+zipkin

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>jdk8cloud2021</artifactId>
  7. <groupId>com.atguigu.springcloud</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>cloud-consumer-order80</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <!--包含了sleuth+zipkin-->
  18. <dependency>
  19. <groupId>org.springframework.cloud</groupId>
  20. <artifactId>spring-cloud-starter-zipkin</artifactId>
  21. </dependency>
  22. <!--eureka-client-->
  23. <dependency>
  24. <groupId>org.springframework.cloud</groupId>
  25. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  26. </dependency>
  27. <dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity -->
  28. <groupId>com.atguigu.springcloud</groupId>
  29. <artifactId>cloud-api-commons</artifactId>
  30. <version>${project.version}</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-web</artifactId>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-starter-actuator</artifactId>
  39. </dependency>
  40. <dependency>
  41. <groupId>org.springframework.boot</groupId>
  42. <artifactId>spring-boot-devtools</artifactId>
  43. <scope>runtime</scope>
  44. <optional>true</optional>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.projectlombok</groupId>
  48. <artifactId>lombok</artifactId>
  49. <optional>true</optional>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-starter-test</artifactId>
  54. <scope>test</scope>
  55. </dependency>
  56. </dependencies>
  57. </project>

2.3.2 yml

image.png
注意缩进!

  1. server:
  2. port: 80
  3. spring:
  4. application:
  5. name: cloud-order-service
  6. zipkin:
  7. base-url: http://localhost:9411
  8. sleuth:
  9. sampler:
  10. probability: 1
  11. eureka:
  12. client:
  13. #表示是否将自己注册进EurekaServer默认为true。
  14. register-with-eureka: true
  15. #是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
  16. fetchRegistry: true
  17. service-url:
  18. # defaultZone: http://localhost:7001/eureka
  19. defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka #集群版
  20. instance:
  21. instance-id: order80
  22. prefer-ip-address: true

2.3.3 业务类OrderController

  1. // ====================> zipkin+sleuth
  2. @GetMapping("/comsumer/payment/zipkin")
  3. public String paymentZipKin() {
  4. String result = restTemplate.getForObject("http://localhost:8001" + "/payment/zipkin/", String.class);
  5. return result;
  6. }

2.4 测试

启动7001、7002、8001、80
image.png

8001自测
image.png

80调用8001
image.png

打开浏览器访问:http://localhost:9411
image.png
order服务调用了支付服务
image.png
说明:order调用的payment服务,发送的是什么请求,请求链接,都有详细的记录
查看依赖关系
image.png