Make ML2 OVO push notification asynchronous

The OVO push notification logic was blocking the AFTER_UPDATE
event for all core resources. This isn't problematic for individual
HTTP API calls. However, the agent current updates the status of
every port it wires two times, including on agent restarts.

In order to drastically reduce the impact of this notifier, this
patch moves it into a spawned eventlet coroutine so it doesn't
block the main AFTER_UPDATE events. A semaphore is used to prevent
multiple coroutines from trying to hit the database at once in the
background.

This doesn't change the semantics of the notifier since its goal
is always to send the latest events.

Partial-Bug: #1665215
Change-Id: Ic259bad6f65f4bc45f5b900e0979c9a91865089b
This commit is contained in:
Kevin Benton 2017-02-15 21:13:18 -08:00
parent 840e04b6f1
commit 4134f882cc
2 changed files with 46 additions and 16 deletions

View File

@ -13,6 +13,8 @@
import traceback
import eventlet
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron._i18n import _LE
@ -21,6 +23,7 @@ from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron import context as n_ctx
from neutron.db import api as db_api
from neutron.objects import network
from neutron.objects import ports
@ -35,10 +38,16 @@ class _ObjectChangeHandler(object):
self._resource = resource
self._obj_class = object_class
self._resource_push_api = resource_push_api
self._resources_to_push = {}
self._worker_pool = eventlet.GreenPool()
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
def wait(self):
"""Waits for all outstanding events to be dispatched."""
self._worker_pool.waitall()
@staticmethod
def _is_session_semantic_violated(context, resource, event):
"""Return True and print an ugly error on transaction violation.
@ -68,22 +77,37 @@ class _ObjectChangeHandler(object):
if self._is_session_semantic_violated(context, resource, event):
return
resource_id = self._extract_resource_id(kwargs)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.context_manager.independent.reader.using(context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
if obj is None:
rpc_event = rpc_events.DELETED
# construct a fake object with the right ID so we can
# have a payload for the delete message.
obj = self._obj_class(id=resource_id)
else:
rpc_event = rpc_events.UPDATED
LOG.debug("Dispatching RPC callback event %s for %s %s.",
rpc_event, self._resource, resource_id)
self._resource_push_api.push(context, [obj], rpc_event)
# we preserve the context so we can trace a receive on the agent back
# to the server-side event that triggered it
self._resources_to_push[resource_id] = context.to_dict()
# spawn worker so we don't block main AFTER_UPDATE thread
self._worker_pool.spawn(self.dispatch_events)
@lockutils.synchronized('event-dispatch')
def dispatch_events(self):
# this is guarded by a lock to ensure we don't get too many concurrent
# dispatchers hitting the database simultaneously.
to_dispatch, self._resources_to_push = self._resources_to_push, {}
# TODO(kevinbenton): now that we are batching these, convert to a
# single get_objects call for all of them
for resource_id, context_dict in to_dispatch.items():
context = n_ctx.Context.from_dict(context_dict)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.context_manager.independent.reader.using(context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
if obj is None:
rpc_event = rpc_events.DELETED
# construct a fake object with the right ID so we can
# have a payload for the delete message.
obj = self._obj_class(id=resource_id)
else:
rpc_event = rpc_events.UPDATED
LOG.debug("Dispatching RPC callback event %s for %s %s.",
rpc_event, self._resource, resource_id)
self._resource_push_api.push(context, [obj], rpc_event)
def _extract_resource_id(self, callback_kwargs):
id_kwarg = '%s_id' % self._resource
@ -118,3 +142,8 @@ class OVOServerRpcInterface(object):
res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher)
for res, obj_class in resource_objclass_map.items()
}
def wait(self):
"""Wait for all handlers to finish processing async events."""
for handler in self._resource_handlers.values():
handler.wait()

View File

@ -33,6 +33,7 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase):
'ResourcesPushRpcApi.push', new=receive).start()
def _assert_object_received(self, ovotype, oid=None, event=None):
self.plugin.ovo_notifier.wait()
for obj, evt in self.received:
if isinstance(obj, ovotype):
if (obj.id == oid or not oid) and (not event or event == evt):