通过Netty,实现Websocket消息推送简单几步搞定
这里我强烈推荐neety框架,一款优秀的框架都是基于原生代码上进行包装,以此达到更好、更方便、更实用的目的。使用框架,很多需要我们去考虑的问题都可以交给框架去自动完成,不用再考虑。
一、逻辑架构图
上诉图中可以看出本次实现的基本流程:从客户端A发送请求到核心模块,核心模块生产消息至消息队列,然后服务端消息模块消费掉消息,再将消息推送给客户端B。流程很简单,没有太多技巧,本文主要重点就在消息模块这里,主要包括netty client、netty server、channel的存储等等。
二、代码
1、添加依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version></dependency>
2、NettyServer类
@Servicepublic class NettyServer {public void run(int port){new Thread(){public void run(){runServer(port);}}.start();}private void runServer(int port){Print.info("===============Message服务端启动===============");EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("codec-http", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(65536));pipeline.addLast("handler", new MyWebSocketServerHandler());}});Channel ch = b.bind(port).sync().channel();Print.info("Message服务器启动成功:" + ch.toString());ch.closeFuture().sync();} catch (Exception e){Print.error("Message服务运行异常:" + e.getMessage());e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();Print.info("Message服务已关闭");}}}
3、MyWebSocketServerHandler类
public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{private static final String WEBSOCKET_PATH = "";private WebSocketServerHandshaker handshaker;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest){//以http请求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);}else if (msg instanceof WebSocketFrame){//处理websocket客户端的消息handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {//要求Upgrade为websocket,过滤掉get/Postif (!req.decoderResult().isSuccess()|| (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:9502/websocket", null, false);handshaker = wsFactory.newHandshaker(req);if (handshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else {handshaker.handshake(ctx.channel(), req); }}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {// Check for closing frame if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; }if (frame instanceof PingWebSocketFrame) {ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; }if (!(frame instanceof TextWebSocketFrame)) {Print.error("数据帧类型不支持!"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服务器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ctx.channel().write(new TextWebSocketFrame(request)); return; }JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token"); if (Const.FRONT.equals(eventType)){Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else if (Const.BEHIND.equals(eventType)){Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken); if (null == chan){Print.error("目标用户不存在"); }else {JSONObject jsonMsg = new JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken, new TextWebSocketFrame(jsonMsg.toString())); Print.info("向目标用户发送成功"); }}else{Print.error("event type error"); }}private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {// 返回应答给客户端 if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE); }}@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace(); ctx.close(); }private static String getWebSocketLocation(FullHttpRequest req) {return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH; }/** * 接收客户端连接事件 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {Print.info("客户端与服务端连接开启:" + ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }/** * 接收客户端关闭事件 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {Print.info("客户端与服务端连接关闭:" + ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }}
4、ChannelSupervise类
public class ChannelSupervise {private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();public static void addChannel(String apiToken, Channel channel){GlobalGroup.add(channel);if (null != apiToken) {ChannelMap.put(apiToken, channel.id());}}public static void updateChannel(String apiToken, Channel channel){Channel chan = GlobalGroup.find(channel.id());if (null == chan){addChannel(apiToken, channel);}else {ChannelMap.put(apiToken, channel.id());}}public static void removeChannel(Channel channel){GlobalGroup.remove(channel);Collection<ChannelId> values = ChannelMap.values();values.remove(channel.id());}public static Channel findChannel(String apiToken){ChannelId chanId = ChannelMap.get(apiToken);if (null == chanId){return null;}return GlobalGroup.find(ChannelMap.get(apiToken));}public static void sendToAll(TextWebSocketFrame tws){GlobalGroup.writeAndFlush(tws);}public static void sendToSimple(String apiToken, TextWebSocketFrame tws){GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);}}
5、NettyClient类
@Servicepublic class NettyClient {private Channel channel;public void run(String strUri){new Thread(){public void run(){runClient(strUri);}}.start();private void runClient(String strUri) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();URI uri = new URI(strUri);String protocol = uri.getScheme();if (!"ws".equals(protocol)) {throw new IllegalArgumentException("Unsupported protocol: " + protocol);}HttpHeaders customHeaders = new DefaultHttpHeaders();customHeaders.add("MyHeader", "MyValue");// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.// If you change it to V00, ping is not supported and remember to change// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.final MyWebSocketClientHandler handler =new MyWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer<SocketChannel>() {@Overpublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }}); Print.info("===============Message客户端启动==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); } catch (Exception e){Print.error(e.getMessage()); } finally {group.shutdownGracefully(); }}}
6、MyWebSocketClientHandler类
public class MyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {private final WebSocketClientHandshaker handshaker;private ChannelPromise handshakeFuture;public MyWebSocketClientHandler(WebSocketClientHandshaker handshaker) {this.handshaker = handshaker;}public ChannelFuture handshakeFuture() {return handshakeFuture;}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {handshakeFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {handshaker.handshake(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Print.info("webSocket client disconnected!");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Channel ch = ctx.channel();if (!handshaker.isHandshakeComplete()) {handshaker.finishHandshake(ch, (FullHttpResponse) msg);Print.info("websocket client connected!");handshakeFuture.setSuccess();return;}if (msg instanceof FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');}WebSocketFrame frame = (WebSocketFrame) msg;if (frame instanceof TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;Print.info("客户端收到消息: " + textFrame.text());} else if (frame instanceof PongWebSocketFrame) {Print.info("websocket client received pong");} else if (frame instanceof CloseWebSocketFrame) {Print.info("websocket client received closing");ch.close();}}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();if (!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}}
7、启动类
@SpringBootApplication@Servicepublicclass MessageApplication {@Autowiredprivate NettyServer server;@Autowiredprivate NettyClient client;public static void main(String[] args) {SpringApplication.run(MessageApplication.class, args);}@PostConstructpublic void initMessage(){server.run(9502);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}client.run("ws://localhost:" + 9502);}
8、客户端B测试页面
<!DOCTYPE html><html><head><meta charset="UTF-8"><title>WebSocket Chat</title></head><body><script type="text/javascript">var socket;if (!window.WebSocket) {window.WebSocket = window.MozWebSocket;}if (window.WebSocket) {socket = new WebSocket("ws://localhost:9502");socket.onmessage = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + '\n' + event.data};socket.onopen = function(event) {var ta = document.getElementById('responseText');ta.value = "连接开启!";};socket.onclose = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + "连接被关闭";};} else {alert("你的浏览器不支持 WebSocket!");}function send(message) {if (!window.WebSocket) {return;}if (socket.readyState == WebSocket.OPEN) {socket.send(message);} else {alert("连接没有开启.");}}</script><form onsubmit="return false;"><h3>WebSocket:</h3><textarea id="responseText" style="width: 500px; height: 300px;"></textarea><br><input type="text" name="message" style="width: 300px" value="1"><input type="button" value="发送消息" onclick="send(this.form.message.value)"><input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录"></form><br></body></html>
三、测试
1、运行启动类后,会先启动netty服务器,再来启动一个netty客户端,等过30s模拟客户端A发送消息。
2、打开测试页面,在底下的输入框输入:{"event_type":"front", "api_token":"11111"},表示客户端B连接上netty服务器
测试结果如下:
消息模块:
扫描二维码
获取更多精彩
九码课堂
