Skip to content

10. Messaging

Spring Framework 为消息传递系统集成提供了广泛的支持,从使用 JmsTemplate 简化 JMS API 的使用到异步接收消息的完整基础设施. Spring AMQP 为高级消息队列协议 (Advanced Message Queuing Protocol,AMQP) 提供了类似的功能集合. Spring Boot 还为 RabbitTemplate 和 RabbitMQ 提供自动配置选项. Spring WebSocket 本身包含了对 STOMP 消息传递的支持,Spring Boot 通过 starter 和少量自动配置即可支持它. Spring Boot 同样支持 Apache Kafka.

10.1. JMS

javax.jms.ConnectionFactory 接口提供了一种创建 javax.jms.Connection 的标准方法,可与 JMS broker (代理) 进行交互. 虽然 Spring 需要一个 ConnectionFactory 来与 JMS 一同工作,但是您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象. (有关详细信息,请参阅 Spring Framework 参考文档的相关部分.) Spring Boot 还会自动配置发送和接收消息所需的基础设施.

10.1.1. ActiveMQ Support

当 ActiveMQ 在 classpath 上可用时,Spring Boot 也可以配置一个 ConnectionFactory. 如果 broker 存在,则会自动启动并配置一个内嵌式 broker (前提是未通过配置指定 broder URL) .

TIP

如果使用 spring-boot-starter-activemq,则提供了连接到 ActiveMQ 实例必须依赖或内嵌一个 ActiveMQ 实例,以及与 JMS 集成的 Spring 基础设施.

ActiveMQ 配置由 spring.activemq.* 中的外部配置属性控制.

默认情况下,ActiveMQ 自动配置为使用 VM transport,它启动嵌入在同一 JVM 实例中的代理。

您可以通过配置 spring.activemq.in-memory 属性来禁用嵌入式代理,如下例所示:

yaml
spring:
  activemq:
    in-memory: false

如果您配置 broker URL,嵌入式 broker 也将被禁用,如以下示例所示:

yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

如果您想完全控制嵌入式代理,请参阅 ActiveMQ 文档 了解更多信息 .

默认情况下,CachingConnectionFactory 将原生的 ConnectionFactory 使用可由 spring.jms.* 中的外部配置属性控制的合理设置包装起来:

yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms 添加一个依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下所示:

yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50

TIP

有关更多支持的选项,请参阅 ActiveMQProperties. 您还可以注册多个实现了 ActiveMQConnectionFactoryCustomizer 的的 bean,以进行更高级的自定义.

默认情况下,ActiveMQ 会创建一个 destination (目标) (如果它尚不存在) ,以便根据提供的名称解析 destination.

10.1.2. ActiveMQ Artemis 支持

Spring Boot 可以在检测到 Artemis 在 classpath 上可用时自动配置一个 ConnectionFactory. 如果存在 broker,则会自动启动并配置一个内嵌 broker (除非已明确设置 mode 属性) . 支持的 mode 为 embedded (明确表示需要一个内嵌 broker,如果 broker 在 classpath 上不可用则发生错误) 和 native (使用 netty 传输协议连接到 broker) . 配置后者后,Spring Boot 会使用默认设置配置一个 ConnectionFactory,该 ConnectionFactory 连接到在本地计算机上运行的 broker.

TIP

如果使用了 spring-boot-starter-artemis,则会提供连接到现有的 Artemis 实例的必须依赖,以及与 JMS 集成的Spring 基础设施. 将 org.apache.activemq:artemis-jms-server 添加到您的应用程序可让您使用内嵌模式.

ActiveMQ Artemis 配置由 spring.artemis.* 中的外部配置属性控制. 例如,您可以在 application.properties 中声明以下部分:

yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

内嵌 broker 时,您可以选择是否要启用持久化并列出应该可用的 destination. 可以将这些指定为以逗号分隔的列表,以使用默认选项创建它们,也可以定义类型为 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration 或 org.apache.activemq.artemis.jms.server.config.TopicConfiguration 的 bean,分别用于高级队列和 topic (主题) 配置.

默认情况下,CachingConnectionFactory 将原生的 ConnectionFactory 使用可由 spring.jms.* 中的外部配置属性控制的合理设置包装起来:

yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用原生池,则可以通过向 org.messaginghub:pooled-jms 添加一个依赖并相应地配置 JmsPoolConnectionFactory 来实现,如下所示:

yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

有关更多支持的选项,请参阅 ArtemisProperties .

不涉及 JNDI 查找,使用 Artemis 配置中的 name 属性或通过配置提供的名称来解析目标 (destination) 名称.

10.1.3. 使用 JNDI ConnectionFactory

如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试使用 JNDI 找到 JMS ConnectionFactory. 默认情况下,将检查 java:/JmsXA 和 java:/XAConnectionFactory 这两个位置. 如果需要指定其他位置,可以使用 spring.jms.jndi-name 属性,如下所示:

yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

10.1.4. 发送消息

Spring 的 JmsTemplate 是自动配置的,你可以直接将它注入到你自己的 bean 中,如下所示:

java
@Component
public class MyBean {    
  private final JmsTemplate jmsTemplate;    
  public MyBean(JmsTemplate jmsTemplate) {        
    this.jmsTemplate = jmsTemplate;    
  }

  // ...
}

TIP

JmsMessagingTemplate 可以以类似的方式注入. 如果定义了 DestinationResolver 或 MessageConverter bean,它将自动关联到自动配置的 JmsTemplate.

10.1.5. 接收消息

当存在 JMS 基础设施时,可以使用 @JmsListener 对任何 bean 进行注解以创建监听器 (listener) 端点. 如果未定义 JmsListenerContainerFactory,则会自动配置一个默认的 (factory) . 如果定义了 DestinationResolver ,MessageConverter 或 javax.jms.ExceptionListener bean,它将自动关联到默认的 factory.

默认情况下,默认 factory 是具有事务特性的. 如果您在存在有 JtaTransactionManager 的基础设施中运行,则默认情况下它与监听器容器相关联. 如果不是,则 sessionTransacted flag 将为启用 (enabled) . 在后一种情况下,您可以通过在监听器方法 (或其委托) 上添加 @Transactional,将本地数据存储事务与传入消息的处理相关联. 这确保了在本地事务完成后传入消息能被告知. 这还包括了发送已在同一 JMS 会话上执行的响应消息.

以下组件在 someQueue destination 上创建一个监听器端点:

java
@Component
public class MyBean {    
  @JmsListener(destination = "someQueue")
  public void processMessage(String content) {        
    // ...
  }
}

TIP

有关更多详细信息,请参阅 @EnableJms 的 Javadoc.

如果需要创建更多 JmsListenerContainerFactory 实例或覆盖默认值,Spring Boot 会提供一个 DefaultJmsListenerContainerFactoryConfigurer,您可以使用它来初始化 DefaultJmsListenerContainerFactory,其设置与自动配置的 factory 设置相同.

例如,以下示例暴露了另一个使用特定 MessageConverter 的 factory:

java
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {    
  @Bean
  public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {        
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();        
    ConnectionFactory connectionFactory = getCustomConnectionFactory();        
    configurer.configure(factory, connectionFactory);        
    factory.setMessageConverter(new MyMessageConverter());        
    return factory;    
  }    
  private ConnectionFactory getCustomConnectionFactory() {        
    return ...    
  }
}

然后,您可以在任何 @JmsListener 注解的方法中使用该 factory,如下所示:

java
@Component
public class MyBean {    
  @JmsListener(destination = "someQueue", containerFactory = "myFactory")
  public void processMessage(String content) {        
    // ...
  }
}

10.2. AMQP

