/** Port the server binds to; read from the "port" system property, falling back to "8007". */
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

/**
 * Boots the echo server and blocks until its server channel is closed.
 *
 * <p>Steps: optionally build an SSL context from a self-signed certificate, create the
 * boss/worker event loop groups, configure a {@code ServerBootstrap}, bind to {@link #PORT},
 * wait for the server channel to close, and finally shut both groups down.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception if SSL setup, binding, or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    // Configure SSL.
    // NOTE(review): SSL is a flag declared outside this excerpt.
    final SslContext sslCtx;
    if (SSL) {
        SelfSignedCertificate ssc = new SelfSignedCertificate();
        sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
    } else {
        sslCtx = null;
    }

    // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    // A single handler instance is created here and added to every accepted channel's pipeline.
    // NOTE(review): presumably EchoServerHandler is written to be shared across channels - confirm.
    final EchoServerHandler serverHandler = new EchoServerHandler();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 // The SSL handler is only installed when an SSL context was built above.
                 if (sslCtx != null) {
                     p.addLast(sslCtx.newHandler(ch.alloc()));
                 }
                 //p.addLast(new LoggingHandler(LogLevel.INFO));
                 p.addLast(serverHandler);
             }
         });

        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
} // closes the enclosing class, whose header lies outside this excerpt
/**
 * Indicates a blocking select should follow.
 */
int SELECT = -1;

/**
 * Indicates the IO loop should be retried, no blocking select to follow directly.
 */
int CONTINUE = -2;

/**
 * Indicates the IO loop to poll for new events without blocking.
 */
int BUSY_WAIT = -3;

/**
 * The {@link SelectStrategy} can be used to steer the outcome of a potential select
 * call.
 *
 * @param selectSupplier The supplier with the result of a select operation.
 * @param hasTasks true if tasks are waiting to be processed.
 * @return {@link #SELECT} if the next step should be a blocking select, {@link #CONTINUE} if
 *         the next step should be to not select but rather jump back to the IO loop and try
 *         again. Any value &gt;= 0 is treated as an indicator that work needs to be done.
 * @throws Exception if the strategy cannot be calculated
 */
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
} // closes the enclosing interface, whose header lies outside this excerpt
// No executor supplied: fall back to a ThreadPerTaskExecutor built on the default thread factory.
if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

// Create the child executors one by one; if any creation fails, tear down the ones already built.
for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        if (!success) {
            // Ask every previously created child (indices 0..i-1) to shut down...
            for (int j = 0; j < i; j ++) {
                children[j].shutdownGracefully();
            }

            // ...then wait for each of them to terminate before the failure propagates.
            for (int j = 0; j < i; j ++) {
                EventExecutor e = children[j];
                try {
                    while (!e.isTerminated()) {
                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException interrupted) {
                    // Let the caller handle the interruption.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

chooser = chooserFactory.newChooser(children);

// Completes terminationFuture once the number of terminated children equals children.length.
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    @Override
    public void operationComplete(Future<Object> future) throws Exception {
        if (terminatedChildren.incrementAndGet() == children.length) {
            terminationFuture.setSuccess(null);
        }
    }
};

// Register the same listener on every child's termination future.
for (EventExecutor e: children) {
    e.terminationFuture().addListener(terminationListener);
}
看这个构造函数,如果executor是null(这个例子中就是null,可以回顾一下之前的构造函数,第二个构造函数就传入了一个null的executor),就new了一个ThreadPerTaskExecutor,这个ThreadPerTaskExecutor对于每个提交的任务(顾名思义,Thread Per Task),都使用ThreadFactory创建一个新的Thread去执行这个任务。关键是ThreadFactory,是使用newDefaultThreadFactory()获得的,newDefaultThreadFactory方法创建了一个DefaultThreadFactory,DefaultThreadFactory除了定制了Thread的名字之类的属性外,最重要的是它返回的是FastThreadLocalThread类型的Thread,FastThreadLocalThread里面使用的ThreadLocal是FastThreadLocal,并将任务Runnable包装成一个新的Runnable:FastThreadLocalRunnable.wrap(r)。