vlambda博客
学习文章列表

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

Chapter 5. Reactive Web Clients

到目前为止,我们已经创建了整个项目基础架构来使用 Twitter 流。我们创建了一个存储跟踪的主题标签的应用程序。

在本章中,我们将学习如何使用 Spring Reactive Web Client 并使用响应式范式进行 HTTP 调用,这是 Spring 5.0 最受期待的特性之一。我们将异步调用 Twitter REST API,并使用 Project Reactor 提供一种处理流的优雅方式。

我们将介绍 RabbitMQ 的 Spring 消息传递。我们将使用 Spring Messaging API 与 RabbitMQ 代理进行交互,并了解 Spring 如何帮助开发人员为此使用高级抽象。

在本章的最后,我们将打包应用程序并创建一个 docker 镜像。

在本章中,我们将了解:

  • Reactive web clients
  • Spring Messaging for RabbitMQ
  • RabbitMQ Docker usage
  • Spring Actuator

Creating the Twitter Gathering project


我们了解了如何 使用令人惊叹的 Spring Initializr 创建 Spring Boot 项目。在本章中,我们将以不同的方式创建项目,向您展示创建 Spring Boot 项目的另一种方式。

在任意目录中创建 tweet-gathering 文件夹。我们可以使用以下命令:

mkdir tweet-gathering

然后,我们可以访问之前创建的文件夹并复制位于 GitHub 的 pom.xml文件: https://github.com/PacktPublishing/Spring-5.0-By-Example /blob/master/Chapter05/tweet-gathering/pom.xml

在 IDE 上打开 pom.xml

这里有一些有趣的依赖关系。   jackson-module-kotlin 有助于在 Kotlin 语言中使用 JSON。另一个有趣的依赖是 kotlin-stdlib,它在我们的类路径中提供了 Kotlin 标准库。

在插件部分,最重要的插件是 kotlin-maven-plugin,它允许和配置我们的 Kotlin 代码的构建。

在下一节中,我们将创建一个文件夹结构来启动代码。

我们开始做吧。

Project structure

项目结构遵循 maven suggested 模式。我们将使用 Kotlin 语言对项目进行编码,然后我们将创建一个 kotlin 文件夹来存储我们的代码。

我们在之前创建的 pom.xml 上进行了配置,所以它可以正常工作。让我们看一下项目的正确文件夹结构:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

正如我们所见,基础包是 springfive.twittergathering 包。然后,我们将尽快开始在这个包中创建子包。

让我们为微服务创建 infrastructure

Starting the RabbitMQ server with Docker


我们可以使用 Docker 来启动 RabbitMQ 服务器。我们不想在我们的开发人员机器上安装服务器,因为它会产生库冲突和大量文件。让我们了解如何在 Docker 容器中启动 RabbitMQ。

让我们在接下来的几节中这样做。

Pulling the RabbitMQ image from Docker Hub

我们需要从 Docker 镜像indexterm"> 集线器。我们将使用官方存储库中的图像,因为它更安全可靠。

要获取图像,我们需要使用以下命令:

docker pull rabbitmq:3.7.0-management-alpine

等待下载结束,然后我们可以继续下一部分。在下一节中,我们将学习如何设置 RabbitMQ 服务器。

Starting the RabbitMQ server

要启动 RabbitMQ 服务器,我们将运行 Docker 命令。有一些我们需要注意的注意事项;我们将在之前创建的 Twitter Docker 网络上运行此容器,但我们将在主机上公开一些端口,因为它可以更轻松地与代理交互。

此外,我们将使用管理图像,因为它提供了一个页面,使我们能够在类似于控制面板的东西上管理和查看 RabbitMQ 信息。

让我们运行:

docker run -d --name rabbitmq --net twitter -p 5672:5672 -p 15672:15672 rabbitmq:3.7.0-management-alpine

等待几秒钟,让 RabbitMQ 建立连接,然后我们可以连接到管理页面。为此,请转到 http://localhost:15672 并登录系统。默认用户是guest,密码也是guest。控制面板如下所示:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

面板上有很多有趣的信息 但是现在,我们将探索渠道和一些有趣的部分。

惊人的。我们的 RabbitMQ 服务器已启动并正在运行。我们将很快使用基础设施。

Spring Messaging AMQP


该项目支持基于AMQP的消息传递解决方案。有一个高级 API 可以与所需的代理进行交互。这些交互可以发送和接收来自代理的消息。 

与其他 Spring 项目一样,这些设施由 template 类提供,这些类公开了代理提供并由 Spring 模块实现的核心功能.

这个项目有两个部分: spring-amqp 是基础抽象, spring-rabbit 是 RabbitMQ 的实现兔MQ。我们将使用 spring-rabbit 因为我们使用的是 RabbitMQ 代理。

Adding Spring AMQP in our pom.xml

让我们将 spring-amqp jar 添加到 我们的 项目中。 < code class="literal">spring-amqp 有一个starter依赖,它为我们配置了一些常用的东西,比如 ConnectionFactoryRabbitTemplate,所以我们将使用它。为了添加这个依赖,我们将配置我们的 pom.xml 如下:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

