Merge "Add the "cancellable" flag to the ``CallbacksManager`` events"

This commit is contained in:
Zuul 2023-11-15 23:07:08 +00:00 committed by Gerrit Code Review
commit 64c6937216
8 changed files with 120 additions and 49 deletions

View File

@ -51,6 +51,12 @@ PRECOMMIT = 'precommit_'
OVS_RESTARTED = 'ovs_restarted'
def is_cancellable_event(event):
"""Return if an event is cancellable by definition"""
return (event.startswith(BEFORE) or
event.startswith(PRECOMMIT))
class EventPayload(object):
"""Base event payload object.

View File

@ -50,9 +50,14 @@ class CallbackFailure(exceptions.MultipleExceptions):
class NotificationError(object):
def __init__(self, callback_id, error):
def __init__(self, callback_id, error, cancellable=False):
self.callback_id = callback_id
self.error = error
self._cancellable = cancellable
def __str__(self):
return 'Callback %s failed with "%s"' % (self.callback_id, self.error)
@property
def is_cancellable(self):
return self._cancellable

View File

@ -11,7 +11,6 @@
# under the License.
import collections
import itertools
from oslo_log import log as logging
from oslo_utils import reflection
@ -22,6 +21,10 @@ from neutron_lib.callbacks import priority_group
from neutron_lib.db import utils as db_utils
LOG = logging.getLogger(__name__)
PriorityCallbacks = collections.namedtuple(
'PriorityCallbacks', ['priority', 'pri_callbacks', 'cancellable'])
Callback = collections.namedtuple(
'Callback', ['id', 'method', 'cancellable'])
class CallbacksManager(object):
@ -31,7 +34,8 @@ class CallbacksManager(object):
self.clear()
def subscribe(self, callback, resource, event,
priority=priority_group.PRIORITY_DEFAULT):
priority=priority_group.PRIORITY_DEFAULT,
cancellable=False):
"""Subscribe callback for a resource event.
The same callback may register for more than one event.
@ -41,22 +45,26 @@ class CallbacksManager(object):
:param event: the event. It must be a valid event.
:param priority: the priority. Callbacks are sorted by priority
to be called. Smaller one is called earlier.
:param cancellable: if the callback is "cancellable", in case of
returning an exception, the callback manager will
raise a ``CallbackFailure`` exception.
"""
LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s "
"%(priority)d",
"%(priority)d, %(cancellable)s",
{'callback': callback, 'resource': resource, 'event': event,
'priority': priority})
'priority': priority, 'cancellable': cancellable})
callback_id = _get_id(callback)
callbacks_list = self._callbacks[resource].setdefault(event, [])
for pc_pair in callbacks_list:
if pc_pair[0] == priority:
pri_callbacks = pc_pair[1]
pri_callbacks_list = self._callbacks[resource].setdefault(event, [])
for pri_callbacks in pri_callbacks_list:
if pri_callbacks.priority == priority:
pri_callbacks = pri_callbacks.pri_callbacks
break
else:
pri_callbacks = {}
callbacks_list.append((priority, pri_callbacks))
callbacks_list.sort(key=lambda x: x[0])
pri_callbacks_list.append(
PriorityCallbacks(priority, pri_callbacks, cancellable))
pri_callbacks_list.sort(key=lambda x: x.priority)
pri_callbacks[callback_id] = callback
# We keep a copy of callbacks to speed the unsubscribe operation.
@ -64,13 +72,12 @@ class CallbacksManager(object):
self._index[callback_id] = collections.defaultdict(set)
self._index[callback_id][resource].add(event)
def _del_callback(self, callbacks_list, callback_id):
for pc_pair in callbacks_list:
pri_callbacks = pc_pair[1]
if callback_id in pri_callbacks:
del pri_callbacks[callback_id]
if not pri_callbacks:
callbacks_list.remove(pc_pair)
def _del_callback(self, pri_callbacks, callback_id):
for pri_callback in pri_callbacks:
if callback_id in pri_callback.pri_callbacks:
del pri_callback.pri_callbacks[callback_id]
if not pri_callback.pri_callbacks:
pri_callbacks.remove(pri_callback)
break
def unsubscribe(self, callback, resource, event):
@ -156,7 +163,8 @@ class CallbacksManager(object):
raise exceptions.CallbackFailure(errors=errors)
if event.startswith(events.PRECOMMIT):
if (event.startswith(events.PRECOMMIT) or
any(error.is_cancellable for error in errors)):
raise exceptions.CallbackFailure(errors=errors)
def clear(self):
@ -167,32 +175,30 @@ class CallbacksManager(object):
def _notify_loop(self, resource, event, trigger, payload):
"""The notification loop."""
errors = []
# NOTE(yamahata): Since callback may unsubscribe it,
# convert iterator to list to avoid runtime error.
callbacks = list(itertools.chain(
*[pri_callbacks.items() for (priority, pri_callbacks)
in self._callbacks[resource].get(event, [])]))
callbacks = []
for pri_callbacks in self._callbacks[resource].get(event, []):
for cb_id, cb_method in pri_callbacks.pri_callbacks.items():
cb = Callback(cb_id, cb_method, pri_callbacks.cancellable)
callbacks.append(cb)
resource_id = getattr(payload, "resource_id", None)
LOG.debug("Publish callbacks %s for %s (%s), %s",
[c[0] for c in callbacks], resource, resource_id, event)
[c.id for c in callbacks], resource, resource_id, event)
# TODO(armax): consider using a GreenPile
for callback_id, callback in callbacks:
for callback in callbacks:
try:
callback(resource, event, trigger, payload=payload)
callback.method(resource, event, trigger, payload=payload)
except Exception as e:
abortable_event = (
event.startswith(events.BEFORE) or
event.startswith(events.PRECOMMIT)
)
if not abortable_event:
if not (events.is_cancellable_event(event) or
callback.cancellable):
LOG.exception("Error during notification for "
"%(callback)s %(resource)s, %(event)s",
{'callback': callback_id,
{'callback': callback.id,
'resource': resource, 'event': event})
else:
LOG.debug("Callback %(callback)s raised %(error)s",
{'callback': callback_id, 'error': e})
errors.append(exceptions.NotificationError(callback_id, e))
{'callback': callback.id, 'error': e})
errors.append(exceptions.NotificationError(
callback.id, e, cancellable=callback.cancellable))
return errors
def _find(self, callback):

View File

@ -34,8 +34,10 @@ def _get_callback_manager():
def subscribe(callback, resource, event,
priority=priority_group.PRIORITY_DEFAULT):
_get_callback_manager().subscribe(callback, resource, event, priority)
priority=priority_group.PRIORITY_DEFAULT,
cancellable=False):
_get_callback_manager().subscribe(callback, resource, event, priority,
cancellable)
def unsubscribe(callback, resource, event):

View File

@ -14,6 +14,7 @@
from unittest import mock
import ddt
from oslo_db import exception as db_exc
from oslotest import base
@ -64,10 +65,15 @@ def callback_raise_retriable(*args, **kwargs):
raise db_exc.DBDeadlock()
def callback_raise_not_retriable(*args, **kwargs):
raise Exception()
def callback_3(resource, event, trigger, payload):
callback_3.counter += 1
@ddt.ddt
class CallBacksManagerTestCase(base.BaseTestCase):
def setUp(self):
@ -78,14 +84,19 @@ class CallBacksManagerTestCase(base.BaseTestCase):
callback_2.counter = 0
callback_3.counter = 0
def test_subscribe(self):
@ddt.data(True, False)
def test_subscribe(self, cancellable):
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
callback_1, resources.PORT, events.BEFORE_CREATE,
cancellable=cancellable)
self.assertIsNotNone(
self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])
self.assertIn(callback_id_1, self.manager._index)
self.assertEqual(self.__module__ + '.callback_1-%s' %
hash(callback_1), callback_id_1)
self.assertEqual(cancellable,
self.manager._callbacks[resources.PORT]
[events.BEFORE_CREATE][0][2])
def test_subscribe_unknown(self):
self.manager.subscribe(
@ -95,13 +106,19 @@ class CallBacksManagerTestCase(base.BaseTestCase):
self.assertIn(callback_id_1, self.manager._index)
def test_subscribe_is_idempotent(self):
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
for cancellable in (True, False):
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE,
cancellable=cancellable)
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE,
cancellable=cancellable)
self.assertEqual(
1,
len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]))
# The first event registered had cancellable=True.
self.assertTrue(self.manager._callbacks[resources.PORT]
[events.BEFORE_CREATE][0][2])
callbacks = self.manager._index[callback_id_1][resources.PORT]
self.assertEqual(1, len(callbacks))
@ -129,9 +146,11 @@ class CallBacksManagerTestCase(base.BaseTestCase):
payload=self.event_payload)
self.assertNotIn(unsub, self.manager._index)
def test_unsubscribe(self):
@ddt.data(True, False)
def test_unsubscribe(self, cancellable):
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
callback_1, resources.PORT, events.BEFORE_CREATE,
cancellable=cancellable)
self.manager.unsubscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
self.assertNotIn(
@ -155,9 +174,11 @@ class CallBacksManagerTestCase(base.BaseTestCase):
self.manager.unsubscribe,
callback_1, None, events.BEFORE_CREATE)
def test_unsubscribe_is_idempotent(self):
@ddt.data(True, False)
def test_unsubscribe_is_idempotent(self, cancellable):
self.manager.subscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
callback_1, resources.PORT, events.BEFORE_CREATE,
cancellable=cancellable)
self.manager.unsubscribe(
callback_1, resources.PORT, events.BEFORE_CREATE)
self.manager.unsubscribe(
@ -256,6 +277,28 @@ class CallBacksManagerTestCase(base.BaseTestCase):
resources.PORT, events.BEFORE_CREATE, self,
payload=self.event_payload)
def test_publish_handle_not_retriable_exception(self):
self.manager.subscribe(
callback_raise_not_retriable, resources.PORT, events.BEFORE_CREATE)
self.assertRaises(exceptions.CallbackFailure, self.manager.publish,
resources.PORT, events.BEFORE_CREATE, self,
payload=self.event_payload)
def test_publish_handle_not_retriable_exception_no_cancellable_flag(self):
self.manager.subscribe(
callback_raise_not_retriable, resources.PORT, events.AFTER_INIT)
# No exception is raised.
self.manager.publish(resources.PORT, events.AFTER_INIT, self,
payload=self.event_payload)
def test_publish_handle_not_retriable_exception_cancellable_flag(self):
self.manager.subscribe(
callback_raise_not_retriable, resources.PORT, events.AFTER_INIT,
cancellable=True)
self.assertRaises(exceptions.CallbackFailure, self.manager.publish,
resources.PORT, events.AFTER_INIT, self,
payload=self.event_payload)
def test_publish_called_once_with_no_failures(self):
with mock.patch.object(self.manager, '_notify_loop') as n:
n.return_value = False

View File

@ -132,13 +132,13 @@ class TestCallbackRegistryDispatching(base.BaseTestCase):
registry.subscribe(my_callback, 'my-resource', 'my-event')
self.callback_manager.subscribe.assert_called_with(
my_callback, 'my-resource', 'my-event',
priority_group.PRIORITY_DEFAULT)
priority_group.PRIORITY_DEFAULT, False)
def test_subscribe_explicit_priority(self):
registry.subscribe(my_callback, 'my-resource', 'my-event',
PRI_CALLBACK)
self.callback_manager.subscribe.assert_called_with(
my_callback, 'my-resource', 'my-event', PRI_CALLBACK)
my_callback, 'my-resource', 'my-event', PRI_CALLBACK, False)
def test_unsubscribe(self):
registry.unsubscribe(my_callback, 'my-resource', 'my-event')

View File

@ -0,0 +1,8 @@
---
features:
- |
``CallbacksManager`` can now subscribe cancellable events. By default,
only ``before_`` and ``precommit_`` events, in case of error, can raise a
``CallbackFailure`` exception. Now, if the event is subscribed with
the flag ``cancellable`` enabled, the ``publish`` method will raise this
exception if the callback fails and returns an error.

View File

@ -6,6 +6,7 @@ hacking>=3.0.1,<3.1.0 # Apache-2.0
bandit!=1.6.0,>=1.1.0 # Apache-2.0
coverage!=4.4,>=4.0 # Apache-2.0
ddt>=1.0.1 # MIT
fixtures>=3.0.0 # Apache-2.0/BSD
flake8-import-order==0.12 # LGPLv3
pylint>=2.2.0 # GPLv2