9.2 探索 Spring Integration
Spring Integration 涵盖了许多集成场景。试图将所有这些内容都包含在一个章节中,就像试图将大象装进一个信封一样。我将展示一张 Spring Integration 大象的照片,而不是对 Spring Integration 进行全面的讨论,以便让你了解它是如何工作的。然后,将创建一个向 Taco Cloud 应用程序添加功能的集成流。
集成流由以下一个或多个组件组成。在编写更多代码之前,我们将简要地了解一下这些组件在集成流中所扮演的角色:
Channels —— 将信息从一个元素传递到另一个元素。
Filters —— 有条件地允许基于某些标准的消息通过流。
Transformers —— 更改消息值或将消息有效负载从一种类型转换为另一种类型。
Routers —— 直接将信息发送到几个渠道之一,通常是基于消息头。
Splitters —— 将收到的信息分成两条或多条,每条都发送到不同的渠道。
Aggregators —— 与分离器相反,它将来自不同渠道的多条信息组合成一条信息。
Service activators —— 将消息传递给某个 Java 方法进行处理,然后在输出通道上发布返回值。
Channel adapters —— 将通道连接到某些外部系统或传输。可以接受输入,也可以向外部系统写入。
Gateways —— 通过接口将数据传递到集成流。
在定义文件写入集成流时,你已经看到了其中的一些组件。FileWriterGateway 接口是将应用程序提交的文本写入文件的网关。还定义了一个转换器来将给定的文本转换为大写;然后声明一个服务网关,它执行将文本写入文件的任务。这个流有两个通道:textInChannel 和 fileWriterChannel,它们将其他组件相互连接起来。现在,按照承诺快速浏览一下集成流组件。
9.2.1 消息通道
消息通道意指消息移动的集成管道移动。它们是连接 Spring Integration 所有其他部分的管道。
Spring Integration 提供了多个管道的实现,包括以下这些:
- PublishSubscribeChannel —— 消息被发布到 PublishSubscribeChannel 后又被传递给一个或多个消费者。如果有多个消费者,他们都将会收到消息。
- QueueChannel —— 消息被发布到 QueueChannel 后被存储到一个队列中,直到消息被消费者以先进先出(FIFO)的方式拉取。如果有多个消费者,他们中只有一个能收到消息。
- PriorityChannel —— 与 QueueChannel 类似,但是与 FIFO 方式不同,消息被冠以 priority 的消费者拉取。
- RendezvousChannel —— 与 QueueChannel 期望发送者阻塞通道,直到消费者接收这个消息类似,这种方式有效的同步了发送者与消费者。
- DirectChannel —— 与 PublishSubscribeChannel 类似,但是是通过在与发送方相同的线程中调用消费者来将消息发送给单个消费者,此通道类型允许事务跨越通道。
- ExecutorChannel —— 与 DirectChannel 类似,但是消息分派是通过 TaskExecutor 进行的,在与发送方不同的线程中进行,此通道类型不支持事务跨通道。
- FluxMessageChannel —— Reactive Streams Publisher 基于 Project Reactor Flux 的消息通道。(我们将会在第 10 章讨论 Reactive Streams、Reactor 和 Flux)
在 Java 配置和 Java DSL 样式中,输入通道都是自动创建的,默认是 DirectChannel。但是,如果希望使用不同的通道实现,则需要显式地将通道声明为 bean 并在集成流中引用它。例如,要声明 PublishSubscribeChannel,需要声明以下 @Bean 方法:
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}
然后在集成流定义中通过名称引用这个通道。例如,如果一个服务 activator bean 正在使用这个通道,那么可以在 @ServiceActivator 的 inputChannel 属性中引用它:
@ServiceActovator(inputChannel="orderChannel")
或者,如果使用 Java DSL 配置方式,需要通过调用 channel() 方法引用它:
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}
需要注意的是,如果使用 QueueChannel,则必须为使用者配置一个轮询器。例如,假设已经声明了一个这样的 QueueChannel bean:
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}
需要确保将使用者配置为轮询消息通道。在服务激活器的情况下,@ServiceActivator 注解可能是这样的:
@ServiceActivator(inputChannel="orderChannel",
poller=@Poller(fixedRate="1000"))
在本例中,服务激活器每秒(或 1,000 ms)从名为 orderChannel 的通道轮询一次。
9.2.2 过滤器
过滤器可以放置在集成管道的中间,以允许或不允许消息进入流中的下一个步骤。
例如,假设包含整数值的消息通过名为 numberChannel 的通道发布,但是只希望偶数传递到名为 evenNumberChannel 的通道。在这种情况下,可以使用 @Filter 注解声明一个过滤器,如下所示:
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}
在本例中,使用 lambda 表达式实现过滤器。但是,事实上,filter() 方法是接收一个 GenericSelector 作为参数。这意味着可以实现 GenericSelector 接口,而不是引入一个简略的 lambda 表达式实现过滤。
9.2.3 转换器
转换器对消息执行一些操作,通常会产生不同的消息,并且可能会产生不同的负载类型。转换可以是一些简单的事情,例如对数字执行数学运算或操作 String 字符串值;转换也会很复杂,例如使用表示 ISBN 的 String 字符串值来查找并返回相应书籍的详细信息。
例如,假设正在一个名为 numberChannel 的通道上发布整数值,并且希望将这些数字转换为包含等效罗马数字的 String 字符串。在这种情况下,可以声明一个 GenericTransformer 类型的 bean,并添加 @Transformer 注解,如下所示:
@Bean
@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNubers::toRoman;
}
通过 @Transformer 注解将 bean 指定为 transformer bean,它从名为 numberChannel 的通道接收整数值,并使用 oRoman() (toRoman() 方法是在一个名为 RomanNumbers 的类中静态定义的,并在这里通过方法引用进行引)的静态方法进行转换,得到的结果被发布到名为 romanNumberChannel 的通道中。
在 Java DSL 配置风格中,调用 transform() 甚至更简单,将方法引用传递给 toRoman() 方法即可:
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
...
.transform(RomanNumbers::toRoman)
...
.get();
}
虽然在两个 transformer 代码示例中都使用了方法引用,但是要知道 transformer 也可以使用 lambda 表达式。或者,如果 transformer 比较复杂,需要单独的成为一个 Java 类,可以将它作为 bean 注入流配置,并将引用传递给 transform() 方法:
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(
RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}
在这里,声明了一个 RomanNumberTransformer 类型的 bean,它本身是 Spring Integration 的 Transformer 或 GenericTransformer 接口的实现。bean 被注入到 transformerFlow() 方法,并在定义集成流时传递给 transform() 方法。
9.2.4 路由
基于某些路由标准的路由器允许在集成流中进行分支,将消息定向到不同的通道。
例如,假设有一个名为 numberChannel 的通道,整数值通过它流动。假设希望将所有偶数消息定向到一个名为 evenChannel 的通道,而将奇数消息定向到一个名为 oddChannel 的通道。要在集成流中创建这样的路由,可以声明一个 AbstractMessageRouter 类型的 bean,并使用 @Router 注解该 bean:
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel>
determineTargetChannels(Message<?> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}
这里声明的 AbstractMessageRouter bean 接受来自名为 numberChannel 的输入通道的消息。定义为匿名内部类的实现检查消息有效负载,如果它是偶数,则返回名为 evenChannel 的通道(在路由器 bean 之后声明为 bean)。否则,通道有效载荷中的数字必须为奇数;在这种情况下,将返回名为 oddChannel 的通道(也在 bean 声明方法中声明)。
在 Java DSL 形式中,路由器是通过在流定义过程中调用 route() 来声明的,如下所示:
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping ->
mapping.subFlowMapping("EVEN", sf ->
sf.<Integer, Integer>transform(n -> n * 10).handle((i,h) -> { ... }))
.subFlowMapping("ODD", sf ->
sf.transform(RomanNumbers::toRoman).handle((i,h) -> { ... }))
)
.get();
}
虽然仍然可以声明 AbstractMessageRouter 并将其传递给 route(),但是本例使用 lambda 表达式来确定消息有效负载是奇数还是偶数。
如果是偶数,则返回一个偶数的字符串值。如果是奇数,则返回奇数。然后使用这些值来确定哪个子映射将处理消息。
9.2.5 Splitter
有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter 将为分割并处理这些消息。
Splitter 在很多情况下都很有用,但是有两个基本用例可以使用 Splitter:
- 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
- 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。
当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个 POJO 即可,该 POJO 提取传入的有效负载的各个部分,并将它们作为集合的元素返回。
例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的 OrderSplitter 将完成这项工作:
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
然后,可以使用 @Splitter 注解将 OrderSplitter bean 声明为集成流的一部分,如下所示:
@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
在这里,购买订单到达名为 poChannel 的通道,并被 OrderSplitter 分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为 splitOrderChannel 的通道。在流的这一点上,可以声明一个 PayloadTypeRouter 来将账单信息和项目,并路由到它们自己的子流:
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(List.class.getName(), "lineItemsChannel");
return router;
}
顾名思义,PayloadTypeRouter 根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为 BillingInfo 的消息路由到一个名为 billingInfoChannel 的通道进行进一步处理。至于项目信息,它们在 java.util.List 集合包中;因此,可以将 List 类型的有效负载映射到名为 lineItemsChannel 的通道中。
按照目前的情况,流分为两个子流:一个是 BillingInfo 对象流,另一个是 List
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}
当携带 List
如果你想使用 Java DSL 来声明相同的 Splitter/Router 配置,你可以调用 split() 和 route():
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping ->
mapping.subFlowMapping("BILLING_INFO", sf ->
sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
.subFlowMapping("LINE_ITEMS", sf ->
sf.split().<LineItem> handle((lineItem, h) -> { ... }))
)
.get();
流定义的 DSL 形式当然更简洁,如果不是更难于理解的话。它使用与 Java 配置示例相同的 OrderSplitter 来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。
9.2.6 服务激活器
服务激活器从输入信道接收消息并发送这些消息给的 MessageHandler。Spring 集成提供了多种的 MessageHandler 实现开箱即用(PayloadTypeRouter 就是 MessageHandler 的实现),但你会经常需要提供一些定制实现充当服务激活。作为一个例子,下面的代码说明了如何声明的 MessageHandler bean,构成为一个服务激活器:
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}
通过 @ServiceActivator 注解 bean,将其指定为一个服务激活器,从所述信道处理消息命名 someChannel。至于 MessageHandler 的本身,它是通过一个 lambda 实现。虽然这是一个简单的 MessageHandler,给定的消息时,它发出其有效载荷的标准输出流。
另外,可以声明一个服务激活器,用于在返回一个新的有效载荷之前处理传入的消息。在这种情况下,这个 bean 应该是一个 GenericHandler 而非的 MessageHandler:
@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}
在这种情况下,服务激活器是一个 GenericHandler,其中的有效载荷为 Order 类型。当订单到达,它是通过 repository 进行保存;保存 Order 后产生的结果被发送到名称为 completeChannel 的输出通道。
注意,GenericHandler 不仅给出了有效载荷,还有消息头(即使该示例不使用任何形式的头信息)。同时也可以通过传递了 MessageHandler 或 GenericHandler 去调用在流定义中的 handler() 方法,来使用在 Java DSL 配置式中的服务激活器:
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}
在这种情况下,MessageHandler 是作为一个 lambda,但也可以将它作为一个参考方法甚至是一个类,它实现了 MessageHandler 接口。如果给它一个 lambda 或方法引用,要知道,它是接受一个消息作为参数。
类似地,如果服务激活器不是流的结束,handler() 可以写成接受 GenericHandler 参数。从之前应用订单存储服务激活器来看,可以使用 Java DSL 对流程进行配置:
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<Order>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}
当利用 GenericHandler 时,lambda 表达式或方法参考接受该消息的有效载荷和报头作为参数。另外,如果选择在一个流程的结束使用 GenericHandler,需要返回 null,否则会得到这表明有没有指定输出通道的错误。
9.2.7 网关
网关是通过一个应用程序可以将数据提交到一个集成信息流和接收这是该流的结果的响应的装置。通过 Spring Integration 实现的,网关是实现为应用程序可以调用将消息发送到集成信息流的接口。
你已经见过 FileWriterGateway 消息网关的例子。FileWriterGateway 是单向网关,它的方法接受 String 作为参数,将其写入到文件中,返回 void。同样,编写一个双向网关也很容易。当写网关接口时,确保该方法返回某个值发布到集成流程即可。
作为一个例子,假设一个网关处理接受一个 String 的简单集成信息流,并把特定的 String 转成大写。网关接口可能是这个样子:
package com.example.demo;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}
令人惊叹的是,没有必要实现这个接口。Spring Integration 自动提供运行时实现,这个实现会使用特定的通道进行数据的发送与接收。
当 uppercase() 被调用时,给定的 String 被发布到名为 inChannel 的集成流通道中。而且,不管流是如何定义的或是它是做什么的,在当数据到达名为 outChannel 通道时,它从 uppercase() 方法中返回。
至于 uppercase 集成流,它只有一个单一的步骤,把 String 转换为大写一个简单的集成流。以下是使用 Java DSL 配置:
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}
正如这里所定义的,流程启动于名为 inChannel 的通道获得数据输入的时候。然后消息的有效负载通过转换器去执行变成大写字母的操作,这里的操作都使用 lambda 表达式进行定义。消息的处理结果被发布到名为 outChannel 的通道中,这个通道就是已经被声明为 UpperCaseGateway 接口的答复通道。
9.2.8 通道适配器
通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。
入站信道的适配器可以采取多种形式,这取决于它们引入到流的数据源。例如,声明一个入站通道适配器,它采用从 AtomicInteger 到流递增的数字。使用 Java 配置,它可能是这样的:
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}
此 @Bean 方法声明了一个入站信道适配器 bean,后面跟随着 @InboundChannelAdapter 注解,它们每 1 秒(1000 ms)从注入的 AtomicInteger 提交一个数字到名 numberChannel 的通道中。
当使用 Java 配置时,@InboundChannelAdapter 意味着是一个入站通道适配器,from() 方法就是使用 Java DSL 来定义流的时候,表明它是怎么处理的。下面对于流定义的一个片段展示了在 Java DSL 配置中类似的输入通道适配器:
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}
通常情况下,通道适配器通过的 Spring Integration 的多端点模块之一进行提供。举个例子,假设需要一个入站通道适配器,用它来监视指定的目录,同时将任何写入到那个目录中的文件作为消息,提交到名为 file-channel 的通道中。下面的 Java 配置使用 FileReadingMessageSource 从 Spring Integration 的文件端点模块来实现这一目标:
@Bean
@InboundChannelAdapter(channel="file-channel",
poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
当在 Java DSL 中写入同样的 file-reading 入站通道适配器时,来自 Files 类的 inboundAdapter() 方法达到的同样的目的。出站通道适配器位于集成信息流的最后位置,将最终消息扇出到应用程序或是其他系统中:
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}
服务激活器(作为消息处理的实现)往往是为出站通道适配器而存在的,特别是当数据需要被扇出到应用程序本身的时候。
值得一提的,Spring Integration 的端点模块为几种常见的用例提供了有用的消息处理程序。如在程序清单 9.3 中所看到的 FileWritingMessageHandler 出站通道适配器,这就是一个很好的例子。说到 Spring Integration 端点模块,让我们快速浏览一下准备使用的集成端点模块。
9.2.9 端点模块
Spring Integration 可以让你创建自己的通道适配器,这是很棒的。但是,更棒的是 Spring Integration 提供了包含通道超过两打的端点模块适配器,包括入站和出站,用于与各种常用外部系统进行集成,如表 9.1 所示。
模块 | 依赖的 Artifact ID |
---|---|
AMQP | spring-integration-amqp |
Spring application events | spring-integration-event |
RSS and Atom | spring-integration-feed |
Filesystem | spring-integration-file |
FTP/FTPS | spring-integration-ftp |
GemFire | spring-integration-gemfire |
HTTP | spring-integration-http |
JDBC | spring-integration-jdbc |
JPA | spring-integration-jpa |
JMS | spring-integration-jms |
spring-integration-mail | |
MongoDB | spring-integration-mongodb |
MQTT | spring-integration-mqtt |
Redis | spring-integration-redis |
RMI | spring-integration-rmi |
SFTP | spring-integration-sftp |
STOMP | spring-integration-stomp |
Stream | spring-integration-stream |
Syslog | spring-integration-syslog |
TCP/UDP | spring-integration-ip |
spring-integration-twitter | |
Web | Services spring-integration-ws |
WebFlux | spring-integration-webflux |
WebSocket | spring-integration-websocket |
XMPP | spring-integration-xmpp |
ZooKeeper | spring-integration-zookeeper |
从表 9.1 可以清楚的看出 Spring Integration 提供了一套广泛的组件,以满足众多集成的需求。大多数应用程序一点都不会用到 Spring Integration 提供的功能。但是,如果你需要它们,很好,Spring Integration 几乎都能覆盖到。
更重要的是,本章在表 9.1 中列出模块,不可能涵盖提供的所有通道适配器。你已经看到,使用文件系统模块写入到文件系统的例子。而你很快就要使用电子邮件模块读取电子邮件。
每个端点模块提供通道适配器,当使用 Java 配置时,可以被声明为 bean,当时应 Java DSL 配置时,可以通过静态方法进行引用。鼓励你去探索你最感兴趣的任何端点模块。你会发现它们的使用方法相当一致。但现在,让我们把关注点转向电子邮件端点模块,看看在 Taco Cloud 应用程序中如何使用它。