vlambda博客
学习文章列表

读书笔记《hands-on-high-performance-with-spring-5》优化Spring消息传递

Optimizing Spring Messaging

在上一章中,我们学习了使用 对象关系映射 (ORM) 框架(如 Hibernate)访问数据库的不同高级方法。我们还学习了如何在使用 ORM 时以最佳方式改进数据库访问。我们研究了 Spring Data 以删除用于实现 数据访问对象 (DAO) 接口的样板代码。在本章的最后,我们看到了 Hibernate 的最佳实践。

在本章中,我们将了解 Spring 对消息传递的支持。消息传递是一种非常强大的技术,它有助于扩展应用程序并鼓励我们解耦架构。

Spring Framework 通过简化使用 Java Message Service (JMS) API 来异步接收消息,为将消息传递系统集成到我们的应用程序提供了广泛的支持。消息传递解决方案可用于将消息从应用程序中的一个点发送到已知点,以及从应用程序的一个点发送到许多其他未知点。相当于面对面分享某样东西,在喇叭上分别分享给一群人。如果我们希望将消息发送到一组未知的客户端,那么我们可以使用队列将消息广播给正在收听的人。

以下是我们将在本章中介绍的主题:

  • What is messaging?
  • What is AMQP?
  • Why do we need AMQP?
  • RabbitMQ
  • Spring messaging configuration

What is messaging?

消息传递是软件组件或应用程序之间的一种交互模式,客户端可以在其中向任何其他客户端发送消息,也可以从任何其他客户端接收消息。

可以使用名为broker 的组件来完成此消息交换。代理提供所有必要的支持和服务来交换消息以及与其他接口交互的能力。这些接口称为面向消息的中间件 (MOM)。下图描述了基于 MOM 的消息传递系统:

读书笔记《hands-on-high-performance-with-spring-5》优化Spring消息传递

消息系统降低了使用 AMQP、STOMP 和 XMPP 协议开发分布式应用程序的复杂性。让我们详细讨论它们:

  • AMQP: AMQP is an open, standard layer application protocol for asynchronous messaging systems. In AMQP, messages should be transmitted in binary format.
  • STOMP: STOMP stands for Simple Text Oriented Messaging Protocol. STOMP provides a compatible medium that allows systems to communicate with almost all the available message brokers.
  • XMPP: XMPP stands for Extensible Messaging and Presence Protocol. It is an XML-based, open standard communications protocol for MOM.

What is AMQP?

高级消息队列协议 (AMQP) 是一个开放的标准应用层协议。 E指定传输消息的每个字节,这允许它在许多其他语言和操作系统体系结构中使用。因此,这使其成为跨平台兼容的协议。 AMQP 受到多个消息代理的支持,例如 RabbitMQ、ActiveMQ、Qpid 和 Solace。 Spring 提供了基于 AMQP 的消息传递实现方案。 Spring 提供了一个模板,用于通过消息代理发送和接收消息。

Problems with the JMS API

JMS API 用于在Java 平台中发​​送和接收消息。 Spring 通过在 JMS 层周围提供一个附加层来支持一种利用 JMS API 的简单方法。该层改进了发送和接收消息的方式,还处理了连接对象的创建和释放。

为了创建基于 Java 的消息传递系统,JMS API 被开发人员广泛使用。使用 JMS API 的主要缺点是平台矛盾,这意味着我们可以使用 JMS API 来开发与基于 Java 的应用程序一起工作的消息传递系统。 JMS API 不支持其他编程语言。

Why do we need AMQP?

AMQP 是这个 JMS API 问题的解决方案。使用 AMQP 的根本优势在于它支持消息交换,而无需考虑平台的兼容性和消息代理。我们可以使用任何编程语言开发消息系统,并且仍然可以使用基于 AMQP 的消息代理与每个系统进行通信。

Differences between AMQP and the JMS API

以下是 AMQP 和 JMS API 之间的一些重要区别:

  • Platform compatibility
  • Messaging models
  • Message data type
  • Message structure
  • Message routing
  • Workflow strategy

这些将在以下各节中更详细地解释。

Platform compatibility