下一步是配置连接;我们将使用 application.yaml 文件,因为我们使用的是启动器。在下一节中,我们将进行配置。

Integrating Spring Application and RabbitMQ

我们在项目中配置了 spring-amqp 依赖项。现在,是时候正确配置 RabbitMQ 连接了。我们将使用 RabbitMQTemplate 向代理发送消息;这有一些转换器可以帮助我们将 domain 模型转换为 JSON,反之亦然。

让我们配置我们的 RabbitMQ 连接。配置应该在 application.yaml 文件中,并且应该如下所示:

spring:
  rabbitmq:
    host: localhost
username: guest
password: guest
port: 5672

正如我们所见,一些 Spring 配置与其他配置非常相似,风格相同,yaml 中的节点是技术名称,后跟几个属性。

我们使用 RabbitMQ 的默认凭据。主机和端口与 RabbitMQ Broker 地址相关。配置非常简单,但为我们做了很多事情,例如 ConnectionFactory

Understanding RabbitMQ exchanges, queues, and bindings

我们正在用 RabbitMQ 做一些有趣的 事情。我们成功配置了连接。还有一些其他的事情我们还没有完成,例如配置交换、队列和绑定,但在我们这样做之前,让我们对这些术语有更多的了解。

Exchanges

交换是发送消息的 RabbitMQ 实体。我们可以类比一条流水的河流;河流是消息的过程。我们将在以下部分中了解四种不同类型的交换。

Direct exchanges

直接交换允许路由消息基于 在路由键上。该名称是不言自明的,它允许将消息直接发送给指定的客户,该客户是收听交换的人。请记住,它使用路由键作为参数将消息路由给客户。

Fanout exchanges

扇出交换路由所有 queues 的消息,与路由键无关。所有绑定的队列都将收到发送到扇出交换的消息。它们可用于具有主题行为或分布式列表。

Topic exchanges

topic 交换类似于直接交换,但与直接交换相比,主题交换使我们能够使用模式匹配,这只允许准确的路由键。我们将在我们的项目中使用这个交换。

Header exchanges

标头交换是不言自明的,其行为类似于 topic 交换,但它不使用路由键,而是使用标头属性以匹配正确的队列。

Queues

队列 是 exchanges 将写入有关路由键的消息的缓冲区。队列是消费者获取发布到交易所的消息的地方。消息根据交换类型被路由到队列。

Bindings

绑定可以被认为是 exchanges 和队列之间的链接。我们可以说它是一种交通警察,它根据配置指示消息应该重定向到哪里,在这种情况下是链接。

Configuring exchanges, queues, and bindings on Spring AMQP

Spring AMQP 项目有abstractions 对于所有 RabbitMQ 实体列出 之前,我们需要 将其配置为与代理交互。正如我们在 other 项目中所做的那样,我们需要一个 @Configuration 类,这将为 Spring 容器声明 bean。

Declaring exchanges, queues, and bindings in yaml

我们需要配置实体名称来指示framework 连接 broker 实体。我们将使用 application.yaml 文件来存储这些名称,因为它更容易< /a> 是 正确的 方式来存储application 基础设施数据。

实体名称部分应如下所示:

queue:
  twitter: twitter-stream
exchange:
  twitter: twitter-exchange
routing_key:
  track: track.*

属性是不言自明的,exchange 节点具有交换的名称,queue 节点具有队列名称,并且最后, routing_key 节点具有路由参数。

惊人的。属性已配置,现在我们将创建我们的 @Configuration 类。让我们在下一节中这样做。我们几乎准备好与 RabbitMQ 代理进行交互了。

Declaring Spring beans for RabbitMQ

现在,让我们创建我们的配置类。这个类非常简单,正如我们将在 Spring 抽象中看到的那样,它们也很容易理解,特别是因为类名暗示了 RabbitMQ 个实体。

让我们创建我们的类:

package springfive.twittergathering.infra.rabbitmq

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
open class RabbitMQConfiguration(@Value("${queue.twitter}") private val queue:String,
                                 @Value("${exchange.twitter}") private val        
exchange:String,
                                 @Value("${routing_key.track}") private val routingKey:String){

@Bean
    open fun queue():Queue{
return Queue(this.queue,false)
    }

@Bean
    open fun exchange():TopicExchange{
return TopicExchange(this.exchange)
    }

@Bean
    open fun binding(queue: Queue, exchange: TopicExchange): Binding {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey)
    }

@Bean
    open fun converter(): Jackson2JsonMessageConverter {
return Jackson2JsonMessageConverter(ObjectMapper().registerModule(KotlinModule()))
    }

}

这里有一些有趣的事情需要注意。在 RabbitMQConfiguration构造函数中,我们注入了application.yaml文件中配置的值来命名实体。之后,我们开始为容器配置 Spring bean 以允许容器将它们注入 Spring 管理的类中。这里的关键点是,如果它们在 RabbitMQ 代理中不存在,Spring 将创建它们。谢谢,Spring,我们对此表示赞赏并喜欢它的帮助。 

我们可以看到 DSL 声明 Binding,它使开发人员的生活更轻松,并防止代码中的错误。

在课程的最后一部分,我们声明了 Jackson2JsonMessageConverter。这些转换器用于转换 JSON 中的域模型,反之亦然。它使我们能够在 Listener 上接收域对象,而不是字节或字符串数​​组。在 Producers 中可以使用相同的行为,我们能够发送域对象而不是 JSON。 

我们需要将 ObjectMapper 提供给 Jackson2JsonMessageConverter,由于 Kotlin 处理数据类的方式,我们使用了 Kotlin 模块,没有无参数构造函数。

很棒的工作!我们的基础设施已完全配置。让我们现在对生产者和消费者进行编码!

 

Consuming messages with Spring Messaging

Spring AMQP 提供了 @RabbitListener annotation;它将为所需队列配置订阅者,它删除了很多 infrastructure 代码,例如连接到 RabbitListenerConnectionFactory,并以编程方式创建消费者。它使队列 consumers 的创建非常容易。

 spring-boot-starter-amqp 为我们提供了一些自动配置。当我们使用这个模块时,Spring会 自动 为我们创建一个 RabbitListenerConnectionFactory 并配置 Spring转换器自动将JSON转换为域类。

很简单。 Spring AMQP 确实为开发人员提供了超高级的抽象。

让我们看一个很快将在我们的应用程序中使用的示例:

@RabbitListener(queues = ["twitter-track-hashtag"])
fun receive(hashTag:TrackedHashTag) {
...
}

一块蛋糕。该代码非常容易理解,并且可以pay 只关注业务规则。维护基础设施并不是一件好事,因为它不会为业务带来真正的价值,因为它只是一项技术。 Spring 尝试将整个基础架构代码抽象化,以帮助开发人员编写业务代码。它是 Spring Framework 提供的真实资产。

谢谢,春天的团队。

Producing messages with Spring Messaging

spring-amqp 模块提供了一个 RabbitTemplate 类,它抽象了高级 RabbitMQ 驱动程序类。它提高了开发人员的性能 并使应用程序 没有错误,因为 Spring 模块是一组经过良好测试的代码。我们将使用 convertAndSend() 函数,该函数允许传递 exchange、路由键和消息对象作为参数。请记住,此函数使用 Spring 转换器将我们的模型类转换为 JSON 字符串。

convertAndSend() 有很多重载函数,根据用例,其他函数可能更合适。我们将使用我们之前看到的简单的。

让我们看看将消息发送到代理的代码:

this.rabbitTemplate.convertAndSend("twitter-exchange","track.${hashTag.queue}",it)

好的。第一个参数是Exchange名称,第二个参数是RoutingKey。最后,我们有了消息对象,它将被转换为 JSON 字符串。

我们很快就会看到代码在起作用。

Enabling Twitter in our application


在本节中,我们将在我们的 Twitter Gathering 应用程序上启用 Twitter API。此应用程序应根据用户指定的查询获取推文。这个查询是在我们在上一章创建的微服务上注册的。

当用户调用API注册TrackedHashTag时,微服务会将TrackedHashTag存入Redis数据库,并通过兔MQ。然后,该项目将开始基于此收集推文。这就是数据流。在下一章中,我们将做一个响应式流并通过我们的响应式 API 分发推文。将会很精彩。

但是,现在,我们需要配置 Twitter 凭据;我们将使用 Spring bean 来做到这一点——让我们实现它。

 

Producing Twitter credentials

我们将使用 @Configuration 类来提供我们的 Twitter 配置对象。 @Configuration 类非常适合提供基础设施 bean,如果我们没有所需模块的启动项目。

此外,我们将使用 application.yaml 文件来存储 Twitter 凭据。这种配置不应保存在源代码存储库中,因为它是敏感数据,不应与他人共享。然后,Spring 框架使我们能够在 yaml 文件中声明属性,并配置环境变量以在运行时填充这些属性。这是将敏感数据排除在源代码存储库之外的绝佳方式。

Configuring Twitter credentials in application.yaml

要开始在我们的应用程序中配置 Twitter API,我们必须提供凭据。我们将为此使用 yaml 文件。让我们在 application.yaml 中添加凭据:

twitter:
  consumer-key: ${consumer-key}
consumer-secret: ${consumer-secret}
access-token: ${access-token}
access-token-secret: ${access-token-secret}

十分简单。属性已被声明,然后我们使用 $ 来指示 Spring Framework 该值将作为环境变量接收。请记住,我们在上一章中配置了 Twitter 帐户。

Modelling objects to represent Twitter settings

我们必须为我们的应用程序创建抽象和惊人的数据模型。这将创建一些模型,使开发人员的生活更容易理解和编码。让我们创建我们的 Twitter 设置模型。

 

Twittertoken

此类表示之前在 Twitter 中配置的应用程序令牌。 token 只能用于应用认证。我们的模型应该是这样的:

data class TwitterToken(val accessToken: String,val accessTokenSecret: String)

我喜欢 Kotlin 声明数据类的方式——完全不可变且没有样板。

TwitterAppSettings

