Merge "Remove etcd3 drvier"

This commit is contained in:
Zuul 2023-11-23 18:17:04 +00:00 committed by Gerrit Code Review
commit ab8caa2ffb
10 changed files with 6 additions and 484 deletions

View File

@ -7,10 +7,6 @@ reno>=3.1.0 # Apache-2.0
python-consul2>=0.0.16 # MIT License
## etcd
requests>=2.10.0 # Apache-2.0
## etcd3
etcd3>=0.12.0 # Apache-2.0
grpcio>=1.18.0
protobuf # BSD License (3 clause)
## etcd3gw
etcd3gw!=0.2.6,>=0.1.0 # Apache-2.0
## zake

View File

@ -20,12 +20,6 @@ Etcd
.. autoclass:: tooz.drivers.etcd.EtcdDriver
:members:
Etcd3
~~~~~
.. autoclass:: tooz.drivers.etcd3.Etcd3Driver
:members:
Etcd3gw
~~~~~~~

View File

@ -198,21 +198,6 @@ The etcd driver is a driver providing only distributed locks (for now)
and is based on the `etcd server`_ supported key/value storage and
associated primitives.
Etcd3
-----
**Driver:** :py:class:`tooz.drivers.etcd3.Etcd3Driver`
**Characteristics:** :py:attr:`tooz.drivers.etcd3.Etcd3Driver.CHARACTERISTICS`
**Entrypoint name:** ``etcd3``
**Summary:**
The etcd3 driver is a driver providing only distributed locks (for now)
and is based on the `etcd server`_ supported key/value storage and
associated primitives.
Etcd3 Gateway
-------------

View File

@ -0,0 +1,4 @@
---
upgrade:
- |
The etcd3 driver has been removed.

View File

@ -29,7 +29,6 @@ packages =
[entry_points]
tooz.backends =
etcd = tooz.drivers.etcd:EtcdDriver
etcd3 = tooz.drivers.etcd3:Etcd3Driver
etcd3+http = tooz.drivers.etcd3gw:Etcd3Driver
etcd3+https = tooz.drivers.etcd3gw:Etcd3Driver
kazoo = tooz.drivers.zookeeper:KazooDriver
@ -48,10 +47,6 @@ consul =
python-consul2>=0.0.16 # MIT License
etcd =
requests>=2.10.0 # Apache-2.0
etcd3 =
etcd3>=0.12.0 # Apache-2.0
grpcio>=1.18.0
protobuf<4.0 # BSD License (3 clause)
etcd3gw =
etcd3gw!=0.2.6,>=0.1.0 # Apache-2.0
zake =

View File

