diff --git a/compute_hyperv/nova/conf.py b/compute_hyperv/nova/conf.py index a3d64228..5322550b 100644 --- a/compute_hyperv/nova/conf.py +++ b/compute_hyperv/nova/conf.py @@ -63,9 +63,17 @@ hyperv_opts = [ "requiring subsequent retries."), ] +coordination_opts = [ + cfg.StrOpt('backend_url', + default='file:///C:/OpenStack/Lock', + help='The backend URL to use for distributed coordination.'), +] + CONF = nova.conf.CONF +CONF.register_opts(coordination_opts, 'coordination') CONF.register_opts(hyperv_opts, 'hyperv') def list_opts(): - return [('hyperv', hyperv_opts)] + return [('coordination', coordination_opts), + ('hyperv', hyperv_opts)] diff --git a/compute_hyperv/nova/coordination.py b/compute_hyperv/nova/coordination.py new file mode 100644 index 00000000..9473ee67 --- /dev/null +++ b/compute_hyperv/nova/coordination.py @@ -0,0 +1,158 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Coordination and locking utilities.""" + +import inspect +import uuid + +import decorator +from nova import exception +from oslo_config import cfg +from oslo_log import log +from oslo_utils import timeutils +from tooz import coordination + +from compute_hyperv.i18n import _ + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + + +class Coordinator(object): + """Tooz coordination wrapper. + + Coordination member id is created from concatenated + `prefix` and `agent_id` parameters. + + :param str agent_id: Agent identifier + :param str prefix: Used to provide member identifier with a + meaningful prefix. + """ + + def __init__(self, agent_id=None, prefix=''): + self.coordinator = None + self.agent_id = agent_id or str(uuid.uuid4()) + self.started = False + self.prefix = prefix + + def start(self): + if self.started: + return + + # NOTE(bluex): Tooz expects member_id as a byte string. + member_id = (self.prefix + self.agent_id).encode('ascii') + self.coordinator = coordination.get_coordinator( + cfg.CONF.coordination.backend_url, member_id) + self.coordinator.start(start_heart=True) + self.started = True + + def stop(self): + """Disconnect from coordination backend and stop heartbeat.""" + if self.started: + self.coordinator.stop() + self.coordinator = None + self.started = False + + def get_lock(self, name): + """Return a Tooz backend lock. + + :param str name: The lock name that is used to identify it + across all nodes. + """ + # NOTE(bluex): Tooz expects lock name as a byte string. + lock_name = (self.prefix + name).encode('ascii') + if self.coordinator is not None: + return self.coordinator.get_lock(lock_name) + else: + raise exception.NovaException( + _('Could not create lock. Coordinator uninitialized.')) + + +COORDINATOR = Coordinator(prefix='compute-hyperv-') + + +def synchronized(lock_name, blocking=True, coordinator=COORDINATOR): + """Synchronization decorator. + + :param str lock_name: Lock name. + :param blocking: If True, blocks until the lock is acquired. + If False, raises exception when not acquired. Otherwise, + the value is used as a timeout value and if lock is not acquired + after this number of seconds exception is raised. + :param coordinator: Coordinator class to use when creating lock. + Defaults to the global coordinator. + :raises tooz.coordination.LockAcquireFailed: if lock is not acquired + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one process will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + Lock name can be formatted using Python format string syntax:: + + @synchronized('{f_name}-{vol.id}-{snap[name]}') + def foo(self, vol, snap): + ... + + Available field names are: decorated function parameters and + `f_name` as a decorated function name. + """ + + @decorator.decorator + def _synchronized(f, *a, **k): + call_args = inspect.getcallargs(f, *a, **k) + call_args['f_name'] = f.__name__ + lock = coordinator.get_lock(lock_name.format(**call_args)) + t1 = timeutils.now() + t2 = None + try: + with lock(blocking): + t2 = timeutils.now() + LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' + 'waited %(wait_secs)0.3fs', + {'name': lock.name, + 'function': f.__name__, + 'wait_secs': (t2 - t1)}) + return f(*a, **k) + finally: + t3 = timeutils.now() + if t2 is None: + held_secs = "N/A" + else: + held_secs = "%0.3fs" % (t3 - t2) + LOG.debug('Lock "%(name)s" released by "%(function)s" :: held ' + '%(held_secs)s', + {'name': lock.name, + 'function': f.__name__, + 'held_secs': held_secs}) + + return _synchronized diff --git a/compute_hyperv/tests/unit/test_coordination.py b/compute_hyperv/tests/unit/test_coordination.py new file mode 100644 index 00000000..6dffbd6a --- /dev/null +++ b/compute_hyperv/tests/unit/test_coordination.py @@ -0,0 +1,112 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import inspect + +import mock +import tooz.coordination +import tooz.locking + +from compute_hyperv.nova import coordination +from compute_hyperv.tests.unit import test_base + +if hasattr(inspect, 'getfullargspec'): + getargspec = inspect.getfullargspec +else: + getargspec = inspect.getargspec + + +class Locked(Exception): + pass + + +class MockToozLock(tooz.locking.Lock): + active_locks = set() + + def acquire(self, blocking=True): + if self.name not in self.active_locks: + self.active_locks.add(self.name) + return True + elif not blocking: + return False + else: + raise Locked + + def release(self): + self.active_locks.remove(self.name) + + +@mock.patch('tooz.coordination.get_coordinator') +class CoordinatorTestCase(test_base.HyperVBaseTestCase): + def test_coordinator_start(self, get_coordinator): + crd = get_coordinator.return_value + + agent = coordination.Coordinator() + agent.start() + self.assertTrue(get_coordinator.called) + self.assertTrue(crd.start.called) + + def test_coordinator_stop(self, get_coordinator): + crd = get_coordinator.return_value + + agent = coordination.Coordinator() + agent.start() + self.assertIsNotNone(agent.coordinator) + agent.stop() + self.assertTrue(crd.stop.called) + self.assertIsNone(agent.coordinator) + + def test_coordinator_lock(self, get_coordinator): + crd = get_coordinator.return_value + crd.get_lock.side_effect = lambda n: MockToozLock(n) + + agent1 = coordination.Coordinator() + agent1.start() + agent2 = coordination.Coordinator() + agent2.start() + + lock_name = 'lock' + expected_name = lock_name.encode('ascii') + + self.assertNotIn(expected_name, MockToozLock.active_locks) + with agent1.get_lock(lock_name): + self.assertIn(expected_name, MockToozLock.active_locks) + self.assertRaises(Locked, agent1.get_lock(lock_name).acquire) + self.assertRaises(Locked, agent2.get_lock(lock_name).acquire) + self.assertNotIn(expected_name, MockToozLock.active_locks) + + def test_coordinator_offline(self, get_coordinator): + crd = get_coordinator.return_value + crd.start.side_effect = tooz.coordination.ToozConnectionError('err') + + agent = coordination.Coordinator() + self.assertRaises(tooz.coordination.ToozError, agent.start) + self.assertFalse(agent.started) + + +@mock.patch.object(coordination.COORDINATOR, 'get_lock') +class CoordinationTestCase(test_base.HyperVBaseTestCase): + def test_synchronized(self, get_lock): + @coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}') + def func(foo, bar): + pass + + foo = mock.Mock() + foo.val = 7 + bar = mock.MagicMock() + bar.__getitem__.return_value = 8 + func(foo, bar) + get_lock.assert_called_with('lock-func-7-8') + self.assertEqual(['foo', 'bar'], getargspec(func)[0]) diff --git a/lower-constraints.txt b/lower-constraints.txt index cc84c0fe..45e8acaf 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -79,6 +79,7 @@ stestr==2.0.0 stevedore==1.28.0 testrepository==0.0.20 testtools==2.2.0 +tooz==1.58.0 traceback2==1.4.0 unittest2==1.1.0 urllib3==1.22 diff --git a/requirements.txt b/requirements.txt index e45286a0..0dd938a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,5 +14,7 @@ oslo.service!=1.28.1,>=1.24.0 # Apache-2.0 oslo.utils>=3.33.0 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0 +tooz>=1.58.0 # Apache-2.0 + eventlet!=0.18.3,!=0.20.1,>=0.18.2 # MIT python-barbicanclient>=4.5.2 # Apache-2.0