什么是消息驱动开发

image.png
分布式环境下,消息中间件支持同步方式和异步方式的消息传递。异步方式的消息传递比同步方式具有更强的容错性,能够保障在系统故障时消息正常可靠地传输。异步消息中间件的消息传递模式又可以分为两种:点对点模式和”发布-订阅”模式。

  • 点对点模式:该模式常用于消息生产者和消息消费者之间点到点的通信;
  • “发布-订阅”模式:该模式使用主题(Topic)代替点对点中的目的消费者。

1、基于消息中间件开发的优点

  • 降低应用耦合:减少微服务之间的依赖和调用。
  • 改善应用性能:微服务的处理变成了异步处理模式,提升了性能,使系统具有了更好的响应延迟。
  • 提升应用的可扩展性
  • 提高系统的可用性:减低微服务之间的相应影响。
  • 更灵活的系统集成:通过消息中间件进行集成,使系统的继承可以跨平台,同时降低了对网络协议复杂性的处理。

2、基于消息中间件开发的缺点

  • 更复杂的应用架构
  • 更具挑战的开发模式
  • 更陡峭的学习曲线

    Spring Cloud Stream简介

    Spring Cloud Stream是创建消息驱动微服务应用的框架,继承了Spring应用框架的理念,实现了一个基于注解驱动的框架,可以让开发者通过注解的方式,在应用中实现消息的发送和订阅处理。
    Spring Cloud Stream支持与多种消息中间件整合,如Kafka、RabbitMQ,使用Spring Integration提供与消息代理之间的连接,为应用程序的消息发布和消费提供了一个平台中立的接口,将实现的细节独立于应用代码之外。
    1、应用模型
    image.png
    消息发送通道接口Source
    用于Spring Cloud Stream与外界通道的绑定,通过在该接口中通过注解的方式定义消息通道的名称。当使用该通道接口发送一个消息时,会将所要发送的消息进行序列化,然后通过该接口锁提供的MessageChannel将所要发送的消息发送到相应的消息中间件中。
    消息通道Channel
    是对消息队列的一种抽象,用来存放消息发布者发布的消息或者消费者所要消费的消息。Spring Cloud Stream进行了抽象,开发者只需要定义好消息通道,消息通道具体发送到哪个消息队列则在项目配置文件中进行配置。
    消息绑定器Binder
    Spring Cloud Stream通过定义绑定器作为中间层,实现了应用程序与具体消息中间件细节之间的隔离,向应用程序暴露统一的消息通道,使应用程序不需要考虑与各种不同的消息中间件的对接。
    Spring Cloud Stream默认提供了对RabbitMQ和Kafka的绑定器,在应用中开发只需要引入相应的绑定器就可以实现与RabbitMQ或Kafka 的对接,从而进行消息的发送与监听。
    消息监听通道Sink
    是Spring Cloud Stream提供应用程序监听通道消息的抽象处理接口。当从消息中间件接收到一个待处理消息时,该接口将负责把消息数据反序列化为Java对象,然后交由业务所定义的具体业务处理方法进行处理。
    2、编程模型
    声明和绑定消息通道
    要连接到消息中间件的那个通道上。
    @EnableBinding注解是告诉应用需要出发消息通道的绑定,可以声明一个或多个消息发送通道接口或消息监听通道接口参数,
    @Input直接是用在监听通道接口的方法定义上。用来绑定一个具体的消息通道。
    image.png
    @Output注解用在消息发送通道接口的方法定义上,同来绑定消息发送的通道。
    image.png
    开箱即用的消息通道接口定义Processor,同时继承了Source和Sink,该接口所定义的通道是一个消息发送通道同时也是一个消息监听通道。
    访问消息通道
    image.png
    发布或监听消息
    在消息监听处理时是可以使用Spring Integration的注解或者Spring Cloud Stream的@StreamListener注解来实现。
    image.png
    Spring Cloud Stream提供了一个扩展的MessageConverter机制。对所绑定的通道进行消息数据的处理,就是说MessageConverter机制会使用contentType头所指定的消息内容格式,将所接收的消息负载进行反序列化,解析为Java对象。
    消息监听返回数据到其他消息通道时,可以使用@SendTo注解指定返回数据的输出通道。
    image.png
    3、使用”发送-订阅“模式
    发布-订阅模式可以将两个或多个互相依赖的应用进行解耦,使它们可以额各自独立地改变和复用,给系统维护、扩展和重用带来便利。
    image.png
    如果微服务之间需要进行通信,应尽量采取发布-订阅模式,避免点对点的调用。一方面可以大大减少微服务之间的耦合,另一方面可以保持他们之间的独立性和自治能力。

    Kafka使用指南

    Kafka使用Scala和Java进行编写,具有快速、可扩展、高吞吐量、内置分区、支持数据副本和可容错等特性,能够支持海量数据的高效传递。同时Kafka支持消息持久化,能够保障对数据高效处理的同时,防止消息数据的丢失。
    1、Kafka基础知识
    主题(Topic):
    每一个不同类别的消息称为一个主题。同步Topic的消息是分开存储的。同一个Topic的消息可能保存在一个或多个Broker中,但对于生产者或消费者来说,只需指定消息的Topic就可以,不用关心消息数据的存储位置。
    生产者(Producer):
    生产者也就是消息的发布者。负责将消息发布到Kafka中的某个Topic中,Broker在接收到生产者发送的消息后,将该消息追加到当前分区中。生产者在发布消息的时候也可以选择将消息发布到Topic上的哪个分区上。
    消费者(Consumer):
    消费者从Broker中读取消息数据并进行处理。一个消费者可以同时消费多个Topic中的消息。
    消息代理(Broker):
    生产者所发布的消息将保存在一组Kafka服务器中,为Kafka集群。而集群中的每个Kafka服务器节点就是一个消息代理Broker。消费者通过Broker从获取所订阅的消息并进行消费。
    消息分区(Partition):
    Topic所发布的消息数据将会被分割为一个或多个分区,每一个分区数据又可以使用多个Segment文件进行存储。在一个分区中的消息数据是有序的,而多个分区之间则是无序的。如果一个主题的数据需要严格保证消息的消费顺序,那么需要将分区数设为1。
    当生产者将消息存储到一个分区中时,Kafka会为每条消息数据建立一个唯一索引号(index),这个索引号称为偏移量(offset)。对于消费者来说都会在本地保存该offset,当消费者正常消费时,相应本地的偏移量也会增加。同时消息者可以自己控制偏移量,以便进行消息的重新消费等处理。
    当新的消息数据追加到分区中时,Kafka集群就会在不同的Broker之间做备份,保证了消息数据的可靠性。
    Kafka还支持实时的流处理。通过流处理可以持续从某个Topic中获取输入数据,并进行处理加工,然后将其写入输出主题中。对于复杂的转换,Kafka提供了Streams API来辅助流处理。通过这种实时的流处理,可以构建聚合计算或者将流连接到一起,形成复杂的应用。
    2、搭建Kafka环境
    因为Kafka是一个分布式的消息系统,Broker、Consumer都需要ZooKeeper来提供分布式支持。因此在启动Kafka服务器之前需要先启动一个ZooKeeper服务。
    1、启动ZooKeeper服务器
    zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
    image.png
    2、启动Kafka服务
    bin/kafka-server-start /usr/local/etc/kafka/server.properties
    image.png

    使用消息对应用重构

    对原来的示例项目进行改进:

  • 改造商品微服务增加缓存处理功能,能够将远程请求所得到的的用户数据缓存到Redis中

  • 改造用户微服务,当一个用户信息被更新、删除时,可以通过Kafka发送一条信息给商品微服务
  • 改造商品微服务,增加用户信息变更消息监听功能,当接收到Kafka发送来的用户更新、删除等消息时。可以对缓存数据进行相应的更新处理。

