diff --git a/neutron/tests/unit/agent/test_securitygroups_rpc.py b/neutron/tests/unit/agent/test_securitygroups_rpc.py index 00b492f1792..06cdb729b5b 100644 --- a/neutron/tests/unit/agent/test_securitygroups_rpc.py +++ b/neutron/tests/unit/agent/test_securitygroups_rpc.py @@ -463,6 +463,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): security_groups=[sg2_id]) ports_rest2 = self.deserialize(self.fmt, res2) port_id2 = ports_rest2['port']['id'] + port_fixed_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address'] ctx = context.get_admin_context() ports_rpc = self.rpc.security_group_rules_for_devices( ctx, devices=devices) @@ -476,7 +477,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg2_id}, {'direction': u'ingress', - 'source_ip_prefix': u'10.0.0.3/32', + 'source_ip_prefix': port_fixed_ip2 + '/32', 'protocol': const.PROTO_NAME_TCP, 'ethertype': const.IPv4, 'port_range_max': 25, 'port_range_min': 24, @@ -519,6 +520,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): security_groups=[sg2_id]) ports_rest2 = self.deserialize(self.fmt, res2) port_id2 = ports_rest2['port']['id'] + port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address'] ctx = context.get_admin_context() ports_rpc = self.rpc.security_group_info_for_devices( ctx, devices=devices) @@ -533,7 +535,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): 'remote_group_id': sg2_id} ]}, 'sg_member_ips': {sg2_id: { - 'IPv4': set([u'10.0.0.3']), + 'IPv4': set([port_ip2]), 'IPv6': set(), }} } @@ -1077,6 +1079,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}], security_groups=[sg2_id]) port_id2 = ports_rest2['port']['id'] + port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address'] ctx = context.get_admin_context() ports_rpc = self.rpc.security_group_rules_for_devices( @@ -1091,7 +1094,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase): {'direction': 'egress', 'ethertype': const.IPv6, 'security_group_id': sg2_id}, {'direction': 'ingress', - 'source_ip_prefix': '2001:db8::2/128', + 'source_ip_prefix': port_ip2 + '/128', 'protocol': const.PROTO_NAME_TCP, 'ethertype': const.IPv6, 'port_range_max': 25, 'port_range_min': 24, diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py index fb99b2d8750..89622966c39 100644 --- a/neutron/tests/unit/db/test_db_base_plugin_v2.py +++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py @@ -891,14 +891,18 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase): class TestPortsV2(NeutronDbPluginV2TestCase): def test_create_port_json(self): keys = [('admin_state_up', True), ('status', self.port_create_status)] - with self.port(name='myname') as port: - for k, v in keys: - self.assertEqual(port['port'][k], v) - self.assertIn('mac_address', port['port']) - ips = port['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) - self.assertEqual('myname', port['port']['name']) + with self.network(shared=True) as network: + with self.subnet(network=network) as subnet: + with self.port(name='myname') as port: + for k, v in keys: + self.assertEqual(port['port'][k], v) + self.assertIn('mac_address', port['port']) + ips = port['port']['fixed_ips'] + subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr']) + self.assertEqual(1, len(ips)) + self.assertIn(netaddr.IPAddress(ips[0]['ip_address']), + subnet_ip_net) + self.assertEqual('myname', port['port']['name']) def test_create_port_as_admin(self): with self.network() as network: @@ -938,11 +942,10 @@ class TestPortsV2(NeutronDbPluginV2TestCase): def test_create_port_public_network_with_ip(self): with self.network(shared=True) as network: - with self.subnet(network=network, cidr='10.0.0.0/24') as subnet: + ip_net = netaddr.IPNetwork('10.0.0.0/24') + with self.subnet(network=network, cidr=str(ip_net)): keys = [('admin_state_up', True), - ('status', self.port_create_status), - ('fixed_ips', [{'subnet_id': subnet['subnet']['id'], - 'ip_address': '10.0.0.2'}])] + ('status', self.port_create_status)] port_res = self._create_port(self.fmt, network['network']['id'], webob.exc.HTTPCreated.code, @@ -951,6 +954,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase): port = self.deserialize(self.fmt, port_res) for k, v in keys: self.assertEqual(port['port'][k], v) + port_ip = port['port']['fixed_ips'][0]['ip_address'] + self.assertIn(port_ip, ip_net) self.assertIn('mac_address', port['port']) self._delete('ports', port['port']['id']) @@ -1532,7 +1537,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10. """ with self.subnet() as subnet: - with self.port(subnet=subnet) as port: + fixed_ip_data = [{'ip_address': '10.0.0.2', + 'subnet_id': subnet['subnet']['id']}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) self.assertEqual('10.0.0.2', ips[0]['ip_address']) @@ -1550,21 +1557,24 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_update_port_update_ip_address_only(self): with self.subnet() as subnet: - with self.port(subnet=subnet) as port: + ip_address = '10.0.0.2' + fixed_ip_data = [{'ip_address': ip_address, + 'subnet_id': subnet['subnet']['id']}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) + self.assertEqual(ip_address, ips[0]['ip_address']) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) data = {'port': {'fixed_ips': [{'subnet_id': subnet['subnet']['id'], 'ip_address': "10.0.0.10"}, - {'ip_address': "10.0.0.2"}]}} + {'ip_address': ip_address}]}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize(self.fmt, req.get_response(self.api)) ips = res['port']['fixed_ips'] self.assertEqual(2, len(ips)) - self.assertIn({'ip_address': '10.0.0.2', + self.assertIn({'ip_address': ip_address, 'subnet_id': subnet['subnet']['id']}, ips) self.assertIn({'ip_address': '10.0.0.10', 'subnet_id': subnet['subnet']['id']}, ips) @@ -1607,10 +1617,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res['port']['admin_state_up']) ips = res['port']['fixed_ips'] self.assertEqual(2, len(ips)) - self.assertIn({'ip_address': '10.0.0.3', - 'subnet_id': subnet['subnet']['id']}, ips) - self.assertIn({'ip_address': '10.0.0.4', - 'subnet_id': subnet['subnet']['id']}, ips) + self.assertNotEqual(ips[0]['ip_address'], + ips[1]['ip_address']) + network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr']) + self.assertIn(ips[0]['ip_address'], network_ip_net) + self.assertIn(ips[1]['ip_address'], network_ip_net) def test_update_port_invalid_fixed_ip_address_v6_slaac(self): with self.subnet( @@ -1692,10 +1703,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_requested_duplicate_ip(self): with self.subnet() as subnet: + subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr']) with self.port(subnet=subnet) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) + self.assertIn(ips[0]['ip_address'], subnet_ip_net) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) # Check configuring of duplicate IP kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], @@ -1706,10 +1718,12 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_requested_subnet_id(self): with self.subnet() as subnet: + subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr']) with self.port(subnet=subnet) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) + self.assertIn(netaddr.IPAddress(ips[0]['ip_address']), + netaddr.IPSet(subnet_ip_net)) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) # Request a IP from specific subnet kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]} @@ -1718,7 +1732,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s port2 = self.deserialize(self.fmt, res) ips = port2['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.3', ips[0]['ip_address']) + self.assertIn(ips[0]['ip_address'], subnet_ip_net) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) self._delete('ports', port2['port']['id']) @@ -1771,23 +1785,43 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res = self._create_port(self.fmt, net_id=net_id, **kwargs) port3 = self.deserialize(self.fmt, res) ips = port3['port']['fixed_ips'] + cidr_v4 = subnet['subnet']['cidr'] + cidr_v6 = subnet2['subnet']['cidr'] self.assertEqual(2, len(ips)) - self.assertIn({'ip_address': '10.0.0.2', - 'subnet_id': subnet['subnet']['id']}, ips) - self.assertIn({'ip_address': '2607:f0d0:1002:51::2', - 'subnet_id': subnet2['subnet']['id']}, ips) + self._test_requested_port_subnet_ids(ips, + [subnet['subnet']['id'], + subnet2['subnet']['id']]) + self._test_dual_stack_port_ip_addresses_in_subnets(ips, + cidr_v4, + cidr_v6) + res = self._create_port(self.fmt, net_id=net_id) port4 = self.deserialize(self.fmt, res) # Check that a v4 and a v6 address are allocated ips = port4['port']['fixed_ips'] self.assertEqual(2, len(ips)) - self.assertIn({'ip_address': '10.0.0.3', - 'subnet_id': subnet['subnet']['id']}, ips) - self.assertIn({'ip_address': '2607:f0d0:1002:51::3', - 'subnet_id': subnet2['subnet']['id']}, ips) + self._test_requested_port_subnet_ids(ips, + [subnet['subnet']['id'], + subnet2['subnet']['id']]) + self._test_dual_stack_port_ip_addresses_in_subnets(ips, + cidr_v4, + cidr_v6) self._delete('ports', port3['port']['id']) self._delete('ports', port4['port']['id']) + def _test_requested_port_subnet_ids(self, ips, expected_subnet_ids): + self.assertEqual(set(x['subnet_id'] for x in ips), + set(expected_subnet_ids)) + + def _test_dual_stack_port_ip_addresses_in_subnets(self, ips, cidr_v4, + cidr_v6): + ip_net_v4 = netaddr.IPNetwork(cidr_v4) + ip_net_v6 = netaddr.IPNetwork(cidr_v6) + for address in ips: + ip_addr = netaddr.IPAddress(address['ip_address']) + expected_ip_net = ip_net_v4 if ip_addr.version == 4 else ip_net_v6 + self.assertIn(ip_addr, expected_ip_net) + def test_create_port_invalid_fixed_ip_address_v6_pd_slaac(self): with self.network(name='net') as network: subnet = self._make_v6_subnet( @@ -1923,25 +1957,30 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s {'subnet_id': subnet2['subnet']['id']}] ) as port: ips = port['port']['fixed_ips'] + subnet1_net = netaddr.IPNetwork(subnet['subnet']['cidr']) + subnet2_net = netaddr.IPNetwork(subnet2['subnet']['cidr']) + network_ip_set = netaddr.IPSet(subnet1_net) + network_ip_set.add(subnet2_net) self.assertEqual(2, len(ips)) - self.assertIn({'ip_address': '10.0.0.2', - 'subnet_id': subnet['subnet']['id']}, ips) port_mac = port['port']['mac_address'] subnet_cidr = subnet2['subnet']['cidr'] eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64( subnet_cidr, port_mac)) + self.assertIn(ips[0]['ip_address'], network_ip_set) + self.assertIn(ips[1]['ip_address'], network_ip_set) self.assertIn({'ip_address': eui_addr, 'subnet_id': subnet2['subnet']['id']}, ips) def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self): with self.network() as network: # Create an IPv4 and an IPv6 SLAAC subnet on the network - with self.subnet(network),\ + with self.subnet(network) as subnet_v4,\ self.subnet(network, cidr='2607:f0d0:1002:51::/64', ip_version=6, gateway_ip='fe80::1', ipv6_address_mode=n_const.IPV6_SLAAC): + subnet_ip_net = netaddr.IPNetwork(subnet_v4['subnet']['cidr']) # Create a router port without specifying fixed_ips port = self._make_port( self.fmt, network['network']['id'], @@ -1949,7 +1988,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s # Router port should only have an IPv4 address fixed_ips = port['port']['fixed_ips'] self.assertEqual(1, len(fixed_ips)) - self.assertEqual('10.0.0.2', fixed_ips[0]['ip_address']) + self.assertIn(fixed_ips[0]['ip_address'], subnet_ip_net) @staticmethod def _calc_ipv6_addr_by_EUI64(port, subnet): @@ -1974,15 +2013,16 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s subnet = self._make_v6_subnet(network, addr_mode, ipv6_pd) subnet_id = subnet['subnet']['id'] fixed_ips = [{'subnet_id': subnet_id}] + with self.port(subnet=subnet, fixed_ips=fixed_ips) as port: - if addr_mode == n_const.IPV6_SLAAC: - exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet) - else: - exp_ip_addr = 'fe80::2' port_fixed_ips = port['port']['fixed_ips'] self.assertEqual(1, len(port_fixed_ips)) - self.assertEqual(exp_ip_addr, - port_fixed_ips[0]['ip_address']) + if addr_mode == n_const.IPV6_SLAAC: + exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet) + self.assertEqual(exp_ip_addr, + port_fixed_ips[0]['ip_address']) + self.assertIn(port_fixed_ips[0]['ip_address'], + netaddr.IPNetwork(subnet['subnet']['cidr'])) def test_create_port_with_ipv6_slaac_subnet_in_fixed_ips(self): self._test_create_port_with_ipv6_subnet_in_fixed_ips( @@ -2182,10 +2222,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_requested_invalid_fixed_ips(self): with self.subnet() as subnet: + subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr']) with self.port(subnet=subnet) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) + self.assertIn(ips[0]['ip_address'], subnet_ip_net) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) # Test invalid subnet_id kwargs = {"fixed_ips": @@ -2240,41 +2281,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s res = self._create_port(self.fmt, net_id=net_id, **kwargs) self.assertEqual(webob.exc.HTTPClientError.code, res.status_int) - def test_requested_split(self): - with self.subnet() as subnet: - with self.port(subnet=subnet) as port: - ports_to_delete = [] - ips = port['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) - self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) - # Allocate specific IP - kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'], - 'ip_address': '10.0.0.5'}]} - net_id = port['port']['network_id'] - res = self._create_port(self.fmt, net_id=net_id, **kwargs) - port2 = self.deserialize(self.fmt, res) - ports_to_delete.append(port2) - ips = port2['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.5', ips[0]['ip_address']) - self.assertEqual(subnet['subnet']['id'], ips[0]['subnet_id']) - # Allocate specific IP's - allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6'] - - for a in allocated: - res = self._create_port(self.fmt, net_id=net_id) - port2 = self.deserialize(self.fmt, res) - ports_to_delete.append(port2) - ips = port2['port']['fixed_ips'] - self.assertEqual(1, len(ips)) - self.assertEqual(a, ips[0]['ip_address']) - self.assertEqual(subnet['subnet']['id'], - ips[0]['subnet_id']) - - for p in ports_to_delete: - self._delete('ports', p['port']['id']) - def test_duplicate_ips(self): with self.subnet() as subnet: # Allocate specific IP @@ -2306,7 +2312,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s def test_requested_ips_only(self): with self.subnet() as subnet: - with self.port(subnet=subnet) as port: + fixed_ip_data = [{'ip_address': '10.0.0.2', + 'subnet_id': subnet['subnet']['id']}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) self.assertEqual('10.0.0.2', ips[0]['ip_address']) diff --git a/neutron/tests/unit/db/test_ipam_pluggable_backend.py b/neutron/tests/unit/db/test_ipam_pluggable_backend.py index 0fcb2a98301..08e2bb83810 100644 --- a/neutron/tests/unit/db/test_ipam_pluggable_backend.py +++ b/neutron/tests/unit/db/test_ipam_pluggable_backend.py @@ -559,9 +559,28 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase): mocks['subnet'].deallocate.assert_called_once_with(auto_ip) def test_recreate_port_ipam(self): - ip = '10.0.0.2' with self.subnet() as subnet: + subnet_cidr = subnet['subnet']['cidr'] with self.port(subnet=subnet) as port: + ips = port['port']['fixed_ips'] + self.assertEqual(1, len(ips)) + orig_ip = ips[0]['ip_address'] + self.assertIn(netaddr.IPAddress(ips[0]['ip_address']), + netaddr.IPSet(netaddr.IPNetwork(subnet_cidr))) + req = self.new_delete_request('ports', port['port']['id']) + res = req.get_response(self.api) + self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int) + with self.port(subnet=subnet, fixed_ips=ips) as port: + ips = port['port']['fixed_ips'] + self.assertEqual(1, len(ips)) + self.assertEqual(orig_ip, ips[0]['ip_address']) + + def test_recreate_port_ipam_specific_ip(self): + with self.subnet() as subnet: + ip = '10.0.0.2' + fixed_ip_data = [{'subnet_id': subnet['subnet']['id'], + 'ip_address': ip}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) self.assertEqual(ip, ips[0]['ip_address']) diff --git a/neutron/tests/unit/extensions/test_dns.py b/neutron/tests/unit/extensions/test_dns.py index c3ee54e1b5a..fca68c52146 100644 --- a/neutron/tests/unit/extensions/test_dns.py +++ b/neutron/tests/unit/extensions/test_dns.py @@ -23,6 +23,7 @@ from neutron.common import utils from neutron import context from neutron.db import db_base_plugin_v2 from neutron.extensions import dns +from neutron import manager from neutron.tests.unit.db import test_db_base_plugin_v2 @@ -121,10 +122,13 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2): self.assertIn('mac_address', port['port']) ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) + subnet_db = manager.NeutronManager.get_plugin().get_subnet( + context.get_admin_context(), ips[0]['subnet_id']) + self.assertIn(netaddr.IPAddress(ips[0]['ip_address']), + netaddr.IPSet(netaddr.IPNetwork(subnet_db['cidr']))) self.assertEqual('myname', port['port']['name']) self._verify_dns_assigment(port['port'], - ips_list=['10.0.0.2']) + ips_list=[ips[0]['ip_address']]) def test_list_ports(self): # for this test we need to enable overlapping ips @@ -147,6 +151,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2): def test_update_port_non_default_dns_domain_with_dns_name(self): with self.port() as port: + port_ip = port['port']['fixed_ips'][0]['ip_address'] cfg.CONF.set_override('dns_domain', 'example.com') data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}} req = self.new_update_request('ports', data, port['port']['id']) @@ -154,18 +159,19 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2): self.assertEqual(data['port']['admin_state_up'], res['port']['admin_state_up']) self._verify_dns_assigment(res['port'], - ips_list=['10.0.0.2'], + ips_list=[port_ip], dns_name='vm1') def test_update_port_default_dns_domain_with_dns_name(self): with self.port() as port: + port_ip = port['port']['fixed_ips'][0]['ip_address'] data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}} req = self.new_update_request('ports', data, port['port']['id']) res = self.deserialize(self.fmt, req.get_response(self.api)) self.assertEqual(data['port']['admin_state_up'], res['port']['admin_state_up']) self._verify_dns_assigment(res['port'], - ips_list=['10.0.0.2']) + ips_list=[port_ip]) def _verify_dns_assigment(self, port, ips_list=None, exp_ips_ipv4=0, exp_ips_ipv6=0, ipv4_cidrs=None, ipv6_cidrs=None, @@ -254,10 +260,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2): Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10. """ with self.subnet() as subnet: - with self.port(subnet=subnet) as port: + fixed_ip_data = [{'ip_address': '10.0.0.2'}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) data = {'port': {'fixed_ips': [{'subnet_id': subnet['subnet']['id'], @@ -273,10 +279,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2): def test_update_port_update_ip_address_only(self): with self.subnet() as subnet: - with self.port(subnet=subnet) as port: + fixed_ip_data = [{'ip_address': '10.0.0.2'}] + with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port: ips = port['port']['fixed_ips'] self.assertEqual(1, len(ips)) - self.assertEqual('10.0.0.2', ips[0]['ip_address']) self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id']) data = {'port': {'fixed_ips': [{'subnet_id': subnet['subnet']['id'], diff --git a/neutron/tests/unit/extensions/test_extraroute.py b/neutron/tests/unit/extensions/test_extraroute.py index 82c1971339d..c505dd302a2 100644 --- a/neutron/tests/unit/extensions/test_extraroute.py +++ b/neutron/tests/unit/extensions/test_extraroute.py @@ -78,7 +78,8 @@ class ExtraRouteDBTestCaseBase(object): routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: - with self.port(subnet=s) as p: + fixed_ip_data = [{'ip_address': '10.0.1.2'}] + with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: body = self._routes_update_prepare(r['router']['id'], None, p['port']['id'], routes) @@ -88,8 +89,11 @@ class ExtraRouteDBTestCaseBase(object): def test_route_update_with_external_route(self): my_tenant = 'tenant1' - routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}] - with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet: + with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet,\ + self.port(subnet=ext_subnet) as nexthop_port: + nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address'] + routes = [{'destination': '135.207.0.0/16', + 'nexthop': nexthop_ip}] self._set_net_external(ext_subnet['subnet']['network_id']) ext_info = {'network_id': ext_subnet['subnet']['network_id']} with self.router( @@ -101,8 +105,11 @@ class ExtraRouteDBTestCaseBase(object): def test_route_update_with_route_via_another_tenant_subnet(self): my_tenant = 'tenant1' - routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}] - with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet: + with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet,\ + self.port(subnet=subnet) as nexthop_port: + nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address'] + routes = [{'destination': '135.207.0.0/16', + 'nexthop': nexthop_ip}] with self.router(tenant_id=my_tenant) as r: body = self._routes_update_prepare( r['router']['id'], subnet['subnet']['id'], None, routes, @@ -118,7 +125,8 @@ class ExtraRouteDBTestCaseBase(object): 'nexthop': '10.0.1.5'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: - with self.port(subnet=s) as p: + fixed_ip_data = [{'ip_address': '10.0.1.2'}] + with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: self._routes_update_prepare(r['router']['id'], None, p['port']['id'], routes) body = self._update('routers', r['router']['id'], @@ -132,7 +140,8 @@ class ExtraRouteDBTestCaseBase(object): 'nexthop': '10.0.1.3'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: - with self.port(subnet=s) as p: + fixed_ip_data = [{'ip_address': '10.0.1.2'}] + with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: body = self._routes_update_prepare(r['router']['id'], None, p['port']['id'], routes) @@ -156,7 +165,8 @@ class ExtraRouteDBTestCaseBase(object): 'nexthop': '10.0.1.5'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: - with self.port(subnet=s) as p: + fixed_ip_data = [{'ip_address': '10.0.1.2'}] + with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: body = self._routes_update_prepare(r['router']['id'], None, p['port']['id'], routes) @@ -168,14 +178,17 @@ class ExtraRouteDBTestCaseBase(object): None, r['router']['id'], []) def test_routes_update_for_multiple_routers(self): - routes1 = [{'destination': '135.207.0.0/16', - 'nexthop': '10.0.0.3'}] - routes2 = [{'destination': '12.0.0.0/8', - 'nexthop': '10.0.0.4'}] with self.router() as r1,\ self.router() as r2,\ self.subnet(cidr='10.0.0.0/24') as s: - with self.port(subnet=s) as p1, self.port(subnet=s) as p2: + with self.port(subnet=s) as p1,\ + self.port(subnet=s) as p2: + p1_ip = p1['port']['fixed_ips'][0]['ip_address'] + p2_ip = p2['port']['fixed_ips'][0]['ip_address'] + routes1 = [{'destination': '135.207.0.0/16', + 'nexthop': p2_ip}] + routes2 = [{'destination': '12.0.0.0/8', + 'nexthop': p1_ip}] body = self._routes_update_prepare(r1['router']['id'], None, p1['port']['id'], routes1) @@ -204,7 +217,8 @@ class ExtraRouteDBTestCaseBase(object): 'nexthop': '10.0.1.5'}] with self.router() as r: with self.subnet(cidr='10.0.1.0/24') as s: - with self.port(subnet=s) as p: + fixed_ip_data = [{'ip_address': '10.0.1.2'}] + with self.port(subnet=s, fixed_ips=fixed_ip_data) as p: body = self._routes_update_prepare(r['router']['id'], None, p['port']['id'], routes_orig) @@ -435,22 +449,23 @@ class ExtraRouteDBTestCaseBase(object): port_list = self.deserialize('json', port_res) self.assertEqual(1, len(port_list['ports'])) - routes = [{'destination': '135.207.0.0/16', - 'nexthop': '10.0.1.3'}] + with self.port(subnet=s) as p: + next_hop = p['port']['fixed_ips'][0]['ip_address'] + routes = [{'destination': '135.207.0.0/16', + 'nexthop': next_hop}] + body = self._update('routers', r['router']['id'], + {'router': {'routes': + routes}}) - body = self._update('routers', r['router']['id'], - {'router': {'routes': - routes}}) + body = self._show('routers', r['router']['id']) + self.assertEqual(routes, body['router']['routes']) - body = self._show('routers', r['router']['id']) - self.assertEqual(routes, body['router']['routes']) - - self._remove_external_gateway_from_router( - r['router']['id'], - s['subnet']['network_id']) - body = self._show('routers', r['router']['id']) - gw_info = body['router']['external_gateway_info'] - self.assertIsNone(gw_info) + self._remove_external_gateway_from_router( + r['router']['id'], + s['subnet']['network_id']) + body = self._show('routers', r['router']['id']) + gw_info = body['router']['external_gateway_info'] + self.assertIsNone(gw_info) def test_router_list_with_sort(self): with self.router(name='router1') as router1,\ diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 09191ebfa1e..2311fc20b2e 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -2478,11 +2478,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): ) as private_port: fp1 = self._make_floatingip(self.fmt, network_ex_id1, - private_port['port']['id'], - floating_ip='10.0.0.3') + private_port['port']['id']) fp2 = self._make_floatingip(self.fmt, network_ex_id2, - private_port['port']['id'], - floating_ip='11.0.0.3') + private_port['port']['id']) self.assertEqual(fp1['floatingip']['router_id'], r1['router']['id']) self.assertEqual(fp2['floatingip']['router_id'], @@ -2519,8 +2517,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): ) as private_port: fp = self._make_floatingip(self.fmt, network_ex_id, - private_port['port']['id'], - floating_ip='10.0.0.8') + private_port['port']['id']) self.assertEqual(r1['router']['id'], fp['floatingip']['router_id']) @@ -2681,8 +2678,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n): self._set_net_external(n['network']['id']) fip = self._make_floatingip(self.fmt, n['network']['id']) - self.assertEqual('192.168.1.2', - fip['floatingip']['floating_ip_address']) + fip_set = netaddr.IPSet(netaddr.IPNetwork("192.168.1.0/24")) + fip_ip = fip['floatingip']['floating_ip_address'] + self.assertTrue(netaddr.IPAddress(fip_ip) in fip_set) def test_create_floatingip_with_assoc_to_ipv6_subnet(self): with self.subnet() as public_sub: diff --git a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py index d0a04c9674e..a54b5d68a40 100644 --- a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py +++ b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py @@ -355,20 +355,16 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase, return ipam_subnet.allocate(address_request) def test_allocate_any_v4_address_succeeds(self): - ip_address = self._allocate_address( - '10.0.0.0/24', 4, ipam_req.AnyAddressRequest) - # As the DB IPAM driver allocation logic is strictly sequential, we can - # expect this test to allocate the .2 address as .1 is used by default - # as subnet gateway - self.assertEqual('10.0.0.2', ip_address) + self._test_allocate_any_address_succeeds('10.0.0.0/24', 4) def test_allocate_any_v6_address_succeeds(self): + self._test_allocate_any_address_succeeds('fde3:abcd:4321:1::/64', 6) + + def _test_allocate_any_address_succeeds(self, subnet_cidr, ip_version): ip_address = self._allocate_address( - 'fde3:abcd:4321:1::/64', 6, ipam_req.AnyAddressRequest) - # As the DB IPAM driver allocation logic is strictly sequential, we can - # expect this test to allocate the .2 address as .1 is used by default - # as subnet gateway - self.assertEqual('fde3:abcd:4321:1::2', ip_address) + subnet_cidr, ip_version, ipam_req.AnyAddressRequest) + self.assertIn(netaddr.IPAddress(ip_address), + netaddr.IPSet(netaddr.IPNetwork(subnet_cidr))) def test_allocate_specific_v4_address_succeeds(self): ip_address = self._allocate_address( diff --git a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py index 22114fed607..4af7699ecdd 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py +++ b/neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py @@ -636,7 +636,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): arg_list=(portbindings.HOST_ID,), **host_arg) as port1: p1 = port1['port'] - + p1_ip = p1['fixed_ips'][0]['ip_address'] self.mock_fanout.reset_mock() device = 'tap' + p1['id'] @@ -667,7 +667,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): '20.0.0.1': [ l2pop_rpc.PortInfo('00:00:00:00:00:00', '0.0.0.0'), - l2pop_rpc.PortInfo(new_mac, '10.0.0.2') + l2pop_rpc.PortInfo(new_mac, p1_ip) ] } } @@ -680,9 +680,12 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase): with self.subnet(network=self._network) as subnet: host_arg = {portbindings.HOST_ID: HOST} + fixed_ips = [{'subnet_id': subnet['subnet']['id'], + 'ip_address': '10.0.0.2'}] with self.port(subnet=subnet, cidr='10.0.0.0/24', device_owner=DEVICE_OWNER_COMPUTE, arg_list=(portbindings.HOST_ID,), + fixed_ips=fixed_ips, **host_arg) as port1: p1 = port1['port'] diff --git a/neutron/tests/unit/plugins/ml2/extensions/test_dns_integration.py b/neutron/tests/unit/plugins/ml2/extensions/test_dns_integration.py index 60f4cf7f690..c3e1e8fed88 100644 --- a/neutron/tests/unit/plugins/ml2/extensions/test_dns_integration.py +++ b/neutron/tests/unit/plugins/ml2/extensions/test_dns_integration.py @@ -20,6 +20,7 @@ from neutron import context from neutron.db import dns_db from neutron.extensions import dns from neutron.extensions import providernet as pnet +from neutron import manager from neutron.plugins.ml2 import config from neutron.plugins.ml2.extensions import dns_integration from neutron.tests.unit.plugins.ml2 import test_plugin @@ -51,6 +52,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase): super(DNSIntegrationTestCase, self).setUp() dns_integration.DNS_DRIVER = None dns_integration.subscribe() + self.plugin = manager.NeutronManager.get_plugin() def _create_port_for_test(self, provider_net=True, dns_domain=True, dns_name=True, ipv4=True, ipv6=True): @@ -69,11 +71,13 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase): **net_kwargs) network = self.deserialize(self.fmt, res) if ipv4: - self._create_subnet(self.fmt, network['network']['id'], - '10.0.0.0/24', ip_version=4) + cidr = '10.0.0.0/24' + self._create_subnet_for_test(network['network']['id'], cidr) + if ipv6: - self._create_subnet(self.fmt, network['network']['id'], - 'fd3d:bdd4:da60::/64', ip_version=6) + cidr = 'fd3d:bdd4:da60::/64' + self._create_subnet_for_test(network['network']['id'], cidr) + port_kwargs = {} if dns_name: port_kwargs = { @@ -90,6 +94,19 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase): port_id=port['id']).one_or_none() return network['network'], port, dns_data_db + def _create_subnet_for_test(self, network_id, cidr): + ip_net = netaddr.IPNetwork(cidr) + # initialize the allocation_pool to the lower half of the subnet + subnet_size = ip_net.last - ip_net.first + subnet_mid_point = int(ip_net.first + subnet_size / 2) + start, end = (netaddr.IPAddress(ip_net.first + 2), + netaddr.IPAddress(subnet_mid_point)) + allocation_pools = [{'start': str(start), + 'end': str(end)}] + return self._create_subnet(self.fmt, network_id, + str(ip_net), ip_version=ip_net.ip.version, + allocation_pools=allocation_pools) + def _update_port_for_test(self, port, new_dns_name=NEWDNSNAME, **kwargs): mock_client.reset_mock() @@ -100,7 +117,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase): recordsets = [] if records_v4: recordsets.append({'id': V4UUID, 'records': records_v4}) - if records_v4: + if records_v6: recordsets.append({'id': V6UUID, 'records': records_v6}) mock_client.recordsets.list.return_value = recordsets mock_admin_client.reset_mock() @@ -362,10 +379,26 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase): config.cfg.CONF.set_override('dns_domain', DNSDOMAIN) net, port, dns_data_db = self._create_port_for_test() original_ips = [ip['ip_address'] for ip in port['fixed_ips']] + ctx = context.get_admin_context() kwargs = {'fixed_ips': []} for ip in port['fixed_ips']: + # Since this tests using an "any" IP allocation to update the port + # IP address, change the allocation pools so that IPAM won't ever + # give us back the IP address we originally had. + subnet = self.plugin.get_subnet(ctx, ip['subnet_id']) + ip_net = netaddr.IPNetwork(subnet['cidr']) + subnet_size = ip_net.last - ip_net.first + subnet_mid_point = int(ip_net.first + subnet_size / 2) + start, end = (netaddr.IPAddress(subnet_mid_point + 1), + netaddr.IPAddress(ip_net.last - 1)) + allocation_pools = [{'start': str(start), 'end': str(end)}] + body = {'allocation_pools': allocation_pools} + req = self.new_update_request('subnets', {'subnet': body}, + ip['subnet_id']) + req.get_response(self.api) kwargs['fixed_ips'].append( {'subnet_id': ip['subnet_id']}) + port, dns_data_db = self._update_port_for_test(port, new_dns_name=None, **kwargs) diff --git a/neutron/tests/unit/plugins/ml2/test_plugin.py b/neutron/tests/unit/plugins/ml2/test_plugin.py index 6ed3678f660..d83c8ff63b5 100644 --- a/neutron/tests/unit/plugins/ml2/test_plugin.py +++ b/neutron/tests/unit/plugins/ml2/test_plugin.py @@ -559,7 +559,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): def test_update_port_fixed_ip_changed(self): ctx = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() - with self.port() as port, mock.patch.object( + fixed_ip_data = [{'ip_address': '10.0.0.4'}] + with self.port(fixed_ips=fixed_ip_data) as port,\ + mock.patch.object( plugin.notifier, 'security_groups_member_updated') as sg_member_update: port['port']['fixed_ips'][0]['ip_address'] = '10.0.0.3' @@ -829,14 +831,20 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase): self._test_operation_resillient_to_ipallocation_failure(make_port) def test_port_update_resillient_to_duplicate_records(self): - with self.port() as p: - data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}} - req = self.new_update_request('ports', data, p['port']['id']) + cidr = '10.0.0.0/24' + allocation_pools = [{'start': '10.0.0.2', 'end': '10.0.0.8'}] + with self.subnet(cidr=cidr, + allocation_pools=allocation_pools) as subnet: + with self.port(subnet=subnet) as p: + data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}} + req = self.new_update_request('ports', data, p['port']['id']) - def do_request(): - self.assertEqual(200, req.get_response(self.api).status_int) + def do_request(): + self.assertEqual(200, + req.get_response(self.api).status_int) - self._test_operation_resillient_to_ipallocation_failure(do_request) + self._test_operation_resillient_to_ipallocation_failure( + do_request) def _test_operation_resillient_to_ipallocation_failure(self, func): from sqlalchemy import event