Spring框架为与消息传递系统的集成提供了广泛的支持,从使用JmsTemplate
的JMS API的简化使用到用于异步接收消息的完整基础设施。Spring AMQP为高级消息队列协议提供了类似的功能集。Spring Boot还提供了RabbitTemplate
和RabbitMQ的自动配置选项。Spring WebSocket本身就包括对STOMP消息传递的支持,而Spring Boot则通过启动器和少量的自动配置来支持这一点。Spring Boot也支持阿帕奇·卡夫卡。
1. JMS
javax.jms.ConnectionFactory
接口提供了创建用于与JMS代理交互的javax.jms.Connection
的标准方法。尽管Spring需要ConnectionFactory
来使用JMS,但您通常不需要直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅Spring框架参考文档的相关部分。)Spring Boot还自动配置发送和接收消息所需的基础设施。
1.1. ActiveMQ Artemis Support
当Spring Boot检测到类路径上有ActiveMQ Artemis时,它可以自动配置ConnectionFactory
。如果存在代理,则会自动启动和配置嵌入的代理(除非显式设置了模式属性)。支持的模式是Embedded
(明确表示需要嵌入式代理,如果类路径上没有可用的代理,则会出现错误)和原生
(使用netty
传输协议连接到代理)。当配置后者时,Spring Boot使用默认设置配置一个ConnectionFactory
,它连接到在本地机器上运行的代理。
If you use spring-boot-starter-artemis , the necessary dependencies to connect to an existing ActiveMQ Artemis instance are provided, as well as the Spring infrastructure to integrate with JMS. Adding org.apache.activemq:artemis-jakarta-server to your application lets you use embedded mode. |
ActiveMQ Artemis配置由spring.artemis.*
中的外部配置属性控制。例如,您可以在Applation.Properties
中声明以下部分:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
在嵌入代理时,您可以选择是否要启用持久性,并列出应该可用的目的地。可以将它们指定为逗号分隔的列表,以使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
,类型的Bean。
默认情况下,CachingConnectionFactory
使用您可以通过spring.jms.*
中的外部配置属性控制的合理设置包装本机ConnectionFactory
:
spring.jms.cache.session-cache-size=5
如果您更愿意使用原生池,可以通过添加对org.MessagingHub:pooled-jms
的依赖并相应地配置JmsPoolConnectionFactory
来实现,如下例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
有关更多支持的选项,请参阅ArtemisProperties
。
不涉及JNDI查找,使用Artemis配置中的name
属性或通过配置提供的名称解析目的地。
1.2. Using a JNDI ConnectionFactory
如果您在应用程序服务器上运行应用程序,Spring Boot会尝试使用JNDI定位JMSConnectionFactory
。默认情况下,选中Java:/JmsXA
和Java:/XAConnectionFactory
位置。如果需要指定备用位置,可以使用spring.jms.jndi-name
属性,如下例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
1.3. Sending a Message
Spring的JmsTemplate
是自动配置的,您可以直接将其自动绑定到您自己的Bean中,如下例所示:
@Component public class MyBean { private final JmsTemplate jmsTemplate; public MyBean(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
JmsMessagingTemplate can be injected in a similar manner. If a DestinationResolver or a MessageConverter bean is defined, it is associated automatically to the auto-configured JmsTemplate . |
1.4. Receiving a Message
当存在JMS基础设施时,任何Bean都可以用@JmsListener
注释来创建侦听器端点。如果没有定义JmsListenerContainerFactory
,则会自动配置一个默认的。如果定义了DestinationResolver
、MessageConverter
或javax.jms.ExceptionListener
Bean,它们将自动与默认工厂关联。
默认情况下,默认工厂是事务性的。如果您在存在JtaTransactionManager
的基础设施中运行,则默认情况下,它与侦听器容器相关联。如果不是,则启用sessionTransated
标志。在后一种情况下,通过在侦听器方法(或其委托)上添加@Transaction,可以将本地数据存储事务与传入消息处理相关联。这确保了在本地事务完成后确认传入的消息。这还包括发送已在同一JMS会话上执行的响应消息。
以下组件在omeQueue
目标上创建侦听器终结点:
@Component public class MyBean { @JmsListener(destination = "someQueue") public void processMessage(String content) { // ... } }
See the Javadoc of @EnableJms for more details. |
如果您需要创建更多JmsListenerContainerFactory
实例,或者如果您想要覆盖缺省值,Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer
,您可以使用它来使用与自动配置相同的设置来初始化DefaultJmsListenerContainerFactory
。
例如,下面的示例公开了另一个使用特定MessageConverter
的工厂:
@Configuration(proxyBeanMethods = false) public class MyJmsConfiguration { @Bean public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); ConnectionFactory connectionFactory = getCustomConnectionFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(new MyMessageConverter()); return factory; } private ConnectionFactory getCustomConnectionFactory() { return ... } }
然后,您可以在任何@JmsListener
注释的方法中使用工厂,如下所示:
@Component public class MyBean { @JmsListener(destination = "someQueue", containerFactory = "myFactory") public void processMessage(String content) { // ... } }
2. AMQP
高级消息队列协议(AMQP)是面向消息中间件的平台中立、线路级协议。Spring AMQP项目将核心的Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了几个便利,包括Spring-ot-starter-amqp
“starter”。
2.1. RabbitMQ Support
RabbitMQ是一个基于AMQP协议的轻量级、可靠、可扩展和可移植的消息代理。Spring使用RabbitMQ
通过AMQP协议进行通信。
RabbitMQ配置由spring.rabbitmq.*
中的外部配置属性控制。例如,您可以在Applation.Properties
中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
或者,您可以使用Addresses
属性配置相同的连接:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
When specifying addresses that way, the host and port properties are ignored. If the address uses the amqps protocol, SSL support is enabled automatically. |
有关支持的更多属性配置选项,请参阅RabbitProperties
。要配置Spring AMQP使用的RabbitMQConnectionFactory
的低层详细信息,请定义一个ConnectionFactoryCustomizer
Bean。
如果上下文中存在ConnectionNameStrategy
Bean,它将自动用于命名由自动配置的CachingConnectionFactory
创建的连接。
See Understanding AMQP, the protocol used by RabbitMQ for more details. |
2.2. Sending a Message
Spring的AmqpTemplate
和AmqpAdmin
是自动配置的,您可以直接将它们自动绑定到您自己的Bean中,如下例所示:
@Component public class MyBean { private final AmqpAdmin amqpAdmin; private final AmqpTemplate amqpTemplate; public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) { this.amqpAdmin = amqpAdmin; this.amqpTemplate = amqpTemplate; } }
RabbitMessagingTemplate can be injected in a similar manner. If a MessageConverter bean is defined, it is associated automatically to the auto-configured AmqpTemplate . |
如有必要,定义为org.springframework.amqp.core.Queue
的任何Bean将自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,您可以在AmqpTemplate
上启用重试(例如,在Broker连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
默认情况下禁用重试。您还可以通过声明RabbitRetryTemplateCustomizer
Bean,以编程方式定制RetryTemplate
。
如果您需要创建更多RabbitTemplate
实例,或者如果您想覆盖默认设置,Spring Boot提供了一个RabbitTemplateConfigurer
Bean,您可以使用该Bean来初始化RabbitTemplate
,其设置与自动配置使用的工厂相同。
2.3. Sending a Message To A Stream
要将消息发送到特定流,请指定流的名称,如下例所示:
spring.rabbitmq.stream.name=my-stream
如果定义了MessageConverter
、StreamMessageConverter
或ProducerCustomizer
Bean,它会自动关联到自动配置的RabbitStreamTemplate
。
如果您需要创建更多的RabbitStreamTemplate
实例,或者如果您想覆盖默认设置,Spring Boot提供了一个RabbitStreamTemplateConfigurer
Bean,您可以使用它来初始化RabbitStreamTemplate
,其设置与自动配置使用的工厂相同。
2.4. Receiving a Message
当存在Rabbit基础设施时,任何Bean都可以用@RabbitListener
注释来创建侦听器端点。如果没有定义<SimpleRabbitListenerContainerFactory
>RabbitListenerContainerFactory,则会自动配置一个默认类型,您可以使用spring.rabbitmq.listener.type
属性切换到直接容器。如果定义了MessageConverter
或MessageRecoverer
Bean,它将自动与默认工厂关联。
以下示例组件在omeQueue
队列上创建侦听器终结点:
@Component public class MyBean { @RabbitListener(queues = "someQueue") public void processMessage(String content) { // ... } }
See the Javadoc of @EnableRabbit for more details. |
如果您需要创建更多RabbitListenerContainerFactory
实例,或者如果您想覆盖默认设置,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurer
和一个DirectRabbitListenerContainerFactoryConfigurer
,您可以使用它们来初始化SimpleRabbitListenerContainerFactory
和DirectRabbitListenerContainerFactory
,它们具有与自动配置使用的工厂相同的设置。
It does not matter which container type you chose. Those two beans are exposed by the auto-configuration. |
例如,下面的配置类公开使用特定MessageConverter
的另一个工厂:
@Configuration(proxyBeanMethods = false) public class MyRabbitConfiguration { @Bean public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ConnectionFactory connectionFactory = getCustomConnectionFactory(); configurer.configure(factory, connectionFactory); factory.setMessageConverter(new MyMessageConverter()); return factory; } private ConnectionFactory getCustomConnectionFactory() { return ... } }
然后,您可以在任何@RabbitListener
注释的方法中使用工厂,如下所示:
@Component public class MyBean { @RabbitListener(queues = "someQueue", containerFactory = "myFactory") public void processMessage(String content) { // ... } }
您可以启用重试来处理侦听器引发异常的情况。默认情况下,使用RejectAndDontRequeeRecoverer
,但您可以定义自己的MessageRecoverer
。当重试次数耗尽时,消息将被拒绝,并被丢弃或路由到死信交换(如果代理被配置为这样做)。默认情况下,重试被禁用。您还可以通过声明RabbitRetryTemplateCustomizer
Bean,以编程方式定制RetryTemplate
。
By default, if retries are disabled and the listener throws an exception, the delivery is retried indefinitely. You can modify this behavior in two ways: Set the defaultRequeueRejected property to false so that zero re-deliveries are attempted or throw an AmqpRejectAndDontRequeueException to signal the message should be rejected. The latter is the mechanism used when retries are enabled and the maximum number of delivery attempts is reached. |
3. Apache Kafka Support
通过提供Spring-kafka
工程的自动配置,支持apache kafka。
Kafka配置由spring.kafka.*
中的外部配置属性控制。例如,您可以在Applation.Properties
中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
To create a topic on startup, add a bean of type NewTopic . If the topic already exists, the bean is ignored. |
有关更多支持的选项,请参阅KafkaProperties
。
3.1. Sending a Message
Spring的KafkaTemplate
是自动配置的,您可以直接在自己的Bean中自动绑定,如下例所示:
@Component public class MyBean { private final KafkaTemplate<String, String> kafkaTemplate; public MyBean(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } }
If the property spring.kafka.producer.transaction-id-prefix is defined, a KafkaTransactionManager is automatically configured. Also, if a RecordMessageConverter bean is defined, it is automatically associated to the auto-configured KafkaTemplate . |
3.2. Receiving a Message
当存在ApacheKafka基础设施时,任何Bean都可以用@KafkaListener
注释来创建侦听器端点。如果没有定义KafkaListenerContainerFactory
,则会使用spring.kafka.listener.*
中定义的key自动配置一个默认密钥。
以下组件在某些主题
主题上创建侦听器终结点:
@Component public class MyBean { @KafkaListener(topics = "someTopic") public void processMessage(String content) { // ... } }
如果定义了KafkaTransactionManager
Bean,它将自动关联到容器工厂。同样,如果定义了RecordFilterStrategy
、CommonErrorHandler
、AfterRollback Processor
或Consumer AwareRebalanceListener
Bean,它将自动关联到默认工厂。
根据侦听器类型,RecordMessageConverter
或BatchMessageConverter
Bean与默认工厂相关联。如果批处理侦听器只存在RecordMessageConverter
Bean,则它包装在BatchMessageConverter
中。
A custom ChainedKafkaTransactionManager must be marked @Primary as it usually references the auto-configured KafkaTransactionManager bean. |
3.3. Kafka Streams
Spring for Apache Kafka提供了一个工厂Bean来创建StreamsBuilder
对象并管理其流的生命周期。只要Kafka-Streams
在类路径上,并且@EnableKafkaStreams
注释启用了Kafka Streams,Spring Boot就会自动配置所需的KafkaStreamsConfiguration
Bean。
启用Kafka Streams意味着必须设置应用程序ID和引导服务器。前者可以使用spring.kafka.streams.application-id
,进行配置,如果未设置,则默认为spring.applation.name
。后者可以全局设置,也可以仅为流专门覆盖。
使用专用属性可以使用几个额外的属性;其他任意的Kafka属性可以使用spring.kafka.stres.properties
名称空间进行设置。有关详细信息,另请参阅其他卡夫卡属性。
要使用工厂Bean,请将StreamsBuilder
连接到您的@Bean
,如下例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public class MyKafkaStreamsConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } private KeyValue<Integer, String> uppercaseValue(Integer key, String value) { return new KeyValue<>(key, value.toUpperCase()); } }
默认情况下,由StreamBuilder
对象管理的流自动启动。您可以使用spring.kafka.streams.auto-startup
属性来自定义此行为。
3.4. Additional Kafka Properties
自动配置支持的属性显示在附录的“集成属性”部分。请注意,在大多数情况下,这些属性(带连字符或CamelCase)直接映射到以点分隔的ApacheKafka属性。有关详细信息,请参阅阿帕奇·卡夫卡文档。
这些属性中的前几个适用于所有组件(生产者、消费者、管理员和流),但如果您希望使用不同的值,则可以在组件级别指定这些属性。阿帕奇·卡夫卡将属性的重要性指定为高、中或低。Spring Boot自动配置支持所有高重要性属性、一些选定的中低属性以及没有缺省值的任何属性。
只有Kafka支持的属性子集可以通过KafkaProperties
类直接使用。如果要使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
这会将常见的prop.one
kafka属性设置为First
(适用于生产者、消费者和管理员),将proport.Two
admin属性设置为Second
,将prop.Three
消费者属性设置为Third
,将pro.four
Producer属性设置为第四
,并将pro.Five
Streams属性设置为第五
。
您也可以通过如下方式配置Spring KafkaJsonOverializer
:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
同样,您可以禁用在Header中发送类型信息的JsonSerializer
默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Properties set in this way override any configuration item that Spring Boot explicitly supports. |
3.5. Testing with Embedded Kafka
Spring for ApacheKafka提供了一种使用嵌入的ApacheKafka代理测试项目的便捷方法。要使用此功能,请使用Spring-kafka-test
模块中的@EmbeddedKafka
注释测试类。有关更多信息,请参阅参考手册。
要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一起工作,您需要将嵌入式代理地址的系统属性(由EmbeddedKafkaBroker
填充)重新映射到Apache Kafka的Spring Boot配置属性中。有几种方法可以做到这一点:
-
提供一个系统属性,将嵌入的代理地址映射到测试类中的
spring.kafka.bootstrap-Servers
:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
-
在
@EmbeddedKafka
批注上配置属性名称:
@SpringBootTest @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") class MyTest { // ... }
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
4. RSocket
RSocket是用于字节流传输的二进制协议。它通过在单个连接上传递异步消息来启用对称交互模型。
Spring框架的Spring-Messaging
模块为客户端和服务器端的RSocket请求器和响应器提供支持。有关更多详细信息,包括RSocket协议的概述,请参阅Spring框架参考的RSocket部分。
4.1. RSocket Strategies Auto-configuration
Spring Boot自动配置RSocketStrategy
Bean,该Bean提供编码和解码RSocket有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):
-
CBOR与Jackson的编解码器
-
JSON与杰克逊的编解码器
SpringBoot-starter-r套接字
starter提供了这两个依赖项。有关定制可能性的更多信息,请参阅Jackson支持部分。
开发人员可以通过创建实现RSocketStrategiesCustomizer
接口的Bean来自定义RSocketStrategy
组件。请注意,它们的@order
很重要,因为它决定了编解码器的顺序。
4.2. RSocket server Auto-configuration
Spring Boot提供RSocket服务器自动配置。所需的依赖项由SpringBoot-starter-r套接字
提供。
Spring Boot允许通过WebSocket从WebFlux服务器公开RSocket,或建立一个独立的RSocket服务器。这取决于应用程序的类型及其配置。
对于WebFlux应用程序(即类型WebApplicationType.REACTIVE
),仅当以下属性匹配时,才会将RSocket服务器插入Web服务器:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Plugging RSocket into a web server is only supported with Reactor Netty, as RSocket itself is built with that library. |
或者,RSocket TCP或WebSocket服务器作为独立的嵌入式服务器启动。除了相关性要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
4.3. Spring Messaging RSocket support
Spring Boot将为RSocket自动配置Spring消息传递基础设施。
这意味着Spring Boot将创建一个RSocketMessageHandler
Bean来处理对应用程序的RSocket请求。
4.4. Calling RSocket Services with RSocketRequester
在服务器和客户端之间建立RSocket
通道后,任何一方都可以向另一方发送或接收请求。
作为服务器,您可以在RSocket@Controller
的任何处理程序方法上注入RSocketRequester
实例。作为客户端,您需要首先配置和建立RSocket连接。Spring Boot使用预期的编解码器为此类情况自动配置RSocketRequester.Builder
,并应用任何RSocketConnectorConfigurer
Bean。
RSocketRequester.Builder
实例是一个原型Bean,这意味着每个注入点都将为您提供一个新实例。这是故意这样做的,因为此构建器是有状态的,您不应该使用同一实例创建具有不同设置的请求器。
下面的代码显示了一个典型的示例:
@Service public class MyService { private final RSocketRequester rsocketRequester; public MyService(RSocketRequester.Builder rsocketRequesterBuilder) { this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898); } public Mono<User> someRSocketCall(String name) { return this.rsocketRequester.route("user").data(name).retrieveMono(User.class); } }
5. Spring Integration
Spring Boot为使用Spring Integration提供了几个便利,包括Spring-boot-starter-集成
“starter”。Spring集成提供了消息传递和其他传输(如HTTP、TCP和其他传输)的抽象。如果您的类路径上有Spring集成可用,那么它将通过@EnableIntegration
注释进行初始化。
Spring集成轮询逻辑依赖于自动配置的TaskScheduler
。默认的PollerMetadata
(每秒轮询无限数量的消息)可以使用spring.Integration.poller.*
配置属性进行自定义。
Spring Boot还配置了一些由附加的Spring集成模块触发的功能。如果类路径上也有Spring-Integration-jmx
,则消息处理统计信息将通过JMX发布。如果可以使用Spring-Integration-jdbc
,则可以在启动时创建默认的数据库模式,如下面一行所示:
spring.integration.jdbc.initialize-schema=always
如果有Spring-Integration-r套接字
可用,开发人员可以使用“spring.rsocket.server.*”
属性配置RSocket服务器,并让它使用IntegrationRSocketEndpoint
或RSocketOutound Gateway
组件处理传入的RSocket消息。该基础设施可以处理Spring Integration RSocket通道适配器和@Messagemap
处理程序(假设配置了“spring.integration.rsocket.server.message-mapping-enabled”
)。
Spring Boot还可以使用配置属性自动配置客户端RSocketConnector
:
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
有关更多详细信息,请参阅IntegrationAutoConfiguration
和IntegrationProperties
类。
6. WebSockets
Spring Boot为嵌入式Tomcat、Jetty和Undertow提供WebSockets自动配置。如果将WAR文件部署到独立容器,则Spring Boot假定该容器负责其WebSocket支持的配置。
Spring框架为MVC Web应用程序提供了丰富的WebSocket支持,这些应用程序可以通过Spring-ot-starter-web套接字
模块轻松访问。
WebSocket支持也可用于反应性Web应用程序,并且需要将WebSocket API与Spring-ot-starter-webflow
一起包括:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
</dependency>
7. What to Read Next
下一节介绍如何在应用程序中启用IO功能。您可以在本节中阅读有关缓存、邮件、验证、REST客户端等内容。