image.png
1、为商品微服务增加缓存功能
增加Redis相关依赖
image.png
增加配置
image.png
修改相应的代码
image.png
2、为用户微服务添加消息发送功能
定义用户变更消息类:
image.png
增加pom依赖:
image.png
修改配置文件:
image.png
修改启动类
image.png
增加Service:
image.png
修改updateUserName
image.png
3、为商品微服务添加消息监听功能
增加pom依赖
image.png
修改启动类
image.png
增加配置
image.png
增加监听类
image.png
4、自定义消息通道

  • 首先需要增加一个自定义消息发送或者接口的接口;
  • 然后将消息发送或者消息监听连接到该通道上;
  • 最后修改项目配置文件,将该消息通道绑定到消息中间件具体的消息主题上。

自定义接收、发送消息通道
image.png
image.png
修改消息监听类代码:
image.png
修改配置文件

image.png

Spring Cloud Stream高级主题

1、单元测试
SpringCloud Stream提供了一个TestSupportBinder来支持单元测试,在没有连接到消息中间件的情况下完成测试。通过TestSupportBinder可以模拟访问消息通道,并进行消息的发送和监听。
对于消息发送,TestSupportBinder会注册一个类型为MessageCollector的Bean,通过该Bean可以获取所发送的消息,这样就可以判断消息是否发送成功。
对于消息监听,则可以通过直接向入站通道发送消息进行模拟。
image.png
2、错误处理
Spring Cloud Stream提供了一个全局错误消息处理通道,当出现异常时,Spring Cloud Stream就会将该异常包装成ErrorMessage,然后发送到该消息通道中。
默认该消息通道的名称为errorChannel,可以通过配置指定通道的名称。
image.png
3、消息处理分发
消息分发的条件可以通过@StreamListener注解中的condition属性设定,条件可以使用SpEL表达式。
在进行消息分发处理是,Spring Cloud Stream会对每一个条件进行求值,所有符合条件的方法都会在同一个线程中执行,但并不保证执行的顺序。
image.png
4、消费者组与消息分区
发布-订阅模式通过共享主题使应用之间的连接更加容易,对于一个消息只需要一个实例进行处理即可,所以当一个应用存在多个实例时,这些实例之间便会成为同一个消息相互竞争的消费者。
Spring Cloud Stream通过消费者组的概念给这种情况进行建模。
消费者组的成员统一在一起消费所订阅消息中的所有消息,而消息中的每个分区只能由同一个消费者组内其中的一个消费者来消费。
再平衡(rebalance),将rebalance理解为一种一种协议,其用来规定一个消费者组下的所有消费者成员如何分配所订阅的消息通道中的消息分区。
在Spring Cloud Stream所提供的数据分片方案中,消息中间件的一个主题可以视为分隔成为多个分片,并确保消息发送者所发送的具有相同特性的消息数据可以被同一消费者实例所处理。
5、消息绑定器
Spring Cloud Stream通过提供了一个抽象的绑定器作为中间层,实现了与具体消息中间件(RabbitMQ、Kafka)连接,应用程序通过Spring Cloud Stream所暴露的统一的消息通道进行消息的发送与监听。通过这种方式一方面大大减少了使用消息中间件的难度,另一方面使应用代码与具体的消息中间件可以解耦,可以根据需要对接不同的消息中间件。
image.png
在进行消息发送前,需要调用绑定器的bindProducer()方法,并根据所要绑定的具体消息代理,创建一个消息通道。bindProducer()方法有以下3个参数:

  • name:要绑定的消息代理的名称。
  • outboundBindTarget:本地中用来发送消息的通道。
  • producerProperties:创建消息通道时的参数,如分区配置等。

