diff --git a/gbpservice/neutron/plugins/ml2plus/driver_api.py b/gbpservice/neutron/plugins/ml2plus/driver_api.py index ad19f87a1..f08d8d954 100644 --- a/gbpservice/neutron/plugins/ml2plus/driver_api.py +++ b/gbpservice/neutron/plugins/ml2plus/driver_api.py @@ -152,6 +152,15 @@ class SecurityGroupRuleContext(object): @six.add_metaclass(abc.ABCMeta) class MechanismDriver(driver_api.MechanismDriver): + def start_rpc_listeners(self): + """Start driver-specify RPC listeners. + + If any driver-specific RPC listeners are needed, create an RPC + connection, create the consumers, call consume_in_threads() on + the connection, and return the resulting list of servers. + """ + return [] + # REVISIT(rkukura): Is this needed for all operations, or just for # create operations? If its needed for all operations, should the # method be specific to the resource and operation, and include diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py index 5578e600b..5368addf9 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/mechanism_driver.py @@ -63,7 +63,6 @@ from neutron_lib.plugins import directory from neutron_lib.plugins.ml2 import api from neutron_lib.utils import net from opflexagent import constants as ofcst -from opflexagent import host_agent_rpc as arpc from opflexagent import rpc as ofrpc from oslo_config import cfg from oslo_db import exception as db_exc @@ -205,26 +204,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver, rpc.ApicRpcHandlerMixin): NIC_NAME_LEN = 14 - class TopologyRpcEndpoint(object): - target = oslo_messaging.Target(version=arpc.VERSION) - - def __init__(self, mechanism_driver): - self.md = mechanism_driver - - @db_api.retry_if_session_inactive() - def update_link(self, context, *args, **kwargs): - context._session = db_api.get_writer_session() - return self.md.update_link(context, *args, **kwargs) - - @db_api.retry_if_session_inactive() - def delete_link(self, context, *args, **kwargs): - # Don't take any action on link deletion in order to tolerate - # situations like fabric upgrade or flapping links. Old links - # are removed once a specific host is attached somewhere else. - # To completely decommission the host, aimctl can be used to - # cleanup the hostlink table - return - def __init__(self): LOG.info("APIC AIM MD __init__") @@ -250,12 +229,6 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.apic_system_id = cfg.CONF.apic_system_id self.notifier = ofrpc.AgentNotifierApi(n_topics.AGENT) self.sg_enabled = securitygroups_rpc.is_firewall_enabled() - # setup APIC topology RPC handler - self.topology_conn = n_rpc.create_connection() - self.topology_conn.create_consumer(arpc.TOPIC_APIC_SERVICE, - [self.TopologyRpcEndpoint(self)], - fanout=False) - self.topology_conn.consume_in_threads() self.keystone_notification_exchange = (cfg.CONF.ml2_apic_aim. keystone_notification_exchange) self.keystone_notification_topic = (cfg.CONF.ml2_apic_aim. @@ -284,6 +257,10 @@ class ApicMechanismDriver(api_plus.MechanismDriver, self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool]) + def start_rpc_listeners(self): + LOG.info("APIC AIM MD starting RPC listeners") + return self._start_rpc_listeners() + def _setup_nova_vm_update(self): self.admin_context = nctx.get_admin_context() self.host_id = 'id-%s' % net.get_hostname() diff --git a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py index ce23fd04a..ef48accbb 100644 --- a/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py +++ b/gbpservice/neutron/plugins/ml2plus/drivers/apic_aim/rpc.py @@ -19,6 +19,7 @@ import netaddr import sqlalchemy as sa from sqlalchemy.ext import baked +from neutron.common import rpc as n_rpc from neutron.db import api as db_api from neutron.db.extra_dhcp_opt import models as dhcp_models from neutron.db.models import allowed_address_pair as aap_models @@ -33,7 +34,10 @@ from neutron.services.trunk import models as trunk_models from neutron_lib.api.definitions import portbindings from neutron_lib import constants as n_constants from neutron_lib import context as n_context +from opflexagent import host_agent_rpc as oa_rpc +from opflexagent import rpc as o_rpc from oslo_log import log +import oslo_messaging from oslo_serialization import jsonutils from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import constants @@ -151,8 +155,49 @@ EndpointTrunkInfo = namedtuple( 'segmentation_id']) +class TopologyRpcEndpoint(object): + + target = oslo_messaging.Target(version=oa_rpc.VERSION) + + def __init__(self, mechanism_driver): + self.md = mechanism_driver + + @db_api.retry_if_session_inactive() + def update_link(self, context, *args, **kwargs): + context._session = db_api.get_writer_session() + return self.md.update_link(context, *args, **kwargs) + + @db_api.retry_if_session_inactive() + def delete_link(self, context, *args, **kwargs): + # Don't take any action on link deletion in order to tolerate + # situations like fabric upgrade or flapping links. Old links + # are removed once a specific host is attached somewhere else. + # To completely decommission the host, aimctl can be used to + # cleanup the hostlink table. + return + + class ApicRpcHandlerMixin(object): + def _start_rpc_listeners(self): + conn = n_rpc.create_connection() + + # Opflex RPC handler. + if self.enable_new_rpc: + conn.create_consumer( + o_rpc.TOPIC_OPFLEX, + [o_rpc.GBPServerRpcCallback(self, self.notifier)], + fanout=False) + + # Topology RPC hander. + conn.create_consumer( + oa_rpc.TOPIC_APIC_SERVICE, + [TopologyRpcEndpoint(self)], + fanout=False) + + # Start listeners and return list of servers. + return conn.consume_in_threads() + # The following five methods handle RPCs from the Opflex agent. # # REVISIT: These handler methods are currently called by @@ -232,7 +277,12 @@ class ApicRpcHandlerMixin(object): # implementation from get_vrf_details() to this method. return self.get_vrf_details(context, kwargs) - # REVISIT: def ip_address_owner_update(self, context, **kwargs): + def ip_address_owner_update(self, context, **kwargs): + LOG.debug("APIC AIM MD handling ip_address_owner_update for: %s", + kwargs) + # REVISIT: Move actual handler implementation to this class. + if self.gbp_driver: + self.gbp_driver.ip_address_owner_update(context, **kwargs) @db_api.retry_if_session_inactive() def _get_vrf_details(self, context, vrf_id): diff --git a/gbpservice/neutron/plugins/ml2plus/managers.py b/gbpservice/neutron/plugins/ml2plus/managers.py index ef05a990d..21d110b41 100644 --- a/gbpservice/neutron/plugins/ml2plus/managers.py +++ b/gbpservice/neutron/plugins/ml2plus/managers.py @@ -93,6 +93,13 @@ class MechanismManager(managers.MechanismManager): errors=errors ) + def start_rpc_listeners(self): + servers = [] + for driver in self.ordered_mech_drivers: + if isinstance(driver.obj, driver_api.MechanismDriver): + servers.extend(driver.obj.start_rpc_listeners()) + return servers + def ensure_tenant(self, plugin_context, tenant_id): for driver in self.ordered_mech_drivers: if isinstance(driver.obj, driver_api.MechanismDriver): diff --git a/gbpservice/neutron/plugins/ml2plus/plugin.py b/gbpservice/neutron/plugins/ml2plus/plugin.py index e5a208fc7..f1fbd9de3 100644 --- a/gbpservice/neutron/plugins/ml2plus/plugin.py +++ b/gbpservice/neutron/plugins/ml2plus/plugin.py @@ -137,6 +137,11 @@ class Ml2PlusPlugin(ml2_plugin.Ml2Plugin, self._verify_service_plugins_requirements() LOG.info("Modular L2 Plugin (extended) initialization complete") + def start_rpc_listeners(self): + servers = super(Ml2PlusPlugin, self).start_rpc_listeners() + servers.extend(self.mechanism_manager.start_rpc_listeners()) + return servers + def _handle_security_group_change(self, resource, event, trigger, **kwargs): if 'payload' in kwargs: diff --git a/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py b/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py index c10680675..1a017474f 100644 --- a/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py +++ b/gbpservice/neutron/services/grouppolicy/drivers/cisco/apic/aim_mapping.py @@ -159,6 +159,8 @@ class AIMMappingDriver(nrd.CommonNeutronBase, aim_rpc.AIMMappingRPCMixin): @log.log_method_call def start_rpc_listeners(self): + if self.aim_mech_driver.enable_new_rpc: + return [] return self.setup_opflex_rpc_listeners() def validate_state(self, repair): diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py index 6d73d982c..99921387b 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/drivers/mechanism_logger.py @@ -32,6 +32,10 @@ class LoggerPlusMechanismDriver(driver_api.MechanismDriver, def initialize(self): LOG.info("initialize called") + def start_rpc_listeners(self): + LOG.info("start_rpc_listeners called") + return [] + def ensure_tenant(self, plugin_context, tenant_id): LOG.info("ensure_tenant called with tenant_id %s", tenant_id) diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py index 8a54b0384..fbff6b960 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_apic_aim.py @@ -285,7 +285,6 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase, self.saved_keystone_client = ksc_client.Client ksc_client.Client = FakeKeystoneClient self.plugin = directory.get_plugin() - self.plugin.start_rpc_listeners() self.driver = self.plugin.mechanism_manager.mech_drivers[ 'apic_aim'].obj self.l3_plugin = directory.get_plugin(n_constants.L3) @@ -471,6 +470,37 @@ class ApicAimTestCase(test_address_scope.AddressScopeTestCase, return verify +class TestRpcListeners(ApicAimTestCase): + @staticmethod + def _consume_in_threads(self): + return self.servers + + # REVISIT: Remove new_rpc option with old RPC cleanup. + def _test_start_rpc_listeners(self, new_rpc): + # Override mock from + # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that + # it returns servers, but still avoids starting them. + with mock.patch('neutron.common.rpc.Connection.consume_in_threads', + TestRpcListeners._consume_in_threads): + # Call plugin method and verify that the apic_aim MD's + # RPC servers are returned. + servers = self.plugin.start_rpc_listeners() + topics = [server._target.topic for server in servers] + self.assertIn('apic-service', topics) + if new_rpc: + self.assertIn('opflex', topics) + else: + self.assertNotIn('opflex', topics) + + def test_start_rpc_listeners(self): + self.driver.enable_new_rpc = False + self._test_start_rpc_listeners(False) + + def test_start_rpc_listeners_new_rpc(self): + self.driver.enable_new_rpc = True + self._test_start_rpc_listeners(True) + + class TestAimMapping(ApicAimTestCase): def setUp(self): self.call_wrapper = CallRecordWrapper() diff --git a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py index e89e43f5c..d92c1da8a 100644 --- a/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py +++ b/gbpservice/neutron/tests/unit/plugins/ml2plus/test_plugin.py @@ -17,6 +17,8 @@ import mock import testtools from neutron.api import extensions +from neutron.common import rpc as n_rpc +from neutron.common import topics from neutron.conf.plugins.ml2 import config # noqa from neutron.conf.plugins.ml2.drivers import driver_type from neutron.tests.unit.api import test_extensions @@ -57,7 +59,6 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase): self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) self.port_create_status = 'DOWN' self.plugin = directory.get_plugin() - self.plugin.start_rpc_listeners() def exist_checker(self, getter): def verify(context): @@ -67,6 +68,40 @@ class Ml2PlusPluginV2TestCase(test_address_scope.AddressScopeTestCase): return verify +class TestRpcListeners(Ml2PlusPluginV2TestCase): + @staticmethod + def _consume_in_threads(self): + return self.servers + + @staticmethod + def _start_rpc_listeners(self): + conn = n_rpc.create_connection() + conn.create_consumer('q-test-topic', []) + return conn.consume_in_threads() + + def test_start_rpc_listeners(self): + # Override mock from + # neutron.tests.base.BaseTestCase.setup_rpc_mocks(), so that + # it returns servers, but still avoids starting them. + with mock.patch('neutron.common.rpc.Connection.consume_in_threads', + TestRpcListeners._consume_in_threads): + # Mock logger MD to start an RPC listener. + with mock.patch( + 'gbpservice.neutron.tests.unit.plugins.ml2plus.drivers.' + 'mechanism_logger.LoggerPlusMechanismDriver.' + 'start_rpc_listeners', + TestRpcListeners._start_rpc_listeners): + # Call plugin method and verify that the base ML2 + # servers as well as the test MD server are returned. + servers = self.plugin.start_rpc_listeners() + self.assertEqual(sorted([topics.PLUGIN, + topics.SERVER_RESOURCE_VERSIONS, + topics.REPORTS, + 'q-test-topic']), + sorted([server._target.topic + for server in servers])) + + class TestEnsureTenant(Ml2PlusPluginV2TestCase): def test_network(self): with mock.patch.object(mech_logger.LoggerPlusMechanismDriver,