vlambda博客
学习文章列表

小六六学Netty系列之初识Netty

前言

文本已收录至我的GitHub仓库,欢迎Star:https://github.com/bin392328206/six-finger
种一棵树最好的时间是十年前,其次是现在
我知道很多人不玩qq了,但是怀旧一下,欢迎加入六脉神剑Java菜鸟学习群,群聊号码:549684836 鼓励大家在技术的路上写博客

絮叨

为了学习Netty,我们前面铺垫了那么多,NIO Java的零拷贝,UNIX的I/O模型等等。下面是前面系列的链接

今天我们就来看看Netty 然后用Netty搞个最简单的例子

今天七夕,祝大家有情人终成眷属

Netty官网

官网

小六六学Netty系列之初识Netty

Netty官网说明

  • Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序

  • Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程

  • Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

Netty的优点

Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。

  • 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

  • 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

  • 安全:完整的 SSL/TLS 和 StartTLS 支持。社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

Reactor 模式

针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:

  • 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理Reactor 对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)

  • 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。小六六学Netty系列之初识Netty

I/O 复用结合线程池,就是 Reactor 模式基本设计思想,如图:小六六学Netty系列之初识Netty

  • Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
  • 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式
  • Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键

Reactor 三种模式

  • 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服
  • 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
  • 主从 Reactor 多线程,多个前台接待员,多个服务生

Netty模型

工作原理示意图小六六学Netty系列之初识Netty

  • Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写 BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  • NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
  • NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯
  • NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop
  • 每个Boss NioEventLoop 循环执行的步骤有3步
    • 轮询accept 事件
    • 处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector
    • 处理任务队列的任务 , 即 runAllTasks
  • 每个 Worker NIOEventLoop 循环执行的步骤
    • 轮询read, write 事件
    • 处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
    • 处理任务队列的任务 , 即 runAllTasks
  • 每个Worker NIOEventLoop  处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

Netty快速入门实例-TCP服务

  • NettyServer

package com.xiaoliuliu.netty.simple;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;
/** * @author 小六六 * @version 1.0 * @date 2020/8/16 17:41 */public class NettyServer {
public static void main(String[] args) {
//创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG,128) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } });

System.out.println(".....服务器 is ready...");

//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 //启动服务器(并绑定端口) //基于异步操作 ChannelFuture cf = bootstrap.bind(6666).sync();

//给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6666 成功"); } else { System.out.println("监听端口 6666 失败"); } } });
//对关闭通道进行监听 cf.channel().closeFuture().sync();
} catch (InterruptedException e) { e.printStackTrace(); }finally {
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
}}
  • NettyServerHandler
package com.xiaoliuliu.netty.simple;
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;
/** * @author 小六六 * @version 1.0 * @date 2020/8/16 18:04 * * * 说明 * 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范) * 2. 这时我们自定义一个Handler , 才能称为一个handler * */public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息) /* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*
//比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的 //NIOEventLoop 的 taskQueue中,
//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() {
try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } } });
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() {
try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } } });
//解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() {
try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8)); System.out.println("channel code=" + ctx.channel().hashCode()); } catch (Exception ex) { System.out.println("发生异常" + ex.getMessage()); } } }, 5, TimeUnit.SECONDS);


System.out.println("go on ...");*/ System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel(); ChannelPipeline pipeline = channel.pipeline(); //本质是一个双向链接, 出站入站
ByteBuf byteBuf=(ByteBuf)msg;
System.out.println("客户端发送的数据是"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress()); }

//数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); }
//处理异常,关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
  • NettyClient
package com.xiaoliuliu.netty.simple;
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;
/** * @author 小六六 * @version 1.0 * @date 2020/8/16 18:13 */public class NettyClient {
public static void main(String[] args) { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group)//设置线程组 .channel(NioSocketChannel.class)// 设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); } });
System.out.println("客户端 ok..");
//启动客户端去连接服务器端 //关于 ChannelFuture 要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//给关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { group.shutdownGracefully(); }

}}
  • NettyClientHandler

package com.xiaoliuliu.netty.simple;
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;
/** * @author 小六六 * @version 1.0 * @date 2020/8/16 18:23 */public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 小六六: (>^ω^<)喵", CharsetUtil.UTF_8)); }
//当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:"+ ctx.channel().remoteAddress()); }

@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
  • 结果 小六六学Netty系列之初识Netty

结尾

总结一下

  • Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
  • NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
  • NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
  • NioEventLoopGroup 下包含多个 NioEventLoop
  • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  • 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

日常求赞

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是真粉。

创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见

六脉神剑 | 文 【原创】如果本篇博客有任何错误,请批评指教,不胜感激 !