vlambda博客
学习文章列表

10 分钟入门 Python Rpc 实现

Python编程与实战
人生苦短,我用Python。主要分享项目实战经验,包括但不限于web开发,爬虫以及数据可视化等。
77篇原创内容
Official Account

XML-RPC 是一种远程过程调用方法,它使用通过 HTTP 传递的 XML 作为载体。有了它,客户端可以在远程服务器上调用带参数的服务器方法(服务器以 URI 命名)并获取结构化的数据。python自带xmlrpc实现,学习xmlrpc,可以让我们快速了解rpc的实现及原理,本文包括下面几个部分:

  • xmlrpc演示
  • xmlrpc-API
  • xmlrpc-server实现
  • xmlrpc-client实现
  • xmlrpc序列化/反序列化
  • 小结

xmlrpc演示

xmlrpc可以直接运行,启动rpc服务:

# python3 -m xmlrpc.server
Serving XML-RPC on localhost port 8000
It is advisable to run this example server within a secure, closed network.
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -
127.0.0.1 - - [05/May/2021 18:03:16] "POST /RPC2 HTTP/1.1" 200 -

启动rpc客户端:

# python3 -m xmlrpc.client
20210505T18:03:16
42
512
3

从服务端可以看到2个http请求。我们先看抓包得到的第一个http请求报文:

METHOD: POST
URL: http://localhost:8000/RPC2
HEADERS
    accept-encoding: gzip
    content-length: 120
    content-type: text/xml
host: localhost:8000
user-agent: Python-xmlrpc/3.8

请求的数据是:

<?xml version='1.0'?>
  <methodCall>
    <methodName>
      currentTime.getCurrentTime
    </methodName>
    <params>
    </params>
  </methodCall>

http响应报文:

STATUS: 200 OK
HEADERS
    content-length: 163
    content-type: text/xml
    date: Wed, 05 May 2021 09:40:57 GMT
server: BaseHTTP/0.6 Python/3.8.5

响应的数据是:

<?xml version='1.0'?>
  <methodResponse>
    <params>
      <param>
        <value>
          <dateTime.iso8601>
            20210505T18:03:16
          </dateTime.iso8601>
        </value>
      </param>
    </params>
  </methodResponse>

第二个请求的数据报文我就不贴出来了,下面是请求xml:

<?xml version='1.0'?>
  <methodCall>
    <methodName>
      system.multicall
    </methodName>
    <params>
      <param>
        <value>
          <array>
            <data>
              <value>
                <struct>
                  <member>
                    <name>
                      methodName
                    </name>
                    <value>
                      <string>
                        getData
                      </string>
                    </value>
                  </member>
                  <member>
                    <name>
                      params
                    </name>
                    <value>
                      <array>
                        <data>
                        </data>
                      </array>
                    </value>
                  </member>
                </struct>
              </value>
              <value>
                <struct>
                  <member>
                    <name>
                      methodName
                    </name>
                    <value>
                      <string>
                        pow
                      </string>
                    </value>
                  </member>
                  <member>
                    <name>
                      params
                    </name>
                    <value>
                      <array>
                        <data>
                          <value>
                            <int>
                              2
                            </int>
                          </value>
                          <value>
                            <int>
                              9
                            </int>
                          </value>
                        </data>
                      </array>
                    </value>
                  </member>
                </struct>
              </value>
              <value>
                <struct>
                  <member>
                    <name>
                      methodName
                    </name>
                    <value>
                      <string>
                        add
                      </string>
                    </value>
                  </member>
                  <member>
                    <name>
                      params
                    </name>
                    <value>
                      <array>
                        <data>
                          <value>
                            <int>
                              1
                            </int>
                          </value>
                          <value>
                            <int>
                              2
                            </int>
                          </value>
                        </data>
                      </array>
                    </value>
                  </member>
                </struct>
              </value>
            </data>
          </array>
        </value>
      </param>
    </params>
  </methodCall>

下面是响应xml:

<?xml version='1.0'?>
  <methodResponse>
    <params>
      <param>
        <value>
          <array>
            <data>
              <value>
                <array>
                  <data>
                    <value>
                      <string>
                        42
                      </string>
                    </value>
                  </data>
                </array>
              </value>
              <value>
                <array>
                  <data>
                    <value>
                      <int>
                        512
                      </int>
                    </value>
                  </data>
                </array>
              </value>
              <value>
                <array>
                  <data>
                    <value>
                      <int>
                        3
                      </int>
                    </value>
                  </data>
                </array>
              </value>
            </data>
          </array>
        </value>
      </param>
    </params>
  </methodResponse>

注: 为了完整展示xmlrpc数据,所以我贴了xml全文,导致内容有点长。

从演示可以看到xmlrpc下面几个特点:

  • 使用http协议进行数据传输。使用 POST 方法,url是 RPC2,content-type是 text/xml
  • 使用xml对请求/响应进行编码。请求使用 methodCall 标签, 响应使用 methodResponse 标签。
  • 使用xml数据层层嵌套,冗余较多,看起来非常烦琐 (这应该是xmlrpc没流行起来的原因之一)。

xmlrpc-API

继续查看xmlrpc的API使用,服务端代码:

class ExampleService:
    def getData(self):
        return '42'

    class currentTime:
        @staticmethod
        def getCurrentTime():
            return datetime.datetime.now()

with SimpleXMLRPCServer(("localhost", 8000)) as server:
    server.register_function(pow)
    server.register_function(lambda x,y: x+y, 'add')
    server.register_instance(ExampleService(), allow_dotted_names=True)
    server.register_multicall_functions()
    print('Serving XML-RPC on localhost port 8000')
    print('It is advisable to run this example server within a secure, closed network.')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nKeyboard interrupt received, exiting.")
        sys.exit(0)

服务端做了下面几件事:

  • 在8000端口创建SimpleXMLRPCServer的实例server
  • 向server注册 pow和名为 add的lambda函数接口
  • 向server注册服务实例instance(可能叫app更合适),instance带有2个函数实现: getDatacurrentTime.getCurrentTime
  • 向server注册system.multicall实现,这个实现支持多个rpc请求合并使用一个http请求
  • 启动server

客户端是这样使用的:

server = ServerProxy("http://localhost:8000")

print(server.currentTime.getCurrentTime())

multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
try:
    for response in multi():
        print(response)
except Error as v:
    print("ERROR", v)
  • 创建了一个服务代理(可以理解为rpc-client)
  • 调用服务端的实现 currentTime.getCurrentTime
  • 使用MultiCall的方式调用 getData, powadd 三个rpc接口
  • 发送multicall请求,并循环打印服务调用结果

可以很明显的对比出xmlrpc服务和http服务的不同:

  • rpc服务接口都是普通的函数,比如pow,getData和getCurrentTime;这些接口和http的request和response是隔离的
  • 客户端需要额外实现,并不是直接发送的http请求

同时大家对 RPC(remote procedure call) 应该也有直观了解,简单的解释就是远程函数调用。所谓远程:跨机器是远程,我们这里的跨进程也是远程。至于如何实现远程函数调用,就是各个RPC框架的功能了,今天我们先看看xmlrpc的实现。

xmlrpc-server的实现

服务端http协议实现

xmlrpc中http协议由SimpleXMLRPCServer和SimpleXMLRPCRequestHandler实现:

class SimpleXMLRPCServer(socketserver.TCPServer,
                         SimpleXMLRPCDispatcher):

    allow_reuse_address = True

    _send_traceback_header = False

    def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
                 logRequests=True, allow_none=False, encoding=None,
                 bind_and_activate=True, use_builtin_types=False):
        self.logRequests = logRequests

        SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding, use_builtin_types)
        socketserver.TCPServer.__init__(self, addr, requestHandler, bind_and_activate)

SimpleXMLRPCServer的父类TCPServer在之前的博文中有介绍,提供tcp服务的实现。SimpleXMLRPCRequestHandler负责http协议部分的实现,而xmlrpc规范是必须使用POST请求到 /RPC2 重点就在 do_POST 方法:

class SimpleXMLRPCRequestHandler(BaseHTTPRequestHandler):
    # rpc-url
    rpc_paths = ('/''/RPC2')
    
    def do_POST(self):
        ...
        max_chunk_size = 10*1024*1024
        size_remaining = int(self.headers["content-length"])
        L = []
        while size_remaining:
            chunk_size = min(size_remaining, max_chunk_size)
            chunk = self.rfile.read(chunk_size)
            if not chunk:
                break
            L.append(chunk)
            size_remaining -= len(L[-1])
        data = b''.join(L)
        ...
        response = self.server._marshaled_dispatch(
                    data, getattr(self, '_dispatch', None), self.path
                )
        ...
        self.send_response(200)
        self.send_header("Content-type""text/xml")
        self.send_header("Content-length", str(len(response)))
        self.end_headers()
        self.wfile.write(response)

do_POST方法分三段:

  • 从http请求上读取请求数据,数据长度由 content-length 决定
  • 使用server的_marshaled_dispatch方法调用rpc接口
  • 将接口返回值包装成http响应返回

服务端rpc协议实现

SimpleXMLRPCServer的另外一个父类SimpleXMLRPCDispatcher提供了rpc协议的实现:

class SimpleXMLRPCDispatcher:
    def __init__(self, allow_none=False, encoding=None,
                 use_builtin_types=False):
        # 接口函数字典
        self.funcs = {}
        # 服务实例 (app)
        self.instance = None
        self.allow_none = allow_none
        self.encoding = encoding or 'utf-8'
        self.use_builtin_types = use_builtin_types
    
    def register_instance(self, instance, allow_dotted_names=False):
        # 注册服务实例对象
        self.instance = instance
        self.allow_dotted_names = allow_dotted_names

    def register_function(self, function=None, name=None):
        # 注册接口方法
        if name is None:
            name = function.__name__
        self.funcs[name] = function
        return function
    
    def register_multicall_functions(self):
        """Registers the XML-RPC multicall method in the system
        namespace."
""
        # 复合调用
        self.funcs.update({'system.multicall' : self.system_multicall})

instace和function的注册比较简单,我们可以跳过实现会略微复杂一点的multical,先看看注册的接口如何在_marshaled_dispatch中调用:

def _marshaled_dispatch(self, data, dispatch_method = None, path = None):

    try:
        # 解析rpc接口和参数
        params, method = loads(data, use_builtin_types=self.use_builtin_types)

        # generate response
        response = self._dispatch(method, params)
        # wrap response in a singleton tuple
        response = (response,)
        # 生成xml响应
        response = dumps(response, methodresponse=1,
                         allow_none=self.allow_none, encoding=self.encoding)
    except Fault as fault:
        ...
    except:
        ...
    return response.encode(self.encoding, 'xmlcharrefreplace')

def _dispatch(self, method, params):
    try:
        # call the matching registered function
        # 查找接口
        func = self.funcs[method]
    except KeyError:
        pass
    else:
        if func is not None:
            # 执行接口
            return func(*params)
        ...

    if self.instance is not None:
        if hasattr(self.instance, '_dispatch'):
            # call the `_dispatch` method on the instance
            return self.instance._dispatch(method, params)

        # call the instance's method directly
        try:
            func = resolve_dotted_attribute(
                self.instance,
                method,
                self.allow_dotted_names
            )
        except AttributeError:
            pass
        else:
            if func is not None:
                return func(*params)
    ...

代码比较长,主要做了2件事:

  • 从请求中解析 params 和 method
  • 根据method从func或者instance中调用方法并返回

服务端multi-call实现

了解 single-call 后,再回头看 multi-call 的实现,就比较容易。注册接口:

def register_multicall_functions(self):
    self.funcs.update({'system.multicall' : self.system_multicall})

funcs字典中会增加一个名为 system.multicall ,处理函数为system_multicall的调用:

def system_multicall(self, call_list):
    """system.multicall([{'methodName': 'add', 'params': [2, 2]}, ...]) => \
