diff --git a/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml new file mode 100644 index 00000000..02310605 --- /dev/null +++ b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + The etcd3gw driver now supports the group membership API. diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py index d2a7781c..c229e6be 100644 --- a/tooz/drivers/etcd3gw.py +++ b/tooz/drivers/etcd3gw.py @@ -29,6 +29,11 @@ from tooz import locking from tooz import utils +def _encode(data): + """Safely encode data for consumption of the gateway.""" + return base64.b64encode(data).decode("ascii") + + def _translate_failures(func): """Translates common requests exceptions into tooz exceptions.""" @@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock): self._timeout = timeout self._coord = coord self._key = self.LOCK_PREFIX + name - self._key_b64 = base64.b64encode(self._key).decode("ascii") - self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii") + self._key_b64 = _encode(self._key) + self._uuid = _encode(uuid.uuid4().bytes) self._exclusive_access = threading.Lock() @_translate_failures @@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock): return False -class Etcd3Driver(coordination.CoordinationDriver): +class Etcd3Driver(coordination.CoordinationDriverWithExecutor): """An etcd based driver. This driver uses etcd provide the coordination driver semantics and @@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver): #: Default port used if none provided (4001 or 2379 are the common ones). DEFAULT_PORT = 2379 + GROUP_PREFIX = b"tooz/groups/" + def __init__(self, member_id, parsed_url, options): super(Etcd3Driver, self).__init__(member_id, parsed_url, options) host = parsed_url.hostname or self.DEFAULT_HOST @@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver): timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT)) self.client = etcd3gw.client(host=host, port=port, timeout=timeout) self.lock_timeout = int(options.get('lock_timeout', timeout)) + self.membership_timeout = int(options.get( + 'membership_timeout', timeout)) self._acquired_locks = set() + def _start(self): + super(Etcd3Driver, self)._start() + self._membership_lease = self.client.lease(self.membership_timeout) + def get_lock(self, name): return Etcd3Lock(self, name, self.lock_timeout) @@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver): def unwatch_leave_group(self, group_id, callback): raise tooz.NotImplemented + + def _encode_group_id(self, group_id): + return _encode(self._prefix_group(group_id)) + + def _prefix_group(self, group_id): + return b"%s%s/" % (self.GROUP_PREFIX, group_id) + + def create_group(self, group_id): + @_translate_failures + def _create_group(): + encoded_group = self._encode_group_id(group_id) + txn = { + 'compare': [{ + 'key': encoded_group, + 'result': 'EQUAL', + 'target': 'VERSION', + 'version': 0 + }], + 'success': [{ + 'request_put': { + 'key': encoded_group, + # We shouldn't need a value, but etcd3gw needs it for + # now + 'value': encoded_group + } + }], + 'failure': [] + } + result = self.client.transaction(txn) + if not result.get("succeeded"): + raise coordination.GroupAlreadyExist(group_id) + + return coordination.CoordinatorResult( + self._executor.submit(_create_group)) + + def _destroy_group(self, group_id): + self.client.delete(group_id) + + def delete_group(self, group_id): + @_translate_failures + def _delete_group(): + prefix_group = self._prefix_group(group_id) + members = self.client.get_prefix(prefix_group) + if len(members) > 1: + raise coordination.GroupNotEmpty(group_id) + + encoded_group = self._encode_group_id(group_id) + txn = { + 'compare': [{ + 'key': encoded_group, + 'result': 'NOT_EQUAL', + 'target': 'VERSION', + 'version': 0 + }], + 'success': [{ + 'request_delete_range': { + 'key': encoded_group, + } + }], + 'failure': [] + } + result = self.client.transaction(txn) + + if not result.get("succeeded"): + raise coordination.GroupNotCreated(group_id) + + return coordination.CoordinatorResult( + self._executor.submit(_delete_group)) + + def join_group(self, group_id, capabilities=b""): + @_retry.retry() + @_translate_failures + def _join_group(): + prefix_group = self._prefix_group(group_id) + prefix_member = prefix_group + self._member_id + members = self.client.get_prefix(prefix_group) + + encoded_member = _encode(prefix_member) + + group_metadata = None + for cap, metadata in members: + if metadata['key'] == prefix_member: + raise coordination.MemberAlreadyExist(group_id, + self._member_id) + if metadata['key'] == prefix_group: + group_metadata = metadata + + if group_metadata is None: + raise coordination.GroupNotCreated(group_id) + + encoded_group = self._encode_group_id(group_id) + txn = { + 'compare': [{ + 'key': encoded_group, + 'result': 'EQUAL', + 'target': 'VERSION', + 'version': int(group_metadata['version']) + }], + 'success': [{ + 'request_put': { + 'key': encoded_member, + 'value': _encode(utils.dumps(capabilities)), + 'lease': self._membership_lease.id + } + }], + 'failure': [] + } + result = self.client.transaction(txn) + if not result.get('succeeded'): + raise _retry.TryAgain + else: + self._joined_groups.add(group_id) + + return coordination.CoordinatorResult( + self._executor.submit(_join_group)) + + def leave_group(self, group_id): + @_translate_failures + def _leave_group(): + prefix_group = self._prefix_group(group_id) + prefix_member = prefix_group + self._member_id + members = self.client.get_prefix(prefix_group) + for capabilities, metadata in members: + if metadata['key'] == prefix_member: + break + else: + raise coordination.MemberNotJoined(group_id, + self._member_id) + + self.client.delete(prefix_member) + self._joined_groups.discard(group_id) + + return coordination.CoordinatorResult( + self._executor.submit(_leave_group)) + + def get_members(self, group_id): + @_translate_failures + def _get_members(): + prefix_group = self._prefix_group(group_id) + members = set() + group_found = False + + for cap, metadata in self.client.get_prefix(prefix_group): + if metadata['key'] == prefix_group: + group_found = True + else: + members.add(metadata['key'][len(prefix_group):]) + + if not group_found: + raise coordination.GroupNotCreated(group_id) + + return members + + return coordination.CoordinatorResult( + self._executor.submit(_get_members)) + + def get_member_capabilities(self, group_id, member_id): + @_translate_failures + def _get_member_capabilities(): + prefix_member = self._prefix_group(group_id) + member_id + result = self.client.get(prefix_member) + if not result: + raise coordination.MemberNotJoined(group_id, member_id) + return utils.loads(result[0]) + + return coordination.CoordinatorResult( + self._executor.submit(_get_member_capabilities)) + + def update_capabilities(self, group_id, capabilities): + @_translate_failures + def _update_capabilities(): + prefix_member = self._prefix_group(group_id) + self._member_id + result = self.client.get(prefix_member) + if not result: + raise coordination.MemberNotJoined(group_id, self._member_id) + + self.client.put(prefix_member, utils.dumps(capabilities), + lease=self._membership_lease) + + return coordination.CoordinatorResult( + self._executor.submit(_update_capabilities)) + + def get_groups(self): + @_translate_failures + def _get_groups(): + groups = self.client.get_prefix(self.GROUP_PREFIX) + return [ + group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups] + return coordination.CoordinatorResult( + self._executor.submit(_get_groups))