Merge "Prevent some L3 ports change IP address"

This commit is contained in:
Zuul 2018-12-14 12:11:00 +00:00 committed by Gerrit Code Review
commit efab816b40
9 changed files with 224 additions and 43 deletions

View File

@ -1439,6 +1439,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# conflict, bubble up a retry instead that should bring things
# back to sanity.
raise os_db_exc.RetryRequest(e)
except ipam_exc.IPAddressChangeNotAllowed as e:
raise exc.BadRequest(resource='ports', msg=e)
return self._make_port_dict(db_port)
@db_api.retry_if_session_inactive()

View File

@ -20,6 +20,9 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib import exceptions as n_exc
from neutron_lib.plugins import constants as plugin_consts
from neutron_lib.plugins import directory
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import excutils
@ -37,6 +40,20 @@ from neutron.objects import subnet as obj_subnet
LOG = logging.getLogger(__name__)
def get_ip_update_not_allowed_device_owner_list():
l3plugin = directory.get_plugin(plugin_consts.L3)
# The following list is for IPAM to prevent direct update of port
# IP address. Currently it only has some L3 related types.
# L2 plugin can add the same list here, but for now it is not required.
return getattr(l3plugin, 'IP_UPDATE_NOT_ALLOWED_LIST', [])
def is_neutron_built_in_router(context, router_id):
l3plugin = directory.get_plugin(plugin_consts.L3)
return bool(l3plugin and
l3plugin.router_supports_scheduling(context, router_id))
class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
def _get_failed_ips(self, all_ips, success_ips):
@ -287,6 +304,20 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
return fixed_ip_list
def _check_ip_changed_by_version(self, context, ip_list, version):
for ip in ip_list:
ip_address = ip.get('ip_address')
subnet_id = ip.get('subnet_id')
if ip_address:
ip_addr = netaddr.IPAddress(ip_address)
if ip_addr.version == version:
return True
elif subnet_id:
subnet = obj_subnet.Subnet.get_object(context, id=subnet_id)
if subnet and subnet.ip_version == version:
return True
return False
def _update_ips_for_port(self, context, port, host,
original_ips, new_ips, mac):
"""Add or remove IPs from the port. IPAM version"""
@ -294,6 +325,16 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
removed = []
changes = self._get_changed_ips_for_port(
context, original_ips, new_ips, port['device_owner'])
not_allowed_list = get_ip_update_not_allowed_device_owner_list()
if (port['device_owner'] in not_allowed_list and
is_neutron_built_in_router(context, port['device_id'])):
ip_v4_changed = self._check_ip_changed_by_version(
context, changes.remove + changes.add,
constants.IP_VERSION_4)
if ip_v4_changed:
raise ipam_exc.IPAddressChangeNotAllowed(port_id=port['id'])
try:
subnets = self._ipam_get_subnets(
context, network_id=port['network_id'], host=host,

View File

@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import l3_port_ip_change_not_allowed as apidef
from neutron_lib.api import extensions
class L3_port_ip_change_not_allowed(extensions.APIExtensionDescriptor):
api_definition = apidef

View File

@ -54,6 +54,10 @@ class InvalidSubnetRequest(exceptions.BadRequest):
"%(reason)s")
class IPAddressChangeNotAllowed(exceptions.BadRequest):
message = _("IP updates for port %(port_id)s are not allowed")
class AllocationOnAutoAddressSubnet(exceptions.InvalidInput):
message = _("IPv6 address %(ip)s cannot be directly "
"assigned to a port on subnet %(subnet_id)s as the "

View File

@ -87,12 +87,20 @@ class L3RouterPlugin(service_base.ServicePluginBase,
"l3-ha", "router_availability_zone",
"l3-flavors", "qos-fip",
"fip-port-details", "floatingip-pools",
"qos-gateway-ip"]
"qos-gateway-ip",
"l3-port-ip-change-not-allowed"]
__native_pagination_support = True
__native_sorting_support = True
__filter_validation_support = True
IP_UPDATE_NOT_ALLOWED_LIST = [
n_const.DEVICE_OWNER_ROUTER_INTF,
n_const.DEVICE_OWNER_ROUTER_HA_INTF,
n_const.DEVICE_OWNER_HA_REPLICATED_INT,
n_const.DEVICE_OWNER_ROUTER_SNAT,
n_const.DEVICE_OWNER_DVR_INTERFACE]
@resource_registry.tracked_resources(router=l3_models.Router,
floatingip=l3_models.FloatingIP)
def __init__(self):

View File

@ -25,6 +25,7 @@ NETWORK_API_EXTENSIONS+=",ip-substring-filtering"
NETWORK_API_EXTENSIONS+=",l3-flavors"
NETWORK_API_EXTENSIONS+=",l3-ha"
NETWORK_API_EXTENSIONS+=",l3_agent_scheduler"
NETWORK_API_EXTENSIONS+=",l3-port-ip-change-not-allowed"
NETWORK_API_EXTENSIONS+=",logging"
NETWORK_API_EXTENSIONS+=",metering"
NETWORK_API_EXTENSIONS+=",multi-provider"

View File