@ -1,372 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import functools
import threading
import etcd3
from etcd3 import exceptions as etcd3_exc
from oslo_utils import encodeutils
import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils
import warnings
@contextlib.contextmanager
def _failure_translator():
"""Translates common requests exceptions into tooz exceptions."""
try:
yield
except etcd3_exc.ConnectionFailedError as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except etcd3_exc.ConnectionTimeoutError as e:
utils.raise_with_cause(coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
except etcd3_exc.Etcd3Exception as e:
utils.raise_with_cause(coordination.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
def _translate_failures(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with _failure_translator():
return func(*args, **kwargs)
return wrapper
class Etcd3Lock(locking.Lock):
"""An etcd3-specific lock.
Thin wrapper over etcd3's lock object basically to provide the heartbeat()
semantics for the coordination driver.
"""
LOCK_PREFIX = b"/tooz/locks"
def __init__(self, coord, name, timeout):
super(Etcd3Lock, self).__init__(name)
self._coord = coord
self._lock = coord.client.lock(name.decode(), timeout)
self._exclusive_access = threading.Lock()
@_translate_failures
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking)
if blocking is False:
timeout = 0
if self._lock.acquire(timeout):
self._coord._acquired_locks.add(self)
return True
return False
@property
def acquired(self):
return self in self._coord._acquired_locks
@_translate_failures
def release(self):
with self._exclusive_access:
if self.acquired and self._lock.release():
self._coord._acquired_locks.discard(self)
return True
return False
@_translate_failures
def heartbeat(self):
with self._exclusive_access:
if self.acquired:
self._lock.refresh()
return True
return False
class Etcd3Driver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""An etcd based driver.
This driver uses etcd provide the coordination driver semantics and
required API(s).
The Etcd driver connection URI should look like::
etcd3://[HOST[:PORT]][?OPTION1=VALUE1[&OPTION2=VALUE2[&...]]]
If not specified, HOST defaults to localhost and PORT defaults to 2379.
Available options are:
================== =======
Name Default
================== =======
ca_cert None
cert_key None
cert_cert None
timeout 30
lock_timeout 30
membership_timeout 30
================== =======
"""
#: Default socket/lock/member/leader timeout used when none is provided.
DEFAULT_TIMEOUT = 30
#: Default hostname used when none is provided.
DEFAULT_HOST = "localhost"
#: Default port used if none provided (4001 or 2379 are the common ones).
DEFAULT_PORT = 2379
def __init__(self, member_id, parsed_url, options):
super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
warnings.warn(
"The etcd3 tooz driver is deprecated, it will be removed in"
"a future release."
)
host = parsed_url.hostname or self.DEFAULT_HOST
port = parsed_url.port or self.DEFAULT_PORT
options = utils.collapse(options)
ca_cert = options.get('ca_cert')
cert_key = options.get('cert_key')
cert_cert = options.get('cert_cert')
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
self.client = etcd3.client(host=host,
port=port,
ca_cert=ca_cert,
cert_key=cert_key,
cert_cert=cert_cert,
timeout=timeout)
self.lock_timeout = int(options.get('lock_timeout', timeout))
self.membership_timeout = int(options.get(
'membership_timeout', timeout))
self._acquired_locks = set()
def _start(self):
super(Etcd3Driver, self)._start()
self._membership_lease = self.client.lease(self.membership_timeout)
def _stop(self):
super(Etcd3Driver, self)._stop()
self._membership_lease.revoke()
def get_lock(self, name):
return Etcd3Lock(self, name, self.lock_timeout)
def heartbeat(self):
# NOTE(jaypipes): Copying because set can mutate during iteration
for lock in self._acquired_locks.copy():
lock.heartbeat()
# TODO(jd) use the same lease for locks?
self._membership_lease.refresh()
return min(self.lock_timeout, self.membership_timeout)
GROUP_PREFIX = b"tooz/groups/"
def _encode_group_id(self, group_id):
return self.GROUP_PREFIX + utils.to_binary(group_id) + b"/"
def _encode_group_member_id(self, group_id, member_id):
return self._encode_group_id(group_id) + utils.to_binary(member_id)
def create_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _create_group():
status, results = self.client.transaction(
compare=[
self.client.transactions.version(
encoded_group) == 0
],
success=[
self.client.transactions.put(encoded_group, b"")
],
failure=[],
)
if not status:
raise coordination.GroupAlreadyExist(group_id)
return EtcdFutureResult(self._executor.submit(_create_group))
def _destroy_group(self, group_id):
self.client.delete(self._encode_group_id(group_id))
def delete_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _delete_group():
members = list(self.client.get_prefix(encoded_group))
if len(members) > 1:
raise coordination.GroupNotEmpty(group_id)
# Warning: as of this writing python-etcd3 does not support the
# NOT_EQUAL operator so we use the EQUAL operator and will retry on
# success, hihi
status, results = self.client.transaction(
compare=[
self.client.transactions.version(encoded_group) == 0
],
success=[],
failure=[
self.client.transactions.delete(encoded_group)
],
)
if status:
raise coordination.GroupNotCreated(group_id)
return EtcdFutureResult(self._executor.submit(_delete_group))
def join_group(self, group_id, capabilities=b""):
encoded_group = self._encode_group_id(group_id)
@_retry.retry()
@_translate_failures
def _join_group():
members = list(self.client.get_prefix(encoded_group))
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
group_metadata = None
for cap, metadata in members:
if metadata.key == encoded_member:
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
if metadata.key == encoded_group:
group_metadata = metadata
if group_metadata is None:
raise coordination.GroupNotCreated(group_id)
status, results = self.client.transaction(
# This comparison makes sure the group has not been deleted in
# the mean time
compare=[
self.client.transactions.version(encoded_group) ==
group_metadata.version
],
success=[
self.client.transactions.put(encoded_member,
utils.dumps(capabilities),
lease=self._membership_lease)
],
failure=[],
)
if not status:
# TODO(jd) There's a small optimization doable by getting the
# current range on failure and passing it to this function as
# the first arg when retrying to avoid redoing a get_prefix()
raise _retry.TryAgain
return EtcdFutureResult(self._executor.submit(_join_group))
def leave_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _leave_group():
members = list(self.client.get_prefix(encoded_group))
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
for capabilities, metadata in members:
if metadata.key == encoded_member:
break
else:
raise coordination.MemberNotJoined(group_id,
self._member_id)
self.client.delete(encoded_member)
return EtcdFutureResult(self._executor.submit(_leave_group))
def get_members(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _get_members():
members = set()
group_found = False
for cap, metadata in self.client.get_prefix(encoded_group):
if metadata.key == encoded_group:
group_found = True
else:
members.add(metadata.key[len(encoded_group):])
if not group_found:
raise coordination.GroupNotCreated(group_id)
return members
return EtcdFutureResult(self._executor.submit(_get_members))
def get_member_capabilities(self, group_id, member_id):
encoded_member = self._encode_group_member_id(
group_id, member_id)
@_translate_failures
def _get_member_capabilities():
capabilities, metadata = self.client.get(encoded_member)
if capabilities is None:
raise coordination.MemberNotJoined(group_id, member_id)
return utils.loads(capabilities)
return EtcdFutureResult(
self._executor.submit(_get_member_capabilities))
def update_capabilities(self, group_id, capabilities):
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
@_translate_failures
def _update_capabilities():
cap, metadata = self.client.get(encoded_member)
if cap is None:
raise coordination.MemberNotJoined(group_id, self._member_id)
self.client.put(encoded_member, utils.dumps(capabilities),
lease=self._membership_lease)
return EtcdFutureResult(
self._executor.submit(_update_capabilities))
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
EtcdFutureResult = functools.partial(coordination.CoordinatorResult,
failure_translator=_failure_translator)

View File

@ -55,8 +55,6 @@ class TestWithCoordinator(testcase.TestCase, metaclass=SkipNotImplementedMeta):
super(TestWithCoordinator, self).setUp()
if self.url is None:
raise RuntimeError("No URL set for this driver")
if os.getenv("TOOZ_TEST_ETCD3"):
self.url = self.url.replace("etcd://", "etcd3://")
if os.getenv("TOOZ_TEST_ETCD3GW"):
# TODO(jan.gutter): When pifpaf supports etcd 3.4 we should use the
# defaults

View File

@ -1,63 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from testtools import testcase
from unittest import mock
import tooz.coordination
import tooz.drivers.etcd3 as etcd3_driver
import tooz.tests
@ddt.ddt
class TestEtcd3(testcase.TestCase):
FAKE_MEMBER_ID = tooz.tests.get_random_uuid()
@ddt.data({'coord_url': 'etcd3://',
'host': etcd3_driver.Etcd3Driver.DEFAULT_HOST,
'port': etcd3_driver.Etcd3Driver.DEFAULT_PORT,
'ca_cert': None,
'cert_key': None,
'cert_cert': None,
'timeout': etcd3_driver.Etcd3Driver.DEFAULT_TIMEOUT},
{'coord_url': ('etcd3://my_host:666?ca_cert=/my/ca_cert&'
'cert_key=/my/cert_key&cert_cert=/my/cert_cert&'
'timeout=42'),
'host': 'my_host',
'port': 666,
'ca_cert': '/my/ca_cert',
'cert_key': '/my/cert_key',
'cert_cert': '/my/cert_cert',
'timeout': 42})
@ddt.unpack
@mock.patch('etcd3.client')
def test_etcd3_client_init(self,
mock_etcd3_client,
coord_url,
host,
port,
ca_cert,
cert_key,
cert_cert,
timeout):
tooz.coordination.get_coordinator(coord_url, self.FAKE_MEMBER_ID)
mock_etcd3_client.assert_called_with(host=host,
port=port,
ca_cert=ca_cert,
cert_key=cert_key,
cert_cert=cert_cert,
timeout=timeout)

View File

@ -818,22 +818,10 @@ class TestAPI(tests.TestWithCoordinator):
self.assertTrue(f.result())
def test_get_lock_concurrency_locking_two_lock_process(self):
# NOTE(jd) Using gRPC and forking is not supported so this test might
# very likely hang forever or crash. See
# https://github.com/grpc/grpc/issues/10140#issuecomment-297548714 for
# more info.
if self.url.startswith("etcd3://"):
self.skipTest("Unable to use etcd3 with fork()")
self._do_test_get_lock_concurrency_locking_two_lock(
futures.ProcessPoolExecutor, False)
def test_get_lock_serial_locking_two_lock_process(self):
# NOTE(jd) Using gRPC and forking is not supported so this test might
# very likely hang forever or crash. See
# https://github.com/grpc/grpc/issues/10140#issuecomment-297548714 for
# more info.
if self.url.startswith("etcd3://"):
self.skipTest("Unable to use etcd3 with fork()")
self._do_test_get_lock_serial_locking_two_lock(
futures.ProcessPoolExecutor, False)

View File

@ -1,13 +1,13 @@
[tox]
minversion = 3.1.0
envlist = py3,py{36,38}-{zookeeper,redis,sentinel,memcached,postgresql,mysql,consul,etcd,etcd3,etcd3gw},pep8
envlist = py3,py{36,38}-{zookeeper,redis,sentinel,memcached,postgresql,mysql,consul,etcd,etcd3gw},pep8
ignore_basepython_conflict = True
[testenv]
basepython = python3
# We need to install a bit more than just `test-requirements' because those drivers have
# custom tests that we always run
deps = .[zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw]
deps = .[zake,ipc,memcached,mysql,etcd,etcd3gw]
zookeeper: .[zookeeper]
redis: .[redis]
sentinel: .[redis]
@ -15,7 +15,6 @@ deps = .[zake,ipc,memcached,mysql,etcd,etcd3,etcd3gw]
postgresql: .[postgresql]
mysql: .[mysql]
etcd: .[etcd]
etcd3: .[etcd3]
etcd3gw: .[etcd3gw]
consul: .[consul]
-r{toxinidir}/test-requirements.txt
@ -28,8 +27,6 @@ setenv =
mysql: TOOZ_TEST_DRIVERS = mysql
postgresql: TOOZ_TEST_DRIVERS = postgresql
etcd: TOOZ_TEST_DRIVERS = etcd,etcd --cluster
etcd3: TOOZ_TEST_DRIVERS = etcd
etcd3: TOOZ_TEST_ETCD3 = 1
etcd3gw: TOOZ_TEST_DRIVERS = etcd
etcd3gw: TOOZ_TEST_ETCD3GW = 1
consul: TOOZ_TEST_DRIVERS = consul