分布式事务框架 seata-golang 通信模型详解
简介
如何基于 getty 实现 RPC 通信
1. 建立连接
实现 RPC 通信,首先要建立网络连接,这里先从 getty client 的 connect() 方法开始看起。
// connect keeps dialing until a session has been created and started, or
// until dial reports that the client has been closed.
//
// NOTE: this is an excerpt; part of the original body is omitted.
func (c *client) connect() {
	var (
		err error
		ss  Session
	)
	for {
		// Establish a session connection.
		ss = c.dial()
		if ss == nil {
			// client has been closed
			break
		}
		err = c.newSession(ss)
		if err == nil {
			// Start sending and receiving packets on the session.
			ss.(*session).run()
			// (part of the code is omitted here)
			break
		}
		// Session setup failed: close the underlying connection and dial again.
		// don't distinguish between tcp connection and websocket connection. Because
		// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
		ss.Conn().Close()
	}
}
connect() 方法通过 dial() 方法得到了一个 session 连接,进入 dial() 方法:
func (c *client) dial() Session {switch c.endPointType {case TCP_CLIENT:return c.dialTCP()case UDP_CLIENT:return c.dialUDP()case WS_CLIENT:return c.dialWS()case WSS_CLIENT:return c.dialWSS()}return nil}
c.dialTCP() 方法:
func (c *client) dialTCP() Session {var (err errorconn net.Conn)for {if c.IsClosed() {return nil}if c.sslEnabled {if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {d := &net.Dialer{Timeout: connectTimeout}// 建立加密连接conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)}} else {// 建立 tcp 连接conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)}if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {conn.Close()err = errSelfConnect}if err == nil {// 返回一个 TCPSessionreturn newTCPSession(conn, c)}log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err))<-wheel.After(connectInterval)}}
2. 收发报文
ss.(*session).run(),在这行代码之后,代码都是很简单的操作,我们猜测这行代码运行的逻辑里面一定包含收发报文的逻辑,接着进入 run() 方法:
// run starts the session's two background goroutines: handleLoop and
// handlePackage.
//
// NOTE: this is an excerpt; part of the original body is omitted.
func (s *session) run() {
	// (part of the code is omitted here)
	go s.handleLoop()
	go s.handlePackage()
}
handleLoop 和 handlePackage,看字面意思符合我们的猜想,进入 handleLoop() 方法:
// handleLoop is the session's sending loop. It takes outgoing packages from
// s.wQ, encodes them with s.writer and writes the resulting byte slices to
// the connection. Each time the s.period timer fires it runs the periodic
// work: an optional websocket ping and the listener's OnCron callback.
//
// NOTE: this is an excerpt; declarations and several statements of the
// original body are omitted, so some variables used below are declared in
// code that is not shown.
func (s *session) handleLoop() {
	// (part of the code is omitted here)
	for {
		// A select blocks until one of its cases is ready to run.
		// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
		select {
		// (part of the code is omitted here)
		case outPkg, ok = <-s.wQ:
			// (part of the code is omitted here)
			// Reuse the backing array of iovec for this batch.
			iovec = iovec[:0]
			for idx := 0; idx < maxIovecNum; idx++ {
				// Encode outPkg (an interface{} value) into bytes via s.writer.
				pkgBytes, err = s.writer.Write(s, outPkg)
				// (part of the code is omitted here)
				iovec = append(iovec, pkgBytes)
				// (part of the code is omitted here)
			}
			// Send the encoded bytes.
			err = s.WriteBytesArray(iovec[:]...)
			if err != nil {
				log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
					s.sessionToken(), len(iovec), perrors.WithStack(err))
				s.stop()
				// break LOOP
				// Clearing flag disables the periodic branch below.
				flag = false
			}
		case <-wheel.After(s.period):
			if flag {
				if wsFlag {
					err := wsConn.writePing()
					if err != nil {
						log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
					}
				}
				// Periodic logic such as heartbeats.
				s.listener.OnCron(s)
			}
		}
	}
}
handleLoop() 方法处理的是发送报文的逻辑,RPC 需要发送的消息首先由 s.writer 编码成二进制比特,然后通过建立的 TCP 连接发送出去。这个 s.writer 对应的 Writer 接口是 RPC 框架必须要实现的一个接口。
handlePackage() 方法:
// handlePackage dispatches to the receive routine that matches the concrete
// type of s.Connection (TCP, websocket or UDP). It panics when a TCP session
// has no reader configured, and when the connection type is unknown.
//
// NOTE: this is an excerpt; part of the original body is omitted, including
// the declaration of err.
func (s *session) handlePackage() {
	// (part of the code is omitted here)
	if _, ok := s.Connection.(*gettyTCPConn); ok {
		// A TCP session cannot decode incoming bytes without a reader.
		if s.reader == nil {
			errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
			log.Error(errStr)
			panic(errStr)
		}
		err = s.handleTCPPackage()
	} else if _, ok := s.Connection.(*gettyWSConn); ok {
		err = s.handleWSPackage()
	} else if _, ok := s.Connection.(*gettyUDPConn); ok {
		err = s.handleUDPPackage()
	} else {
		panic(fmt.Sprintf("unknown type session{%#v}", s))
	}
}
handleTCPPackage() 方法:
// handleTCPPackage is the receive loop of a TCP session. It reads bytes from
// the connection into buf, appends them to pktBuf, decodes messages from
// pktBuf with s.reader, and hands every decoded message to s.addTask. The
// last error is returned wrapped with a stack trace.
//
// NOTE: this is an excerpt; declarations, error handling and the code that
// sets exit are omitted.
func (s *session) handleTCPPackage() error {
	// (part of the code is omitted here)
	conn = s.Connection.(*gettyTCPConn)
	for {
		// (part of the code is omitted here)
		bufLen = 0
		for {
			// for clause for the network timeout condition check
			// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
			// Receive bytes from the TCP connection.
			bufLen, err = conn.recv(buf)
			// (part of the code is omitted here)
			break
		}
		// (part of the code is omitted here)
		// Append the received bytes to pktBuf.
		pktBuf.Write(buf[:bufLen])
		for {
			if pktBuf.Len() <= 0 {
				break
			}
			// Decode the buffered bytes into an RPC message via s.reader.
			pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
			// (part of the code is omitted here)
			s.UpdateActive()
			// Hand the decoded message to addTask so the RPC consumer can process it.
			s.addTask(pkg)
			// Drop the consumed bytes from the buffer.
			pktBuf.Next(pkgLen)
			// continue to handle case 5
		}
		if exit {
			break
		}
	}
	return perrors.WithStack(err)
}
3. 底层处理网络报文的逻辑如何与业务逻辑解耦
在 handleTCPPackage() 方法的最后,我们看到,收到的消息被传入了 s.addTask(pkg) 这个方法,接着往下分析:
func (s *session) addTask(pkg interface{}) {f := func() {s.listener.OnMessage(s, pkg)s.incReadPkgNum()}if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {taskPool.AddTaskAlways(f)return}f()}
pkg 参数传递到了一个匿名方法,这个方法最终放入了 taskPool。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。
接着我们看一下 taskPool 的定义:
// NewTaskPoolSimple build a simple task poolfunc NewTaskPoolSimple(size int) GenericTaskPool {if size < 1 {size = runtime.NumCPU() * 100}return &taskPoolSimple{work: make(chan task),sem: make(chan struct{}, size),done: make(chan struct{}),}}
NewTaskPoolSimple 构建了一个无缓冲的 channel work,以及一个缓冲大小为 size(默认为 runtime.NumCPU() * 100)的 channel sem。再看方法 AddTaskAlways(t task):
// AddTaskAlways submits t for execution without blocking the caller.
//
// The three select statements are tried in order:
//  1. if p.done is ready (the pool has been shut down), t is dropped;
//  2. a non-blocking send hands t to a receiver already waiting on p.work;
//  3. otherwise t is sent on p.work if a receiver has become ready, or a
//     slot is taken from p.sem and a new worker goroutine is started with t,
//     or, when neither is possible, t is run through goSafely.
func (p *taskPoolSimple) AddTaskAlways(t task) {
	// Step 1: bail out if the pool is already done.
	select {
	case <-p.done:
		return
	default:
	}

	// Step 2: fast path, try to hand the task over on p.work.
	select {
	case p.work <- t:
		return
	default:
	}

	// Step 3: retry the hand-over, start a new worker, or fall back to goSafely.
	select {
	case p.work <- t:
	case p.sem <- struct{}{}:
		p.wg.Add(1)
		go p.worker(t)
	default:
		goSafely(t)
	}
}
4. 具体实现
下面的代码见:
// Reader is used to unmarshal a complete pkg from buffertype Reader interface {Read(Session, []byte) (interface{}, int, error)}// Writer is used to marshal pkg and write to sessiontype Writer interface {// if @Session is udpGettySession, the second parameter is UDPContext.Write(Session, interface{}) ([]byte, error)}// ReadWriter interface use for handle application packagestype ReadWriter interface {ReaderWriter}
// EventListener receives the lifecycle events of a session and the packages
// received from the remote peer.
type EventListener interface {
	// OnOpen is invoked when a session is opened.
	// If the returned error is not nil, @Session will be closed.
	OnOpen(Session) error

	// OnClose is invoked when a session is closed.
	OnClose(Session)

	// OnError is invoked when the session got an error.
	OnError(Session, error)

	// OnCron is invoked periodically; its period can be set by (Session)SetCronPeriod.
	OnCron(Session)

	// OnMessage is invoked when getty received a package. Do not run
	// long-lasting logic in this callback. It is also advisable to set a
	// maximum package length: if a message is longer than that, return an
	// error from Reader{Read} and getty will close the connection soon.
	//
	// If the processing takes a long time, start a goroutine pool (like a
	// worker thread pool in C++) and handle the package asynchronously, or
	// process it in some other asynchronous way.
	// In short, the OnMessage callback should return as soon as possible.
	//
	// If this is a udp event listener, the second parameter type is UDPContext.
	OnMessage(Session, interface{})
}
综上,只需实现 ReadWriter 来对 RPC 消息编解码,再实现 EventListener 来处理 RPC 消息的对应的具体逻辑,将 ReadWriter 实现和 EventListener 实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通信。
1)编解码协议实现
// 消息编码为二进制比特func MessageEncoder(codecType byte, in interface{}) []byte {switch codecType {case SEATA:return SeataEncoder(in)default:log.Errorf("not support codecType, %s", codecType)return nil}}// 二进制比特解码为消息体func MessageDecoder(codecType byte, in []byte) (interface{}, int) {switch codecType {case SEATA:return SeataDecoder(in)default:log.Errorf("not support codecType, %s", codecType)return nil, 0}}
2)Client 端实现
再来看 client 端 EventListener 的实现:
func (client *RpcRemoteClient) OnOpen(session getty.Session) error {go func()request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{ApplicationId: client.conf.ApplicationId,TransactionServiceGroup: client.conf.TransactionServiceGroup,}}// 建立连接后向 Transaction Coordinator 发起注册 TransactionManager 的请求_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)if err == nil {// 将与 Transaction Coordinator 建立的连接保存在连接池供后续使用clientSessionManager.RegisterGettySession(session)client.GettySessionOnOpenChannel <- session.RemoteAddr()}}()return nil}// OnError ...func (client *RpcRemoteClient) OnError(session getty.Session, err error) {clientSessionManager.ReleaseGettySession(session)}// OnClose ...func (client *RpcRemoteClient) OnClose(session getty.Session) {clientSessionManager.ReleaseGettySession(session)}// OnMessage ...func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {log.Info("received message:{%v}", pkg)rpcMessage, ok := pkg.(protocal.RpcMessage)if ok {heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong {log.Debugf("received PONG from %s", session.RemoteAddr())}}if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)// 处理事务消息,提交 or 回滚client.onMessage(rpcMessage, session.RemoteAddr())} else {resp, loaded := client.futures.Load(rpcMessage.Id)if loaded {response := resp.(*getty2.MessageFuture)response.Response = rpcMessage.Bodyresponse.Done <- trueclient.futures.Delete(rpcMessage.Id)}}}// OnCron ...func (client *RpcRemoteClient) OnCron(session getty.Session) {// 发送心跳client.defaultSendRequest(session, protocal.HeartBeatMessagePing)}
clientSessionManager.RegisterGettySession(session) 的逻辑将在下文中分析。
3)Server 端 Transaction Coordinator 实现
代码见:
func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error {log.Infof("got getty_session:%s", session.Stat())return nil}func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) {// 释放 TCP 连接SessionManager.ReleaseGettySession(session)session.Close()log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err)}func (coordinator *DefaultCoordinator) OnClose(session getty.Session) {log.Info("getty_session{%s} is closing......", session.Stat())}func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) {log.Debugf("received message:{%v}", pkg)rpcMessage, ok := pkg.(protocal.RpcMessage)if ok {_, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest)if isRegTM {// 将 TransactionManager 信息和 TCP 连接建立映射关系coordinator.OnRegTmMessage(rpcMessage, session)return}heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage)if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing {coordinator.OnCheckMessage(rpcMessage, session)return}if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST ||rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY {log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body)_, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest)if isRegRM {// 将 ResourceManager 信息和 TCP 连接建立映射关系coordinator.OnRegRmMessage(rpcMessage, session)} else {if SessionManager.IsRegistered(session) {defer func() {if err := recover(); err != nil {log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err)}}()// 处理事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等coordinator.OnTrxMessage(rpcMessage, session)} else {session.Close()log.Infof("close a unhandled connection! [%v]", session)}}} else {resp, loaded := coordinator.futures.Load(rpcMessage.Id)if loaded {response := resp.(*getty2.MessageFuture)response.Response = rpcMessage.Bodyresponse.Done <- truecoordinator.futures.Delete(rpcMessage.Id)}}}}func (coordinator *DefaultCoordinator) OnCron(session getty.Session) {}
coordinator.OnRegTmMessage(rpcMessage, session) 注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session) 注册 Resource Manager。具体逻辑分析见下文。
coordinator.OnTrxMessage(rpcMessage, session) 方法,将按照消息的类型码路由到具体的逻辑当中:
// Route the message to its handler according to its type code. Every branch
// asserts the concrete request type and returns the handler's response;
// unknown type codes yield nil.
switch msg.GetTypeCode() {
case protocal.TypeGlobalBegin:
	return coordinator.doGlobalBegin(msg.(protocal.GlobalBeginRequest), ctx)
case protocal.TypeGlobalStatus:
	return coordinator.doGlobalStatus(msg.(protocal.GlobalStatusRequest), ctx)
case protocal.TypeGlobalReport:
	return coordinator.doGlobalReport(msg.(protocal.GlobalReportRequest), ctx)
case protocal.TypeGlobalCommit:
	return coordinator.doGlobalCommit(msg.(protocal.GlobalCommitRequest), ctx)
case protocal.TypeGlobalRollback:
	return coordinator.doGlobalRollback(msg.(protocal.GlobalRollbackRequest), ctx)
case protocal.TypeBranchRegister:
	return coordinator.doBranchRegister(msg.(protocal.BranchRegisterRequest), ctx)
case protocal.TypeBranchStatusReport:
	return coordinator.doBranchReport(msg.(protocal.BranchReportRequest), ctx)
default:
	return nil
}
4)session manager 分析
clientSessionManager.RegisterGettySession(session) 将连接保存在 serverSessions = sync.Map{} 这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就可以通过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 seata-golang 源码。
// RpcContext bundles the identity of a registered client with the getty
// session it is connected through.
// NOTE(review): field meanings below are inferred from the field names and
// types; confirm against the registration code.
type RpcContext struct {
	// Version is presumably the client's protocol/version string.
	Version string
	// TransactionServiceGroup is presumably the transaction service group the client belongs to.
	TransactionServiceGroup string
	// ClientRole is the client's transaction role (a meta.TransactionRole).
	ClientRole meta.TransactionRole
	// ApplicationId identifies the client application.
	ApplicationId string
	// ClientId identifies the individual client.
	ClientId string
	// ResourceSets is presumably the set of resource ids registered by the client.
	ResourceSets *model.Set
	// Session is the getty session associated with this context.
	Session getty.Session
}
// Package-level registries used by the server-side session manager.
var (
	// session_transactionroles maps session -> transactionRole.
	// TM will register before RM, if a session is not the TM registered,
	// it will be the RM registered.
	session_transactionroles = sync.Map{}

	// identified_sessions maps session -> applicationId.
	identified_sessions = sync.Map{}

	// client_sessions maps applicationId -> ip -> port -> session.
	client_sessions = sync.Map{}

	// client_resources maps applicationId -> resourceIds.
	client_resources = sync.Map{}
)
Transaction Manager 和 Resource Manager 分别通过 coordinator.OnRegTmMessage(rpcMessage, session) 和 coordinator.OnRegRmMessage(rpcMessage, session) 注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager)的关系。
至此,我们就分析完了 seata-golang 整个 RPC 通信模型的机制。
seata-golang 的未来
参考资料
seata 官方:
https://seata.io
java 版 seata:
https://github.com/seata/seata
seata-golang:
https://github.com/opentrx/seata-golang
seata-golang go 夜读 b 站分享:
https://www.bilibili.com/video/BV1oz411e72T