@ -36,11 +36,13 @@ from neutron.objects import agent as agent_obj
from neutron.objects import l3agent as rb_obj
from neutron.objects import router as router_obj
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_l3
_uuid = uuidutils.generate_uuid
class FakeL3Plugin(common_db_mixin.CommonDbMixin,
class FakeL3Plugin(test_l3.TestL3PluginBaseAttributes,
common_db_mixin.CommonDbMixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_dvrscheduler_db.L3_DVRsch_db_mixin,
agents_db.AgentDbMixin):
@ -663,6 +665,28 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.assertEqual(1, len(csnat_ports))
self.assertEqual(2, len(csnat_ports[0]['fixed_ips']))
def _test_update_router_interface_port_ip_not_allowed(self, device_owner):
router, subnet_v4, subnet_v6 = self._setup_router_with_v4_and_v6()
device_filter = {'device_owner': [device_owner]}
ports = self.core_plugin.get_ports(self.ctx, filters=device_filter)
self.assertRaises(
exceptions.BadRequest,
self.core_plugin.update_port,
self.ctx, ports[0]['id'],
{'port': {'fixed_ips': [
{'ip_address': "20.0.0.100",
'subnet_id': subnet_v4['subnet']['id']},
{'ip_address': "20.0.0.101",
'subnet_id': subnet_v4['subnet']['id']}]}})
def test_update_router_centralized_snat_port_ip_not_allowed(self):
self._test_update_router_interface_port_ip_not_allowed(
const.DEVICE_OWNER_ROUTER_SNAT)
def test_update_router_interface_distributed_port_ip_not_allowed(self):
self._test_update_router_interface_port_ip_not_allowed(
const.DEVICE_OWNER_DVR_INTERFACE)
def test_remove_router_interface_csnat_ports_removal(self):
router_dict = {'name': 'test_router', 'admin_state_up': True,
'distributed': True}
@ -797,24 +821,6 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.ctx, filters=dvr_filters)
self.assertEqual(1, len(dvr_ports))
def test_remove_router_interface_csnat_port_missing_ip(self):
# NOTE(kevinbenton): this is a contrived scenario to reproduce
# a condition observed in bug/1609540. Once we figure out why
# these ports lose their IP we can remove this test.
router, subnet_v4, subnet_v6 = self._setup_router_with_v4_and_v6()
self.mixin.remove_router_interface(
self.ctx, router['id'],
{'subnet_id': subnet_v4['subnet']['id']})
csnat_filters = {'device_owner':
[const.DEVICE_OWNER_ROUTER_SNAT]}
csnat_ports = self.core_plugin.get_ports(
self.ctx, filters=csnat_filters)
self.core_plugin.update_port(self.ctx, csnat_ports[0]['id'],
{'port': {'fixed_ips': []}})
self.mixin.remove_router_interface(
self.ctx, router['id'],
{'subnet_id': subnet_v6['subnet']['id']})
def test__validate_router_migration_notify_advanced_services(self):
router = {'name': 'foo_router', 'admin_state_up': False}
router_db = self._create_router(router)

View File

@ -48,12 +48,14 @@ from neutron.objects import l3_hamode
from neutron.scheduler import l3_agent_scheduler
from neutron.services.revisions import revision_plugin
from neutron.tests.common import helpers
from neutron.tests.unit.extensions import test_l3
from neutron.tests.unit import testlib_api
_uuid = uuidutils.generate_uuid
class FakeL3PluginWithAgents(common_db_mixin.CommonDbMixin,
class FakeL3PluginWithAgents(test_l3.TestL3PluginBaseAttributes,
common_db_mixin.CommonDbMixin,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agents_db.AgentDbMixin):
@ -637,6 +639,28 @@ class L3HATestCase(L3HATestFramework):
networks_after = self.core_plugin.get_networks(self.admin_ctx)
self.assertEqual(networks_before, networks_after)
def test_update_router_ha_interface_port_ip_not_allow(self):
router = self._create_router()
network = self.plugin.get_ha_network(self.admin_ctx,
router['tenant_id'])
self.plugin.add_ha_port(
self.admin_ctx, router['id'], network.network_id,
router['tenant_id'])
device_filter = {
'device_id': [router['id']],
'device_owner': [constants.DEVICE_OWNER_ROUTER_HA_INTF]}
ports = self.core_plugin.get_ports(
self.admin_ctx, filters=device_filter)
port = {"port": {"fixed_ips": [
{"ip_address": "169.254.192.100"},
{"ip_address": "169.254.192.200"}]}}
self.assertRaises(n_exc.BadRequest,
self.core_plugin.update_port,
self.admin_ctx, ports[0]['id'],
port)
def test_ensure_vr_id_and_network_net_exists(self):
router = self._create_router()
router_db = self.plugin._get_router(self.admin_ctx, router['id'])
@ -1086,6 +1110,26 @@ class L3HAModeDbTestCase(L3HATestFramework):
self.admin_ctx, [router['id']])
self.assertEqual(2, len(bindings))
def test_update_ha_router_replicated_interface_port_ip_not_allowed(self):
router = self._create_router()
network_id = self._create_network(self.core_plugin, self.admin_ctx)
subnet = self._create_subnet(self.core_plugin, self.admin_ctx,
network_id)
interface_info = {'subnet_id': subnet['id']}
self.plugin.add_router_interface(self.admin_ctx,
router['id'],
interface_info)
filters = {'device_id': [router['id']],
'device_owner': [constants.DEVICE_OWNER_HA_REPLICATED_INT]}
ports = self.core_plugin.get_ports(self.admin_ctx, filters=filters)
port = {'port': {'fixed_ips': [
{'ip_address': '10.0.0.100'},
{'ip_address': '10.0.0.101'}]}}
self.assertRaises(n_exc.BadRequest,
self.core_plugin.update_port,
self.admin_ctx, ports[0]['id'],
port)
def test_update_router_port_bindings_no_ports(self):
self.plugin._update_router_port_bindings(
self.admin_ctx, {}, self.agent1['host'])

View File

@ -237,8 +237,22 @@ class L3NatExtensionTestCase(test_extensions_base.ExtensionTestCase):
instance.add_router_interface.assert_called_with(mock.ANY, router_id)
class TestL3PluginBaseAttributes(object):
IP_UPDATE_NOT_ALLOWED_LIST = [
lib_constants.DEVICE_OWNER_ROUTER_INTF,
lib_constants.DEVICE_OWNER_ROUTER_HA_INTF,
lib_constants.DEVICE_OWNER_HA_REPLICATED_INT,
lib_constants.DEVICE_OWNER_ROUTER_SNAT,
lib_constants.DEVICE_OWNER_DVR_INTERFACE]
def router_supports_scheduling(self, context, router_id):
return True
# This base plugin class is for tests.
class TestL3NatBasePlugin(db_base_plugin_v2.NeutronDbPluginV2,
class TestL3NatBasePlugin(TestL3PluginBaseAttributes,
db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin):
__native_pagination_support = True
@ -319,7 +333,8 @@ class TestNoL3NatPlugin(TestL3NatBasePlugin):
# A L3 routing service plugin class for tests with plugins that
# delegate away L3 routing functionality
class TestL3NatServicePlugin(common_db_mixin.CommonDbMixin,
class TestL3NatServicePlugin(TestL3PluginBaseAttributes,
common_db_mixin.CommonDbMixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_db.L3_NAT_db_mixin, dns_db.DNSDbMixin):
@ -1402,6 +1417,30 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
None,
p['port']['id'])
def test_update_router_interface_port_ip_not_allowed(self):
with self.router() as r, (
self.port()) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
self.assertIn('port_id', body)
self.assertEqual(p['port']['id'], body['port_id'])
body = self._show('ports', p['port']['id'])
self.assertEqual(r['router']['id'], body['port']['device_id'])
data = {'port': {'fixed_ips': [
{'ip_address': '1.1.1.1'},
{'ip_address': '2.2.2.2'}]}}
self._update('ports', p['port']['id'], data,
neutron_context=context.get_admin_context(),
expected_code=exc.HTTPBadRequest.code)
self._router_interface_action('remove',
r['router']['id'],
None,
p['port']['id'])
def test_router_add_interface_delete_port_after_failure(self):
with self.router() as r, self.subnet(enable_dhcp=False) as s:
plugin = directory.get_plugin()
@ -3739,26 +3778,44 @@ class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
self.assertEqual(0, len(ifaces))
def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
with self.router() as r:
with self.subnet(cidr='9.0.1.0/24') as subnet:
with self.port(subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
self._router_interface_action('add',
r['router']['id'],
None,
p['port']['id'])
port = {'port': {'fixed_ips':
[{'ip_address': '9.0.1.4',
'subnet_id': subnet['subnet']['id']},
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
ctx = context.get_admin_context()
self.core_plugin.update_port(ctx, p['port']['id'], port)
routers = self.plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(lib_constants.INTERFACE_KEY,
[])
self.assertEqual(1, len(interfaces))
with self.router() as r, self.subnet(
cidr='9.0.1.0/24') as subnet, self.port(
subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p1, self.port(
subnet=subnet,
fixed_ips=[{'ip_address': '9.0.1.100'},
{'ip_address': '9.0.1.101'}]) as p2:
# Cannot have multiple IPv4 subnets on router port,
# see neutron.db.l3_db line L752-L754.
self._router_interface_action(
'add', r['router']['id'],
None, p2['port']['id'],
expected_code=exc.HTTPBadRequest.code)
self._router_interface_action('add',
r['router']['id'],
None,
p1['port']['id'])
port = {'port': {'fixed_ips':
[{'ip_address': '9.0.1.4',
'subnet_id': subnet['subnet']['id']},
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
ctx = context.get_admin_context()
self.assertRaises(
n_exc.BadRequest,
self.core_plugin.update_port,
ctx, p1['port']['id'], port)
routers = self.plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(lib_constants.INTERFACE_KEY,
[])
self.assertEqual(1, len(interfaces))
self._router_interface_action('remove',
r['router']['id'],
None,
p1['port']['id'])
def test_l3_agent_routers_query_gateway(self):
with self.router() as r: