vlambda博客
学习文章列表

python进程线程:共享内存

10305

共享内存

1.内存共享

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。但是,如果你确实需要使用一些共享数据,那么 multiprocessing提供了一些方法.

在多进程的情况下,每个进程都有自己独立的内存空间,multiprocessing 提供了Value,Array 两个函数来共享内存(定义在sharedctypes.py)

Vaule

定义:

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存中分配的一个ctypes对象,其中typecode_or_type定义了返回的类型。它要么是一个ctypes类型,要么是一个代表ctypes类型的code。ctypes是Python的一个外部函数库,它提供了和C语言兼任的数据类型,可以调用DLLs或者共享库的函数,能被用作在python中包裹这些库。

*args 是传递给ctypes的构造函数:

对于共享整数或者单个字符 映射关系为:

Type Code C Type Python Type
'c' char character
'b' signed char int
'B' unsigned char int
'u' Py_UNICODE unicode character
'h' signed short int
'H' unsigned short int
'i' signed int int
'I' unsigned int int
'l' signed long int
'L' unsigned long int
'f' float float
'd' double float

比如整数1 ,可以表示为:Value('h', 1)

如果共享的是字符串,没有对应的Type code ,可以使用原始的ctype 类型:

ctypes type C type Python type
c_bool _Bool bool (1)
char char 1-character string
c_wchar wchar_t 1-character unicode string
c_byte char int/long
c_ubyte unsigned char int/long
c_short short int/long
c_ushort unsigned short int/long
c_int int int/long
c_uint unsigned in int/long
c_long long int/long
c_ulong unsigned long int/long
c_longlong __int64 or long long int/long
c_ulonglong unsigned __int64 or unsigned long long int/long
c_float float float
c_double double float
c_longdouble long double float
c_char_p char * (NUL terminated) string or None
c_wchar_p wchar_t * (NUL terminated) unicode or None
c_void_p void * int/long or None

比如上面的Value('h', 1) 也可以用Value(c_short, 1) ,字符串可以用Value(c_char_p, 'python') 来表示.

返回的对象基于SynchronizedBase 类,定义如下

class SynchronizedBase(object):

 def __init__(self, obj, lock=None):
     self._obj = obj
     self._lock = lock or RLock()
     self.acquire = self._lock.acquire
     self.release = self._lock.release

 def __reduce__(self):
     assert_spawning(self)
     return synchronized, (self._obj, self._lock)

 def get_obj(self):
     return self._obj

 def get_lock(self):
     return self._lock

 def __repr__(self):
     return '<%s wrapper for %s>' % (type(self).__name__, self._obj)

具有的属性和方法有:

  • value : 获取值
  • get_lock() : 获取锁对象
  • acquire/release :获取锁,释放锁

举例:

#!/usr/bin/evn python
# coding=utf-8

import time
import multiprocessing

def fun(val):
    for i in range(10):
        time.sleep(0.5)
        val.value += 1

v = multiprocessing.Value('i'0)
p_list = [multiprocessing.Process(target=fun, args=(v, )) for i in range(10)]
for p in p_list:
    p.start()

for p in p_list:
    p.join()

print(v.value)

上述代码是多个进程修改v值,我们期待它输出的是100,但是实际上并输出的并不是100,Value的构造函数默认的lock是True,它会创建一个锁对象用于同步访问控制,这就容易造成一个错误的意识,认为Value在多进程中是安全的,但实际上并不是,要想真正的控制同步访问,需要实现获取这个锁。所以需要修改fun()函数。如下:

def fun(val):
    for i in range(10):
        time.sleep(0.5)
        with val.get_lock():  # 必须获取锁
            val.value += 1

Array

定义:

multiprocessing.Array(*typecode_or_type*, *size_or_initializer*, ***, *lock=True*)

返回的是一个从共享内存分配的ctypes数组.

  • typecode_or_type :确定返回数组的元素的类型:它是一个ctypes类型或一个字符类型代码类型的 数组模块使用的类型
  • size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度
  • 如果关键字参数中有lock的话,lock为True,则会创建一个新的锁对象,以同步对该值的访问。如果lock是Lock或RLock对象,那么它将用于同步对该值的访问。如果lock是False,那么对返回的对象的访问不会被锁自动保护,因此它不一定是“进程安全的”

Value,Array实例

#!/usr/bin/env python
# coding=utf-8