TwitterAppSettings 代表消费者密钥和 consumer 秘密。从 Twitter 的角度来看,它是我们应用程序的一种身份。我们的模型非常简单,必须如下所示:

data class TwitterAppSettings(val consumerKey: String,val consumerSecret: String)

干得好,我们的模型准备好了。是时候为 Spring Container 生成对象了。我们将在下一节中这样做。

Declaring Twitter credentials for the Spring container

让我们生成我们的 Twitter 配置对象。作为我们一直使用的pattern,我们将使用@Configuration为此上课。该类应如下所示:

package springfive.twittergathering.infra.twitter

import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration


@Configuration
open class TwitterConfiguration(@Value("${twitter.consumer-key}") private val consumerKey: String,
                                @Value("${twitter.consumer-secret}") private val consumerSecret: String,
                                @Value("${twitter.access-token}") private val accessToken: String,
                                @Value("${twitter.access-token-secret}") private val accessTokenSecret: String) {

@Bean
    open fun twitterAppSettings(): TwitterAppSettings {
return TwitterAppSettings(consumerKey, consumerSecret)
    }

@Bean
    open fun twitterToken(): TwitterToken {
return TwitterToken(accessToken, accessTokenSecret)
    }

}

非常简单,并且是一种声明 bean 的 Spring 方式。我们正在逐步改进我们使用 Spring 的方式。做得好!

现在,我们完成了 Twitter 配置。我们将使用 Spring WebFlux 中的 WebClient 使用 Twitter API,它支持响应式编程范式。让我们在运行代码之前了解一些东西。

Spring reactive web clients


这是 Spring Framework 5 中添加的一个相当新的功能。它使我们能够使用响应式与 HTTP 服务进行交互范例。

它不是 Spring 提供的 RestTemplate 的替代品,但是,它是对响应式应用程序的补充。不用担心,RestTemplate 是在传统应用程序中与 HTTP 服务交互的优秀且经过测试的实现。

此外,WebClient 实现支持 text/event-stream mime 类型,可以让我们使用服务器事件。

Producing WebClient in a Spring Way

在我们开始调用 Twitter API 之前,我们想在 SpringWebClient 的实例"> 方式。这意味着我们正在寻找一种方法来注入实例,使用依赖注入模式。

 

为此,我们可以使用 @Configuration 注释并创建一个WebClient 实例,使用 @Bean 注释来声明 Spring 容器的 bean。让我们这样做:

package springfive.twittergathering.infra.web

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient

@Configuration
open class WebClientProducer {

@Bean
    open fun webClient(): WebClient? {
return WebClient.create()
    }

}

这个类中有几个已知的注解;这是一种以 Spring 方式声明 bean 实例的非常标准的方法。它可以在其他 Spring 管理的类中注入 WebClient 的实例。

Creating the models to gather Tweets

如果我们想要异步 并且响应式地使用Twitter API,那么我们应该创建API 客户端。在我们编写客户端代码之前,我们需要根据我们的要求创建用于建模的类。

我们不需要所有推文的属性。我们期望以下属性:

  • id
  • text
  • createdAt
  • user

然后,我们将根据列出的属性对我们的类进行建模。

 

让我们从用户属性开始。该属性是一个 JSON 属性,我们将为此创建一个单独的类。该类应如下所示:

@JsonIgnoreProperties(ignoreUnknown = true)
data class TwitterUser(val id:String,val name:String)

我们使用了 Kotlin 数据类,它非常适合我们的用例,我们希望将其用作数据容器。此外,我们需要输入 @JsonIgnoreProperties(ignoreUnknown = true),因为该注释 指示Spring转换器在JSON响应中缺少该属性时忽略该属性。这是这部分代码的重要部分。

我们创建了 TwitterUser 类,它代表创建推文的用户。现在,我们将创建代表 Tweet 的 Tweet 类。让我们创建我们的类:

@JsonIgnoreProperties(ignoreUnknown = true)
data class Tweet(val id:String, val text:String, @JsonProperty("created_at")val createdAt:String, val user:TwitterUser)

对我们来说,有一些共同的东西,还有一些是新的。  @JsonProperty 允许开发者自定义在 JSON 中具有不同属性名称的类的属性名称;这对于 Java 开发人员来说很常见,因为他们通常使用  CamelCase 作为命名属性的方式,并且在 JSON 表示法中,人们通常使用 SnakeCase。这个注解可以帮助我们解决编程语言和 JSON 之间的这种不匹配问题。 

Note

我们可以在这里找到关于蛇盒的更详细解释: https://en. wikipedia.org/wiki/Snake_case。此外,我们可以在这里找到骆驼案的完整解释:https://en.wikipedia .org/wiki/Camel_case

好的。我们的 API 对象已准备就绪。有了这些对象,我们就可以与 API 交互。我们将创建一个服务来收集推文。我们将在下一节中这样做。 

Authentication with Twitter APIs

准备好对象后,我们需要创建一个类来帮助我们处理 Twitter 身份验证。我们将使用 Twitter Application Only Auth 身份验证模型。这种身份验证应该用于后端应用程序。

 

使用这种身份验证的应用程序可以:

  • Pull user timelines
  • Access friends and followers of any account
  • Access lists and resources
  • Search in Tweets
  • Retrieve any user information

正如我们所见,该应用程序是一个只读的 Twitter API 使用者。

Note

我们可以使用 Twitter 文档来详细了解这种身份验证。可以在此处找到文档: https://developer.twitter.com/en/docs/basics/authentication/guides/authorizing-a-request。

我们将按照 Twitter 文档授权我们的请求,这是一种烹饪食谱,因此我们必须遵循所有步骤。最后的类应该是这样的:

package springfive.twittergathering.infra.twitter

import org.springframework.util.StringUtils
import springfive.twittergathering.infra.twitter.EncodeUtils.computeSignature
import springfive.twittergathering.infra.twitter.EncodeUtils.encode
import java.util.*

object Twitter {

private val SIGNATURE_METHOD = "HMAC-SHA1"

private val AUTHORIZATION_VERIFY_CREDENTIALS = "OAuth " +
"oauth_consumer_key="{key}", " +
"oauth_signature_method="" + SIGNATURE_METHOD + "", " +
"oauth_timestamp="{ts}", " +
"oauth_nonce="{nonce}", " +
"oauth_version="1.0", " +
"oauth_signature="{signature}", " +
"oauth_token="{token}""

fun buildAuthHeader(appSettings: TwitterAppSettings, twitterToken: TwitterToken, method: String, url: String, query: String):String{
val ts = "" + Date().time / 1000
val nounce = UUID.randomUUID().toString().replace("-".toRegex(), "")
val parameters = "oauth_consumer_key=${appSettings.consumerKey}&oauth_nonce=$nounce&oauth_signature_method=$SIGNATURE_METHOD&oauth_timestamp=$ts&oauth_token=${encode(twitterToken.accessToken)}&oauth_version=1.0&track=${encode(query)}"
val signature = "$method&" + encode(url) + "&" + encode(parameters)
var result = AUTHORIZATION_VERIFY_CREDENTIALS
result = StringUtils.replace(result, "{nonce}", nounce)
        result = StringUtils.replace(result, "{ts}", "" + ts)
        result = StringUtils.replace(result, "{key}", appSettings.consumerKey)
        result = StringUtils.replace(result, "{signature}", encode(computeSignature(signature, "${appSettings.consumerSecret}&${encode(twitterToken.accessTokenSecret)}")))
        result = StringUtils.replace(result, "{token}", encode(twitterToken.accessToken))
return result
    }

}

data class TwitterToken(val accessToken: String,val accessTokenSecret: String)

data class TwitterAppSettings(val consumerKey: String,val consumerSecret: String)

这是一个食谱。  buildAuthHeader 函数将使用规则创建授权标头来授权请求​​。我们已经签署了一些请求标头和请求正文。此外,将模板值替换为我们的 Twitter 凭据对象。

Some words about server-sent events (SSE)

服务器发送事件 (SSE) 是一种技术,其中 server 将事件发送到客户端,而不是客户端轮询服务器检查信息的可用性。在客户端或服务器关闭消息流之前,消息流不会中断。

这里最重要的是要了解信息流的方向。服务器决定何时向客户端发送数据。

处理资源负载和带宽使用非常重要。客户端将接收数据块,而不是通过轮询技术在服务器上施加负载。

 

Twitter 有一个流 API,Spring Framework WebClient 支持 SSE。是时候使用 Twitter 流了。

Creating the gather service

 TweetGatherService 将负责与 Twitter API 进行交互 并收集根据请求的主题标签请求推文。该服务将是一个带有一些注入属性的 Spring bean。该类应如下所示:

package springfive.twittergathering.domain.service

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import org.springframework.http.MediaType
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import springfive.twittergathering.infra.twitter.Twitter
import springfive.twittergathering.infra.twitter.TwitterAppSettings
import springfive.twittergathering.infra.twitter.TwitterToken


@Service
class TweetGatherService(private val twitterAppSettings: TwitterAppSettings,
                         private val twitterToken: TwitterToken,
                         private val webClient: WebClient) {

fun streamFrom(query: String): Flux<Tweet> {
val url = "https://stream.twitter.com/1.1/statuses/filter.json"
return this.webClient.mutate().baseUrl(url).build()
                .post()
                .body(BodyInserters.fromFormData("track", query))
                .header("Authorization", Twitter.buildAuthHeader(twitterAppSettings, twitterToken, "POST", url, query))
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve().bodyToFlux(Tweet::class.java)
    }

}

@JsonIgnoreProperties(ignoreUnknown = true)
data class Tweet(val id: String = "", val text: String = "", @JsonProperty("created_at") val createdAt: String = "", val user: TwitterUser = TwitterUser("", ""))

@JsonIgnoreProperties(ignoreUnknown = true)
data class TwitterUser(val id: String, val name: String)

这里有一些重要的点。首先是函数声明;看看 Flux<Tweet>, 这意味着数据永远不会被中断,因为它代表了 N 个值。在我们的例子中,我们将使用 Twitter 流,直到客户端或服务器中断数据流。 

之后,我们用我们想要的轨道配置 HTTP 请求正文以获取事件。之后,我们配置了 Accept HTTP 头;必须指示 WebClient 需要使用哪种 mime 类型。

最后,我们使用了 Twitter.buildAuthHeader 函数来配置 Twitter 身份验证。

太棒了,我们已经准备好开始使用 Twitter API,我们只需要编写触发器即可使用该功能。我们将在下一节中这样做。 

Listening to the Rabbit Queue and consuming the Twitter API

我们将使用 Twitter API,但是什么时候呢?

当跟踪主题标签的请求到达我们的应用程序时,我们需要开始获取 Tweets。为了达到这个目标,当 TrackedHashTag 在我们的微服务上注册时,我们将实现 RabbitMQ 监听器。应用程序将向代理发送消息以开始使用 Twitter 流。

让我们看一下代码,逐步了解行为;最终代码应如下所示:

package springfive.twittergathering.domain.service

import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import springfive.twittergathering.domain.TrackedHashTag
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

@Service
class TwitterGatherRunner(private val twitterGatherService: TweetGatherService,private val rabbitTemplate: RabbitTemplate) {

@RabbitListener(queues = ["twitter-track-hashtag"])
fun receive(hashTag:TrackedHashTag) {
val streamFrom = this.twitterGatherService.streamFrom(hashTag.hashTag).filter({
return@filter it.id.isNotEmpty() && it.text.isNotEmpty() &&  
             it.createdAt.isNotEmpty()
})
val subscribe = streamFrom.subscribe({
println(it.text)
Mono.fromFuture(CompletableFuture.runAsync {
                this.rabbitTemplate.convertAndSend("twitter- 
                 exchange","track.${hashTag.queue}",it)
})
})
Schedulers.elastic().schedule({ subscribe.dispose() },10L,TimeUnit.SECONDS)
    }

}

保持冷静。我们将涵盖整个代码。在 @RabbitListener, 我们配置了我们要消费的队列的名字。 Spring AMQP 模块将为我们自动配置我们的监听器并开始使用所需的队列。如我们所见,我们收到了 TrackedHashTag 对象;记住前面部分的转换器。

第一条指令将开始使用 Twitter 流。该流返回一个通量并且可以在那里有很多数据事件。在消费者之后,我们要过滤流上的数据。我们想要 Tweet 其中 idtext 和 < code class="literal">createdAt 不为空。 

然后,我们订阅这个流并开始接收流中的数据。此外, subscribes 函数返回一次性对象,这将有助于后续步骤。我们创建了一个匿名函数,它将在控制台上打印 Tweet 并将 Tweet 发送到 RabbitMQ 队列,以便在另一个微服务中使用。

 

最后,我们使用调度器停止数据流并消耗数据 10 秒。

在测试 Twitter 流之前,我们需要更改 Tracked Hashtag Service 以通过 RabbitMQ 发送消息。我们将在接下来的部分中这样做。这些变化很小,我们会尽快完成。

Changing the Tracked Hashtag Service


要运行整个解决方案,我们需要对 Tracked Hashtag Service 项目进行 一些 更改。更改简单且基本;配置 RabbitMQ 连接并更改服务以将消息发送到代理。

让我们这样做。

Adding the Spring Starter RabbitMQ dependency

正如我们之前在 Twitter Gathering 项目中所做的,我们需要添加 spring-boot- starter-amqp 为我们提供一些自动配置。为此,我们需要将以下代码段添加到我们的 pom.xml 中:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

正确的。现在,是时候配置 RabbitMQ 连接了。我们将在下一节中执行此操作。

Configuring the RabbitMQ connections

我们将使用 application.yaml配置 RabbitMQ 连接。然后,我们需要在其中创建几个属性,Spring AMQP 模块将使用提供的配置来启动连接工厂。

 

配置它非常简单。 Tracked Hashtag 的最终 yaml 文件应如下所示:

spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
  redis:
    host: 127.0.0.1
    port: 6379

server:
  port: 9090

queue:
  twitter: twitter-track-hashtag
exchange:
  twitter: twitter-track-exchange
routing_key:
  track: "*"
---
spring:
  profiles: docker
  rabbitmq:
    host: rabbitmq
    username: guest
    password: guest
    port: 5672
  redis:
    host: redis
    port: 6379

server:
  port: 9090

queue:
  twitter: twitter-track-hashtag
exchange:
  twitter: twitter-track-exchange
routing_key:
  track: "*"

 

 

 

 

 

 

此 yaml 中有两个配置文件。看看 RabbitMQ 的不同主机。在默认配置文件中,我们能够连接 localhost,因为我们在主机上公开了 RabbitMQ 端口。但是在 Docker 配置文件上,我们无法连接 localhost,我们需要连接到  rabbitmq host,它是 Twitter 网络的主机。

我们的 RabbitMQ 连接已经可以使用了。让我们在下一节中尝试一下。我们走吧。

Creating exchanges, queues, and bindings for the Twitter Hashtag Service

让我们为 Tracked Hashtag indexterm"> 用法。我们将使用 @Configuration 类来做到这一点。

RabbitMQ connection 应该如下所示:

package springfive.twittertracked.infra.rabbitmq

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
open class RabbitMQConfiguration(@Value("${queue.twitter}") private val queue:String,
                                 @Value("${exchange.twitter}") private val exchange:String,
                                 @Value("${routing_key.track}") private val routingKey:String){

@Bean
    open fun queue():Queue{
return Queue(this.queue,false)
    }

@Bean
    open fun exchange():TopicExchange{
return TopicExchange(this.exchange)
    }

@Bean
    open fun binding(queue: Queue, exchange: TopicExchange): Binding {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey)
    }

@Bean
    open fun converter(): Jackson2JsonMessageConverter {
return Jackson2JsonMessageConverter(ObjectMapper().registerModule(KotlinModule()))
    }

}

