Spring框架为与消息传递系统集成提供了广泛的支持,从JMS API的简化使用JmsTemplate到完整的异步接收消息的基础架构。Spring AMQP为高级消息队列协议提供了类似的功能集。Spring Boot还为RabbitTemplate和RabbitMQ提供了自动配置选项。Spring WebSocket本身包括对STOMP消息的支持,而Spring Boot通过启动程序和少量的自动配置对此提供了支持。Spring Boot还支持Apache Kafka。

14.1 JMS

javax.jms.ConnectionFactory界面提供了一种javax.jms.Connection用于创建与JMS代理进行交互的的标准方法。尽管Spring需要ConnectionFactory使用JMS来工作,但是您通常不需要自己直接使用它,而可以依赖于更高级别的消息抽象。(有关详细信息,请参见Spring Framework参考文档的相关部分。)Spring Boot还会自动配置必要的基础结构来发送和接收消息。

14.1.1 ActiveMQ支持

ActiveMQ在类路径中可用时,Spring Boot也可以配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(前提是未通过配置指定代理URL)。

如果使用spring-boot-starter-activemq,则提供了连接或嵌入ActiveMQ实例的必要依赖关系,以及与JMS集成的Spring基础结构。

ActiveMQ配置由中的外部配置属性控制spring.activemq.*。例如,您可以在中声明以下部分application.properties
物产
Yaml

  1. spring.activemq.broker-url=tcp://192.168.1.210:9876
  2. spring.activemq.user=admin
  3. spring.activemq.password=secret

默认情况下,a用合理的设置CachingConnectionFactory包装本机ConnectionFactory,您可以通过以下方式中的外部配置属性来控制这些设置spring.jms.*
物产
Yaml

  1. spring.jms.cache.session-cache-size=5

如果您希望使用本机池,则可以通过向其添加依赖项org.messaginghub:pooled-jms并进行相应的配置来实现JmsPoolConnectionFactory,如以下示例所示:
物产
Yaml

  1. spring.activemq.pool.enabled=true
  2. spring.activemq.pool.max-connections=50
请参阅ActiveMQProperties以获取更多受支持的选项。您还可以注册任意数量的Bean,以实现ActiveMQConnectionFactoryCustomizer更高级的自定义。

默认情况下,ActiveMQ将创建一个目的地(如果目的地尚不存在),以便根据其提供的名称来解析目的地。

14.1.2 Artemis支持

当Spring BootConnectionFactory检测到Artemis在类路径中可用时,可以自动配置a 。如果存在代理,则将自动启动和配置嵌入式代理(除非已明确设置mode属性)。支持的模式是embedded(明确表示需要嵌入式代理,并且如果代理在类路径上不可用,则会发生错误)和native(使用netty传输协议连接到代理)。配置后者后,Spring BootConnectionFactory将使用默认设置配置一个连接到在本地计算机上运行的代理的代理。

如果使用spring-boot-starter-artemis,则提供了连接到现有Artemis实例所需的依赖关系,以及与JMS集成的Spring基础结构。添加org.apache.activemq:artemis-jms-server到您的应用程序可以让您使用嵌入式模式。

Artemis配置由中的外部配置属性控制spring.artemis.*。例如,您可以在中声明以下部分application.properties
物产
Yaml

  1. spring.artemis.mode=native
  2. spring.artemis.host=192.168.1.210
  3. spring.artemis.port=9876
  4. spring.artemis.user=admin
  5. spring.artemis.password=secret

嵌入代理时,可以选择是否要启用持久性并列出应使其可用的目的地。可以将它们指定为以逗号分隔的列表,以使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration或的bean org.apache.activemq.artemis.jms.server.config.TopicConfiguration
默认情况下,a用合理的设置CachingConnectionFactory包装本机ConnectionFactory,您可以通过以下方式中的外部配置属性来控制这些设置spring.jms.*
物产
Yaml

  1. spring.jms.cache.session-cache-size=5

如果您希望使用本机池,则可以通过向其添加依赖项org.messaginghub:pooled-jms并进行相应的配置来实现JmsPoolConnectionFactory,如以下示例所示:
物产
Yaml

  1. spring.artemis.pool.enabled=true
  2. spring.artemis.pool.max-connections=50

请参阅ArtemisProperties以获取更多受支持的选项。
不涉及JNDI查找,并且使用nameArtemis配置中的属性或通过配置提供的名称来根据目的地名称解析目的地。

14.1.3 使用JNDI ConnectionFactory

如果您正在应用程序服务器中运行应用程序,Spring Boot会尝试ConnectionFactory使用JNDI来查找JMS 。默认情况下,java:/JmsXAjava:/XAConnectionFactory处于选中状态。spring.jms.jndi-name如果需要指定替代位置,则可以使用该属性,如以下示例所示:
物产
Yaml

  1. spring.jms.jndi-name=java:/MyConnectionFactory

14.1.4 发送信息

SpringJmsTemplate是自动配置的,您可以将其直接自动连接到自己的bean中,如以下示例所示:

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.jms.core.JmsTemplate;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class MyBean {
  6. private final JmsTemplate jmsTemplate;
  7. @Autowired
  8. public MyBean(JmsTemplate jmsTemplate) {
  9. this.jmsTemplate = jmsTemplate;
  10. }
  11. // ...
  12. }
JmsMessagingTemplate可以以类似的方式注入。如果定义了aDestinationResolverMessageConverterbean,则将其自动关联到auto-configured JmsTemplate

14.1.5 接收讯息

存在JMS基础结构时,可以对任何bean进行注释@JmsListener以创建侦听器端点。如果JmsListenerContainerFactory未定义,则会自动配置一个默认值。如果定义了aDestinationResolverMessageConverterbean,它将自动关联到默认工厂。
默认情况下,默认工厂是事务性的。如果您在JtaTransactionManager存在a的基础结构中运行,则默认情况下它将与侦听器容器关联。如果不是,sessionTransacted则启用该标志。在后一种情况下,可以通过添加@Transactional侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。这样可以确保本地事务完成后,传入消息得到确认。这还包括发送已在同一JMS会话上执行的响应消息。
以下组件在someQueue目标上创建侦听器端点:

  1. @Component
  2. public class MyBean {
  3. @JmsListener(destination = "someQueue")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }
有关更多详细信息,请参见的Javadoc@EnableJms

如果您需要创建更多JmsListenerContainerFactory实例,或者想要覆盖默认实例,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer可用于初始化的实例DefaultJmsListenerContainerFactory,其设置与自动配置的实例相同。
例如,以下示例公开了另一个使用特定工厂的工厂MessageConverter

  1. @Configuration(proxyBeanMethods = false)
  2. static class JmsConfiguration {
  3. @Bean
  4. public DefaultJmsListenerContainerFactory myFactory(
  5. DefaultJmsListenerContainerFactoryConfigurer configurer) {
  6. DefaultJmsListenerContainerFactory factory =
  7. new DefaultJmsListenerContainerFactory();
  8. configurer.configure(factory, connectionFactory());
  9. factory.setMessageConverter(myMessageConverter());
  10. return factory;
  11. }
  12. }

然后,您可以@JmsListener按以下任何带注释的方法使用工厂:

  1. @Component
  2. public class MyBean {
  3. @JmsListener(destination = "someQueue", containerFactory="myFactory")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

14.2 AMQP

高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了许多便利,包括spring-boot-starter-amqp“启动器”。

14.2.1 RabbitMQ支持

RabbitMQ是基于AMQP协议的轻型,可靠,可伸缩和便携式消息代理。Spring用于RabbitMQ通过AMQP协议进行通信。
RabbitMQ配置由中的外部配置属性控制spring.rabbitmq.*。例如,您可以在中声明以下部分application.properties
物产
Yaml

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=admin
  4. spring.rabbitmq.password=secret

另外,您可以使用addresses属性配置相同的连接:
物产
Yaml

  1. spring.rabbitmq.addresses=amqp://admin:secret@localhost
以这种方式指定地址时,hostport属性将被忽略。如果地址使用amqps协议,则会自动启用SSL支持。

如果ConnectionNameStrategy上下文中存在bean,它将自动用于命名由auto-configured创建的连接ConnectionFactory。请参阅RabbitProperties以获取更多受支持的选项。

有关更多详细信息请参阅了解RabbitMQ使用的协议AMQP

14.2.2 发送信息

Spring的AmqpTemplateAmqpAdmin是自动配置的,您可以将它们直接自动连接到自己的bean中,如以下示例所示:

  1. import org.springframework.amqp.core.AmqpAdmin;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MyBean {
  7. private final AmqpAdmin amqpAdmin;
  8. private final AmqpTemplate amqpTemplate;
  9. @Autowired
  10. public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
  11. this.amqpAdmin = amqpAdmin;
  12. this.amqpTemplate = amqpTemplate;
  13. }
  14. // ...
  15. }
