Netty中单例模式的应用——全局线程单例执行器的用法和实现原理
本篇文章大概3000字,阅读时间大约10分钟
本文先从单例模式入手做一个复习,并且结合Netty的源码对其全局的线程单例执行器做一个拆解和总结。不妨带着如下问题阅读:
1、单例模式都有哪些类型,如果实现一个线程安全的延迟加载的单例类?
2、如果让一个线程周期性的判断某个条件,决定是否自动回收?
3、如何使用Netty提供的全局线程单例执行器?
4、Netty的优雅停机API能注册监听器么?
回忆文章:
可以知道Netty的优雅停机API——shutdownGracefully返回了一个future对象——terminationFuture,如下是它的类型:
private final Promise<?> terminationFuture =
new DefaultPromise(GlobalEventExecutor.INSTANCE);
这里有一个小的“缺陷”,或者说是故意的设计,因为实际使用优雅停机API时,这个terminationFuture用不上,并且我找遍了全网资料以及Netty的demo,都没发现有使用shutdownGracefully的返回的future去注册监听器的用法。在我理解,它属于设计缺陷,因为它提供了addListener方法,但是却无法正常使用。
言归正传,重点看该future对象封装的GlobalEventExecutor.INSTANCE对象——Netty提供的全局的线程单例执行器。提到线程执行器,可以回忆文章:
,里面提到【Netty在实例化自己的线程池——NioEventLoopGroup时,会在其直接父类——MultithreadEventExecutorGroup里创建NioEventLoopGroup的线程执行器——ThreadPerTaskExcutor,也就是JDK的Excuter接口的一个实例】,线程执行器是在底层真正创建Java线程并启动的工具。
同理,GlobalEventExecutor也是一个这样的线程执行器,只不过它有一些特性:
1、顾名思义,GlobalEventExecutor是一个单例类,意味着在一个Netty实例(通常是一个机器)里,它只能有且仅有一个,而ThreadPerTaskExcutor可以有多个
2、GlobalEventExecutor会自动启动并执行MPSCQ里的异步任务,当1秒内没有新的任务添加到该队列时,它会自动回收自己,无需用户主动关闭。
3、此执行器的使用场景是:前提是需要线程隔离的场景,同时专用于执行数量较少的“小”任务,这也是它自动1s回收的特性所约束的。如果要调度执行大量的异步任务,应该用专用执行器,比如Netty线程池内置的线程执行器ThreadPerTaskExcutor。
下面看GlobalEventExecutor的单例实现,很朴素,如下使用了饿汉式单例模式,在类加载时直接使用静态实例化,该过程也是线程安全的:
如上也能看到,Netty对单例模式的实现没有那么细致的讲究,没有进行懒加载。
除了以上饿汉式单例模式外,还有一种懒汉式单例模式,一般都是默认实现线程安全的单例,可以直接用《Effective Java》里推荐的枚举类实现。PS.不要在写什么双重检查加锁+volatile那种笨拙难看的代码了。。。个人认为有些过时,也可以如下使用静态内部类实现,或者使用Spring的单例bean配置:
public class Person {
private String name;
private Person() {}
private Person(String name) {this.name = name;}
// 在静态内部类里去创建本类(外部类)的对象
public static Person getInstance() {
return Holder.instatnce;
}
private static class Holder {
private static final Person instatnce =
new Person("John", 31);
}
}
个人比较喜欢上面这种写法,它线程安全的理论支撑是JVM隐含了一些同步操作,包括:
1、由静态初始化器(在静态字段上或static{}块中的初始化器)初始化数据时
2、访问final字段时
3、在创建线程之前创建对象时
4、线程可以看见它将要处理的对象时
并且,在静态内部类里去创建本类(外部类)的对象,只要不使用这个静态内部类,它就不创建外部类对象的实例。因为在JVM看来,不管你什么内部类,外部类,对应到class文件都是单独独立的文件。所以静态内部类会在第一次被使用的时候才被装载且具备静态变量的特性——全局只装载一次,实现了懒加载和单例。
以上写法能同时保证延迟加载和线程安全(JVM底层保证)。但是缺陷是可以被反序列化破解,不过个人理解已经够用了,安全防护应该在应用服务的外层和网络上下功夫。
在看一种,如下是《Effective Java》里推荐的,使用枚举类实现单例,它可以保证线程安全,防止反射攻击,防止反序列化攻击,代码简单。
public enum EnumSingleton {
ENUM_SINGLETON;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
////////////////////////调用
EnumSingleton.ENUM_SINGLETON.setName("dashuai");
System.out.println(EnumSingleton.ENUM_SINGLETON.getName());
回到GlobalEventExecutor,下面看看它的用法,可以在Netty线程池里塞入这个线程执行器,以替换默认的ThreadPerTaskExcutor,如下:
NioEventLoopGroup group =
new NioEventLoopGroup(1, GlobalEventExecutor.INSTANCE);
这个线程单例执行器是一个全局的处理微小工作的工具,这里的微小的工作可以是简单的消息收发等任务,这和Netty线程池默认的ThreadPerTaskExcutor不一样,在默认创建的Netty线程池里,每个NIO线程所共有的线程执行器是ThreadPerTaskExecutor,只不过Netty允许用户额外创建多个ThreadPerTaskExecutor,而GlobalEventExecutor全局有且仅有一个,因为它是饿汉式加载的单例工具类。
下面看它的一个使用demo,如下,这是我之前写的一个基于Netty实现的websocket聊天服务器的代码片段,下面是它处理聊天消息的ChatHandler:
我使用了Netty提供的Channel组——ChannelGroup工具类保存已经成功接入的客户端,可以基于它实现分群组的消息收发功能,该工具类保证线程安全并且能自动删除已经断开连接的Channel,所以完全没有必要再自己搞个list集合吭哧吭哧的写一堆非业务代码。在使用ChannelGroup时需要搭配GlobalEventExecutor。
如下ChatHandler的channelRead0方法,在收到消息后,给某个特定的客户端群组转发该消息,我直接用的ChannelGroup提供的写接口,它可实现批量发送,黄色1和2是为了测试用的,黄色3调用了批量写接口给某个群组发消息,并且返回一个ChannelGroupFuture:
接着,我给ChannelGroupFuture添加一个监听器,即黄色4处。下面看这个批量写消息的API的内部逻辑,如下,内部的executor属性就是GlobalEventExecutor单例对象:
即通过GlobalEventExecutor能实现真正的异步API,它的回调监听器会由GlobalEventExecutor的线程驱动,不再是NIO线程驱动,也不需要用户在自己额外启动非NIO线程执行异步回调。
验证如下,我启动该websocket服务器,并且在浏览器输入一个消息:
观察ChatHandler的channelRead0方法的黄色1和2的输出:
以上,就是GlobalEventExecutor的用法,可以掌握它,避免在项目里重复造轮子。
关于基于Netty实现websocket服务,后续专题总结。
关于Netty的异步API的本质,可以参考:
下面看看GlobalEventExecutor的实现原理,可以学习Netty的设计思路——单例线程如何自动终止,如何自动开始。
先看它的私有的构造器的内部实现:
在初始化时,默认给自己的定时任务队列添加一个定时任务——即延时1s后,周期性(T=1S)执行的定时任务,如下:
final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(new Runnable() {
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
在看它的线程执行方法——execute的实现,和普通的NIO线程默认自带的线程执行器——ThreadPerTaskExecutor没什么大的区别:
主要看它启动线程的方法——即execute里面的startThread方法,它的实现逻辑很简单——启动一个线程,并驱动一个taskRunner(本质是Runnable),下面看这个Runnable的run方法,代码较多,只看前面部分:
具体细节不多说,它和NioEventLoop的事件循环机制——run里的runAllTasks方法一样的套路,明白一个即可。即从自己的定时任务队列里抓取即将到deadline的任务,聚合到自己的MPSCQ缓存,同时依次执行MPSCQ里的异步任务,直到task==quietPeriodTask,这就说明1s的周期时间到了,马上执行for(;;)的后半段逻辑,代码就不贴了,很多但不复杂,就是判断该定时任务是不是之前在构造器里塞入的那个周期1s执行的空的定时任务,如果是,且MPSCQ为空了,那么就自动结束该线程;否则继续正常运转,后续每个T=1s的周期都判断一次。。。直到符合要求后自动自己结束,或者被外界强制关闭。
小结:
1、掌握单例模式的两种类型,即线程安全的单例类常见实现方式,和why
2、掌握Netty提供的全局线程单例执行器的特性和在项目里的实际用法
3、Netty的优雅停机API不能注册监听器
4、提供一种思路,即周期性判断某个线程是否应该停止的设计方式
点亮在看,你最好看
~
点阅读原文,获得更多精彩内容