diff --git a/test-requirements.txt b/test-requirements.txt index 29b379fa..807014eb 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,3 +16,6 @@ coverage>=3.6 psycopg2 PyMySQL>=0.6.2 # MIT License sysv-ipc>=0.6.8 # BSD License + +# Ensure that the eventlet executor continues to operate... +eventlet!=0.17.0,>=0.16.1 \ No newline at end of file diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index cac244ed..2ce353be 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -19,8 +19,12 @@ import copy from kazoo import client from kazoo import exceptions +from kazoo.handlers import eventlet as eventlet_handler +from kazoo.handlers import threading as threading_handler from kazoo.protocol import paths +from oslo_utils import strutils import six +from six.moves import filter as compat_filter from tooz import coordination from tooz.drivers import _retry @@ -73,6 +77,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): def __init__(self, member_id, parsed_url, options): super(BaseZooKeeperDriver, self).__init__() + options = utils.collapse(options, exclude=['hosts']) + self._options = options self._member_id = member_id self.timeout = int(options.get('timeout', ['10'])[-1]) @@ -291,15 +297,49 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): class KazooDriver(BaseZooKeeperDriver): """The driver using the Kazoo client against real ZooKeeper servers.""" + HANDLERS = { + 'eventlet': eventlet_handler.SequentialEventletHandler, + 'threading': threading_handler.SequentialThreadingHandler, + } + """ + Restricted immutable dict of handler 'kinds' -> handler classes that + this driver can accept via 'handler' option key (the expected value for + this option is one of the keys in this dictionary). + """ + def __init__(self, member_id, parsed_url, options): super(KazooDriver, self).__init__(member_id, parsed_url, options) - self._coord = self._make_client(parsed_url, options) + self._coord = self._make_client(parsed_url, self._options) self._member_id = member_id self._timeout_exception = self._coord.handler.timeout_exception - @classmethod - def _make_client(cls, parsed_url, options): - return client.KazooClient(hosts=parsed_url.netloc) + def _make_client(self, parsed_url, options): + # Creates a kazoo client, + # See: https://github.com/python-zk/kazoo/blob/1.3.1/kazoo/client.py + # for what options a client takes... + maybe_hosts = [parsed_url.netloc] + list(options.get('hosts', [])) + hosts = list(compat_filter(None, maybe_hosts)) + if not hosts: + hosts = ['localhost:2181'] + randomize_hosts = options.get('randomize_hosts', True) + client_kwargs = { + 'hosts': ",".join(hosts), + 'timeout': float(options.get('timeout', self.timeout)), + 'connection_retry': options.get('connection_retry'), + 'command_retry': options.get('command_retry'), + 'randomize_hosts': strutils.bool_from_string(randomize_hosts), + } + handler_kind = options.get('handler') + if handler_kind: + try: + handler_cls = self.HANDLERS[handler_kind] + except KeyError: + raise ValueError("Unknown handler '%s' requested" + " valid handlers are %s" + % (handler_kind, + sorted(self.HANDLERS.keys()))) + client_kwargs['handler'] = handler_cls() + return client.KazooClient(**client_kwargs) def _watch_group(self, group_id): get_members_req = self.get_members(group_id) diff --git a/tooz/utils.py b/tooz/utils.py index ffb96416..9dbfabc8 100644 --- a/tooz/utils.py +++ b/tooz/utils.py @@ -58,3 +58,31 @@ def loads(blob, excp_cls=coordination.ToozError): return msgpack.unpackb(blob, encoding='utf-8') except (msgpack.UnpackException, ValueError) as e: raise excp_cls(exception_message(e)) + + +def collapse(config, exclude=None, item_selector=None): + """Collapses config with keys and **list/tuple** values. + NOTE(harlowja): The last item/index from the list/tuple value is selected + be default as the new value (values that are not lists/tuples are left + alone). If the list/tuple value is empty (zero length), then no value + is set. + """ + if not isinstance(config, dict): + raise TypeError("Unexpected config type, dict expected") + if not config: + return {} + if exclude is None: + exclude = set() + if item_selector is None: + item_selector = lambda items: items[-1] + collapsed = {} + for (k, v) in six.iteritems(config): + if isinstance(v, (tuple, list)): + if k in exclude: + collapsed[k] = v + else: + if len(v): + collapsed[k] = item_selector(v) + else: + collapsed[k] = v + return collapsed