RabbitMessagingTemplate可以以类似的方式注入。如果MessageConverter定义了bean,它将自动关联到auto-configured AmqpTemplate

如有必要,任何org.springframework.amqp.core.Queue定义为bean的对象都会自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,您可以启用重试AmqpTemplate(例如,在代理连接丢失的情况下):
物产
Yaml

  1. spring.rabbitmq.template.retry.enabled=true
  2. spring.rabbitmq.template.retry.initial-interval=2s

默认情况下,重试是禁用的。您也可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean来以编程方式自定义。
如果您需要创建更多RabbitTemplate实例,或者想要覆盖默认实例,Spring Boot提供了一个RabbitTemplateConfigurerbean,您可以使用它初始化RabbitTemplate与自动配置所使用的工厂相同的设置。

14.2.3 接收讯息

存在Rabbit基础结构时,可以对任何bean进行注释@RabbitListener以创建侦听器端点。如果RabbitListenerContainerFactory未定义,SimpleRabbitListenerContainerFactory则会自动配置一个默认值,您可以使用该spring.rabbitmq.listener.type属性切换到直接容器。如果定义了aMessageConverterMessageRecovererbean,它将自动与默认工厂关联。
以下示例组件在someQueue队列上创建一个侦听器端点:

  1. @Component
  2. public class MyBean {
  3. @RabbitListener(queues = "someQueue")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }
有关更多详细信息,请参见的Javadoc@EnableRabbit

如果您需要创建更多RabbitListenerContainerFactory实例,或者想覆盖默认实例,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,您可以使用与自动配置所使用的工厂相同的设置来初始化一个SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory

选择哪种容器都没有关系。这两个bean通过自动配置公开。

例如,以下配置类公开了另一个使用特定工厂的工厂MessageConverter

  1. @Configuration(proxyBeanMethods = false)
  2. static class RabbitConfiguration {
  3. @Bean
  4. public SimpleRabbitListenerContainerFactory myFactory(
  5. SimpleRabbitListenerContainerFactoryConfigurer configurer) {
  6. SimpleRabbitListenerContainerFactory factory =
  7. new SimpleRabbitListenerContainerFactory();
  8. configurer.configure(factory, connectionFactory);
  9. factory.setMessageConverter(myMessageConverter());
  10. return factory;
  11. }
  12. }

然后,您可以按任何带@RabbitListener注释的方法使用工厂,如下所示:

  1. @Component
  2. public class MyBean {
  3. @RabbitListener(queues = "someQueue", containerFactory="myFactory")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

您可以启用重试来处理侦听器引发异常的情况。默认情况下RejectAndDontRequeueRecoverer使用,但是您可以定义MessageRecoverer自己的a。重试用尽后,如果代理配置为这样做,则消息将被拒绝并被丢弃或路由到死信交换。默认情况下,重试是禁用的。您也可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean来以编程方式自定义。

默认情况下,如果禁用了重试,并且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为,false以便尝试进行零次重新传递,或者抛出AmqpRejectAndDontRequeueException来指示应该拒绝该消息。后者是启用重试并达到最大传递尝试次数时使用的机制。

14.3 Apache Kafka支持

通过提供spring-kafka项目的自动配置来支持Apache Kafka
Kafka配置由中的外部配置属性控制spring.kafka.*。例如,您可以在中声明以下部分application.properties
物产
Yaml

  1. spring.kafka.bootstrap-servers=localhost:9092
  2. spring.kafka.consumer.group-id=myGroup
要在启动时创建主题,请添加类型为的Bean NewTopic。如果该主题已经存在,则将忽略Bean。

请参阅KafkaProperties以获取更多受支持的选项。

14.3.1 发送信息

SpringKafkaTemplate是自动配置的,您可以直接在自己的Bean中自动对其进行布线,如以下示例所示:

  1. @Component
  2. public class MyBean {
  3. private final KafkaTemplate kafkaTemplate;
  4. @Autowired
  5. public MyBean(KafkaTemplate kafkaTemplate) {
  6. this.kafkaTemplate = kafkaTemplate;
  7. }
  8. // ...
  9. }
如果spring.kafka.producer.transaction-id-prefix定义了属性,KafkaTransactionManager则会自动配置a。另外,如果RecordMessageConverter定义了bean,它将自动关联到auto-configured KafkaTemplate

14.3.2 接收讯息

存在Apache Kafka基础结构时,可以对任何bean进行注释@KafkaListener以创建侦听器端点。如果KafkaListenerContainerFactory尚未定义,则会使用中定义的键自动配置默认值spring.kafka.listener.*
以下组件在该someTopic主题上创建一个侦听器端点:

  1. @Component
  2. public class MyBean {
  3. @KafkaListener(topics = "someTopic")
  4. public void processMessage(String content) {
  5. // ...
  6. }
  7. }

如果KafkaTransactionManager定义了bean,它将自动关联到容器工厂。类似地,如果RecordFilterStrategyErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListener豆被定义,它被自动关联为出厂默认。
根据侦听器的类型,aRecordMessageConverterBatchMessageConverterbean与默认工厂关联。如果RecordMessageConverter对于批处理侦听器仅存在一个bean,则将其包装在中BatchMessageConverter

ChainedKafkaTransactionManager必须标记 一个自定义,@Primary因为它通常引用自动配置的KafkaTransactionManagerbean。

14.3.3 卡夫卡流

用于Apache Kafka的Spring提供了一个工厂bean来创建StreamsBuilder对象并管理其流的生命周期。KafkaStreamsConfiguration只要kafka-streams在类路径上存在,Spring Boot就会自动配置所需的bean,并通过@EnableKafkaStreams注释启用Kafka Streams 。
启用Kafka Streams意味着必须设置应用程序ID和引导服务器。可以使用来配置前者spring.kafka.streams.application-idspring.application.name如果未设置,则默认为。后者可以全局设置,也可以仅针对流覆盖。
使用专用属性可以使用几个其他属性。可以使用spring.kafka.streams.properties名称空间设置其他任意Kafka属性。另请参见其他Kafka属性
要使用工厂bean,StreamsBuilder@Bean按照以下示例所示将其连接到您的计算机中:

  1. @Configuration(proxyBeanMethods = false)
  2. @EnableKafkaStreams
  3. public static class KafkaStreamsExampleConfiguration {
  4. @Bean
  5. public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
  6. KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
  7. stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
  8. Produced.with(Serdes.Integer(), new JsonSerde<>()));
  9. return stream;
  10. }
  11. }

