From c3db9d6b0b990da6664955e4ce1c72758dc600e1 Mon Sep 17 00:00:00 2001 From: Kevin Benton Date: Sun, 22 Jan 2017 17:01:47 -0800 Subject: [PATCH] Use push-notificates for OVSPluginAPI Replace the calls to the OVSPluginAPI info retrieval functions with reads directly from the push notification cache. Since we now depend on the cache for the source of truth, the 'port_update'/'port_delete'/'network_update' handlers are configured to be called whenever the cache receives a corresponding resource update. The OVS agent will no longer subscribe to topic notifications for ports or networks from the legacy notification API. Partially-Implements: blueprint push-notifications Change-Id: Ib2234ec1f5d328649c6bb1c3fe07799d3e351f48 --- neutron/agent/rpc.py | 121 ++++++++++++++++++ .../openvswitch/agent/ovs_neutron_agent.py | 11 +- .../agent/test_ovs_neutron_agent.py | 4 + .../openvswitch/agent/test_ovs_tunnel.py | 3 + 4 files changed, 133 insertions(+), 6 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index baa87ee44a0..bcfc6c3e1df 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -16,13 +16,22 @@ from datetime import datetime import itertools +import netaddr from neutron_lib import constants +from oslo_log import log as logging import oslo_messaging from oslo_utils import uuidutils +from neutron.agent import resource_cache +from neutron.api.rpc.callbacks import resources +from neutron.callbacks import events as callback_events +from neutron.callbacks import registry from neutron.common import constants as n_const from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron import objects + +LOG = logging.getLogger(__name__) def create_consumers(endpoints, prefix, topic_details, start_listening=True): @@ -145,3 +154,115 @@ class PluginApi(object): cctxt = self.client.prepare(version='1.4') return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip, tunnel_type=tunnel_type, host=host) + + +def create_cache_for_l2_agent(): + """Create a push-notifications cache for L2 agent related resources.""" + + objects.register_objects() + resource_types = [ + resources.PORT, + resources.SECURITYGROUP, + resources.SECURITYGROUPRULE, + resources.NETWORK, + resources.SUBNET + ] + rcache = resource_cache.RemoteResourceCache(resource_types) + rcache.start_watcher() + # TODO(kevinbenton): ensure flood uses filters or that this has a long + # timeout before Pike release. + rcache.bulk_flood_cache() + return rcache + + +class CacheBackedPluginApi(PluginApi): + + def __init__(self, *args, **kwargs): + super(CacheBackedPluginApi, self).__init__(*args, **kwargs) + self.remote_resource_cache = create_cache_for_l2_agent() + + def register_legacy_notification_callbacks(self, legacy_interface): + """Emulates the server-side notifications from ml2 AgentNotifierApi. + + legacy_interface is an object with 'delete'/'update' methods for + core resources. + """ + self._legacy_interface = legacy_interface + for e in (callback_events.AFTER_UPDATE, callback_events.AFTER_DELETE): + for r in (resources.PORT, resources.NETWORK): + registry.subscribe(self._legacy_notifier, r, e) + + def _legacy_notifier(self, rtype, event, trigger, context, resource_id, + **kwargs): + """Checks if legacy interface is expecting calls for resource. + + looks for port_update, network_delete, etc and calls them with + the payloads the handlers are expecting (an ID). + """ + rtype = rtype.lower() # all legacy handlers don't camelcase + is_delete = event == callback_events.AFTER_DELETE + suffix = 'delete' if is_delete else 'update' + method = "%s_%s" % (rtype, suffix) + if not hasattr(self._legacy_interface, method): + # TODO(kevinbenton): once these notifications are stable, emit + # a deprecation warning for legacy handlers + return + payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id} + getattr(self._legacy_interface, method)(context, **payload) + + def get_devices_details_list_and_failed_devices(self, context, devices, + agent_id, host=None): + result = {'devices': [], 'failed_devices': []} + for device in devices: + try: + result['devices'].append( + self.get_device_details(context, device, agent_id, host)) + except Exception: + LOG.exception("Failed to get details for device %s", device) + result['failed_devices'].append(device) + return result + + def get_device_details(self, context, device, agent_id, host=None): + port_obj = self.remote_resource_cache.get_resource_by_id( + resources.PORT, device) + if not port_obj: + LOG.debug("Device %s does not exist in cache.", device) + return {'device': device} + if not port_obj.binding_levels: + LOG.warning("Device %s is not bound.", port_obj) + return {'device': device} + segment = port_obj.binding_levels[-1].segment + net = self.remote_resource_cache.get_resource_by_id( + resources.NETWORK, port_obj.network_id) + net_qos_policy_id = net.qos_policy_id + # match format of old RPC interface + mac_addr = str(netaddr.EUI(str(port_obj.mac_address), + dialect=netaddr.mac_unix_expanded)) + entry = { + 'device': device, + 'network_id': port_obj.network_id, + 'port_id': port_obj.id, + 'mac_address': mac_addr, + 'admin_state_up': port_obj.admin_state_up, + 'network_type': segment.network_type, + 'segmentation_id': segment.segmentation_id, + 'physical_network': segment.physical_network, + 'fixed_ips': [{'subnet_id': o.subnet_id, + 'ip_address': str(o.ip_address)} + for o in port_obj.fixed_ips], + 'device_owner': port_obj.device_owner, + 'allowed_address_pairs': [{'mac_address': o.mac_address, + 'ip_address': o.ip_address} + for o in port_obj.allowed_address_pairs], + 'port_security_enabled': port_obj.security.port_security_enabled, + 'qos_policy_id': port_obj.qos_policy_id, + 'network_qos_policy_id': net_qos_policy_id, + 'profile': port_obj.binding.profile, + 'security_groups': list(port_obj.security_group_ids) + } + LOG.debug("Returning: %s", entry) + return entry + + def get_devices_details_list(self, context, devices, agent_id, host=None): + return [self.get_device_details(context, device, agent_id, host) + for device in devices] diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 935dd6f2830..16108daf4e1 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -80,7 +80,7 @@ class _mac_mydialect(netaddr.mac_unix): word_fmt = '%.2x' -class OVSPluginApi(agent_rpc.PluginApi): +class OVSPluginApi(agent_rpc.CacheBackedPluginApi): pass @@ -360,6 +360,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def setup_rpc(self): self.plugin_rpc = OVSPluginApi(topics.PLUGIN) + # allow us to receive port_update/delete callbacks from the cache + self.plugin_rpc.register_legacy_notification_callbacks(self) self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN) self.dvr_plugin_rpc = dvr_rpc.DVRServerRpcApi(topics.PLUGIN) self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS) @@ -367,13 +369,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # RPC network init self.context = context.get_admin_context_without_session() # Define the listening consumers for the agent - consumers = [[topics.PORT, topics.UPDATE], - [topics.PORT, topics.DELETE], - [constants.TUNNEL, topics.UPDATE], + consumers = [[constants.TUNNEL, topics.UPDATE], [constants.TUNNEL, topics.DELETE], [topics.SECURITY_GROUP, topics.UPDATE], - [topics.DVR, topics.UPDATE], - [topics.NETWORK, topics.UPDATE]] + [topics.DVR, topics.UPDATE]] if self.l2_pop: consumers.append([topics.L2POPULATION, topics.UPDATE]) self.connection = agent_rpc.create_consumers([self], diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 706d8006cea..e8eb3e423cc 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -42,6 +42,7 @@ from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent \ NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi' +PULLAPI = 'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi' OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0" FAKE_MAC = '00:11:22:33:44:55' @@ -101,6 +102,7 @@ class TestOvsNeutronAgent(object): def setUp(self): super(TestOvsNeutronAgent, self).setUp() self.useFixture(test_vlanmanager.LocalVlanManagerFixture()) + mock.patch(PULLAPI).start() notifier_p = mock.patch(NOTIFIER) notifier_cls = notifier_p.start() self.notifier = mock.Mock() @@ -2165,6 +2167,7 @@ class AncillaryBridgesTest(object): 'neutron.agent.ovsdb.impl_idl._connection') conn_patcher.start() self.addCleanup(conn_patcher.stop) + mock.patch(PULLAPI).start() notifier_p = mock.patch(NOTIFIER) notifier_cls = notifier_p.start() self.notifier = mock.Mock() @@ -2284,6 +2287,7 @@ class TestOvsDvrNeutronAgent(object): def setUp(self): super(TestOvsDvrNeutronAgent, self).setUp() + mock.patch(PULLAPI).start() notifier_p = mock.patch(NOTIFIER) notifier_cls = notifier_p.start() self.notifier = mock.Mock() diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py index 269daeec5ed..13bd4010bf2 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py @@ -80,6 +80,9 @@ class TunnelTest(object): conn_patcher = mock.patch( 'neutron.agent.ovsdb.impl_idl._connection') conn_patcher.start() + mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.ResourcesPullRpcApi' + ).start() self.addCleanup(conn_patcher.stop) cfg.CONF.set_default('firewall_driver', 'neutron.agent.firewall.NoopFirewallDriver',