From 08b8c6d33b34de4be96116ed78835645dca9493a Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Mon, 11 Sep 2023 14:30:47 +0000 Subject: [PATCH] Add the "cancellable" flag to the ``CallbacksManager`` events The ``CallbacksManager`` class considers, by default, that the events starting with "before_" and "precommit_" can raise an Exception (``CallbackFailure``) in case that the callbacks associated to these methods exit with an error. However there are some other events (those started with "after_") that won't generate an exception in case of error. The error will be logged but the process will continue. This new functionality adds the possibility of adding any kind of event and mark is as "cancellable". The ``CallbacksManager`` instance will check the errors returned by the callback methods and if any of them is marked as "cancellable", the manager will raise a ``CallbackFailure`` exception, terminating the process. In case of being a Neutron worker, for example, the ``oslo_service.service.Services`` class will restart the process again. Related-Bug: #2036607 Change-Id: Ie1e7be6d70cca957c1b1b6c15b402e8bc6523865 --- neutron_lib/callbacks/events.py | 6 ++ neutron_lib/callbacks/exceptions.py | 7 +- neutron_lib/callbacks/manager.py | 74 ++++++++++--------- neutron_lib/callbacks/registry.py | 6 +- .../tests/unit/callbacks/test_manager.py | 63 +++++++++++++--- .../tests/unit/callbacks/test_registry.py | 4 +- ...r-cancellable-events-966d76925db919a8.yaml | 8 ++ test-requirements.txt | 1 + 8 files changed, 120 insertions(+), 49 deletions(-) create mode 100644 releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml diff --git a/neutron_lib/callbacks/events.py b/neutron_lib/callbacks/events.py index 7f5d1ca0d..122798bb2 100644 --- a/neutron_lib/callbacks/events.py +++ b/neutron_lib/callbacks/events.py @@ -51,6 +51,12 @@ PRECOMMIT = 'precommit_' OVS_RESTARTED = 'ovs_restarted' +def is_cancellable_event(event): + """Return if an event is cancellable by definition""" + return (event.startswith(BEFORE) or + event.startswith(PRECOMMIT)) + + class EventPayload(object): """Base event payload object. diff --git a/neutron_lib/callbacks/exceptions.py b/neutron_lib/callbacks/exceptions.py index f2bea0d41..867f2e80e 100644 --- a/neutron_lib/callbacks/exceptions.py +++ b/neutron_lib/callbacks/exceptions.py @@ -50,9 +50,14 @@ class CallbackFailure(exceptions.MultipleExceptions): class NotificationError(object): - def __init__(self, callback_id, error): + def __init__(self, callback_id, error, cancellable=False): self.callback_id = callback_id self.error = error + self._cancellable = cancellable def __str__(self): return 'Callback %s failed with "%s"' % (self.callback_id, self.error) + + @property + def is_cancellable(self): + return self._cancellable diff --git a/neutron_lib/callbacks/manager.py b/neutron_lib/callbacks/manager.py index 5908f666b..ef40022a4 100644 --- a/neutron_lib/callbacks/manager.py +++ b/neutron_lib/callbacks/manager.py @@ -11,7 +11,6 @@ # under the License. import collections -import itertools from oslo_log import log as logging from oslo_utils import reflection @@ -22,6 +21,10 @@ from neutron_lib.callbacks import priority_group from neutron_lib.db import utils as db_utils LOG = logging.getLogger(__name__) +PriorityCallbacks = collections.namedtuple( + 'PriorityCallbacks', ['priority', 'pri_callbacks', 'cancellable']) +Callback = collections.namedtuple( + 'Callback', ['id', 'method', 'cancellable']) class CallbacksManager(object): @@ -31,7 +34,8 @@ class CallbacksManager(object): self.clear() def subscribe(self, callback, resource, event, - priority=priority_group.PRIORITY_DEFAULT): + priority=priority_group.PRIORITY_DEFAULT, + cancellable=False): """Subscribe callback for a resource event. The same callback may register for more than one event. @@ -41,22 +45,26 @@ class CallbacksManager(object): :param event: the event. It must be a valid event. :param priority: the priority. Callbacks are sorted by priority to be called. Smaller one is called earlier. + :param cancellable: if the callback is "cancellable", in case of + returning an exception, the callback manager will + raise a ``CallbackFailure`` exception. """ LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s " - "%(priority)d", + "%(priority)d, %(cancellable)s", {'callback': callback, 'resource': resource, 'event': event, - 'priority': priority}) + 'priority': priority, 'cancellable': cancellable}) callback_id = _get_id(callback) - callbacks_list = self._callbacks[resource].setdefault(event, []) - for pc_pair in callbacks_list: - if pc_pair[0] == priority: - pri_callbacks = pc_pair[1] + pri_callbacks_list = self._callbacks[resource].setdefault(event, []) + for pri_callbacks in pri_callbacks_list: + if pri_callbacks.priority == priority: + pri_callbacks = pri_callbacks.pri_callbacks break else: pri_callbacks = {} - callbacks_list.append((priority, pri_callbacks)) - callbacks_list.sort(key=lambda x: x[0]) + pri_callbacks_list.append( + PriorityCallbacks(priority, pri_callbacks, cancellable)) + pri_callbacks_list.sort(key=lambda x: x.priority) pri_callbacks[callback_id] = callback # We keep a copy of callbacks to speed the unsubscribe operation. @@ -64,13 +72,12 @@ class CallbacksManager(object): self._index[callback_id] = collections.defaultdict(set) self._index[callback_id][resource].add(event) - def _del_callback(self, callbacks_list, callback_id): - for pc_pair in callbacks_list: - pri_callbacks = pc_pair[1] - if callback_id in pri_callbacks: - del pri_callbacks[callback_id] - if not pri_callbacks: - callbacks_list.remove(pc_pair) + def _del_callback(self, pri_callbacks, callback_id): + for pri_callback in pri_callbacks: + if callback_id in pri_callback.pri_callbacks: + del pri_callback.pri_callbacks[callback_id] + if not pri_callback.pri_callbacks: + pri_callbacks.remove(pri_callback) break def unsubscribe(self, callback, resource, event): @@ -156,7 +163,8 @@ class CallbacksManager(object): raise exceptions.CallbackFailure(errors=errors) - if event.startswith(events.PRECOMMIT): + if (event.startswith(events.PRECOMMIT) or + any(error.is_cancellable for error in errors)): raise exceptions.CallbackFailure(errors=errors) def clear(self): @@ -167,32 +175,30 @@ class CallbacksManager(object): def _notify_loop(self, resource, event, trigger, payload): """The notification loop.""" errors = [] - # NOTE(yamahata): Since callback may unsubscribe it, - # convert iterator to list to avoid runtime error. - callbacks = list(itertools.chain( - *[pri_callbacks.items() for (priority, pri_callbacks) - in self._callbacks[resource].get(event, [])])) + callbacks = [] + for pri_callbacks in self._callbacks[resource].get(event, []): + for cb_id, cb_method in pri_callbacks.pri_callbacks.items(): + cb = Callback(cb_id, cb_method, pri_callbacks.cancellable) + callbacks.append(cb) resource_id = getattr(payload, "resource_id", None) LOG.debug("Publish callbacks %s for %s (%s), %s", - [c[0] for c in callbacks], resource, resource_id, event) + [c.id for c in callbacks], resource, resource_id, event) # TODO(armax): consider using a GreenPile - for callback_id, callback in callbacks: + for callback in callbacks: try: - callback(resource, event, trigger, payload=payload) + callback.method(resource, event, trigger, payload=payload) except Exception as e: - abortable_event = ( - event.startswith(events.BEFORE) or - event.startswith(events.PRECOMMIT) - ) - if not abortable_event: + if not (events.is_cancellable_event(event) or + callback.cancellable): LOG.exception("Error during notification for " "%(callback)s %(resource)s, %(event)s", - {'callback': callback_id, + {'callback': callback.id, 'resource': resource, 'event': event}) else: LOG.debug("Callback %(callback)s raised %(error)s", - {'callback': callback_id, 'error': e}) - errors.append(exceptions.NotificationError(callback_id, e)) + {'callback': callback.id, 'error': e}) + errors.append(exceptions.NotificationError( + callback.id, e, cancellable=callback.cancellable)) return errors def _find(self, callback): diff --git a/neutron_lib/callbacks/registry.py b/neutron_lib/callbacks/registry.py index 1863da6f6..a618a124c 100644 --- a/neutron_lib/callbacks/registry.py +++ b/neutron_lib/callbacks/registry.py @@ -34,8 +34,10 @@ def _get_callback_manager(): def subscribe(callback, resource, event, - priority=priority_group.PRIORITY_DEFAULT): - _get_callback_manager().subscribe(callback, resource, event, priority) + priority=priority_group.PRIORITY_DEFAULT, + cancellable=False): + _get_callback_manager().subscribe(callback, resource, event, priority, + cancellable) def unsubscribe(callback, resource, event): diff --git a/neutron_lib/tests/unit/callbacks/test_manager.py b/neutron_lib/tests/unit/callbacks/test_manager.py index 19229823b..3b70ca83b 100644 --- a/neutron_lib/tests/unit/callbacks/test_manager.py +++ b/neutron_lib/tests/unit/callbacks/test_manager.py @@ -14,6 +14,7 @@ from unittest import mock +import ddt from oslo_db import exception as db_exc from oslotest import base @@ -64,10 +65,15 @@ def callback_raise_retriable(*args, **kwargs): raise db_exc.DBDeadlock() +def callback_raise_not_retriable(*args, **kwargs): + raise Exception() + + def callback_3(resource, event, trigger, payload): callback_3.counter += 1 +@ddt.ddt class CallBacksManagerTestCase(base.BaseTestCase): def setUp(self): @@ -78,14 +84,19 @@ class CallBacksManagerTestCase(base.BaseTestCase): callback_2.counter = 0 callback_3.counter = 0 - def test_subscribe(self): + @ddt.data(True, False) + def test_subscribe(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.assertIsNotNone( self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) self.assertIn(callback_id_1, self.manager._index) self.assertEqual(self.__module__ + '.callback_1-%s' % hash(callback_1), callback_id_1) + self.assertEqual(cancellable, + self.manager._callbacks[resources.PORT] + [events.BEFORE_CREATE][0][2]) def test_subscribe_unknown(self): self.manager.subscribe( @@ -95,13 +106,19 @@ class CallBacksManagerTestCase(base.BaseTestCase): self.assertIn(callback_id_1, self.manager._index) def test_subscribe_is_idempotent(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + for cancellable in (True, False): + self.manager.subscribe( + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) + self.manager.subscribe( + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.assertEqual( 1, len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])) + # The first event registered had cancellable=True. + self.assertTrue(self.manager._callbacks[resources.PORT] + [events.BEFORE_CREATE][0][2]) callbacks = self.manager._index[callback_id_1][resources.PORT] self.assertEqual(1, len(callbacks)) @@ -129,9 +146,11 @@ class CallBacksManagerTestCase(base.BaseTestCase): payload=self.event_payload) self.assertNotIn(unsub, self.manager._index) - def test_unsubscribe(self): + @ddt.data(True, False) + def test_unsubscribe(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.manager.unsubscribe( callback_1, resources.PORT, events.BEFORE_CREATE) self.assertNotIn( @@ -155,9 +174,11 @@ class CallBacksManagerTestCase(base.BaseTestCase): self.manager.unsubscribe, callback_1, None, events.BEFORE_CREATE) - def test_unsubscribe_is_idempotent(self): + @ddt.data(True, False) + def test_unsubscribe_is_idempotent(self, cancellable): self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) + callback_1, resources.PORT, events.BEFORE_CREATE, + cancellable=cancellable) self.manager.unsubscribe( callback_1, resources.PORT, events.BEFORE_CREATE) self.manager.unsubscribe( @@ -256,6 +277,28 @@ class CallBacksManagerTestCase(base.BaseTestCase): resources.PORT, events.BEFORE_CREATE, self, payload=self.event_payload) + def test_publish_handle_not_retriable_exception(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.BEFORE_CREATE) + self.assertRaises(exceptions.CallbackFailure, self.manager.publish, + resources.PORT, events.BEFORE_CREATE, self, + payload=self.event_payload) + + def test_publish_handle_not_retriable_exception_no_cancellable_flag(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.AFTER_INIT) + # No exception is raised. + self.manager.publish(resources.PORT, events.AFTER_INIT, self, + payload=self.event_payload) + + def test_publish_handle_not_retriable_exception_cancellable_flag(self): + self.manager.subscribe( + callback_raise_not_retriable, resources.PORT, events.AFTER_INIT, + cancellable=True) + self.assertRaises(exceptions.CallbackFailure, self.manager.publish, + resources.PORT, events.AFTER_INIT, self, + payload=self.event_payload) + def test_publish_called_once_with_no_failures(self): with mock.patch.object(self.manager, '_notify_loop') as n: n.return_value = False diff --git a/neutron_lib/tests/unit/callbacks/test_registry.py b/neutron_lib/tests/unit/callbacks/test_registry.py index 8cce84499..5ede69754 100644 --- a/neutron_lib/tests/unit/callbacks/test_registry.py +++ b/neutron_lib/tests/unit/callbacks/test_registry.py @@ -132,13 +132,13 @@ class TestCallbackRegistryDispatching(base.BaseTestCase): registry.subscribe(my_callback, 'my-resource', 'my-event') self.callback_manager.subscribe.assert_called_with( my_callback, 'my-resource', 'my-event', - priority_group.PRIORITY_DEFAULT) + priority_group.PRIORITY_DEFAULT, False) def test_subscribe_explicit_priority(self): registry.subscribe(my_callback, 'my-resource', 'my-event', PRI_CALLBACK) self.callback_manager.subscribe.assert_called_with( - my_callback, 'my-resource', 'my-event', PRI_CALLBACK) + my_callback, 'my-resource', 'my-event', PRI_CALLBACK, False) def test_unsubscribe(self): registry.unsubscribe(my_callback, 'my-resource', 'my-event') diff --git a/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml b/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml new file mode 100644 index 000000000..91461f307 --- /dev/null +++ b/releasenotes/notes/callbacksmanager-cancellable-events-966d76925db919a8.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + ``CallbacksManager`` can now subscribe cancellable events. By default, + only ``before_`` and ``precommit_`` events, in case of error, can raise a + ``CallbackFailure`` exception. Now, if the event is subscribed with + the flag ``cancellable`` enabled, the ``publish`` method will raise this + exception if the callback fails and returns an error. diff --git a/test-requirements.txt b/test-requirements.txt index 036b8a912..652d30185 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,6 +6,7 @@ hacking>=3.0.1,<3.1.0 # Apache-2.0 bandit!=1.6.0,>=1.1.0 # Apache-2.0 coverage!=4.4,>=4.0 # Apache-2.0 +ddt>=1.0.1 # MIT fixtures>=3.0.0 # Apache-2.0/BSD flake8-import-order==0.12 # LGPLv3 pylint>=2.2.0 # GPLv2