分布式专题|都说netty入门很难,那是因为你没有看我的文章!
在写代码之前,我们先看下netty的线程模型,这比那固定格式的代码将会更有趣,看完线程模型,你就知道netty写的那几段固定代码的意义了。
线程模型图
这个线程模型图里面大概包含了这几个组件:bossGroup,workGroup,selectot(accept),selector(读写),pipline,NioSocketChannel,NioServerSocketChannel;
-
bossgroup,workgroup
在netty中,处理客户端的请求会被注册在两类selector上,这两类selector分别对应两个线程池bossGroup和workgroup,bossGroup主要处理客户端与服务端建立连接注册的selector;workgroup看名字也知道了,是用来干活的线程池,它主要负责处理客户端读事件的selector逻辑;在创建netty的第一行代码中,就是创建这两个线程池,一般情况下bossgroup会设置成一个线程,workgroup会设置多个线程,默认不写的话,netty会获取当前服务器中的cpu核数*2作为默认创建的线程数量。
-
selector(accepet),selector(读写)
selector和NIO中的selector是同一种组件,不过在netty中会分为两种类型的selector:专门处理连接事件的selector和专门处理读写事件的selector;但是在NIO中处理这些事件都是使用的同一个selector,NIO中通过遍历key的方式,来判断是连接事件还是读写事件,然后交给后端线程处理的逻辑;
-
NioServerSocketChannel
这是服务端启动之后创建的一个channel,然后会把这个channel注册到selector中,并添加自己感兴趣的accept的事件,后续所有客户端发起的连接都会被该channel监听到。具体用来做什么,我们会结合下个组件介绍
-
NioSocketChannel
客户端在发起连接请求之后,服务端会通过调用NioServerSocketChannel的accepet方法,生成一个NioSocketChannel,接着会从workGroup中挑选一个eventLoop,然后把channel注册到该eventLoop线程的selector上,并添加感兴趣的读事件;后续客户端与服务端所有的读写操作都会在该channel中进行。
-
pipline
pipline是一个实现了职责链模式的管道处理器,在初始化之后,会添加一些处理器,例如:编码器、解码器、业务逻辑处理器,select在得到客户端发送过来的数据后,会把数据丢到这个管道里面,然后从头到尾依次执行这些处理器;如果是服务端把数据发往客户端,会从尾部到头部依次执行处理器,但是从服务端发数据到客户端,只会执行出站处理器;客户端发送数据到服务端,只会执行入站处理器。
介绍完这些基本组件之后,我们对netty的线程模型应该有了初步的认识,现在我们大概梳理下netty的整个处理过程:
流程讲解
-
服务端初始化时,会创建两个线程组bossGroup,workGoup;
-
创建一个NioServerSocketChannel 注册到bossGroup中eventLoop的selector上面,添加自己感兴趣的accept事件, 并监听指定端口;
-
client1发起连接请求,在服务端会产生一个accept事件,通过遍历selector中的key得到accept事件;
-
服务端的NioServerSocketChannel通过accept方法进行阻塞(其实该事件已经来了,不需要阻塞),返回一个客户端的channel1(NioSocketChannel);
-
获得了chnnel1之后,服务端会从workgroup挑选一个eventloop1,并将channel1注册到该eventloop1的selector1上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline1,并将所有的处理器都添加到了pipline1中;
-
这个时候又新加入一个client2发起连接,会执行同样的操作,最终将chnnel2注册到另外一个eventloop2里面的selector2上面,并添加感兴趣的读事件;这时候已经初始化好了该通道中的pipline2,并将所有的处理器都添加到了pipline2中;
-
如果client1发送数据到服务端,服务端生成的selector1会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline1中,执行后续逻辑处理;
-
如果client2发送数据到服务端,服务端生成的selector2会监听到该事件(读事件),读取通道中的数据,并将数据交给pipline2中,执行后续逻辑处理;
快速上手
前面已经将netty的基本组成和其线程模型大概说了下,现在我们演示下如何使用netty进行开发:代码已经放到码云:穿云箭
添加依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) {
// 创建 处理连接请求的线程组 1个
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建工作组线程 默认为 cpu核数*2 个
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipline中添加自定义的handle处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start");
// 绑定9000 端口号 sync指的是 创建完端口监听后,才执行后续操作
ChannelFuture cf = serverBootstrap.bind(9000).sync();
// 添加监听器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服务启动完成");
}
});
// 注册chnnel的关闭事件,sync是只有当关闭事件发生后才结束该线程,否则一直阻塞
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
创建自定义的处理器,写我们自己的业务逻辑
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到客户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
}
}
# 这里的ChannelInboundHandlerAdapter已经被废弃了,大家后续可以继承SimpleChannelInboundHandler,支持传入泛型,然后配合解码器使用,这里只是做个简单的演示。
客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class NettyClient {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(String.format("有新的客户端连接:%s", ctx.channel().remoteAddress().toString()));
}
});
System.out.println("netty client start");
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
cf.addListener((ChannelFutureListener) channelFuture -> System.out.println("客户端启动完成"));
String msg = "";
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
do {
try {
msg = br.readLine();
} catch (IOException e) {
e.printStackTrace();
}
ByteBuf buf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
cf.channel().writeAndFlush(buf);
} while (!msg.equals("end"));
System.out.println("您已退出");
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
}
}
创客户端自定义处理器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msg1 = (ByteBuf) msg;
System.out.println(String.format("收到服户端(%s)消息:%s", ctx.channel().remoteAddress().toString(), msg1.toString(CharsetUtil.UTF_8)));
}
}
netty相关面试知识拓展
什么是拆包和粘包
名词解释
客户端与服务端建立了TCP/UDP连接,如果连接中限制了发送数据的报文大小,此时 将要发送的数据大于这个限制,就会产生拆包现象;截取后的数据包会等待下次发送数据的时候一起发送,如果这个时候这部分数据和其他数据包一起发到服务端,又会产生粘包的现象;
解决方案
-
自己定义数据发送的数据格式,包括数据长度和数据内容两个,通过长度来判断数据有没有结束 -
使用定长解码器实现 -
使用指定开始符和结束符实现
解释下什么是零拷贝
说零拷贝之前,我们需要引入一个名词“直接内存”,我们知道java代码都运行在jvm虚拟机中,分配的内存数据都是在jvm中分配的,如果想直接访问jvm之外的内存数据,那就叫直接内存访问;在netty中,直接使用直接内存进行socket进行读写。不需要将数据拷贝到jvm中的缓冲区中,而是将数据直接发送到socket中,不需要再执行中间的拷贝操作;
架构之路 | 优质书单 | 面试资料
小白入门 | 前沿技术 | 视频教程