Make lock blocking with no time out by default

The lock_timeout option that has been copied from memcached actually
does not mean the same thing at all. In memcached, it's used to expire
the lock when a client dies, whereas in IPC and ZK it's meant to be a
timeout on acquire().
This means that by default acquire(blocking=True) will have different
behaviours between the different drivers.

We now avoid that by removing lock_timeout from ZK and IPC, and allowing
the blocking argument from acquire() to be a numeric value.

Change-Id: Idb68e2ff945403f7ee24d86ea480531122b19ff8
This commit is contained in:
Julien Danjou 2014-10-03 20:53:10 +02:00
parent ebe41afc52
commit d00413cdfa
4 changed files with 44 additions and 39 deletions

View File

@ -42,10 +42,9 @@ class IPCLock(locking.Lock):
"""
_LOCK_PROJECT = b'__TOOZ_LOCK_'
def __init__(self, name, timeout):
def __init__(self, name):
super(IPCLock, self).__init__(name)
self.key = self.ftok(name, self._LOCK_PROJECT)
self.timeout = timeout
self._lock = None
@staticmethod
@ -62,11 +61,17 @@ class IPCLock(locking.Lock):
return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN
def acquire(self, blocking=True):
timeout = self.timeout if blocking else 0
if ((blocking is False or timeout is not None)
if (blocking is not True
and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
raise tooz.NotImplemented(
"This system does not support semaphore timeout")
# Convert blocking argument to a valid timeout value
if blocking is True:
timeout = None
elif blocking is False:
timeout = 0
else:
timeout = blocking
while True:
try:
self._lock = sysv_ipc.Semaphore(self.key,
@ -104,18 +109,12 @@ class IPCLock(locking.Lock):
class IPCDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver.
:param lock_timeout: how many seconds to wait when trying to acquire
a lock in blocking mode. None means forever, 0
means don't wait, any other value means wait
this amount of seconds.
"""
"""Initialize the IPC driver."""
super(IPCDriver, self).__init__()
self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
def get_lock(self, name):
return IPCLock(name, self.lock_timeout)
@staticmethod
def get_lock(name):
return IPCLock(name)
@staticmethod
def watch_join_group(group_id, callback):

View File

@ -41,10 +41,15 @@ def retry_if_retry_raised(exception):
return isinstance(exception, Retry)
_RETRYING_KWARGS = dict(
retry_on_exception=retry_if_retry_raised,
wait='exponential_sleep',
wait_exponential_max=1,
)
def retry(f):
return retrying.retry(
retry_on_exception=retry_if_retry_raised,
wait='exponential_sleep', wait_exponential_max=1)(f)
return retrying.retry(**_RETRYING_KWARGS)(f)
class MemcachedLock(locking.Lock):
@ -57,16 +62,20 @@ class MemcachedLock(locking.Lock):
@retry
def acquire(self, blocking=True):
if self.coord.client.add(
self.name,
self.coord._member_id,
expire=self.timeout,
noreply=False):
self.coord._acquired_locks.append(self)
return True
if not blocking:
return False
raise Retry
def _acquire():
if self.coord.client.add(
self.name,
self.coord._member_id,
expire=self.timeout,
noreply=False):
self.coord._acquired_locks.append(self)
return True
if blocking is False:
return False
raise Retry
kwargs = _RETRYING_KWARGS.copy()
kwargs['stop_max_delay'] = blocking
return retrying.Retrying(**kwargs).call(_acquire)
def release(self):
if self.coord.client.delete(self.name, noreply=False):

View File

@ -28,15 +28,13 @@ from tooz import utils
class ZooKeeperLock(locking.Lock):
def __init__(self, name, lock, timeout):
def __init__(self, name, lock):
super(ZooKeeperLock, self).__init__(name)
self._lock = lock
self.timeout = timeout
def acquire(self, blocking=True):
timeout = self.timeout if blocking else None
return self._lock.acquire(blocking=blocking,
timeout=timeout)
return self._lock.acquire(blocking=bool(blocking),
timeout=blocking)
def release(self):
return self._lock.release()
@ -47,10 +45,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
:param timeout: connection timeout to wait when first connecting to the
zookeeper server
:param lock_timeout: how many seconds to wait when trying to acquire
a lock in blocking mode. None means forever, 0
means don't wait, any other value means wait
this amount of seconds.
"""
_TOOZ_NAMESPACE = b"tooz"
@ -59,7 +53,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
super(BaseZooKeeperDriver, self).__init__()
self._member_id = member_id
self.timeout = int(options.get('timeout', ['10'])[-1])
self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
def start(self):
try:
@ -347,8 +340,7 @@ class KazooDriver(BaseZooKeeperDriver):
name,
self._coord.Lock(
self.paths_join(b"/", self._TOOZ_NAMESPACE, b"locks", name),
self._member_id.decode('ascii')),
self.lock_timeout)
self._member_id.decode('ascii')))
def run_watchers(self):
ret = []

View File

@ -47,6 +47,11 @@ class Lock(object):
def acquire(self, blocking=True):
"""Attempts to acquire the lock.
:param blocking: If True, blocks until the lock is acquired. If False,
returns right away. Otherwise, the value is used as a
timeout value and the call returns maximum after this
number of seonds.
:returns: returns true if acquired (false if not)
:rtype: bool
"""