vlambda博客
学习文章列表

Vert.x 响应式 Web 框架介绍使用

一、Vert.x

Vert.x 是JVM上构建 响应式 应用的工具。响应式应用既可以随着工作负载的增长而 扩展 ,又可以在出现故障时 弹性回复 。响应式应用是 即时响应 的,因为它能有效利用系统资源并保护自身免受错误影响,从而使延迟可控。Vert.x 背靠庞大的响应式模块生态系统,具有编写现代服务所需的一切:全面的Web技术栈,响应式数据库驱动程序、消息队列、事件流、集群支持、指标监控、分布式跟踪等等。

Vert.x最大的特点就在于异步(底层基于Netty),通过事件循环(EventLoop)来调起存储在异步任务队列(CallBackQueue)中的任务,大大降低了传统阻塞模型中线程对于操作系统的开销。因此相比较传统的阻塞模型,异步模型能够很大层度的提高系统的并发量。还提供了非常多的吸引人的技术,比如EventBus,通过EventBus可以非常简单的实现分布式消息,进而为分布式系统调用,微服务奠定基础。除此之外,还提供了对多种客户端的支持,比如Mysql、Redis、MongoDB、Kafka等等。

在Java Web框架领域,我们目前最熟知的应该是 SpringMVC,目前来讲,SpringMVC作为Spring体系下的Web层架构,是深受企业青睐的,而 Vert.x Web层框架,感觉很少被提及,一方面社区确实不如 Spring 家族的活跃,另一方面我感觉应该是使用方式上和SpringMVC相差较大,我们都知道SpringMVC是采用响应式的编程方式,通过几个注解就可以完成一个接口,而 Vert.x 需要手动去实现 Router ,当前写起来也非常简单,对于 Vert.x 基本都充斥着 lamda 表达式及JDK 8 的其他新特性,这意味着你必须要学习 JDK 8 的新特性才能驾驭 Vert.x 。

从官网的介绍来看, Vert.x 已经拥有了完善的生态体系:Vert.x 响应式 Web 框架介绍使用下面我们开始对 Vert.x 的学习:

在开始前先了解下 Vert.x 的核心组件 Vert.x Core,本篇文章会使用到 Vert.x Core 和 Vert.x Web:

Vert.x Core 提供了下列功能:

编写 TCP 客户端和服务端编写支持 WebSocket 的 HTTP 客户端和服务端事件总线共享数据 —— 本地的Map和分布式集群Map周期性、延迟性动作部署和撤销 Verticle 实例数据报套接字DNS客户端文件系统访问高可用性集群

Vert.x Core 中的功能相当底层 ,在此不会找到诸如数据库访问、授权或高层Web应用的功能。其余功能会在后面讲解 Vert.x ext (扩展包)的时候讲到。

Vert.x 目前是 4.x 的版本,对比与 3.x 增加了些新的特性,本文使用的 Vert.x 版本为 4.1.8,目前最新的是 4.2.5, 最新的不一定是最稳定的:Vert.x 响应式 Web 框架介绍使用新建一个maven项目,在 pom 中引入 Vert.x Core 的依赖:

<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>4.1.8</version></dependency>

下面实现一个简单的web接口:

二、简单的web接口

其中在 Vert.x 中最重要的类就是 Vertx 类,可以通过直接 new 的方式创建,通过Vertx 对象直接 createHttpServer ,就可以轻松的开启一个web容器:

@Slf4jpublic class VertxDemo {
private static final Integer port = 8090;
public static void main(String[] args) { Vertx vertx = Vertx.vertx(new VertxOptions() .setEventLoopPoolSize(20) .setWorkerPoolSize(20));
vertx.createHttpServer().requestHandler(ctx -> { String param = ctx.getParam("param"); log.info("param:{}", param); log.info("absoluteURI: {}",ctx.absoluteURI()); log.info("uri:{}",ctx.uri()); ctx.response().putHeader("content-type", "text/plain").end("success"); }).listen(port, "0.0.0.0", res -> { if (res.succeeded()) { log.info("vert.x success to listen:" + port); } else { log.info("vert.x fail:" + res.cause().getMessage()); } }); }}

直接启动 main方法就启动了一个 8090 端口的服务:Vert.x 响应式 Web 框架介绍使用

可以使用 PostMan 访问 http://localhost:8090?param=abc