高级消息队列协议 (Advanced Message Queuing Protocol,AMQP) 是一个平台无关,面向消息中间件的连接级协议. Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发. Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了一些快捷方法,包括 spring-boot-starter-amqp starter.

10.2.1. RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理. Spring 使用 RabbitMQ 通过 AMQP 协议进行通信.

RabbitMQ 配置由 spring.rabbitmq.* 中的外部配置属性控制. 例如,您可以在 application.properties 中声明以下部分:

yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

另外,您可以配置相同 addresses 属性的连接:

yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"

TIP

当以这种方式指定 addresses 时,host 和 port 属性将被忽略. 如果地址使用 amqps 协议,则会自动启用 SSL 支持

有关更多支持的选项,请参阅 RabbitProperties . 要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的详细信息,请定义一个 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

TIP

有关详细信息,请参阅理解 AMQP、RabbitMQ 使用的协议.

10.2.2. 发送消息

Spring 的 AmqpTemplate 和 AmqpAdmin 是自动配置的,您可以将它们直接注入自己的 bean 中,如下所示:

java
@Component
public class MyBean {    
  private final AmqpAdmin amqpAdmin;    
  private final AmqpTemplate amqpTemplate;    
  public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {        
    this.amqpAdmin = amqpAdmin;        
    this.amqpTemplate = amqpTemplate;    
  }

  // ...
}

TIP

RabbitMessagingTemplate 可以以类似的方式注入. 如果定义了 MessageConverter bean,它将自动关联到自动配置的 AmqpTemplate.

如有必要,所有定义为 bean 的 org.springframework.amqp.core.Queue 都会自动在 RabbitMQ 实例上声明相应的队列.

要重试操作,可以在 AmqpTemplate 上启用重试 (例如,在 broker 连接丢失的情况下) :

yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下禁用重试. 您还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate.

如果您需要创建更多的 RabbitTemplate 实例,或者想覆盖默认实例,Spring Boot 提供了一个 RabbitTemplateConfigurer bean,您可以使用它来初始化一个 RabbitTemplate,其设置与自动配置所使用的工厂相同.

10.2.3. 接收消息

当 Rabbit 基础设施存在时,可以使用 @RabbitListener 注解任何 bean 以创建监听器端点. 如果未定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,您可以使用 spring.rabbitmq.listener.type 属性切换到一个直接容器. 如果定义了 MessageConverter 或 MessageRecoverer bean,它将自动与默认 factory 关联.

以下示例组件在 someQueue 队列上创建一个监听器端点:

java
@Component
public class MyBean {    
  @RabbitListener(queues = "someQueue")
  public void processMessage(String content) {        
    // ...
  }
}

TIP

有关更多详细信息,请参阅 the Javadoc of @EnableRabbit 的 Javadoc.

如果需要创建更多 RabbitListenerContainerFactory 实例或覆盖默认值,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它来初始化 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory,其设置与使用自动配置的 factory 相同.

TIP

这两个 bean 与您选择的容器类型没有关系,它们通过自动配置暴露.

例如,以下配置类暴露了另一个使用特定 MessageConverter 的 factory:

java
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {    
  @Bean
  public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {        
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        
    ConnectionFactory connectionFactory = getCustomConnectionFactory();        
    configurer.configure(factory, connectionFactory);        
    factory.setMessageConverter(new MyMessageConverter());        
    return factory;    
  }    
  private ConnectionFactory getCustomConnectionFactory() {        
    return ...    
  }
}

然后,您可以在任何 @RabbitListener 注解的方法中使用该 factory,如下所示:

java
@Component
public class MyBean {    
  @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
  public void processMessage(String content) {        
    // ...
  }
}

您可以启用重试机制来处理监听器的异常抛出情况. 默认情况下使用 RejectAndDontRequeueRecoverer,但您可以定义自己的 MessageRecoverer. 如果 broker 配置了重试机制,当重试次数耗尽时,则拒绝消息并将其丢弃或路由到死信 (dead-letter) exchange 中. 默认情况下重试机制为禁用. 您还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate.

