vlambda博客
学习文章列表

轻量级消息总线 EventBus 初尝!


阅读文本大概需要 2.8 分钟。


前言

Guava 作为 Google 家出品的一款优秀的开源 Java 框架,在日常的开发过程中,帮助我们提高了很多开发的效率,而 EventBus 则是 Guava 框架中,一个优秀的基于事件处理机制的组件,对于事件的监听以及事件的订阅,都是一个不错的解决方案。


使用场景

  1. 异步化处理事件

  2. 发布订阅场景


EventBus基本用法

引入 Guava 依赖

对于一个 SpringBoot 项目而言,首先需要引入 Guava 的依赖

<!-- https://mvnrepository.com/artifact/com.google.guava/guava --><dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>30.1.1-jre</version></dependency>

初始化 EventBus 实例

其次需要在 Spring 环境下初始化 EventBus 的实例组件

@Configurationpublic class EventBusConfig { /** * 配置一个异步消息总线实例 * @return */ @Bean public AsyncEventBus asyncEventBus() { // 线程池的线程数依据自己的业务体量来决定 return new AsyncEventBus(Executors.newFixedThreadPool(100)); }}

构建 EventBus 传递的消息体

事件的处理需要通过消息体来承载,这里我们创建一个订单事件OrderEvent来承载我们需要处理的数据

/** * @program: csp * @description: 订单事件-用于传递消息数据 * @author: liuencier * @create: 2021-08-23 23:08 **/@Data@Accessors@ApiModel("订单事件")public class OrderEvent implements Serializable { @ApiModelProperty("订单ID") private String orderId; @ApiModelProperty("订单编码") private String orderCode; @ApiModelProperty("创建事件") private Date createTime;}

订阅消息

接下来需要订阅消息,进而处理消息

@Slf4j@Componentpublic class OrderListener { /** * 注入需要的异步事件总线组件,用于注册订阅消息 */ @Resource private AsyncEventBus asyncEventBus; /** * 在监听器启动之前注册到消息总线上 */ @PostConstruct public void register() { asyncEventBus.register(this); } /** * 最终订阅消息,消费消息 * @param orderEvent */ @Subscribe public void process(OrderEvent orderEvent) { log.info("EventBus process message : {}", JsonUtils.obj2String(orderEvent)); this.doSomeThing(orderEvent); log.info("EventBus process finish ..."); } private void doSomeThing(OrderEvent orderEvent) { // do something ... }}

发布事件

发布消息的方式有很多种

  1. 通过 main 函数发布事件

  2. 通过单元测试的方式发布事件

  3. 通过接口的方式发布事件

这里我使用的是通过接口的方式来模拟事件的发布与消费

@RestController@RequestMapping("/event_bus/")public class EventController { @Resource private AsyncEventBus asyncEventBus; @GetMapping("/publish") public CommonResult publish() { // 构建需要处理的事件消息 OrderEvent orderEvent = new OrderEvent() .setOrderId(UuidGenerator.newUuid()) .setOrderCode(String.format("SO%s", DateUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))) .setCreateTime(new Date()); // 通过 EventBus 异步发送事件消息 asyncEventBus.post(orderEvent); return CommonResult.success("事件发布成功"); }}

最终效果

OrderListener 会监听到注册到消息总线上的消息,并且成功消费打印出 OrderEvent 事件的数据。

[2021-08-23 23:46:57] [INFO][pool-2-thread-1] [com.liuencier.csp.core.event.OrderListener]: EventBus process message : {"orderId":"12f39851424a48a49330e4d6957e0659","orderCode":"SO20210823234657","createTime":"2021-08-23T15:46:57.577+0000"}[2021-08-23 23:46:57] [INFO][pool-2-thread-1] [com.liuencier.csp.core.event.OrderListener]: EventBus process finish ...


源码

https://github.com/liuenci/csp




技术人不应该只有技术

你一关注我

我写的就更来劲了