vlambda博客
学习文章列表

读书笔记《building-applications-with-spring-5-and-kotlin》使用项目反应堆

Using Project Reactor

在本章中,我们将演示如何将 Spring 与 Project Reactor 一起使用。 Project Reactor 是一个为我们提供基于事件的架构的项目。使用 Project Reactor,我们可以异步处理大量并发服务请求。 Project Reactor 被称为延迟的守护者,并提供非阻塞和背压就绪的嵌入式解决方案。

在这里,我们将解释以下内容:

  • Why you should use Project Reactor
  • What Project Reactor is
  • How to make RESTful calls reactive

Why use Project Reactor?

我们的项目支持 Project Reactor 有很多充分的理由。如果您计划开发严肃的企业软件,Project Reactor 很可能会为一些最常见的挑战提供解决方案。让我们简要介绍一下它的电源特性:

  • It is fully non-blocking
  • It offers reactive API such as Flux [N] and Mono [0|1], each implementing Reactive Extension (http://reactivex.io/)
  • It is suited to the microservices architecture
  • It supports backpressure-ready network engines
  • It is efficient at passing messages

在本章的下一部分中,我们将进一步解释 Project Reactor 以及它如何与 Spring 一起使用。我们将通过扩展我们的代码库来支持 Project Reactor 来演示它的使用。

What is Project Reactor?

Project Reactor 是一个项目,分为几个子项目。每个都有不同的目的,它们结合起来,成为现代应用程序开发的一流工具。 Project Reactor 由以下组件组成:

  • Reactor Core: Core components
  • Reactor Test: Collection of test utilities
  • Reactor Adapter: For adapting to (or from) other reactive libraries
  • Reactor Netty: For developing HTTP, TCP, UDP client/servers with Netty
  • Reactor Extra: Additional operators for Flux
  • Reactor Kafka: Reactive bridge to Apache Kafka
  • Reactor RabbitMQ: Reactive bridge to RabbitMQ
  • Reactor Core .NET: Reactive Streams foundation for .NET
  • Reactor Core JS: Reactive Streams foundation for JavaScript

在下一节中,我们将调整我们的 API 以使用 Project Reactor 并改进我们现有的代码库。不幸的是,我们无法涵盖所有​​ Project Reactor 子项目,但我们将让您了解如何使用 Project Reactor。有关 Project Reactor 的更多信息,您可以查看官方文档,因为该项目有据可查!

Using Project Reactor

是时候更新我们的 REST API 了,让它成为一个事件驱动的应用程序。您会看到我们所做的更改并不难应用。我们将首先更新我们的依赖项,以便可以访问 Project Reactor 类。打开您的 API 应用程序 build.gradle 配置并扩展它:

...  
dependencies { 
    ...  
    compile 'io.projectreactor:reactor-bus:2.0.8.RELEASE' 
    ...  
} 
...  

在这里,我们介绍了对 Project Reactor 总线的支持。我们将使用它在某些特定情况下触发动作。如果系统中的 Notes 或 TODO 总数达到临界水平,我们将向系统管理员发送电子邮件。这个例子很简单。一个很好的例子是当存储达到大量已售商品时触发操作。另一个可能是未订阅用户的数量增加超过一定百分比,依此类推。

我们将定义一些简单的实体来表示通知。在 data 包下使用数据类成员 NotesCountNotification 创建一个名为 reactor 的包:

package com.journaler.api.reactor 
 
data class NotesCountNotification(val notesCount: Int) 
 
Create almost the same class for TODO notification data: 
package com.journaler.api.reactor 
 
data class TodosCountNotification(val todosCount: Int) 

我们需要两个数据类的服务抽象。在reactor包下,创建成员接口NotificationService

package com.journaler.api.reactor 
 
interface NotificationService<in T> { 
 
    fun notify(notification: T) 
 
} 

实体计数的注释和 TODO 通知服务接口如下所示:

  • NotesCountNotificationService:
package com.journaler.api.reactor 
 
import org.springframework.stereotype.Service 
 

interface NotesCountNotificationService : NotificationService<NotesCountNotification>
  • TodosCountNotificationService:
 
package com.journaler.api.reactor 
 
import org.springframework.stereotype.Service 
 

interface TodosCountNotificationService : NotificationService<TodosCountNotification> 

对于每个主界面,我们必须有正确的实现。由于我们将发送电子邮件,因此我们必须添加 Spring 电子邮件依赖项。使用以下内容扩展 build.gradle 配置:

...  
dependencies { 
    ...  
    compile 'org.springframework.boot:spring-boot-starter-mail' 
    ...  
} 

为了能够发送电子邮件,我们必须满足一些要求。我们必须准备好几个课程,以便可以发送电子邮件。我们将一个一个地创建它们,但首先创建一个名为 mail 的新包,然后创建第一个名为 MailMessage 的类:

package com.journaler.api.mail
import org.hibernate.validator.constraints.Email
import org.jetbrains.annotations.NotNull
data class MailMessage
(
@Email
@NotNull
val to: String,
val subject: String,
val text: String
)

我们定义了基本实体来表示我们将发送的消息。我们希望能够发送没有任何附件的电子邮件。为此,我们将创建一个接口来表示该功能,称为 EmailService

package com.journaler.api.mail 
interface MailService { 
    fun sendMessage(message: MailMessage) 
} 

实现应该如下所示(MailServiceImpl.kt):

package com.journaler.api.mail 
 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.mail.SimpleMailMessage 
import org.springframework.mail.javamail.JavaMailSender 
import org.springframework.stereotype.Component 
 
@Component 
class MailServiceImpl : MailService { 
 
    @Autowired 
    lateinit var sender: JavaMailSender
 
    override fun sendMessage(message: MailMessage) { 
        val toSend = SimpleMailMessage() 
        toSend.to = arrayOf(message.to) 
        toSend.subject = message.subject 
        toSend.text = message.text 
        sender.send(toSend) 
    } 
 
} 

正如您在前面的代码中看到的,我们定义了 Spring 组件并为 JavaMailSender 实现注入了一个依赖项:JavaMailSenderImpl。该类负责执行邮件发送操作。我们几乎可以发送电子邮件了。还有最后一件事要做。我们必须提供电子邮件服务器配置。从我们的配置 Git 存储库中打开 journaler.configuration 并提供参数,如下例所示:

... 
spring.mail.host=smtp.gmail.com 
spring.mail.port=587 
spring.mail.username=username 
spring.mail.password=password 
spring.mail.properties.mail.smtp.auth=true 
spring.mail.properties.mail.smtp.starttls.enable=true 

在这里,您将输入您的真实 Gmail 地址以及您的用户名和密码,而不是我们在示例中提供的那些!

现在为 NotesCountNotificationService 创建实现。将类命名为 NotesCountNotificationServiceImpl 和以下实现:

package com.journaler.api.reactor 
 
import com.journaler.api.mail.MailMessage 
import com.journaler.api.mail.MailService 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
 
@Service 
class NotesCountNotificationServiceImpl : NotesCountNotificationService { 
 
    @Autowired 
    private lateinit var mailService: MailService 
 
    override fun notify(notification: NotesCountNotification) { 
        val to = "[email protected]" 
        val subject = "Notes count notification" 
        val text = "Notes reached ${notification.notesCount} count." 
        val message = MailMessage(to, subject, text) 
        mailService.sendMessage(message) 
    } 
 
} 

我们还需要 TodosCountNotificationService 的实现:

package com.journaler.api.reactor 
 
import com.journaler.api.mail.MailMessage 
import com.journaler.api.mail.MailService 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
 
@Service 
class TodosCountNotificationServiceImpl : TodosCountNotificationService { 
 
    @Autowired 
    private lateinit var mailService: MailService 
 
    override fun notify(notification: TodosCountNotification) { 
        val to = "[email protected]" 
        val subject = "Notes count notification" 
        val text = "Todos reached ${notification.todosCount} count." 
        val message = MailMessage(to, subject, text) 
        mailService.sendMessage(message) 
    } 
 
} 

该实现与 Note 实体执行相同的操作,方法是发送有关系统中实体计数的电子邮件消息。对于这两种实现,将 [email protected]to 值替换为您想要的目标电子邮件地址。

为了能够将通知映射到 Project React 事件总线,我们必须定义消费者类。在 reactor 包中创建一个名为 NotificationConsumer 的新接口:

package com.journaler.api.reactor 
 
import reactor.bus.Event 
import reactor.fn.Consumer 
 
interface NotificationConsumer<T> : Consumer<Event<T>> 

我们将有两个实现。第一个是 NotesCountNotificationConsumer

package com.journaler.api.reactor 
 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
import reactor.bus.Event 
 
@Service 
class NotesCountNotificationConsumer : NotificationConsumer<NotesCountNotification> { 
 
    @Autowired 
    lateinit var service: NotesCountNotificationService
 
    override fun accept(e: Event<NotesCountNotification>?) { 
        val data = e?.data 
        data?.let { 
            service.notify(data) 
        } 
    } 
 
} 

第二个将如下所示:

package com.journaler.api.reactor 
 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
import reactor.bus.Event 
 
@Service 
class TodosCountNotificationConsumer : NotificationConsumer<TodosCountNotification> { 
 
    @Autowired 
    lateinit var service: TodosCountNotificationService 
 
    override fun accept(e: Event<TodosCountNotification>?) { 
        val data = e?.data 
        data?.let { 
            service.notify(data) 
        } 
    } 
 
} 

最后,我们将更改我们的代码,以便我们能够传输事件。打开 NoteService 并更新其实现:

package com.journaler.api.service 
 
import com.journaler.api.data.Note 
import com.journaler.api.data.NoteDTO 
import com.journaler.api.reactor.NotesCountNotification 
import com.journaler.api.repository.NoteRepository 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
import reactor.bus.Event 
import reactor.bus.EventBus 
import java.util.* 
 
@Service 
class NoteService { 
    ...  
    @Autowired 
    private lateinit var eventBus: EventBus 
    ...  
    fun insertNote(note: NoteDTO): NoteDTO { 
        val result = NoteDTO( 
                repository.save( 
                        Note( 
                                title = note.title, 
                                message = note.message, 
                                location = note.location 
                        ) 
                ) 
        ) 
        val count = getNotes().count() 
        if (count > 10) { 
            val notification = NotesCountNotification(count) 
            eventBus.notify("notesCountNotificationConsumer", Event.wrap(notification)) 
        } 
        return result 
    } 
    ...  
} 

TodoService 类做同样的改变:

package com.journaler.api.service 
 
import com.journaler.api.data.Todo 
import com.journaler.api.data.TodoDTO 
import com.journaler.api.reactor.TodosCountNotification 
import com.journaler.api.repository.TodoRepository 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.stereotype.Service 
import reactor.bus.Event 
import reactor.bus.EventBus 
import java.util.* 
 
 
@Service
class TodoService { 
    ...  
    @Autowired 
    private lateinit var eventBus: EventBus 
    ...  
    fun insertTodo(todo: TodoDTO): TodoDTO { 
        val result = TodoDTO( 
                repository.save( 
                        Todo( 
                                title = todo.title, 
                                message = todo.message, 
                                location = todo.location, 
                                schedule = todo.schedule 
 
                        ) 
                ) 
        ) 
        val count = getTodos().count() 
        if (count > 10) { 
            val notification = TodosCountNotification(count) 
            eventBus.notify("todosCountNotificationConsumer", Event.wrap(notification)) 
        } 
        return result 
    } 
    ... 
} 

每次插入新实体时,我们都会检查计数,如果总计数大于 10,我们将传输事件。我们使用 EventBus 类来发送包装通知。由于我们仍然没有一个 EventBus 候选注入,我们必须提供一个。此外,在这里我们将把所有部分连接在一起,使我们的代码准备好执行。

打开 ApiApplication 应用程序类并扩展它:

package com.journaler.api 
 
import com.journaler.api.reactor.NotesCountNotificationConsumer 
import com.journaler.api.reactor.TodosCountNotificationConsumer 
import org.springframework.beans.factory.annotation.Autowired 
import org.springframework.boot.CommandLineRunner 
import org.springframework.boot.SpringApplication 
import org.springframework.boot.autoconfigure.SpringBootApplication 
import org.springframework.cloud.netflix.eureka.EnableEurekaClient 
import org.springframework.context.annotation.Bean 
import reactor.Environment 
import reactor.bus.EventBus 
import reactor.bus.selector.Selectors.`$` 
 
 
@SpringBootApplication 
@EnableEurekaClient 
class ApiApplication : CommandLineRunner { 
 
    @Autowired 
    private lateinit var eventBus: EventBus 
 
    @Autowired 
    private lateinit var notesCountNotificationConsumer: NotesCountNotificationConsumer 
 
    @Autowired 
    private lateinit var todosCountNotificationConsumer: TodosCountNotificationConsumer 
 
    @Bean 
    fun env() = Environment.initializeIfEmpty().assignErrorJournal() 
 
    @Bean 
    fun createEventBus(env: Environment) = EventBus.create(env, Environment.THREAD_POOL) 
 
    override fun run(vararg args: String) { 
        eventBus.on(`$`("notesCountNotificationConsumer"), notesCountNotificationConsumer) 
        eventBus.on(`$`("todosCountNotificationConsumer"), todosCountNotificationConsumer) 
    } 
} 
 
fun main(args: Array<String>) { 
    SpringApplication.run(ApiApplication::class.java, *args) 
} 

我们刚刚做了什么?首先,我们使用以下代码扩展了 CommandLineRunner

package org.springframework.boot; 
 
import org.springframework.core.Ordered; 
import org.springframework.core.annotation.Order; 
 
public interface CommandLineRunner { 
 
   /** 
    * Callback used to run the bean. 
    * @param args incoming main method arguments 
    * @throws Exception on error 
    */ 
   void run(String... args) throws Exception; 
 
} 

前面的接口用于指示 bean 在包含在 Spring 应用程序中时应该运行。多个 CommandLineRunner bean 可以在同一个应用程序上下文中定义,并且可以使用 Ordered 接口或 @Ordered 注释进行排序。

使用以下代码块,我们初始化 EventBus

@Autowired 
private lateinit var eventBus: EventBus 
 
@Autowired 
private lateinit var notesCountNotificationConsumer: NotesCountNotificationConsumer 
 
@Autowired 
private lateinit var todosCountNotificationConsumer: TodosCountNotificationConsumer 
 
@Bean 
fun env() = Environment.initializeIfEmpty().assignErrorJournal() 
 
@Bean 
fun createEventBus(env: Environment) = EventBus.create(env, Environment.THREAD_POOL) 
 
override fun run(vararg args: String) { 
    eventBus.on(`$`("notesCountNotificationConsumer"), notesCountNotificationConsumer) 
    eventBus.on(`$`("todosCountNotificationConsumer"), todosCountNotificationConsumer) 
} 

在这里,EventBus 使用环境中的默认线程池进行初始化。我们还映射了我们的消费者,现在我们准备好尝试我们的 REST API。启动你的本地 Redis 服务器,并一一运行所有服务:配置、发现、网关,最后是 Journaler API。从 Web 浏览器登录到您在配置中定义的 Gmail 帐户。打开 Postman 并插入一些笔记和 TODO。计数超过 10 项后,您应该会收到一封电子邮件。

可能是您在使用 Gmail SMTP 服务器时遇到问题,或者您根本无法访问 SMTP 服务器。在这种情况下,您需要更新您的实现以仅记录通知而不是尝试发送电子邮件。

打开 NotesCountNotificationServiceImpl 并更新它:

package com.journaler.api.reactor 
 
import org.springframework.stereotype.Service 
 
@Service 
class NotesCountNotificationServiceImpl : NotesCountNotificationService { 
 
    override fun notify(notification: NotesCountNotification) { 
        val text = "Notes reached ${notification.notesCount} count." 
        println("NOTIFICATION >>>>> $text") 
    } 
 
} 

TodosCountNotificationServiceImpl 执行相同操作:

package com.journaler.api.reactor 
 
import org.springframework.stereotype.Service 
 
@Service 
class TodosCountNotificationServiceImpl : TodosCountNotificationService { 
 
    override fun notify(notification: TodosCountNotification) { 
        val text = "Todos reached ${notification.todosCount} count." 
        println("NOTIFICATION >>>>> $text") 
    } 
 
} 

再次重新运行所有服务并重试。您会注意到类似于以下的日志输出:

读书笔记《building-applications-with-spring-5-and-kotlin》使用项目反应堆

Summary

使事物具有响应性是许多现代应用程序的常见要求。在本章中,我们演示了如何轻松快速地为您的服务提供此功能。考虑使用事件总线可能带来的好处,并尝试扩展现有的 API 服务应用程序以支持更多事件。最后,准备好讨论 Spring 实践,因为这将是我们下一章的重点。