From 4134f882cc695a4e1efa22502bc5dddc31a9865f Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Wed, 15 Feb 2017 21:13:18 -0800 Subject: [PATCH] Make ML2 OVO push notification asynchronous The OVO push notification logic was blocking the AFTER_UPDATE event for all core resources. This isn't problematic for individual HTTP API calls. However, the agent current updates the status of every port it wires two times, including on agent restarts. In order to drastically reduce the impact of this notifier, this patch moves it into a spawned eventlet coroutine so it doesn't block the main AFTER_UPDATE events. A semaphore is used to prevent multiple coroutines from trying to hit the database at once in the background. This doesn't change the semantics of the notifier since its goal is always to send the latest events. Partial-Bug: #1665215 Change-Id: Ic259bad6f65f4bc45f5b900e0979c9a91865089b --- neutron/plugins/ml2/ovo_rpc.py | 61 ++++++++++++++----- .../tests/unit/plugins/ml2/test_ovo_rpc.py | 1 + 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index 2255f884985..ec7d84d2665 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -13,6 +13,8 @@ import traceback +import eventlet +from oslo_concurrency import lockutils from oslo_log import log as logging from neutron._i18n import _LE @@ -21,6 +23,7 @@ from neutron.api.rpc.handlers import resources_rpc from neutron.callbacks import events from neutron.callbacks import registry from neutron.callbacks import resources +from neutron import context as n_ctx from neutron.db import api as db_api from neutron.objects import network from neutron.objects import ports @@ -35,10 +38,16 @@ class _ObjectChangeHandler(object): self._resource = resource self._obj_class = object_class self._resource_push_api = resource_push_api + self._resources_to_push = {} + self._worker_pool = eventlet.GreenPool() for event in (events.AFTER_CREATE, events.AFTER_UPDATE, events.AFTER_DELETE): registry.subscribe(self.handle_event, resource, event) + def wait(self): + """Waits for all outstanding events to be dispatched.""" + self._worker_pool.waitall() + @staticmethod def _is_session_semantic_violated(context, resource, event): """Return True and print an ugly error on transaction violation. @@ -68,22 +77,37 @@ class _ObjectChangeHandler(object): if self._is_session_semantic_violated(context, resource, event): return resource_id = self._extract_resource_id(kwargs) - # attempt to get regardless of event type so concurrent delete - # after create/update is the same code-path as a delete event - with db_api.context_manager.independent.reader.using(context): - obj = self._obj_class.get_object(context, id=resource_id) - # CREATE events are always treated as UPDATE events to ensure - # listeners are written to handle out-of-order messages - if obj is None: - rpc_event = rpc_events.DELETED - # construct a fake object with the right ID so we can - # have a payload for the delete message. - obj = self._obj_class(id=resource_id) - else: - rpc_event = rpc_events.UPDATED - LOG.debug("Dispatching RPC callback event %s for %s %s.", - rpc_event, self._resource, resource_id) - self._resource_push_api.push(context, [obj], rpc_event) + # we preserve the context so we can trace a receive on the agent back + # to the server-side event that triggered it + self._resources_to_push[resource_id] = context.to_dict() + # spawn worker so we don't block main AFTER_UPDATE thread + self._worker_pool.spawn(self.dispatch_events) + + @lockutils.synchronized('event-dispatch') + def dispatch_events(self): + # this is guarded by a lock to ensure we don't get too many concurrent + # dispatchers hitting the database simultaneously. + to_dispatch, self._resources_to_push = self._resources_to_push, {} + # TODO(kevinbenton): now that we are batching these, convert to a + # single get_objects call for all of them + for resource_id, context_dict in to_dispatch.items(): + context = n_ctx.Context.from_dict(context_dict) + # attempt to get regardless of event type so concurrent delete + # after create/update is the same code-path as a delete event + with db_api.context_manager.independent.reader.using(context): + obj = self._obj_class.get_object(context, id=resource_id) + # CREATE events are always treated as UPDATE events to ensure + # listeners are written to handle out-of-order messages + if obj is None: + rpc_event = rpc_events.DELETED + # construct a fake object with the right ID so we can + # have a payload for the delete message. + obj = self._obj_class(id=resource_id) + else: + rpc_event = rpc_events.UPDATED + LOG.debug("Dispatching RPC callback event %s for %s %s.", + rpc_event, self._resource, resource_id) + self._resource_push_api.push(context, [obj], rpc_event) def _extract_resource_id(self, callback_kwargs): id_kwarg = '%s_id' % self._resource @@ -118,3 +142,8 @@ class OVOServerRpcInterface(object): res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher) for res, obj_class in resource_objclass_map.items() } + + def wait(self): + """Wait for all handlers to finish processing async events.""" + for handler in self._resource_handlers.values(): + handler.wait() diff --git a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py index 80bdb1a6b29..7ca8017eb7d 100644 --- a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py @@ -33,6 +33,7 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase): 'ResourcesPushRpcApi.push', new=receive).start() def _assert_object_received(self, ovotype, oid=None, event=None): + self.plugin.ovo_notifier.wait() for obj, evt in self.received: if isinstance(obj, ovotype): if (obj.id == oid or not oid) and (not event or event == evt):