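Source Code Analysis: Creating a Netty Server vs. Creating a Java NIO Server
First, let's look at how a plain NIO server is created: it starts listening, accepts connections from multiple clients, and reads and writes messages asynchronously.
Sample code (reference 【2】):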
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * User: mihasya
 * Date: Jul 25, 2010
 * Time: 9:09:03 AM
 */
public class JServer {
    public static void main(String[] args) {
        ServerSocketChannel sch = null;
        Selector sel = null;
        try {
            // setup the socket we're listening for connections on.
            InetSocketAddress addr = new InetSocketAddress(8400);
            sch = ServerSocketChannel.open();
            sch.configureBlocking(false);
            sch.socket().bind(addr);
            // setup our selector and register the main socket on it
            sel = Selector.open();
            sch.register(sel, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            System.out.println("Couldn't setup server socket");
            System.out.println(e.getMessage());
            System.exit(1);
        }
        // fire up the listener, pass it our selector.
        // run() is called directly, so the loop runs on the main thread.
        ListenerThread listener = new ListenerThread(sel);
        listener.run();
    }

    /*
     * the thread is completely unnecessary, it could all just happen
     * in main()
     *
     * Declared static so that it can be instantiated from the static main().
     */
    static class ListenerThread extends Thread {
        Selector sel = null;

        ListenerThread(Selector sel) {
            this.sel = sel;
        }

        public void run() {
            while (true) {
                // our canned response for now
                ByteBuffer resp = ByteBuffer.wrap("got it\n".getBytes());
                try {
                    // loop over all the sockets that are ready for some activity
                    while (this.sel.select() > 0) {
                        Set<SelectionKey> keys = this.sel.selectedKeys();
                        Iterator<SelectionKey> i = keys.iterator();
                        while (i.hasNext()) {
                            SelectionKey key = i.next();
                            if (key.isAcceptable()) {
                                // this means that a new client has hit the port our main
                                // socket is listening on, so we need to accept the connection
                                // and add the new client socket to our select pool for reading
                                // a command later
                                System.out.println("Accepting connection!");
                                // this will be the ServerSocketChannel we initially registered
                                // with the selector in main()
                                ServerSocketChannel sch = (ServerSocketChannel) key.channel();
                                SocketChannel ch = sch.accept();
                                ch.configureBlocking(false);
                                ch.register(this.sel, SelectionKey.OP_READ);
                            } else if (key.isReadable()) {
                                // one of our client sockets has received a command and
                                // we're now ready to read it in
                                System.out.println("Accepting command!");
                                SocketChannel ch = (SocketChannel) key.channel();
                                ByteBuffer buf = ByteBuffer.allocate(200);
                                if (ch.read(buf) < 0) {
                                    // the client closed the connection; closing the
                                    // channel also cancels its key
                                    ch.close();
                                } else {
                                    buf.flip();
                                    Charset charset = Charset.forName("UTF-8");
                                    CharsetDecoder decoder = charset.newDecoder();
                                    CharBuffer cbuf = decoder.decode(buf);
                                    System.out.print(cbuf.toString());
                                    // re-register this socket with the selector, this time
                                    // for writing since we'll want to write something to it
                                    // on the next go-around
                                    ch.register(this.sel, SelectionKey.OP_WRITE);
                                }
                            } else if (key.isWritable()) {
                                // we are ready to send a response to one of the client sockets
                                // we had read a command from previously
                                System.out.println("Sending response!");
                                SocketChannel ch = (SocketChannel) key.channel();
                                ch.write(resp);
                                resp.rewind();
                                // we may get another command from this guy, so prepare
                                // to read again. We could also close the channel, but
                                // that sort of defeats the whole purpose of doing async
                                ch.register(this.sel, SelectionKey.OP_READ);
                            }
                            i.remove();
                        }
                    }
                } catch (IOException e) {
                    System.out.println("Error in poll loop");
                    System.out.println(e.getMessage());
                    System.exit(1);
                }
            }
        }
    }
}
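The code above shows the general steps of a Java NIO server:
1. Open a ServerSocketChannel to listen for client connections. It is the parent channel of all client connections. Bind the listening port and put the channel into non-blocking mode.
2. Open the multiplexer (Selector) and start the server's listener thread. Register the ServerSocketChannel with the Reactor thread's Selector and listen for ACCEPT events.
3. When the Selector detects a new client, handle the accept request. Once the TCP three-way handshake has completed, the physical link to the client is established.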
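The three steps can be condensed into a small, self-contained sketch. This is only an illustration, not part of the referenced sample: the class name NioAcceptSketch, the use of port 0 (any free port) on the loopback address, and the throwaway client are assumptions made so the example can run on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper class, used only to illustrate the three steps.
final class NioAcceptSketch {

    /** Runs the three steps once and reports whether a client was accepted. */
    static boolean acceptOneClient() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Step 1: non-blocking server channel bound to a port.
            // Port 0 asks the OS for any free port (an assumption for this demo).
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Step 2: register the server channel for ACCEPT events.
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A throwaway client so that there is something to accept.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Step 3: wait (at most 5 seconds) until the selector reports the connection.
                if (selector.select(5000) == 0) {
                    return false;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        try (SocketChannel accepted = server.accept()) {
                            return accepted != null;
                        }
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted a client: " + acceptOneClient());
    }
}
```

A real server would keep the accepted channel open, switch it to non-blocking mode and register it for OP_READ, as the JServer sample does.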
2. Creating the Netty server
2.1 Opening the ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
How the ServerSocketChannel is created:
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
The constructor calls the newSocket method:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         * See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}
The call provider.openServerSocketChannel() is the plain Java NIO API. Non-blocking mode is set in the superclass AbstractNioChannel:
/**
 * Create a new instance
 *
 * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
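To see the two JDK calls in isolation, here is a minimal sketch that uses only java.nio, without Netty. The class name ProviderOpenSketch is made up for illustration. It opens a server channel through the SelectorProvider and switches it to non-blocking mode, which is roughly what the two Netty excerpts above do between them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper class; it is not part of Netty.
final class ProviderOpenSketch {

    /** Returns true if the channel starts out blocking and is non-blocking afterwards. */
    static boolean opensNonBlocking() {
        // The same kind of provider that ServerSocketChannel.open() would look up internally.
        SelectorProvider provider = SelectorProvider.provider();
        try (ServerSocketChannel ch = provider.openServerSocketChannel()) {
            // A newly created selectable channel is always in blocking mode.
            boolean blockingByDefault = ch.isBlocking();
            // The same call that AbstractNioChannel makes in its constructor.
            ch.configureBlocking(false);
            return blockingByDefault && !ch.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-blocking after configureBlocking(false): " + opensNonBlocking());
    }
}
```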
2.2 Opening the multiplexer
NioEventLoop schedules and runs the Selector polling loop and selects the set of Channels that are ready. The relevant code:
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();
                processSelectedKeys();
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
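The expression ioTime * (100 - ioRatio) / ioRatio in the loop above decides how long queued tasks may run after the I/O phase. The sketch below only reproduces that arithmetic. The class and method names are made up for illustration, and the restriction to values from 1 to 99 reflects that the excerpt handles ioRatio == 100 separately by running all tasks without a time limit.

```java
// Hypothetical helper that reproduces the time-budget arithmetic of the loop above.
final class IoRatioSketch {

    /**
     * @param ioTimeNanos time spent processing selected keys, in nanoseconds
     * @param ioRatio     percentage of loop time intended for I/O (1 to 99)
     * @return the time budget for queued tasks, in nanoseconds
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be between 1 and 99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms of I/O work
        System.out.println(taskBudgetNanos(ioTime, 50)); // equal time for tasks
        System.out.println(taskBudgetNanos(ioTime, 80)); // a quarter of the I/O time
    }
}
```

With an ioRatio of 50 the tasks get as much time as the I/O took. With 80 they get a quarter of it, so that I/O ends up with about 80% of the total.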
2.2.1 Processing the selected keys
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
Taking processSelectedKeysPlain as an example:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
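The method above relies on each SelectionKey carrying an attachment that says who should handle it. The same idea can be sketched with the JDK alone. In the example below the Handler interface and the class name are hypothetical stand-ins for Netty's AbstractNioChannel and NioTask, and a java.nio.channels.Pipe is used only so that the example produces a ready key without opening a network socket.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of attachment-based dispatch; it is not Netty code.
final class AttachmentDispatchSketch {

    /** Stand-in for the objects a key can carry as its attachment. */
    interface Handler {
        String handle(SelectionKey key);
    }

    static List<String> selectAndDispatch() {
        List<String> results = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // Register the readable end and attach the handler to the key.
                Handler handler = key -> "readable:" + key.isReadable();
                pipe.source().register(selector, SelectionKey.OP_READ, handler);

                // Make the source readable by writing one byte into the pipe.
                pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

                if (selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        Object attachment = key.attachment();
                        // Remove the key so it is not processed again on the next select.
                        it.remove();
                        if (attachment instanceof Handler) {
                            results.add(((Handler) attachment).handle(key));
                        }
                    }
                }
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(selectAndDispatch());
    }
}
```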
2.3 Binding the port and accepting requests:
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
The bind() call is carried out by doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
The actual binding is done by doBind0:
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new OneTimeTask() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
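The pattern in doBind and doBind0 is: wait until registration has finished, then hand the bind over to the channel's event loop and report the outcome through a promise. The sketch below imitates that control flow with JDK classes only. CompletableFuture and a single-threaded ExecutorService are assumed stand-ins for Netty's ChannelFuture, ChannelPromise and EventLoop, the string result is a placeholder for a real bind, and the failure path is simplified compared with the Netty code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of "bind only after registration, on the event loop thread".
final class DeferredBindSketch {

    static CompletableFuture<String> bindAfterRegistration(
            CompletableFuture<Void> registration, ExecutorService eventLoop, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        registration.whenComplete((ignored, cause) -> {
            if (cause != null) {
                // Registration failed, so fail the bind promise directly.
                promise.completeExceptionally(cause);
                return;
            }
            // Registration succeeded: run the bind on the event loop thread.
            // The string is a placeholder for the real bind call.
            eventLoop.execute(() -> promise.complete("bound:" + port));
        });
        return promise;
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bind = bindAfterRegistration(registration, eventLoop, 8400);
        registration.complete(null); // simulate registration finishing
        System.out.println(bind.join());
        eventLoop.shutdown();
    }
}
```

Running the bind on the event loop keeps all operations on a channel on one thread, which is why the bind is not performed directly on the calling thread.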
Once the channel is bound and registered, each ready key is handled by processSelectedKey:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
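The readyOps value is a bit mask, so one key can trigger several branches in a single pass. The following sketch mirrors only the bit tests of processSelectedKey. The class name and the action names ("read", "flush", "finishConnect") are labels invented for this illustration, not Netty API.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch that mirrors the bit tests in processSelectedKey.
final class ReadyOpsSketch {

    /** Lists, in order, which branches a given readyOps mask would enter. */
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // OP_READ and OP_ACCEPT share the read path; a mask of 0 is also treated as readable.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("flush");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```

For a server channel the interesting case is OP_ACCEPT: it takes the same read path as OP_READ, which is how new client connections reach unsafe.read() on the NioServerSocketChannel.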
References
【1】http://www.infoq.com/cn/articles/netty-server-create
【2】https://github.com/mihasya/sample-java-nio-server/blob/master/src/JServer.java
