Make lock blocking with no time out by default
The lock_timeout option that has been copied from memcached actually does not mean the same thing at all. In memcached, it's used to expire the lock when a client dies, whereas in IPC and ZK it's meant to be a timeout on acquire(). This means that by default acquire(blocking=True) will have different behaviours between the different drivers. We now avoid that by removing lock_timeout from ZK and IPC, and allowing the blocking argument from acquire() to be a numeric value. Change-Id: Idb68e2ff945403f7ee24d86ea480531122b19ff8
This commit is contained in:
parent
ebe41afc52
commit
d00413cdfa
|
@ -42,10 +42,9 @@ class IPCLock(locking.Lock):
|
|||
"""
|
||||
_LOCK_PROJECT = b'__TOOZ_LOCK_'
|
||||
|
||||
def __init__(self, name, timeout):
|
||||
def __init__(self, name):
|
||||
super(IPCLock, self).__init__(name)
|
||||
self.key = self.ftok(name, self._LOCK_PROJECT)
|
||||
self.timeout = timeout
|
||||
self._lock = None
|
||||
|
||||
@staticmethod
|
||||
|
@ -62,11 +61,17 @@ class IPCLock(locking.Lock):
|
|||
return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
timeout = self.timeout if blocking else 0
|
||||
if ((blocking is False or timeout is not None)
|
||||
if (blocking is not True
|
||||
and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
|
||||
raise tooz.NotImplemented(
|
||||
"This system does not support semaphore timeout")
|
||||
# Convert blocking argument to a valid timeout value
|
||||
if blocking is True:
|
||||
timeout = None
|
||||
elif blocking is False:
|
||||
timeout = 0
|
||||
else:
|
||||
timeout = blocking
|
||||
while True:
|
||||
try:
|
||||
self._lock = sysv_ipc.Semaphore(self.key,
|
||||
|
@ -104,18 +109,12 @@ class IPCLock(locking.Lock):
|
|||
class IPCDriver(coordination.CoordinationDriver):
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
"""Initialize the IPC driver.
|
||||
|
||||
:param lock_timeout: how many seconds to wait when trying to acquire
|
||||
a lock in blocking mode. None means forever, 0
|
||||
means don't wait, any other value means wait
|
||||
this amount of seconds.
|
||||
"""
|
||||
"""Initialize the IPC driver."""
|
||||
super(IPCDriver, self).__init__()
|
||||
self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
|
||||
|
||||
def get_lock(self, name):
|
||||
return IPCLock(name, self.lock_timeout)
|
||||
@staticmethod
|
||||
def get_lock(name):
|
||||
return IPCLock(name)
|
||||
|
||||
@staticmethod
|
||||
def watch_join_group(group_id, callback):
|
||||
|
|
|
@ -41,10 +41,15 @@ def retry_if_retry_raised(exception):
|
|||
return isinstance(exception, Retry)
|
||||
|
||||
|
||||
_RETRYING_KWARGS = dict(
|
||||
retry_on_exception=retry_if_retry_raised,
|
||||
wait='exponential_sleep',
|
||||
wait_exponential_max=1,
|
||||
)
|
||||
|
||||
|
||||
def retry(f):
|
||||
return retrying.retry(
|
||||
retry_on_exception=retry_if_retry_raised,
|
||||
wait='exponential_sleep', wait_exponential_max=1)(f)
|
||||
return retrying.retry(**_RETRYING_KWARGS)(f)
|
||||
|
||||
|
||||
class MemcachedLock(locking.Lock):
|
||||
|
@ -57,16 +62,20 @@ class MemcachedLock(locking.Lock):
|
|||
|
||||
@retry
|
||||
def acquire(self, blocking=True):
|
||||
if self.coord.client.add(
|
||||
self.name,
|
||||
self.coord._member_id,
|
||||
expire=self.timeout,
|
||||
noreply=False):
|
||||
self.coord._acquired_locks.append(self)
|
||||
return True
|
||||
if not blocking:
|
||||
return False
|
||||
raise Retry
|
||||
def _acquire():
|
||||
if self.coord.client.add(
|
||||
self.name,
|
||||
self.coord._member_id,
|
||||
expire=self.timeout,
|
||||
noreply=False):
|
||||
self.coord._acquired_locks.append(self)
|
||||
return True
|
||||
if blocking is False:
|
||||
return False
|
||||
raise Retry
|
||||
kwargs = _RETRYING_KWARGS.copy()
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return retrying.Retrying(**kwargs).call(_acquire)
|
||||
|
||||
def release(self):
|
||||
if self.coord.client.delete(self.name, noreply=False):
|
||||
|
|
|
@ -28,15 +28,13 @@ from tooz import utils
|
|||
|
||||
|
||||
class ZooKeeperLock(locking.Lock):
|
||||
def __init__(self, name, lock, timeout):
|
||||
def __init__(self, name, lock):
|
||||
super(ZooKeeperLock, self).__init__(name)
|
||||
self._lock = lock
|
||||
self.timeout = timeout
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
timeout = self.timeout if blocking else None
|
||||
return self._lock.acquire(blocking=blocking,
|
||||
timeout=timeout)
|
||||
return self._lock.acquire(blocking=bool(blocking),
|
||||
timeout=blocking)
|
||||
|
||||
def release(self):
|
||||
return self._lock.release()
|
||||
|
@ -47,10 +45,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
|
||||
:param timeout: connection timeout to wait when first connecting to the
|
||||
zookeeper server
|
||||
:param lock_timeout: how many seconds to wait when trying to acquire
|
||||
a lock in blocking mode. None means forever, 0
|
||||
means don't wait, any other value means wait
|
||||
this amount of seconds.
|
||||
"""
|
||||
|
||||
_TOOZ_NAMESPACE = b"tooz"
|
||||
|
@ -59,7 +53,6 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
super(BaseZooKeeperDriver, self).__init__()
|
||||
self._member_id = member_id
|
||||
self.timeout = int(options.get('timeout', ['10'])[-1])
|
||||
self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
|
||||
|
||||
def start(self):
|
||||
try:
|
||||
|
@ -347,8 +340,7 @@ class KazooDriver(BaseZooKeeperDriver):
|
|||
name,
|
||||
self._coord.Lock(
|
||||
self.paths_join(b"/", self._TOOZ_NAMESPACE, b"locks", name),
|
||||
self._member_id.decode('ascii')),
|
||||
self.lock_timeout)
|
||||
self._member_id.decode('ascii')))
|
||||
|
||||
def run_watchers(self):
|
||||
ret = []
|
||||
|
|
|
@ -47,6 +47,11 @@ class Lock(object):
|
|||
def acquire(self, blocking=True):
|
||||
"""Attempts to acquire the lock.
|
||||
|
||||
:param blocking: If True, blocks until the lock is acquired. If False,
|
||||
returns right away. Otherwise, the value is used as a
|
||||
timeout value and the call returns maximum after this
|
||||
number of seonds.
|
||||
:returns: returns true if acquired (false if not)
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue