From be7ad302e1f8b9a6b57c7b0c9f6c17f43f63f391 Mon Sep 17 00:00:00 2001 From: Miguel Lavalle Date: Mon, 7 May 2018 16:39:56 -0500 Subject: [PATCH] Support binding_deactivate in CacheBackedPluginApi CacheBackedPluginApi enables the neutron server to push resources updates to L2 agents. The agents retrieve the resources updates locally from the cache implemented by it. CacheBackedPluginApi also emulates server notifications to the agents, such as port_delete or port_update, based on the updated data received by the cache. This commit adds code to CacheBackedPluginApi to implement a binding_deactivate notification from the server to the agents Change-Id: I023ccbd405bc41379007d87a9c1051970aa8d603 Partial-Bug: #1580880 --- neutron/agent/rpc.py | 48 +++++++++++-- neutron/tests/unit/agent/test_rpc.py | 103 +++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 4 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 13a2b7f1ac9..c76bf2f20ff 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -18,8 +18,10 @@ import itertools import netaddr from neutron_lib.agent import topics +from neutron_lib.api.definitions import portbindings_extended as pb_ext from neutron_lib.callbacks import events as callback_events from neutron_lib.callbacks import registry +from neutron_lib.callbacks import resources as callback_resources from neutron_lib import constants from oslo_log import log as logging import oslo_messaging @@ -199,16 +201,54 @@ class CacheBackedPluginApi(PluginApi): the payloads the handlers are expecting (an ID). """ rtype = rtype.lower() # all legacy handlers don't camelcase - is_delete = event == callback_events.AFTER_DELETE - suffix = 'delete' if is_delete else 'update' - method = "%s_%s" % (rtype, suffix) + method, host = self._get_method_host(rtype, event, **kwargs) if not hasattr(self._legacy_interface, method): # TODO(kevinbenton): once these notifications are stable, emit # a deprecation warning for legacy handlers return - payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id} + payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id, + 'host': host} getattr(self._legacy_interface, method)(context, **payload) + def _get_method_host(self, rtype, event, **kwargs): + """Constructs the name of method to be called in the legacy interface. + + If the event received is a port update that contains a binding + activation where a previous binding is deactivated, the method name + is 'binding_deactivate' and the host where the binding has to be + deactivated is returned. Otherwise, the method name is constructed from + rtype and the event received and the host is None. + """ + is_delete = event == callback_events.AFTER_DELETE + suffix = 'delete' if is_delete else 'update' + method = "%s_%s" % (rtype, suffix) + host = None + if is_delete or rtype != callback_resources.PORT: + return method, host + + # A port update was received. Find out if it is a binding activation + # where a previous binding was deactivated + BINDINGS = pb_ext.COLLECTION_NAME + if BINDINGS in kwargs.get('changed_fields', set()): + existing_active_binding = ( + utils.get_port_binding_by_status_and_host( + getattr(kwargs['existing'], 'bindings', []), + constants.ACTIVE)) + updated_active_binding = ( + utils.get_port_binding_by_status_and_host( + getattr(kwargs['updated'], 'bindings', []), + constants.ACTIVE)) + if (existing_active_binding and updated_active_binding and + existing_active_binding.host != + updated_active_binding.host): + if (utils.get_port_binding_by_status_and_host( + getattr(kwargs['updated'], 'bindings', []), + constants.INACTIVE, + host=existing_active_binding.host)): + method = 'binding_deactivate' + host = existing_active_binding.host + return method, host + def get_devices_details_list_and_failed_devices(self, context, devices, agent_id, host=None): result = {'devices': [], 'failed_devices': []} diff --git a/neutron/tests/unit/agent/test_rpc.py b/neutron/tests/unit/agent/test_rpc.py index cfbb56e3b04..f687f935b25 100644 --- a/neutron/tests/unit/agent/test_rpc.py +++ b/neutron/tests/unit/agent/test_rpc.py @@ -16,9 +16,15 @@ import datetime import mock +from neutron_lib.agent import topics as lib_topics +from neutron_lib.callbacks import events +from neutron_lib.callbacks import resources +from neutron_lib import constants from oslo_context import context as oslo_context +from oslo_utils import uuidutils from neutron.agent import rpc +from neutron.objects import ports from neutron.tests import base @@ -162,3 +168,100 @@ class AgentRPCMethods(base.BaseTestCase): with mock.patch(call_to_patch) as create_connection: rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')]) create_connection.assert_has_calls(expected) + + +class TestCacheBackedPluginApi(base.BaseTestCase): + + def setUp(self): + super(TestCacheBackedPluginApi, self).setUp() + self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN) + self._api._legacy_interface = mock.Mock() + self._port_id = uuidutils.generate_uuid() + self._port = ports.Port( + id=self._port_id, + bindings=[ports.PortBinding(port_id=self._port_id, + host='host1', + status=constants.ACTIVE)]) + + def test__legacy_notifier_resource_delete(self): + self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self, + mock.ANY, resource_id=self._port_id, + existing=self._port) + self._api._legacy_interface.port_update.assert_not_called() + self._api._legacy_interface.port_delete.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_resource_update(self): + updated_port = ports.Port(id=self._port_id, name='updated_port') + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, changed_fields=set(['name']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.port_update.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def _test__legacy_notifier_binding_activated(self): + updated_port = ports.Port( + id=self._port_id, name='updated_port', + bindings=[ports.PortBinding(port_id=self._port_id, + host='host2', + status=constants.ACTIVE), + ports.PortBinding(port_id=self._port_id, + host='host1', + status=constants.INACTIVE)]) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_update.assert_not_called() + self._api._legacy_interface.port_delete.assert_not_called() + + def test__legacy_notifier_new_binding_activated(self): + self._test__legacy_notifier_binding_activated() + self._api._legacy_interface.binding_deactivate.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host='host1') + + def test__legacy_notifier_no_new_binding_activated(self): + updated_port = ports.Port( + id=self._port_id, name='updated_port', + bindings=[ports.PortBinding(port_id=self._port_id, + host='host2', + status=constants.ACTIVE)]) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=updated_port) + self._api._legacy_interface.port_update.assert_called_once_with( + mock.ANY, port={'id': self._port_id}, port_id=self._port_id, + host=None) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_existing_or_updated_is_none(self): + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, + self, mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=None, updated=None) + self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self, + mock.ANY, + changed_fields=set(['name', 'bindings']), + resource_id=self._port_id, + existing=self._port, updated=None) + call = mock.call(mock.ANY, port={'id': self._port_id}, + port_id=self._port_id, host=None) + self._api._legacy_interface.port_update.assert_has_calls([call, call]) + self._api._legacy_interface.port_delete.assert_not_called() + self._api._legacy_interface.binding_deactivate.assert_not_called() + + def test__legacy_notifier_binding_activated_not_supported(self): + del self._api._legacy_interface.binding_deactivate + self._test__legacy_notifier_binding_activated()