vlambda博客
学习文章列表

netty实现websocket请求实战

描述

WebSocket是html5开始浏览器和服务端进行全双工通信的网络技术。在该协议下,与服务端只需要一次握手,之后建立一条快速通道,开始互相传输数据,实际是基于TCP双向全双工,比http半双工提高了很大的性能,常用于网络在线聊天室等。继续netty实例的学习,本期内容主要做WebSocket实例的实现和相关协议的验证。

WebSocket与http比较

http WebSocket
半双工,可以双向传输,不能同时传输 全双工
消息冗长繁琐,消息头,消息体,换行... 对代理、防火墙、路由器透明
http轮询实现推送请求量大,而comet采用长连接 无头部、Cookie等
- ping/pong帧保持链路激活
- 特点:服务端可以主动传递给客户端,不需要轮询

代码示例和运行结果

服务端

服务端比较简单,是启动一个端口来处理请求。

主程序

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.junit.Test;

public class NettyWebSocketServer {

public void run(final int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65535))
.addLast(new ChunkedWriteHandler())
.addLast(new WebSocketServerHandler());
}
});
Channel ch = bootstrap.bind(port).sync().channel();
System.out.println("websocket @" + port);
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void runServer() {
run(23123);
}
}

请求处理handler

核心处理WebSocket请求,重点地方加了注释

import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handshaker;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//首次请求之后先进行握手,通过http请求来实现
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
//解码http失败返回
if (!request.decoderResult().isSuccess()) {
sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer()));
return;
}

if (!HttpMethod.GET.equals(request.method())) {
sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer()));
return;
}

//参数分别是ws地址,子协议,是否扩展,最大frame长度
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, true, 5 * 1024 * 1024);
handshaker = factory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), request);
}
}

//SSL支持采用wss://
private String getWebSocketLocation(FullHttpRequest request) {
String location = request.headers().get(HttpHeaderNames.HOST) + "/websocket";
return "ws://" + location;
}

private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) {
//关闭
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}

//握手 PING/PONG
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}

//文本接收和发送
if (frame instanceof TextWebSocketFrame) {
String recv = ((TextWebSocketFrame) frame).text();
String text = "now@" + LocalDateTime.now() + "\n recv:" + recv;

//这里加了个循环,每隔一秒服务端主动发送内容给客户端
for (int i = 0; i < 5; i++) {
String res = i + "-" + text;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(new TextWebSocketFrame(res));
}
System.out.println(recv);
return;
}

if (frame instanceof BinaryWebSocketFrame) {
ctx.write(frame.retain());
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

private void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) {
HttpResponseStatus status = resp.status();
if (status != HttpResponseStatus.OK) {
ByteBufUtil.writeUtf8(resp.content(), status.toString());
HttpUtil.setContentLength(req, resp.content().readableBytes());
}
boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK;
HttpUtil.setKeepAlive(req, keepAlive);
ChannelFuture future = ctx.write(resp);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端

<html>
<head>
<meta charset="UTF-8">
</head>
<body>
<form onsubmit="return false;">
<input type="text" name="message" value="test"/>
<br/>
<input type="button" value="send", onclick="send(this.form.message.value)"/>
<hr/>
resp:
<br/>
<textarea id="responseText" style="width: 200px;height:300px;"></textarea>
</form>

</form>
</body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:23123/websocket");
socket.onmessage = function (ev) {
var ta = document.getElementById('responseText');
ta.value = '';
ta.value = ev.data;
}

socket.onopen = function (ev) {
var ta = document.getElementById('responseText');
ta.value = 'start open websocket ...';
}

socket.onclose = function (ev) {
var ta = document.getElementById('responseText');
ta.value = '';
ta.value = 'close websocket ...';
}
}else {
alert('not support websocket !')
}

function send(msg) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(msg);
}else {
alert('connect fail !')
}
}
</script>
</html>

试验结果

  1. 请求抓包说明


    ws请求


    如图中,首先是TCP三次握手,然后HTTP请求,建立WebSocket连接。之后客户端请求一次,然后服务端每隔1秒发送内容到客户端。


    netty实现websocket请求实战

    keepAlive


    上图是空闲时候的keepAlive调用

  2. 客户端调用


    服务端关闭的时候效果


    服务端主动发送的效果


    上图是服务端主动发送的效果,随着内容不同数据会变化。

参考资料

结语

以上就是本期的内容,后面有时间会继续netty相关实例的文章,确实在网络方面有很多新的收获。