Merge "Fix calling acquire(blocking=False) twice leads to a deadlock"
This commit is contained in:
commit
af9a4a6c29
|
@ -85,21 +85,23 @@ class IPCLock(locking.Lock):
|
|||
elif blocking and timeout is not None:
|
||||
start_time = time.time()
|
||||
while True:
|
||||
tmplock = None
|
||||
try:
|
||||
self._lock = sysv_ipc.Semaphore(self.key,
|
||||
flags=sysv_ipc.IPC_CREX,
|
||||
initial_value=1)
|
||||
self._lock.undo = True
|
||||
tmplock = sysv_ipc.Semaphore(self.key,
|
||||
flags=sysv_ipc.IPC_CREX,
|
||||
initial_value=1)
|
||||
tmplock.undo = True
|
||||
except sysv_ipc.ExistentialError:
|
||||
# We failed to create it because it already exists, then try to
|
||||
# grab the existing one.
|
||||
try:
|
||||
self._lock = sysv_ipc.Semaphore(self.key)
|
||||
self._lock.undo = True
|
||||
tmplock = sysv_ipc.Semaphore(self.key)
|
||||
tmplock.undo = True
|
||||
except sysv_ipc.ExistentialError:
|
||||
# Semaphore has been deleted in the mean time, retry from
|
||||
# the beginning!
|
||||
continue
|
||||
|
||||
if start_time is not None:
|
||||
elapsed = max(0.0, time.time() - start_time)
|
||||
if elapsed >= timeout:
|
||||
|
@ -109,24 +111,24 @@ class IPCLock(locking.Lock):
|
|||
else:
|
||||
adjusted_timeout = timeout
|
||||
try:
|
||||
self._lock.acquire(timeout=adjusted_timeout)
|
||||
tmplock.acquire(timeout=adjusted_timeout)
|
||||
except sysv_ipc.BusyError:
|
||||
self._lock = None
|
||||
tmplock = None
|
||||
return False
|
||||
except sysv_ipc.ExistentialError:
|
||||
# Likely the lock has been deleted in the meantime, retry
|
||||
continue
|
||||
else:
|
||||
self._lock = tmplock
|
||||
return True
|
||||
|
||||
def release(self):
|
||||
if self._lock is not None:
|
||||
try:
|
||||
self._lock.remove()
|
||||
self._lock = None
|
||||
except sysv_ipc.ExistentialError:
|
||||
return False
|
||||
finally:
|
||||
self._lock = None
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
@ -73,7 +73,6 @@ class MemcachedLock(locking.Lock):
|
|||
super(MemcachedLock, self).__init__(self._LOCK_PREFIX + name)
|
||||
self.coord = coord
|
||||
self.timeout = timeout
|
||||
self.acquired = False
|
||||
|
||||
def is_still_owner(self):
|
||||
if not self.acquired:
|
||||
|
@ -100,8 +99,7 @@ class MemcachedLock(locking.Lock):
|
|||
return False
|
||||
raise _retry.Retry
|
||||
|
||||
self.acquired = gotten = _acquire()
|
||||
return gotten
|
||||
return _acquire()
|
||||
|
||||
@_translate_failures
|
||||
def break_(self):
|
||||
|
@ -167,6 +165,10 @@ class MemcachedLock(locking.Lock):
|
|||
def get_owner(self):
|
||||
return self.coord.client.get(self.name)
|
||||
|
||||
@property
|
||||
def acquired(self):
|
||||
return self in self.coord._acquired_locks
|
||||
|
||||
|
||||
class MemcachedDriver(coordination._RunWatchersMixin,
|
||||
coordination.CoordinationDriver):
|
||||
|
|
|
@ -64,7 +64,6 @@ class RedisLock(locking.Lock):
|
|||
thread_local=False)
|
||||
self._coord = coord
|
||||
self._client = client
|
||||
self.acquired = False
|
||||
|
||||
def is_still_owner(self):
|
||||
with _translate_failures():
|
||||
|
@ -81,11 +80,11 @@ class RedisLock(locking.Lock):
|
|||
def acquire(self, blocking=True):
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
with _translate_failures():
|
||||
self.acquired = self._lock.acquire(
|
||||
acquired = self._lock.acquire(
|
||||
blocking=blocking, blocking_timeout=timeout)
|
||||
if self.acquired:
|
||||
if acquired:
|
||||
self._coord._acquired_locks.add(self)
|
||||
return self.acquired
|
||||
return acquired
|
||||
|
||||
def release(self):
|
||||
if not self.acquired:
|
||||
|
@ -96,7 +95,6 @@ class RedisLock(locking.Lock):
|
|||
except exceptions.LockError:
|
||||
return False
|
||||
self._coord._acquired_locks.discard(self)
|
||||
self.acquired = False
|
||||
return True
|
||||
|
||||
def heartbeat(self):
|
||||
|
@ -104,6 +102,10 @@ class RedisLock(locking.Lock):
|
|||
with _translate_failures():
|
||||
self._lock.extend(self._lock.timeout)
|
||||
|
||||
@property
|
||||
def acquired(self):
|
||||
return self in self._coord._acquired_locks
|
||||
|
||||
|
||||
class RedisDriver(coordination._RunWatchersMixin,
|
||||
coordination.CoordinationDriver):
|
||||
|
|
|
@ -37,7 +37,6 @@ class ZooKeeperLock(locking.Lock):
|
|||
super(ZooKeeperLock, self).__init__(name)
|
||||
self._lock = lock
|
||||
self._client = lock.client
|
||||
self.acquired = False
|
||||
|
||||
def is_still_owner(self):
|
||||
if not self.acquired:
|
||||
|
@ -58,18 +57,20 @@ class ZooKeeperLock(locking.Lock):
|
|||
|
||||
def acquire(self, blocking=True):
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
self.acquired = self._lock.acquire(blocking=blocking,
|
||||
timeout=timeout)
|
||||
return self.acquired
|
||||
return self._lock.acquire(blocking=blocking,
|
||||
timeout=timeout)
|
||||
|
||||
def release(self):
|
||||
if self.acquired:
|
||||
self._lock.release()
|
||||
self.acquired = False
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def acquired(self):
|
||||
return self._lock.is_acquired
|
||||
|
||||
|
||||
class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
||||
"""Initialize the zookeeper driver.
|
||||
|
|
|
@ -886,6 +886,13 @@ class TestAPI(testscenarios.TestWithScenarios,
|
|||
def _get_random_uuid():
|
||||
return str(uuid.uuid4()).encode('ascii')
|
||||
|
||||
def test_acquire_twice_no_deadlock_releasing(self):
|
||||
name = self._get_random_uuid()
|
||||
lock = self._coord.get_lock(name)
|
||||
self.assertTrue(lock.acquire(blocking=False))
|
||||
self.assertFalse(lock.acquire(blocking=False))
|
||||
self.assertTrue(lock.release())
|
||||
|
||||
|
||||
class TestHook(testcase.TestCase):
|
||||
def setUp(self):
|
||||
|
|
Loading…
Reference in New Issue