我们一起来聊聊websocket
前言
说到websocket,我是又爱又恨,从我一开始工作的时候,就开始接触这个东西,当初也没有研究的很详细,后来无论是从事PHP还是Go方面的项目,对它也有了逐渐地认识。在一个比较完善的Web即时通讯项目中,WebSocket是占着非常重要的部分,但当我们去使用它,并不是像一般的造轮子应用一样稳定,其中在复杂的网络场景下就显示及其不稳定,常常会有数据丢包的情况,因此,如何在复杂网络场景下,更快地感知网络变动,快速恢复websocket的可用性,就变得尤其重要。下面就让我们一起聊聊websocket的心跳和WebSocket如何在不同网络状态下,如何实现快速断网重连。
心跳包与重连机制
引入心跳包的作用是监测客户端与服务端的连接是否正常,当ws服务断开之后,我们如何进行重连,如何舍弃旧连接,如何重新发起新连接,新连接在不同网络状态下是否可用,这正是我们要考虑的问题,下面就让我们一起来分析一下:
客户端断开
在使用websocket过程中,可能会出现网络断开的情况,比如信号不好,或者网络临时关闭,这时候websocket的连接已经断开,而不同浏览器有不同的机制,触发onclose的时机也不同,并不一定会执行websocket的onclose方法,我们无法知道是否断开连接,也就无法进行重连操作。
ws.onclose = function (e) {
console.log('websocket 断开: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
}
错误状态码:
WebSocket断开时,会触发onclose, onclose会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。onclose的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因,其中比较常见的错误码如下。
1000 CLOSE_NORMAL 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务。
1001 CLOSE_GOING_AWAY 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开。
1002 CLOSE_PROTOCOL_ERROR 由于协议错误而中断连接。
1003 CLOSE_UNSUPPORTED 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据)。
服务器端口断开
因为某些原因需要断开websocket,比如异常关闭,重启ws服务器,客户端是不会自动感应到ws服务是否可用,这就需要我们引入心跳包这个概念,当ws服务器既没有返回心跳响应消息,客户端又没有收到任何其他消息的情况下,我们就能断定ws服务器主动断开了。因此需要一种机制来检测客户端和服务端是否处于正常连接的状态下。通过指定时间间隔发送心跳包来保证连接正常,如果连接出现问题,就需要手动触发onclose事件,这时候便可进行重连操作。因此websocket心跳重连就应运而生,定时以不太快的频率发送心跳包,比如20s/次、40s/次等,具体可以根据应用场景来定;然后在网络状态由离线变为在线时立即发送一次心跳,检测当前连接是否可用,不可用的话立即进行恢复处理。这样在大多数情况下,上层的应用通信都能较快从不可用状态中恢复,对于少部分场景,有定时心跳作为兜底,在一个心跳周期内也能够恢复。服务端断开
服务器因为某些原因需要断开websocket,比如异常关闭,重启ws服务器,客户端是不会自动感应到ws服务是否可用,这就需要我们引入心跳包这个概念,当ws服务器既没有返回心跳响应消息,客户端又没有收到任何其他消息的情况下,我们就能断定ws服务器主动断开了。因此需要一种机制来检测客户端和服务端是否处于正常连接的状态下。通过指定时间间隔发送心跳包来保证连接正常,如果连接出现问题,就需要手动触发onclose事件,这时候便可进行重连操作。因此websocket心跳重连就应运而生,定时以不太快的频率发送心跳包,比如20s/次、40s/次等,具体可以根据应用场景来定;然后在网络状态由离线变为在线时立即发送一次心跳,检测当前连接是否可用,不可用的话立即进行恢复处理。这样在大多数情况下,上层的应用通信都能较快从不可用状态中恢复,对于少部分场景,有定时心跳作为兜底,在一个心跳周期内也能够恢复。
重连机制
重连ws服务器,我们需要引入延时函数,有些人可能会问,比如ws服务器出现网络抖动时,我不用延时不是更快连接上服务器。其实这是不可取的,当网络连接上所有的设备都会立即同时向服务器发起连接,相当于抢课一样,服务器负载过高,即可引发服务端雪崩效应。
webSocket的readyState属性
CONNECTING:值为0,表示正在连接。OPEN:值为1,表示连接成功,可以通信了。CLOSING:值为2,表示连接正在关闭。CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
通过上述的描述中,总结 websocket连接如下:
1、通过发送心跳包,监测网络恢复情况,恢复后立即发送一次心跳包,判读服务器是否需要重连。
CLOSE_GOING_AWAY2、服务器断开旧链接,客户端立即弃用旧链接。
3、重连机制需要延时发起连接,减少最大的重连间隔。
实践
客户端代码
/**
* @program: Go
*
* @description:
*
* @author: Mr.chen
*
* @create: 2020-09-25 10:10
**/
<script>
var app=new Vue(
{
el:"#pageapp",
data:{
webSocket:{},
token:"",
ws: {
SocketTask: null,
Timer: null,
ErrorMsg: [],
MaxRetryCount: 3,// 最大重连次数
CurrentRetryCount: 0,
url: null
},
},
created:function(){
// 测试
// 测试结束
this.token = util.getcookie("token")
if (util.isEmptykey(this.token) || userInfo() ==null) {
var url = "/user/login.shtml"
location.href= url
}
// 检查token的真实性
// 若用户 10 秒后任在此页面,链接Socket
// setTimeout(function () {
// // if (!this.ws.SocketTask || this.ws.SocketTask.readyState == 3 || this.ws.SocketTask.readyState == 2) {
// // this.initwebsocket();
// // }
// }, 10000);
// websocket的readyState属性,CONNECTING:值为0,表示正在连接。OPEN:值为1,表示连接成功,可以通信了。CLOSING:值为2,表示连接正在关闭。CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
if (!this.ws.SocketTask || this.ws.SocketTask.readyState == 3 || this.ws.SocketTask.readyState == 2){ //
this.initwebsocket()
}
},
mounted:function(){
},
beforeDestroy() {
},
methods:{
onmessage:function(data){
if (this.ws.ws_error) { // 发生错误或者当前不支持ebsocket,能发消息的时候将可重连的次数置为0
this.ws.CurrentRetryCount = 0;
this.ws.ws_error = false;
}
console.log(data)
},
initwebsocket:function(){
var url="ws://"+location.host+"/chat?authToken=" + this.token
this.webSocket=new WebSocket(url);
this.ws.SocketTask = this.webSocket;
var Kefu = this
this.webSocket.onopen = function () { // 打开websocket,定时发送心跳包
// 重新发送所有出错的消息
console.log(Kefu.ws.ErrorMsg)
if (Kefu.ws.ErrorMsg.length > 0) {
for (let i in Kefu.ws.ErrorMsg) {
Kefu.ws_send(Kefu.ws.ErrorMsg[i]);
}
Kefu.ws.ErrorMsg = [];
}
if (Kefu.ws.Timer != null) { // 清除当前的定时器
clearInterval(Kefu.ws.Timer);
}
Kefu.ws.Timer = setInterval(Kefu.ws_send, 28000);//定时发送心跳
};
//消息处理
this.webSocket.onmessage = function(evt){
console.log("ErrorMsg",this.ws.ErrorMsg)
if(evt.data.indexOf("}")>-1){
this.onmessage(JSON.parse(evt.data));
}else{
console.log("recv<=="+evt.data)
}
}.bind(this)
//关闭websocket
this.webSocket.onclose=function (evt) { // websocket 自动重连3次
//console.log(evt.data)
if (Kefu.ws.Timer != null) {
clearInterval(Kefu.ws.Timer);
}
Kefu.ws.ws_error = true;
console.log("网络链接已断开")
if (Kefu.ws.MaxRetryCount) {
Kefu.ws.Timer = setInterval(Kefu.retry_webSocket, 3000);//每3秒重新连接一次
}
}
//出错回调
this.webSocket.onerror=function (evt) {
console.log(evt)
Kefu.ws.ws_error = true;
console.error('websocket 错误:', evt);
}
/*{
this.webSocket.send()
}*/
},
// 发送消息-心跳包
ws_send: function (message) {
if (!message) { // 发送心跳包
message = {cmd: 1, content: 'ping'};
}
// websocket的readyState属性,CONNECTING:值为0,表示正在连接。OPEN:值为1,表示连接成功,可以通信了。CLOSING:值为2,表示连接正在关闭。CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
if (this.ws.SocketTask && this.ws.SocketTask.readyState == 1) { // 链接成功
this.ws.SocketTask.send(JSON.stringify(message));
} else {
console.log('消息发送出错', message)
this.ws.ErrorMsg.push(message);
}
},
// websocket断开重连,切勿直接连接,遇到因网络原因导致的重连时,是万万不能立即发起一次新连接的,否则当出现网络抖动时,所有的设备都会立即同时向服务器发起连接,这无异于黑客通过发起大量请求消耗网络带宽引起的拒绝服务攻击,这对服务器来说简直是灾难(即:服务端雪崩效应)。
retry_webSocket: function () {
if (this.ws.CurrentRetryCount < this.ws.MaxRetryCount) {
this.ws.CurrentRetryCount++;
this.initwebsocket();
console.log('重连 WebSocket 第' + this.ws.CurrentRetryCount + '次');
} else {
if (this.ws.Timer != null) {
clearInterval(this.ws.Timer);
}
console.log('每隔10秒将再次尝试重连 WebSocket')
this.ws.Timer = setInterval(this.initwebsocket, 10000);//每10秒重新连接一次
}
},
},
watch:{
}
}
)
</script>
服务端代码
/**
* @program: Go
*
* @description:
*
* @author: Mr.chen
*
* @create: 2020-09-25 10:10
**/
package main
import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var clientMap = make(map[int64]*Node, 0)
//读写锁
var rwLocker sync.RWMutex
var errSendList = make(map[int64][][]byte,0) // 异常对列,当服务端重新启动的时候,将异常对列
func chat(c *gin.Context) {
baseMethod(c) // 验证token
userId := UserInfos.Id
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Println(err.Error())
return
}
//todo 获得conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
//todo userid和node形成绑定关系
rwLocker.Lock()
clientMap[userId] = node
rwLocker.Unlock()
// 启动成功将
//todo 完成发送逻辑,con
go sendMessage(node)
//todo 完成接收逻辑
go recvMessage(node)
// 判断当前用户是否有未发的
rwLocker.RLock()
messageNode, isok := errSendList[userId]
if isok {
for k, _:= range messageNode {
dispatch(messageNode[k],true)
// 将当前的信息
//log.Println("错误的消息",messageNode[k])
}
delete(errSendList,userId)
}
rwLocker.RUnlock()
sendMsg(userId, []byte("websocket链接成功")) // 返回给给客户端表示链接成功,客户端需要响应
}
//ws发送协程
func sendMessage(node *Node) {
for {
select {
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
//ws接收协程
func recvMessage(node *Node) {
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
log.Println(err.Error())
return
}
dispatch(data,false)
//把消息广播到局域网
broadMsg(data)
log.Printf("[ws1]<=%s\n", data)
}
}
//后端调度逻辑处理
func dispatch(data []byte,stype bool) { // type 为true,不插入
//todo 解析data为message
// 解析心跳包格式{"c":"Message","a":"ping"}
msg := Message{}
err := json.Unmarshal(data, &msg)
if err != nil {
log.Println(err.Error())
return
}
isok := true
//todo 根据cmd对逻辑进行处理
switch msg.Cmd {
case CMD_SINGLE_MSG:
isok =sendMsg(msg.Dstid, data)
log.Println(isok)
case CMD_ROOM_MSG:
//todo 群聊转发逻辑
for k, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) && msg.Userid != k { // 去掉群主,否者群主会发送两条消息
v.DataQueue <- data
log.Println(string(data))
}
}
case CMD_PING:
log.Println("接收到的新跳包",msg.Content)
case CMD_HEART:
//todo 一般啥都不做
}
if (msg.Cmd == CMD_SINGLE_MSG && !stype) || msg.Cmd == CMD_ROOM_MSG { // 单聊,群聊插入数据库
// 插入数据库
message := &model.Message{
Userid:msg.Userid,
Cmd:msg.Cmd,
Dstid:msg.Dstid,
Media:msg.Media,
Content:msg.Content,
Pic:msg.Pic,
Url:msg.Url,
Memo:msg.Memo,
Amount:msg.Amount,
Createtime: time.Now(),
}
messageService.Add(message)
}
}
//todo 发送消息
func sendMsg(userId int64, msg []byte) bool {
rwLocker.RLock()
node, ok := clientMap[userId]
//log.Println("接收者的ID:",userId)
//log.Println("接收者的ok:",ok)
rwLocker.RUnlock()
if ok { // 当服务器断开链接,客户端同时发送信息,某个链接的客户端未重来起来,重新发送
node.DataQueue <- msg
return true
}else{ // 重新发送防止未链接成功,起一个携程,将未处理的数据重新发送
rwLocker.Lock()
messageNode, _ := errSendList[userId]
messageNode = append(messageNode,msg)
errSendList[userId] = messageNode
log.Println(errSendList)
rwLocker.Unlock()
return false
//sendMsg(userId, msg)
}
}
总结:
通过上述的简单实践,让我们更加深入了解websocket,当前的项目只是实现了消息的发送与接收,还有很大的改进空间,在细节上也有很多没有考虑到,比如消息引入redis存储,引入channel通道来管控消息的发送与接收,离线消息的处理等等,如果作者有空闲时间,我将会以此项目改造成微服务化的项目,在微服务化之后需要添加什么功能也可以留言给作者,让我们一起来完善当前项目。