小六六学Netty系列之初识Netty
前言
文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
我知道很多人不玩qq了,但是怀旧一下,欢迎加入六脉神剑Java菜鸟学习群,群聊号码:549684836 鼓励大家在技术的路上写博客
絮叨
为了学习Netty,我们前面铺垫了那么多,NIO Java的零拷贝,UNIX的I/O模型等等。下面是前面系列的链接
今天我们就来看看Netty 然后用Netty搞个最简单的例子
今天七夕,祝大家有情人终成眷属
Netty官网
官网
Netty官网说明
-
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序
-
Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程
-
Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
Netty的优点
Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。
-
设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.
-
使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
-
安全:完整的 SSL/TLS 和 StartTLS 支持。社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入
Reactor 模式
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
-
基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理Reactor 对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
-
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
I/O 复用结合线程池,就是 Reactor 模式基本设计思想,如图:
-
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动) -
服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式 -
Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键
Reactor 三种模式
-
单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服 -
单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待 -
主从 Reactor 多线程,多个前台接待员,多个服务生
Netty模型
工作原理示意图
-
Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写 BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup -
NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop -
NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯 -
NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop -
每个Boss NioEventLoop 循环执行的步骤有3步 -
轮询accept 事件 -
处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector -
处理任务队列的任务 , 即 runAllTasks -
每个 Worker NIOEventLoop 循环执行的步骤 -
轮询read, write 事件 -
处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理 -
处理任务队列的任务 , 即 runAllTasks -
每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器
Netty快速入门实例-TCP服务
-
NettyServer
package com.xiaoliuliu.netty.simple;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author 小六六* @version 1.0* @date 2020/8/16 17:41*/public class NettyServer {public static void main(String[] args) {//创建BossGroup 和 WorkerGroup//说明//1. 创建两个线程组 bossGroup 和 workerGroup//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成//3. 两个都是无限循环//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数// 默认实际 cpu核数 * 2NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);try {//创建服务器端的启动对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup)//设置两个线程组.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现.option(ChannelOption.SO_BACKLOG,128) //设置保持活动连接状态// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueuech.pipeline().addLast(new NettyServerHandler());}});System.out.println(".....服务器 is ready...");//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象//启动服务器(并绑定端口)//基于异步操作ChannelFuture cf = bootstrap.bind(6666).sync();//给cf 注册监听器,监控我们关心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("监听端口 6666 成功");} else {System.out.println("监听端口 6666 失败");}}});//对关闭通道进行监听cf.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}
-
NettyServerHandler
package com.xiaoliuliu.netty.simple;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/*** @author 小六六* @version 1.0* @date 2020/8/16 18:04*** 说明* 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)* 2. 这时我们自定义一个Handler , 才能称为一个handler**/public class NettyServerHandler extends ChannelInboundHandlerAdapter {//读取数据实际(这里我们可以读取客户端发送的消息)/*1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址2. Object msg: 就是客户端发送的数据 默认Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/*//比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的//NIOEventLoop 的 taskQueue中,//解决方案1 用户程序自定义的普通任务ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));System.out.println("channel code=" + ctx.channel().hashCode());} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}});ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));System.out.println("channel code=" + ctx.channel().hashCode());} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}});//解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {try {Thread.sleep(5 * 1000);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));System.out.println("channel code=" + ctx.channel().hashCode());} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}}, 5, TimeUnit.SECONDS);System.out.println("go on ...");*/System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());System.out.println("server ctx =" + ctx);System.out.println("看看channel 和 pipeline的关系");Channel channel = ctx.channel();ChannelPipeline pipeline = channel.pipeline(); //本质是一个双向链接, 出站入站ByteBuf byteBuf=(ByteBuf)msg;System.out.println("客户端发送的数据是"+byteBuf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + channel.remoteAddress());}//数据读取完毕@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//writeAndFlush 是 write + flush//将数据写入到缓存,并刷新//一般讲,我们对这个发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));}//处理异常,关闭通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
-
NettyClient
package com.xiaoliuliu.netty.simple;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/*** @author 小六六* @version 1.0* @date 2020/8/16 18:13*/public class NettyClient {public static void main(String[] args) {//客户端需要一个事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//创建客户端启动对象//注意客户端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();bootstrap.group(group)//设置线程组.channel(NioSocketChannel.class)// 设置客户端通道的实现类(反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler());}});System.out.println("客户端 ok..");//启动客户端去连接服务器端//关于 ChannelFuture 要分析,涉及到netty的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();//给关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally {group.shutdownGracefully();}}}
-
NettyClientHandler
package com.xiaoliuliu.netty.simple;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/*** @author 小六六* @version 1.0* @date 2020/8/16 18:23*/public class NettyClientHandler extends ChannelInboundHandlerAdapter {//当通道就绪就会触发该方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client " + ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 小六六: (>^ω^<)喵", CharsetUtil.UTF_8));}//当通道有读取事件时,会触发@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));System.out.println("服务器的地址:"+ ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
-
结果
结尾
总结一下
-
Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。 -
NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。 -
NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责 -
NioEventLoopGroup 下包含多个 NioEventLoop -
每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue -
每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel -
每个 NioChannel 只会绑定在唯一的 NioEventLoop 上 -
每个 NioChannel 都绑定有一个自己的 ChannelPipeline
日常求赞
好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。
创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见
六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !
