Switch to URL for loading backends

This allow to pass options in a single string, which is going to be
easier for managing options.

Change-Id: I32409c09153b8abaf2b36c31f0bbf658a9d653bc
This commit is contained in:
Julien Danjou 2014-08-04 15:51:14 +02:00
parent 30588b877a
commit d38fe0301d
11 changed files with 80 additions and 108 deletions

View File

@ -1,5 +1,5 @@
from tooz import coordination
coordinator = coordination.get_coordinator('zookeeper', b'host-1')
coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
coordinator.stop()

View File

@ -2,7 +2,7 @@ import time
from tooz import coordination
coordinator = coordination.get_coordinator('memcached', b'host-1')
coordinator = coordination.get_coordinator('memcached://localhost', b'host-1')
coordinator.start()
while True:

View File

@ -1,6 +1,6 @@
from tooz import coordination
coordinator = coordination.get_coordinator('zookeeper', b'host-1')
coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group

View File

@ -1,6 +1,6 @@
from tooz import coordination
coordinator = coordination.get_coordinator('zookeeper', b'host-1')
coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group

View File

@ -1,6 +1,6 @@
from tooz import coordination
coordinator = coordination.get_coordinator('zookeeper', b'host-1')
coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group

View File

@ -1,5 +1,5 @@
pbr>=0.6,!=0.7,<1.0
babel
Babel>=1.3
stevedore>=0.14
six>=1.7.0
iso8601

View File

@ -20,6 +20,8 @@ import collections
import six
from stevedore import driver
from tooz.openstack.common import network_utils
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"
@ -166,14 +168,11 @@ class CoordinationDriver(object):
"""
raise NotImplementedError
def start(self, timeout=10):
def start(self):
"""Start the service engine.
If needed, the establishment of a connection to the servers
is initiated.
:param timeout: Time in seconds to wait for connection to succeed.
:type timeout: int
"""
@staticmethod
@ -314,23 +313,21 @@ class CoordAsyncResult(object):
"""Returns True if the task is done, False otherwise."""
# TODO(yassine)
# Replace kwargs by something more simple.
def get_coordinator(backend, member_id, **kwargs):
def get_coordinator(backend_url, member_id):
"""Initialize and load the backend.
:param backend: the current tooz provided backends are 'zookeeper'
:param backend_url: the backend URL to use
:type backend: str
:param member_id: the id of the member
:type member_id: str
:param kwargs: additional backend specific options
:type kwargs: dict
"""
return driver.DriverManager(namespace=TOOZ_BACKENDS_NAMESPACE,
name=backend,
invoke_on_load=True,
invoke_args=(member_id,),
invoke_kwds=kwargs).driver
parsed_url = network_utils.urlsplit(backend_url)
parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query)
return driver.DriverManager(
namespace=TOOZ_BACKENDS_NAMESPACE,
name=parsed_url.scheme,
invoke_on_load=True,
invoke_args=(member_id, parsed_url, parsed_qs)).driver
class ToozError(Exception):

View File

@ -44,7 +44,7 @@ class IPCLock(locking.Lock):
class IPCDriver(coordination.CoordinationDriver):
def __init__(self, member_id, lock_timeout=30):
def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver.
:param lock_timeout: how many seconds to wait when trying to acquire
@ -55,7 +55,7 @@ class IPCDriver(coordination.CoordinationDriver):
lock_timeout
"""
super(IPCDriver, self).__init__()
self.lock_timeout = lock_timeout
self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
def get_lock(self, name):
return IPCLock(name, self.lock_timeout)

View File

@ -79,14 +79,20 @@ class MemcachedDriver(coordination.CoordinationDriver):
_MEMBER_PREFIX = b'_TOOZ_MEMBER_'
_GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'
def __init__(self, member_id, membership_timeout=30, lock_timeout=30,
leader_timeout=30):
def __init__(self, member_id, parsed_url, options):
super(MemcachedDriver, self).__init__()
self._member_id = member_id
self._groups = set()
self.membership_timeout = membership_timeout
self.lock_timeout = lock_timeout
self.leader_timeout = leader_timeout
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
default_timeout = options.get('timeout', ['30'])
self.timeout = int(default_timeout[-1])
self.membership_timeout = int(options.get(
'membership_timeout', default_timeout)[-1])
self.lock_timeout = int(options.get(
'lock_timeout', default_timeout)[-1])
self.leader_timeout = int(options.get(
'leader_timeout', default_timeout)[-1])
@staticmethod
def _msgpack_serializer(key, value):
@ -102,14 +108,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
return msgpack.loads(value)
raise Exception("Unknown serialization format")
def start(self, host=("127.0.0.1", 11211), timeout=5):
def start(self):
try:
self.client = pymemcache.client.Client(
host,
self.host,
serializer=self._msgpack_serializer,
deserializer=self._msgpack_deserializer,
timeout=timeout,
connect_timeout=timeout)
timeout=self.timeout,
connect_timeout=self.timeout)
except Exception as e:
raise coordination.ToozConnectionError(e)
self._group_members = collections.defaultdict(set)

View File

