GO 实现千万级 WebSocket 消息推送服务
【导读】WebSocket 是做什么的,应用上有什么坑?本文详细介绍了WebSocket技术和Go实现。
拉模式和推模式区别
拉模式(定时轮询访问接口获取数据)
-
数据更新频率低,则大多数的数据请求时无效的 -
在线用户数量多,则服务端的查询负载很高 -
定时轮询拉取,无法满足时效性要求
推模式(向客户端进行数据的推送)
-
仅在数据更新时,才有推送 -
需要维护大量的在线长连接 -
数据更新后,可以立即推送
基于WebSocket协议做推送
-
浏览器支持的socket编程,轻松维持服务端的长连接 -
基于TCP协议之上的高层协议,无需开发者关心通讯细节 -
提供了高度抽象的编程接口,业务开发成本较低
WebSocket协议的交互流程
客户端首先发起一个Http请求到服务端,请求的特殊之处,在于在请求里面带了一个upgrade的字段,告诉服务端,我想生成一个websocket的协议,服务端收到请求后,会给客户端一个握手的确认,返回一个switching, 意思允许客户端向websocket协议转换,完成这个协商之后,客户端与服务端之间的底层TCP协议是没有中断的,接下来,客户端可以向服务端发起一个基于websocket协议的消息,服务端也可以主动向客户端发起websocket协议的消息,websocket协议里面通讯的单位就叫message。
————————————————
服务端技术选型与考虑
NodeJs
-
单线程模型(尽管可以多进程),推送性能有限
C/C++
-
TCP通讯、WebSocket协议实现成本高
Go
-
多线程,基于协程模型并发 -
Go语言属于编译型语言,运行速度并不慢 -
成熟的WebSocket标准库,无需造轮子
基于Go实现WebSocket服务端
用Go语言对WebSocket做一个简单的服务端实现,以及HTML页面进行调试,并对WebSocket封装,这里就直接给出代码了。
WebSocket服务端
package main
import (
"net/http"
"github.com/gorilla/websocket"
"github.com/myproject/gowebsocket/impl"
"time"
)
var(
upgrader = websocket.Upgrader{
// 允许跨域
CheckOrigin:func(r *http.Request) bool{
return true
},
}
)
func wsHandler(w http.ResponseWriter , r *http.Request){
// w.Write([]byte("hello"))
var(
wsConn *websocket.Conn
err error
conn *impl.Connection
data []byte
)
// 完成ws协议的握手操作
// Upgrade:websocket
if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
return
}
if conn , err = impl.InitConnection(wsConn); err != nil{
goto ERR
}
// 启动线程,不断发消息
go func(){
var (err error)
for{
if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
return
}
time.Sleep(1*time.Second)
}
}()
for {
if data , err = conn.ReadMessage();err != nil{
goto ERR
}
if err = conn.WriteMessage(data);err !=nil{
goto ERR
}
}
ERR:
conn.Close()
}
func main(){
http.HandleFunc("/ws",wsHandler)
http.ListenAndServe("0.0.0.0:7777",nil)
}
前端页面
<!DOCTYPE html>
<html>
<head>
<title>go websocket</title>
<meta charset="utf-8" />
</head>
<body>
<script type="text/javascript">
var wsUri ="ws://127.0.0.1:7777/ws";
var output;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
onOpen(evt)
};
websocket.onclose = function(evt) {
onClose(evt)
};
websocket.onmessage = function(evt) {
onMessage(evt)
};
websocket.onerror = function(evt) {
onError(evt)
};
}
function onOpen(evt) {
writeToScreen("CONNECTED");
// doSend("WebSocket rocks");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onMessage(evt) {
writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>');
// websocket.close();
}
function onError(evt) {
writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data);
}
function doSend(message) {
writeToScreen("SENT: " + message);
websocket.send(message);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
output.appendChild(pre);
}
window.addEventListener("load", init, false);
function sendBtnClick(){
var msg = document.getElementById("input").value;
doSend(msg);
document.getElementById("input").value = '';
}
function closeBtnClick(){
websocket.close();
}
</script>
<h2>WebSocket Test</h2>
<input type="text" id="input"></input>
<button onclick="sendBtnClick()" >send</button>
<button onclick="closeBtnClick()" >close</button>
<div id="output"></div>
</body>
</html>
封装WebSocket
package impl
import (
"github.com/gorilla/websocket"
"sync"
"errors"
)
type Connection struct{
wsConnect *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex // 对closeChan关闭上锁
isClosed bool // 防止closeChan被关闭多次
}
func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
conn = &Connection{
wsConnect:wsConn,
inChan: make(chan []byte,1000),
outChan: make(chan []byte,1000),
closeChan: make(chan byte,1),
}
// 启动读协程
go conn.readLoop();
// 启动写协程
go conn.writeLoop();
return
}
func (conn *Connection)ReadMessage()(data []byte , err error){
select{
case data = <- conn.inChan:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)WriteMessage(data []byte)(err error){
select{
case conn.outChan <- data:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)Close(){
// 线程安全,可多次调用
conn.wsConnect.Close()
// 利用标记,让closeChan只关闭一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
// 内部实现
func (conn *Connection)readLoop(){
var(
data []byte
err error
)
for{
if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
goto ERR
}
//阻塞在这里,等待inChan有空闲位置
select{
case conn.inChan <- data:
case <- conn.closeChan: // closeChan 感知 conn断开
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection)writeLoop(){
var(
data []byte
err error
)
for{
select{
case data= <- conn.outChan:
case <- conn.closeChan:
goto ERR
}
if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
goto ERR
}
}
ERR:
conn.Close()
}
千万级弹幕系统的架构设计
技术难点
-
内核瓶颈
推送量大:100W在线 * 10条/每秒 = 1000W条/秒
内核瓶颈:linux内核发送TCP的极限包频 ≈ 100W/秒
-
锁瓶颈
需要维护在线用户集合(100W用户在线),通常是一个字典结构
推送消息即遍历整个集合,顺序发送消息,耗时极长
推送期间,客户端仍旧正常的上下线,集合面临不停的修改,修改需要遍历,所以集合需要上锁
-
CPU瓶颈
浏览器与服务端之间一般采用的是JSon格式去通讯
Json编码非常耗费CPU资源
向100W在线推送一次,则需100W次Json Encode
优化方案
-
内核瓶颈
减少网络小包的发送,我们将网络上几百字节定义成网络的小包了,小包的问题是对内核和网络的中间设备造成处理的压力。方案是将一秒内N条消息合并成1条消息,合并后,每秒推送数等于在线连接数。
-
锁瓶颈
大锁拆小锁,将长连接打散到多个集合中去,每个集合都有自己的锁,多线程并发推送集合,线程之间推送的集合不同,所以没有锁的竞争关系,避免锁竞争。
读写锁取代互斥锁,多个推送任务可以并发遍历相同集合
-
CPU瓶颈
减少重复计算,Json编码前置,1次消息编码+100W次推送,消息合并前置,N条消息合并后,只需要编码一次。
-
集群
部署多个节点,通过负载均衡,把连接打散到多个 服务器上,但推送消息的时候,不知道哪个直播间在哪个节点上,最常用的方式是将消息广播给所有的网关节点,此时就需要做一个逻辑集群。
-
逻辑集群
基于Http2协议向gateway集群分发消息(Http2支持连接复用,用作RPC性能更佳,即在单个连接上可以做高吞吐的请求应答处理)
基于Http1协议对外提供推送API(Http1更加普及,对业务方更加友好)
整体分布式架构图如下:
任何业务方通过Http接口调用到逻辑集群,逻辑集群把消息广播给所有网关,各个网关各自将消息推送给在线的连接即可。
本文讲解了开发消息推送服务的难点与解决方案的大体思路,按照整个理论流程下来,基本能实现一套弹幕消息推送的服务。
zhuanlan.zhihu.com/p/100770431
- EOF -
1、
2、
3、
Go 开发大全
参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。
关注后获取
回复 Go 获取6万star的Go资源库
分享、点赞和在看
支持我们分享更多好文章,谢谢!