很简单。和之前一样,我们声明了一个交换、队列和绑定。

Sending the messages to the broker

这是现在最有趣的部分。当我们想要保存 TrackedHashTag 时,我们必须将漂亮的新实体发送到 RabbitMQ。这个过程会发送消息,然后 Twitter Gathering 微服务会在十秒后开始消费流。

我们需要稍微改变一下TrackedHashTagService;最终版本应如下所示:

package springfive.twittertracked.domain.service

import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
import springfive.twittertracked.domain.TrackedHashTag
import springfive.twittertracked.domain.repository.TrackedHashTagRepository
import java.util.concurrent.CompletableFuture

@Service
class TrackedHashTagService(private val repository: TrackedHashTagRepository,
                            private val rabbitTemplate: RabbitTemplate,
                            @Value("${exchange.twitter}") private val exchange: String,
                            @Value("${routing_key.track}") private val routingKey: String) {

fun save(hashTag: TrackedHashTag) {
        this.repository.save(hashTag).subscribe { data ->
Mono.fromFuture(CompletableFuture.runAsync {
                this.rabbitTemplate.convertAndSend(this.exchange, this.routingKey,  
                hashTag)
            })
        }
    }

fun all() = this.repository.findAll()

}

好工作。当新实体到来时,它将被发送给经纪人。我们已完成对 Tracked Hashtag Service 的更改。

最后,我们能够测试整个流程。让我们开始玩,感受我们构建的应用程序的真正威力。

好戏开场了!!!

Testing the microservice's integrations


现在,我们准备测试整个解决方案。在开始之前,我们需要检查以下 infrastructure 项:

  • Redis
  • RabbitMQ

如果项目启动并运行,我们可以跳到下一部分。

Note

我们可以使用 docker ps 命令,该命令应该列出处于运行模式的 Redis 和 RabbitMQ 容器。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Running Tracked Hashtag Service

运行这个应用程序没有什么特别的。它包括 infrastructure 连接,这些连接在 application.yaml< /代码>。

运行 TrackedHashTagApplication 上的主函数。我们可以使用 IDE  或命令行来执行此操作。

检查控制台输出;输出将显示在 IDE 或命令行上。我们要找到以下行:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

这意味着第一个应用程序已完全运行,我们能够运行 Twitter Gathering。请根据需要保持应用程序运行。

让我们运行 Twitter Gathering 吧!!!

Running the Twitter Gathering

这个应用程序有点复杂 运行。我们需要为此配置一些环境变量。这是必需的,因为我们不希望在我们的存储库中使用 Twitter 应用程序凭据。

在 IDE 中执行此操作非常简单。为此,我们可以配置运行配置。我们开始做吧:

  1. Click on the Edit Configurations... like in the following image:
读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

然后,我们能够看到环境变量< /strong> 像这样:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端
  1. We need to click on ..., as highlighted in the proceeding image.
  2. The next screen will be shown and we can configure the Environment Variable:
读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

  1. We need to configure the following environment variables:
    • consumer-key
    • consumer-secret
    • access-token
    • access-token-secret

这些值应使用 Twitter Application Management 值填充。

然后,我们可以运行应用程序。运行!!

现在,我们应该在控制台中看到以下行,这意味着应用程序正在运行:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

太棒了,我们的两个微服务正在运行。让我们触发 Twitter 流。我们将在下一节中这样做。

Note

还有其他方法可以运行应用程序,例如,使用 maven Spring Boot 目标或 Java 命令行。如果您更喜欢在 Java 命令行中运行,请记住传递环境变量的 -D 参数。

Testing stuff

我们很高兴能测试完整的集成。我们可以使用 curl 工具将请求数据发送到 Tracked Hashtag Service。我们想从 Twitter 跟踪 "bitcoin"。 

我们可以执行以下命令行:

curl -H "Content-Type: application/json" -X POST -d '{"hashTag":"bitcoin","queue":"bitcoin"}' 
http://localhost:9090/api/tracked-hash-tag

检查HTTP状态码;它应该是 HTTP 状态 200。之后,我们可以从 Twitter Gathering 项目中检查控制台,应该会记录很多 Tweets。

看一下日志,日志肯定有这样的Tweets:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

惊人的!

伟大的工作人员,我们将完整的应用程序与 RabbitMQ 和 Twitter 流集成在一起。