Vert.x 响应式 Web 框架介绍使用Vert.x 响应式 Web 框架介绍使用另外 Vert.x 还提供了一种基于 Verticle 的方式:

@Slf4jpublic class VertxDemo extends AbstractVerticle {
private static final Integer port = 8090;
public static void main(String[] args) { VertxOptions options = new VertxOptions() .setEventLoopPoolSize(20) .setWorkerPoolSize(20); Vertx.vertx(options).deployVerticle(new VertxDemo()); }
@Override public void start(Promise<Void> startPromise) throws Exception { vertx.createHttpServer().requestHandler(ctx -> { String param = ctx.getParam("param"); log.info("param:{}", param); log.info("absoluteURI: {}",ctx.absoluteURI()); log.info("uri:{}",ctx.uri()); ctx.response().putHeader("content-type", "text/plain").end("success"); }).listen(port, "0.0.0.0", res -> { if (res.succeeded()) { log.info("vert.x success to listen:" + port); } else { log.info("vert.x fail:" + res.cause().getMessage()); } }); }}

运行出来和上面一样的效果。

从上面的演示中可以看出貌似没有地方设置路由,我们在 SpringMVC的项目中还能使用@GetMapping、@PostMapping 等设置请求路由,同样在 Vert.x 肯定也是支持的,不过不是声明式的,我们需要再依赖一个 Vert.x-Web 包,就可以实现路由了。

下面基于 Vert.x-Web 实现 RestFul 接口。

三、RestFul 接口

Vert.x Core 提供了一系列相对底层的功能用于操作HTTP, 对于一部分应用是足够的。Vert.x Web 基于 Vert.x Core 提供了一系列更丰富的功能, 以便更容易地开发实际的 Web 应用。

Vert.x Web 非常适合编写 RESTful HTTP 微服务,特性有:

路由(基于方法,路径等)

      •基于正则表达式的路径匹配     从路径中提取参数      内容协商      处理消息体      消息体的长度限制      Multipart 表单      Multipart 文件上传      子路由      支持本地会话和集群会话      支持 CORS(跨域资源共享)      错误页面处理器      HTTP基本/摘要认证      基于重定向的认证      授权处理器      基于 JWT 的授权      用户/角色/权限授权      网页图标处理器      响应时间处理器      静态文件服务,包括缓存逻辑以及目录监听      支持请求超时      支持 SockJS      桥接 Event-bus      CSRF 跨域请求伪造      虚拟主机


添加 Vert.x Web 依赖:

<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web</artifactId> <version>4.1.8</version></dependency>

改造上面的接口:

@Slf4jpublic class VertxDemo extends AbstractVerticle {
private static final Integer port = 8090;
public static void main(String[] args) { VertxOptions options = new VertxOptions() .setEventLoopPoolSize(20) .setWorkerPoolSize(20); Vertx.vertx(options).deployVerticle(new VertxDemo()); }
@Override public void start(Promise<Void> startPromise) throws Exception { Router router = Router.router(this.vertx);
router.get("/getTest").handler(handler -> { HttpServerRequest request = handler.request(); String param = request.getParam("param"); log.info("method: {}",request.method()); log.info("param:{}", param); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); handler.response().putHeader("content-type", "text/plain").end("success"); });
router.post("/getTest").handler(handler -> { HttpServerRequest request = handler.request(); String param = request.getParam("param"); log.info("method: {}",request.method()); log.info("param:{}", param); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); handler.response().putHeader("content-type", "text/plain").end("success"); });
this.vertx.createHttpServer().requestHandler(router).listen(port, "0.0.0.0", res -> { if (res.succeeded()) { log.info("vert.x success to listen:" + port); } else { log.info("vert.x fail:" + res.cause().getMessage()); } }); }}

运行程序,调用接口,查看日志:Vert.x 响应式 Web 框架介绍使用上面接收的form表单的形式,一般接口都会有JSON格式的参数,应该怎么接收呢:

router.post("/getTest").handler(handler -> { HttpServerRequest request = handler.request(); log.info("method: {}", request.method()); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); handler.request().bodyHandler(h -> { Optional.ofNullable(h.toJsonObject()).ifPresent(c -> { log.info("接口参数:{}",c.toString()); }); }); handler.response().putHeader("content-type", "text/plain").end("success");});

Vert.x 响应式 Web 框架介绍使用Vert.x 响应式 Web 框架介绍使用