JMS 应用程序可以与任何操作系统一起工作,但它们只支持 Java 平台。如果我们想开发一个消息系统来与多个系统进行通信,那么所有这些系统都应该使用 Java 编程语言来开发。

在使用 AMQP 的同时,我们可以开发一个消息系统,它可以与任何具有不同技术的系统进行通信。因此,不需要使用相同的技术开发目标系统。

Messaging models

JMS API 提供了两种消息模型,点对点和发布-订阅,用于不同平台系统之间的异步消息传递。

AMQP 支持以下交换类型:direct、topic、fanout 和 headers。

Message data type

JMS API 支持五种标准消息类型:

  • StreamMessage
  • MapMessage
  • TextMessage
  • ObjectMessage
  • BytesMessage

AMQP 只支持一种类型的消息——二进制消息;消息必须仅以二进制格式传输。

Message structure

JMS API 消息的基本结构包括三个部分:标头、属性和正文。它表征了一种标准形式,应该可以在所有 JMS 提供者之间移植。

AMQP 消息包含四个部分:标题、属性、正文和页脚。

Message routing

对于消息路由,AMQP同样可以用于复杂的路由方案,这可以通过路由键来设想,并且取决于目的地匹配标准。

JMS API 基于更复杂的路由方案,这些路由方案基于分层主题和客户端消息选择过滤器。

Workflow strategy

在 AMQP 中,生产者首先需要将消息发送到交换器,然后将其传输到队列中,而在 JMS 中,不需要交换器,因为消息可以直接发送到队列或主题。

What are exchanges, queues, and bindings?

AMQP 与发布者和消费者打交道。 Publisher 发送消息,Consumer 接收消息。消息代理负责这种机制,以确保来自发布者的消息发送到正确的消费者。消息代理使用的两个关键元素是交换和队列。下图说明了发布者如何连接到消费者:

读书笔记《hands-on-high-performance-with-spring-5》优化Spring消息传递

让我们了解一下交换、队列和绑定术语。

Exchange

交换器负责接收消息并将其路由到零个或多个队列中。代理的每个交换都有一个唯一的名称,以及虚拟主机中的一些其他属性。使用的消息路由算法取决于交换类型和绑定。正如我们前面提到的,有四种不同类型的交换:直接、主题、扇出和标头。

Queue

队列是消息消费者从中接收消息的组件。队列具有唯一的名称,以便系统可以引用它们。队列名称可以由应用程序定义或在请求时由代理生成。我们不能使用以 amq. 开头的队列名称,因为它由代理保留供内部使用。

Binding

绑定用于将队列连接到交换器。有一些称为 routing key 标头的标准标头,代理使用它们将消息与队列匹配。每个队列都有一个特定的绑定键,如果该键与路由键标头的值匹配,则队列接收消息。

Introducing RabbitMQ

RabbitMQ 是基于 AMQP 的,并且是使用最广泛的轻量级、可靠、可扩展、可移植和健壮的消息代理之一,它是用 Erlang 编写的。 RabbitMQ 受欢迎的重要原因在于它易于设置并适合云规模。 RabbitMQ 是开源的,大多数操作系统和平台都支持。使用 RabbitMQ 的应用程序可以通过平台中立的线路级协议(AMQP)与其他系统通信。现在,让我们来看看如何配置 RabbitMQ。

Setting up the RabbitMQ server

在开发消息系统之前,我们需要设置一个消息代理来处理发送和接收消息。 RabbitMQ 是 AMQP 服务器,可在 http://www.rabbitmq.com/download.html< /一>。

一旦你安装了 RabbitMQ 服务器,根据你的安装路径,你必须 RABBITMQ_HOME 设置以下系统变量:

RABBITMQ_HOME=D:\Apps\RabbitMQ Server\rabbitmq_server-3.6.0

设置完所有内容后,您可以使用 http://localhost:15672/. 访问 RabbitMQ 控制台

您将看到默认登录屏幕,您需要在其中输入 guest 作为默认 用户名 guestguest作为密码

读书笔记《hands-on-high-performance-with-spring-5》优化Spring消息传递

登录后,你会看到RabbitMQ服务器主页,在这里你可以管理QueuesExchanges绑定:

读书笔记《hands-on-high-performance-with-spring-5》优化Spring消息传递

现在我们将通过一个示例来了解 Spring 应用程序中的消息传递配置。

Spring messaging configuration

在开始示例之前,我们需要了解配置消息传递应用程序的基本设置要求。我们将创建一个 RabbitMQ 消息传递应用程序并完成配置的不同部分。在 Spring 应用程序中设置消息传递涉及以下步骤:

  1. Configure a Maven dependency for RabbitMQ
  2. Configure RabbitMQ
  3. Create a component to send and receive messages

Configuring a Maven dependency for RabbitMQ

让我们从将 RabbitMQ 的依赖项添加到 pom.xml 开始。以下代码显示了要配置的依赖项:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${rabbitmq.version}</version>
</dependency>

我们为 RabbitMQ 添加了依赖项。现在,让我们创建一个类来配置它们之间的队列、交换和绑定。

Configuring RabbitMQ

现在,我们就通过配置部分来清楚的了解ConnectionFactoryRabbitTemplateQueueExchange<的配置/kbd>、Binding、消息侦听器容器和消息转换器。

Configuring ConnectionFactory

对于 ConnectionFactory 接口,t这里有一个具体的实现 CachingConnectionFactory 默认情况下,它创建一个可以共享的连接代理 用于创建的代码 CachingConnectionFactory如下:

@Bean
public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new 
        CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
}

我们还可以使用 CachingConnectionFactory 以及仅使用通道来配置缓存连接。我们需要使用 setCacheMode()cacheMode 属性设置为 CacheMode.CONNECTION。我们还可以通过使用 setConnectionLimit() 来限制使用 connectionLimit 属性允许的连接总数。当设置此属性并超出限制时,channelCheckoutTimeLimit 用于等待连接变为空闲状态。

Configuring a queue

现在,我们将使用 Queue 类配置一个队列。以下代码创建一个具有特定名称的队列:

@Bean
public Queue queue() {
    return new Queue(RABBIT_MESSAGE_QUEUE, true);
}

前面的 queue() 方法创建了一个 AMQP 队列,该队列具有使用 RABBIT_MESSAGE_QUEUE 常量声明的特定名称。我们还可以使用 durable 标志设置持久性。我们需要将它与第二个构造函数参数一起作为布尔类型传递。

Configuring an exchange

现在,我们需要创建一个 AMQP 交换,消息生产者将向其发送消息。 Exchange 接口代表一个 AMQP 交换。 Exchange 接口类型有四种实现:DirectExchangeTopicExchangeFanoutExchangeHeadersExchange< /kbd>。我们可以根据我们的要求使用任何交换类型。我们将使用以下代码使用 DirectExchange

@Bean
public DirectExchange exchange() {
    return new DirectExchange(RABBIT_MESSAGE_EXCHANGE);
}

exchange() 方法创建具有在 RABBIT_MESSAGE_EXCHANGE 下定义的特定名称的 DirectExchange。我们还可以使用耐用标志设置耐用性。我们需要将它与第二个构造函数参数一起作为布尔类型传递。

Configuring a binding

现在,我们需要使用 BindingBuilder 类创建一个绑定,以将 queue 连接到 Exchange。以下代码用于创建绑定:

@Bean
Binding exchangeBinding(DirectExchange directExchange, Queue queue) {
    return BindingBuilder.bind(queue).
        to(directExchange)
        .with(ROUTING_KEY);
}

exchangeBinding() 方法使用 ROUTING_KEY 路由键值创建 queueExchange 的绑定。

Configuring RabbitAdmin

RabbitAdmin 用于声明需要在启动时准备好的交换、队列和绑定。 RabbitAdmin 自动声明队列、交换和绑定。这种自动声明的主要好处是,如果连接由于某种原因断开连接,它们将在重新建立连接时自动应用。以下代码配置 RabbitAdmin

@Bean
public RabbitAdmin rabbitAdmin() {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory());
    admin.declareQueue(queue());
    admin.declareExchange(exchange());
    admin.declareBinding(exchangeBinding(exchange(), queue()));
    return admin;
}

