diff --git a/neutron/db/db_base_plugin_common.py b/neutron/db/db_base_plugin_common.py index f6ab5913580..3f4ede29798 100644 --- a/neutron/db/db_base_plugin_common.py +++ b/neutron/db/db_base_plugin_common.py @@ -110,6 +110,9 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin): subnet_id=subnet_id ) context.session.add(allocated) + # Flush now to ensure duplicates properly trigger retry + context.session.flush() + # NOTE(kevinbenton): We add this to the session info so the sqlalchemy # object isn't immediately garbage collected. Otherwise when the # fixed_ips relationship is referenced a new persistent object will be diff --git a/neutron/db/ipam_non_pluggable_backend.py b/neutron/db/ipam_non_pluggable_backend.py index abc0098c972..d2c26eac305 100644 --- a/neutron/db/ipam_non_pluggable_backend.py +++ b/neutron/db/ipam_non_pluggable_backend.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import collections +import itertools +import random + import netaddr from neutron_lib import constants from neutron_lib import exceptions as n_exc @@ -36,51 +40,58 @@ LOG = logging.getLogger(__name__) class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin): @staticmethod - def _generate_ip(context, subnets): - try: - return IpamNonPluggableBackend._try_generate_ip(context, subnets) - except n_exc.IpAddressGenerationFailure: - IpamNonPluggableBackend._rebuild_availability_ranges(context, - subnets) - - return IpamNonPluggableBackend._try_generate_ip(context, subnets) - - @staticmethod - def _try_generate_ip(context, subnets): + def _generate_ip(context, subnets, filtered_ips=None): """Generate an IP address. The IP address will be generated from one of the subnets defined on the network. """ - range_qry = context.session.query( - models_v2.IPAvailabilityRange).join( - models_v2.IPAllocationPool).with_lockmode('update') - for subnet in subnets: - ip_range = range_qry.filter_by(subnet_id=subnet['id']).first() - if not ip_range: - LOG.debug("All IPs from subnet %(subnet_id)s (%(cidr)s) " - "allocated", - {'subnet_id': subnet['id'], - 'cidr': subnet['cidr']}) + filtered_ips = filtered_ips or [] + subnet_id_list = [subnet['id'] for subnet in subnets] + pool_qry = context.session.query(models_v2.IPAllocationPool) + pool_qry = pool_qry.filter( + models_v2.IPAllocationPool.subnet_id.in_(subnet_id_list)) + + allocation_qry = context.session.query(models_v2.IPAllocation) + allocation_qry = allocation_qry.filter( + models_v2.IPAllocation.subnet_id.in_(subnet_id_list)) + + ip_allocations = collections.defaultdict(netaddr.IPSet) + for ipallocation in allocation_qry: + subnet_ip_allocs = ip_allocations[ipallocation.subnet_id] + subnet_ip_allocs.add(netaddr.IPAddress(ipallocation.ip_address)) + + ip_pools = collections.defaultdict(netaddr.IPSet) + for ip_pool in pool_qry: + subnet_ip_pools = ip_pools[ip_pool.subnet_id] + subnet_ip_pools.add(netaddr.IPRange(ip_pool.first_ip, + ip_pool.last_ip)) + + for subnet_id in ip_pools: + subnet_ip_pools = ip_pools[subnet_id] + subnet_ip_allocs = ip_allocations[subnet_id] + filter_set = netaddr.IPSet() + for ip in filtered_ips: + filter_set.add(netaddr.IPAddress(ip)) + + av_set = subnet_ip_pools.difference(subnet_ip_allocs) + av_set = av_set.difference(filter_set) + + av_set_size = av_set.size + if av_set_size == 0: continue - ip_address = ip_range['first_ip'] - if ip_range['first_ip'] == ip_range['last_ip']: - # No more free indices on subnet => delete - LOG.debug("No more free IP's in slice. Deleting " - "allocation pool.") - context.session.delete(ip_range) - else: - # increment the first free - new_first_ip = str(netaddr.IPAddress(ip_address) + 1) - ip_range['first_ip'] = new_first_ip - LOG.debug("Allocated IP - %(ip_address)s from %(first_ip)s " - "to %(last_ip)s", - {'ip_address': ip_address, - 'first_ip': ip_range['first_ip'], - 'last_ip': ip_range['last_ip']}) - return {'ip_address': ip_address, - 'subnet_id': subnet['id']} - raise n_exc.IpAddressGenerationFailure(net_id=subnets[0]['network_id']) + + # Compute a window size, select an index inside the window, then + # select the IP address at the selected index within the window + window = min(av_set_size, 10) + ip_index = random.randint(1, window) + candidate_ips = list(itertools.islice(av_set, ip_index)) + if candidate_ips: + allocated_ip = candidate_ips[-1] + return {'ip_address': str(allocated_ip), + 'subnet_id': subnet_id} + raise n_exc.IpAddressGenerationFailure( + net_id=subnets[0]['network_id']) @staticmethod def _rebuild_availability_ranges(context, subnets): @@ -285,6 +296,7 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin): # those IPs happen to be next in the line for allocation for ones that # didn't ask for a specific IP fixed_ips.sort(key=lambda x: 'ip_address' not in x) + allocated_ips = [] for fixed in fixed_ips: subnet = self._get_subnet(context, fixed['subnet_id']) is_auto_addr = ipv6_utils.is_auto_address_subnet(subnet) @@ -293,6 +305,7 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin): # Remove the IP address from the allocation pool IpamNonPluggableBackend._allocate_specific_ip( context, fixed['subnet_id'], fixed['ip_address']) + allocated_ips.append(fixed['ip_address']) ips.append({'ip_address': fixed['ip_address'], 'subnet_id': fixed['subnet_id']}) # Only subnet ID is specified => need to generate IP @@ -307,7 +320,8 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin): else: subnets = [subnet] # IP address allocation - result = self._generate_ip(context, subnets) + result = self._generate_ip(context, subnets, allocated_ips) + allocated_ips.append(result['ip_address']) ips.append({'ip_address': result['ip_address'], 'subnet_id': result['subnet_id']}) return ips diff --git a/neutron/db/ipam_pluggable_backend.py b/neutron/db/ipam_pluggable_backend.py index 7be6620d8ae..9c87f32b291 100644 --- a/neutron/db/ipam_pluggable_backend.py +++ b/neutron/db/ipam_pluggable_backend.py @@ -313,13 +313,13 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin): try: # Check if the IPs need to be updated network_id = db_port['network_id'] + for ip in changes.remove: + self._delete_ip_allocation(context, network_id, + ip['subnet_id'], ip['ip_address']) for ip in changes.add: self._store_ip_allocation( context, ip['ip_address'], network_id, ip['subnet_id'], db_port.id) - for ip in changes.remove: - self._delete_ip_allocation(context, network_id, - ip['subnet_id'], ip['ip_address']) self._update_db_port(context, db_port, new_port, network_id, new_mac) except Exception: diff --git a/neutron/ipam/drivers/neutrondb_ipam/driver.py b/neutron/ipam/drivers/neutrondb_ipam/driver.py index 813cadfbc53..cf46000426e 100644 --- a/neutron/ipam/drivers/neutrondb_ipam/driver.py +++ b/neutron/ipam/drivers/neutrondb_ipam/driver.py @@ -13,14 +13,15 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools +import random + import netaddr from neutron_lib import exceptions as n_exc -from oslo_db import exception as db_exc from oslo_log import log from oslo_utils import uuidutils from neutron._i18n import _, _LE -from neutron.common import ipv6_utils from neutron.ipam import driver as ipam_base from neutron.ipam.drivers.neutrondb_ipam import db_api as ipam_db_api from neutron.ipam import exceptions as ipam_exc @@ -153,99 +154,6 @@ class NeutronDbSubnet(ipam_base.Subnet): subnet_id=self.subnet_manager.neutron_id, ip=ip_address) - def _allocate_specific_ip(self, session, ip_address, - allocation_pool_id=None, - auto_generated=False): - """Remove an IP address from subnet's availability ranges. - - This method is supposed to be called from within a database - transaction, otherwise atomicity and integrity might not be - enforced and the operation might result in incosistent availability - ranges for the subnet. - - :param session: database session - :param ip_address: ip address to mark as allocated - :param allocation_pool_id: identifier of the allocation pool from - which the ip address has been extracted. If not specified this - routine will scan all allocation pools. - :param auto_generated: indicates whether ip was auto generated - :returns: list of IP ranges as instances of IPAvailabilityRange - """ - # Return immediately for EUI-64 addresses. For this - # class of subnets availability ranges do not apply - if ipv6_utils.is_eui64_address(ip_address): - return - - LOG.debug("Removing %(ip_address)s from availability ranges for " - "subnet id:%(subnet_id)s", - {'ip_address': ip_address, - 'subnet_id': self.subnet_manager.neutron_id}) - # Netaddr's IPRange and IPSet objects work very well even with very - # large subnets, including IPv6 ones. - final_ranges = [] - ip_in_pools = False - if allocation_pool_id: - av_ranges = self.subnet_manager.list_ranges_by_allocation_pool( - session, allocation_pool_id) - else: - av_ranges = self.subnet_manager.list_ranges_by_subnet_id(session) - for db_range in av_ranges: - initial_ip_set = netaddr.IPSet(netaddr.IPRange( - db_range['first_ip'], db_range['last_ip'])) - final_ip_set = initial_ip_set - netaddr.IPSet([ip_address]) - if not final_ip_set: - ip_in_pools = True - # Range exhausted - bye bye - if not self.subnet_manager.delete_range(session, db_range): - raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed()) - continue - if initial_ip_set == final_ip_set: - # IP address does not fall within the current range, move - # to the next one - final_ranges.append(db_range) - continue - ip_in_pools = True - for new_range in final_ip_set.iter_ipranges(): - # store new range in database - # use netaddr.IPAddress format() method which is equivalent - # to str(...) but also enables us to use different - # representation formats (if needed) for IPv6. - first_ip = netaddr.IPAddress(new_range.first) - last_ip = netaddr.IPAddress(new_range.last) - if (db_range['first_ip'] == first_ip.format() or - db_range['last_ip'] == last_ip.format()): - rows = self.subnet_manager.update_range( - session, db_range, first_ip=first_ip, last_ip=last_ip) - if not rows: - raise db_exc.RetryRequest( - ipam_exc.IPAllocationFailed()) - LOG.debug("Adjusted availability range for pool %s", - db_range['allocation_pool_id']) - final_ranges.append(db_range) - else: - new_ip_range = self.subnet_manager.create_range( - session, - db_range['allocation_pool_id'], - first_ip.format(), - last_ip.format()) - LOG.debug("Created availability range for pool %s", - new_ip_range['allocation_pool_id']) - final_ranges.append(new_ip_range) - - # If ip is autogenerated it should be present in allocation pools, - # so retry if it is not there - if auto_generated and not ip_in_pools: - raise db_exc.RetryRequest(ipam_exc.IPAllocationFailed()) - # Most callers might ignore this return value, which is however - # useful for testing purposes - LOG.debug("Availability ranges for subnet id %(subnet_id)s " - "modified: %(new_ranges)s", - {'subnet_id': self.subnet_manager.neutron_id, - 'new_ranges': ", ".join(["[%s; %s]" % - (r['first_ip'], r['last_ip']) for - r in final_ranges])}) - return final_ranges - def _rebuild_availability_ranges(self, session): """Rebuild availability ranges. @@ -300,29 +208,27 @@ class NeutronDbSubnet(ipam_base.Subnet): session.add(av_range) def _generate_ip(self, session): - try: - return self._try_generate_ip(session) - except ipam_exc.IpAddressGenerationFailure: - self._rebuild_availability_ranges(session) + """Generate an IP address from the set of available addresses.""" + ip_allocations = netaddr.IPSet() + for ipallocation in self.subnet_manager.list_allocations(session): + ip_allocations.add(netaddr.IPAddress(ipallocation.ip_address)) - return self._try_generate_ip(session) + for ip_pool in self.subnet_manager.list_pools(session): + ip_set = netaddr.IPSet() + ip_set.add(netaddr.IPRange(ip_pool.first_ip, ip_pool.last_ip)) + av_set = ip_set.difference(ip_allocations) + if av_set.size == 0: + continue - def _try_generate_ip(self, session): - """Generate an IP address from availability ranges.""" - ip_range = self.subnet_manager.get_first_range(session) - if not ip_range: - LOG.debug("All IPs from subnet %(subnet_id)s allocated", - {'subnet_id': self.subnet_manager.neutron_id}) - raise ipam_exc.IpAddressGenerationFailure( - subnet_id=self.subnet_manager.neutron_id) - # A suitable range was found. Return IP address. - ip_address = ip_range['first_ip'] - LOG.debug("Allocated IP - %(ip_address)s from range " - "[%(first_ip)s; %(last_ip)s]", - {'ip_address': ip_address, - 'first_ip': ip_address, - 'last_ip': ip_range['last_ip']}) - return ip_address, ip_range['allocation_pool_id'] + # Compute a value for the selection window + window = min(av_set.size, 10) + ip_index = random.randint(1, window) + candidate_ips = list(itertools.islice(av_set, ip_index)) + allocated_ip = candidate_ips[-1] + return str(allocated_ip), ip_pool.id + + raise ipam_exc.IpAddressGenerationFailure( + subnet_id=self.subnet_manager.neutron_id) def allocate(self, address_request): # NOTE(pbondar): Ipam driver is always called in context of already @@ -331,7 +237,6 @@ class NeutronDbSubnet(ipam_base.Subnet): # should not create new nested transaction blocks. session = self._context.session all_pool_id = None - auto_generated = False # NOTE(salv-orlando): It would probably better to have a simpler # model for address requests and just check whether there is a # specific IP address specified in address_request @@ -342,9 +247,7 @@ class NeutronDbSubnet(ipam_base.Subnet): self._verify_ip(session, ip_address) else: ip_address, all_pool_id = self._generate_ip(session) - auto_generated = True - self._allocate_specific_ip(session, ip_address, all_pool_id, - auto_generated) + # Create IP allocation request object # The only defined status at this stage is 'ALLOCATED'. # More states will be available in the future - e.g.: RECYCLABLE diff --git a/neutron/tests/functional/db/test_ipam.py b/neutron/tests/functional/db/test_ipam.py index 308e3bfb77e..2f207f56bc9 100644 --- a/neutron/tests/functional/db/test_ipam.py +++ b/neutron/tests/functional/db/test_ipam.py @@ -147,76 +147,24 @@ class IpamTestCase(object): 'ip_address': fixed_ip[0].get('ip_address'), 'subnet_id': self.subnet_id, 'network_id': self.network_id}] - ip_avail_ranges_expected = [{'first_ip': '10.10.10.2', - 'last_ip': '10.10.10.2'}, - {'first_ip': '10.10.10.4', - 'last_ip': '10.10.10.6'}] ip_alloc_pool_expected = [{'first_ip': '10.10.10.2', 'last_ip': '10.10.10.6', 'subnet_id': self.subnet_id}] self.assert_ip_alloc_matches(ip_alloc_expected) self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected) - self.assert_ip_avail_range_matches( - ip_avail_ranges_expected) - - def test_allocate_first_available_ip(self): - self._create_port(self.port_id) - ip_alloc_expected = [{'port_id': self.port_id, - 'ip_address': '10.10.10.2', - 'subnet_id': self.subnet_id, - 'network_id': self.network_id}] - ip_avail_ranges_expected = [{'first_ip': '10.10.10.3', - 'last_ip': '10.10.10.6'}] - ip_alloc_pool_expected = [{'first_ip': '10.10.10.2', - 'last_ip': '10.10.10.6', - 'subnet_id': self.subnet_id}] - self.assert_ip_alloc_matches(ip_alloc_expected) - self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected) - self.assert_ip_avail_range_matches( - ip_avail_ranges_expected) def test_allocate_ip_exausted_pool(self): # available from .2 up to .6 -> 5 for i in range(1, 6): self._create_port(self.port_id + str(i)) - ip_avail_ranges_expected = [] ip_alloc_pool_expected = [{'first_ip': '10.10.10.2', 'last_ip': '10.10.10.6', 'subnet_id': self.subnet_id}] self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected) - self.assert_ip_avail_range_matches( - ip_avail_ranges_expected) - # Create another port with testtools.ExpectedException(n_exc.IpAddressGenerationFailure): self._create_port(self.port_id) - def test_rebuild_availability_range(self): - for i in range(1, 6): - self._create_port(self.port_id + str(i)) - - ip_avail_ranges_expected = [] - ip_alloc_pool_expected = [{'first_ip': '10.10.10.2', - 'last_ip': '10.10.10.6', - 'subnet_id': self.subnet_id}] - self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected) - self.assert_ip_avail_range_matches( - ip_avail_ranges_expected) - # Delete some ports, this will free the first two IPs - for i in range(1, 3): - self.plugin.delete_port(self.cxt, self.port_id + str(i)) - # Create another port, this will trigger the rebuilding of the - # availability ranges - self._create_port(self.port_id) - ip_avail_ranges_expected = [{'first_ip': '10.10.10.3', - 'last_ip': '10.10.10.3'}] - - ip_alloc = self.cxt.session.query(models_v2.IPAllocation).all() - self.assertEqual(4, len(ip_alloc)) - self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected) - self.assert_ip_avail_range_matches( - ip_avail_ranges_expected) - class TestIpamMySql(common_base.MySQLTestCase, base.BaseTestCase, IpamTestCase): diff --git a/neutron/tests/unit/db/test_ipam_non_pluggable_backend.py b/neutron/tests/unit/db/test_ipam_non_pluggable_backend.py index a77c244a0c1..d16784ad158 100644 --- a/neutron/tests/unit/db/test_ipam_non_pluggable_backend.py +++ b/neutron/tests/unit/db/test_ipam_non_pluggable_backend.py @@ -15,7 +15,6 @@ import mock from neutron_lib import constants as n_const -from neutron_lib import exceptions as n_exc from oslo_config import cfg from neutron.common import constants @@ -30,31 +29,6 @@ from neutron.tests import base class TestIpamNonPluggableBackend(base.BaseTestCase): """Unit Tests for non pluggable IPAM Logic.""" - def test_generate_ip(self): - with mock.patch.object(non_ipam.IpamNonPluggableBackend, - '_try_generate_ip') as generate: - with mock.patch.object(non_ipam.IpamNonPluggableBackend, - '_rebuild_availability_ranges') as rebuild: - - non_ipam.IpamNonPluggableBackend._generate_ip('c', 's') - - generate.assert_called_once_with('c', 's') - self.assertEqual(0, rebuild.call_count) - - def test_generate_ip_exhausted_pool(self): - with mock.patch.object(non_ipam.IpamNonPluggableBackend, - '_try_generate_ip') as generate: - with mock.patch.object(non_ipam.IpamNonPluggableBackend, - '_rebuild_availability_ranges') as rebuild: - - exception = n_exc.IpAddressGenerationFailure(net_id='n') - # fail first call but not second - generate.side_effect = [exception, None] - non_ipam.IpamNonPluggableBackend._generate_ip('c', 's') - - self.assertEqual(2, generate.call_count) - rebuild.assert_called_once_with('c', 's') - def _validate_rebuild_availability_ranges(self, pools, allocations, expected): ip_qry = mock.Mock() diff --git a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py index 19b58ff81d3..191b111394b 100644 --- a/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py +++ b/neutron/tests/unit/ipam/drivers/neutrondb_ipam/test_driver.py @@ -20,7 +20,6 @@ from neutron_lib import exceptions as n_exc from neutron.common import constants as n_const from neutron import context -from neutron.db import api as ndb_api from neutron.ipam.drivers.neutrondb_ipam import db_models from neutron.ipam.drivers.neutrondb_ipam import driver from neutron.ipam import exceptions as ipam_exc @@ -281,75 +280,6 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase, self.ctx.session, '10.0.0.0') - def test__allocate_specific_ip(self): - cidr = '10.0.0.0/24' - ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] - with self.ctx.session.begin(): - ranges = ipam_subnet._allocate_specific_ip( - self.ctx.session, '10.0.0.33') - self.assertEqual(2, len(ranges)) - # 10.0.0.1 should be allocated for gateway ip - ranges.sort(key=convert_firstip_to_ipaddress) - self.assertEqual('10.0.0.2', ranges[0]['first_ip']) - self.assertEqual('10.0.0.32', ranges[0]['last_ip']) - self.assertEqual('10.0.0.34', ranges[1]['first_ip']) - self.assertEqual('10.0.0.254', ranges[1]['last_ip']) - # Limit test - first address in range - ranges = ipam_subnet._allocate_specific_ip( - self.ctx.session, '10.0.0.2') - self.assertEqual(2, len(ranges)) - ranges.sort(key=convert_firstip_to_ipaddress) - self.assertEqual('10.0.0.3', ranges[0]['first_ip']) - self.assertEqual('10.0.0.32', ranges[0]['last_ip']) - self.assertEqual('10.0.0.34', ranges[1]['first_ip']) - self.assertEqual('10.0.0.254', ranges[1]['last_ip']) - # Limit test - last address in range - ranges = ipam_subnet._allocate_specific_ip( - self.ctx.session, '10.0.0.254') - self.assertEqual(2, len(ranges)) - ranges.sort(key=convert_firstip_to_ipaddress) - self.assertEqual('10.0.0.3', ranges[0]['first_ip']) - self.assertEqual('10.0.0.32', ranges[0]['last_ip']) - self.assertEqual('10.0.0.34', ranges[1]['first_ip']) - self.assertEqual('10.0.0.253', ranges[1]['last_ip']) - - def test__allocate_specific_ips_multiple_ranges(self): - cidr = '10.0.0.0/24' - ipam_subnet = self._create_and_allocate_ipam_subnet( - cidr, - allocation_pools=[{'start': '10.0.0.10', 'end': '10.0.0.19'}, - {'start': '10.0.0.30', 'end': '10.0.0.39'}])[0] - with self.ctx.session.begin(): - ranges = ipam_subnet._allocate_specific_ip( - self.ctx.session, '10.0.0.33') - self.assertEqual(3, len(ranges)) - # 10.0.0.1 should be allocated for gateway ip - ranges.sort(key=convert_firstip_to_ipaddress) - self.assertEqual('10.0.0.10', ranges[0]['first_ip']) - self.assertEqual('10.0.0.19', ranges[0]['last_ip']) - self.assertEqual('10.0.0.30', ranges[1]['first_ip']) - self.assertEqual('10.0.0.32', ranges[1]['last_ip']) - self.assertEqual('10.0.0.34', ranges[2]['first_ip']) - self.assertEqual('10.0.0.39', ranges[2]['last_ip']) - - def test__allocate_specific_ip_out_of_range(self): - cidr = '10.0.0.0/24' - subnet = self._create_subnet( - self.plugin, self.ctx, self.net_id, cidr) - subnet_req = ipam_req.SpecificSubnetRequest( - 'tenant_id', subnet['id'], cidr, gateway_ip=subnet['gateway_ip']) - ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req) - with self.ctx.session.begin(): - ranges = ipam_subnet._allocate_specific_ip( - self.ctx.session, '192.168.0.1') - # In this case _allocate_specific_ips does not fail, but - # simply does not update availability ranges at all - self.assertEqual(1, len(ranges)) - # 10.0.0.1 should be allocated for gateway ip - ranges.sort(key=convert_firstip_to_ipaddress) - self.assertEqual('10.0.0.2', ranges[0]['first_ip']) - self.assertEqual('10.0.0.254', ranges[0]['last_ip']) - def _allocate_address(self, cidr, ip_version, address_request): ipam_subnet = self._create_and_allocate_ipam_subnet( cidr, ip_version=ip_version)[0] @@ -443,21 +373,6 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase, 'tenant_id', 'meh', '192.168.0.0/24') self.ipam_pool.allocate_subnet(subnet_req) - def test__allocate_specific_ip_raises_exception(self): - cidr = '10.0.0.0/24' - ip = '10.0.0.15' - ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0] - ipam_subnet.subnet_manager = mock.Mock() - ipam_subnet.subnet_manager.list_ranges_by_subnet_id.return_value = [{ - 'first_ip': '10.0.0.15', 'last_ip': '10.0.0.15'}] - ipam_subnet.subnet_manager.delete_range.return_value = 0 - - @ndb_api.retry_db_errors - def go(): - ipam_subnet._allocate_specific_ip(self.ctx.session, ip) - - self.assertRaises(ipam_exc.IPAllocationFailed, go) - def test_update_allocation_pools_with_no_pool_change(self): cidr = '10.0.0.0/24' ipam_subnet = self._create_and_allocate_ipam_subnet(