@ -16,12 +16,14 @@
import collections
import copy
import threading
from kazoo import client
from kazoo import exceptions
from kazoo.protocol import paths
import six
from zake import fake_client
import zake.fake_client
import zake.fake_storage
from tooz import coordination
from tooz import locking
@ -43,9 +45,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
_TOOZ_NAMESPACE = b"tooz"
def start(self, timeout=10):
def __init__(self, member_id, parsed_url, options):
super(BaseZooKeeperDriver, self).__init__()
self._member_id = member_id
self.timeout = int(options.get('timeout', ['10'])[-1])
def start(self):
try:
self._coord.start(timeout=timeout)
self._coord.start(timeout=self.timeout)
except self._coord.handler.timeout_exception as e:
raise coordination.ToozConnectionError("operation error: %s" % (e))
@ -201,20 +208,10 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
class KazooDriver(BaseZooKeeperDriver):
"""The driver using the Kazoo client against real ZooKeeper servers."""
def __init__(self, member_id, hosts="127.0.0.1:2181", handler=None,
**kwargs):
""":param hosts: the list of zookeeper servers in the
form "ip:port2, ip2:port2".
:param handler: a kazoo async handler to use if provided, if not
provided the default that kazoo uses internally will be used instead.
"""
if not all((hosts, member_id)):
raise KeyError("hosts=%r, member_id=%r" % hosts, member_id)
def __init__(self, member_id, parsed_url, options):
super(KazooDriver, self).__init__(member_id, parsed_url, options)
self._coord = client.KazooClient(hosts=parsed_url.netloc)
self._member_id = member_id
self._coord = client.KazooClient(hosts=hosts, handler=handler)
super(KazooDriver, self).__init__()
def _watch_group(self, group_id):
get_members_req = self.get_members(group_id)
@ -361,14 +358,11 @@ class ZakeDriver(BaseZooKeeperDriver):
without the need of real ZooKeeper servers.
"""
def __init__(self, member_id, storage=None, **kwargs):
""":param storage: a fake storage object."""
fake_storage = zake.fake_storage.FakeStorage(threading.RLock())
if not all((storage, member_id)):
raise KeyError("storage=%r, member_id=%r" % storage, member_id)
self._member_id = member_id
self._coord = fake_client.FakeClient(storage=storage)
super(ZakeDriver, self).__init__()
def __init__(self, member_id, parsed_url, options):
super(ZakeDriver, self).__init__(member_id, parsed_url, options)
self._coord = zake.fake_client.FakeClient(storage=self.fake_storage)
@staticmethod
def watch_join_group(group_id, callback):

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 eNovance Inc. All Rights Reserved.
# Copyright © 2013-2014 eNovance Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,50 +13,34 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
import uuid
import testscenarios
from testtools import testcase
from zake import fake_storage
import tooz.coordination
from tooz import tests
# Real ZooKeeper server scenario
zookeeper_tests = ('zookeeper_tests', {'backend': 'kazoo',
'kwargs': {'hosts': '127.0.0.1:2181'}})
# Fake Kazoo client scenario
fake_storage = fake_storage.FakeStorage(threading.RLock())
fake_zookeeper_tests = ('fake_zookeeper_tests', {'backend': 'zake',
'kwargs': {'storage':
fake_storage}})
class TestAPI(testscenarios.TestWithScenarios,
tests.TestCaseSkipNotImplemented):
scenarios = [
zookeeper_tests,
fake_zookeeper_tests,
('memcached', {'backend': 'memcached',
'kwargs': {'membership_timeout': 5}}),
('ipc', {'backend': 'ipc',
'kwargs': {'lock_timeout': 2}}),
('zookeeper', {'url': 'kazoo://127.0.0.1:2181?timeout=5'}),
('zake', {'url': 'zake://?timeout=5'}),
('memcached', {'url': 'memcached://?timeout=5'}),
('ipc', {'url': 'ipc://'}),
]
def setUp(self):
super(TestAPI, self).setUp()
self.group_id = self._get_random_uuid()
self.member_id = self._get_random_uuid()
self._coord = tooz.coordination.get_coordinator(self.backend,
self.member_id,
**self.kwargs)
self._coord = tooz.coordination.get_coordinator(self.url,
self.member_id)
try:
self._coord.start(timeout=5)
self._coord.start()
except tooz.coordination.ToozConnectionError as e:
raise testcase.TestSkipped(str(e))
@ -98,9 +82,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_join_group_with_member_id_already_exists(self):
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
client = tooz.coordination.get_coordinator(self.backend,
self.member_id,
**self.kwargs)
client = tooz.coordination.get_coordinator(self.url,
self.member_id)
client.start()
join_group = client.join_group(self.group_id)
self.assertRaises(tooz.coordination.MemberAlreadyExist,
@ -144,9 +127,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_get_members(self):
group_id_test2 = self._get_random_uuid()
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
self._coord.create_group(group_id_test2).get()
@ -213,12 +195,11 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.heartbeat()
def test_disconnect_leave_group(self):
if self.backend == 'zake':
if self.url.startswith('zake://'):
self.skipTest("Zake has a bug that prevent this test from working")
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
@ -232,12 +213,11 @@ class TestAPI(testscenarios.TestWithScenarios,
self.assertTrue(member_id_test2 not in members_ids)
def test_timeout(self):
if self.backend != 'memcached':
if not self.url.startswith('memcached://'):
self.skipTest("This test only works with memcached for now")
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
@ -258,9 +238,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_watch_group_join(self):
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
@ -293,9 +272,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_watch_leave_group(self):
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
@ -367,9 +345,8 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
@ -421,9 +398,8 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id_test2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id_test2)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
@ -468,9 +444,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_get_lock_multiple_coords(self):
member_id2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.backend,
member_id2,
**self.kwargs)
client2 = tooz.coordination.get_coordinator(self.url,
member_id2)
client2.start()
lock_name = self._get_random_uuid()