rabbitAdmin() 将声明 QueueExchangeBindingRabbitAdmin 构造函数使用 connectionFactory() bean 创建一个实例,并且它不能为 null

RabbitAdmin 仅当 CachingConnectionFactory 缓存模式为 CHANNEL(默认情况下)。此限制的原因是因为排他队列和自动删除队列可能绑定到连接。

Configuring a message converter

在侦听器接收到消息的确切时间,会发生两个更改步骤。在初始步骤中,使用 MessageConverter 将传入的 AMQP 消息转换为 Spring 消息传递 Message。在第二步中,当目标方法被执行时,如果需要,消息的有效负载将转换为参数类型。默认情况下,在初始步骤中,MessageConverter 被用作 Spring AMQP SimpleMessageConverter,处理转换为 String 和 java.io.Serializable

第二步,默认使用GenericMessageConverter进行转换。我们在以下代码中使用了 Jackson2JsonMessageConverter

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

我们将使用此消息转换器作为属性更改默认消息转换器 在下一节中配置RabbitTemplate

Creating a RabbitTemplate

Spring AMQP 的 RabbitTemplate 为基本的 AMQP 操作提供了一切。以下代码使用 connectionFactory 创建 RabbitTemplate 的实例:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(ROUTING_KEY);
    template.setExchange(RABBIT_MESSAGE_EXCHANGE);
    template.setMessageConverter(messageConverter());
    return template;
}

RabbitTemplate 充当生产者发送消息和消费者接收消息的辅助类。

Configuring a listener container

要异步接收消息,最简单的方法是使用带注释的侦听器端点。我们将使用 @RabbitListener 注释作为消息 listener 端点。要创建这个 listener 端点,我们必须使用 SimpleRabbitListenerContainerFactory 类配置消息 listener 容器,该类是 RabbitListenerContainerFactory 的实现 接口。以下代码用于配置SimpleRabbitListenerContainerFactory

@Bean
public SimpleRabbitListenerContainerFactory listenerContainer() {
    SimpleRabbitListenerContainerFactory factory = new 
    SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxConcurrentConsumers(5);
    return factory;
}

listenerContainer() 方法将实例化 SimpleRabbitListenerContainerFactory。您可以使用 setMaxConcurrentConsumers() 方法通过 maxConcurrentConsumers 属性设置最大使用者数。

以下是包含所有先前讨论的配置方法的类:

@Configuration
@ComponentScan("com.packt.springhighperformance.ch7.bankingapp")
@EnableRabbit
public class RabbitMqConfiguration {

  public static final String RABBIT_MESSAGE_QUEUE = 
  "rabbit.queue.name";
  private static final String RABBIT_MESSAGE_EXCHANGE =     
  "rabbit.exchange.name";
  private static final String ROUTING_KEY = "messages.key";

  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new 
    CachingConnectionFactory("127.0.0.1");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
  }

  @Bean
  public Queue queue() {
    return new Queue(RABBIT_MESSAGE_QUEUE, true);
  }

  @Bean
  public DirectExchange exchange() {
    return new DirectExchange(RABBIT_MESSAGE_EXCHANGE);
  }

  @Bean
  Binding exchangeBinding(DirectExchange directExchange, Queue queue) {
    return 
    BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
  }

  @Bean
  public RabbitAdmin rabbitAdmin() {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory());
    admin.declareQueue(queue());
    admin.declareExchange(exchange());
    admin.declareBinding(exchangeBinding(exchange(), queue()));
    return admin;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(ROUTING_KEY);
    template.setExchange(RABBIT_MESSAGE_EXCHANGE);
    template.setMessageConverter(messageConverter());
    return template;
  }

  @Bean
  public SimpleRabbitListenerContainerFactory listenerContainer() {
    SimpleRabbitListenerContainerFactory factory = new 
    SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMaxConcurrentConsumers(5);
    return factory;
  }

}

Creating a message receiver

现在,我们将创建一个带有 @RabbitListener 注释方法的 Consumer 侦听器类,它将接收来自 RabbitMQ 的消息:

@Service
public class Consumer {

  private static final Logger LOGGER = 
  Logger.getLogger(Consumer.class);

