From d79798389e97317a961e86b20ec47c3053f27060 Mon Sep 17 00:00:00 2001 From: Boden R Date: Mon, 24 Apr 2017 08:59:51 -0600 Subject: [PATCH] remove and shim callbacks The callback modules have been available in neutron-lib since commit [1] and are ready for consumption. As the callback registry is implemented with a singleton manager instance, sync complications can arise ensuring all consumers switch to lib's implementation at the same time. Therefore this consumption has been broken down: 1) Shim neutron's callbacks using lib's callback system and remove existing neutron internals related to callbacks (devref, UTs, etc.). 2) Switch all neutron's callback imports over to neutron-lib's. 3) Have all sub-projects using callbacks move their imports over to use neutron-lib's callbacks implementation. 4) Remove the callback shims in neutron-lib once sub-projects are moved over to lib's callbacks. 5) Follow-on patches moving our existing uses of callbacks to the new event payload model provided by neutron-lib.callback.events This patch implements #1 from above, shimming neutron's callbacks and removing devref + UTs. Rather than shimming using debtcollector, this patch leaves callback constants as-is, and simply references the lib class/function in its respective neutron callback module. This allows consumers to test callback types without changing code. For example, an except block block like that below continues to work even though the raised exception now lives in lib:: try: neutron_cb_registry.notify(...) except neutron_cb_exceptions.CallbackFailure: handle_exception() In addition this patch contains minor UT updates to support the shim approach. NeutronLibImpact [1] fea8bb64ba7ff52632c2bd3e3298eaedf623ee4f Change-Id: Ib6baee2aaeb044aaba42a97b35900d75dd43021f --- doc/source/devref/callbacks.rst | 441 ------------------ doc/source/devref/index.rst | 1 - doc/source/devref/rpc_callbacks.rst | 7 +- neutron/callbacks/events.py | 2 + neutron/callbacks/exceptions.py | 44 +- neutron/callbacks/manager.py | 164 +------ neutron/callbacks/registry.py | 109 +---- neutron/callbacks/resources.py | 2 + neutron/tests/base.py | 12 +- neutron/tests/unit/callbacks/__init__.py | 0 neutron/tests/unit/callbacks/test_manager.py | 258 ---------- neutron/tests/unit/callbacks/test_registry.py | 93 ---- 12 files changed, 31 insertions(+), 1102 deletions(-) delete mode 100644 doc/source/devref/callbacks.rst delete mode 100644 neutron/tests/unit/callbacks/__init__.py delete mode 100644 neutron/tests/unit/callbacks/test_manager.py delete mode 100644 neutron/tests/unit/callbacks/test_registry.py diff --git a/doc/source/devref/callbacks.rst b/doc/source/devref/callbacks.rst deleted file mode 100644 index bce93be422f..00000000000 --- a/doc/source/devref/callbacks.rst +++ /dev/null @@ -1,441 +0,0 @@ -.. - Licensed under the Apache License, Version 2.0 (the "License"); you may - not use this file except in compliance with the License. You may obtain - a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - License for the specific language governing permissions and limitations - under the License. - - - Convention for heading levels in Neutron devref: - ======= Heading 0 (reserved for the title in a document) - ------- Heading 1 - ~~~~~~~ Heading 2 - +++++++ Heading 3 - ''''''' Heading 4 - (Avoid deeper levels because they do not render well.) - - -Neutron Callback System -======================= - -In Neutron, core and service components may need to cooperate during the -execution of certain operations, or they may need to react upon the occurrence -of certain events. For instance, when a Neutron resource is associated to -multiple services, the components in charge of these services may need to play -an active role in determining what the right state of the resource needs to be. - -The cooperation may be achieved by making each object aware of each other, but -this leads to tight coupling, or alternatively it can be achieved by using a -callback-based system, where the same objects are allowed to cooperate in a -loose manner. - -This is particularly important since the spin off of the advanced services like -VPN, Firewall and Load Balancer, where each service's codebase lives independently -from the core and from one another. This means that the tight coupling is no longer -a practical solution for object cooperation. In addition to this, if more services -are developed independently, there is no viable integration between them and the -Neutron core. A callback system, and its registry, tries to address these issues. - -In object-oriented software systems, method invocation is also known as message -passing: an object passes a message to another object, and it may or may not expect -a message back. This point-to-point interaction can take place between the parties -directly involved in the communication, or it can happen via an intermediary. The -intermediary is then in charge of keeping track of who is interested in the messages -and in delivering the messages forth and back, when required. As mentioned earlier, -the use of an intermediary has the benefit of decoupling the parties involved -in the communications, as now they only need to know about the intermediary; the -other benefit is that the use of an intermediary opens up the possibility of -multiple party communication: more than one object can express interest in -receiving the same message, and the same message can be delivered to more than -one object. To this aim, the intermediary is the entity that exists throughout -the system lifecycle, as it needs to be able to track whose interest is associated -to what message. - -In a design for a system that enables callback-based communication, the following -aspects need to be taken into account: - -* how to become consumer of messages (i.e. how to be on the receiving end of the message); -* how to become producer of messages (i.e. how to be on the sending end of the message); -* how to consume/produce messages selectively; - -Translate and narrow this down to Neutron needs, and this means the design of a callback -system where messages are about lifecycle events (e.g. before creation, before -deletion, etc.) of Neutron resources (e.g. networks, routers, ports, etc.), where the -various parties can express interest in knowing when these events for a specific -resources take place. - -Rather than keeping the conversation abstract, let us delve into some examples, that would -help understand better some of the principles behind the provided mechanism. - - -Subscribing to events ---------------------- - -Imagine that you have entity A, B, and C that have some common business over router creation. -A wants to tell B and C that the router has been created and that they need to get on and -do whatever they are supposed to do. In a callback-less world this would work like so: - -:: - - # A is done creating the resource - # A gets hold of the references of B and C - # A calls B - # A calls C - B->my_random_method_for_knowing_about_router_created() - C->my_random_very_difficult_to_remember_method_about_router_created() - -If B and/or C change, things become sour. In a callback-based world, things become a lot -more uniform and straightforward: - -:: - - # B and C ask I to be notified when A is done creating the resource - - # ... - # A is done creating the resource - # A gets hold of the reference to the intermediary I - # A calls I - I->notify() - -Since B and C will have expressed interest in knowing about A's business, 'I' will -deliver the messages to B and C. If B and C changes, A and 'I' do not need to change. - -In practical terms this scenario would be translated in the code below: - -:: - - from neutron.callbacks import events - from neutron.callbacks import resources - from neutron.callbacks import registry - - - def callback1(resource, event, trigger, **kwargs): - print('Callback1 called by trigger: ', trigger) - print('kwargs: ', kwargs) - - def callback2(resource, event, trigger, **kwargs): - print('Callback2 called by trigger: ', trigger) - print('kwargs: ', kwargs) - - - # B and C express interest with I - registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE) - print('Subscribed') - - - # A notifies - def do_notify(): - kwargs = {'foo': 'bar'} - registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs) - - - print('Notifying...') - do_notify() - - -The output is: - -:: - - > Subscribed - > Notifying... - > Callback2 called by trigger: - > kwargs: {'foo': 'bar'} - > Callback1 called by trigger: - > kwargs: {'foo': 'bar'} - -Thanks to the intermediary existence throughout the life of the system, A, B, and C -are flexible to evolve their internals, dynamics, and lifecycles. - - -Subscribing and aborting events -------------------------------- - -Interestingly in Neutron, certain events may need to be forbidden from happening due to the -nature of the resources involved. To this aim, the callback-based mechanism has been designed -to support a use case where, when callbacks subscribe to specific events, the action that -results from it, may lead to the propagation of a message back to the sender, so that it itself -can be alerted and stop the execution of the activity that led to the message dispatch in the -first place. - -The typical example is where a resource, like a router, is used by one or more high-level -service(s), like a VPN or a Firewall, and actions like interface removal or router destruction -cannot not take place, because the resource is shared. - -To address this scenario, special events are introduced, 'BEFORE_*' events, to which callbacks -can subscribe and have the opportunity to 'abort', by raising an exception when notified. - -Since multiple callbacks may express an interest in the same event for a particular resource, -and since callbacks are executed independently from one another, this may lead to situations -where notifications that occurred before the exception must be aborted. To this aim, when an -exception occurs during the notification process, an abort_* event is propagated immediately -after. It is up to the callback developer to determine whether subscribing to an abort -notification is required in order to revert the actions performed during the initial execution -of the callback (when the BEFORE_* event was fired). Exceptions caused by callbacks registered -to abort events are ignored. The snippet below shows this in action: - -:: - - from neutron.callbacks import events - from neutron.callbacks import exceptions - from neutron.callbacks import resources - from neutron.callbacks import registry - - - def callback1(resource, event, trigger, **kwargs): - raise Exception('I am failing!') - - def callback2(resource, event, trigger, **kwargs): - print('Callback2 called by %s on event %s' % (trigger, event)) - - - registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(callback2, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(callback2, resources.ROUTER, events.ABORT_CREATE) - print('Subscribed') - - - def do_notify(): - kwargs = {'foo': 'bar'} - registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs) - - - print('Notifying...') - try: - do_notify() - except exceptions.CallbackFailure as e: - print('Error: ', e) - -The output is: - -:: - - > Subscribed - > Notifying... - > Callback2 called by on event before_create - > Callback2 called by on event abort_create - > Error: Callback __main__.callback1 failed with "I am failing!" - -In this case, upon the notification of the BEFORE_CREATE event, Callback1 triggers an exception -that can be used to stop the action from taking place in do_notify(). On the other end, Callback2 -will be executing twice, once for dealing with the BEFORE_CREATE event, and once to undo the -actions during the ABORT_CREATE event. It is worth noting that it is not mandatory to have -the same callback register to both BEFORE_* and the respective ABORT_* event; as a matter of -fact, it is best to make use of different callbacks to keep the two logic separate. - -As we can see from the last example, exception which is triggered in some callback will be -recorded, and it will not prevent the other remaining callbacks execution. Exception triggered in -callback of BEFORE_XXX will make notify process generate an ABORT_XXX event and call the related -callback, while exception from PRECOMMIT_XXX will not generate ABORT_XXX event. But both of them -will finally raise a unified CallbackFailure exception to the outside. For the exception triggered -from other events, like AFTER_XXX and ABORT_XXX there will no exception raised to the outside. - - -Unsubscribing to events ------------------------ - -There are a few options to unsubscribe registered callbacks: - -* clear(): it unsubscribes all subscribed callbacks: this can be useful especially when - winding down the system, and notifications shall no longer be triggered. -* unsubscribe(): it selectively unsubscribes a callback for a specific resource's event. - Say callback C has subscribed to event A for resource R, any notification of event A - for resource R will no longer be handed over to C, after the unsubscribe() invocation. -* unsubscribe_by_resource(): say that callback C has subscribed to event A, B, and C for - resource R, any notification of events related to resource R will no longer be handed - over to C, after the unsubscribe_by_resource() invocation. -* unsubscribe_all(): say that callback C has subscribed to events A, B for resource R1, - and events C, D for resource R2, any notification of events pertaining resources R1 and - R2 will no longer be handed over to C, after the unsubscribe_all() invocation. - -The snippet below shows these concepts in action: - -:: - - from neutron.callbacks import events - from neutron.callbacks import exceptions - from neutron.callbacks import resources - from neutron.callbacks import registry - - - def callback1(resource, event, trigger, **kwargs): - print('Callback1 called by %s on event %s for resource %s' % (trigger, event, resource)) - - - def callback2(resource, event, trigger, **kwargs): - print('Callback2 called by %s on event %s for resource %s' % (trigger, event, resource)) - - - registry.subscribe(callback1, resources.ROUTER, events.BEFORE_READ) - registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(callback1, resources.ROUTER, events.AFTER_DELETE) - registry.subscribe(callback1, resources.PORT, events.BEFORE_UPDATE) - registry.subscribe(callback2, resources.ROUTER_GATEWAY, events.BEFORE_UPDATE) - print('Subscribed') - - - def do_notify(): - print('Notifying...') - kwargs = {'foo': 'bar'} - registry.notify(resources.ROUTER, events.BEFORE_READ, do_notify, **kwargs) - registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs) - registry.notify(resources.ROUTER, events.AFTER_DELETE, do_notify, **kwargs) - registry.notify(resources.PORT, events.BEFORE_UPDATE, do_notify, **kwargs) - registry.notify(resources.ROUTER_GATEWAY, events.BEFORE_UPDATE, do_notify, **kwargs) - - - do_notify() - registry.unsubscribe(callback1, resources.ROUTER, events.BEFORE_READ) - do_notify() - registry.unsubscribe_by_resource(callback1, resources.PORT) - do_notify() - registry.unsubscribe_all(callback1) - do_notify() - registry.clear() - do_notify() - -The output is: - -:: - - Subscribed - Notifying... - Callback1 called by on event before_read for resource router - Callback1 called by on event before_create for resource router - Callback1 called by on event after_delete for resource router - Callback1 called by on event before_update for resource port - Callback2 called by on event before_update for resource router_gateway - Notifying... - Callback1 called by on event before_create for resource router - Callback1 called by on event after_delete for resource router - Callback1 called by on event before_update for resource port - Callback2 called by on event before_update for resource router_gateway - Notifying... - Callback1 called by on event before_create for resource router - Callback1 called by on event after_delete for resource router - Callback2 called by on event before_update for resource router_gateway - Notifying... - Callback2 called by on event before_update for resource router_gateway - Notifying... - - -FAQ ---- - -Are callbacks a mechanism for remote or local communication (intra vs inter-process)? - - Callbacks as described in this document are a local communication mechanism that - allows multiple entities in the same process space to communicate with one another. - For Neutron specific remote (IPC) mechanisms, you can see read more in - :doc:`RPC API ` or :doc:`Messaging callbacks `. - -Can I use the callbacks registry to subscribe and notify non-core resources and events? - - Short answer is yes. The callbacks module defines literals for what are considered core Neutron - resources and events. However, the ability to subscribe/notify is not limited to these as you - can use your own defined resources and/or events. Just make sure you use string literals, as - typos are common, and the registry does not provide any runtime validation. Therefore, make - sure you test your code! - -What is the relationship between Callbacks and Taskflow? - - There is no overlap between Callbacks and Taskflow or mutual exclusion; as matter of fact they - can be combined; You could have a callback that goes on and trigger a taskflow. It is a nice - way of separating implementation from abstraction, because you can keep the callback in place - and change Taskflow with something else. - -Is there any ordering guarantee during notifications? - - No, the ordering in which callbacks are notified is completely arbitrary by design: callbacks - should know nothing about each other, and ordering should not matter; a callback will always be - notified and its outcome should always be the same regardless as to in which order is it - notified. Priorities can be a future extension, if a use case arises that require enforced - ordering. - -How is the notifying object expected to interact with the subscribing objects? - - The ``notify`` method implements a one-way communication paradigm: the notifier sends a message - without expecting a response back (in other words it fires and forget). However, due to the nature - of Python, the payload can be mutated by the subscribing objects, and this can lead to unexpected - behavior of your code, if you assume that this is the intentional design. Bear in mind, that - passing-by-value using deepcopy was not chosen for efficiency reasons. Having said that, if you - intend for the notifier object to expect a response, then the notifier itself would need to act - as a subscriber. - -Is the registry thread-safe? - - Short answer is no: it is not safe to make mutations while callbacks are being called (more - details as to why can be found `here `_). - A mutation could happen if a 'subscribe'/'unsubscribe' operation interleaves with the execution - of the notify loop. Albeit there is a possibility that things may end up in a bad state, the - registry works correctly under the assumption that subscriptions happen at the very beginning - of the life of the process and that the unsubscriptions (if any) take place at the very end. - In this case, chances that things do go badly may be pretty slim. Making the registry - thread-safe will be considered as a future improvement. - -What kind of operation I can add into callback? - - For callback function of PRECOMMIT_XXX events, we can't use blocking functions or a function - that would take a long time, like communicating to SDN controller over network. - Callbacks for PRECOMMIT events are meant to execute DB operations in a transaction context. The - errors that occur will be taken care by the context manager. - -What kind of function can be a callback? - - Anything you fancy: lambdas, 'closures', class, object or module methods. For instance: - -:: - - from neutron.callbacks import events - from neutron.callbacks import resources - from neutron.callbacks import registry - - - def callback1(resource, event, trigger, **kwargs): - print('module callback') - - - class MyCallback(object): - - def callback2(self, resource, event, trigger, **kwargs): - print('object callback') - - @classmethod - def callback3(cls, resource, event, trigger, **kwargs): - print('class callback') - - - c = MyCallback() - registry.subscribe(callback1, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(c.callback2, resources.ROUTER, events.BEFORE_CREATE) - registry.subscribe(MyCallback.callback3, resources.ROUTER, events.BEFORE_CREATE) - - def do_notify(): - def nested_subscribe(resource, event, trigger, **kwargs): - print('nested callback') - - registry.subscribe(nested_subscribe, resources.ROUTER, events.BEFORE_CREATE) - - kwargs = {'foo': 'bar'} - registry.notify(resources.ROUTER, events.BEFORE_CREATE, do_notify, **kwargs) - - - print('Notifying...') - do_notify() - -And the output is going to be: - -:: - - Notifying... - module callback - object callback - class callback - nested callback diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index 02a135ddd8b..9a375d3b3fc 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -70,7 +70,6 @@ Neutron Internals ovs_vhostuser quality_of_service service_extensions - callbacks dns_order external_dns_integration upgrade diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst index f69cac8974a..231979170ec 100644 --- a/doc/source/devref/rpc_callbacks.rst +++ b/doc/source/devref/rpc_callbacks.rst @@ -26,9 +26,10 @@ Neutron Messaging Callback System ================================= -Neutron already has a :doc:`callback system ` for -in-process resource callbacks where publishers and subscribers are able -to publish and subscribe for resource events. +Neutron already has a `callback system +`_ +for in-process resource callbacks where publishers and subscribers are +able to publish and subscribe for resource events. This system is different, and is intended to be used for inter-process callbacks, via the messaging fanout mechanisms. diff --git a/neutron/callbacks/events.py b/neutron/callbacks/events.py index 42e06231702..4db83d85008 100644 --- a/neutron/callbacks/events.py +++ b/neutron/callbacks/events.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +# NOTE(boden): This module will be removed soon; use neutron-lib callbacks + # String literals representing events associated to data store operations BEFORE_CREATE = 'before_create' BEFORE_READ = 'before_read' diff --git a/neutron/callbacks/exceptions.py b/neutron/callbacks/exceptions.py index 3020def98a4..a73c82d55cc 100644 --- a/neutron/callbacks/exceptions.py +++ b/neutron/callbacks/exceptions.py @@ -10,44 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron_lib import exceptions +from neutron_lib.callbacks import exceptions -from neutron._i18n import _ +# NOTE(boden): This module will be removed soon; use neutron-lib callbacks - -class Invalid(exceptions.NeutronException): - message = _("The value '%(value)s' for %(element)s is not valid.") - - -class CallbackFailure(exceptions.MultipleExceptions): - - def __init__(self, errors): - self.errors = errors - - def __str__(self): - if isinstance(self.errors, list): - return ','.join(str(error) for error in self.errors) - else: - return str(self.errors) - - @property - def inner_exceptions(self): - if isinstance(self.errors, list): - return [self._unpack_if_notification_error(e) for e in self.errors] - return [self._unpack_if_notification_error(self.errors)] - - @staticmethod - def _unpack_if_notification_error(exc): - if isinstance(exc, NotificationError): - return exc.error - return exc - - -class NotificationError(object): - - def __init__(self, callback_id, error): - self.callback_id = callback_id - self.error = error - - def __str__(self): - return 'Callback %s failed with "%s"' % (self.callback_id, self.error) +Invalid = exceptions.Invalid +CallbackFailure = exceptions.CallbackFailure +NotificationError = exceptions.NotificationError diff --git a/neutron/callbacks/manager.py b/neutron/callbacks/manager.py index 864a1b7fa0a..bd1fed38d53 100644 --- a/neutron/callbacks/manager.py +++ b/neutron/callbacks/manager.py @@ -10,167 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. -import collections +from neutron_lib.callbacks import manager -from oslo_log import log as logging -from oslo_utils import reflection +# NOTE(boden): This module will be removed soon; use neutron-lib callbacks -from neutron._i18n import _LE -from neutron.callbacks import events -from neutron.callbacks import exceptions -from neutron.db import api as db_api - -LOG = logging.getLogger(__name__) +CallbacksManager = manager.CallbacksManager -class CallbacksManager(object): - """A callback system that allows objects to cooperate in a loose manner.""" - - def __init__(self): - self.clear() - - def subscribe(self, callback, resource, event): - """Subscribe callback for a resource event. - - The same callback may register for more than one event. - - :param callback: the callback. It must raise or return a boolean. - :param resource: the resource. It must be a valid resource. - :param event: the event. It must be a valid event. - """ - LOG.debug("Subscribe: %(callback)s %(resource)s %(event)s", - {'callback': callback, 'resource': resource, 'event': event}) - - callback_id = _get_id(callback) - try: - self._callbacks[resource][event][callback_id] = callback - except KeyError: - # Initialize the registry for unknown resources and/or events - # prior to enlisting the callback. - self._callbacks[resource][event] = {} - self._callbacks[resource][event][callback_id] = callback - # We keep a copy of callbacks to speed the unsubscribe operation. - if callback_id not in self._index: - self._index[callback_id] = collections.defaultdict(set) - self._index[callback_id][resource].add(event) - - def unsubscribe(self, callback, resource, event): - """Unsubscribe callback from the registry. - - :param callback: the callback. - :param resource: the resource. - :param event: the event. - """ - LOG.debug("Unsubscribe: %(callback)s %(resource)s %(event)s", - {'callback': callback, 'resource': resource, 'event': event}) - - callback_id = self._find(callback) - if not callback_id: - LOG.debug("Callback %s not found", callback_id) - return - if resource and event: - del self._callbacks[resource][event][callback_id] - self._index[callback_id][resource].discard(event) - if not self._index[callback_id][resource]: - del self._index[callback_id][resource] - if not self._index[callback_id]: - del self._index[callback_id] - else: - value = '%s,%s' % (resource, event) - raise exceptions.Invalid(element='resource,event', value=value) - - def unsubscribe_by_resource(self, callback, resource): - """Unsubscribe callback for any event associated to the resource. - - :param callback: the callback. - :param resource: the resource. - """ - callback_id = self._find(callback) - if callback_id: - if resource in self._index[callback_id]: - for event in self._index[callback_id][resource]: - del self._callbacks[resource][event][callback_id] - del self._index[callback_id][resource] - if not self._index[callback_id]: - del self._index[callback_id] - - def unsubscribe_all(self, callback): - """Unsubscribe callback for all events and all resources. - - - :param callback: the callback. - """ - callback_id = self._find(callback) - if callback_id: - for resource, resource_events in self._index[callback_id].items(): - for event in resource_events: - del self._callbacks[resource][event][callback_id] - del self._index[callback_id] - - @db_api.reraise_as_retryrequest - def notify(self, resource, event, trigger, **kwargs): - """Notify all subscribed callback(s). - - Dispatch the resource's event to the subscribed callbacks. - - :param resource: the resource. - :param event: the event. - :param trigger: the trigger. A reference to the sender of the event. - """ - errors = self._notify_loop(resource, event, trigger, **kwargs) - if errors: - if event.startswith(events.BEFORE): - abort_event = event.replace( - events.BEFORE, events.ABORT) - self._notify_loop(resource, abort_event, trigger, **kwargs) - - raise exceptions.CallbackFailure(errors=errors) - - if event.startswith(events.PRECOMMIT): - raise exceptions.CallbackFailure(errors=errors) - - def clear(self): - """Brings the manager to a clean slate.""" - self._callbacks = collections.defaultdict(dict) - self._index = collections.defaultdict(dict) - - def _notify_loop(self, resource, event, trigger, **kwargs): - """The notification loop.""" - errors = [] - callbacks = list(self._callbacks[resource].get(event, {}).items()) - LOG.debug("Notify callbacks %s for %s, %s", - [c[0] for c in callbacks], resource, event) - # TODO(armax): consider using a GreenPile - for callback_id, callback in callbacks: - try: - callback(resource, event, trigger, **kwargs) - except Exception as e: - abortable_event = ( - event.startswith(events.BEFORE) or - event.startswith(events.PRECOMMIT) - ) - if not abortable_event: - LOG.exception(_LE("Error during notification for " - "%(callback)s %(resource)s, %(event)s"), - {'callback': callback_id, - 'resource': resource, 'event': event}) - else: - LOG.debug("Callback %(callback)s raised %(error)s", - {'callback': callback_id, 'error': e}) - errors.append(exceptions.NotificationError(callback_id, e)) - return errors - - def _find(self, callback): - """Return the callback_id if found, None otherwise.""" - callback_id = _get_id(callback) - return callback_id if callback_id in self._index else None - - -def _get_id(callback): - """Return a unique identifier for the callback.""" - # TODO(armax): consider using something other than names - # https://www.python.org/dev/peps/pep-3155/, but this - # might be okay for now. - parts = (reflection.get_callable_name(callback), - str(hash(callback))) - return '-'.join(parts) +_get_id = manager._get_id diff --git a/neutron/callbacks/registry.py b/neutron/callbacks/registry.py index 13a15774e97..dbfea8411e3 100644 --- a/neutron/callbacks/registry.py +++ b/neutron/callbacks/registry.py @@ -10,104 +10,15 @@ # License for the specific language governing permissions and limitations # under the License. -import collections -import inspect +from neutron_lib.callbacks import registry -from neutron.callbacks import manager +# NOTE(boden): This module will be removed soon; use neutron-lib callbacks - -# TODO(armax): consider adding locking -CALLBACK_MANAGER = None - -# stores a dictionary keyed on function pointers with a list of -# (resource, event) tuples to subscribe to on class initialization -_REGISTERED_CLASS_METHODS = collections.defaultdict(list) - - -def _get_callback_manager(): - global CALLBACK_MANAGER - if CALLBACK_MANAGER is None: - CALLBACK_MANAGER = manager.CallbacksManager() - return CALLBACK_MANAGER - - -def subscribe(callback, resource, event): - _get_callback_manager().subscribe(callback, resource, event) - - -def unsubscribe(callback, resource, event): - _get_callback_manager().unsubscribe(callback, resource, event) - - -def unsubscribe_by_resource(callback, resource): - _get_callback_manager().unsubscribe_by_resource(callback, resource) - - -def unsubscribe_all(callback): - _get_callback_manager().unsubscribe_all(callback) - - -def notify(resource, event, trigger, **kwargs): - _get_callback_manager().notify(resource, event, trigger, **kwargs) - - -def clear(): - _get_callback_manager().clear() - - -def receives(resource, events): - """Use to decorate methods on classes before initialization. - - Any classes that use this must themselves be decorated with the - @has_registry_receivers decorator to setup the __new__ method to - actually register the instance methods after initialization. - """ - def decorator(f): - for e in events: - _REGISTERED_CLASS_METHODS[f].append((resource, e)) - return f - return decorator - - -def has_registry_receivers(klass): - """Decorator to setup __new__ method in classes to subscribe bound methods. - - Any method decorated with @receives above is an unbound method on a class. - This decorator sets up the class __new__ method to subscribe the bound - method in the callback registry after object instantiation. - """ - orig_new = klass.__new__ - new_inherited = '__new__' not in klass.__dict__ - - @staticmethod - def replacement_new(cls, *args, **kwargs): - if new_inherited: - # class didn't define __new__ so we need to call inherited __new__ - super_new = super(klass, cls).__new__ - if super_new is object.__new__: - # object.__new__ doesn't accept args nor kwargs - instance = super_new(cls) - else: - instance = super_new(cls, *args, **kwargs) - else: - instance = orig_new(cls, *args, **kwargs) - if getattr(instance, '_DECORATED_METHODS_SUBSCRIBED', False): - # Avoid running this logic twice for classes inheriting other - # classes with this same decorator. Only one needs to execute - # to subscribe all decorated methods. - return instance - for name, unbound_method in inspect.getmembers(cls): - if (not inspect.ismethod(unbound_method) and - not inspect.isfunction(unbound_method)): - continue - # handle py27/py34 difference - func = getattr(unbound_method, 'im_func', unbound_method) - if func not in _REGISTERED_CLASS_METHODS: - continue - for resource, event in _REGISTERED_CLASS_METHODS[func]: - # subscribe the bound method - subscribe(getattr(instance, name), resource, event) - setattr(instance, '_DECORATED_METHODS_SUBSCRIBED', True) - return instance - klass.__new__ = replacement_new - return klass +subscribe = registry.subscribe +unsubscribe = registry.unsubscribe +unsubscribe_by_resource = registry.unsubscribe_by_resource +unsubscribe_all = registry.unsubscribe_all +notify = registry.notify +clear = registry.clear +receives = registry.receives +has_registry_receivers = registry.has_registry_receivers diff --git a/neutron/callbacks/resources.py b/neutron/callbacks/resources.py index 73667a598d5..f8e6fafbae6 100644 --- a/neutron/callbacks/resources.py +++ b/neutron/callbacks/resources.py @@ -10,6 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. +# NOTE(boden): This module will be removed soon; use neutron-lib callbacks + # String literals representing core resources. AGENT = 'agent' FLOATING_IP = 'floatingip' diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 519907dfdc8..944c4fc66f5 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -43,7 +43,6 @@ from neutron.agent.linux import external_process from neutron.api.rpc.callbacks.consumer import registry as rpc_consumer_reg from neutron.api.rpc.callbacks.producer import registry as rpc_producer_reg from neutron.callbacks import manager as registry_manager -from neutron.callbacks import registry from neutron.common import config from neutron.common import rpc as n_rpc from neutron.db import _model_query as model_query @@ -296,7 +295,10 @@ class BaseTestCase(DietTestCase): self.setup_rpc_mocks() self.setup_config() - self.setup_test_registry_instance() + + self._callback_manager = registry_manager.CallbacksManager() + self.useFixture(fixture.CallbackRegistryFixture( + callback_manager=self._callback_manager)) # Give a private copy of the directory to each test. self.useFixture(fixture.PluginDirectoryFixture()) @@ -364,12 +366,6 @@ class BaseTestCase(DietTestCase): self.addCleanup(n_rpc.cleanup) n_rpc.init(CONF) - def setup_test_registry_instance(self): - """Give a private copy of the registry to each test.""" - self._callback_manager = registry_manager.CallbacksManager() - mock.patch.object(registry, '_get_callback_manager', - return_value=self._callback_manager).start() - def setup_config(self, args=None): """Tests that need a non-default config can override this method.""" self.config_parse(args=args) diff --git a/neutron/tests/unit/callbacks/__init__.py b/neutron/tests/unit/callbacks/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/neutron/tests/unit/callbacks/test_manager.py b/neutron/tests/unit/callbacks/test_manager.py deleted file mode 100644 index eb2b59d2ecf..00000000000 --- a/neutron/tests/unit/callbacks/test_manager.py +++ /dev/null @@ -1,258 +0,0 @@ -# Copyright 2015 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -from oslo_db import exception as db_exc - -from neutron.callbacks import events -from neutron.callbacks import exceptions -from neutron.callbacks import manager -from neutron.callbacks import resources -from neutron.tests import base - - -class ObjectWithCallback(object): - - def __init__(self): - self.counter = 0 - - def callback(self, *args, **kwargs): - self.counter += 1 - - -class GloriousObjectWithCallback(ObjectWithCallback): - pass - - -def callback_1(*args, **kwargs): - callback_1.counter += 1 -callback_id_1 = manager._get_id(callback_1) - - -def callback_2(*args, **kwargs): - callback_2.counter += 1 -callback_id_2 = manager._get_id(callback_2) - - -def callback_raise(*args, **kwargs): - raise Exception() - - -def callback_raise_retriable(*args, **kwargs): - raise db_exc.DBDeadlock() - - -class CallBacksManagerTestCase(base.BaseTestCase): - - def setUp(self): - super(CallBacksManagerTestCase, self).setUp() - self.manager = manager.CallbacksManager() - callback_1.counter = 0 - callback_2.counter = 0 - - def test_subscribe(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.assertIsNotNone( - self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) - self.assertIn(callback_id_1, self.manager._index) - - def test_subscribe_unknown(self): - self.manager.subscribe( - callback_1, 'my_resource', 'my-event') - self.assertIsNotNone( - self.manager._callbacks['my_resource']['my-event']) - self.assertIn(callback_id_1, self.manager._index) - - def test_subscribe_is_idempotent(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.assertEqual( - 1, - len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])) - callbacks = self.manager._index[callback_id_1][resources.PORT] - self.assertEqual(1, len(callbacks)) - - def test_subscribe_multiple_callbacks(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_2, resources.PORT, events.BEFORE_CREATE) - self.assertEqual(2, len(self.manager._index)) - self.assertEqual( - 2, - len(self.manager._callbacks[resources.PORT][events.BEFORE_CREATE])) - - def test_unsubscribe_during_iteration(self): - unsub = lambda r, e, *a, **k: self.manager.unsubscribe(unsub, r, e) - self.manager.subscribe(unsub, resources.PORT, - events.BEFORE_CREATE) - self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.assertNotIn(unsub, self.manager._index) - - def test_unsubscribe(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.unsubscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.assertNotIn( - callback_id_1, - self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) - self.assertNotIn(callback_id_1, self.manager._index) - - def test_unsubscribe_unknown_callback(self): - self.manager.subscribe( - callback_2, resources.PORT, events.BEFORE_CREATE) - self.manager.unsubscribe(callback_1, mock.ANY, mock.ANY) - self.assertEqual(1, len(self.manager._index)) - - def test_unsubscribe_is_idempotent(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.unsubscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.unsubscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.assertNotIn(callback_id_1, self.manager._index) - self.assertNotIn(callback_id_1, - self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) - - def test_unsubscribe_by_resource(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_DELETE) - self.manager.subscribe( - callback_2, resources.PORT, events.BEFORE_DELETE) - self.manager.unsubscribe_by_resource(callback_1, resources.PORT) - self.assertNotIn( - callback_id_1, - self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) - self.assertIn( - callback_id_2, - self.manager._callbacks[resources.PORT][events.BEFORE_DELETE]) - self.assertNotIn(callback_id_1, self.manager._index) - - def test_unsubscribe_all(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_DELETE) - self.manager.subscribe( - callback_1, resources.ROUTER, events.BEFORE_CREATE) - self.manager.unsubscribe_all(callback_1) - self.assertNotIn( - callback_id_1, - self.manager._callbacks[resources.PORT][events.BEFORE_CREATE]) - self.assertNotIn(callback_id_1, self.manager._index) - - def test_notify_none(self): - self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.assertEqual(0, callback_1.counter) - self.assertEqual(0, callback_2.counter) - - def test_feebly_referenced_callback(self): - self.manager.subscribe(lambda *x, **y: None, resources.PORT, - events.BEFORE_CREATE) - self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY) - - def test_notify_with_exception(self): - with mock.patch.object(self.manager, '_notify_loop') as n: - n.return_value = ['error'] - self.assertRaises(exceptions.CallbackFailure, - self.manager.notify, - mock.ANY, events.BEFORE_CREATE, - 'trigger', params={'a': 1}) - expected_calls = [ - mock.call(mock.ANY, 'before_create', - 'trigger', params={'a': 1}), - mock.call(mock.ANY, 'abort_create', - 'trigger', params={'a': 1}) - ] - n.assert_has_calls(expected_calls) - - def test_notify_handle_exception(self): - self.manager.subscribe( - callback_raise, resources.PORT, events.BEFORE_CREATE) - e = self.assertRaises(exceptions.CallbackFailure, self.manager.notify, - resources.PORT, events.BEFORE_CREATE, self) - self.assertIsInstance(e.errors[0], exceptions.NotificationError) - - def test_notify_handle_retriable_exception(self): - self.manager.subscribe( - callback_raise_retriable, resources.PORT, events.BEFORE_CREATE) - self.assertRaises(db_exc.RetryRequest, self.manager.notify, - resources.PORT, events.BEFORE_CREATE, self) - - def test_notify_called_once_with_no_failures(self): - with mock.patch.object(self.manager, '_notify_loop') as n: - n.return_value = False - self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY) - n.assert_called_once_with( - resources.PORT, events.BEFORE_CREATE, mock.ANY) - - def test__notify_loop_single_event(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_2, resources.PORT, events.BEFORE_CREATE) - self.manager._notify_loop( - resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.assertEqual(1, callback_1.counter) - self.assertEqual(1, callback_2.counter) - - def test__notify_loop_multiple_events(self): - self.manager.subscribe( - callback_1, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_1, resources.ROUTER, events.BEFORE_DELETE) - self.manager.subscribe( - callback_2, resources.PORT, events.BEFORE_CREATE) - self.manager._notify_loop( - resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.manager._notify_loop( - resources.ROUTER, events.BEFORE_DELETE, mock.ANY) - self.assertEqual(2, callback_1.counter) - self.assertEqual(1, callback_2.counter) - - @mock.patch("neutron.callbacks.manager.LOG") - def test__notify_loop_skip_log_errors(self, _logger): - self.manager.subscribe( - callback_raise, resources.PORT, events.BEFORE_CREATE) - self.manager.subscribe( - callback_raise, resources.PORT, events.PRECOMMIT_CREATE) - self.manager._notify_loop( - resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.manager._notify_loop( - resources.PORT, events.PRECOMMIT_CREATE, mock.ANY) - self.assertFalse(_logger.exception.call_count) - self.assertTrue(_logger.debug.call_count) - - def test_object_instances_as_subscribers(self): - """Ensures that the manager doesn't think these are equivalent.""" - a = GloriousObjectWithCallback() - b = ObjectWithCallback() - c = ObjectWithCallback() - for o in (a, b, c): - self.manager.subscribe( - o.callback, resources.PORT, events.BEFORE_CREATE) - # ensure idempotency remains for a single object - self.manager.subscribe( - o.callback, resources.PORT, events.BEFORE_CREATE) - self.manager.notify(resources.PORT, events.BEFORE_CREATE, mock.ANY) - self.assertEqual(1, a.counter) - self.assertEqual(1, b.counter) - self.assertEqual(1, c.counter) diff --git a/neutron/tests/unit/callbacks/test_registry.py b/neutron/tests/unit/callbacks/test_registry.py deleted file mode 100644 index bccaf331e2f..00000000000 --- a/neutron/tests/unit/callbacks/test_registry.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from neutron.callbacks import events -from neutron.callbacks import registry -from neutron.callbacks import resources -from neutron.tests import base - - -@registry.has_registry_receivers -class ObjectWithDecoratedCallback(object): - - def __init__(self): - self.counter = 0 - - @registry.receives(resources.PORT, [events.AFTER_CREATE, - events.AFTER_UPDATE]) - @registry.receives(resources.NETWORK, [events.AFTER_DELETE]) - def callback(self, *args, **kwargs): - self.counter += 1 - - -class MixinWithNew(object): - def __new__(cls): - i = super(MixinWithNew, cls).__new__(cls) - i.new_called = True - return i - - -@registry.has_registry_receivers -class AnotherObjectWithDecoratedCallback(ObjectWithDecoratedCallback, - MixinWithNew): - - def __init__(self): - super(AnotherObjectWithDecoratedCallback, self).__init__() - self.counter2 = 0 - - @registry.receives(resources.NETWORK, [events.AFTER_DELETE]) - def callback2(self, *args, **kwargs): - self.counter2 += 1 - - -@registry.has_registry_receivers -class CallbackClassWithParameters(object): - - def __init__(self, dummy): - pass - - -class CallBacksManagerTestCase(base.BaseTestCase): - - def test_decorated_inst_method_receives(self): - i1 = ObjectWithDecoratedCallback() - registry.notify(resources.PORT, events.BEFORE_CREATE, self) - self.assertEqual(0, i1.counter) - registry.notify(resources.PORT, events.AFTER_CREATE, self) - self.assertEqual(1, i1.counter) - registry.notify(resources.PORT, events.AFTER_UPDATE, self) - self.assertEqual(2, i1.counter) - registry.notify(resources.NETWORK, events.AFTER_UPDATE, self) - self.assertEqual(2, i1.counter) - registry.notify(resources.NETWORK, events.AFTER_DELETE, self) - self.assertEqual(3, i1.counter) - i2 = ObjectWithDecoratedCallback() - self.assertEqual(0, i2.counter) - registry.notify(resources.NETWORK, events.AFTER_DELETE, self) - self.assertEqual(4, i1.counter) - self.assertEqual(1, i2.counter) - - def test_object_inheriting_others_no_double_subscribe(self): - with mock.patch.object(registry, 'subscribe') as sub: - AnotherObjectWithDecoratedCallback() - # there are 3 methods (2 in parent and one in child) and 1 - # subscribes to 2 events, so we expect 4 subscribes - self.assertEqual(4, len(sub.mock_calls)) - - def test_new_inheritance_not_broken(self): - self.assertTrue(AnotherObjectWithDecoratedCallback().new_called) - - def test_object_new_not_broken(self): - CallbackClassWithParameters('dummy')