Vert.x系列(四),EventBus事件总线
EventBus也叫事件总线,效果如上图所示。发布者发布事件到总线,订阅者从总线订阅事件,然后处理事件,响应事件。看起来有点像一个消息队列系统,都有发布者和订阅者,但这一切都是Vert.x内部的一个实现,可以说是最经典的功能之一。另外这里关于名词作一点说明:发布者可以叫做生产者,订阅者也可以叫消费者。
EventBus一般用于什么场合呢?
异步处理和分布式应用。
所谓异步处理,就是事件消息发送出去,并不关心其处理结果,交由订阅者后台慢慢处理。在当前版本的Vert.x体现的方法就是publisher和send方法。
发布者代码一,publish方法
vertx.eventBus().publish("address", "msg");
发布者代码二,send方法
vertx.eventBus().send("address", "msg");
在这可以看到发布者的实现有两种方法,publish和send。它们之间是有区别的。publish发出去的消息,所有的订阅者都会收到,而send是点对点,只有一个订阅者会收到,该使用哪种方式,要根据实际情况选择。
订阅者代码
MessageConsumer<String> consumer = vertx.eventBus().consumer("address");consumer.handler(msg ->{System.out.println(msg.body());});
看到这里,可能有读者会问,订阅者处理完发布者的消息,可以回应发布者吗?发布者可以获取到订阅者的回应吗?
答案是可以的。
接下来看消费者如何回应发布者:
MessageConsumer<String> consumer = vertx.eventBus().consumer("address");consumer.handler(msg ->{System.out.println(msg.body());msg.reply("success");});
从代码里面可以看到msg有一个reply方法,这个方法就是用来做出回应的。
再看发布者如何获取订阅者的回应:
vertx.eventBus().request("address", "msg", rh -> {if (rh.succeeded()) {System.out.println(rh.result().body());}});
这里要留意发布者使用的request方法,不是publish或者send了。
表情图片来自网络
!_!:纳尼!!前面不是说发布者是用publish和send发送事件消息的吗,这里怎么变request了?作者你的节操呢?
其实笔者也挺委屈的,在以前的版本中send方法是支持获取响应的,但在当前版本中已经被标记为过时方法了。这里应采用request方法。request方法提供了一个Handler<AsyncResult<Message
^=^:好吧,那么可爱的读者宝宝就暂时原谅你了。那你说了这么多,能整一个案例不?倒是让本宝宝吃一个栗子啊。
举个例子,假设Http verticle有一个接口/user/:id,当接口获取到id参数后,通过EventBus,将参数发送到一个DatabaseVerticle里面,在这个verticle里面有一个订阅者,收到参数后就去数据库里面查询对应的数据,然后返回给Http verticle中的接口,并返回到前端。
DatabaseVerticle.java
package com.javafm.vertx;import io.vertx.core.AbstractVerticle;import io.vertx.core.eventbus.MessageConsumer;import io.vertx.core.json.JsonObject;public class DatabaseVerticle extends AbstractVerticle {@Overridepublic void start() throws Exception {super.start();MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("com.javafm.vertx.database");consumer.handler(msg -> {// 加装从数据库查询出数据,然后返回System.out.println(msg.body());JsonObject json = new JsonObject();json.put("id", 1);json.put("name", "dev-tang");msg.reply(json);});}}
HttpVerticle.java
package com.javafm.vertx;import io.vertx.core.AbstractVerticle;import io.vertx.core.http.HttpServer;import io.vertx.core.json.JsonObject;import io.vertx.ext.web.Router;import io.vertx.ext.web.RoutingContext;import io.vertx.ext.web.handler.BodyHandler;public class HttpVerticle extends AbstractVerticle {@Overridepublic void start() throws Exception {HttpServer server = vertx.createHttpServer();Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.get("/user/:id").handler(this::getUser);server.requestHandler(router);server.listen(8080);}private void getUser(RoutingContext ctx) {int id = Integer.parseInt(ctx.request().getParam("id"));JsonObject json = new JsonObject().put("id", id);vertx.eventBus().request("com.javafm.vertx.database", json, r -> {if (r.succeeded()){out(ctx, r.result().body().toString());} else {r.cause().printStackTrace();ctx.fail(r.cause());}});}private void out(RoutingContext ctx, String msg) {ctx.response().putHeader("Content-Type", "application/json; charset=utf-8").end(msg);}}
App.java
package com.javafm.vertx;import io.vertx.core.Vertx;public class App {public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(new DatabaseVerticle());vertx.deployVerticle(new HttpVerticle());}}
App.java是项目启动类,在这个类里面发布两个Verticle。接着我们启动这个项目,在浏览器访问接口:http://localhost:8080/user/1,出现如图1-1所示则表示结果是正确的。
图1-1
本文章中体现了EventBus异步处理,如果要做分布式应用开发又是怎么搞的呢?请听下回分解。