四、阻塞式处理器

Vert.x中的几乎所有API都不会阻塞调用线程,可以立即提供结果的API会立即返回,否则就需要提供一个处理器(Handler) 来接收稍后回调的事件。因为Vert.x API不会阻塞线程, 所以通过Vert.x您可以只使用少量的线程来处理大量的并发。当使用传统的阻塞式API做操作时,调用线程就可能会被阻塞。

尽管Vert.x 的 API 都是非阻塞式的,且不会阻塞 Event Loop, 但是用户编写的处理器中可能会阻塞 Event Loop。Vert.x 提供了将route设置成阻塞式处理器的功能。阻塞式处理器和普通处理器很像, 区别是 Vert.x 会使用 Worker Pool 中的线程而不是 Event Loop 线程来处理请求。

如果您只有单个 Event Loop,而且您希望每秒处理10000个 HTTP 请求, 很明显的是每一个请求处理时间不可以超过0.1毫秒,所以您不能阻塞任何过多(大于0.1毫秒)的时间。

router.get("/getTest").blockingHandler(handler -> { HttpServerRequest request = handler.request(); String param = request.getParam("param"); log.info("method: {}", request.method()); log.info("param:{}", param); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); log.info("thread: {}",Thread.currentThread().getName()); handler.response().putHeader("content-type", "text/plain").end("success");});

Vert.x 响应式 Web 框架介绍使用默认情况下,在同一个 Context (例如同一个 Verticle 实例) 上执行的所有阻塞式处理器是顺序的, 也就意味着只有一个处理器执行完了才会继续执行下一个。如果不关心执行的顺序, 并且不介意阻塞式处理器以并行的方式执行, 可以在使用 blockingHandle 时,设置阻塞式处理器的 ordered 为 false。

router.get("/getTest").blockingHandler(handler -> { HttpServerRequest request = handler.request(); String param = request.getParam("param"); log.info("method: {}", request.method()); log.info("param:{}", param); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); log.info("thread: {}",Thread.currentThread().getName()); handler.response().putHeader("content-type", "text/plain").end("success");},false);

同样还可以通过 executeBlocking 将耗时操作交给 Worker 线程来执行:

router.get("/getTest").handler(handler -> { HttpServerRequest request = handler.request(); this.vertx.executeBlocking(h -> { String param = request.getParam("param"); log.info("method: {}", request.method()); log.info("param:{}", param); log.info("absoluteURI: {}", request.absoluteURI()); log.info("uri:{}", request.uri()); log.info("thread1: {}", Thread.currentThread().getName()); h.complete("success"); }, false, res -> { log.info("thread2: {}", Thread.currentThread().getName()); handler.response().putHeader("content-type", "text/plain").end(res.result().toString()); });});

我们也可以创建一个自己的 Worker 线程执行器,默认情况下,如果 executeBlocking 在同一个上下文环境中(如:同一个 Verticle 实例)被调用了多次, 那么这些不同的 executeBlocking 代码块会 顺序执行(一个接一个)。不关心调用 executeBlocking 的顺序, 可以将 ordered 参数的值设为 false。这样任何 executeBlocking 都会在 Worker Pool 中并行执行。

当使用同一个名字创建了许多 worker 时,它们将共享同一个 pool。当所有的 worker executor 调用了 close 方法被关闭过后,对应的 worker pool 会被销毁。如果 Worker Executor 在 Verticle 中创建,那么 Verticle 实例销毁的同时 Vert.x 将会自动关闭这个 Worker Executor。

int poolSize = 10;long maxExecuteTime = 2;TimeUnit maxExecuteTimeUnit = TimeUnit.MINUTES;WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime, maxExecuteTimeUnit);executor.executeBlocking(h -> { h.complete("success");}, false, res -> { log.info("thread2: {}", Thread.currentThread().getName());});executor.close();

其中 第三个参数为,任务的超时时间,大于该时间,并不会终止执行,而是进行日志的提醒:

router.get("/getTest").handler(handler -> { WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", 10, 2, TimeUnit.SECONDS); executor.executeBlocking(h -> { log.info("thread1: {}", Thread.currentThread().getName()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } h.complete("success"); }, false, res -> { log.info("thread2: {}", Thread.currentThread().getName()); handler.response().putHeader("content-type", "text/plain").end(res.result().toString()); executor.close(); });});