消息监听需要调用bindConsumer()方法,创建一个消息监听通道,bindConsumer()方法有以下4个参数:

  • name:要板顶的消息代理的名称。
  • group:消费者组名。
  • inboundBindTarget:本地中用来进行消息监听的通道。
  • consumerProperties:创建消息通道时的参数。

Spring Cloud Stream的绑定器SPI(Service Provider Interface)由数个接口、一些开箱即用的工具类及服务发现策略组成,并提供了可插入机制实现了与外部多种消息中间件的连接。对于绑定器SPI最核心的就是绑定接口:
image.png
实现一个消息绑定器需要以下步骤:

  • 实现Binder接口。
  • 通过@Configuration注解对上面的实现类及所要连接的消息中间件进行相关配置的处理,并创建一个Bean。
  • 在classpath下的META-INF/spring.binders文件中配置该绑定器。

image.png

消息总线——Spring Cloud Bus

Spring Cloud Bus构建在Spring Cloud Stream之上,是一个轻量级的通信组件,可以将分布式系统中的节点与轻量级消息代理连接,从而实现状态更改广播或其他事件的广播。
在实现上Spring Cloud Bus基于Spring事件驱动模型。Spring时间驱动模型包含3个基本概念。

  • 事件:ApplicationEvent
  • 事件监听者:ApplicationListener
  • 时间发布者:ApplicationEventPublisher

Spring中的事件驱动模型其实是观察者模式的典型应用,通过这种处理方式可以解耦目标对象和它的依赖对象,目标对象只需要通知它的依赖对象即可。
1、完成配置自动刷新配置
image.png
(1)修改配置服务,引入Spring Cloud Bus。
image.png
image.png
(2)修改商品微服务,只需要引入Spring Cloud Bus依赖。
image.png
image.png
2、发布自定义事件
通过Spring Cloud Bus也可以发布自定义事件,所发布的事件需要继承自Remote ApplicationEvent。在发布事件时默认会将事件转换为JSON格式,在反序列化时也需要使用该时间的类型。因此,时间发布者和监听者都需要访问这个事件类,或者保持这两个类一致。也可以用@JsonTypeName注解来自定义序列化中的类名,但在接收端也要有同样的定义。
image.png
同样,该事件的代码也需要加一个事件监听类。
image.png
编写测试
image.png
启动类增加注解
image.png