"""
使用数组的方式共享内存和全局对象
Value: 返回一个从共享内存中分配的对象
 Value.value : 得到Value中Ctype的值
 Value.get_lock: 获取 Value中的锁对象
 Vlaue.acquie/release 上锁/释放锁
Array: 返回一个从共享内存中分配的数组
"""


import os
from multiprocessing import Process, Array, Value

procs = 3
count = 0


def showdate(label, val, arr):
 """
 在这个进程中打印数据值
 """

 msg = '%-12s:pid:%4s, global:%s,value:%s,array:%s'
 print(msg % (label, os.getpid(), count, val.value, list(arr)))


def updater(val, arr):
 """通过共享内存进行通信"""
 global count
 count += 1  # 全局的基数器,非共享的
 # 传入的数据Value,Array是共享的
 val.value += 1
 for i in range(3):
     arr[i] += 1

if __name__ == '__main__':
 """
 Value, Array 返回的是一个内存中共享的Ctype对象
 i --> sigint --> int
 d ---> float,
 porcs 整数,确定了数组的长度,并且被初始化为 0

 因为这里是浮点类型,被初始化为0.0
 """

 scalar = Value('i'0)
 vector = Array('d', procs)

 # 在父进程中显示起始值
 showdate('parent start', scalar, vector)

 # 派生子进程, 传入共享内存
 p = Process(target=showdate, args=('child', scalar, vector))
 p.start()
 p.join()

 # 传入父进程中跟新过的共享内存,等待每次传入结束
 # 每个子进程看到了父进程中到现在为止对args的跟新(但是全局变量看不到)
 print('\nloop1(updates in parent, serial children)....')
 for i in range(procs):
     count += 1
     scalar.value += 1
     vector[i] += 1  # 数组中的每个元素加1
     p = Process(target=showdate, args=(
         ('porcess %s ' % i), scalar, vector))
     p.start()
     p.join()

 # 同上,不过允许子进程单行运行
 # 所有进程都看到了最近一次的迭代结果,因为他们都共享这个对象
 print('\nloop2(updates in parent, serial children)....')
 ps = []
 for i in range(procs):
     count += 1
     scalar.value += 1
     vector[i] += 1
     p = Process(target=showdate, args=(
         ('porcess %s ' % i), scalar, vector))
     p.start()
     ps.append(p)
 for p in ps:
     p.join()

 # 共享内存在派生子进程中进行跟新,等待每个跟新结束
 print('\nloop3(updates in parent, serial children)....')
 for i in range(procs):
     p = Process(target=updater, args=(scalar, vector))
     p.start()
     p.join()

 showdate('parent temp', scalar, vector)

 # 同上, 不过允许子进程并行的进行更新
 ps = []
 print('\nloop4(updates in parent, serial children)....')
 for i in range(procs):
     p = Process(target=updater, args=(scalar, vector))
     p.start()
     ps.append(p)
 for p in ps:
     p.join()
 # 仅在父进程中全局变量count = 6

 # 在此显示最终的结果
 showdate('parent end', scalar, vector)

显示结果:----> 理解这一个实例就够了

parent start:pid:27891, global:0,value:0,array:[0.0, 0.0, 0.0]
child       :pid:27892, global:0,value:0,array:[0.0, 0.0, 0.0]

loop1(updates in parent, serial children)....
porcess 0   :pid:27893, global:1,value:1,array:[1.0, 0.0, 0.0]
porcess 1   :pid:27894, global:2,value:2,array:[1.0, 1.0, 0.0]
porcess 2   :pid:27895, global:3,value:3,array:[1.0, 1.0, 1.0]

loop2(updates in parent, serial children)....
porcess 0   :pid:27896, global:4,value:6,array:[2.0, 2.0, 2.0]
porcess 1   :pid:27897, global:5,value:6,array:[2.0, 2.0, 2.0]
porcess 2   :pid:27898, global:6,value:6,array:[2.0, 2.0, 2.0]

loop3(updates in parent, serial children)....
parent temp :pid:27891, global:6,value:9,array:[5.0, 5.0, 5.0]

loop4(updates in parent, serial children)....
parent end  :pid:27891, global:6,value:12,array:[8.0, 8.0, 8.0]

2.再看Manger()

