vlambda博客
学习文章列表

Vert.x系列(四),EventBus事件总线

EventBus也叫事件总线,效果如上图所示。发布者发布事件到总线,订阅者从总线订阅事件,然后处理事件,响应事件。看起来有点像一个消息队列系统,都有发布者和订阅者,但这一切都是Vert.x内部的一个实现,可以说是最经典的功能之一。另外这里关于名词作一点说明:发布者可以叫生产者,订阅者也可以叫消费者。


EventBus一般用于什么场合呢?


异步处理和分布式应用。


所谓异步处理,就是事件消息发送出去,并不关心其处理结果,交由订阅者后台慢慢处理。在当前版本的Vert.x体现的方法就是publisher和send方法。


发布者代码一,publish方法

vertx.eventBus().publish("address", "msg");


发布者代码二,send方法

vertx.eventBus().send("address", "msg");


在这可以看到发布者的实现有两种方法,publish和send。它们之间是有区别的。publish发出去的消息,所有的订阅者都会收到,而send是点对点,只有一个订阅者会收到,该使用哪种方式,要根据实际情况选择。


订阅者代码

MessageConsumer<String> consumer = vertx.eventBus().consumer("address");consumer.handler(msg ->{ System.out.println(msg.body());});


看到这里,可能有读者会问,订阅者处理完发布者的消息,可以回应发布者吗?发布者可以获取到订阅者的回应吗?


答案是可以的。


接下来看消费者如何回应发布者:

MessageConsumer<String> consumer = vertx.eventBus().consumer("address");consumer.handler(msg ->{ System.out.println(msg.body()); msg.reply("success");});


从代码里面可以看到msg有一个reply方法,这个方法就是用来做出回应的。


再看发布者如何获取订阅者的回应:

vertx.eventBus().request("address", "msg", rh -> { if (rh.succeeded()) { System.out.println(rh.result().body()); }});

这里要留意发布者使用的request方法,不是publish或者send了。


表情图片来自网络

!_!:纳尼!!前面不是说发布者是用publish和send发送事件消息的吗,这里怎么变request了?作者你的节操呢?


其实笔者也挺委屈的,在以前的版本中send方法是支持获取响应的,但在当前版本中已经被标记为过时方法了。这里应采用request方法。request方法提供了一个Handler<AsyncResult<Message >>的参数,这个参数就是用来获取订阅者响应的。


^=^:好吧,那么可爱的读者宝宝就暂时原谅你了。那你说了这么多,能整一个案例不?倒是让本宝宝吃一个栗子啊。


举个例子,假设Http verticle有一个接口/user/:id,当接口获取到id参数后,通过EventBus,将参数发送到一个DatabaseVerticle里面,在这个verticle里面有一个订阅者,收到参数后就去数据库里面查询对应的数据,然后返回给Http verticle中的接口,并返回到前端。


DatabaseVerticle.java

package com.javafm.vertx;
import io.vertx.core.AbstractVerticle;import io.vertx.core.eventbus.MessageConsumer;import io.vertx.core.json.JsonObject;
public class DatabaseVerticle extends AbstractVerticle { @Override public void start() throws Exception { super.start();
MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("com.javafm.vertx.database"); consumer.handler(msg -> { // 加装从数据库查询出数据,然后返回 System.out.println(msg.body());
JsonObject json = new JsonObject(); json.put("id", 1); json.put("name", "dev-tang"); msg.reply(json); }); }}



HttpVerticle.java

package com.javafm.vertx;
import io.vertx.core.AbstractVerticle;import io.vertx.core.http.HttpServer;import io.vertx.core.json.JsonObject;import io.vertx.ext.web.Router;import io.vertx.ext.web.RoutingContext;import io.vertx.ext.web.handler.BodyHandler;

public class HttpVerticle extends AbstractVerticle { @Override public void start() throws Exception { HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx); router.route().handler(BodyHandler.create());
router.get("/user/:id").handler(this::getUser);
server.requestHandler(router); server.listen(8080); }
private void getUser(RoutingContext ctx) { int id = Integer.parseInt(ctx.request().getParam("id")); JsonObject json = new JsonObject().put("id", id); vertx.eventBus().request("com.javafm.vertx.database", json, r -> { if (r.succeeded()){ out(ctx, r.result().body().toString()); } else { r.cause().printStackTrace(); ctx.fail(r.cause()); } }); }

private void out(RoutingContext ctx, String msg) { ctx.response().putHeader("Content-Type", "application/json; charset=utf-8") .end(msg); }}


App.java

package com.javafm.vertx;
import io.vertx.core.Vertx;
public class App { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new DatabaseVerticle()); vertx.deployVerticle(new HttpVerticle()); }}


App.java是项目启动类,在这个类里面发布两个Verticle。接着我们启动这个项目,在浏览器访问接口:http://localhost:8080/user/1,出现如图1-1所示则表示结果是正确的。




图1-1


本文章中体现了EventBus异步处理,如果要做分布式应用开发又是怎么搞的呢?请听下回分解。