Allow specifying a kazoo async handler 'kind'

In situations where the built-in (and default) kazoo
threading async handler does not work (which sometimes
appears to happen under eventlet) allow for specifying
a different handler (ie the 'eventlet' one) that should
work better under those scenarios.

Closes-bug: #1512001
Change-Id: Iec5e39928b223a3ffca0b9b5b4d0fd61abaa0f2b
(cherry-picked from commit 9c5cb6fd0b)
This commit is contained in:
Joshua Harlow 2015-11-03 11:08:54 -08:00 committed by Rohit Jaiswal
parent 8a01682bb5
commit 4251e1ea72
3 changed files with 75 additions and 4 deletions

View File

@ -16,3 +16,6 @@ coverage>=3.6
psycopg2
PyMySQL>=0.6.2 # MIT License
sysv-ipc>=0.6.8 # BSD License
# Ensure that the eventlet executor continues to operate...
eventlet!=0.17.0,>=0.16.1

View File

@ -19,8 +19,12 @@ import copy
from kazoo import client
from kazoo import exceptions
from kazoo.handlers import eventlet as eventlet_handler
from kazoo.handlers import threading as threading_handler
from kazoo.protocol import paths
from oslo_utils import strutils
import six
from six.moves import filter as compat_filter
from tooz import coordination
from tooz.drivers import _retry
@ -73,6 +77,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options):
super(BaseZooKeeperDriver, self).__init__()
options = utils.collapse(options, exclude=['hosts'])
self._options = options
self._member_id = member_id
self.timeout = int(options.get('timeout', ['10'])[-1])
@ -291,15 +297,49 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
class KazooDriver(BaseZooKeeperDriver):
"""The driver using the Kazoo client against real ZooKeeper servers."""
HANDLERS = {
'eventlet': eventlet_handler.SequentialEventletHandler,
'threading': threading_handler.SequentialThreadingHandler,
}
"""
Restricted immutable dict of handler 'kinds' -> handler classes that
this driver can accept via 'handler' option key (the expected value for
this option is one of the keys in this dictionary).
"""
def __init__(self, member_id, parsed_url, options):
super(KazooDriver, self).__init__(member_id, parsed_url, options)
self._coord = self._make_client(parsed_url, options)
self._coord = self._make_client(parsed_url, self._options)
self._member_id = member_id
self._timeout_exception = self._coord.handler.timeout_exception
@classmethod
def _make_client(cls, parsed_url, options):
return client.KazooClient(hosts=parsed_url.netloc)
def _make_client(self, parsed_url, options):
# Creates a kazoo client,
# See: https://github.com/python-zk/kazoo/blob/1.3.1/kazoo/client.py
# for what options a client takes...
maybe_hosts = [parsed_url.netloc] + list(options.get('hosts', []))
hosts = list(compat_filter(None, maybe_hosts))
if not hosts:
hosts = ['localhost:2181']
randomize_hosts = options.get('randomize_hosts', True)
client_kwargs = {
'hosts': ",".join(hosts),
'timeout': float(options.get('timeout', self.timeout)),
'connection_retry': options.get('connection_retry'),
'command_retry': options.get('command_retry'),
'randomize_hosts': strutils.bool_from_string(randomize_hosts),
}
handler_kind = options.get('handler')
if handler_kind:
try:
handler_cls = self.HANDLERS[handler_kind]
except KeyError:
raise ValueError("Unknown handler '%s' requested"
" valid handlers are %s"
% (handler_kind,
sorted(self.HANDLERS.keys())))
client_kwargs['handler'] = handler_cls()
return client.KazooClient(**client_kwargs)
def _watch_group(self, group_id):
get_members_req = self.get_members(group_id)

View File

@ -58,3 +58,31 @@ def loads(blob, excp_cls=coordination.ToozError):
return msgpack.unpackb(blob, encoding='utf-8')
except (msgpack.UnpackException, ValueError) as e:
raise excp_cls(exception_message(e))
def collapse(config, exclude=None, item_selector=None):
"""Collapses config with keys and **list/tuple** values.
NOTE(harlowja): The last item/index from the list/tuple value is selected
be default as the new value (values that are not lists/tuples are left
alone). If the list/tuple value is empty (zero length), then no value
is set.
"""
if not isinstance(config, dict):
raise TypeError("Unexpected config type, dict expected")
if not config:
return {}
if exclude is None:
exclude = set()
if item_selector is None:
item_selector = lambda items: items[-1]
collapsed = {}
for (k, v) in six.iteritems(config):
if isinstance(v, (tuple, list)):
if k in exclude:
collapsed[k] = v
else:
if len(v):
collapsed[k] = item_selector(v)
else:
collapsed[k] = v
return collapsed