Support binding_deactivate in CacheBackedPluginApi

CacheBackedPluginApi enables the neutron server to push resources
updates to L2 agents. The agents retrieve the resources updates locally
from the cache implemented by it. CacheBackedPluginApi also emulates
server notifications to the agents, such as port_delete or port_update,
based on the updated data received by the cache. This commit adds code
to CacheBackedPluginApi to implement a binding_deactivate notification
from the server to the agents

Change-Id: I023ccbd405bc41379007d87a9c1051970aa8d603
Partial-Bug: #1580880
This commit is contained in:
Miguel Lavalle 2018-05-07 16:39:56 -05:00
parent f374697760
commit be7ad302e1
2 changed files with 147 additions and 4 deletions

View File

@ -18,8 +18,10 @@ import itertools
import netaddr
from neutron_lib.agent import topics
from neutron_lib.api.definitions import portbindings_extended as pb_ext
from neutron_lib.callbacks import events as callback_events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources as callback_resources
from neutron_lib import constants
from oslo_log import log as logging
import oslo_messaging
@ -199,16 +201,54 @@ class CacheBackedPluginApi(PluginApi):
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
method, host = self._get_method_host(rtype, event, **kwargs)
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id}
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id,
'host': host}
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
"""Constructs the name of method to be called in the legacy interface.
If the event received is a port update that contains a binding
activation where a previous binding is deactivated, the method name
is 'binding_deactivate' and the host where the binding has to be
deactivated is returned. Otherwise, the method name is constructed from
rtype and the event received and the host is None.
"""
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
host = None
if is_delete or rtype != callback_resources.PORT:
return method, host
# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
BINDINGS = pb_ext.COLLECTION_NAME
if BINDINGS in kwargs.get('changed_fields', set()):
existing_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['existing'], 'bindings', []),
constants.ACTIVE))
updated_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
constants.ACTIVE))
if (existing_active_binding and updated_active_binding and
existing_active_binding.host !=
updated_active_binding.host):
if (utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = 'binding_deactivate'
host = existing_active_binding.host
return method, host
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
result = {'devices': [], 'failed_devices': []}

View File

@ -16,9 +16,15 @@
import datetime
import mock
from neutron_lib.agent import topics as lib_topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
from neutron_lib import constants
from oslo_context import context as oslo_context
from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.objects import ports
from neutron.tests import base
@ -162,3 +168,100 @@ class AgentRPCMethods(base.BaseTestCase):
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)
class TestCacheBackedPluginApi(base.BaseTestCase):
def setUp(self):
super(TestCacheBackedPluginApi, self).setUp()
self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN)
self._api._legacy_interface = mock.Mock()
self._port_id = uuidutils.generate_uuid()
self._port = ports.Port(
id=self._port_id,
bindings=[ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.ACTIVE)])
def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
mock.ANY, resource_id=self._port_id,
existing=self._port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_resource_update(self):
updated_port = ports.Port(id=self._port_id, name='updated_port')
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY, changed_fields=set(['name']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def _test__legacy_notifier_binding_activated(self):
updated_port = ports.Port(
id=self._port_id, name='updated_port',
bindings=[ports.PortBinding(port_id=self._port_id,
host='host2',
status=constants.ACTIVE),
ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.INACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_not_called()
def test__legacy_notifier_new_binding_activated(self):
self._test__legacy_notifier_binding_activated()
self._api._legacy_interface.binding_deactivate.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host='host1')
def test__legacy_notifier_no_new_binding_activated(self):
updated_port = ports.Port(
id=self._port_id, name='updated_port',
bindings=[ports.PortBinding(port_id=self._port_id,
host='host2',
status=constants.ACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_existing_or_updated_is_none(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE,
self, mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=None, updated=None)
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=None)
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id, host=None)
self._api._legacy_interface.port_update.assert_has_calls([call, call])
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_binding_activated_not_supported(self):
del self._api._legacy_interface.binding_deactivate
self._test__legacy_notifier_binding_activated()