[[4], ...]

    Allows the caller to package multiple XML-RPC calls into a single
    request.

    See http://www.xmlrpc.com/discuss/msgReader$1208
    "
""
    results = []
    # 顺序执行多个call
    for call in call_list:
        method_name = call['methodName']
        params = call['params']
        ...
        results.append([self._dispatch(method_name, params)])
        ...
    return results

system_multicall和注释介绍一样,就是从请求中接受多个请求,然后逐一调用执行。system.multicall的调用数据示例:

<methodName>
  system.multicall
</methodName>
<params>
    ...
              <member>
                <name>
                  methodName
                </name>
                <value>
                  <string>
                    getData
                  </string>
                </value>
              </member>
    ...
<params>

xmlrpc-client实现

客户端http协议实现

客户端也需要实现http协议,主要在ServerProxy和Transport中(SafeTransport实现https)。ServerProxy包装Transport对象:

class ServerProxy:
    def __init__(self, uri, transport=None, encoding=None, verbose=False,
                 allow_none=False, use_datetime=False, use_builtin_types=False,
                 *, headers=(), context=None):
        # get the url
        type, uri = urllib.parse._splittype(uri)

        self.__host, self.__handler = urllib.parse._splithost(uri)
        ..
        handler = Transport
        extra_kwargs = {}
        transport = handler(use_datetime=use_datetime,
                                use_builtin_types=use_builtin_types,
                                headers=headers,
                                **extra_kwargs)
        self.__transport = transport
        ...
    
    def __request(self, methodname, params):
        # call a method on the remote server
        # 接口调用转为xml请求数据
        request = dumps(params, methodname, encoding=self.__encoding,
                        allow_none=self.__allow_none).encode(self.__encoding, 'xmlcharrefreplace')

        response = self.__transport.request(
            self.__host,
            self.__handler,
            request,
            verbose=self.__verbose
            )

        return response

Transport实现http细节:

class Transport:
    """Handles an HTTP transaction to an XML-RPC server."""
    def __init__(self, use_datetime=False, use_builtin_types=False,
                 *, headers=()):
        self._use_datetime = use_datetime
        self._use_builtin_types = use_builtin_types
        self._connection = (None, None)
        self._headers = list(headers)
        self._extra_headers = []
    
    def request(self, host, handler, request_body, verbose=False):
        http_conn = self.send_request(host, handler, request_body, verbose)
        resp = http_conn.getresponse()
        if resp.status == 200:
            self.verbose = verbose
            return self.parse_response(resp)
    
    def send_request(self, host, handler, request_body, debug):
        connection = self.make_connection(host)
        headers = self._headers + self._extra_headers
        ...
        connection.putrequest("POST", handler)
        headers.append(("Content-Type""text/xml"))
        headers.append(("User-Agent", self.user_agent))
        self.send_headers(connection, headers)
        self.send_content(connection, request_body)
        return connection
    
    def make_connection(self, host):
        if self._connection and host == self._connection[0]:
            return self._connection[1]
        # create a HTTP connection object from a host descriptor
        chost, self._extra_headers, x509 = self.get_host_info(host)
        self._connection = host, http.client.HTTPConnection(chost)
        return self._connection[1]
    
    def parse_response(self, response):
        stream = response
        p, u = self.getparser()
        while 1:
            data = stream.read(1024)
            if not data:
                break
            if self.verbose:
                print("body:", repr(data))
            p.feed(data)

        if stream is not response:
            stream.close()
        p.close()

        return u.close()
  • 使用http.client创建http连接
  • 使用send_request发送http请求
  • 使用parse_response解析http请求

客户端rpc协议实现

在http协议上使用_Method包装请求:

class ServerProxy:
    def __getattr__(self, name):
        # magic method dispatcher
        return _Method(self.__request, name)
        
class _Method:
    # some magic to bind an XML-RPC method to an RPC server.
    # supports "nested" methods (e.g. examples.getStateName)
    def __init__(self, send, name):
        self.__send = send
        self.__name = name
    def __getattr__(self, name):
        return _Method(self.__send, "%s.%s" % (self.__name, name))
    def __call__(self, *args):
        return self.__send(self.__name, args)

可以使用 server.currentTime.getCurrentTime() 发送请求,这是一个链式调用。server.currentTime会调用 ServerProxy.__getattr__ 得到一个_Method对象;继续调用getCurrentTime会执行 _Method.__getattr__ 又得到一个_Method对象,最后使用 getCurrentTime() 执行这个method对象的call方法,会使用ServerProxy的call方法将请求发送出去。

客户端multi-call实现

了解客户端 single-call 实现后,继续查看 multi-call,主要涉及下面3个类:

class _MultiCallMethod:

    def __init__(self, call_list, name):
        self.__call_list = call_list
        self.__name = name
    def __getattr__(self, name):
        return _MultiCallMethod(self.__call_list, "%s.%s" % (self.__name, name))
    def __call__(self, *args):
        self.__call_list.append((self.__name, args))  # 添加一个call

class MultiCallIterator:

    def __init__(self, results):
        self.results = results

    def __getitem__(self, i):
        item = self.results[i]
        if type(item) == type({}):
            raise Fault(item['faultCode'], item['faultString'])
        elif type(item) == type([]):
            return item[0]
        else:
            raise ValueError("unexpected type in multicall result")

class MultiCall:

    def __init__(self, server):
        self.__server = server
        self.__call_list = []

    def __getattr__(self, name):
        return _MultiCallMethod(self.__call_list, name)

    def __call__(self):
        marshalled_list = []
        for name, args in self.__call_list:
            marshalled_list.append({'methodName' : name, 'params' : args})
        # 最后执行system.multical
        return MultiCallIterator(self.__server.system.multicall(marshalled_list))

代码行数比较多,和Method一样都是使用python的魔法函数:call, getattr和getitem, 可以对比调用示例体会:

multi = MultiCall(server)
multi.getData()
multi.pow(2,9)
multi.add(1,2)
for response in multi():
    print(response)

xmlrpc序列化/反序列化

rpc服务需要跨网络传输,server和client之间的数据还需要进行序列化/反序列化。主要由Marshaller和Unmarshaller两个类实现:

# client.py

class Marshaller:
    ...

class Unmarshaller:
    ...

xmlrpc支持下面9种数据类型:

  • array
  • base64
  • boolean
  • date/time
  • double
  • integer
  • string
  • struct
  • nil

一些数据类型的,比如double和nil在python中是不存在的。这2种数据的编/解码如下:

class Marshaller:
    
    def dump_double(self, value, write):
        write("<value><double>")
        write(repr(value))
        write("</double></value>\n")
    dispatch[float] = dump_double
    
    def dump_nil (self, value, write):
        if not self.allow_none:
            raise TypeError("cannot marshal None unless allow_none is enabled")
        write("<value><nil/></value>")
    dispatch[type(None)] = dump_nil
    

class Unmarshaller:
    
    def end_double(self, data):
        self.append(float(data)) # float
        self._value = 0
    dispatch["double"] = end_double
    dispatch["float"] = end_double
    
    def end_nil (self, data):
        self.append(None) # None
        self._value = 0
    dispatch["nil"] = end_nil

小结

xmlrpc不考虑tcp协议的情况下,主要是2层模型,底层是http协议,上层是xmlrpc协议。http协议负责网络传输;xmlrpc协议负责将rpc请求转换成xml数据,然后再反序列化成请求执行。

结构

参考链接

  • https://en.wikipedia.org/wiki/XML-RPC


Python编程与实战
人生苦短,我用Python。主要分享项目实战经验,包括但不限于web开发,爬虫以及数据可视化等。
77篇原创内容
Official Account