From 6ec73a0bed7382790819151a0a84db2a8a464438 Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Thu, 18 May 2017 16:24:54 -0400 Subject: [PATCH] Separate etcd3gw driver that uses the etcd3 grpc gateway Using grpc to directly access etcd3 does not work well with eventlet based services. So we need to use the grpc HTTP gateway API (/v3alpha). https://coreos.com/etcd/docs/latest/dev-guide/api_grpc_gateway.html For this we use the etcd3gw package: https://pypi.python.org/pypi/etcd3gw The structure of the code is very similar to the one we already have that uses the etcd3 package. Change-Id: I97bd7ffb05a7e40cb08c9b9d62cc45236ad292aa --- setup.cfg | 3 + tooz/drivers/etcd3gw.py | 193 ++++++++++++++++++++++++++++++++++++++++ tooz/tests/__init__.py | 2 + tox.ini | 15 +++- 4 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 tooz/drivers/etcd3gw.py diff --git a/setup.cfg b/setup.cfg index 8719884..c0d311e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ packages = tooz.backends = etcd = tooz.drivers.etcd:EtcdDriver etcd3 = tooz.drivers.etcd3:Etcd3Driver + etcd3+http = tooz.drivers.etcd3gw:Etcd3Driver kazoo = tooz.drivers.zookeeper:KazooDriver zake = tooz.drivers.zake:ZakeDriver memcached = tooz.drivers.memcached:MemcachedDriver @@ -45,6 +46,8 @@ etcd = requests>=2.10.0 # Apache-2.0 etcd3 = etcd3>=0.5.1 # Apache-2.0 +etcd3gw = + etcd3gw>=0.1.0 # Apache-2.0 zake = zake>=0.1.6 # Apache-2.0 redis = diff --git a/tooz/drivers/etcd3gw.py b/tooz/drivers/etcd3gw.py new file mode 100644 index 0000000..c73a854 --- /dev/null +++ b/tooz/drivers/etcd3gw.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import +import base64 +import uuid + +import etcd3gw +from etcd3gw import exceptions as etcd3_exc +from oslo_utils import encodeutils +import six + +import tooz +from tooz import _retry +from tooz import coordination +from tooz import locking +from tooz import utils + + +def _translate_failures(func): + """Translates common requests exceptions into tooz exceptions.""" + + @six.wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except etcd3_exc.ConnectionFailedError as e: + utils.raise_with_cause(coordination.ToozConnectionError, + encodeutils.exception_to_unicode(e), + cause=e) + except etcd3_exc.ConnectionTimeoutError as e: + utils.raise_with_cause(coordination.OperationTimedOut, + encodeutils.exception_to_unicode(e), + cause=e) + except etcd3_exc.Etcd3Exception as e: + utils.raise_with_cause(coordination.ToozError, + encodeutils.exception_to_unicode(e), + cause=e) + + return wrapper + + +class Etcd3Lock(locking.Lock): + """An etcd3-specific lock. + + Thin wrapper over etcd3's lock object basically to provide the heartbeat() + semantics for the coordination driver. + """ + + LOCK_PREFIX = b"/tooz/locks" + + def __init__(self, coord, name, timeout): + super(Etcd3Lock, self).__init__(name) + self._timeout = timeout + self._coord = coord + self._key = self.LOCK_PREFIX + name + self._key_b64 = base64.b64encode(self._key).decode("ascii") + self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii") + self._lease = self._coord.client.lease(self._timeout) + + @_translate_failures + def acquire(self, blocking=True, shared=False): + if shared: + raise tooz.NotImplemented + + @_retry.retry(stop_max_delay=blocking) + def _acquire(): + # TODO(jd): save the created revision so we can check it later to + # make sure we still have the lock + txn = { + 'compare': [{ + 'key': self._key_b64, + 'result': 'EQUAL', + 'target': 'CREATE', + 'create_revision': 0 + }], + 'success': [{ + 'request_put': { + 'key': self._key_b64, + 'value': self._uuid, + 'lease': self._lease.id + } + }], + 'failure': [{ + 'request_range': { + 'key': self._key_b64 + } + }] + } + result = self._coord.client.transaction(txn) + success = result.get('succeeded', False) + + if success is not True: + if blocking is False: + return False + raise _retry.TryAgain + self._coord._acquired_locks.add(self) + return True + + return _acquire() + + @_translate_failures + def release(self): + txn = { + 'compare': [{ + 'key': self._key_b64, + 'result': 'EQUAL', + 'target': 'VALUE', + 'value': self._uuid + }], + 'success': [{ + 'request_delete_range': { + 'key': self._key_b64 + } + }] + } + + result = self._coord.client.transaction(txn) + success = result.get('succeeded', False) + if success: + self._coord._acquired_locks.remove(self) + return True + return False + + @_translate_failures + def break_(self): + if self._coord.client.delete(self._key): + self._coord._acquired_locks.discard(self) + return True + return False + + @_translate_failures + def heartbeat(self): + self._lease.refresh() + + +class Etcd3Driver(coordination.CoordinationDriver): + """An etcd based driver. + + This driver uses etcd provide the coordination driver semantics and + required API(s). + """ + + #: Default socket/lock/member/leader timeout used when none is provided. + DEFAULT_TIMEOUT = 30 + + #: Default hostname used when none is provided. + DEFAULT_HOST = "localhost" + + #: Default port used if none provided (4001 or 2379 are the common ones). + DEFAULT_PORT = 2379 + + def __init__(self, member_id, parsed_url, options): + super(Etcd3Driver, self).__init__(member_id) + host = parsed_url.hostname or self.DEFAULT_HOST + port = parsed_url.port or self.DEFAULT_PORT + options = utils.collapse(options) + timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT)) + self.client = etcd3gw.client(host=host, port=port, timeout=timeout) + self.lock_timeout = int(options.get('lock_timeout', timeout)) + self._acquired_locks = set() + + def get_lock(self, name): + return Etcd3Lock(self, name, self.lock_timeout) + + def heartbeat(self): + # NOTE(jaypipes): Copying because set can mutate during iteration + for lock in self._acquired_locks.copy(): + lock.heartbeat() + return self.lock_timeout + + def watch_join_group(self, group_id, callback): + raise tooz.NotImplemented + + def unwatch_join_group(self, group_id, callback): + raise tooz.NotImplemented + + def watch_leave_group(self, group_id, callback): + raise tooz.NotImplemented + + def unwatch_leave_group(self, group_id, callback): + raise tooz.NotImplemented diff --git a/tooz/tests/__init__.py b/tooz/tests/__init__.py index be72823..505ef5f 100644 --- a/tooz/tests/__init__.py +++ b/tooz/tests/__init__.py @@ -59,6 +59,8 @@ class TestWithCoordinator(testcase.TestCase): raise RuntimeError("No URL set for this driver") if os.getenv("TOOZ_TEST_ETCD3"): self.url = self.url.replace("etcd://", "etcd3://") + if os.getenv("TOOZ_TEST_ETCD3GW"): + self.url = self.url.replace("etcd://", "etcd3+http://") self.useFixture(fixtures.NestedTempfile()) self.group_id = get_random_uuid() self.member_id = get_random_uuid() diff --git a/tox.ini b/tox.ini index 15cfab2..f6bfe17 100644 --- a/tox.ini +++ b/tox.ini @@ -1,11 +1,11 @@ [tox] minversion = 1.8 -envlist = py27,py35,py27-zookeeper,py35-zookeeper,py27-redis,py35-redis,py27-sentinel,py35-sentinel,py27-memcached,py35-memcached,py27-postgresql,py35-postgresql,py27-mysql,py35-mysql,py27-consul,py35-consul,py27-etcd3,py35-etcd3,pep8 +envlist = py27,py35,py27-zookeeper,py35-zookeeper,py27-redis,py35-redis,py27-sentinel,py35-sentinel,py27-memcached,py35-memcached,py27-postgresql,py35-postgresql,py27-mysql,py35-mysql,py27-consul,py35-consul,py27-etcd3,py35-etcd3,py27-etcd3gw,py35-etcd3gw,pep8 [testenv] # We need to install a bit more than just `test' because those drivers have # custom tests that we always run -deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3] +deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw] zookeeper: .[zookeeper] redis: .[redis] sentinel: .[redis] @@ -14,6 +14,7 @@ deps = .[test,zake,ipc,memcached,mysql,etcd,etcd3] mysql: .[mysql] etcd: .[etcd] etcd3: .[etcd3] + etcd3gw: .[etcd3gw] consul: .[consul] setenv = TOOZ_TEST_URLS = file:///tmp zake:// ipc:// @@ -81,6 +82,16 @@ setenv = TOOZ_TEST_ETCD3=1 commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}" {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}" +[testenv:py27-etcd3gw] +setenv = TOOZ_TEST_ETCD3GW=1 +commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}" + {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}" + +[testenv:py35-etcd3gw] +setenv = TOOZ_TEST_ETCD3GW=1 +commands = {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd -- {toxinidir}/tools/pretty_tox.sh "{posargs}" + {toxinidir}/setup-etcd-env.sh pifpaf -e TOOZ_TEST run etcd --cluster -- {toxinidir}/tools/pretty_tox.sh "{posargs}" + [testenv:py27-consul] commands = {toxinidir}/setup-consul-env.sh pifpaf -e TOOZ_TEST run consul -- {toxinidir}/tools/pretty_tox.sh "{posargs}"