diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 13a2b7f1ac9..c76bf2f20ff 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -18,8 +18,10 @@ import itertools import netaddr from neutron_lib.agent import topics +from neutron_lib.api.definitions import portbindings_extended as pb_ext from neutron_lib.callbacks import events as callback_events from neutron_lib.callbacks import registry +from neutron_lib.callbacks import resources as callback_resources from neutron_lib import constants from oslo_log import log as logging import oslo_messaging @@ -199,16 +201,54 @@ class CacheBackedPluginApi(PluginApi): the payloads the handlers are expecting (an ID). """ rtype = rtype.lower() # all legacy handlers don't camelcase - is_delete = event == callback_events.AFTER_DELETE - suffix = 'delete' if is_delete else 'update' - method = "%s_%s" % (rtype, suffix) + method, host = self._get_method_host(rtype, event, **kwargs) if not hasattr(self._legacy_interface, method): # TODO(kevinbenton): once these notifications are stable, emit # a deprecation warning for legacy handlers return - payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id} + payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id, + 'host': host} getattr(self._legacy_interface, method)(context, **payload) + def _get_method_host(self, rtype, event, **kwargs): + """Constructs the name of method to be called in the legacy interface. + + If the event received is a port update that contains a binding + activation where a previous binding is deactivated, the method name + is 'binding_deactivate' and the host where the binding has to be + deactivated is returned. Otherwise, the method name is constructed from + rtype and the event received and the host is None. + """ + is_delete = event == callback_events.AFTER_DELETE + suffix = 'delete' if is_delete else 'update' + method = "%s_%s" % (rtype, suffix) + host = None + if is_delete or rtype != callback_resources.PORT: + return method, host + + # A port update was received. Find out if it is a binding activation + # where a previous binding was deactivated + BINDINGS = pb_ext.COLLECTION_NAME + if BINDINGS in kwargs.get('changed_fields', set()): + existing_active_binding = ( + utils.get_port_binding_by_status_and_host( + getattr(kwargs['existing'], 'bindings', []), + constants.ACTIVE)) + updated_active_binding = ( + utils.get_port_binding_by_status_and_host( + getattr(kwargs['updated'], 'bindings', []), + constants.ACTIVE)) + if (existing_active_binding and updated_active_binding and + existing_active_binding.host != + updated_active_binding.host): + if (utils.get_port_binding_by_status_and_host( + getattr(kwargs['updated'], 'bindings', []), + constants.INACTIVE, + host=existing_active_binding.host)): + method = 'binding_deactivate' + host = existing_active_binding.host + return method, host + def get_devices_details_list_and_failed_devices(self, context, devices, agent_id, host=None): result = {'devices': [], 'failed_devices': []} diff --git a/neutron/tests/unit/agent/test_rpc.py b/neutron/tests/unit/agent/test_rpc.py index cfbb56e3b04..f687f935b25 100644 --- a/neutron/tests/unit/agent/test_rpc.py +++ b/neutron/tests/unit/agent/test_rpc.py @@ -16,9 +16,15 @@ import datetime import mock +from neutron_lib.agent import topics as lib_topics +from neutron_lib.callbacks import events +from neutron_lib.callbacks import resources +from neutron_lib import constants from oslo_context import context as oslo_context +from oslo_utils import uuidutils from neutron.agent import rpc +from neutron.objects import ports from neutron.tests import base @@ -162,3 +168,100 @@ class AgentRPCMethods(base.BaseTestCase): with mock.patch(call_to_patch) as create_connection: rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')]) create_connection.assert_has_calls(expected) + + +class TestCacheBackedPluginApi(base.BaseTestCase): + + def setUp(self): + super(TestCacheBackedPluginApi, self).setUp() + self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN) + self._api._legacy_interface = mock.Mock() + self._port_id = uuidutils.generate_uuid() + self._port = ports.Port( + id=self._port_id, + bindings=[ports.PortBinding(port_id=self._port_id, + host='host1', + status=constants.ACTIVE)]) + + def test__legacy_notifier_resource_delete(self): + self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self, + mock.ANY, resource_id=self._port_id, + existing=self._port) + self._api._legacy_interface.port_update.assert_not_called() + self._api._legacy_interface.port_delete.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_resource_update(self): + updated_port = ports.Port(id=self._port_id, name='updated_port') + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, changed_fields=set(['name']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.port_update.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def _test__legacy_notifier_binding_activated(self): + updated_port = ports.Port( + id=self._port_id, name='updated_port', + bindings=[ports.PortBinding(port_id=self._port_id, + host='host2', + status=constants.ACTIVE), + ports.PortBinding(port_id=self._port_id, + host='host1', + status=constants.INACTIVE)]) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_update.assert_not_called() + self._api._legacy_interface.port_delete.assert_not_called() + + def test__legacy_notifier_new_binding_activated(self): + self._test__legacy_notifier_binding_activated() + self._api._legacy_interface.binding_deactivate.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host='host1') + + def test__legacy_notifier_no_new_binding_activated(self): + updated_port = ports.Port( + id=self._port_id, name='updated_port', + bindings=[ports.PortBinding(port_id=self._port_id, + host='host2', + status=constants.ACTIVE)]) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_update.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_existing_or_updated_is_none(self): + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, + self, mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=None, updated=None) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=None) + call = mock.call(mock.ANY, port={'id': self._port_id}, + port_id=self._port_id, host=None) + self._api._legacy_interface.port_update.assert_has_calls([call, call]) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_binding_activated_not_supported(self): + del self._api._legacy_interface.binding_deactivate + self._test__legacy_notifier_binding_activated()