diff --git a/manila/cmd/share.py b/manila/cmd/share.py index 8c0f2b5b19..49ca96b720 100644 --- a/manila/cmd/share.py +++ b/manila/cmd/share.py @@ -48,7 +48,8 @@ def main(): host = "%s@%s" % (CONF.host, backend) server = service.Service.create(host=host, service_name=backend, - binary='manila-share') + binary='manila-share', + coordination=True) launcher.launch_service(server) else: server = service.Service.create(binary='manila-share') diff --git a/manila/coordination.py b/manila/coordination.py new file mode 100644 index 0000000000..a1d4af2991 --- /dev/null +++ b/manila/coordination.py @@ -0,0 +1,294 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tooz Coordination and locking utilities.""" + +import inspect +import itertools +import random + +import decorator +import eventlet +from oslo_config import cfg +from oslo_log import log +from oslo_service import loopingcall +from oslo_utils import uuidutils +import six +from tooz import coordination +from tooz import locking + +from manila import exception +from manila.i18n import _, _LE, _LI, _LW + + +LOG = log.getLogger(__name__) + + +coordination_opts = [ + cfg.StrOpt('backend_url', + default='file://$state_path', + help='The back end URL to use for distributed coordination.'), + cfg.FloatOpt('heartbeat', + default=1.0, + help='Number of seconds between heartbeats for distributed ' + 'coordination.'), + cfg.FloatOpt('initial_reconnect_backoff', + default=0.1, + help='Initial number of seconds to wait after failed ' + 'reconnection.'), + cfg.FloatOpt('max_reconnect_backoff', + default=60.0, + help='Maximum number of seconds between sequential ' + 'reconnection retries.'), +] + +CONF = cfg.CONF +CONF.register_opts(coordination_opts, group='coordination') + + +class Coordinator(object): + """Tooz coordination wrapper. + + Coordination member id is created from concatenated `prefix` and + `agent_id` parameters. + + :param str agent_id: Agent identifier + :param str prefix: Used to provide member identifier with a + meaningful prefix. + """ + + def __init__(self, agent_id=None, prefix=''): + self.coordinator = None + self.agent_id = agent_id or uuidutils.generate_uuid() + self.started = False + self.prefix = prefix + self._heartbeat_thread = loopingcall.FixedIntervalLoopingCall( + self.heartbeat) + + def _is_active(self): + return self.started + + def start(self): + """Connect to coordination back end and start heartbeat.""" + if not self._is_active(): + try: + self._start() + self.started = True + # NOTE(gouthamr): Start heartbeat in separate thread to avoid + # being blocked by long co-routines. + if self.coordinator and self.coordinator.requires_beating: + LOG.debug("This tooz lock management back end supports " + "heart beats. Spawning a new thread to " + "send regular heart beats.") + self._heartbeat_thread.start( + cfg.CONF.coordination.heartbeat) + else: + LOG.debug("This tooz lock management back end does not " + "support heart beats.") + except coordination.ToozError: + LOG.exception(_LE('Error starting coordination back end.')) + raise + LOG.info(_LI('Coordination back end started successfully.')) + + def stop(self): + """Disconnect from coordination back end and stop heartbeat.""" + msg = _('Stopped Coordinator (Agent ID: %(agent)s, prefix: ' + '%(prefix)s)') + msg_args = {'agent': self.agent_id, 'prefix': self.prefix} + if self._is_active(): + + debug_msg = ('Stopping heartbeat thread for coordinator with ' + '(Agent ID: %(agent)s, prefix: %(prefix)s).') + LOG.debug(debug_msg, msg_args) + if self._heartbeat_thread is not None: + self._heartbeat_thread.stop() + self._heartbeat_thread = None + self.coordinator.stop() + self.coordinator = None + self.started = False + + LOG.info(msg, msg_args) + + def get_lock(self, name): + """Return a Tooz back end lock. + + :param str name: The lock name that is used to identify it + across all nodes. + """ + # NOTE(gouthamr): Tooz expects lock name as a byte string + lock_name = (self.prefix + name).encode('ascii') + if self._is_active(): + return self.coordinator.get_lock(lock_name) + else: + raise exception.LockCreationFailed(_('Coordinator uninitialized.')) + + def heartbeat(self): + """Coordinator heartbeat. + + Method that every couple of seconds (config: `coordination.heartbeat`) + sends heartbeat to prove that the member is not dead. + + If connection to coordination back end is broken it tries to + reconnect every couple of seconds + (config: `coordination.initial_reconnect_backoff` up to + `coordination.max_reconnect_backoff`) + + """ + if self._is_active(): + try: + self._heartbeat() + except coordination.ToozConnectionError: + self._reconnect() + + def _start(self): + # NOTE(gouthamr): Tooz expects member_id as a byte string. + member_id = (self.prefix + self.agent_id).encode('ascii') + self.coordinator = coordination.get_coordinator( + cfg.CONF.coordination.backend_url, member_id) + self.coordinator.start() + + def _heartbeat(self): + try: + self.coordinator.heartbeat() + except coordination.ToozConnectionError: + LOG.exception(_LE('Connection error while sending a heartbeat ' + 'to coordination back end.')) + raise + except coordination.ToozError: + LOG.exception(_LE('Error sending a heartbeat to coordination ' + 'back end.')) + + def _reconnect(self): + """Reconnect with jittered exponential back off.""" + LOG.info(_LI('Reconnecting to coordination back end.')) + cap = cfg.CONF.coordination.max_reconnect_backoff + backoff = base = cfg.CONF.coordination.initial_reconnect_backoff + for attempt in itertools.count(1): + try: + self._start() + break + except coordination.ToozError: + backoff = min(cap, random.uniform(base, backoff * 3)) + msg = _LW('Reconnect attempt %(attempt)s failed. ' + 'Next try in %(backoff).2fs.') + LOG.warning(msg, {'attempt': attempt, 'backoff': backoff}) + eventlet.sleep(backoff) + LOG.info(_LI('Reconnected to coordination back end.')) + + +LOCK_COORDINATOR = Coordinator(prefix='manila-') + + +class Lock(locking.Lock): + """Lock with dynamic name. + + :param str lock_name: Lock name. + :param dict lock_data: Data for lock name formatting. + :param coordinator: Coordinator object to use when creating lock. + Defaults to the global coordinator. + + Using it like so:: + + with Lock('mylock'): + ... + + ensures that only one process at a time will execute code in context. + Lock name can be formatted using Python format string syntax:: + + Lock('foo-{share.id}, {'share': ...,}') + + Available field names are keys of lock_data. + """ + def __init__(self, lock_name, lock_data=None, coordinator=None): + super(Lock, self).__init__(six.text_type(id(self))) + lock_data = lock_data or {} + self.coordinator = coordinator or LOCK_COORDINATOR + self.blocking = True + self.lock = self._prepare_lock(lock_name, lock_data) + + def _prepare_lock(self, lock_name, lock_data): + if not isinstance(lock_name, six.string_types): + raise ValueError(_('Not a valid string: %s') % lock_name) + return self.coordinator.get_lock(lock_name.format(**lock_data)) + + def acquire(self, blocking=None): + """Attempts to acquire lock. + + :param blocking: If True, blocks until the lock is acquired. If False, + returns right away. Otherwise, the value is used as a timeout + value and the call returns maximum after this number of seconds. + :return: returns true if acquired (false if not) + :rtype: bool + """ + blocking = self.blocking if blocking is None else blocking + return self.lock.acquire(blocking=blocking) + + def release(self): + """Attempts to release lock. + + The behavior of releasing a lock which was not acquired in the first + place is undefined. + """ + self.lock.release() + + +def synchronized(lock_name, blocking=True, coordinator=None): + """Synchronization decorator. + + :param str lock_name: Lock name. + :param blocking: If True, blocks until the lock is acquired. + If False, raises exception when not acquired. Otherwise, + the value is used as a timeout value and if lock is not acquired + after this number of seconds exception is raised. + :param coordinator: Coordinator object to use when creating lock. + Defaults to the global coordinator. + :raises tooz.coordination.LockAcquireFailed: if lock is not acquired + + Decorating a method like so:: + + @synchronized('mylock') + def foo(self, *args): + ... + + ensures that only one process will execute the foo method at a time. + + Different methods can share the same lock:: + + @synchronized('mylock') + def foo(self, *args): + ... + + @synchronized('mylock') + def bar(self, *args): + ... + + This way only one of either foo or bar can be executing at a time. + + Lock name can be formatted using Python format string syntax:: + + @synchronized('{f_name}-{shr.id}-{snap[name]}') + def foo(self, shr, snap): + ... + + Available field names are: decorated function parameters and + `f_name` as a decorated function name. + """ + @decorator.decorator + def _synchronized(f, *a, **k): + call_args = inspect.getcallargs(f, *a, **k) + call_args['f_name'] = f.__name__ + lock = Lock(lock_name, call_args, coordinator) + with lock(blocking): + LOG.debug('Lock "%(name)s" acquired by "%(function)s".', + {'name': lock_name, 'function': f.__name__}) + return f(*a, **k) + return _synchronized diff --git a/manila/exception.py b/manila/exception.py index 459b76a1f4..669e65851f 100644 --- a/manila/exception.py +++ b/manila/exception.py @@ -824,3 +824,12 @@ class HSPItemNotFoundException(ShareBackendException): class NexentaException(ShareBackendException): message = _("Exception due to Nexenta failure. %(reason)s") + + +# Tooz locking +class LockCreationFailed(ManilaException): + message = _('Unable to create lock. Coordination backend not started.') + + +class LockingFailed(ManilaException): + message = _('Lock acquisition failed.') diff --git a/manila/opts.py b/manila/opts.py index fe92c86094..834ed132c9 100644 --- a/manila/opts.py +++ b/manila/opts.py @@ -29,6 +29,7 @@ import manila.api.middleware.auth import manila.common.config import manila.compute import manila.compute.nova +import manila.coordination import manila.db.api import manila.db.base import manila.exception @@ -95,6 +96,7 @@ _global_opt_lists = [ manila.common.config.debug_opts, manila.common.config.global_opts, manila.compute._compute_opts, + manila.coordination.coordination_opts, manila.db.api.db_opts, [manila.db.base.db_driver_opt], manila.exception.exc_log_opts, diff --git a/manila/service.py b/manila/service.py index 074b690260..a61f3e54c5 100644 --- a/manila/service.py +++ b/manila/service.py @@ -29,6 +29,7 @@ from oslo_service import service from oslo_utils import importutils from manila import context +from manila import coordination from manila import db from manila import exception from manila.i18n import _, _LE, _LI, _LW @@ -75,7 +76,7 @@ class Service(service.Service): def __init__(self, host, binary, topic, manager, report_interval=None, periodic_interval=None, periodic_fuzzy_delay=None, - service_name=None, *args, **kwargs): + service_name=None, coordination=False, *args, **kwargs): super(Service, self).__init__() if not rpc.initialized(): rpc.init(CONF) @@ -92,6 +93,7 @@ class Service(service.Service): self.periodic_fuzzy_delay = periodic_fuzzy_delay self.saved_args, self.saved_kwargs = args, kwargs self.timers = [] + self.coordinator = coordination def start(self): version_string = version.version_string() @@ -99,6 +101,10 @@ class Service(service.Service): {'topic': self.topic, 'version_string': version_string}) self.model_disconnected = False ctxt = context.get_admin_context() + + if self.coordinator: + coordination.LOCK_COORDINATOR.start() + try: service_ref = db.service_get_by_args(ctxt, self.host, @@ -151,7 +157,8 @@ class Service(service.Service): @classmethod def create(cls, host=None, binary=None, topic=None, manager=None, report_interval=None, periodic_interval=None, - periodic_fuzzy_delay=None, service_name=None): + periodic_fuzzy_delay=None, service_name=None, + coordination=False): """Instantiates class and passes back application object. :param host: defaults to CONF.host @@ -182,7 +189,8 @@ class Service(service.Service): report_interval=report_interval, periodic_interval=periodic_interval, periodic_fuzzy_delay=periodic_fuzzy_delay, - service_name=service_name) + service_name=service_name, + coordination=coordination) return service_obj @@ -206,6 +214,13 @@ class Service(service.Service): x.stop() except Exception: pass + if self.coordinator: + try: + coordination.LOCK_COORDINATOR.stop() + except Exception: + LOG.exception(_LE("Unable to stop the Tooz Locking " + "Coordinator.")) + self.timers = [] super(Service, self).stop() diff --git a/manila/share/manager.py b/manila/share/manager.py index d6e09d68ea..c75f2b142e 100644 --- a/manila/share/manager.py +++ b/manila/share/manager.py @@ -35,6 +35,7 @@ import six from manila.common import constants from manila import context +from manila import coordination from manila.data import rpcapi as data_rpcapi from manila import exception from manila.i18n import _, _LE, _LI, _LW @@ -144,12 +145,12 @@ def locked_share_replica_operation(operation): def wrapped(*args, **kwargs): share_id = kwargs.get('share_id') - @utils.synchronized( - "locked_share_replica_operation_by_share_%s" % share_id, - external=True) - def locked_operation(*_args, **_kwargs): + @coordination.synchronized( + 'locked-share-replica-operation-for-share-%s' % share_id) + def locked_replica_operation(*_args, **_kwargs): return operation(*_args, **_kwargs) - return locked_operation(*args, **kwargs) + return locked_replica_operation(*args, **kwargs) + return wrapped diff --git a/manila/test.py b/manila/test.py index 1176b3e684..bf0054dbe1 100644 --- a/manila/test.py +++ b/manila/test.py @@ -35,6 +35,7 @@ from oslo_utils import uuidutils import oslotest.base as base_test from manila.api.openstack import api_version_request as api_version +from manila import coordination from manila.db import migration from manila.db.sqlalchemy import api as db_api from manila.db.sqlalchemy import models as db_models @@ -145,6 +146,12 @@ class TestCase(base_test.BaseTestCase): fake_notifier.stub_notifier(self) + # Locks must be cleaned up after tests + CONF.set_override('backend_url', 'file://' + lock_path, + group='coordination') + coordination.LOCK_COORDINATOR.start() + self.addCleanup(coordination.LOCK_COORDINATOR.stop) + def tearDown(self): """Runs after each test method to tear down test environment.""" super(TestCase, self).tearDown() diff --git a/manila/tests/cmd/test_share.py b/manila/tests/cmd/test_share.py index 2592bcc438..81c5fa984e 100644 --- a/manila/tests/cmd/test_share.py +++ b/manila/tests/cmd/test_share.py @@ -57,7 +57,9 @@ class ManilaCmdShareTestCase(test.TestCase): mock.call( host=fake_host + '@' + backend, service_name=backend, - binary='manila-share') for backend in backends + binary='manila-share', + coordination=True, + ) for backend in backends ]) self.launcher.launch_service.assert_has_calls([ mock.call(self.server) for backend in backends]) diff --git a/manila/tests/share/test_manager.py b/manila/tests/share/test_manager.py index 39fe887cc1..0c66bf1520 100644 --- a/manila/tests/share/test_manager.py +++ b/manila/tests/share/test_manager.py @@ -27,6 +27,7 @@ import six from manila.common import constants from manila import context +from manila import coordination from manila.data import rpcapi as data_rpc from manila import db from manila.db.sqlalchemy import models @@ -65,7 +66,7 @@ class LockedOperationsTestCase(test.TestCase): self.manager = self.FakeManager() self.fake_context = test_fakes.FakeRequestContext self.lock_call = self.mock_object( - utils, 'synchronized', mock.Mock(return_value=lambda f: f)) + coordination, 'synchronized', mock.Mock(return_value=lambda f: f)) @ddt.data({'id': 'FAKE_REPLICA_ID'}, 'FAKE_REPLICA_ID') @ddt.unpack @@ -94,7 +95,7 @@ class ShareManagerTestCase(test.TestCase): mock.patch.object( lockutils, 'lock', fake_utils.get_fake_lock_context()) self.synchronized_lock_decorator_call = self.mock_object( - utils, 'synchronized', mock.Mock(return_value=lambda f: f)) + coordination, 'synchronized', mock.Mock(return_value=lambda f: f)) def test_share_manager_instance(self): fake_service_name = "fake_service" diff --git a/manila/tests/test_coordination.py b/manila/tests/test_coordination.py new file mode 100644 index 0000000000..4036b8d4ee --- /dev/null +++ b/manila/tests/test_coordination.py @@ -0,0 +1,154 @@ +# Copyright 2015 Intel +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ddt +import mock +from oslo_service import loopingcall +from tooz import coordination as tooz_coordination +from tooz import locking as tooz_locking + +from manila import coordination +from manila import test + + +class Locked(Exception): + pass + + +class MockToozLock(tooz_locking.Lock): + active_locks = set() + + def acquire(self, blocking=True): + if self.name not in self.active_locks: + self.active_locks.add(self.name) + return True + elif not blocking: + return False + else: + raise Locked + + def release(self): + self.active_locks.remove(self.name) + + +@mock.patch('time.sleep', lambda _: None) +@mock.patch('eventlet.sleep', lambda _: None) +@mock.patch('random.uniform', lambda _a, _b: 0) +@ddt.ddt +class CoordinatorTestCase(test.TestCase): + + def setUp(self): + super(CoordinatorTestCase, self).setUp() + self.get_coordinator = self.mock_object(tooz_coordination, + 'get_coordinator') + self.heartbeat = self.mock_object(coordination.Coordinator, + 'heartbeat') + + @ddt.data(True, False) + def test_coordinator_start_with_heartbeat(self, requires_beating): + mock_start_heartbeat = mock.Mock( + loopingcall, 'FixedIntervalLoopingCall').return_value + self.mock_object(loopingcall, 'FixedIntervalLoopingCall', + mock.Mock(return_value=mock_start_heartbeat)) + crd = self.get_coordinator.return_value + crd.requires_beating = requires_beating + + agent = coordination.Coordinator() + agent.start() + + self.assertTrue(self.get_coordinator.called) + self.assertTrue(crd.start.called) + self.assertEqual(requires_beating, mock_start_heartbeat.start.called) + + def test_coordinator_stop(self): + crd = self.get_coordinator.return_value + + agent = coordination.Coordinator() + agent.start() + + self.assertIsNotNone(agent.coordinator) + agent.stop() + + self.assertTrue(crd.stop.called) + self.assertIsNone(agent.coordinator) + + def test_coordinator_lock(self): + crd = self.get_coordinator.return_value + crd.get_lock.side_effect = lambda n: MockToozLock(n) + + agent1 = coordination.Coordinator() + agent1.start() + agent2 = coordination.Coordinator() + agent2.start() + + lock_string = 'lock' + expected_lock = lock_string.encode('ascii') + + self.assertNotIn(expected_lock, MockToozLock.active_locks) + with agent1.get_lock(lock_string): + self.assertIn(expected_lock, MockToozLock.active_locks) + self.assertRaises(Locked, agent1.get_lock(lock_string).acquire) + self.assertRaises(Locked, agent2.get_lock(lock_string).acquire) + self.assertNotIn(expected_lock, MockToozLock.active_locks) + + def test_coordinator_offline(self): + crd = self.get_coordinator.return_value + crd.start.side_effect = tooz_coordination.ToozConnectionError('err') + + agent = coordination.Coordinator() + self.assertRaises(tooz_coordination.ToozError, agent.start) + self.assertFalse(agent.started) + self.assertFalse(self.heartbeat.called) + + def test_coordinator_reconnect(self): + start_online = iter([True] + [False] * 5 + [True]) + heartbeat_online = iter((False, True, True)) + + def raiser(cond): + if not cond: + raise tooz_coordination.ToozConnectionError('err') + + crd = self.get_coordinator.return_value + crd.start.side_effect = lambda *_: raiser(next(start_online)) + crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online)) + + agent = coordination.Coordinator() + agent.start() + + self.assertRaises(tooz_coordination.ToozConnectionError, + agent._heartbeat) + self.assertEqual(1, self.get_coordinator.call_count) + agent._reconnect() + self.assertEqual(7, self.get_coordinator.call_count) + agent._heartbeat() + + +@mock.patch.object(coordination.LOCK_COORDINATOR, 'get_lock') +class CoordinationTestCase(test.TestCase): + def test_lock(self, get_lock): + with coordination.Lock('lock'): + self.assertTrue(get_lock.called) + + def test_synchronized(self, get_lock): + @coordination.synchronized('lock-{f_name}-{foo.val}-{bar[val]}') + def func(foo, bar): + pass + + foo = mock.Mock() + foo.val = 7 + bar = mock.MagicMock() + bar.__getitem__.return_value = 8 + func(foo, bar) + get_lock.assert_called_with('lock-func-7-8') diff --git a/releasenotes/notes/introduce-tooz-library-5fed75b8caffcf42.yaml b/releasenotes/notes/introduce-tooz-library-5fed75b8caffcf42.yaml new file mode 100644 index 0000000000..2fc79842b3 --- /dev/null +++ b/releasenotes/notes/introduce-tooz-library-5fed75b8caffcf42.yaml @@ -0,0 +1,15 @@ +--- +features: + - Add support for the tooz library. + - Allow configuration of file/distributed locking for the share manager + service. +upgrade: + - New options are necessary in manila.conf to specify the coordination + back-end URL (for example, a Distributed Locking Manager (DLM) back-end + or a file based lock location). The configuration determines the tooz + driver invoked for the locking/coordination. +fixes: + - Share replication workflows are coordinated by the share-manager service + with the help of the tooz library instead of oslo_concurrency. This + allows for deployers to configure Distributed Locking Management if + multiple manila-share services are run across different nodes. diff --git a/requirements.txt b/requirements.txt index 7607778f36..3a2a09417c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,6 +38,7 @@ Routes!=2.0,!=2.3.0,>=1.12.3;python_version!='2.7' # MIT six>=1.9.0 # MIT SQLAlchemy<1.1.0,>=1.0.10 # MIT stevedore>=1.17.1 # Apache-2.0 +tooz>=1.47.0 # Apache-2.0 python-cinderclient!=1.7.0,!=1.7.1,>=1.6.0 # Apache-2.0 python-novaclient!=2.33.0,>=2.29.0 # Apache-2.0 WebOb>=1.6.0 # MIT