memcached: add locking

Change-Id: I18d5358278e31fcff1cbd1c20b7be814052cae05
Co-Authored-By: Sahid Ferdjaoui <sahid.ferdjaoui@cloudwatt.com>
This commit is contained in:
Julien Danjou 2014-04-11 14:31:42 +02:00
parent 7ac1d2110c
commit 26e6243772
1 changed files with 43 additions and 2 deletions

View File

@ -27,6 +27,7 @@ import pymemcache.client
import six
from tooz import coordination
from tooz import locking
class Retry(Exception):
@ -44,17 +45,50 @@ def retry(f, *args, **kwargs):
time.sleep(random.randint(0, (2 ** try_number)) / 1000.0)
class MemcachedLock(locking.Lock):
_LOCK_PREFIX = b'__TOOZ_LOCK_'
def __init__(self, coord, name, timeout):
self.coord = coord
self.name = self._LOCK_PREFIX + name
self.timeout = timeout
def acquire(self, blocking=True):
while True:
if self.coord.client.add(
self.name,
self.coord._member_id,
expire=self.timeout,
noreply=False):
self.coord._acquired_locks.append(self)
return True
if not blocking:
return False
# NOTE(jd) Configurable? :/
time.sleep(0.11)
def release(self):
self.coord._acquired_locks.remove(self)
self.coord.client.delete(self.name)
def heartbeat(self):
"""Keep the lock alive."""
self.coord.client.touch(self.name,
expire=self.timeout)
class MemcachedDriver(coordination.CoordinationDriver):
_GROUP_PREFIX = b'_TOOZ_GROUP_'
_MEMBER_PREFIX = b'_TOOZ_MEMBER_'
_GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'
def __init__(self, member_id, membership_timeout=30):
def __init__(self, member_id, membership_timeout=30, lock_timeout=30):
super(MemcachedDriver, self).__init__()
self._member_id = member_id
self._groups = set()
self.membership_timeout = membership_timeout
self.lock_timeout = lock_timeout
@staticmethod
def _msgpack_serializer(key, value):
@ -80,8 +114,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
connect_timeout=timeout)
except Exception as e:
raise coordination.ToozConnectionError(e)
self.heartbeat()
self._group_members = collections.defaultdict(set)
self._acquired_locks = []
self.heartbeat()
def stop(self):
self.client.delete(self._encode_member_id(self._member_id))
@ -210,6 +245,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
self.client.set(self._encode_member_id(self._member_id),
"It's alive!",
expire=self.membership_timeout)
# Reset the acquired locks
for lock in self._acquired_locks:
lock.heartbeat()
def _init_watch_group(self, group_id):
members = self.client.get(self._encode_group_id(group_id))
@ -245,6 +283,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
def unwatch_elected_as_leader(group_id, callback):
raise NotImplementedError
def get_lock(self, name):
return MemcachedLock(self, name, self.lock_timeout)
def run_watchers(self):
result = []
for group_id in self.client.get(self._GROUP_LIST_KEY):