Netty原理实践解析
Netty作为基于异步事件驱动的网络通信框架,广泛应用于服务端和客户端中,例如RPC远程框架如Dubbo、消息队列RocketMQ的生产者消费者通信以及zookeeper等。Netty是对JDK自带NIO的封装,其优点大家都知道。翻看Netty源码,里面内容繁多,代码嵌套深、继承及实现接口复杂,看得眼花缭乱。自己选择较为稳定的4.1.8版本,在其上简化代码,去掉细枝末节保留主干,更直观的来理解其原理。
简化后的代码里的核心组件主要包括:服务端、客户端启动引导类ServerBootstrap和Bootstrap;处理客户端新连接ServerBootstrapAcceptor;通道相关ChannelHandler,ChannelInitializer,ChannelPipeline,NioServerSocketChannel,NioSocketChannel;线程相关ThreadEventLoopGroup和SingleThreadEventLoop以及处理事件回调MyChannelPromise等。本文围绕这些概念,帮助读者分析服务端和客户端启动具体做了哪些事情,以及两者互相通信发生了什么。
目录结构如下:
(1) bootstap包:存放服务端、客户端启动引导类,以及处理客户端新连接请求类。
(2) channel包:存放服务端、客户端通道,通道处理器以及管道pipeline等。
(3) concurrent包:存放监听器,保存Channel相关异步操作结果和状态的类。
(4) util包:存放socket操作工具类和封装safeExecute方法工具类。
(5) example包:服务端和客户端测试例子。
(6)事件循环线程相关类如SingleThreadEventLoop,ThreadEventLoopGroup。
服务端和客户端内部结构示意图如下所示。
主要思想:事件循环线程组ThreadEventLoopGroup管理多个事件循环线程SingleThreadEventLoop。服务端需要boss线程组和worker线程组,前者只负责处理新连接,后者是监听到连接后,boss线程组里的线程会把SocketChannel转发给worker线程组的线程并注册到其Selector上,处理后续的io事件。客户端只需要worker线程组的线程去连接远程服务端并监听处理读写事件。Netty中有大量异步操作,调用者不需要立刻得到结果,其原理是继承了jdk的Future接口并引入Promise接口,保存了如通道注册、绑定端口等状态结果,然后回调通知调用者执行。项目中通过MyChannelPromise类来实现其功能。
我们看下类SingleThreadEventExecutor、SingleThreadEventLoop、ThreadEventLoopGroup、AbstractChannel和MyChannelPromise之间的关系,如下所示。
通道Channel用于IO读写,支持文件、UDP和TCP操作。项目中用TCP的通道为例。服务端的channel为NioServerSocketChannel,客户端的channel为NioSocketChannel。源码中这两个类上面继承的抽查类有好几层,前者继承AbstractNioByteChannel,后者继承AbstractNioMessageChannel,它们俩有共同的抽象类AbstractNioChannel和AbstractChannel。该项目简化多层的继承关系,NioServerSocketChannel和NioSocketChannel直接继承抽象类AbstractChannel。
源码中实际读写操作是unsafe接口的实现类,服务端采用的是建立新连接及读写操作的NioMessageUnsafe,客户端采用的是字节数据读写的NioByteUnsafe。Netty对其封装不让用户直接调用,需要通过这两个类调用服务端channel和客户端channel来执行读写,它们底层还是jdk NIO的api。本项目跳过unsafe接口实现类调用channel的步骤,让AbstractChannel两个实现类直接负责注册,绑定端口及读写操作。
NioServerSocketChannel、NioSocketChannel和父类AbstractChannel之间关系如下所示:
NIO线程事件循环组是管理多个线程事件循环,用于处理io事件和任务队列里的任务。服务器端需要初始化用来接受客户端请求的bossGroup和处理io读写等事件的workerGroup。
//boss线程组初始化
ThreadEventLoopGroup bossGroup = new ThreadEventLoopGroup(nthread);
//worker线程组初始化
ThreadEventLoopGroup workerGroup = new ThreadEventLoopGroup(nthread);
线程事件循环组ThreadEventLoopGroup类代码如下所示。完成的事主要有:
public class ThreadEventLoopGroup {
private SingleThreadEventLoop[] children;
private final AtomicInteger index = new AtomicInteger();
private static final int DEFAULT_EVENT_LOOP_THREADS = Runtime.getRuntime().availableProcessors()*2;
public ThreadEventLoopGroup(int nThreads){
if (nThreads < 0){
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
nThreads = (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads);
Executor executor = new ThreadPerTaskExecutor(Executors.defaultThreadFactory());
children = new SingleThreadEventLoop[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = new SingleThreadEventLoop(executor);
}
}
public SingleThreadEventLoop chooser(){
return children[Math.abs(index.getAndIncrement() % children.length)];
}
public MyChannelPromise register(AbstractChannel channel) {
return chooser().register(channel);
}
}
(1)检查用户指定给线程事件循环线程数是否满足条件。一方面它不能为负数;另一方面当指定数量为0时,则默认赋值为CPU核心数的2倍。
(2)创建线程执行者ThreadPerTaskExecutor。构造方法的参数线程工厂threadFactory,没有使用Netty源码自定义DefaultThreadFactory类,采用JUC中Executors.defaultThreadFactory()生成新线程,并调整其优先级为最大,优先执行。
(3)创建并初始化SingleThreadEventLoop数组。该数组的每个元素是一个事件循环线程。SingleThreadEventLoop类的构造方法初始化了选择器Selector。
SingleThreadEventLoop(Executor executor){
super(executor);
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
this.executor = executor;
}
在ServerBootstrap引导类的initAndRegister方法中需要做两件事:首先创建并初始化通道,然后把通道注册到选择器Selector上。
public MyChannelPromise initAndRegister(){
AbstractChannel channel = null;
try{
//创建通道
channel = new NioServerSocketChannel(SelectionKey.OP_ACCEPT);
//初始化通道
initChannel(channel);
}catch(Throwable t){
return new MyChannelPromise(channel).setFailure(t);
}
//通道注册
MyChannelPromise promise = bossGroup.register(channel);
return promise;
}
2.2.1通道初始化
我们先看下通道的创建及初始化。在NioServerSocketChannel构造方法中完成OP_ACCEPT事件的设置,打开通道并设置通道为非阻塞模式。
public NioServerSocketChannel(int readInterestOp) {
super();
try {
//设置感兴趣事件
this.readInterestOp = readInterestOp;
//打开通道
channel = ServerSocketChannel.open();
//设置非阻塞
channel.configureBlocking(false);
} catch (IOException e) {
try {
channel.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
}
}
再来看下通道相关初始化,主要功能有:
(1) 初始化管道pipeline,其作用是传播事件,拦截用户自定义实现业务逻辑处理的ChannelHandler。事件类型分为inbound请求和outbound通知。inbound类型涵盖传播channelRegistered,channelActive和channelRead等事件。outbound类型包括bind,connect,read和write等操作。
(2) 通过addLast方法在pipeline上添加一个ChannelInitializer对象,它重写initChanel方法,方法里设置bossHandler到channel所属的pipeline上。
(3) 往pipeline里加入一个专门接受新请求的处理器ServerBootstrapAcceptor放到当前channel绑定SingleThreadEventLoop的任务队列里等待执行。
public void initChannel(AbstractChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final ThreadEventLoopGroup curWorkerGroup = workerGroup;
final ChannelHandler curWorkerHandler = workerHandler;
p.addLast(new ChannelInitializer() {
public void initChannel(final AbstractChannel channel) throws Exception {
final ChannelPipeline pipeline = channel.pipeline();
ChannelHandler handler = bossHandler;
if (handler != null) {
pipeline.addLast(handler);
}
channel.threadEventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
channel, curWorkerGroup, curWorkerHandler));
}
});
}
});
}
此时服务端channel所属管道pipeline如下图所示。
addLast方法具体逻辑是:
(1) 实例化一个封装了handler的defaultChannelHandlerContext,添加到管道pipleline中。
(2) 判断当前channel是否注册到SingleThreadEventLoop上的Selector:
a. 若已注册完成,则会执行方法callHandlerAdded0,调用 handlerAdded方法。
b. 若还未完成注册,则通过方法setAddPending(),设置当前Context的属性HANDLER_STATE_UPDATER为等待状态。并调用方法callHandlerCallbackLater(newCtx, true),实现把当前Context封装为任务PendingHandlerAddedTask,并添加到以PendingHandlerCallbackHead为链表头的单向链表PendingHandlerCallback中。DefaultChannelPipeline类中引用了链表头PendingHandlerCallbackHead。
当通道注册完后,会调用pipeline.invokeHandlerAddedIfNeeded方法从PendingHandlerCallbackHead表头开始执行handlerAdded方法。
new出来的ChannelInitializer是个特殊的ChannelInboundHandler,作用是往pipleline里添加用户自定义的handler。重写的方法initChannel执行完后会删除自己。上述ChannelInitializer把ServerBootstrapAcceptor加入到管道中,如下图所示。
2.2.2通道注册
当执行完通道相关初始化工作后,在ThreadEventLoopGroup类的register方法中,通过chooser()方法从SingleThreadEventLoop数组中选择一个事件循环线程所绑定通道执行注册NioServerSocketChannel操作方法。若当前线程不是当前通道绑定的线程,则执行如下代码。
threadEventLoop.execute(new Runnable() {
@Override
public void run() {
register0(channelPromise);
}
});
在threadEventLoop.execute方法中,如果当前线程是事件循环线程则创建的任务放到队列中等待执行。循环事件线程状态有:未开始,已开始,关闭中,已关闭和已终止。刚开始SingleThreadEventLoop的成员变量thread为null,则通过doStartThread方法创建启动新线程,赋值thread状态为已开始,保证一个事件循环线程绑定一个线程。如下图所示,main线程中执行的register方法,当前事件循环线程绑定的thread为null。
doStartThread方法里新启动的线程中完成的事有:检测是否有就绪事件,执行就绪事件,最后处理队列任务。
public void run() {
while (true) {
try {
//检测多个通道上是否有就绪的感兴趣事件的发生
selector.select(1000);
//处理感兴趣事件
processSelectedKeys(selector.selectedKeys());
//处理队列任务
runAllTasks();
} catch (Exception e) {
// ignore
}
}
}
在方法register0中执行具体注册操作,注册前需要确保channel没有关闭。这个是通过MyChannelPromise类中原子更新器RESULT_UPDATER获取成员result值。若值为UNCANCELLABLE,则认为当前channel是可用的。
private void register0(MyChannelPromise channelPromise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(channelPromise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
}
beginRead();
}
...
}
register0的具体任务如下:
(1) 执行注册通道操作。doRegister()该方法定义在抽象方法AbstractChannel中,NioServerSocketChannel实现该方法,调用jdk NIO的ServerSocketChannel注册到Selector上,不过这里没有注册任何事件,只是延迟到端口绑定步骤里。
public void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = channel.register(eventLoop().selector(), 0, this);
return;
...
}
}
}
(2) 调用方法safeSetSuccess(channelPromise)设置当前promise的状态为success。
(3) 当第一次注册成功,会执调用方法callHandlerAddedForAllHandlers。执行通道为注册成功前挂起的任务PendingHandlerAddedTask。从链表表头开始执行,调用callHandlerAdded0方法执行handlerAdded操作。并通过原子更新器HANDLER_STATE_UPDATER更新handler状态为ADD_COMPLETE。
(4) ChannelHander都加到管道后,从管道Head头部沿着链表寻找inboud类型handler传播channelRegistered事件。
此时管道结构为: head->ServerBootstrapAcceptor->tail
根据initAndRegister方法返回的MyChannelPromise类型regFuture判断当前通道是否注册成功,若注册成功则进行server端口绑定。反之添加listener,等注册完成后再绑定server端口。
端口绑定调用链: ServerBootstrap#doBind0-->AbstractChannel#bind-->DefaultChannelPipeline#bind
从管道tail沿着链表向head方向找outbound事件,当到达head时,在HeadContext里调用AbstractChannel的bindChannel方法。该方法内容如下所示。
public void bindChannel(SocketAddress localAddress, MyChannelPromise promise){
assertEventLoop();
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
上面第5行代码中doBind是一个抽象方法,服务端NIOServerSocketChannel实现了该方法,利用java NIO的api,ServerSocketChannel的bind方法实现端口绑定。
protected void doBind(SocketAddress localAddress) throws Exception {
channel.bind(localAddress);
}
若端口由未绑定到完成绑定后,则以任务形式执行第14行代码中fireChannelActive方法。从管道head开始。在HeadContext方法中完成两件事:
(1) 朝tail方向寻找inboud类型的ChannelHandler,执行channelActive方法;
(2) 注册OP_ACCEPT事件,调用链如下:
HeadContext#readIfIsAutoRead-->AbstractChannel#beginRead--> AbstractChannel#doBeginRead-->NioServerSocketChannel#doBeginRead,通过selectionKey.interestOps方法设置感兴趣事件。
服务端启动用到两个事件循环线程组,一个是接受客户端连接,另一个是处理客户端的读写。创建NioServerSocketChannel时传入OP_ACCEPT保存在其成员变量readInterestOp中,并打开通道,设置非阻塞模式。注册通道后会在pipeline上传播ChannelRegistered事件。
我们可以自定义ChannelInitializer对象重写initChannel方法来添加ChannelHandler。通道注册完成前,ChannelHandler通过PendingHandlerAddedTask形式保存在DefaultChannelPipeline引用的以pendingHandlerCallbackHead为链表头的链表里。当通道注册成功后,回调其handlerAdded方法。如果通道绑定端口成功但首次注册成功后会传播ChannelActive事件,管道头部HeadContext里注册了前面保存的OP_ACCEPT事件。
private MyChannelPromise doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final MyChannelPromise regFuture = initAndRegister();
final AbstractChannel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newMyChannelPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final MyChannelPromise promise = new MyChannelPromise(channel);
regFuture.addListener(new MyEventListener() {
@Override
public void operationComplete(MyChannelPromise future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
客户端的通道初始化及注册逻辑跟服务端流程相似。不同的地方是initAndRegister方法中的创建的是NioSocketChannel,其构造方法里给成员变量readInterestOp赋值SelectionKey.OP_READ,保存读事件,注册成功后再传播channelActive。
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
boolean success = false;
try {
boolean connected = channel.connect(remoteAddress);
if (!connected) {
this.selectionKey.interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
客户端事件循环线程一直监听OP_CONNECT事件。如下图所示,表示客户端监听到连接事件。
此时在SingleThreadEventLoop类中processSelectedKey方法里,调用NioSocketChannel的finishConnect方法完成连接操作。
private void processSelectedKey(SelectionKey k, AbstractChannel ch) {
...
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
ch.finishConnect();
}
...
}
finishConnect方法完成的事有:
(1) doFinishConnect里调用java底层socketChannel的finishConnect方法完成连接操作。
(2) fulfillConnectPromise里,若channel是打开状态且已经连接到服务端的情况下,会在管道传播channelActive事件。这里客户端传播channelActive事件会设置OP_READ,跟服务端有区别。从管道的head开始查找inbound类型Context,最后到达链表尾部tail。HeadContex调用readIfIsAutoRead()路径是:HeadContex#readIfIsAutoRead-->AbstractChannel#beginRead()-->NioSocketChannel#doBeginRead。
public void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
//这里readInterestOp为读事件,已在创建NioSocketChannel里赋值
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
服务端和客户端正常启动后,我们看下服务端例子,TestServer类里服务端添加workHandler为ServerHandler。它继承了类ChannelInboundHandlerAdapter,重写channelRead方法和channelActive方法。客户端每次给服务端发送数据时回调ServerHandler中的channelRead方法,当channel已注册到selector并绑定端口成功后会回调ServerHandler中的channelActive方法。TestServer和ServerHandler主要代码如下所示。
客户端TestClient例子中,添加workerHandler为ClientHandler类, 它继承ChannelInboundHandlerAdapter,重写channelRead方法,打印读到的消息。
服务端启动完成后,此时管道pipeline结构为:HeadContext--->ServerBootstrap Acceptor --->TailContext。
客户端向服务端发出连接操作后,服务端中SingleThreadEventLoop事件循环线程监听到感兴趣acceptor事件时,执行NioServerSocketChannel的read方法。
NioServerSocketChannel的read方法主要代码如下所示:
try {
int totalMessages;
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
totalMessages = localRead;
} while (totalMessages<16);
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
doReadMessages方法里通过 ServerSocketChannel.accept() 方法监听新进来的连接SocketChannel,放到readBuf里。
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = channel.accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(ch));
return 1;
}
...
}
客户端和服务端已建立好了连接通道:
[Connected:local=/127.0.0.1:8888 remote=/127.0.0.1:51223]。
接下来从HeadContext开始,沿着链表向后寻找inbound类型ChannelHandler执行channelRead方法传播。HeadContext下一个inbound类型ChannelHandler是接受新连接处理器ServerBootstrapAcceptor。它的channelRead方法作用是接受客户端的请求NioSocketChannel,并在其绑定管道pipeline中添加用户自定义的ServerHandler。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final AbstractChannel workChannel = (AbstractChannel) msg;
//此时childHandler是ServerHandler
workChannel.pipeline().addLast(childHandler);
try {
//此时workChannel是NioSocketChannel
childGroup.register(workChannel).addListener(new MyEventListener() {
public void operationComplete(MyChannelPromise future) throws Exception {
if (!future.isSuccess()) {
forceClose(workChannel, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(workChannel, t);
}
}
childGroup.register(workChannel)语句是在childGroup线程组中选择一个SingleThreadEventLoop去执行注册。流程跟bossGroup的注册类似。在AbstractChannel的register0方法里会执行pipeline.fireChannelActive()方法
传播channelActive事件,从管道head向后搜索inbound类型的ChannelHandler。
此时管道链是headContext-->DefaultChannelHandlerContext-->tailContext,DefaultChannelHandlerContext封装的是ServerHandler,会执行其重写的channelActive方法。传播路径如下图所示。
如下图所示pool-3-thread-1是它绑定的线程。该注册流程跟bossGroup类似。
AbstractChannel里doRegister方法是实现类NioSocketChannel来执行,在NioSocketChannel的doBeginRead方法里注册读事件。
客户端TestClient类里下面代码第2行向服务端发送数据, 执行链:
AbstractChannel#writeAndFlush-->DefaultChannelPipeline#writeAndFlush-->tail#writeAndFlush-->AbstractChannelHandlerContext#writeAndFlush--> AbstractChannelHandlerContext#doWrite
MyChannelPromise f = b.connect("127.0.0.1", 8888).sync();
f.channel().writeAndFlush("hello server, 我是客户端!");
在doWrite方法里做的事有:
(1) 从管道tail中向前搜索outbound类型的ChannelHandler。此时管道链是headContext-->DefaultChannelHandlerContext-->tailContext,DefaultChannelHandlerContext封装的是clientHandler,它属于inboud类型所以下图中next为headContext。因为执行doWrite方法为main线程,executor绑定的线程不一样,所以把写操作作为任务放到executor的队列里等待执行。
(2) write0()具体执行堆栈路径如下图所示,最后到NioSocketChannel类里write方法,通过java NIO中SocketChannel的write方法发送数据。
服务端workGroup中的SingleThreadEventLoop(线程为下图中的pool-3-thread-1),监听到感兴趣读事件时,执行NioSocketChannel的read方法。read方法里会执行沿着pipeline传播channelRead事件。
此时pipeline结构为: HeadContext-->DefaultChannelHandlerContext-->TaiContext。其中DefaultChannelHandlerContext封装了ServerHandler。
ByteBuffer readBuf = ByteBuffer.allocate(1024);
int len = 0;
while((len = this.channel.read(readBuf)) > 0 ){
readBuf.flip();
String data = new String(readBuf.array(), 0, len);
pipeline.fireChannelRead(data);
readBuf.clear();
}
pipeline.fireChannelReadComplete();
ServerHandler里重新channelRead方法打印消息,并发送回执消息。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
logger.info("服务端收到消息:"+ msg);
ctx.channel().writeAndFlush("hello client,我是服务端!");
ctx.close();
}
同理客户端的SingleThreadEventLoop轮询到读事件,流程跟服务端的读事件相同。客户端往管道添加的ClientHandler类,重写了channelRead方法来打印服务端发过滤的消息。
最后客户端发送消息及收到服务器端回执的消息,效果如下:
服务端收到消息并且给客户端回执消息,效果如下:
本文在自己对源码理解基础上进行简化,保留了核心流程。从服务端,客户端启动以及两端通信交互角度进行了分析,帮助读者快速了解Netty的核心原理,带大家入门。其实源码中还有一些更深层次的细节如Netty高低水位,流量整形和池化设计思想等等,值得花时间去学习研究。希望本文对大家有所启发,文中有不准确的地方,欢迎大家一起交流。