通过Manager()返回的一个manager对象控制一个服务器进程,它保持住Python对象并允许其它进程使用代理操作它们。同时它用起来很方便,而且支持本地和远程内存共享.

Manager()返回的manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event, Queue, Value和Array。定义在manager.py文件里.

manager的定义很简单如下

def Manager():
 '''
 Returns a manager associated with a running server process

 The managers methods such as `Lock()`, `Condition()` and `Queue()`
 can be used to create shared objects.
 '''

 from multiprocessing.managers import SyncManager
 m = SyncManager()
 m.start()
 return m

Manager()返回一个已经启动的SyncManager对象,管理器进程将在垃圾收集或其父进程退出时立即关闭。SyncManager继承自BaseManager。BaseManager的定义也在managers.py文件里,初始化如下:BaseManager([address[, authkey]])

  • address : 是管理器进程侦听新连接的地址。如果地址是无,则选择任意一个
  • authkey:是将用于检查到服务器进程的传入连接的有效性的认证密钥。如果authkey是None,那么使用当前进程current_process()的authkey;否则使用的authkey,它必须是字符串

BaseManager 对象的方法和属性有:

  • start([initializer [, initargs]]):启动子过程以启动管理器。如果初始化程序不是None,那么子程序在启动时会调用initializer(*initargs)为此管理器对象生成一个服务器进程
  • get_server() :    返回一个Server对象,它表示在Manager控制下的实际服务器
  • connect(): 将本地管理器对象连接到远程管理器进程
  • shutdown():停止管理器在使用的进程,这仅在用 start()已启动服务器进程时使用,可以被多次调用
  • register(typeid [,callable [,proxytype [,exposed [,method_to_typeid [,create_method]]]]]):可以用于向管理器类注册类型或可调用的类方法
  • typeid: 用于标识特定类型的共享对象的“类型标识符”。这必须是字符串
  • callable:用于为该类型标识符创建可调用的对象。如果将使用from_address()类方法创建管理器实例,或者如果create_method参数为False,那么这可以保留为None
  • method_to_typeid :一个映射,用于指定返回代理的那些公开方法的返回类型。它将方法名映射到typeid字符串
  • create_method :确定是否应该使用名称typeid创建一个方法,该方法可以用于告诉服务器进程创建一个新的共享对象并为其返回一个代理。默认情况下为True
  • **address:**管理器使用的地址
  • join(timeout=None):阻塞

现在可以来看看,SyncManager类的定义了

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''


SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

进程的类的方法都是这个类的方法,都通过注册的方式注册到这里,而且它是继承自BaseManager,这意味着我们可以自己写类并注册到BaseManager

#!/usr/bin/env python
# coding=utf-8

from multiprocessing.managers import BaseManager


class MathsClass:

    def add(self, x, y):
        return x + y

    def mul(self, x, y):
        return x * y


class MyManager(BaseManager):
    pass

# 可以用于向管理器类注册类型或可调用的类方法
MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    # 实例化进程类
    manager = MyManager()
    # 为此管理器对象生成一个服务器进程
    manager.start()
    # 调用注册的方法
    maths = manager.Maths()
    print(maths.add(12))
    print(maths.mul(12))

下面看个简单的例子

#coding=utf-8
import multiprocessing
 
def fun(ns):
    ns.x.append(1)
    ns.y.append('x')
    
 
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []
    ns.y = []
    print "before",ns
    p = multiprocessing.Process(target=fun,args=(ns))
    p.start()
    p.join()
    print "after",ns

本程序的目的是想得到x=[1],y=['x'],但是没有得到,这是为什么呢?这是因为manager对象仅能传播一个可变对象本身所做的修改,如果一个manager.list()对象,管理列表本身的任何更改会传播到所有其他进程,但是如果容器对象内部还包括可修改对象,则内部可修改对象的任何更改都不会传播到其他进程。上面例子中,ns是一个容器,它本身的改变会传播到所有进程,但是它的内部对象x,y是可变对象,它们的改变不会传播到其他进程,所有没有得到我们所要的结果。可以作如下修改:

#coding=utf-8
import multiprocessing
 
def fun(ns,x,y):
    x.append(1)
    y.append('x')
    ns.x = x
    ns.y = y
 
 
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []
    ns.y = []
    print "before",ns
    p = multiprocessing.Process(target=fun,args=(ns,ns.x,ns.y,))
    p.start()
    p.join()
    print "after",ns
- END -