你的python 可重入分布式锁是怎么实现的
基本概念
分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本文主要介绍基于redis方式。
可重入
可重入锁指的是同一线程中可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
可重入锁的要点是对于同一线程可以多次获取锁,不同线程之间同一把锁不能重复获取,因此理论上需要保存锁拥有者的线程标识。在Java和python中都可以用 threadlocal变量同来同步同一线程之间的资源。
分布式锁设计要点
一个完整的分布式锁,需要满足以下几个条件:
互斥性。在任意时刻,只有一个客户端能持有锁。
不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。这就要求必须为锁加上超时时间。
具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。这点是要求必须能够保证相关操作的原子性。
解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。这就要求每把锁必须有自己的标识,在解锁时通过判断标识来保证是释放的自己的锁。
代码实现
Talk is cheap, show me the code。下面是一个分布式锁的基本实现:
class DistributedLock(object):
"""
This is not a 100% reliable distributed lock implementation. It is a try best one.
And this distributed lock algorithm is based on the time synchronization between
each node.
But it should be enough for our project.
"""
def __init__(self, lock_name, expire_time=3):
self.name = lock_name + '_distributed_lock'
self.expire_time = expire_time
self.uuid = str(uuid.uuid4())
@redis_exc
def __enter__(self):
"""
If driver is not redis, nothing will happen.
"""
redis_conn = driver_manager.get_global_redis_conn()
while True:
if redis_conn.set(self.name, self.uuid, ex=self.expire_time*2, nx=True) == 1:
LOG.debug('Get lock %s' % self.name)
break
LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
time.sleep(0.1)
return self
@redis_exc
def __exit__(self, exc_type, exc_val, exc_tb):
redis_conn = driver_manager.get_global_redis_conn()
owner_id = redis_conn.get(self.name)
if owner_id == self.uuid:
redis_conn.delete(self.name)
LOG.debug('Release lock %s' % self.name)
else:
LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)
return
可以看到,我们加锁就一行代码:redis_conn.set(self.name, self.uuid, ex=self.expire_time, nx=True)
,这个set()方法一共有五个形参:
-
第一个为key,表示锁的唯一性标识。 -
第二个为value,我们传的是uuid,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为uuid,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。 -
第三个表示锁的过期时间,如果程序发生意外没有及时解锁,也能保证在超时后其他请求能够正常加锁 -
第四个nx=True,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
分布式锁支持可重入
_request_store = threading.local()
def _add_locker_info(lock_name, owner_uuid):
setattr(_request_store, lock_name, owner_uuid)
def _get_locker_info(lock_name):
return getattr(_request_store, lock_name, None)
class DistributedLock(object):
"""
This is not a 100% reliable distributed lock implementation. It is a try best one.
And this distributed lock algorithm is based on the time synchronization between
each node.
But it should be enough for our project.
"""
def __init__(self, lock_name, expire_time=3):
self.name = lock_name + '_distributed_lock'
self.expire_time = expire_time
self.uuid = str(uuid.uuid4())
@redis_exc
def __enter__(self):
"""
If driver is not redis, nothing will happen.
"""
redis_conn = driver_manager.get_global_redis_conn()
while True:
if redis_conn.set(self.name, self.uuid, ex=self.expire_time*2, nx=True) == 1:
_add_locker_info(self.name, self.uuid)
LOG.debug('Get lock %s' % self.name)
break
if _get_locker_info(self.name):
LOG.debug('Get re-entrant lock %s' % self.name)
break
LOG.debug('Sleep 0.1s to try to get lock %s' % self.name)
time.sleep(0.1)
return self
@redis_exc
def __exit__(self, exc_type, exc_val, exc_tb):
redis_conn = driver_manager.get_global_redis_conn()
owner_id = redis_conn.get(self.name)
if owner_id == self.uuid:
redis_conn.delete(self.name)
LOG.debug('Release lock %s' % self.name)
elif owner_id == _get_locker_info(self.name):
LOG.debug('Quit from re-entrant lock %s, do nothing ' % self.name)
else:
LOG.warning('Transaction locked by distributed lock %s need larger expire time' % self.name)
return
注意点
-
threadlocal中的内存不主动释放,会不会内存泄漏?
-
可重入锁的counter计数
-
关于threadlocal的线程隔离
def currentThread():
"""Return the current Thread object, corresponding to the caller's thread of control.
If the caller's thread of control was not created through the threading
module, a dummy thread object with limited functionality is returned.
"""
try:
return _active[_get_ident()]
except KeyError:
##print "current_thread(): no current thread for", _get_ident()
return _DummyThread()
# 获取线程号函数
_get_ident = thread.get_ident
def current_thread():
g = greenlet.getcurrent()
if not g:
# Not currently in a greenthread, fall back to standard function
return _fixup_thread(__orig_threading.current_thread())
try:
active = __threadlocal.active
except AttributeError:
active = __threadlocal.active = {}
try:
t = active[id(g)]
except KeyError:
# Add green thread to active if we can clean it up on exit
def cleanup(g):
del active[id(g)]
try:
g.link(cleanup)
except AttributeError:
# Not a GreenThread type, so there's no way to hook into
# the green thread exiting. Fall back to the standard
# function then.
t = _fixup_thread(__orig_threading.currentThread())
else:
t = active[id(g)] = _GreenThread(g)
return t
greenlet.getcurrent()
Returns the current greenlet (i.e. the one which called this function).
返回当前greenlet,也就是谁在调用这个函数。
附录:python threading.RLock源码
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
Lock = _allocate_lock
def RLock(*args, **kwargs):
"""Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it again
without blocking; the thread must release it once for each time it has
acquired it.
"""
return _RLock(*args, **kwargs)
class _RLock(_Verbose):
"""A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it
again without blocking; the thread must release it once for each time it
has acquired it.
"""
def __init__(self, verbose=None):
_Verbose.__init__(self, verbose)
self.__block = _allocate_lock()
self.__owner = None
self.__count = 0
def __repr__(self):
owner = self.__owner
try:
owner = _active[owner].name
except KeyError:
pass
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self.__count)
def acquire(self, blocking=1):
"""Acquire a lock, blocking or non-blocking.
When invoked without arguments: if this thread already owns the lock,
increment the recursion level by one, and return immediately. Otherwise,
if another thread owns the lock, block until the lock is unlocked. Once
the lock is unlocked (not owned by any thread), then grab ownership, set
the recursion level to one, and return. If more than one thread is
blocked waiting until the lock is unlocked, only one at a time will be
able to grab ownership of the lock. There is no return value in this
case.
When invoked with the blocking argument set to true, do the same thing
as when called without arguments, and return true.
When invoked with the blocking argument set to false, do not block. If a
call without an argument would block, return false immediately;
otherwise, do the same thing as when called without arguments, and
return true.
"""
me = _get_ident()
if self.__owner == me:
self.__count = self.__count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
rc = self.__block.acquire(blocking)
if rc:
self.__owner = me
self.__count = 1
if __debug__:
self._note("%s.acquire(%s): initial success", self, blocking)
else:
if __debug__:
self._note("%s.acquire(%s): failure", self, blocking)
return rc
__enter__ = acquire
def release(self):
"""Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
unlocked.
There is no return value.
"""
if self.__owner != _get_ident():
raise RuntimeError("cannot release un-acquired lock")
self.__count = count = self.__count - 1
if not count:
self.__owner = None
self.__block.release()
if __debug__:
self._note("%s.release(): final release", self)
else:
if __debug__:
self._note("%s.release(): non-final release", self)
def __exit__(self, t, v, tb):
self.release()
# Internal methods used by condition variables
def _acquire_restore(self, count_owner):
count, owner = count_owner
self.__block.acquire()
self.__count = count
self.__owner = owner
if __debug__:
self._note("%s._acquire_restore()", self)
def _release_save(self):
if __debug__:
self._note("%s._release_save()", self)
count = self.__count
self.__count = 0
owner = self.__owner
self.__owner = None
self.__block.release()
return (count, owner)
def _is_owned(self):
return self.__owner == _get_ident()
-
简单分析
参考文档
-
https://segmentfault.com/a/1190000021199037 -
https://github.com/yangfeixxx/python_redis -
https://leaveslm.github.io/2018/08/08/2018-2018-08-08- 基于-Redis-实现可重入分布式锁/ -
https://www.cnblogs.com/xybaby/p/6420873.html -
https://www.cnblogs.com/linjiqin/p/8003838.html
【您的在看,我的莫大鼓励】