Merge "Fix calling acquire(blocking=False) twice leads to a deadlock"

This commit is contained in:
Jenkins 2016-02-23 01:48:34 +00:00 committed by Gerrit Code Review
commit af9a4a6c29
5 changed files with 37 additions and 23 deletions

View File

@ -85,21 +85,23 @@ class IPCLock(locking.Lock):
elif blocking and timeout is not None:
start_time = time.time()
while True:
tmplock = None
try:
self._lock = sysv_ipc.Semaphore(self.key,
flags=sysv_ipc.IPC_CREX,
initial_value=1)
self._lock.undo = True
tmplock = sysv_ipc.Semaphore(self.key,
flags=sysv_ipc.IPC_CREX,
initial_value=1)
tmplock.undo = True
except sysv_ipc.ExistentialError:
# We failed to create it because it already exists, then try to
# grab the existing one.
try:
self._lock = sysv_ipc.Semaphore(self.key)
self._lock.undo = True
tmplock = sysv_ipc.Semaphore(self.key)
tmplock.undo = True
except sysv_ipc.ExistentialError:
# Semaphore has been deleted in the mean time, retry from
# the beginning!
continue
if start_time is not None:
elapsed = max(0.0, time.time() - start_time)
if elapsed >= timeout:
@ -109,24 +111,24 @@ class IPCLock(locking.Lock):
else:
adjusted_timeout = timeout
try:
self._lock.acquire(timeout=adjusted_timeout)
tmplock.acquire(timeout=adjusted_timeout)
except sysv_ipc.BusyError:
self._lock = None
tmplock = None
return False
except sysv_ipc.ExistentialError:
# Likely the lock has been deleted in the meantime, retry
continue
else:
self._lock = tmplock
return True
def release(self):
if self._lock is not None:
try:
self._lock.remove()
self._lock = None
except sysv_ipc.ExistentialError:
return False
finally:
self._lock = None
return True
return False

View File

@ -73,7 +73,6 @@ class MemcachedLock(locking.Lock):
super(MemcachedLock, self).__init__(self._LOCK_PREFIX + name)
self.coord = coord
self.timeout = timeout
self.acquired = False
def is_still_owner(self):
if not self.acquired:
@ -100,8 +99,7 @@ class MemcachedLock(locking.Lock):
return False
raise _retry.Retry
self.acquired = gotten = _acquire()
return gotten
return _acquire()
@_translate_failures
def break_(self):
@ -167,6 +165,10 @@ class MemcachedLock(locking.Lock):
def get_owner(self):
return self.coord.client.get(self.name)
@property
def acquired(self):
return self in self.coord._acquired_locks
class MemcachedDriver(coordination._RunWatchersMixin,
coordination.CoordinationDriver):

View File

@ -64,7 +64,6 @@ class RedisLock(locking.Lock):
thread_local=False)
self._coord = coord
self._client = client
self.acquired = False
def is_still_owner(self):
with _translate_failures():
@ -81,11 +80,11 @@ class RedisLock(locking.Lock):
def acquire(self, blocking=True):
blocking, timeout = utils.convert_blocking(blocking)
with _translate_failures():
self.acquired = self._lock.acquire(
acquired = self._lock.acquire(
blocking=blocking, blocking_timeout=timeout)
if self.acquired:
if acquired:
self._coord._acquired_locks.add(self)
return self.acquired
return acquired
def release(self):
if not self.acquired:
@ -96,7 +95,6 @@ class RedisLock(locking.Lock):
except exceptions.LockError:
return False
self._coord._acquired_locks.discard(self)
self.acquired = False
return True
def heartbeat(self):
@ -104,6 +102,10 @@ class RedisLock(locking.Lock):
with _translate_failures():
self._lock.extend(self._lock.timeout)
@property
def acquired(self):
return self in self._coord._acquired_locks
class RedisDriver(coordination._RunWatchersMixin,
coordination.CoordinationDriver):

View File

@ -37,7 +37,6 @@ class ZooKeeperLock(locking.Lock):
super(ZooKeeperLock, self).__init__(name)
self._lock = lock
self._client = lock.client
self.acquired = False
def is_still_owner(self):
if not self.acquired:
@ -58,18 +57,20 @@ class ZooKeeperLock(locking.Lock):
def acquire(self, blocking=True):
blocking, timeout = utils.convert_blocking(blocking)
self.acquired = self._lock.acquire(blocking=blocking,
timeout=timeout)
return self.acquired
return self._lock.acquire(blocking=blocking,
timeout=timeout)
def release(self):
if self.acquired:
self._lock.release()
self.acquired = False
return True
else:
return False
@property
def acquired(self):
return self._lock.is_acquired
class BaseZooKeeperDriver(coordination.CoordinationDriver):
"""Initialize the zookeeper driver.

View File

@ -886,6 +886,13 @@ class TestAPI(testscenarios.TestWithScenarios,
def _get_random_uuid():
return str(uuid.uuid4()).encode('ascii')
def test_acquire_twice_no_deadlock_releasing(self):
name = self._get_random_uuid()
lock = self._coord.get_lock(name)
self.assertTrue(lock.acquire(blocking=False))
self.assertFalse(lock.acquire(blocking=False))
self.assertTrue(lock.release())
class TestHook(testcase.TestCase):
def setUp(self):