Go语言网络库getty的那些事
1 Getty 分层设计
1.1 数据交互层
// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
// Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
// When receiving a tcp network streaming segment, there are 4 cases as following:
// case 1: a error found in the streaming segment;
// case 2: can not unmarshal a pkg header from the streaming segment;
// case 3: unmarshal a pkg header but can not unmarshal a pkg from the streaming segment;
// case 4: just unmarshal a pkg from the streaming segment;
// case 5: unmarshal more than one pkg from the streaming segment;
//
// The return value is (nil, 0, error) as case 1.
// The return value is (nil, 0, nil) as case 2.
// The return value is (nil, pkgLen, nil) as case 3.
// The return value is (pkg, pkgLen, nil) as case 4.
// The handleTcpPackage may invoke func Read many times as case 5.
Read(Session, []byte) ( interface{}, int, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) ([]byte, error)
}
// package handler interface
type ReadWriter interface {
Reader
Writer
}
ReadWriter
接口定义代码如上。
Read
接口之所以有三个返回值,是为了处理 TCP 流粘包情况:
如果发生了网络流错误,如协议格式错误,返回
(nil, 0, error)
如果读到的流很短,其头部 (header) 都无法解析出来,则返回
(nil, 0, nil)
如果读到的流很短,可以解析出其头部 (header) 但无法解析出整个包 (package),则返回
(nil, pkgLen, nil)
如果能够解析出一个完整的包 (package),则返回
(pkg, 0, error)
1.2 业务控制层
Connection 负责建立的 Socket 连接的管理,主要包括:连接状态管理,连接超时控制,连接重连控制,数据包的相关处理,如数据包压缩,数据包拼接重组等。
Session 负责客户端的一次连接建立的管理,记录着本次连接的状态数据,管理 Connection 的创建,关闭,控制数据的发送/接口的处理。
1.2.1 Session
1.2.2 Connection
gettyTCPConn:底层是 *net.TCPConn
gettyUDPConn:底层是 *net.UDPConn
gettyWSConn:底层使用第三方库实现
1.3 网络 API 接口 EventListener
OnOpen:连接建立时提供给用户使用,若当前连接总数超过用户设定的连接数,则可以返回一个非 nil 的 error,Getty 就会在初始阶段关闭这个连接
OnError:用于连接有异常时的监控,Getty 执行这个接口后关闭连接
OnClose:用于连接关闭时的监控,Getty 执行这个接口后关闭连接
OnMessage:当 Getty 调用 Reader 接口成功从 TCP流/UDP/WebSocket 网络中解析出一个 package 后,通过这个接口把数据包交给用户处理
OnCron:定时接口,用户可以在这里接口函数中执行心跳检测等一些定时逻辑
OnMessage
,该方法有一个 interface{} 类型的参数,用于接收对端发来的数据。可能大家有个疑惑,网络连接最底层传输的是二进制,到我们使用的协议层一般以字节流的方式对连接进行读写,那这里为什么要使用 interface{} 呢?这是 getty 为了让我们能够专注编写业务逻辑,将序列化和反序列化的逻辑抽取到了 EventListener 外面,也就是前面提到的 Reader/Writer 接口,session 在运行过程中,会先从 net.Conn 中读取字节流,并通过 Reader 接口进行反序列化,再将反序列化的结果传递给 OnMessage 方法。如果想把对应的指标接入到 Prometheus,在这些 EventListener 接口中很容易添加各种 metrics 的收集。
2 Getty 网络端数据流程
说明:图中灰色部分为 Go 内置库
getty 启动代码流程图如上。server 服务的启动流程只需要两行代码:
server := getty.NewTCPServer(options...)
server.RunEventLoop(NewHelloServerSession)
func (*ServerOptions)
函数,用于给 server 添加一些额外功能设置,如启用 ssl,使用任务队列提交任务的形式执行任务等。第二行的
server.RunEventLoop(NewHelloServerSession)
则是启动 server,同时也是整个 server 服务的入口,它的作用是监听某个端口(具体监听哪个端口可以通过 options 指定),并处理 client 发来的数据。RunEventLoop 方法需要提供一个参数 NewSessionCallback,该参数的类型定义如下:
type NewSessionCallback func(Session) error
3 优化
1 kitex在并发数为2000的时候,客户端调用会有少量出错,并发数为5000时,会有10+%的调用出错;
2 kitex在并发量小的时候吞吐率要比rpcx要好,随着并发量增多,吞吐率基本差不多,吞吐率在大一些,它的长尾效应很明显P99.9延迟很高。
3.1 Goroutine Pool
EventListener.OnMessage()
接口进行逻辑处理;另一个 goroutine 负责发送网络字节流、调用
EventListener.OnCron()
执行定时逻辑。后来出于提升网络吞吐的需要,getty 进行了一次大的优化:将逻辑处理这步逻辑从第一个 goroutine 任务中分离,添加 Goroutine Pool【下文简称 gr pool】专门处理网络逻辑。即网络字节流接收、逻辑处理和网络字节流发送都有单独的 goroutine 处理。
3.1.1 固定大小 Gr Pool
kafka-connect-elasticsearch
中实现过此类型的 Gr Pool:作为消费者从 kafka 读取数据然后放入消息队列,然后各个 worker gr 从此队列中取出任务进行消费处理。
// ┌───────┐ ┌───────┐ ┌───────┐ ┌─────────────────────────┐
// │worker0│ │worker2│ │worker4│ ┌─┤ taskId % NumQueues == 0 │
// └───────┘ └───────┘ └───────┘ │ └─────────────────────────┘
// │ │ │ │
// └───────consume───────┘ enqueue
// ▼ task ╔══════════════════╗
// ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐ │ ║ baseWorkerPool: ║
// TaskQueue0 │t0│t1│t2│t3│t4│t5│t6│t7│t8│t9│◀─┘ ║ ║
// ├──┼──┼──┼──┼──┼──┼──┼──┼──┼──┤ ║ *NumWorkers=6 ║
// TaskQueue1 │t0│t1│t2│t3│t4│t5│t6│t7│t8│t9│◀┐ ║ *NumQueues=2 ║
// └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ │ ║ *QueueSize=10 ║
// ▲ enqueue ╚══════════════════╝
// ┌───────consume───────┐ task
// │ │ │ │
// ┌───────┐ ┌───────┐ ┌───────┐ │ ┌─────────────────────────┐
// │worker1│ │worker3│ │worker5│ └──│ taskId % NumQueues == 1 │
// └───────┘ └───────┘ └───────┘ └─────────────────────────┘
gr pool
【参考7】 连接中的
TaskPool
实现。
3.1.2 无限制 Gr Pool
gr pool
【参考7】 连接中的
taskPoolSimple
实现。
3.1.3 网络包处理顺序
固定大小的 gr pool
优点是限定了逻辑处理流程对机器 CPU/MEMORY 等资源的使用,而
无限制 Gr Pool
虽然保持了弹性但有可能耗尽机器的资源导致容器被内核杀掉。但无论使用何种形式的
gr pool
,getty 无法保证网络包的处理顺序。
3.2 Lazy Reconnect
client.Close()
接口把连接池关闭掉。如果上层用户没有调用这个接口把连接池关闭掉,client 就认为对端地址还有效,就会不断尝试发起重连,维护连接池。
-
1 旧 session 关闭 网络接收 goroutine
; -
2 旧 session 网络发送 goroutine
探测到网络接收 goroutine
退出后终止网络发送,进行资源回收后设定当前 session 无效; -
3 client 的轮询 goroutine 检测到无效 session 后把它从 session 连接池删除; -
4 client 的轮询 goroutine 检测到有效 session 数目少于 getty 上层使用者设定的数目 且 getty 上层使用者没有通过 client.Close()
接口关闭连接池时,就调用连接接口发起新连接。
网络发送 goroutine
进行 “废物利用”,在这个 goroutine 标记当前 session 无效的逻辑步骤之后再加上一个逻辑:
-
1 如果当前 session 的维护者是一个 client【因为 session 的使用者也可能是 server】; -
2 且如果其当前 session pool 的 session 数量少于上层使用者设定的 session number; -
3 且如果上层使用者还没有通过 client.Close()
设定当前 session pool 无效【即当前 session pool 有效,或者说是对端 server 有效】 -
4 满足上面三个条件, 网络发送 goroutine
执行连接重连即可; -
5 新网络连接 session 建立成功且被加入 client 的 session pool 后, 网络发送 goroutine
使命完成直接退出。
lazy reconnect
,
网络发送 goroutine
在其生命周期的最后阶段应该被称之为
网络重连 goroutine
。通过
lazy reconnect
这种方式,上述重连步骤 3 和 步骤 4 的逻辑被合入了步骤 2,client 当然也就没必要再启动一个额外的 goroutine 通过定时轮询的方式维护其 session pool 了。
lazy reconnect
整体流程图如上。如果对相关代码流程感兴趣,请移步 "参考 13" 给出的链接,很容易自行分析出来。
3.3 定时器
-
一个 goroutine 进行网络字节流的接收、调用 Reader 接口拆解出网络包 (package) -
第二个 goroutine 调用 EventListener.OnMessage()
接口进行逻辑处理 -
第三个 goroutine 负责发送网络字节流、调用 EventListener.OnCron()
执行定时逻辑以及lazy reconnect
EventListener.OnCron()
定时处理任务和
lazy reconnect
任务。这个定时逻辑其实可以抛给 getty 上层调用者处理,但出于方便用户和向后兼容地考虑,我们使用了另一种优化思路:引入时间轮管理定时心跳检测。
timer wheel
(链接见参考 10),兼容 Go 的 timer 所有原生接口,其优点是所有时间任务都在一个 goroutine 内执行。2020 年 12 月把它引入 getty 后,getty 所有的
EventListener.OnCron()
定时处理任务均交由 timer wheel 处理,第三个 goroutine 就可以完美地消失了【后续:两个月后发现 timer 库被另一个擅长拉 star 的 rpc 项目抄走了^+^】。
lazy reconnect
。当第三个 goroutine 不存在后,这个任务完全可以放入第一个 goroutine:在当
网络字节流接收 goroutine
检测到网络错误退出前的最后一个步骤,执行
lazy reconnect
。
-
一个 goroutine 进行网络字节流的接收、调用 Reader 接口拆解出网络包 (package)、 lazy reconnect
-
第二个 goroutine 调用 EventListener.OnMessage()
接口进行逻辑处理、发送网络字节流
3.4 Getty 压测
系统: CentOS Linux release 7 .5.1804 ( Core)
CPU:8核
内存:16 G
类型:2 台腾讯云虚机,一台运行 100 个客户端,另一台运行一个服务端
Go 版本:1 .15.6
服务访问方式:模拟局域网请求
网络参数:每个客户端发送 100000 个请求,每个请求 915 B
benmark result
(链接见参考 12)。这个压测当然没有压出 getty 的极限性能,但已经能够满足阿里主要场景下的使用需求。
4 发展 timeline
2016 年 6 月份开发出第一个生产可用版本,支持 TCP/websocket 两种通信协议,同年 10 月在 gocn 上发帖 https://gocn.vip/topics/8229 推广;
2017 年 9 月时,实现了一个 Go 语言 timer 时间轮库
timer wheel
https://github.com/AlexStocks/goext/blob/master/time/time.go2018 年 3 月在其上加入 UDP 通信支持;
2018 年 5 月实现了
lazy reconnect
;2018 年 5 月支持至于 protobuf 和 json 的 RPC;
2018 年 8 月加入基于 zookeeper 和 etcd 的服务注册和发现功能,取名 micro;
2019 年 5 月 getty 的底层 tcp 通信实现被独立拆出迁入 github.com/dubbogo,后迁入 github.com/apache/dubbo-getty;
2019 年 5 月 getty RPC 包被携程的两位同学迁入
[https://github.com/apache/dubbo-go/tree/master/protocol/dubbo](https://github.com/apache/dubbo-go/tree/master/protocol/dubbo)
, 构建了 dubbogo 基于 hessian2 协议的 RPC 层;2019 年 5 月,加入固定大小 goroutine pool;
2019 年底,刘晓敏同学告知其基于 getty 实现了 seata-golang;
2020 年 11 月,把网络发送与逻辑处理合并放入 gr pool 中处理;
2021 年 5 月,完成定时器优化;
优化
开头部分所说,getty 维护团队不追求无意义的 benchmark 数据,不做无意义的炫技式优化,只根据生产环境需求来进行自身改进。只要维护团队在,getty 稳定性和性能定会越来越优秀。
作者
于雨,github 账号 AlexStocks,dubbogo 社区负责人,一个有十一年服务端基础架构和中间件研发一线工作经验的程序员。陆续参与和改进过 Redis/Pika/Pika-Port/etcd/Muduo/Dubbo/dubbo-go/Sentinel-golang/Seata-golang 等知名项目,目前在蚂蚁集团可信原生技术部从事云原生工作。
郝洪范,github 账号 georgehao,Apache Dubbo Committer,getty 维护团队成员。目前在京东云零售部大数据平台,技术专家,熟练掌握 Go runtime 底层技术。
董剑辉,github 账号 Mulavar,2021 年 5 月份从浙江大学著名 VLIS 实验室研究生毕业。曾在蚂蚁集团杭州总部支付业务资金服务技术部实习,目前就职于美团大数据北京总部计算引擎组,从事 flink 相关开发工作。
参考
1 https://github.com/alexstocks/goext/blob/master/container/xorlist/xorlist.go
2 兼容tcp和websocket的一个简洁网络框架getty https://gocn.vip/topics/8229
3 字节跳动在 Go 网络库上的实践 https://juejin.cn/post/6844904153173458958
4 字节跳动 Go RPC 框架 KiteX 性能优化实践 https://mp.weixin.qq.com/s/Xoaoiotl7ZQoG2iXo9_DWg
5 2021年Go生态圈rpc框架benchmark https://colobu.com/2021/08/01/benchmark-of-rpc-frameworks/
6 分布式事务框架 seata-golang 通信模型 https://mp.weixin.qq.com/s/7xoshM4lzqHX9WqANbHAJQ
7 gr pool https://github.com/dubbogo/gost/blob/master/sync/task_pool.go
8 A Million WebSockets and Go https://www.freecodecamp.org/news/million-websockets-and-go-cc58418460bb/
9 task pool https://github.com/dubbogo/gost/blob/master/sync/base_worker_pool.go
10 timer wheel https://github.com/dubbogo/gost/blob/master/time/timer.go
11 getty benchmark https://github.com/apache/dubbo-getty/tree/master/benchmark
12 benmark result https://github.com/apache/dubbo-getty/pull/61#issuecomment-865698715
13 lazy connection https://github.com/AlexStocks/getty/blob/73b0928b957ab6b6773f6cfe95c909ae39001687/transport/client.go#L412
END
觉得不错,请点个在看呀