源码解密Netty系列 | 前话 - 磨刀不误砍柴工
Part 1 楔子
作为一个技术人,应该时刻保持饥渴感,对未知世界的好奇心。
对于优秀的开源框架的源码,我们都会抱着敬畏之心去拜读。对于如何去阅读源码,简单总结如下:
第一步学会如何使用框架,起码入门的工程我们可以独立跑起来,可以畅通无阻Debug。
第二步,我们在阅读框架源码之前,先查官网或者网上的资料,了解这个框架的整体架构设计,核心组件,以及相关的一些概念。做到胸中有山水,了解大致的数据流向。此外要准备充分,做好知识点预热。比如:Netty相关的NIO、IO多路复用模型、Reactor线程模型、阻塞IO、非阻塞IO,同步调用,异步调用等等,这是我们需要提前掌握基础。否则理解起来会很懵。
第三步,我们对照第二步的架构设计,我们分析框架源码的包结构,摸清楚框架有哪些包,是负责哪些模块的,针对一些框架的核心组件类,我们最好先了解下它的类继承图,这样在读源码过程中,我们不会莫名地找不到方法的真实调用类。
第四步,开始阅读部分核心源码。优秀的框架一般都是大而全,功能完善,如果逐个阅读,会迷失其中。所以我们需要先抓第二步的核心组件,重点对象进行阅读,理解其领域概念及设计思想,理清脉络。然后在逐步的依赖调用中,慢慢往外延伸、扩展,最后蔓延到整个框架。在阅读过程中,复杂的调用部分,要记得画数据流向图,方便理解。这一部分我们也是熟悉整个的数据流程。这部分我们学习的是设计思想与理念。
第五步,精雕细琢,查漏补缺。框架中的部分模块可能比较独立。比如,Netty中的编码解码器。也有些地方属于框架的基础核心代码,比如:粘包拆包,对象池,高性能无锁队列,FastThreadLocal,零拷贝,Futrue&Promise等等。需要我们逐句去拜读,读懂。这部分需要我们汲取的是编程的技巧与思维。
最后,也是我们最容易忽略的。尽可能自己手动去模仿实现一个简易版本的框架。验证自己的学习成果和理解。
以上只是一些个人的想法,欢迎相互交流学习。下面开始正文。
Part 2 Netty的前世今生
什么是Netty
Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建。
目前官方推荐的Netty版本为4.X版本。5.X版本因为引入ForkJoin线程池,导致代码复杂化且性能收益一般,暂时作为放弃的版本。
Netty的优势
1、API使用简单,开发门槛低;
2、功能强大,预置了多种编解码功能,支持多种主流协议;
3、定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;
4、性能高,通过与其他业界主流的NIO框架对比,Netty的综合性能最优;
5、成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;
6、社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;
7、经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用了。
Netty的应用场景
1. 构建高性能、低时延的各种 Java 中间件,例如 MQ、分布式服务框架、ESB 消息总线等,Netty 主要作为基础通信框架提供高性能、低时延的通信服务;典型的应用有:阿里分布式服务框架Dubbo、淘宝的消息中间件 RocketMQ 等等。
2. 研发公有或者私有协议栈的基础通信框架,例如可以基于 Netty 构建异步、高性能的 WebSocket 协议栈;
3. 各领域应用,例如大数据、游戏等,Netty 作为高性能的通信框架用于内部各模块的数据分发、传输和汇总等,实现模块之间高性能通信。经典的Hadoop的高性能通信和序列化组件Avro的RPC框架,默认采用Netty进行跨界点通信,它的Netty Service基于Netty框架二次封装实现。
Netty与Mina的比较
1. 社区活跃度。
Netty的社区活跃度远高于Mina。Netty还有JBoss作为强大的背书。基于这方面考虑,Netty肯定是作为首选。
2. 任务调度粒度。
Netty4.x的任务队列的粒度细化到3中队列中。执行优先级: taskQueue > scheduledTaskQueue > tailTasksQueue。这样任务执行的公平性更好。
Part 3 源码预热知识点
核心概念
阻塞调用与非阻塞调用的区别
他们之间的区别强调的是调用方的状态。
阻塞调用 - 在调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。调用方一直在等待而且别的事情什么都不做。
同步处理与异步处理的区别
他们之间区别强调的是被调用者的状态。
同步处理 - 被调用方得到最终结果之后才返回给调用方。
异步处理 - 被调用方先返回应答,然后再计算调用结果,计算完最终结果后再通知并返回给调用方
举一个生动的栗子:
老李爱喝茶,煮开水。
1. 老李把水壶放到火上,静等水开。(同步阻塞)
老李觉得自己有点傻。
2. 老李把水壶放到火上,去客厅看电视,时不时去厨房看看水有没有开。(同步非阻塞)老李还是觉得自己有点傻。于是乎买了把可以响笛的水壶,水烧开会发出声响。
3. 老李把响水壶放在火上,立等水开。(异步阻塞)
4. 老李把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
五种常见的网络IO模型
1. 阻塞IO
在阻塞式 I/O 模型中,应用程序在从调用 recvfrom 开始到它返回有数据报准备好这段时间是阻塞的,recvfrom 返回成功后,应用进程开始处理数据报。
比喻:一个人在钓鱼,当没鱼上钩时,就坐在岸边一直等。
优点:程序简单,在阻塞等待数据期间进程/线程挂起,基本不会占用 CPU 资源。
缺点:每个连接需要独立的进程/线程单独处理,当并发请求量大时为了维护程序,内存、线程切换开销较大,这种模型在实际生产中很少使用。
2. 非阻塞IO
在非阻塞式 I/O 模型中,应用程序把一个套接口设置为非阻塞,就是告诉内核,当所请求的 I/O 操作无法完成时,不要将进程睡眠。
而是返回一个错误,应用程序基于 I/O 操作函数将不断的轮询数据是否已经准备好,如果没有准备好,继续轮询,直到数据准备好为止。
比喻:边钓鱼边玩手机,隔会再看看有没有鱼上钩,有的话就迅速拉杆。
优点:不会阻塞在内核的等待数据过程,每次发起的 I/O 请求可以立即返回,不用阻塞等待,实时性较好。
缺点:轮询将会不断地询问内核,这将占用大量的 CPU 时间,系统资源利用率较低,所以一般 Web 服务器不使用这种 I/O 模型。
3. 多路复用NIO(IO Multiplexing)、事件驱动IO(Event Drive IO)
select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
幸运的是,有很多高效的事件驱动库可以屏蔽上述的困难,常见的事件驱动库有libevent库,还有作为libevent替代者的libev库。这些库会根据操作系统的特点选择最合适的事件探测接口,并且加入了信号(signal) 等技术以支持异步响应,这使得这些库成为构建事件驱动模型的不二选择。
在 I/O 复用模型中,会用到 Select 或 Poll 函数或 Epoll 函数(Linux 2.6 以后的内核开始支持),这两个函数也会使进程阻塞,但是和阻塞 I/O 有所不同。
这两个函数可以同时阻塞多个 I/O 操作,而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数。
比喻:放了一堆鱼竿,在岸边一直守着这堆鱼竿,没鱼上钩就玩手机。
优点:可以基于一个阻塞对象,同时在多个描述符上等待就绪,而不是使用多个线程(每个文件描述符一个线程),这样可以大大节省系统资源。
缺点:当连接数较少时效率相比多线程+阻塞 I/O 模型效率较低,可能延迟更大,因为单个连接处理需要 2 次系统调用,占用时间会有增加。Nginx这样的高性能互联网反向代理服务器大获成功的关键就是得益于Epoll。
下图为目前实现多路复用的几种重要实现。
Select、poll、epoll、kqueue。
4. 信号驱动模型
在信号驱动式 I/O 模型中,应用程序使用套接口进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。
当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 I/O 操作函数处理数据。
比喻:鱼竿上系了个铃铛,当铃铛响,就知道鱼上钩,然后可以专心玩手机。
优点:线程并没有在等待数据时被阻塞,可以提高资源的利用率。
缺点:信号 I/O 在大量 IO 操作时可能会因为信号队列溢出导致没法通知。
信号驱动 I/O 尽管对于处理 UDP 套接字来说有用,即这种信号通知意味着到达一个数据报,或者返回一个异步错误。
但是,对于 TCP 而言,信号驱动的 I/O 方式近乎无用,因为导致这种通知的条件为数众多,每一个来进行判别会消耗很大资源,与前几种方式相比优势尽失。
5. 异步IO模型
由 POSIX 规范定义,应用程序告知内核启动某个操作,并让内核在整个操作(包括将数据从内核拷贝到应用程序的缓冲区)完成后通知应用程序。
这种模型与信号驱动模型的主要区别在于:信号驱动 I/O 是由内核通知应用程序何时启动一个 I/O 操作,而异步 I/O 模型是由内核通知应用程序 I/O 操作何时完成。
优点:异步 I/O 能够充分利用 DMA 特性,让 I/O 操作与计算重叠。
缺点:要实现真正的异步 I/O,操作系统需要做大量的工作。目前 Windows 下通过 IOCP 实现了真正的异步 I/O。
而在 Linux 系统下,Linux 2.6才引入,目前 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 IO 复用模型模式为主。
5种IO模型的对比:
从上图中我们可以看出,越往后,阻塞越少,理论上效率也是最优。
这五种 I/O 模型中,前四种属于同步 I/O,因为其中真正的 I/O 操作(recvfrom)将阻塞进程/线程,只有异步 I/O 模型才与 POSIX 定义的异步 I/O 相匹配。
常见的线程模型
1. 传统的同步阻塞模型
特点:
1)采用阻塞式 I/O 模型获取输入数据;
2)每个连接都需要独立的线程完成数据输入,业务处理,数据返回的完整操作。单连接。
存在问题:
1)当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大;
2)连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。
2. Reactor模型
针对传统阻塞 I/O 服务模型的 2 个缺点,比较常见的有如下解决方案:
1)基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理;
2)基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
Part 4 Netty整体架构图与包结构
-
netty -
buffer 缓冲区模块 -
codec 编码译码基础模块 -
codec-dns 针对DNS的编码译码模块。依赖codec -
codec-haproxy 针对haproxy的编码译码模块 -
codec-http 针对http的编码译码模块 -
codec-http2 针对http2的编码译码模块 -
codec-memcache 针对memcache的编码译码模块 -
codec-mqtt 针对mqtt的编码译码模块 -
codec-redis 针对redis的编码译码模块 -
codec-smtp 针对smtp的编码译码模块 -
codec-socks 针对socks的编码译码模块 -
codec-stomp 针对stomp的编码译码模块 -
codec-xml 针对xml的编码译码模块 -
handler 处理器基础模块 -
handler-proxy 代理处理器模块 -
resolver 解析器模块 -
resolver-dns DNS解析器模块 -
transport 传输层处理模块(包含Channel模块、IO处理模型) -
transport-native-epoll 传输层epoll模型 -
transport-native-kqueue 传输层kqueue模型 -
transport-unix-common -
transport-rxtx 传输层rxtx模型 -
transport-sctp 传输层sctp模型 -
transport-udt 传输层udt模型 -
common 公共基础模块 -
dev-tools 发布工具模块 -
docker Docker部署相关 -
example 官方例子。可以作为源码测试入口。 -
license 证书 -
testsuite 测试套件 -
testsuite-autobahn -
testsuite-http2 -
testsuite-osgi -
testsuite-shading -
microbench 微基准测试模块 -
tarball Linux下Tarball的装机方式
Part 5 Netty源码环境构建踩坑
<!-- Our own Tomcat Native fork - completely optional, used for accelerating SSL with OpenSSL. -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>${tcnative.artifactId}</artifactId>
<version>${tcnative.version}</version>
<!-- 将下面的配置注释掉即可 -->
<!--<classifier>${tcnative.classifier}</classifier>-->
<scope>compile</scope>
<optional>true</optional>
</dependency>
Part 6 Netty 初体验
package io.netty.example.echo;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 配置SSL
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
// (1) 处理I/O操作的事件循环器 (其实是个线程池)。
// boss组。负责接收已到达的connection
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作轮询线程组。
// worker组。当boss接收到connection并把它注册到worker后,
// worker就可以处理connection上的数据通信。
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 自定义的Server端处理器
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// (2) ServerBootstrap 是用来搭建 server 的协助类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
// (3) 用来初始化一个新的Channel去接收到达的connection。这里面使用了工厂模式,反射
.channel(NioServerSocketChannel.class)
// 日志处理器。handler是boss轮询线程组的处理器
.handler(new LoggingHandler(LogLevel.INFO))
// (4) ChannelInitializer是一个特殊的 handler,帮助开发者配置Channel,而多数情况下你会配置Channel下的ChannelPipeline,
// 往 pipeline 添加一些 handler (例如DiscardServerHandler) 从而实现你的应用逻辑。
// 当你的应用变得复杂,你可能会向 pipeline 添加更多的 handler,并把这里的匿名类抽取出来作为一个单独的类。
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
})
// (5) 你可以给Channel配置特有的参数。这里我们写的是 TCP/IP 服务器,所以可以配置一些 socket 选项,例如 tcpNoDeply 和 keepAlive。请参考ChannelOption和ChannelConfig文档来获取更多可用的 Channel 配置选项
.option(ChannelOption.SO_BACKLOG, 100)
// (6) option()用来配置NioServerSocketChannel(负责接收到来的connection),
// 而childOption()是用来配置被ServerChannel(这里是NioServerSocketChannel) 所接收的Channel
.childOption(ChannelOption.SO_KEEPALIVE, true);
// (7) Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* Handler implementation for the echo server.
*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到客户端信息:" + msg.toString());
ctx.write(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
package io.netty.example.echo;
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// (1) 创建事件循环处理组
EventLoopGroup group = new NioEventLoopGroup();
try {
// (2) 客户端启动类
Bootstrap b = new Bootstrap();
//设置EventLoopGroup
b.group(group)
//设置channelFactory。负责生产NioSocketChannel的工厂类。
.channel(NioSocketChannel.class)
// (4) 初始化ChannelInitializer,添加处理器
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//SSl配置开启
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new EchoClientHandler());
}
})
// (5) 配置 NioSocketChannel 的 socket 选项
.option(ChannelOption.TCP_NODELAY, true);
// (6) 开始连接服务端
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
int capacity = 1;
for (int i = 0; i < 256; i ++) {
firstMessage.writeByte((byte) capacity);
}
}
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channel激活...");
ctx.writeAndFlush(firstMessage);
System.out.println("客户端发送消息");
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到服务端的消息:"+msg.toString());
}
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
Part 7 Netty源码后续章节计划
Part 8 END