Spring Actuator


Spring Boot Actuator 是应用程序在生产环境中运行时的一种帮助器。该项目提供已部署应用程序的内置信息。

在微服务世界中,监控应用程序实例是获得成功的关键。在这些环境中,通常有许多应用程序通过网络协议(例如 HTTP)调用其他应用程序。网络是一个不稳定的环境,有时会出现故障;我们需要跟踪这些事件,以确保应用程序正常运行并全面运行。

Spring Boot Actuator 在这些情况下可以帮助开发人员。该项目公开了几个带有应用程序信息的 HTTP API,例如内存使用情况、CPU 使用情况、应用程序健康检查,以及应用程序的基础设施组件,例如与数据库和消息代理的连接。

最重要的一点是信息通过 HTTP 公开。例如,它有助于与 Nagios 和 Zabbix 等外部监视器应用程序集成。没有用于公开此信息的特定协议。

让我们将它添加到我们的项目中并尝试几个端点。

Adding Spring Boot Actuator in our pom.xml

Spring Boot Actuator 漂亮在我们的pom.xml中配置简单。我们扩展了 Spring Boot 的父 pom,所以不需要指定依赖的版本。

让我们配置我们的新依赖项:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency></dependencies>

太棒了,真的很容易。在我们测试之前让我们多了解一点。

Actuator Endpoints

这些项目有很多内置的端点,它们会在应用程序启动时启动。请记住,我们使用了启动项目,它是自动为我们配置的项目。

不同的需求有几个端点,我们将看看生产微服务中最常用的。

  • /health: The most known actuator endpoint; it shows the application's health, and usually, there is a status attribute
  • /configprops: Displays a collapse @ConfigurationProperties
  • /env: Exposes properties from the Spring ConfigurableEnvironment
  • /dump: Shows the thread dump
  • /info: We can put some arbitrary information at this endpoint
  • /metrics: Metrics from the running application
  • /mappings: @RequestMappings endpoints from the current application

还有另一个重要的端点可以通过 HTTP 接口显示应用程序日志。 /logfile 端点可以帮助我们可视化日志文件。

Note

Spring Boot Actuator 创建的端点列表可以在以下位置找到: https://docs.spring.io/spring-boot/docs/current/reference/html/production-ready-endpoints.html

Application custom information

我们可以使用一个特定的端点 来公开应用程序的自定义信息。此信息将公开给 /info 端点。 

要配置它,我们可以使用 application.yaml 文件并放置与模式相关的所需信息,如下所示:

info:
  project: "twitter-gathering"
  kotlin: @kotlin.version@

所需的属性必须在 信息之前。 * 。然后,我们可以测试我们的第一个执行器端点并检查我们的 /info 资源。

让我们尝试访问 http://localhost:8081/infoapplication.yaml 上填写的信息应该会显示出来,如下图:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

正如我们所见,这些属性是从 HTTP 端点公开的。例如,我们可以使用它来放置应用程序版本。

Testing endpoints

在第 2 版中  Spring Boot,Spring Actuator 管理端点默认是禁用的,因为这些端点可能包含正在运行的应用程序的敏感数据。然后,我们需要配置以正确启用这些端点。

有一个特别要注意的地方。如果 application 公开,您应该保护这些端点。

让我们启用我们的管理端点:

management:
  endpoints:
    web:
      expose: "*"

在前面的配置中,我们启用了所有的管理端点,然后我们可以开始测试一些端点。

让我们测试一些端点。首先,我们将测试指标端点。此端点显示可用于正在运行的应用程序的指标。转到 http://localhost:8081/actuator/metrics 并检查结果:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

Note

我们使用端口 8081 因为我们配置了属性 server.port in application .yaml。可以根据需要更改端口。

为我们自动配置了很多指标。该端点仅公开可用的指标。要检查指标值,我们需要使用另一个端点。让我们检查 http.server.request 的值。

检查值的基本端点是: http://localhost:8081/actuator/metrics/{metricName}。然后,我们需要去: http://localhost:8081/actuator/metrics/http.server.requests。结果应该是:

读书笔记《developing-java-applications-with-spring-and-spring-boot-ebook》被动式Web客户端

如您所见,服务器收到了 8 个调用。尝试再点击几次以查看指标的变化。 

好工作。我们的微服务已准备好投入生产。我们有 docker 镜像和端点来监控我们的服务。

Summary


在本章中,我们学习并实践了很多 Spring Advanced 概念,例如 RabbitMQ 集成。

我们创建了一个完全反应式的 WebClient 并利用了反应式范式;它可以优化资源计算并提高应用程序的性能。

此外,我们还通过 RabbitMQ 代理集成了两个微服务。这是集成应用程序的绝佳解决方案,因为它解耦了应用程序,还允许您非常轻松地横向扩展应用程序。消息驱动是构建反应式应用程序所需的特征之一;它可以在 Reactive Manifesto (https://www.reactivemanifesto.org/en)。

在下一章中,我们将改进我们的解决方案并创建一个新的微服务来为我们的客户流式传输过滤后的推文。我们将再使用一次 RabbitMQ。