默认情况下,由StreamBuilder它创建的对象管理的流将自动启动。您可以使用spring.kafka.streams.auto-startup属性来自定义此行为。

14.3.4 卡夫卡的其他属性

自动配置支持的属性显示在appendix-application-properties.html中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache Kafka点缀属性。有关详细信息,请参阅Apache Kafka文档。
这些属性的前几个属性适用于所有组件(生产者,使用者,管理员和流),但如果您希望使用不同的值,则可以在组件级别上指定。Apache Kafka会指定重要性为HIGH,MEDIUM或LOW的属性。Spring Boot自动配置支持所有HIGH重要性属性,一些选定的MEDIUM和LOW属性以及任何没有默认值的属性。
KafkaProperties该类直接提供了Kafka支持的属性的子集。如果希望使用不直接支持的其他属性来配置生产者或使用者,请使用以下属性:
物产
Yaml

  1. spring.kafka.properties[prop.one]=first
  2. spring.kafka.admin.properties[prop.two]=second
  3. spring.kafka.consumer.properties[prop.three]=third
  4. spring.kafka.producer.properties[prop.four]=fourth
  5. spring.kafka.streams.properties[prop.five]=fifth

这将公共prop.oneKafka属性设置为first(适用于生产者,消费者和管理员),将prop.twoadmin属性设置为second,将prop.three消费者属性设置为third,将prop.four生产者属性设置为fourth并将prop.fivestream属性设置为fifth
您还可以JsonDeserializer如下配置Spring Kafka :
物产
Yaml

  1. spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  2. spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
  3. spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example,org.acme

同样,您可以禁用JsonSerializer在标头中发送类型信息的默认行为:
物产
Yaml

  1. spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
  2. spring.kafka.producer.properties[spring.json.add.type.headers]=false
以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

14.3.5 使用嵌入式Kafka进行测试

Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请@EmbeddedKafkaspring-kafka-test模块中用注释测试类。有关更多信息,请参阅Spring for Apache Kafka参考手册
要使Spring Boot自动配置与上述嵌入式Apache Kafka代理一起使用,您需要将嵌入式代理地址(由填充EmbeddedKafkaBroker)的系统属性重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:

  • 提供系统属性以将嵌入式代理地址映射到spring.kafka.bootstrap-servers测试类中:

    1. static {
    2. System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    3. }
  • @EmbeddedKafka注释上配置属性名称:

    1. @EmbeddedKafka(topics = "someTopic",
    2. bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置属性中使用占位符:

物产
Yaml

  1. spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}