通过Netty,实现Websocket消息推送简单几步搞定
这里我强烈推荐neety框架,一款优秀的框架都是基于原生代码上进行包装,以此达到更好、更方便、更实用的目的。使用框架,很多需要我们去考虑的问题都可以交给框架去自动完成,不用再考虑。
一、逻辑架构图
上诉图中可以看出本次实现的基本流程:从客户端A发送请求到核心模块,核心模块生产消息至消息队列,然后服务端消息模块消费掉消息,再将消息推送给客户端B。流程很简单,没有太多技巧,本文主要重点就在消息模块这里,主要包括netty client、netty server、channel的存储等等。
二、代码
1、添加依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
2、NettyServer类
@Service
public class NettyServer {
public void run(int port){
new Thread(){
public void run(){
runServer(port);
}
}.start();
}
private void runServer(int port){
Print.info("===============Message服务端启动===============");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new MyWebSocketServerHandler());
}
});
Channel ch = b.bind(port).sync().channel();
Print.info("Message服务器启动成功:" + ch.toString());
ch.closeFuture().sync();
} catch (Exception e){
Print.error("Message服务运行异常:" + e.getMessage());
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
Print.info("Message服务已关闭");
}
}
}
3、MyWebSocketServerHandler类
public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{
private static final String WEBSOCKET_PATH = "";
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
//以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
}else if (msg instanceof WebSocketFrame){
//处理websocket客户端的消息
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
//要求Upgrade为websocket,过滤掉get/Post
if (!req.decoderResult().isSuccess()
|| (!"websocket".equals(req.headers().get("Upgrade")))) {
//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
"ws://localhost:9502/websocket", null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory
.sendUnsupportedVersionResponse(ctx.channel()); } else {
handshaker.handshake(ctx.channel(), req); }
}
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; }
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; }
if (!(frame instanceof TextWebSocketFrame)) {
Print.error("数据帧类型不支持!"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }
// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服务器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){
ctx.channel().write(new TextWebSocketFrame(request)); return; }
JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token"); if (Const.FRONT.equals(eventType)){
Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else if (Const.BEHIND.equals(eventType)){
Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken); if (null == chan){
Print.error("目标用户不存在"); }else {
JSONObject jsonMsg = new JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken, new TextWebSocketFrame(jsonMsg.toString())); Print.info("向目标用户发送成功"); }
}else{
Print.error("event type error"); }
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端 if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }
ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE); }
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); ctx.close(); }
private static String getWebSocketLocation(FullHttpRequest req) {
return "ws://" + req.headers().get(HOST) + WEBSOCKET_PATH; }
/** * 接收客户端连接事件 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
Print.info("客户端与服务端连接开启:" + ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }
/** * 接收客户端关闭事件 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Print.info("客户端与服务端连接关闭:" + ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }
}
4、ChannelSupervise类
public class ChannelSupervise {
private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
public static void addChannel(String apiToken, Channel channel){
GlobalGroup.add(channel);
if (null != apiToken) {
ChannelMap.put(apiToken, channel.id());
}
}
public static void updateChannel(String apiToken, Channel channel){
Channel chan = GlobalGroup.find(channel.id());
if (null == chan){
addChannel(apiToken, channel);
}else {
ChannelMap.put(apiToken, channel.id());
}
}
public static void removeChannel(Channel channel){
GlobalGroup.remove(channel);
Collection<ChannelId> values = ChannelMap.values();
values.remove(channel.id());
}
public static Channel findChannel(String apiToken){
ChannelId chanId = ChannelMap.get(apiToken);
if (null == chanId){
return null;
}
return GlobalGroup.find(ChannelMap.get(apiToken));
}
public static void sendToAll(TextWebSocketFrame tws){
GlobalGroup.writeAndFlush(tws);
}
public static void sendToSimple(String apiToken, TextWebSocketFrame tws){
GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);
}
}
5、NettyClient类
@Servicepublic class NettyClient {
private Channel channel;
public void run(String strUri){
new Thread(){
public void run(){
runClient(strUri);
}
}.start();
private void runClient(String strUri) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
URI uri = new URI(strUri);
String protocol = uri.getScheme();
if (!"ws".equals(protocol)) {
throw new IllegalArgumentException("Unsupported protocol: " + protocol);
}
HttpHeaders customHeaders = new DefaultHttpHeaders();
customHeaders.add("MyHeader", "MyValue");
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
// If you change it to V00, ping is not supported and remember to change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
final MyWebSocketClientHandler handler =
new MyWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new ChannelInitializer<SocketChannel>() {
@Overpublic void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }
}); Print.info("===============Message客户端启动==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); } catch (Exception e){
Print.error(e.getMessage()); } finally {
group.shutdownGracefully(); }
}
}
6、MyWebSocketClientHandler类
public class MyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public MyWebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Print.info("webSocket client disconnected!");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
Print.info("websocket client connected!");
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
Print.info("客户端收到消息: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
Print.info("websocket client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
Print.info("websocket client received closing");
ch.close();
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}
7、启动类
@SpringBootApplication
@Servicepublic
class MessageApplication {
@Autowired
private NettyServer server;
@Autowired
private NettyClient client;
public static void main(String[] args) {
SpringApplication.run(MessageApplication.class, args);
}
@PostConstruct
public void initMessage(){
server.run(9502);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.run("ws://localhost:" + 9502);
}
8、客户端B测试页面
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:9502");
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "连接开启!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "连接被关闭";
};
} else {
alert("你的浏览器不支持 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接没有开启.");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="1">
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
</form>
<br>
</body>
</html>
三、测试
1、运行启动类后,会先启动netty服务器,再来启动一个netty客户端,等过30s模拟客户端A发送消息。
2、打开测试页面,在底下的输入框输入:{"event_type":"front", "api_token":"11111"},表示客户端B连接上netty服务器
测试结果如下:
消息模块:
扫描二维码
获取更多精彩
九码课堂