vlambda博客
学习文章列表

通过Netty,实现Websocket消息推送简单几步搞定

点击上方“九码课堂”关注我们
生产中,大家都遇到过需要服务端给客户端推送消息的需求,而当时为此烦恼过。 要想实现这个需求,我们知道离不开websocket的长连接方式,那么我们是该选择原生的websocket还是更加高级的netty框架呢?

这里我强烈推荐neety框架,一款优秀的框架都是基于原生代码上进行包装,以此达到更好、更方便、更实用的目的。使用框架,很多需要我们去考虑的问题都可以交给框架去自动完成,不用再考虑。

一、逻辑架构图

通过Netty,实现Websocket消息推送简单几步搞定

上诉图中可以看出本次实现的基本流程:从客户端A发送请求到核心模块,核心模块生产消息至消息队列,然后服务端消息模块消费掉消息,再将消息推送给客户端B。流程很简单,没有太多技巧,本文主要重点就在消息模块这里,主要包括netty client、netty server、channel的存储等等。

二、代码

1、添加依赖

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version></dependency>

2、NettyServer类

@Servicepublic class NettyServer { public void run(int port){ new Thread(){ public void run(){ runServer(port); } }.start(); }
private void runServer(int port){ Print.info("===============Message服务端启动==============="); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("codec-http", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("handler", new MyWebSocketServerHandler()); } });
Channel ch = b.bind(port).sync().channel(); Print.info("Message服务器启动成功:" + ch.toString()); ch.closeFuture().sync(); } catch (Exception e){ Print.error("Message服务运行异常:" + e.getMessage()); e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); Print.info("Message服务已关闭"); } }}

3、MyWebSocketServerHandler类

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{ private static final String WEBSOCKET_PATH = ""; private WebSocketServerHandshaker handshaker;
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest){ //以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); }else if (msg instanceof WebSocketFrame){ //处理websocket客户端的消息 handleWebSocketFrame(ctx, (WebSocketFrame) msg); } }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { //要求Upgrade为websocket,过滤掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; }
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:9502/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } }
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // Check for closing frame if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { Print.error("数据帧类型不支持!"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }
// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服务器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ ctx.channel().write(new TextWebSocketFrame(request)); return; }
JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token"); if (Const.FRONT.equals(eventType)){ Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else if (Const.BEHIND.equals(eventType)){ Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken); if (null == chan){ Print.error("目标用户不存在"); }else { JSONObject jsonMsg = new JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken, new TextWebSocketFrame(jsonMsg.toString())); Print.info("向目标用户发送成功"); } }else{ Print.error("event type error"); } }
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }
private static String getWebSocketLocation(FullHttpRequest req) { return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH; }
/** * 接收客户端连接事件 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Print.info("客户端与服务端连接开启:" + ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }
/** * 接收客户端关闭事件 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Print.info("客户端与服务端连接关闭:" + ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }
}

4、ChannelSupervise类

public class ChannelSupervise { private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
public static void addChannel(String apiToken, Channel channel){ GlobalGroup.add(channel); if (null != apiToken) { ChannelMap.put(apiToken, channel.id()); } }
public static void updateChannel(String apiToken, Channel channel){ Channel chan = GlobalGroup.find(channel.id()); if (null == chan){ addChannel(apiToken, channel); }else { ChannelMap.put(apiToken, channel.id()); } }
public static void removeChannel(Channel channel){ GlobalGroup.remove(channel); Collection<ChannelId> values = ChannelMap.values(); values.remove(channel.id()); }
public static Channel findChannel(String apiToken){ ChannelId chanId = ChannelMap.get(apiToken); if (null == chanId){ return null; }
return GlobalGroup.find(ChannelMap.get(apiToken)); }
public static void sendToAll(TextWebSocketFrame tws){ GlobalGroup.writeAndFlush(tws); }
public static void sendToSimple(String apiToken, TextWebSocketFrame tws){ GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws); }}

5、NettyClient类

@Servicepublic class NettyClient { private Channel channel; public void run(String strUri){ new Thread(){ public void run(){ runClient(strUri); } }.start();
private void runClient(String strUri) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); URI uri = new URI(strUri); String protocol = uri.getScheme(); if (!"ws".equals(protocol)) { throw new IllegalArgumentException("Unsupported protocol: " + protocol); }
HttpHeaders customHeaders = new DefaultHttpHeaders(); customHeaders.add("MyHeader", "MyValue"); // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline. final MyWebSocketClientHandler handler = new MyWebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer<SocketChannel>() { @Overpublic void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); } }); Print.info("===============Message客户端启动==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); } catch (Exception e){ Print.error(e.getMessage()); } finally { group.shutdownGracefully(); } }}

6、MyWebSocketClientHandler类

public class MyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture;
public MyWebSocketClientHandler(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; }
public ChannelFuture handshakeFuture() { return handshakeFuture; }
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { handshakeFuture = ctx.newPromise(); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { handshaker.handshake(ctx.channel()); }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Print.info("webSocket client disconnected!"); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { handshaker.finishHandshake(ch, (FullHttpResponse) msg); Print.info("websocket client connected!"); handshakeFuture.setSuccess(); return; }
if (msg instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) msg; throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); }
WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; Print.info("客户端收到消息: " + textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { Print.info("websocket client received pong"); } else if (frame instanceof CloseWebSocketFrame) { Print.info("websocket client received closing"); ch.close(); } }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); }
ctx.close(); }
}

7、启动类

@SpringBootApplication@Servicepublicclass MessageApplication { @Autowired private NettyServer server;
@Autowired private NettyClient client;
public static void main(String[] args) { SpringApplication.run(MessageApplication.class, args); }
@PostConstruct public void initMessage(){ server.run(9502); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
client.run("ws://localhost:" + 9502); }

8、客户端B测试页面

<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>WebSocket Chat</title> </head> <body> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; }
if (window.WebSocket) { socket = new WebSocket("ws://localhost:9502"); socket.onmessage = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + '\n' + event.data }; socket.onopen = function(event) { var ta = document.getElementById('responseText'); ta.value = "连接开启!"; }; socket.onclose = function(event) { var ta = document.getElementById('responseText'); ta.value = ta.value + "连接被关闭"; }; } else { alert("你的浏览器不支持 WebSocket!"); }
function send(message) { if (!window.WebSocket) { return; }
if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("连接没有开启."); } }</script> <form onsubmit="return false;"> <h3>WebSocket:</h3> <textarea id="responseText" style="width: 500px; height: 300px;"></textarea> <br> <input type="text" name="message" style="width: 300px" value="1"> <input type="button" value="发送消息" onclick="send(this.form.message.value)"> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录"> </form> <br> </body></html>

三、测试

1、运行启动类后,会先启动netty服务器,再来启动一个netty客户端,等过30s模拟客户端A发送消息。

2、打开测试页面,在底下的输入框输入:{"event_type":"front", "api_token":"11111"},表示客户端B连接上netty服务器

测试结果如下:

消息模块:

通过Netty,实现Websocket消息推送简单几步搞定




扫描二维码

获取更多精彩

九码课堂