Implement group support for etcd3gw

This adds the various methods to support groups operations on the etc3gw
driver.

Change-Id: I8acb2f617f6044449fb3a91a9c3de73ef0061d36
This commit is contained in:
Thomas Herve 2018-02-28 17:15:12 +01:00
parent 824ff65433
commit 6ab8c380c8
2 changed files with 210 additions and 3 deletions

View File

@ -0,0 +1,4 @@
---
features:
- |
The etcd3gw driver now supports the group membership API.

View File

@ -29,6 +29,11 @@ from tooz import locking
from tooz import utils
def _encode(data):
"""Safely encode data for consumption of the gateway."""
return base64.b64encode(data).decode("ascii")
def _translate_failures(func):
"""Translates common requests exceptions into tooz exceptions."""
@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock):
self._timeout = timeout
self._coord = coord
self._key = self.LOCK_PREFIX + name
self._key_b64 = base64.b64encode(self._key).decode("ascii")
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
self._key_b64 = _encode(self._key)
self._uuid = _encode(uuid.uuid4().bytes)
self._exclusive_access = threading.Lock()
@_translate_failures
@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock):
return False
class Etcd3Driver(coordination.CoordinationDriver):
class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
"""An etcd based driver.
This driver uses etcd provide the coordination driver semantics and
@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver):
#: Default port used if none provided (4001 or 2379 are the common ones).
DEFAULT_PORT = 2379
GROUP_PREFIX = b"tooz/groups/"
def __init__(self, member_id, parsed_url, options):
super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
host = parsed_url.hostname or self.DEFAULT_HOST
@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver):
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
self.lock_timeout = int(options.get('lock_timeout', timeout))
self.membership_timeout = int(options.get(
'membership_timeout', timeout))
self._acquired_locks = set()
def _start(self):
super(Etcd3Driver, self)._start()
self._membership_lease = self.client.lease(self.membership_timeout)
def get_lock(self, name):
return Etcd3Lock(self, name, self.lock_timeout)
@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver):
def unwatch_leave_group(self, group_id, callback):
raise tooz.NotImplemented
def _encode_group_id(self, group_id):
return _encode(self._prefix_group(group_id))
def _prefix_group(self, group_id):
return b"%s%s/" % (self.GROUP_PREFIX, group_id)
def create_group(self, group_id):
@_translate_failures
def _create_group():
encoded_group = self._encode_group_id(group_id)
txn = {
'compare': [{
'key': encoded_group,
'result': 'EQUAL',
'target': 'VERSION',
'version': 0
}],
'success': [{
'request_put': {
'key': encoded_group,
# We shouldn't need a value, but etcd3gw needs it for
# now
'value': encoded_group
}
}],
'failure': []
}
result = self.client.transaction(txn)
if not result.get("succeeded"):
raise coordination.GroupAlreadyExist(group_id)
return coordination.CoordinatorResult(
self._executor.submit(_create_group))
def _destroy_group(self, group_id):
self.client.delete(group_id)
def delete_group(self, group_id):
@_translate_failures
def _delete_group():
prefix_group = self._prefix_group(group_id)
members = self.client.get_prefix(prefix_group)
if len(members) > 1:
raise coordination.GroupNotEmpty(group_id)
encoded_group = self._encode_group_id(group_id)
txn = {
'compare': [{
'key': encoded_group,
'result': 'NOT_EQUAL',
'target': 'VERSION',
'version': 0
}],
'success': [{
'request_delete_range': {
'key': encoded_group,
}
}],
'failure': []
}
result = self.client.transaction(txn)
if not result.get("succeeded"):
raise coordination.GroupNotCreated(group_id)
return coordination.CoordinatorResult(
self._executor.submit(_delete_group))
def join_group(self, group_id, capabilities=b""):
@_retry.retry()
@_translate_failures
def _join_group():
prefix_group = self._prefix_group(group_id)
prefix_member = prefix_group + self._member_id
members = self.client.get_prefix(prefix_group)
encoded_member = _encode(prefix_member)
group_metadata = None
for cap, metadata in members:
if metadata['key'] == prefix_member:
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
if metadata['key'] == prefix_group:
group_metadata = metadata
if group_metadata is None:
raise coordination.GroupNotCreated(group_id)
encoded_group = self._encode_group_id(group_id)
txn = {
'compare': [{
'key': encoded_group,
'result': 'EQUAL',
'target': 'VERSION',
'version': int(group_metadata['version'])
}],
'success': [{
'request_put': {
'key': encoded_member,
'value': _encode(utils.dumps(capabilities)),
'lease': self._membership_lease.id
}
}],
'failure': []
}
result = self.client.transaction(txn)
if not result.get('succeeded'):
raise _retry.TryAgain
else:
self._joined_groups.add(group_id)
return coordination.CoordinatorResult(
self._executor.submit(_join_group))
def leave_group(self, group_id):
@_translate_failures
def _leave_group():
prefix_group = self._prefix_group(group_id)
prefix_member = prefix_group + self._member_id
members = self.client.get_prefix(prefix_group)
for capabilities, metadata in members:
if metadata['key'] == prefix_member:
break
else:
raise coordination.MemberNotJoined(group_id,
self._member_id)
self.client.delete(prefix_member)
self._joined_groups.discard(group_id)
return coordination.CoordinatorResult(
self._executor.submit(_leave_group))
def get_members(self, group_id):
@_translate_failures
def _get_members():
prefix_group = self._prefix_group(group_id)
members = set()
group_found = False
for cap, metadata in self.client.get_prefix(prefix_group):
if metadata['key'] == prefix_group:
group_found = True
else:
members.add(metadata['key'][len(prefix_group):])
if not group_found:
raise coordination.GroupNotCreated(group_id)
return members
return coordination.CoordinatorResult(
self._executor.submit(_get_members))
def get_member_capabilities(self, group_id, member_id):
@_translate_failures
def _get_member_capabilities():
prefix_member = self._prefix_group(group_id) + member_id
result = self.client.get(prefix_member)
if not result:
raise coordination.MemberNotJoined(group_id, member_id)
return utils.loads(result[0])
return coordination.CoordinatorResult(
self._executor.submit(_get_member_capabilities))
def update_capabilities(self, group_id, capabilities):
@_translate_failures
def _update_capabilities():
prefix_member = self._prefix_group(group_id) + self._member_id
result = self.client.get(prefix_member)
if not result:
raise coordination.MemberNotJoined(group_id, self._member_id)
self.client.put(prefix_member, utils.dumps(capabilities),
lease=self._membership_lease)
return coordination.CoordinatorResult(
self._executor.submit(_update_capabilities))
def get_groups(self):
@_translate_failures
def _get_groups():
groups = self.client.get_prefix(self.GROUP_PREFIX)
return [
group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups]
return coordination.CoordinatorResult(
self._executor.submit(_get_groups))