diff --git a/neutron/agent/l2/extensions/local_ip.py b/neutron/agent/l2/extensions/local_ip.py new file mode 100644 index 00000000000..08320d23916 --- /dev/null +++ b/neutron/agent/l2/extensions/local_ip.py @@ -0,0 +1,135 @@ +# Copyright 2021 Huawei, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import sys + +from neutron_lib.agent import l2_extension +from neutron_lib.callbacks import events as lib_events +from neutron_lib.callbacks import registry as lib_registry +from neutron_lib import context as lib_ctx +from oslo_log import log as logging + +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.plugins.ml2.drivers.openvswitch.agent.common import ( + constants as ovs_constants) + +LOG = logging.getLogger(__name__) + + +class LocalIPAgentExtension(l2_extension.L2AgentExtension): + SUPPORTED_RESOURCE_TYPES = [resources.LOCAL_IP_ASSOCIATION] + + def initialize(self, connection, driver_type): + if driver_type != ovs_constants.EXTENSION_DRIVER_TYPE: + LOG.error('Local IP extension is only supported for OVS, ' + 'currently uses %(driver_type)s', + {'driver_type': driver_type}) + sys.exit(1) + + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() + self._register_rpc_consumers(connection) + + self.local_ip_updates = { + 'added': collections.defaultdict(dict), + 'deleted': collections.defaultdict(dict) + } + + self._pull_all_local_ip_associations() + + def _pull_all_local_ip_associations(self): + context = lib_ctx.get_admin_context_without_session() + + assoc_list = self.resource_rpc.bulk_pull( + context, resources.LOCAL_IP_ASSOCIATION) + for assoc in assoc_list: + port_id = assoc.fixed_port_id + lip_id = assoc.local_ip_id + self.local_ip_updates['added'][port_id][lip_id] = assoc + # No need to notify "port updated" here as on restart agent + # handles all ports anyway + + def consume_api(self, agent_api): + """Allows an extension to gain access to resources internal to the + neutron agent and otherwise unavailable to the extension. + """ + self.agent_api = agent_api + + def _register_rpc_consumers(self, connection): + """Allows an extension to receive notifications of updates made to + items of interest. + """ + endpoints = [resources_rpc.ResourcesPushRpcCallback()] + for resource_type in self.SUPPORTED_RESOURCE_TYPES: + # We assume that the neutron server always broadcasts the latest + # version known to the agent + registry.register(self._handle_notification, resource_type) + topic = resources_rpc.resource_type_versioned_topic(resource_type) + connection.create_consumer(topic, endpoints, fanout=True) + + def _handle_notification(self, context, resource_type, + local_ip_associations, event_type): + if resource_type != resources.LOCAL_IP_ASSOCIATION: + LOG.warning("Only Local IP Association notifications are " + "supported, got: %s", resource_type) + return + + LOG.info("Local IP Association notification received: %s, %s", + local_ip_associations, event_type) + for assoc in local_ip_associations: + port_id = assoc.fixed_port_id + lip_id = assoc.local_ip_id + if event_type in [events.CREATED, events.UPDATED]: + self.local_ip_updates['added'][port_id][lip_id] = assoc + elif event_type == events.DELETED: + self.local_ip_updates['deleted'][port_id][lip_id] = assoc + self.local_ip_updates['added'][port_id].pop(lip_id, None) + + # Notify agent about port update to handle Local IP flows + self._notify_port_updated(context, port_id) + + def _notify_port_updated(self, context, port_id): + payload = lib_events.DBEventPayload( + context, metadata={'changed_fields': {'local_ip'}}, + resource_id=port_id, states=(None,)) + lib_registry.publish(resources.PORT, lib_events.AFTER_UPDATE, + self, payload=payload) + + def handle_port(self, context, port): + """Handle Local IP associations for a port. + """ + port_id = port['port_id'] + local_ip_updates = self._pop_local_ip_updates_for_port(port_id) + for assoc in local_ip_updates['added'].values(): + LOG.info("Local IP added for port %s: %s", + port_id, assoc.local_ip) + # TBD + for assoc in local_ip_updates['deleted'].values(): + LOG.info("Local IP deleted from port %s: %s", + port_id, assoc.local_ip) + # TBD + + def _pop_local_ip_updates_for_port(self, port_id): + return { + 'added': self.local_ip_updates['added'].pop(port_id, {}), + 'deleted': self.local_ip_updates['deleted'].pop(port_id, {}) + } + + def delete_port(self, context, port): + self.local_ip_updates['added'].pop(port['port_id'], None) + self.local_ip_updates['deleted'].pop(port['port_id'], None) diff --git a/neutron/api/rpc/callbacks/resources.py b/neutron/api/rpc/callbacks/resources.py index 9c46f1cb088..0517fe7243c 100644 --- a/neutron/api/rpc/callbacks/resources.py +++ b/neutron/api/rpc/callbacks/resources.py @@ -13,6 +13,7 @@ from neutron._i18n import _ from neutron.objects import address_group from neutron.objects import conntrack_helper +from neutron.objects import local_ip from neutron.objects.logapi import logging_resource as log_object from neutron.objects import network from neutron.objects import port_forwarding @@ -36,6 +37,7 @@ SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name() PORTFORWARDING = port_forwarding.PortForwarding.obj_name() CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name() ADDRESSGROUP = address_group.AddressGroup.obj_name() +LOCAL_IP_ASSOCIATION = local_ip.LocalIPAssociation.obj_name() _VALID_CLS = ( @@ -51,6 +53,7 @@ _VALID_CLS = ( port_forwarding.PortForwarding, conntrack_helper.ConntrackHelper, address_group.AddressGroup, + local_ip.LocalIPAssociation, ) _TYPE_TO_CLS_MAP = {cls.obj_name(): cls for cls in _VALID_CLS} diff --git a/neutron/services/local_ip/local_ip_plugin.py b/neutron/services/local_ip/local_ip_plugin.py index 1069f248ea1..642be308b2a 100644 --- a/neutron/services/local_ip/local_ip_plugin.py +++ b/neutron/services/local_ip/local_ip_plugin.py @@ -15,6 +15,8 @@ from neutron_lib.api.definitions import local_ip as local_ip_apidef +from neutron.api.rpc.callbacks import events as rpc_events +from neutron.api.rpc.handlers import resources_rpc from neutron.db import local_ip_db @@ -26,3 +28,21 @@ class LocalIPPlugin(local_ip_db.LocalIPDbMixin): __native_pagination_support = True __native_sorting_support = True __filter_validation_support = True + + def __init__(self): + super(LocalIPPlugin, self).__init__() + self._resource_rpc = resources_rpc.ResourcesPushRpcApi() + + def create_local_ip_port_association(self, context, local_ip_id, + port_association): + lip_assoc = self._create_local_ip_port_association( + context, local_ip_id, port_association) + self._resource_rpc.push(context, [lip_assoc], rpc_events.CREATED) + return self._make_local_ip_assoc_dict(lip_assoc) + + def delete_local_ip_port_association(self, context, fixed_port_id, + local_ip_id): + lip_assoc = super( + LocalIPPlugin, self).delete_local_ip_port_association( + context, fixed_port_id, local_ip_id) + self._resource_rpc.push(context, [lip_assoc], rpc_events.DELETED) diff --git a/neutron/tests/unit/agent/l2/extensions/test_local_ip.py b/neutron/tests/unit/agent/l2/extensions/test_local_ip.py new file mode 100644 index 00000000000..a6960bc2d85 --- /dev/null +++ b/neutron/tests/unit/agent/l2/extensions/test_local_ip.py @@ -0,0 +1,145 @@ +# Copyright 2021 Huawei, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from neutron_lib.callbacks import events as lib_events +from neutron_lib.callbacks import registry as lib_registry +from neutron_lib import context +from oslo_utils import uuidutils + +from neutron.agent.l2.extensions import local_ip as local_ip_ext +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.objects import local_ip as lip_obj +from neutron.plugins.ml2.drivers.openvswitch.agent \ + import ovs_agent_extension_api as ovs_ext_api +from neutron.tests import base + + +class LocalIPAgentExtensionTestCase(base.BaseTestCase): + + def setUp(self): + super(LocalIPAgentExtensionTestCase, self).setUp() + self.context = context.get_admin_context_without_session() + self.local_ip_ext = local_ip_ext.LocalIPAgentExtension() + + self.int_br = mock.Mock() + self.tun_br = mock.Mock() + self.plugin_rpc = mock.Mock() + self.agent_api = ovs_ext_api.OVSAgentExtensionAPI( + self.int_br, + self.tun_br, + phys_brs=None, + plugin_rpc=self.plugin_rpc) + self.local_ip_ext.consume_api(self.agent_api) + with mock.patch.object( + self.local_ip_ext, '_pull_all_local_ip_associations'): + self.local_ip_ext.initialize(mock.Mock(), 'ovs') + + def _generate_test_lip_associations(self, count=2): + return [lip_obj.LocalIPAssociation( + fixed_port_id=uuidutils.generate_uuid(), + local_ip_id=uuidutils.generate_uuid(), + local_ip=lip_obj.LocalIP()) for _ in range(count) + ] + + def test_pulling_lip_associations_on_init(self): + res_rpc = mock.Mock() + lip_assocs = self._generate_test_lip_associations() + with mock.patch('neutron.api.rpc.handlers.' + 'resources_rpc.ResourcesPullRpcApi') as res_rpc_cls: + res_rpc_cls.return_value = res_rpc + res_rpc.bulk_pull.return_value = lip_assocs + self.local_ip_ext.initialize(mock.Mock(), 'ovs') + + res_rpc.bulk_pull.assert_called_once_with( + mock.ANY, resources.LOCAL_IP_ASSOCIATION) + + for assoc in lip_assocs: + self.assertEqual( + assoc, self.local_ip_ext.local_ip_updates[ + 'added'][assoc.fixed_port_id][assoc.local_ip_id]) + + def test_notify_port_updated(self): + with mock.patch.object(lib_registry, "publish") as publish_mock: + port_id = 'test' + self.local_ip_ext._notify_port_updated( + self.context, port_id=port_id) + publish_mock.assert_called_once_with( + resources.PORT, lib_events.AFTER_UPDATE, + self.local_ip_ext, payload=mock.ANY) + actual_payload = publish_mock.call_args[1]['payload'] + self.assertEqual(port_id, actual_payload.resource_id) + self.assertEqual({'changed_fields': {'local_ip'}}, + actual_payload.metadata) + + def test_handle_updated_notification(self): + lip_assocs = self._generate_test_lip_associations() + with mock.patch.object( + self.local_ip_ext, + "_notify_port_updated") as port_update_notify: + self.local_ip_ext._handle_notification( + self.context, resources.LOCAL_IP_ASSOCIATION, + lip_assocs, events.UPDATED) + + for assoc in lip_assocs: + self.assertEqual( + assoc, self.local_ip_ext.local_ip_updates[ + 'added'][assoc.fixed_port_id][assoc.local_ip_id]) + port_update_notify.assert_any_call( + self.context, assoc.fixed_port_id) + + return lip_assocs + + def test_handle_deleted_notification(self, lip_assocs=None): + lip_assocs = lip_assocs or self.test_handle_updated_notification() + with mock.patch.object( + self.local_ip_ext, + "_notify_port_updated") as port_update_notify: + self.local_ip_ext._handle_notification( + self.context, resources.LOCAL_IP_ASSOCIATION, + lip_assocs, events.DELETED) + for assoc in lip_assocs: + self.assertEqual({}, self.local_ip_ext.local_ip_updates[ + 'added'][assoc.fixed_port_id]) + self.assertEqual( + assoc, self.local_ip_ext.local_ip_updates[ + 'deleted'][assoc.fixed_port_id][assoc.local_ip_id]) + port_update_notify.assert_any_call( + self.context, assoc.fixed_port_id) + + def test_handle_port(self): + lip_assocs = self.test_handle_updated_notification() + for assoc in lip_assocs: + port = {'port_id': assoc.fixed_port_id} + self.local_ip_ext.handle_port(self.context, port) + self.assertEqual({}, self.local_ip_ext.local_ip_updates[ + 'added'][assoc.fixed_port_id]) + self.test_handle_deleted_notification(lip_assocs) + for assoc in lip_assocs: + port = {'port_id': assoc.fixed_port_id} + self.local_ip_ext.handle_port(self.context, port) + self.assertEqual({}, self.local_ip_ext.local_ip_updates[ + 'deleted'][assoc.fixed_port_id]) + + def test_delete_port(self): + lip_assocs = self.test_handle_updated_notification() + for assoc in lip_assocs: + port = {'port_id': assoc.fixed_port_id} + self.local_ip_ext.delete_port(self.context, port) + + self.assertEqual({}, self.local_ip_ext.local_ip_updates['added']) + self.assertEqual({}, self.local_ip_ext.local_ip_updates['added']) diff --git a/neutron/tests/unit/extensions/test_local_ip.py b/neutron/tests/unit/extensions/test_local_ip.py index 6b42d26acfa..4fd237dbf06 100644 --- a/neutron/tests/unit/extensions/test_local_ip.py +++ b/neutron/tests/unit/extensions/test_local_ip.py @@ -14,6 +14,7 @@ # under the License. import contextlib +from unittest import mock import netaddr from neutron_lib.api.definitions import local_ip as apidef @@ -97,6 +98,8 @@ class TestLocalIP(LocalIPTestBase): ext_mgr = LocalIPTestExtensionManager() svc_plugins = ( 'neutron.services.local_ip.local_ip_plugin.LocalIPPlugin',) + mock.patch("neutron.api.rpc.handlers.resources_rpc." + "ResourcesPushRpcApi.push").start() super(TestLocalIP, self).setUp(ext_mgr=ext_mgr, service_plugins=svc_plugins) diff --git a/setup.cfg b/setup.cfg index dc99f9a417e..865ad465379 100644 --- a/setup.cfg +++ b/setup.cfg @@ -130,6 +130,7 @@ neutron.agent.l2.extensions = fdb = neutron.agent.l2.extensions.fdb_population:FdbPopulationAgentExtension log = neutron.services.logapi.agent.log_extension:LoggingExtension dhcp = neutron.agent.l2.extensions.dhcp.extension:DHCPAgentExtension + local_ip = neutron.agent.l2.extensions.local_ip:LocalIPAgentExtension neutron.agent.l3.extensions = fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension