memcached: add locking
Change-Id: I18d5358278e31fcff1cbd1c20b7be814052cae05 Co-Authored-By: Sahid Ferdjaoui <sahid.ferdjaoui@cloudwatt.com>
This commit is contained in:
parent
7ac1d2110c
commit
26e6243772
|
@ -27,6 +27,7 @@ import pymemcache.client
|
|||
import six
|
||||
|
||||
from tooz import coordination
|
||||
from tooz import locking
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
|
@ -44,17 +45,50 @@ def retry(f, *args, **kwargs):
|
|||
time.sleep(random.randint(0, (2 ** try_number)) / 1000.0)
|
||||
|
||||
|
||||
class MemcachedLock(locking.Lock):
|
||||
_LOCK_PREFIX = b'__TOOZ_LOCK_'
|
||||
|
||||
def __init__(self, coord, name, timeout):
|
||||
self.coord = coord
|
||||
self.name = self._LOCK_PREFIX + name
|
||||
self.timeout = timeout
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
while True:
|
||||
if self.coord.client.add(
|
||||
self.name,
|
||||
self.coord._member_id,
|
||||
expire=self.timeout,
|
||||
noreply=False):
|
||||
self.coord._acquired_locks.append(self)
|
||||
return True
|
||||
if not blocking:
|
||||
return False
|
||||
# NOTE(jd) Configurable? :/
|
||||
time.sleep(0.11)
|
||||
|
||||
def release(self):
|
||||
self.coord._acquired_locks.remove(self)
|
||||
self.coord.client.delete(self.name)
|
||||
|
||||
def heartbeat(self):
|
||||
"""Keep the lock alive."""
|
||||
self.coord.client.touch(self.name,
|
||||
expire=self.timeout)
|
||||
|
||||
|
||||
class MemcachedDriver(coordination.CoordinationDriver):
|
||||
|
||||
_GROUP_PREFIX = b'_TOOZ_GROUP_'
|
||||
_MEMBER_PREFIX = b'_TOOZ_MEMBER_'
|
||||
_GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'
|
||||
|
||||
def __init__(self, member_id, membership_timeout=30):
|
||||
def __init__(self, member_id, membership_timeout=30, lock_timeout=30):
|
||||
super(MemcachedDriver, self).__init__()
|
||||
self._member_id = member_id
|
||||
self._groups = set()
|
||||
self.membership_timeout = membership_timeout
|
||||
self.lock_timeout = lock_timeout
|
||||
|
||||
@staticmethod
|
||||
def _msgpack_serializer(key, value):
|
||||
|
@ -80,8 +114,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
connect_timeout=timeout)
|
||||
except Exception as e:
|
||||
raise coordination.ToozConnectionError(e)
|
||||
self.heartbeat()
|
||||
self._group_members = collections.defaultdict(set)
|
||||
self._acquired_locks = []
|
||||
self.heartbeat()
|
||||
|
||||
def stop(self):
|
||||
self.client.delete(self._encode_member_id(self._member_id))
|
||||
|
@ -210,6 +245,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
self.client.set(self._encode_member_id(self._member_id),
|
||||
"It's alive!",
|
||||
expire=self.membership_timeout)
|
||||
# Reset the acquired locks
|
||||
for lock in self._acquired_locks:
|
||||
lock.heartbeat()
|
||||
|
||||
def _init_watch_group(self, group_id):
|
||||
members = self.client.get(self._encode_group_id(group_id))
|
||||
|
@ -245,6 +283,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
def unwatch_elected_as_leader(group_id, callback):
|
||||
raise NotImplementedError
|
||||
|
||||
def get_lock(self, name):
|
||||
return MemcachedLock(self, name, self.lock_timeout)
|
||||
|
||||
def run_watchers(self):
|
||||
result = []
|
||||
for group_id in self.client.get(self._GROUP_LIST_KEY):
|
||||
|
|
Loading…
Reference in New Issue