vlambda博客
学习文章列表

Netty入门看这一篇就够了

一、Netty的概述

学习Netty,我们首先得知道什么是Netty
Netty:Netty是Java的一个开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速的开发高性能、高可靠性的网络服务器和客户端程序。也就是说Netty是一个基于NIO客户端、服务端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

二、为什么要学习netty

既然Netty是一个网络应用程序开发框架,那为什么我们不用NIO呢(况且Netty也还是基于NIO开发的),首先我们得了解NIO,这里就不多做叙述,如果想了解一下NIO,可以看我之前写的NIO文章。NIO主要的问题是

  • 首先NIO的类库以及API都是比较复杂,繁琐,学习成本是比较高的,学习NIO你得了解:Channel(通道)、Buffer(缓冲区)、Selector(选择器)NIO的三大核心部分等。

  • 还需要熟悉java的多线程编程,因为NIO编程涉及到Reactor模式,必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序

  1. 还有epoll bug问题,导致Selector空轮询,导致CPU 100%。直到JDK1.7都没有根本性解决。

Netty的优点
Netty对JDK自带的NIO的API进行封装,解决了上述问题

  • 设计优雅:适用于各种传输类型的统一API;基于灵活可扩展的事件模型;以及高度可定制的线程模型。

  • 使用方便:有详细的用户指南和示例。

  • 高性能、吞吐量,延迟更低

  • 社区更加活跃,在不断更新

三、Netty的线程模型

不同的线程模式,对程序的性能有很大的影响,在研究Netty的线程模式之前,我们先了解存在的各种线程模式,做一下比较。
目前存在的线程模型有

  • 传统的I/O服务模型

  • Reactor模式

根据Reactor的数量和处理资源池线程的数量不同,又可以分为3种实现

  • 单Reactor单线程

  • 单Reactor多线程

  • 主从Reactor多线程

传统的I/O服务模型


采用阻塞IO模式获取数据,每一个连接都要独立的线程完成数据传输,业务处理、数据返回。
产生的问题:当并发数很大时,会创建大量的线程,占用很大的系统资源。
连接创建后,如果当前线程没有数据读取,该线程会一直阻塞在read操作,造成资源浪费。

Reactor模式

针对传统阻塞I/O服务模型的弊端,解决方法

  1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某一个有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。

  2. 基于线程池复用线程资源:不必为每一个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个业务连接。

Netty入门看这一篇就够了Reactor模式的执行过程

  1. Reactor模式,通过一个或多个输入同时传递给服务器的模式(基于事件驱动)

  2. 服务器程序处理传入的多个请求,并将它们分派到相应的处理线程,因此Reactor模式又叫Dispatcher模式

  3. Reactor使用IO复用监听事件,收到事件后,分发给某个线程,这也是服务器高并发处理的关键


Reactor模式中的核心组成

  1. Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应

  2. Handlers:处理程序执行I/O 事件要完成的实际事件

Netty入门看这一篇就够了
(1)Select是前面IO复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求
(2)Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发
(3)如果建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
(4)如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
(5)Handler会完成Read —>业务处理 >send的完成流程

Netty的工作原理

  1. Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkGroup专门负责网络的读写

  2. BossGroup和WorkGroup类型都是NioEventLoopGroup

  3. NIOEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop

  4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯

  5. NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop

  6. 每个BossNioEventLoop循环执行的步骤有3步:

    轮询accept事件
    处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个WorkNioEventLoop上的select处理任务队列的任务

  7. 每一个WorkNIOEventLoop循环执行的步骤:
    轮询read、write事件
    处理I/O事件,即read、write事件,在对应NIOSocketChannel处理
    处理任务队列的任务

  8. 每个WorkerNIOEventLoop处理业务时,会使用pipeline,pipeline中包含channel,即通过pipeline可以获取到对应管道,管道中维护了很多处理器

四、Netty实战演示

首先引入Netty的jar包

创建服务端的启动类

public class NettyServer { public static void main(String[] args) throws Exception { //bossGroup只处理连接请求,workGroup和客户端业务处理 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup)//设置两个线程组 .channel(NioServerSocketChannel.class)//NioServerSocketChannel服务器通道 .option(ChannelOption.SO_BACKLOG, 128)//线程队列得到的连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态 .childHandler(new ChannelInitializer<SocketChannel>() { //创建通道测试对象 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("服务器 is ready...");
//绑定端口并同步,生成一个ChannelFuture 对象 //启动服务器,绑定端口 ChannelFuture cf = bootstrap.bind(6668).sync(); //对关闭通道监听 cf.channel().closeFuture().sync();
} finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
}

创建服务端的处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//ChannelHandlerContext ctx:上下文,含有管道pipeline,通道channel,地址 //msg客户端发送的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel();
ByteBuf buffer = (ByteBuf) msg; System.out.println("收到客户端:"+channel.remoteAddress()+"的消息:"+buffer.toString(CharsetUtil.UTF_8));
}
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("服务端发送hello yy",CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}

创建客户端启动类

public class NettyClient { public static void main(String[] args) throws Exception{ //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); //设置参数 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>(){
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler());//加入处理器 } }); System.out.println("客户端 ok..."); //启动客户端连接服务器 //关于ChannelFuture分析,涉及netty异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6668).sync(); //给关闭通道进行监听 channelFuture.channel().closeFuture().sync();
}catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } }}

创建客户端处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送消息到服务端 ctx.writeAndFlush(Unpooled.copiedBuffer("客户端发送hello,server", CharsetUtil.UTF_8));
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; System.out.println("收到服务端:"+ctx.channel().remoteAddress()+"的消息:"+buf.toString(CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}