  @RabbitListener(containerFactory = "listenerContainer",
  queues = RabbitMqConfiguration.RABBIT_MESSAGE_QUEUE)
  public void onMessage(Message message) {
      LOGGER.info("Received Message: " + 
      new String(message.getBody()));
    }
}

这是消息 listenerContainer 类。每当生产者向 queue 发送消息时,该类将接收它,并且只有具有 @RabbitListener(containerFactory = "listenerContainer", queues = RabbitMqConfiguration.RABBIT_MESSAGE_QUEUE) 的方法注释将接收消息。在这个注解中,我们提到了 containerFactory 属性,它指向 listenerContainer bean 中定义的消息侦听器工厂。

Creating a message producer

为了运行这个应用程序,我们将使用 RabbitTemplate.convertAndSend() 方法来发送消息。此方法还将自定义 Java 对象转换为 AMQP 消息并发送到直接交换。以下 BankAccount 类被创建为自定义类以填充消息属性:

public class BankAccount {

    private int accountId;
    private String accountType;

    public BankAccount(int accountId, String accountType) {
        this.accountId = accountId;
        this.accountType = accountType;
    }

    public int getAccountId() {
        return accountId;
    }

    public String getAccountType() {
        return accountType;
    }

    @Override
    public String toString() {
        return "BankAccount{" +
                "Account Id=" + accountId +
                ", Account Type='" + accountType + '\'' +
                '}';
    }
}

在下面的类中,我们将使用一些适当的值初始化前面的类,并使用 RabbitTemplate.convertAndSend() 将其发送到交换:

public class Producer {

  private static final Logger LOGGER = 
  Logger.getLogger(Producer.class);
  
  @SuppressWarnings("resource")
  public static void main(String[] args) {
        ApplicationContext ctx = new 
        AnnotationConfigApplication
        Context(RabbitMqConfiguration.class);
        RabbitTemplate rabbitTemplate = 
        ctx.getBean(RabbitTemplate.class);
        LOGGER.info("Sending bank account information....");
        rabbitTemplate.convertAndSend(new BankAccount(100, "Savings 
        Account"));
        rabbitTemplate.convertAndSend(new BankAccount(101, "Current 
        Account"));
        
    }

}

当我们运行上述代码时,生产者将使用 convertAndSend() 方法发送 BankAccount 的两个对象,并显示以下输出:

2018-05-13 19:46:58 INFO Producer:17 - Sending bank account information....
2018-05-13 19:46:58 INFO Consumer:17 - Received Message: {"accountId":100,"accountType":"Savings Account"}
2018-05-13 19:46:58 INFO Consumer:17 - Received Message: {"accountId":101,"accountType":"Current Account"}

Maximizing throughput with RabbitMQ

以下是与最大消息传递吞吐量相关的最佳性能的配置选项:

  • Keep your queues short
  • Avoid the use of lazy queues
  • Avoid persistent messages
  • Create multiple queues and consumers
  • Divide queues into different cores
  • Disable acknowledgment
  • Disable unnecessary plugins

Performance and scalability with RabbitMQ

为了使用 RabbitMQ 实现最佳性能,我们应该考虑许多要点:

  • Payload message size
  • Exchange management
  • Configure prefetch properly
  • RabbitMQ HiPE
  • Clustering of nodes
  • Disable RabbitMQ statistics
  • Update the RabbitMQ libraries

Summary

在本章中,我们了解了消息传递的概念。我们还体验了使用消息系统的优势。我们了解了 AMQP。通过了解 JMS API 问题,我们了解了 AMQP 的需求。我们还看到了 AMQP 和 JMS API 之间的区别。我们了解了与 AMQP 相关的交换、队列和绑定。我们还介绍了 RabbitMQ 的设置方面以及与 Spring 应用程序相关的不同配置。

在下一章中,我们将了解 Java 线程的核心概念,然后我们将转到 java.util.concurrent 包提供的高级线程支持。我们还将介绍 java.util.concurrent 的各种类和接口。我们将学习如何使用 Java 线程池来提高性能。我们将介绍 Spring 框架提供的有用功能,例如任务执行、调度和异步运行。最后,我们将研究带有线程的 Spring 事务管理和线程的各种最佳编程实践。