Merge "Convert the rest of memcached driver functions to futures"
This commit is contained in:
commit
eb0c584949
|
@ -207,46 +207,44 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
return self.client.get(self._GROUP_LIST_KEY) or []
|
||||
return MemcachedFutureResult(self._executor.submit(_get_groups))
|
||||
|
||||
@retry
|
||||
def join_group(self, group_id, capabilities=b""):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberAlreadyExist(group_id, self._member_id))
|
||||
group_members[self._member_id] = {
|
||||
"capabilities": capabilities,
|
||||
}
|
||||
if not self.client.cas(encoded_group,
|
||||
group_members,
|
||||
cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.add(group_id)
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
@retry
|
||||
@retry
|
||||
def _join_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id in group_members:
|
||||
raise coordination.MemberAlreadyExist(group_id,
|
||||
self._member_id)
|
||||
group_members[self._member_id] = {
|
||||
"capabilities": capabilities,
|
||||
}
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.add(group_id)
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_join_group))
|
||||
|
||||
def leave_group(self, group_id):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id not in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberNotJoined(group_id,
|
||||
self._member_id))
|
||||
del group_members[self._member_id]
|
||||
if not self.client.cas(encoded_group,
|
||||
group_members,
|
||||
cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.discard(group_id)
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
@retry
|
||||
def _leave_group():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if not cas:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id not in group_members:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
del group_members[self._member_id]
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, let's try again
|
||||
raise Retry
|
||||
self._groups.discard(group_id)
|
||||
|
||||
return MemcachedFutureResult(self._executor.submit(_leave_group))
|
||||
|
||||
def _get_members(self, group_id):
|
||||
group_members = self.client.get(self._encode_group_id(group_id))
|
||||
|
@ -269,21 +267,23 @@ class MemcachedDriver(coordination.CoordinationDriver):
|
|||
return MemcachedFutureResult(
|
||||
self._executor.submit(_get_member_capabilities))
|
||||
|
||||
@retry
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if cas is None:
|
||||
return MemcachedAsyncError(
|
||||
coordination.GroupNotCreated(group_id))
|
||||
if self._member_id not in group_members:
|
||||
return MemcachedAsyncError(
|
||||
coordination.MemberNotJoined(group_id, self._member_id))
|
||||
group_members[self._member_id][b'capabilities'] = capabilities
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, try again
|
||||
raise Retry
|
||||
return MemcachedAsyncResult(None)
|
||||
|
||||
@retry
|
||||
def _update_capabilities():
|
||||
group_members, cas = self.client.gets(encoded_group)
|
||||
if cas is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
if self._member_id not in group_members:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
group_members[self._member_id][b'capabilities'] = capabilities
|
||||
if not self.client.cas(encoded_group, group_members, cas):
|
||||
# It changed, try again
|
||||
raise Retry
|
||||
|
||||
return MemcachedFutureResult(
|
||||
self._executor.submit(_update_capabilities))
|
||||
|
||||
def get_leader(self, group_id):
|
||||
def _get_leader():
|
||||
|
@ -385,39 +385,3 @@ class MemcachedFutureResult(coordination.CoordAsyncResult):
|
|||
|
||||
def done(self):
|
||||
return self._fut.done()
|
||||
|
||||
|
||||
class MemcachedAsyncResult(coordination.CoordAsyncResult):
|
||||
"""Memcached asynchronous result.
|
||||
|
||||
Unfortunately, this is mostely a fake because our driver is not
|
||||
asynchronous at all. :-(.
|
||||
|
||||
"""
|
||||
def __init__(self, result):
|
||||
self._result = result
|
||||
|
||||
def get(self, timeout=0):
|
||||
return self._result
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
||||
|
||||
|
||||
class MemcachedAsyncError(coordination.CoordAsyncResult):
|
||||
"""Memcached asynchronous error.
|
||||
|
||||
Unfortunately, this is mostely a fake because our driver is not
|
||||
asynchronous at all. :-(.
|
||||
|
||||
"""
|
||||
def __init__(self, error):
|
||||
self._error = error
|
||||
|
||||
def get(self, timeout=10):
|
||||
raise self._error
|
||||
|
||||
@staticmethod
|
||||
def done():
|
||||
return True
|
||||
|
|
Loading…
Reference in New Issue