TIP

默认情况下,如果禁用重试并且监听器异常抛出,则会无限期地重试传递. 您可以通过两种方式修改此行为: 将 defaultRequeueRejected 属性设置为 false,以便尝试零重传或抛出 AmqpRejectAndDontRequeueException 以指示拒绝该消息. 后者是启用重试并且达到最大传递尝试次数时使用的机制.

10.3. Apache Kafka 支持

通过提供 spring-kafka 项目的自动配置来支持 Apache Kafka

Kafka 配置由 spring.kafka.* 中的外部配置属性控制. 例如,您可以在 application.properties 中声明以下部分:

yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

TIP

要在启动时创建主题 (topic) ,请添加 NewTopic 类型的 Bean. 如果主题已存在,则忽略该 bean.

有关更多支持的选项,请参阅 KafkaProperties .

10.3.1. 发送消息

Spring 的 KafkaTemplate 是自动配置的,您可以直接在自己的 bean 中装配它,如下所示:

java
@Component
public class MyBean {    
  private final KafkaTemplate<String, String> kafkaTemplate;    
  public MyBean(KafkaTemplate<String, String> kafkaTemplate) {        
    this.kafkaTemplate = kafkaTemplate;    
  }

  // ...
}

TIP

如果定义了属性 spring.kafka.producer.transaction-id-prefix,则会自动配置一个 KafkaTransactionManager. 此外,如果定义了 RecordMessageConverter bean,它将自动关联到自动配置的 KafkaTemplate.

10.3.2. 接收消息

当存在 Apache Kafka 基础设施时,可以使用 @KafkaListener 注解任何 bean 以创监听器端点. 如果未定义 KafkaListenerContainerFactory,则会使用 spring.kafka.listener.* 中定义的 key 自动配置一个默认的 factory.

以下组件在 someTopic topic 上创建一个监听器端点:

java
@Component
public class MyBean {    
  @KafkaListener(topics = "someTopic")
  public void processMessage(String content) {        
    // ...
  }
}

如果定义了 KafkaTransactionManager bean,它将自动关联到容器 factory. 同样,如果定义了 RecordFilterStrategyCommonErrorHandlerAfterRollbackProcessor 或 ConsumerAwareRebalanceListener bean,它将自动关联到默认的 factory.

根据监听器类型,将 RecordMessageConverter 或 BatchMessageConverter bean与默认工厂关联. 如果对于批处理监听器仅存在一个 RecordMessageConverter bean,则将其包装在 BatchMessageConverter 中.

TIP

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary,因为它通常引用自动配置的 KafkaTransactionManager bean.

10.3.3. Kafka Streams

Spring for Apache Kafka 提供了一个工厂 bean 来创建 StreamsBuilder 对象并管理其 stream (流) 的生命周期. 只要 kafka-streams 在 classpath 上并且通过 @EnableKafkaStreams 注解启用了 Kafka Stream,Spring Boot 就会自动配置所需的 KafkaStreamsConfiguration bean.

启用 Kafka Stream 意味着必须设置应用程序 id 和引导服务器 (bootstrap server) . 可以使用 spring.kafka.streams.application-id 配置前者,如果未设置则默认为 spring.application.name. 后者可以全局设置或专门为 stream 而重写.

使用专用 properties 可以设置多个其他属性,可以使用 spring.kafka.streams.properties 命名空间设置其他任意 Kafka 属性. 有关更多信息,另请参见 其他 Kafka 属性.

要使用 factory bean,只需将 StreamsBuilder 装配到您的 @Bean 中,如下所示:

java
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {    
  @Bean
  public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {        
    KStream<Integer, String> stream = streamsBuilder.stream("ks1In");        
    stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));        
    return stream;    
  }    
  private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {        
    return new KeyValue<>(key, value.toUpperCase());    
  }
}

默认情况下,由其创建的 StreamBuilder 对象管理的流会自动启动. 您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为.

10.3.4. 其他 Kafka 属性

自动配置支持的属性可在 常见应用程序属性中找到. 请注意,在大多数情况下,这些属性 (连接符或驼峰命名) 直接映射到 Apache Kafka 点连形式属性. 有关详细信息,请参阅 Apache Kafka 文档.

这些属性中的前几个适用于所有组件 (生产者 [producer] 、使用者 [consumer] 、管理者 [admin] 和流 [stream] ) ,但如果您希望使用不同的值,则可以在组件级别指定. Apache Kafka 重要性 (优先级) 属性设定为 HIGH、MEDIUM 或 LOW. Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选择的 MEDIUM 和 LOW 属性,以及所有没有默认值的属性.

只有 Kafka 支持的属性的子集可以直接通过 KafkaProperties 类获得. 如果您希望使用不受支持的其他属性配置生产者或消费者,请使用以下属性:

yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这将常见的 prop.one Kafka 属性设置为 first (适用于生产者、消费者和管理者) ,prop.two 管理者属性为 second,prop.three 消费者属性为 third,prop.four 生产者属性为 fourth,prop.five 流属性为 fifth.

您还可以按如下方式配置 Spring Kafka JsonDeserializer:

yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用 JsonSerializer 在 header 中发送类型信息的默认行为:

yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false

TIP

以这种方式设置的属性将覆盖 Spring Boot 明确支持的任何配置项.

10.3.5. 使用嵌入式 Kafka 进行测试

Spring 为 Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法. 要使用此功能,请在 spring-kafka-test 模块中使用 @EmbeddedKafka 注解测试类. 有关更多信息,请参阅 Spring for Apache Kafka 参考手册.

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要将嵌入式代理地址 (由 EmbeddedKafkaBroker 填充) 的系统属性重新映射到 Apache Kafka 的 Spring Boot 配置属性中. 有几种方法可以做到这一点:

  • 提供一个系统属性,以将嵌入式代理地址映射到测试类中的 spring.kafka.bootstrap-servers 中:
java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • 在 @EmbeddedKafka 注解上配置属性名称:
java
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {    // ...

}
  • 在配置属性中使用占位符:
yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

10.4. RSocket

RSocket 是用于字节流传输的二进制协议. 它通过通过单个连接传递的异步消息来启用对称交互模型.

Spring 框架的 spring-messaging 模块在客户端和服务器端都支持 RSocket 请求者和响应者. 有关更多详细信息,请参见 Spring Framework 参考中的 RSocket 部分,其中包括 RSocket 协议的概述.

10.4.1. RSocket 自动配置策略

Spring Boot自动配置一个 RSocketStrategies bean,该 bean 提供了编码和解码 RSocket 有效负载所需的所有基础结构. 默认情况下,自动配置将尝试 (按顺序) 配置以下内容:

  1. Jackson的 CBOR 编解码器

  2. Jackson的 JSON 编解码器

spring-boot-starter-rsocket 启动器提供了两个依赖. 查阅 Jackson 支持部分,以了解有关定制可能性的更多信息.

开发人员可以通过创建实现 RSocketStrategiesCustomizer 接口的bean来自定义 RSocketStrategies 组件. 请注意,它们的 @Order 很重要,因为它确定编解码器的顺序.

10.4.2. RSocket 服务器自动配置

Spring Boot 提供了 RSocket 服务器自动配置. 所需的依赖由 spring-boot-starter-rsocket 提供.

Spring Boot 允许从 WebFlux 服务器通过 WebSocket 暴露 RSocket,或支持独立的 RSocket 服务器. 这取决于应用程序的类型及其配置.

对于 WebFlux 应用程序 (即 WebApplicationType.REACTIVE 类型) ,仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:

yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"

TIP

