From 44e191330414d130e020696d1131924d296d0bf3 Mon Sep 17 00:00:00 2001 From: Robert Kukura Date: Wed, 20 Feb 2019 19:45:39 -0500 Subject: [PATCH] [AIM] Dispatch Opflex RPCs directly to the mechanism driver The ML2Plus plugin now overrides ML2's implementation of start_rpc_listeners() so that it also calls similarly named functions on registered mechanism drivers. The apic_aim mechanism driver's start_rpc_listeners() method starts the topology RPC listener (which was previously started even if not in an RPC worker), and, if enabled, an RPC listener for the new Opflex RPC implementation. The aim_mapping policy driver no longer starts its Opflex RPC listener if the new RPC implementation is enabled. RPC initialization code is consolidated into the mechanism driver's rpc module. Change-Id: I685a55248cb17b5e2351805beae6cbd7ab2e8830 --- .../neutron/plugins/ml2plus/driver_api.py | 9 ++++ .../drivers/apic_aim/mechanism_driver.py | 31 ++--------- .../plugins/ml2plus/drivers/apic_aim/rpc.py | 52 ++++++++++++++++++- .../neutron/plugins/ml2plus/managers.py | 7 +++ gbpservice/neutron/plugins/ml2plus/plugin.py | 5 ++ .../drivers/cisco/apic/aim_mapping.py | 2 + .../ml2plus/drivers/mechanism_logger.py | 4 ++ .../unit/plugins/ml2plus/test_apic_aim.py | 32 +++++++++++- .../tests/unit/plugins/ml2plus/test_plugin.py | 37 ++++++++++++- 9 files changed, 149 insertions(+), 30 deletions(-) diff --git a/gbpservice/neutron/plugins/ml2plus/driver_api.py b/gbpservice/neutron/plugins/ml2plus/driver_api.py index ad19f87a1..f08d8d954 100644 --- a/gbpservice/neutron/plugins/ml2plus/driver_api.py +++ b/gbpservice/neutron/plugins/ml2plus/driver_api.py @@ -152,6 +152,15 @@ class SecurityGroupRuleContext(object): @six.add_metaclass(abc.ABCMeta) class MechanismDriver(driver_api.MechanismDriver): + def start_rpc_listeners(self): + """Start driver-specify RPC listeners. + + If any driver-specific RPC listeners are needed, create an RPC + connection, create the consumers, call consume_in_threads() on + the connection, and return the resulting list of servers. + """ + return [] + # REVISIT(rkukura): Is this needed for all operations, or just for # create operations? If its needed for all operations, should the # method be specific to the resource and operation, and include diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py index 5578e600b..5368addf9 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py @@ -63,7 +63,6 @@ from neutron_lib.plugins import directory from neutron_lib.plugins.ml2 import api from neutron_lib.utils import net from opflexagent import constants as ofcst -from opflexagent import host_agent_rpc as arpc from opflexagent import rpc as ofrpc from oslo_config import cfg from oslo_db import exception as db_exc @@ -205,26 +204,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver, rpc.ApicRpcHandlerMixin): NIC_NAME_LEN = 14 - class TopologyRpcEndpoint(object): - target = oslo_messaging.Target(version=arpc.VERSION) - - def __init__(self, mechanism_driver): - self.md = mechanism_driver - - @db_api.retry_if_session_inactive() - def update_link(self, context, *args, **kwargs): - context._session = db_api.get_writer_session() - return self.md.update_link(context, *args, **kwargs) - - @db_api.retry_if_session_inactive() - def delete_link(self, context, *args, **kwargs): - # Don't take any action on link deletion in order to tolerate - # situations like fabric upgrade or flapping links. Old links - # are removed once a specific host is attached somewhere else. - # To completely decommission the host, aimctl can be used to - # cleanup the hostlink table - return - def __init__(self): LOG.info("APIC AIM MD __init__") @@ -250,12 +229,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.apic_system_id = cfg.CONF.apic_system_id self.notifier = ofrpc.AgentNotifierApi(n_topics.AGENT) self.sg_enabled = securitygroups_rpc.is_firewall_enabled() - # setup APIC topology RPC handler - self.topology_conn = n_rpc.create_connection() - self.topology_conn.create_consumer(arpc.TOPIC_APIC_SERVICE, - [self.TopologyRpcEndpoint(self)], - fanout=False) - self.topology_conn.consume_in_threads() self.keystone_notification_exchange = (cfg.CONF.ml2_apic_aim. keystone_notification_exchange) self.keystone_notification_topic = (cfg.CONF.ml2_apic_aim. @@ -284,6 +257,10 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool]) + def start_rpc_listeners(self): + LOG.info("APIC AIM MD starting RPC listeners") + return self._start_rpc_listeners() + def _setup_nova_vm_update(self): self.admin_context = nctx.get_admin_context() self.host_id = 'id-%s' % net.get_hostname() diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py index ce23fd04a..ef48accbb 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py @@ -19,6 +19,7 @@ import netaddr import sqlalchemy as sa from sqlalchemy.ext import baked +from neutron.common import rpc as n_rpc from neutron.db import api as db_api from neutron.db.extra_dhcp_opt import models as dhcp_models from neutron.db.models import allowed_address_pair as aap_models @@ -33,7 +34,10 @@ from neutron.services.trunk import models as trunk_models from neutron_lib.api.definitions import portbindings from neutron_lib import constants as n_constants from neutron_lib import context as n_context +from opflexagent import host_agent_rpc as oa_rpc +from opflexagent import rpc as o_rpc from oslo_log import log +import oslo_messaging from oslo_serialization import jsonutils from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import constants @@ -151,8 +155,49 @@ EndpointTrunkInfo = namedtuple( 'segmentation_id']) +class TopologyRpcEndpoint(object): + + target = oslo_messaging.Target(version=oa_rpc.VERSION) + + def __init__(self, mechanism_driver): + self.md = mechanism_driver + + @db_api.retry_if_session_inactive() + def update_link(self, context, *args, **kwargs): + context._session = db_api.get_writer_session() + return self.md.update_link(context, *args, **kwargs) + + @db_api.retry_if_session_inactive() + def delete_link(self, context, *args, **kwargs): + # Don't take any action on link deletion in order to tolerate + # situations like fabric upgrade or flapping links. Old links + # are removed once a specific host is attached somewhere else. + # To completely decommission the host, aimctl can be used to + # cleanup the hostlink table. + return + + class ApicRpcHandlerMixin(object): + def _start_rpc_listeners(self): + conn = n_rpc.create_connection() + + # Opflex RPC handler. + if self.enable_new_rpc: + conn.create_consumer( + o_rpc.TOPIC_OPFLEX, + [o_rpc.GBPServerRpcCallback(self, self.notifier)], + fanout=False) + + # Topology RPC hander. + conn.create_consumer( + oa_rpc.TOPIC_APIC_SERVICE, + [TopologyRpcEndpoint(self)], + fanout=False) + + # Start listeners and return list of servers. + return conn.consume_in_threads() + # The following five methods handle RPCs from the Opflex agent. # # REVISIT: These handler methods are currently called by @@ -232,7 +277,12 @@ class ApicRpcHandlerMixin(object): # implementation from get_vrf_details() to this method. return self.get_vrf_details(context, kwargs) - # REVISIT: def ip_address_owner_update(self, context, **kwargs): + def ip_address_owner_update(self, context, **kwargs): + LOG.debug("APIC AIM MD handling ip_address_owner_update for: %s", + kwargs) + # REVISIT: Move actual handler implementation to this class. + if self.gbp_driver: + self.gbp_driver.ip_address_owner_update(context, **kwargs) @db_api.retry_if_session_inactive() def _get_vrf_details(self, context, vrf_id): diff --git a/gbpservice/neutron/plugins/ml2plus/managers.py b/gbpservice/neutron/plugins/ml2plus/managers.py index ef05a990d..21d110b41 100644 --- a/gbpservice/neutron/plugins/ml2plus/managers.py +++ b/gbpservice/neutron/plugins/ml2plus/managers.py @@ -93,6 +93,13 @@ class MechanismManager(managers.MechanismManager): errors=errors ) + def start_rpc_listeners(self): + servers = [] + for driver in self.ordered_mech_drivers: + if isinstance(driver.obj, driver_api.MechanismDriver): + servers.extend(driver.obj.start_rpc_listeners()) + return servers + def ensure_tenant(self, plugin_context, tenant_id): for driver in self.ordered_mech_drivers: if isinstance(driver.obj, driver_api.MechanismDriver): diff --git a/gbpservice/neutron/plugins/ml2plus/plugin.py b/gbpservice/neutron/plugins/ml2plus/plugin.py index e5a208fc7..f1fbd9de3 100644 --- a/gbpservice/neutron/plugins/ml2plus/plugin.py +++ b/gbpservice/neutron/plugins/ml2plus/plugin.py @@ -137,6 +137,11 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, self._verify_service_plugins_requirements() LOG.info("Modular L2 Plugin (extended) initialization complete") + def start_rpc_listeners(self): + servers = super(Ml2PlusPlugin, self).start_rpc_listeners() + servers.extend(self.mechanism_manager.start_rpc_listeners()) + return servers + def _handle_security_group_change(self, resource, event, trigger, **kwargs): if 'payload' in kwargs: diff --git a/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py b/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py index c10680675..1a017474f 100644 --- a/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py +++ b/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py @@ -159,6 +159,8 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin): @log.log_method_call def start_rpc_listeners(self): + if self.aim_mech_driver.enable_new_rpc: + return [] return self.setup_opflex_rpc_listeners() def validate_state(self, repair): diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py index 6d73d982c..99921387b 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py @@ -32,6 +32,10 @@ class LoggerPlusMechanismDriver(driver_api.MechanismDriver, def initialize(self): LOG.info("initialize called") + def start_rpc_listeners(self): + LOG.info("start_rpc_listeners called") + return [] + def ensure_tenant(self, plugin_context, tenant_id): LOG.info("ensure_tenant called with tenant_id %s", tenant_id) diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py index 8a54b0384..fbff6b960 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py @@ -285,7 +285,6 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase, self.saved_keystone_client = ksc_client.Client ksc_client.Client = FakeKeystoneClient self.plugin = directory.get_plugin() - self.plugin.start_rpc_listeners() self.driver = self.plugin.mechanism_manager.mech_drivers[ 'apic_aim'].obj self.l3_plugin = directory.get_plugin(n_constants.L3) @@ -471,6 +470,37 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase, return verify +class TestRpcListeners(ApicAimTestCase): + @staticmethod + def _consume_in_threads(self): + return self.servers + + # REVISIT: Remove new_rpc option with old RPC cleanup. + def _test_start_rpc_listeners(self, new_rpc): + # Override mock from + # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that + # it returns servers, but still avoids starting them. + with mock.patch('neutron.common.rpc.Connection.consume_in_threads', + TestRpcListeners._consume_in_threads): + # Call plugin method and verify that the apic_aim MD's + # RPC servers are returned. + servers = self.plugin.start_rpc_listeners() + topics = [server._target.topic for server in servers] + self.assertIn('apic-service', topics) + if new_rpc: + self.assertIn('opflex', topics) + else: + self.assertNotIn('opflex', topics) + + def test_start_rpc_listeners(self): + self.driver.enable_new_rpc = False + self._test_start_rpc_listeners(False) + + def test_start_rpc_listeners_new_rpc(self): + self.driver.enable_new_rpc = True + self._test_start_rpc_listeners(True) + + class TestAimMapping(ApicAimTestCase): def setUp(self): self.call_wrapper = CallRecordWrapper() diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py index e89e43f5c..d92c1da8a 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py @@ -17,6 +17,8 @@ import mock import testtools from neutron.api import extensions +from neutron.common import rpc as n_rpc +from neutron.common import topics from neutron.conf.plugins.ml2 import config # noqa from neutron.conf.plugins.ml2.drivers import driver_type from neutron.tests.unit.api import test_extensions @@ -57,7 +59,6 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase): self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) self.port_create_status = 'DOWN' self.plugin = directory.get_plugin() - self.plugin.start_rpc_listeners() def exist_checker(self, getter): def verify(context): @@ -67,6 +68,40 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase): return verify +class TestRpcListeners(Ml2PlusPluginV2TestCase): + @staticmethod + def _consume_in_threads(self): + return self.servers + + @staticmethod + def _start_rpc_listeners(self): + conn = n_rpc.create_connection() + conn.create_consumer('q-test-topic', []) + return conn.consume_in_threads() + + def test_start_rpc_listeners(self): + # Override mock from + # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that + # it returns servers, but still avoids starting them. + with mock.patch('neutron.common.rpc.Connection.consume_in_threads', + TestRpcListeners._consume_in_threads): + # Mock logger MD to start an RPC listener. + with mock.patch( + 'gbpservice.neutron.tests.unit.plugins.ml2plus.drivers.' + 'mechanism_logger.LoggerPlusMechanismDriver.' + 'start_rpc_listeners', + TestRpcListeners._start_rpc_listeners): + # Call plugin method and verify that the base ML2 + # servers as well as the test MD server are returned. + servers = self.plugin.start_rpc_listeners() + self.assertEqual(sorted([topics.PLUGIN, + topics.SERVER_RESOURCE_VERSIONS, + topics.REPORTS, + 'q-test-topic']), + sorted([server._target.topic + for server in servers])) + + class TestEnsureTenant(Ml2PlusPluginV2TestCase): def test_network(self): with mock.patch.object(mech_logger.LoggerPlusMechanismDriver,