summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-04-27 15:11:08 +0000
committerGerrit Code Review <review@openstack.org>2018-04-27 15:11:08 +0000
commit2ac3ef36d2687f820cb44f6b27dd81ebf1c4165b (patch)
tree26f4fe364dcf9d7bec26533a3512baa7c9e1b419
parent96ce23ad12d0104cba9d32417bb094d1c3ac8d38 (diff)
parent6ab8c380c8d6a2e15611b225da7594e820cc773e (diff)
Merge "Implement group support for etcd3gw"1.62.0
-rw-r--r--releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml4
-rw-r--r--tooz/drivers/etcd3gw.py209
2 files changed, 210 insertions, 3 deletions
diff --git a/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml
new file mode 100644
index 0000000..0231060
--- /dev/null
+++ b/releasenotes/notes/etcd3gw-group-support-598832a8764a8aa6.yaml
@@ -0,0 +1,4 @@
1---
2features:
3 - |
4 The etcd3gw driver now supports the group membership API.
diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py
index d2a7781..c229e6b 100644
--- a/tooz/drivers/etcd3gw.py
+++ b/tooz/drivers/etcd3gw.py
@@ -29,6 +29,11 @@ from tooz import locking
29from tooz import utils 29from tooz import utils
30 30
31 31
32def _encode(data):
33 """Safely encode data for consumption of the gateway."""
34 return base64.b64encode(data).decode("ascii")
35
36
32def _translate_failures(func): 37def _translate_failures(func):
33 """Translates common requests exceptions into tooz exceptions.""" 38 """Translates common requests exceptions into tooz exceptions."""
34 39
@@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock):
66 self._timeout = timeout 71 self._timeout = timeout
67 self._coord = coord 72 self._coord = coord
68 self._key = self.LOCK_PREFIX + name 73 self._key = self.LOCK_PREFIX + name
69 self._key_b64 = base64.b64encode(self._key).decode("ascii") 74 self._key_b64 = _encode(self._key)
70 self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii") 75 self._uuid = _encode(uuid.uuid4().bytes)
71 self._exclusive_access = threading.Lock() 76 self._exclusive_access = threading.Lock()
72 77
73 @_translate_failures 78 @_translate_failures
@@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock):
156 return False 161 return False
157 162
158 163
159class Etcd3Driver(coordination.CoordinationDriver): 164class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
160 """An etcd based driver. 165 """An etcd based driver.
161 166
162 This driver uses etcd provide the coordination driver semantics and 167 This driver uses etcd provide the coordination driver semantics and
@@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver):
172 #: Default port used if none provided (4001 or 2379 are the common ones). 177 #: Default port used if none provided (4001 or 2379 are the common ones).
173 DEFAULT_PORT = 2379 178 DEFAULT_PORT = 2379
174 179
180 GROUP_PREFIX = b"tooz/groups/"
181
175 def __init__(self, member_id, parsed_url, options): 182 def __init__(self, member_id, parsed_url, options):
176 super(Etcd3Driver, self).__init__(member_id, parsed_url, options) 183 super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
177 host = parsed_url.hostname or self.DEFAULT_HOST 184 host = parsed_url.hostname or self.DEFAULT_HOST
@@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver):
180 timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT)) 187 timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
181 self.client = etcd3gw.client(host=host, port=port, timeout=timeout) 188 self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
182 self.lock_timeout = int(options.get('lock_timeout', timeout)) 189 self.lock_timeout = int(options.get('lock_timeout', timeout))
190 self.membership_timeout = int(options.get(
191 'membership_timeout', timeout))
183 self._acquired_locks = set() 192 self._acquired_locks = set()
184 193
194 def _start(self):
195 super(Etcd3Driver, self)._start()
196 self._membership_lease = self.client.lease(self.membership_timeout)
197
185 def get_lock(self, name): 198 def get_lock(self, name):
186 return Etcd3Lock(self, name, self.lock_timeout) 199 return Etcd3Lock(self, name, self.lock_timeout)
187 200
@@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver):
202 215
203 def unwatch_leave_group(self, group_id, callback): 216 def unwatch_leave_group(self, group_id, callback):
204 raise tooz.NotImplemented 217 raise tooz.NotImplemented
218
219 def _encode_group_id(self, group_id):
220 return _encode(self._prefix_group(group_id))
221
222 def _prefix_group(self, group_id):
223 return b"%s%s/" % (self.GROUP_PREFIX, group_id)
224
225 def create_group(self, group_id):
226 @_translate_failures
227 def _create_group():
228 encoded_group = self._encode_group_id(group_id)
229 txn = {
230 'compare': [{
231 'key': encoded_group,
232 'result': 'EQUAL',
233 'target': 'VERSION',
234 'version': 0
235 }],
236 'success': [{
237 'request_put': {
238 'key': encoded_group,
239 # We shouldn't need a value, but etcd3gw needs it for
240 # now
241 'value': encoded_group
242 }
243 }],
244 'failure': []
245 }
246 result = self.client.transaction(txn)
247 if not result.get("succeeded"):
248 raise coordination.GroupAlreadyExist(group_id)
249
250 return coordination.CoordinatorResult(
251 self._executor.submit(_create_group))
252
253 def _destroy_group(self, group_id):
254 self.client.delete(group_id)
255
256 def delete_group(self, group_id):
257 @_translate_failures
258 def _delete_group():
259 prefix_group = self._prefix_group(group_id)
260 members = self.client.get_prefix(prefix_group)
261 if len(members) > 1:
262 raise coordination.GroupNotEmpty(group_id)
263
264 encoded_group = self._encode_group_id(group_id)
265 txn = {
266 'compare': [{
267 'key': encoded_group,
268 'result': 'NOT_EQUAL',
269 'target': 'VERSION',
270 'version': 0
271 }],
272 'success': [{
273 'request_delete_range': {
274 'key': encoded_group,
275 }
276 }],
277 'failure': []
278 }
279 result = self.client.transaction(txn)
280
281 if not result.get("succeeded"):
282 raise coordination.GroupNotCreated(group_id)
283
284 return coordination.CoordinatorResult(
285 self._executor.submit(_delete_group))
286
287 def join_group(self, group_id, capabilities=b""):
288 @_retry.retry()
289 @_translate_failures
290 def _join_group():
291 prefix_group = self._prefix_group(group_id)
292 prefix_member = prefix_group + self._member_id
293 members = self.client.get_prefix(prefix_group)
294
295 encoded_member = _encode(prefix_member)
296
297 group_metadata = None
298 for cap, metadata in members:
299 if metadata['key'] == prefix_member:
300 raise coordination.MemberAlreadyExist(group_id,
301 self._member_id)
302 if metadata['key'] == prefix_group:
303 group_metadata = metadata
304
305 if group_metadata is None:
306 raise coordination.GroupNotCreated(group_id)
307
308 encoded_group = self._encode_group_id(group_id)
309 txn = {
310 'compare': [{
311 'key': encoded_group,
312 'result': 'EQUAL',
313 'target': 'VERSION',
314 'version': int(group_metadata['version'])
315 }],
316 'success': [{
317 'request_put': {
318 'key': encoded_member,
319 'value': _encode(utils.dumps(capabilities)),
320 'lease': self._membership_lease.id
321 }
322 }],
323 'failure': []
324 }
325 result = self.client.transaction(txn)
326 if not result.get('succeeded'):
327 raise _retry.TryAgain
328 else:
329 self._joined_groups.add(group_id)
330
331 return coordination.CoordinatorResult(
332 self._executor.submit(_join_group))
333
334 def leave_group(self, group_id):
335 @_translate_failures
336 def _leave_group():
337 prefix_group = self._prefix_group(group_id)
338 prefix_member = prefix_group + self._member_id
339 members = self.client.get_prefix(prefix_group)
340 for capabilities, metadata in members:
341 if metadata['key'] == prefix_member:
342 break
343 else:
344 raise coordination.MemberNotJoined(group_id,
345 self._member_id)
346
347 self.client.delete(prefix_member)
348 self._joined_groups.discard(group_id)
349
350 return coordination.CoordinatorResult(
351 self._executor.submit(_leave_group))
352
353 def get_members(self, group_id):
354 @_translate_failures
355 def _get_members():
356 prefix_group = self._prefix_group(group_id)
357 members = set()
358 group_found = False
359
360 for cap, metadata in self.client.get_prefix(prefix_group):
361 if metadata['key'] == prefix_group:
362 group_found = True
363 else:
364 members.add(metadata['key'][len(prefix_group):])
365
366 if not group_found:
367 raise coordination.GroupNotCreated(group_id)
368
369 return members
370
371 return coordination.CoordinatorResult(
372 self._executor.submit(_get_members))
373
374 def get_member_capabilities(self, group_id, member_id):
375 @_translate_failures
376 def _get_member_capabilities():
377 prefix_member = self._prefix_group(group_id) + member_id
378 result = self.client.get(prefix_member)
379 if not result:
380 raise coordination.MemberNotJoined(group_id, member_id)
381 return utils.loads(result[0])
382
383 return coordination.CoordinatorResult(
384 self._executor.submit(_get_member_capabilities))
385
386 def update_capabilities(self, group_id, capabilities):
387 @_translate_failures
388 def _update_capabilities():
389 prefix_member = self._prefix_group(group_id) + self._member_id
390 result = self.client.get(prefix_member)
391 if not result:
392 raise coordination.MemberNotJoined(group_id, self._member_id)
393
394 self.client.put(prefix_member, utils.dumps(capabilities),
395 lease=self._membership_lease)
396
397 return coordination.CoordinatorResult(
398 self._executor.submit(_update_capabilities))
399
400 def get_groups(self):
401 @_translate_failures
402 def _get_groups():
403 groups = self.client.get_prefix(self.GROUP_PREFIX)
404 return [
405 group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups]
406 return coordination.CoordinatorResult(
407 self._executor.submit(_get_groups))