【IT新手之路】Netty学习之旅(一)
你知道网络架构分层吗?TCP/UDP各有什么优缺点? 你知道网络通信过程吗?一次https请求要经历哪些流程? 你知道网络IO模型吗?c10k问题是如何解决的? 你知道Java如何实现网络编程吗?Netty在Java Nio上做了哪些优化呢?
一、Netty是什么
官网: https://netty.io/index.html
github: https://github.com/netty/netty
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步、事件驱动的网络应用框架,用于快速开发可维护、高性能协议服务端和客户端。
下图是netty所包含的组件:
可以看到netty包含三个模块:
Core 核心模块: 可扩展的事件模型;通用的通信api;有零拷贝能力的Byte Buffer
Transport Services 传输服务: socket、UDP、http tunnel等
Protocol Support 协议支持: http、websocket、rtsp等协议的支持
我们来看看netty涉及到的知识层级划分:
netty可以应用于各rpc框架,如dubbo等
netty是基于Java NIO实现的,并在此基础上做了优化
netty是基于reactor模式,Linux IO多路复用模型
netty可以实现传输层TCP/UDP协议的通信
netty的多路复用器基于poll、epoll系统调用实现,通过mmap、sendfile实现零拷贝
性能优秀,高吞吐、低延迟
可扩展性好,基于事件驱动模型、可自定义线程模型
支持非阻塞的socket
文档齐全、社区活跃
•Apache Mina netty与mina是同一作者的不同产品,netty是作者用来针对mina提高扩展性与解决一些已知问题的产品,可以理解为netty是mina的升级版
Sun Grizzly 社区没有Netty活跃,更新没有netty频繁
Apple Swift NIO 非java语言
tomcat/jetty tomcat/jetty是Servlet容器,主要应用基于Servlet的Http协议的处理;而netty不仅支持Http,还支持FTP、SSH等应用层协议以及传输层的UDP协议,并且还支持自定义协议;
https://java.libhunt.com/compare-netty-vs-mina https://java.libhunt.com/compare-grizzly-vs-netty
Netty支持常用的应用层协议如Http,而Java Nio需要开发人员去手动进行协议处理
Netty处理了TCP粘包半包问题,而Java Nio未处理
Netty支持流浪整形,而Java Nio不支持
Netty完善的处理了断连、空闲等异常处理
Netty修复了一些Java Nio的bug
二、如何去学习Netty
什么是NIO,个人理解,操作系统层面,是Non-blocking IO,Java层面叫它new IO或许比Non-blocking IO合适,因为它并不是完全非阻塞的,比如Selector.select()就是阻塞的方法;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
public class NioServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public static void main(String[] args) {
new Thread(new NioServer(8765), "NioServer-001").start();
}
public NioServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
while(true){
try {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
SelectionKey key = null;
while(iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
handleEvent(key);
} catch (IOException e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleEvent(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
}
}
private void handleAccept(SelectionKey key) throws IOException {
System.out.println("connect accept,time" + System.currentTimeMillis());
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(this.selector, SelectionKey.OP_READ);
}
private void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes).trim();
System.out.println("server recive body:" + body);
String response = "currentTime=" + new Date().toString();
doWrite(sc, response);
} else if (readBytes < 0) {
key.cancel();
sc.close();
}
}
private void doWrite(SocketChannel socketChannel, Strin response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("response 2 client succeed:" + response);
}
}
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
fd=9
的socket绑定到本机(0.0.0.0)的8765端口号上,并开启监听;
bind(9, {sa_family=AF_INET, sin_port=htons(8765), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
listen(9, 1024)
selector = Selector.open();
epoll_create(256) = 8
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN, {u32=9, u64=4538492564453457929}}) = 0
selector.select();
epoll_wait(8,
SocketChannel sc = ssc.accept();
accept(9, {sa_family=AF_INET, sin_port=htons(35744), sin_addr=inet_addr("127.0.0.1")}, [16]) = 11
Multiple Reactor Threads Demo
Multiple Reactor Threads是Reactor Pattern中的主从Reactor多线程模式;(关于reactor模式,可以阅读参考文档4及参考文档5)
import java.io.IOException;
public class Server {
public static void main(String[] args) throws IOException {
MainReactor reactor = new MainReactor(8765);
new Thread(reactor, "mainReactor").start();
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
public class MainReactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
public MainReactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor());
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
if (this.selector.select(1000L) > 0) {
Iterator<SelectionKey> selectionKeys = this.selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
SelectionKey key = selectionKeys.next();
if (!key.isValid()) {
continue;
}
selectionKeys.remove();
dispatch(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
void dispatch(SelectionKey selectionKey) throws IOException {
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.process(selectionKey);
}
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class SubReactor implements Runnable {
final Selector selector;
public SubReactor() throws IOException {
this.selector = Selector.open();
}
public void register(SocketChannel socketChannel) throws IOException {
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(new Handler());
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
if (this.selector.select(1000L) > 0) {
Iterator<SelectionKey> selectionKeys = this.selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
SelectionKey key = selectionKeys.next();
if (!key.isValid()) {
continue;
}
selectionKeys.remove();
dispatch(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
void dispatch(SelectionKey selectionKey) throws IOException {
EventHandler eventHandler = (EventHandler) selectionKey.attachment();
eventHandler.process(selectionKey);
}
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
public interface EventHandler {
void process(SelectionKey selectionKey) throws IOException;
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Acceptor implements EventHandler {
SubReactor[] subReactors;
int index;
public Acceptor() throws IOException {
SubReactor subReactor01 = new SubReactor();
SubReactor subReactor02 = new SubReactor();
SubReactor subReactor03 = new SubReactor();
this.subReactors = new SubReactor[]{subReactor01, subReactor02, subReactor03};
new Thread(subReactor01, "subReactor01").start();
new Thread(subReactor02, "subReactor02").start();
new Thread(subReactor03, "subReactor03").start();
}
@Override
public void process(SelectionKey selectionKey) throws IOException {
if (!selectionKey.isAcceptable()) {
return;
}
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SubReactor subReactor = subReactors[index];
index++;
if (index >= subReactors.length) {
index = 0;
}
subReactor.register(socketChannel);
}
}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Handler implements EventHandler {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
@Override
public void process(SelectionKey selectionKey) throws IOException {
executorService.execute(() -> {
try {
process2(selectionKey);
} catch (IOException e) {
e.printStackTrace();
}
});
}
private void process2(SelectionKey selectionKey) throws IOException {
if (selectionKey.isReadable()) {
processRead(selectionKey);
}
}
private void processRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes).trim();
System.out.println(Thread.currentThread().getName() + " server recive body:" + body);
String response = "currentTime=" + new Date().toString();
processWrite(sc, response);
} else if (readBytes < 0) {
key.cancel();
sc.close();
}
}
/**
* 由于是本地测试demo代码,所以针对写事件,并没有去注册OP_WRITE;
*/
private void processWrite(SocketChannel socketChannel, String response) throws IOException {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("response 2 client succeed:" + response);
}
}
}
Netty就是采用的这种模式,不仅Netty会使用Reactor模式,redis(单reactor单线程)、nginx等应用都使用了reactor,那为什么要用这种模式,它有什么好处?我们在后面的文章进行说明;
三、小结
通过之前的内容,我们了解了Netty所涉及到的相关知识: Java NIO API、Reactor模式、Linux IO模式、TCP/IP、以及一些系统调用;当然,还有未提及到的zero-copy、内存分配、编解码等知识也很重要;
你可能会问,上面的Java程序并没有使用到Netty啊,是的,确实没使用到Netty,但是Netty是基于Java NIO开发的,弄清楚了Java NIO,学习Netty事半功倍;
或许你还心存疑惑,毕竟这篇文章只是Netty学习系列的开篇,它还不够详细,但它的目的只是为了让你大致了解Netty的知识体系,激发你的学习兴趣,让我们在后续的文章中继续我们的Netty旅程,并能有深度得回答本文开头的那些问题;
四、参考文档
1.https://netty.io/index.html
2.https://java.libhunt.com/compare-netty-vs-mina
3.https://java.libhunt.com/compare-grizzly-vs-netty
4.http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
5.http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
6.UNIX网络编程卷1:套接字联网API