vlambda博客
学习文章列表

GO 实现千万级 WebSocket 消息推送服务

【导读】WebSocket 是做什么的,应用上有什么坑?本文详细介绍了WebSocket技术和Go实现。

拉模式和推模式区别

拉模式(定时轮询访问接口获取数据)

  • 数据更新频率低,则大多数的数据请求时无效的
  • 在线用户数量多,则服务端的查询负载很高
  • 定时轮询拉取,无法满足时效性要求

推模式(向客户端进行数据的推送)

  • 仅在数据更新时,才有推送
  • 需要维护大量的在线长连接
  • 数据更新后,可以立即推送

基于WebSocket协议做推送

  • 浏览器支持的socket编程,轻松维持服务端的长连接
  • 基于TCP协议之上的高层协议,无需开发者关心通讯细节
  • 提供了高度抽象的编程接口,业务开发成本较低

WebSocket协议的交互流程

客户端首先发起一个Http请求到服务端,请求的特殊之处,在于在请求里面带了一个upgrade的字段,告诉服务端,我想生成一个websocket的协议,服务端收到请求后,会给客户端一个握手的确认,返回一个switching, 意思允许客户端向websocket协议转换,完成这个协商之后,客户端与服务端之间的底层TCP协议是没有中断的,接下来,客户端可以向服务端发起一个基于websocket协议的消息,服务端也可以主动向客户端发起websocket协议的消息,websocket协议里面通讯的单位就叫message。

————————————————

服务端技术选型与考虑

NodeJs

  • 单线程模型(尽管可以多进程),推送性能有限

C/C++

  • TCP通讯、WebSocket协议实现成本高

Go

  • 多线程,基于协程模型并发
  • Go语言属于编译型语言,运行速度并不慢
  • 成熟的WebSocket标准库,无需造轮子

基于Go实现WebSocket服务端

用Go语言对WebSocket做一个简单的服务端实现,以及HTML页面进行调试,并对WebSocket封装,这里就直接给出代码了。

WebSocket服务端

package main
 
import (
  "net/http"
  "github.com/gorilla/websocket"
  "github.com/myproject/gowebsocket/impl"
  "time"
  )
var(
 upgrader = websocket.Upgrader{
  // 允许跨域
  CheckOrigin:func(r *http.Request) bool{
   return true
  },
 }
)
 
func wsHandler(w http.ResponseWriter , r *http.Request){
 // w.Write([]byte("hello"))
 var(
  wsConn *websocket.Conn
  err error
  conn *impl.Connection
  data []byte
 )
 // 完成ws协议的握手操作
 // Upgrade:websocket
 if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
  return 
 }
 
 if conn , err = impl.InitConnection(wsConn); err != nil{
  goto ERR
 }
 
 // 启动线程,不断发消息
 go func(){
  var (err error)
  for{
   if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
    return 
   }
   time.Sleep(1*time.Second)
  }
 }()
 
 for {
  if data , err = conn.ReadMessage();err != nil{
   goto ERR
  }
  if err = conn.WriteMessage(data);err !=nil{
   goto ERR
  }
 }
 
 ERR:
  conn.Close()
 
}
 
func main(){
 
 http.HandleFunc("/ws",wsHandler)
 http.ListenAndServe("0.0.0.0:7777",nil)
}

前端页面

<!DOCTYPE html>
<html>
<head>
 <title>go websocket</title>
 <meta charset="utf-8" />  
</head>
<body>
 <script type="text/javascript">
  var wsUri ="ws://127.0.0.1:7777/ws"
     var output;  
     
     function init(
         output = document.getElementById("output"); 
         testWebSocket(); 
     }  
  
     function testWebSocket(
         websocket = new WebSocket(wsUri); 
         websocket.onopen = function(evt
             onOpen(evt) 
         }; 
         websocket.onclose = function(evt
             onClose(evt) 
         }; 
         websocket.onmessage = function(evt
             onMessage(evt) 
         }; 
         websocket.onerror = function(evt
             onError(evt) 
         }; 
     }  
  
     function onOpen(evt
         writeToScreen("CONNECTED"); 
        // doSend("WebSocket rocks"); 
     }  
  
     function onClose(evt
         writeToScreen("DISCONNECTED"); 
     }  
  
     function onMessage(evt
         writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>'); 
        // websocket.close(); 
     }  
  
     function onError(evt
         writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data); 
     }  
  
     function doSend(message
         writeToScreen("SENT: " + message);  
         websocket.send(message); 
     }  
  
     function writeToScreen(message
         var pre = document.createElement("p"); 
         pre.style.wordWrap = "break-word"
         pre.innerHTML = message; 
         output.appendChild(pre); 
     }  
  
     window.addEventListener("load", init, false);  
     function sendBtnClick(){
      var msg = document.getElementById("input").value;
      doSend(msg);
      document.getElementById("input").value = '';
     }
     function closeBtnClick(){
      websocket.close(); 
     }
 
</script>
 <h2>WebSocket Test</h2>  
 <input type="text" id="input"></input>
 <button onclick="sendBtnClick()" >send</button>
 <button onclick="closeBtnClick()" >close</button>
 <div id="output"></div>  
 
</body>
</html>

封装WebSocket

package impl
 
import (
  "github.com/gorilla/websocket"
  "sync"
  "errors"
  )
 
type Connection struct{
 wsConnect *websocket.Conn
 inChan chan []byte
 outChan chan []byte
 closeChan chan byte
 
 mutex sync.Mutex  // 对closeChan关闭上锁
 isClosed bool  // 防止closeChan被关闭多次
}
 
func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
 conn = &Connection{
  wsConnect:wsConn,
  inChan: make(chan []byte,1000),
  outChan: make(chan []byte,1000),
  closeChan: make(chan byte,1),
 
 }
 // 启动读协程
 go conn.readLoop();
 // 启动写协程
 go conn.writeLoop();
 return
}
 
func (conn *Connection)ReadMessage()(data []byte , err error){
 
 select{
 case data = <- conn.inChan:
 case <- conn.closeChan:
  err = errors.New("connection is closeed")
 }
 return 
}
 
func (conn *Connection)WriteMessage(data []byte)(err error){
 
 select{
 case conn.outChan <- data:
 case <- conn.closeChan:
  err = errors.New("connection is closeed")
 }
 return 
}
 
func (conn *Connection)Close(){
 // 线程安全,可多次调用
 conn.wsConnect.Close()
 // 利用标记,让closeChan只关闭一次
 conn.mutex.Lock()
 if !conn.isClosed {
  close(conn.closeChan)
  conn.isClosed = true 
 }
 conn.mutex.Unlock()
}
 
// 内部实现
func (conn *Connection)readLoop(){
 var(
  data []byte
  err error
  )
 for{
  if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
   goto ERR
  }
//阻塞在这里,等待inChan有空闲位置
  select{
   case conn.inChan <- data:
   case <- conn.closeChan:  // closeChan 感知 conn断开
    goto ERR
  }
  
 }
 
 ERR:
  conn.Close()
}
 
func (conn *Connection)writeLoop(){
 var(
  data []byte
  err error
  )
 
 for{
  select{
   case data= <- conn.outChan:
   case <- conn.closeChan:
    goto ERR
  }
  if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
   goto ERR
  }
 }
 
 ERR:
  conn.Close()
 
}

千万级弹幕系统的架构设计

技术难点

  • 内核瓶颈

推送量大:100W在线 * 10条/每秒 = 1000W条/秒

内核瓶颈:linux内核发送TCP的极限包频 ≈ 100W/秒

  • 锁瓶颈

需要维护在线用户集合(100W用户在线),通常是一个字典结构

推送消息即遍历整个集合,顺序发送消息,耗时极长

推送期间,客户端仍旧正常的上下线,集合面临不停的修改,修改需要遍历,所以集合需要上锁

  • CPU瓶颈

浏览器与服务端之间一般采用的是JSon格式去通讯

Json编码非常耗费CPU资源

向100W在线推送一次,则需100W次Json Encode

优化方案

  • 内核瓶颈

减少网络小包的发送,我们将网络上几百字节定义成网络的小包了,小包的问题是对内核和网络的中间设备造成处理的压力。方案是将一秒内N条消息合并成1条消息,合并后,每秒推送数等于在线连接数。

  • 锁瓶颈

大锁拆小锁,将长连接打散到多个集合中去,每个集合都有自己的锁,多线程并发推送集合,线程之间推送的集合不同,所以没有锁的竞争关系,避免锁竞争。

读写锁取代互斥锁,多个推送任务可以并发遍历相同集合

  • CPU瓶颈

减少重复计算,Json编码前置,1次消息编码+100W次推送,消息合并前置,N条消息合并后,只需要编码一次。

  • 集群

部署多个节点,通过负载均衡,把连接打散到多个 服务器上,但推送消息的时候,不知道哪个直播间在哪个节点上,最常用的方式是将消息广播给所有的网关节点,此时就需要做一个逻辑集群。

  • 逻辑集群

基于Http2协议向gateway集群分发消息(Http2支持连接复用,用作RPC性能更佳,即在单个连接上可以做高吞吐的请求应答处理)

基于Http1协议对外提供推送API(Http1更加普及,对业务方更加友好)

整体分布式架构图如下:

任何业务方通过Http接口调用到逻辑集群,逻辑集群把消息广播给所有网关,各个网关各自将消息推送给在线的连接即可。

本文讲解了开发消息推送服务的难点与解决方案的大体思路,按照整个理论流程下来,基本能实现一套弹幕消息推送的服务。


zhuanlan.zhihu.com/p/100770431

 - EOF -

推荐阅读(点击标题可打开)

1、

2、

3、


Go 开发大全

参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。

关注后获取

回复 Go 获取6万star的Go资源库



分享、点赞和在看

支持我们分享更多好文章,谢谢!