由于 RSocket 本身是使用该库构建的,因此只有 Reactor Netty 支持将 RSocket 插入 Web 服务器.

另外,RSocket TCP 或 Websocket 服务器也可以作为独立的嵌入式服务器启动. 除了依赖性要求之外,唯一需要的配置是为该服务器定义端口:

yaml
spring:
  rsocket:
    server:
      port: 9898

10.4.3. Spring Messaging RSocket 支持

Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础结构.

这意味着 Spring Boot 将创建一个 RSocketMessageHandler bean,该 bean 将处理对您的应用程序的 RSocket 请求.

10.4.4. 使用 RSocketRequester 调用 RSocket 服务

在服务器和客户端之间建立 RSocket 通道后,任何一方都可以向另一方发送或接收请求.

作为服务器,您可以在 RSocket @Controller 的任何处理程序方法上注入 RSocketRequester 实例. 作为客户端,您需要首先配置和建立 RSocket 连接. 在这种情况下,Spring Boot 会使用预期的编解码器自动配置 RSocketRequester.Builder 并应用任何 RSocketConnectorConfigurer bean.

RSocketRequester.Builder 实例是一个原型 bean,这意味着每个注入点将为您提供一个新实例. 这样做是有目的的,因为此构建器是有状态的,因此您不应使用同一实例创建具有不同设置的请求者.

以下代码显示了一个典型示例:

java
@Service
public class MyService {    
  private final RSocketRequester rsocketRequester;    
  public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {        
    this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);    
  }    
  public Mono<User> someRSocketCall(String name) {        
    return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);    
  }
}

10.5. Spring Integration

Spring Boot 为 Spring Integration 提供了一些便捷的使用方式,它们包含在 spring-boot-starter-integration starter 中. Spring Integration 为消息传递以及其他传输 (如 HTTP、TCP 等) 提供了抽象. 如果 classpath 上存在 Spring Integration,则 Spring Boot 会通过 @EnableIntegration 注解对其进行初始化.

Spring Integration polling logic 轮询逻辑依赖于 自动配置的 TaskScheduler。 默认的 PollerMetadata(每秒轮询无限数量的消息)可以使用 spring.integration.poller.* 配置属性进行自定义。

Spring Boot 还配置了一些由其他 Spring Integration 模块触发的功能. 如果 spring-integration-jmx 也在 classpath 上,则消息处理统计信息将通过 JMX 发布. 如果 spring-integration-jdbc 可用,则可以在启动时创建默认数据库模式,如下所示:

yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果可用 spring-integration-rsocket,则开发人员可以使用 "spring.rsocket.server.*" 属性配置 RSocket 服务器,并使其使用 IntegrationRSocketEndpoint 或 RSocketOutboundGateway 组件来处理传入的 RSocket 消息. 该基础结构可以处理 Spring Integration RSocket 通道适配器和 @MessageMapping 处理程序 (已配置 "spring.integration.rsocket.server.message-mapping-enabled") .

Spring Boot 还可以使用配置属性来自动配置 ClientRSocketConnector:

yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

有关更多详细信息,请参阅 IntegrationAutoConfiguration 和 IntegrationProperties 类.

10.6. WebSockets

Spring Boot 为内嵌式 Tomcat、Jetty 和 Undertow 提供了 WebSocket 自动配置. 如果将 war 文件部署到独立容器,则 Spring Boot 假定容器负责配置其 WebSocket 支持.

Spring Framework 为 MVC Web 应用程序提供了 丰富的 WebSocket 支持 ,可以通过 spring-boot-starter-websocket 模块轻松访问.

WebSocket 支持也可用于 响应式 Web 应用程序 ,并且引入 WebSocket API 以及 spring-boot-starter-webflux:

xml
<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
</dependency>

10.7. 下一步

下一节将介绍如何在您的应用程序中启用 IO 能力。 您可以阅读有关 cachingmailvalidationrest clients 以及本节中的更多内容。