Add 'allocation_pools' to Quantum v2 API subnets.

This changeset adds a new parameter, 'allocation_pools' to the 'subnet'
resource of the Quantum v2 API.
Allows for creating and validation of subnets with multiple allocation pools.
This commit only deals with POST verb (subnet creation).
PUT verb (subnet update) will be dealt with in a subsequent commit.

This is the first commit for fixing bug #1016308

Change-Id: Ic356dcb5dcfa46af8ecc7e598529881f8bcbbeed
This commit is contained in:
Salvatore Orlando 2012-06-28 10:22:36 +01:00
parent 06900100ad
commit b99dde80e2
7 changed files with 365 additions and 86 deletions

View File

@ -31,7 +31,11 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
exceptions.MacAddressGenerationFailure:
webob.exc.HTTPServiceUnavailable,
exceptions.StateInvalid: webob.exc.HTTPBadRequest,
exceptions.InvalidInput: webob.exc.HTTPBadRequest}
exceptions.InvalidInput: webob.exc.HTTPBadRequest,
exceptions.OverlappingAllocationPools: webob.exc.HTTPConflict,
exceptions.OutOfBoundsAllocationPool: webob.exc.HTTPBadRequest,
exceptions.InvalidAllocationPool: webob.exc.HTTPBadRequest,
}
def fields(request):

View File

@ -80,6 +80,9 @@ RESOURCE_ATTRIBUTE_MAP = {
'cidr': {'allow_post': True, 'allow_put': False},
'gateway_ip': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
#TODO(salvatore-orlando): Enable PUT on allocation_pools
'allocation_pools': {'allow_post': True, 'allow_put': False,
'default': ATTR_NOT_SPECIFIED},
'dns_namesevers': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'additional_host_routes': {'allow_post': True, 'allow_put': True,

View File

@ -36,5 +36,5 @@ def network(network_data):
def subnet(subnet_data):
"""Represents a view for a subnet object"""
keys = ('id', 'network_id', 'tenant_id', 'gateway_ip', 'ip_version',
'cidr')
'cidr', 'allocation_pools')
return resource(subnet_data, keys)

View File

@ -127,6 +127,20 @@ class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class InvalidAllocationPool(QuantumException):
message = _("The allocation pool %(pool)s is not valid.")
class OverlappingAllocationPools(QuantumException):
message = _("Found overlapping allocation pools:"
"%(pool_1)s %(pool_2)s for subnet %(subnet_cidr)s.")
class OutOfBoundsAllocationPool(QuantumException):
message = _("The allocation pool %(pool)s spans "
"beyond the subnet cidr %(subnet_cidr)s.")
class NotImplementedError(Error):
pass

View File

@ -167,26 +167,41 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
"""Return an IP address to the pool of free IP's on the network
subnet.
"""
range_qry = context.session.query(models_v2.IPAllocationRange)
# Grab all allocation pools for the subnet
pool_qry = context.session.query(models_v2.IPAllocationPool)
allocation_pools = pool_qry.filter_by(subnet_id=subnet_id).all()
# Find the allocation pool for the IP to recycle
pool_id = None
for allocation_pool in allocation_pools:
allocation_pool_range = netaddr.IPRange(
allocation_pool['first_ip'],
allocation_pool['last_ip'])
if netaddr.IPAddress(ip_address) in allocation_pool_range:
pool_id = allocation_pool['id']
break
if not pool_id:
error_message = ("No allocation pool found for "
"ip address:%s" % ip_address)
raise q_exc.InvalidInput(error_message=error_message)
# Two requests will be done on the database. The first will be to
# search if an entry starts with ip_address + 1 (r1). The second
# will be to see if an entry ends with ip_address -1 (r2).
# If 1 of the above holds true then the specific entry will be
# modified. If both hold true then the two ranges will be merged.
# If there are no entries then a single entry will be added.
range_qry = context.session.query(models_v2.IPAvailabilityRange)
ip_first = str(netaddr.IPAddress(ip_address) + 1)
ip_last = str(netaddr.IPAddress(ip_address) - 1)
LOG.debug("Recycle %s", ip_address)
try:
r1 = range_qry.filter_by(subnet_id=subnet_id,
r1 = range_qry.filter_by(allocation_pool_id=pool_id,
first_ip=ip_first).one()
LOG.debug("Recycle: first match for %s-%s", r1['first_ip'],
r1['last_ip'])
except exc.NoResultFound:
r1 = []
try:
r2 = range_qry.filter_by(subnet_id=subnet_id,
r2 = range_qry.filter_by(allocation_pool_id=pool_id,
last_ip=ip_last).one()
LOG.debug("Recycle: last match for %s-%s", r2['first_ip'],
r2['last_ip'])
@ -195,9 +210,10 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
if r1 and r2:
# Merge the two ranges
ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
first_ip=r2['first_ip'],
last_ip=r1['last_ip'])
ip_range = models_v2.IPAvailabilityRange(
allocation_pool_id=pool_id,
first_ip=r2['first_ip'],
last_ip=r1['last_ip'])
context.session.add(ip_range)
LOG.debug("Recycle: merged %s-%s and %s-%s", r2['first_ip'],
r2['last_ip'], r1['first_ip'], r1['last_ip'])
@ -215,9 +231,10 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
r2['last_ip'])
else:
# Create a new range
ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
first_ip=ip_address,
last_ip=ip_address)
ip_range = models_v2.IPAvailabilityRange(
allocation_pool_id=pool_id,
first_ip=ip_address,
last_ip=ip_address)
context.session.add(ip_range)
LOG.debug("Recycle: created new %s-%s", ip_address, ip_address)
@ -237,7 +254,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
The IP address will be generated from one of the subnets defined on
the network.
"""
range_qry = context.session.query(models_v2.IPAllocationRange)
range_qry = context.session.query(
models_v2.IPAvailabilityRange).join(
models_v2.IPAllocationPool)
for subnet in subnets:
range = range_qry.filter_by(subnet_id=subnet['id']).first()
if not range:
@ -262,9 +281,12 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
def _allocate_specific_ip(context, subnet_id, ip_address):
"""Allocate a specific IP address on the subnet."""
ip = int(netaddr.IPAddress(ip_address))
range_qry = context.session.query(models_v2.IPAllocationRange)
ranges = range_qry.filter_by(subnet_id=subnet_id).all()
for range in ranges:
range_qry = context.session.query(
models_v2.IPAvailabilityRange,
models_v2.IPAllocationPool).join(
models_v2.IPAllocationPool)
results = range_qry.filter_by(subnet_id=subnet_id).all()
for (range, pool) in results:
first = int(netaddr.IPAddress(range['first_ip']))
last = int(netaddr.IPAddress(range['last_ip']))
if first <= ip <= last:
@ -282,9 +304,10 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
new_first = str(netaddr.IPAddress(ip_address) + 1)
new_last = range['last_ip']
range['last_ip'] = str(netaddr.IPAddress(ip_address) - 1)
ip_range = models_v2.IPAllocationRange(subnet_id=subnet_id,
first_ip=new_first,
last_ip=new_last)
ip_range = models_v2.IPAvailabilityRange(
allocation_pool_id=pool['id'],
first_ip=new_first,
last_ip=new_last)
context.session.add(ip_range)
return
@ -459,6 +482,107 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
'subnet_id': result['subnet_id']})
return ips
def _validate_allocation_pools(self, ip_pools, gateway_ip, subnet_cidr):
"""Validate IP allocation pools.
Verify start and end address for each allocation pool are valid,
ie: constituted by valid and appropriately ordered IP addresses.
Also, verify pools do not overlap among themselves and with the
gateway IP. Finally, verify that each range, and the gateway IP,
fall within the subnet's CIDR.
"""
subnet = netaddr.IPNetwork(subnet_cidr)
subnet_first_ip = netaddr.IPAddress(subnet.first + 1)
subnet_last_ip = netaddr.IPAddress(subnet.last - 1)
LOG.debug("Performing IP validity checks on allocation pools")
ip_sets = []
for ip_pool in ip_pools:
try:
start_ip = netaddr.IPAddress(ip_pool['start'])
end_ip = netaddr.IPAddress(ip_pool['end'])
except netaddr.AddrFormatError:
LOG.error("Found invalid IP address in pool: %s - %s:",
ip_pool['start'],
ip_pool['end'])
raise q_exc.InvalidAllocationPool(pool=ip_pool)
if (start_ip.version != subnet.version or
end_ip.version != subnet.version):
LOG.error("Specified IP addresses do not match "
"the subnet IP version")
raise q_exc.InvalidAllocationPool(pool=ip_pool)
if end_ip < start_ip:
LOG.error("Start IP (%s) is greater than end IP (%s)",
ip_pool['start'],
ip_pool['end'])
raise q_exc.InvalidAllocationPool(pool=ip_pool)
if start_ip < subnet_first_ip or end_ip > subnet_last_ip:
LOG.error("Found pool larger than subnet CIDR:%s - %s",
ip_pool['start'],
ip_pool['end'])
raise q_exc.OutOfBoundsAllocationPool(
pool=ip_pool,
subnet_cidr=subnet_cidr)
# Valid allocation pool
# Create an IPSet for it for easily verifying overlaps
ip_sets.append(netaddr.IPSet(netaddr.IPRange(
ip_pool['start'],
ip_pool['end']).cidrs()))
LOG.debug("Checking for overlaps among allocation pools "
"and gateway ip")
ip_ranges = ip_pools[:]
# Treat gw as IPset as well
ip_ranges.append(gateway_ip)
ip_sets.append(netaddr.IPSet([gateway_ip]))
# Use integer cursors as an efficient way for implementing
# comparison and avoiding comparing the same pair twice
for l_cursor in range(len(ip_sets)):
for r_cursor in range(l_cursor + 1, len(ip_sets)):
if ip_sets[l_cursor] & ip_sets[r_cursor]:
l_range = ip_ranges[l_cursor]
r_range = ip_ranges[r_cursor]
LOG.error("Found overlapping ranges: %s and %s",
l_range, r_range)
raise q_exc.OverlappingAllocationPools(
pool_1=l_range,
pool_2=r_range,
subnet_cidr=subnet_cidr)
def _allocate_pools_for_subnet(self, context, subnet):
"""Create IP allocation pools for a given subnet
Pools are defined by the 'allocation_pools' attribute,
a list of dict objects with 'start' and 'end' keys for
defining the pool range.
"""
pools = []
if subnet['allocation_pools'] == api_router.ATTR_NOT_SPECIFIED:
# Auto allocate the pool around gateway
gw_ip = int(netaddr.IPAddress(subnet['gateway_ip']))
net = netaddr.IPNetwork(subnet['cidr'])
first_ip = net.first + 1
last_ip = net.last - 1
if gw_ip > first_ip:
pools.append({'start': str(netaddr.IPAddress(first_ip)),
'end': str(netaddr.IPAddress(gw_ip - 1))})
if gw_ip < last_ip:
pools.append({'start': str(netaddr.IPAddress(gw_ip + 1)),
'end': str(netaddr.IPAddress(last_ip))})
# return auto-generated pools
# no need to check for their validity
return pools
else:
pools = subnet['allocation_pools']
self._validate_allocation_pools(pools,
subnet['gateway_ip'],
subnet['cidr'])
return pools
def _make_network_dict(self, network, fields=None):
res = {'id': network['id'],
'name': network['name'],
@ -475,6 +599,9 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
'network_id': subnet['network_id'],
'ip_version': subnet['ip_version'],
'cidr': subnet['cidr'],
'allocation_pools': [{'start': pool['first_ip'],
'end': pool['last_ip']}
for pool in subnet['allocation_pools']],
'gateway_ip': subnet['gateway_ip']}
return self._fields(res, fields)
@ -542,27 +669,6 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
ip = netaddr.IPAddress(s['gateway_ip'])
# Get the first and last indices for the subnet
ranges = []
# Gateway is the first address in the range
if ip == net.network + 1:
range = {'first': str(ip + 1),
'last': str(net.broadcast - 1)}
ranges.append(range)
# Gateway is the last address in the range
elif ip == net.broadcast - 1:
range = {'first': str(net.network + 1),
'last': str(ip - 1)}
ranges.append(range)
# Gateway is on IP in the subnet
else:
range = {'first': str(net.network + 1),
'last': str(ip - 1)}
ranges.append(range)
range = {'first': str(ip + 1),
'last': str(net.broadcast - 1)}
ranges.append(range)
with context.session.begin():
network = self._get_network(context, s["network_id"])
subnet = models_v2.Subnet(network_id=s['network_id'],
@ -570,12 +676,16 @@ class QuantumDbPluginV2(quantum_plugin_base_v2.QuantumPluginBaseV2):
cidr=s['cidr'],
gateway_ip=s['gateway_ip'])
context.session.add(subnet)
with context.session.begin():
for range in ranges:
ip_range = models_v2.IPAllocationRange(subnet_id=subnet.id,
first_ip=range['first'],
last_ip=range['last'])
pools = self._allocate_pools_for_subnet(context, s)
for pool in pools:
ip_pool = models_v2.IPAllocationPool(subnet=subnet,
first_ip=pool['start'],
last_ip=pool['end'])
context.session.add(ip_pool)
ip_range = models_v2.IPAvailabilityRange(
ipallocationpool=ip_pool,
first_ip=pool['start'],
last_ip=pool['end'])
context.session.add(ip_range)
return self._make_subnet_dict(subnet)

View File

@ -31,20 +31,42 @@ class HasId(object):
id = sa.Column(sa.String(36), primary_key=True, default=utils.str_uuid)
class IPAllocationRange(model_base.BASEV2, HasId):
"""Internal representation of a free IP address range in a Quantum
subnet. The range of available ips is [first_ip..last_ip]. The
allocation retrieves the first entry from the range. If the first
entry is equal to the last entry then this row will be deleted.
class IPAvailabilityRange(model_base.BASEV2):
"""Internal representation of available IPs for Quantum subnets.
Allocation - first entry from the range will be allocated.
If the first entry is equal to the last entry then this row
will be deleted.
Recycling ips involves appending to existing ranges. This is
only done if the range is contiguous. If not, the first_ip will be
the same as the last_ip. When adjacent ips are recycled the ranges
will be merged.
"""
allocation_pool_id = sa.Column(sa.String(36),
sa.ForeignKey('ipallocationpools.id'),
nullable=True,
primary_key=True)
first_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
last_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
def __repr__(self):
return "%s - %s" % (self.first_ip, self.last_ip)
class IPAllocationPool(model_base.BASEV2, HasId):
"""Representation of an allocation pool in a Quantum subnet."""
subnet_id = sa.Column(sa.String(36), sa.ForeignKey('subnets.id'),
nullable=True)
first_ip = sa.Column(sa.String(64), nullable=False)
last_ip = sa.Column(sa.String(64), nullable=False)
available_ranges = orm.relationship(IPAvailabilityRange,
backref='ipallocationpool',
lazy="dynamic")
def __repr__(self):
return "%s - %s" % (self.first_ip, self.last_ip)
class IPAllocation(model_base.BASEV2):
@ -80,10 +102,11 @@ class Subnet(model_base.BASEV2, HasId):
ip_version = sa.Column(sa.Integer, nullable=False)
cidr = sa.Column(sa.String(64), nullable=False)
gateway_ip = sa.Column(sa.String(64))
allocation_pools = orm.relationship(IPAllocationPool,
backref='subnet',
lazy="dynamic")
#TODO(danwent):
# - dns_namservers
# - excluded_ranges
# - additional_routes

View File

@ -18,7 +18,8 @@ import logging
import mock
import os
import random
import unittest
import unittest2
import webob.exc
import quantum
from quantum.api.v2.router import APIRouter
@ -40,7 +41,7 @@ def etcdir(*p):
return os.path.join(ETCDIR, *p)
class QuantumDbPluginV2TestCase(unittest.TestCase):
class QuantumDbPluginV2TestCase(unittest2.TestCase):
def setUp(self):
super(QuantumDbPluginV2TestCase, self).setUp()
@ -111,13 +112,15 @@ class QuantumDbPluginV2TestCase(unittest.TestCase):
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
def _create_subnet(self, fmt, net_id, gateway_ip, cidr, ip_version=4):
def _create_subnet(self, fmt, net_id, gateway_ip, cidr,
allocation_pools=None, ip_version=4):
data = {'subnet': {'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version}}
if gateway_ip:
data['subnet']['gateway_ip'] = gateway_ip
if allocation_pools:
data['subnet']['allocation_pools'] = allocation_pools
subnet_req = self.new_create_request('subnets', data, fmt)
return subnet_req.get_response(self.api)
@ -127,15 +130,25 @@ class QuantumDbPluginV2TestCase(unittest.TestCase):
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
for arg in ('admin_state_up', 'device_id', 'mac_address', 'fixed_ips'):
if arg in kwargs:
# Arg must be present and not empty
if arg in kwargs and kwargs[arg]:
data['port'][arg] = kwargs[arg]
port_req = self.new_create_request('ports', data, fmt)
return port_req.get_response(self.api)
def _make_subnet(self, fmt, network, gateway, cidr, ip_version=4):
res = self._create_subnet(fmt, network['network']['id'],
gateway, cidr, ip_version)
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4):
res = self._create_subnet(fmt,
network['network']['id'],
gateway,
cidr,
allocation_pools=allocation_pools,
ip_version=ip_version)
# Things can go wrong - raise HTTP exc with res code only
# so it can be caught by unit tests
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _make_port(self, fmt, net_id, **kwargs):
@ -154,30 +167,46 @@ class QuantumDbPluginV2TestCase(unittest.TestCase):
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None, gateway=None,
cidr='10.0.0.0/24', fmt='json'):
def subnet(self, network=None,
gateway_ip=None,
cidr='10.0.0.0/24',
fmt='json',
ip_version=4,
allocation_pools=None):
# TODO(anyone) DRY this
# NOTE(salvatore-orlando): we can pass the network object
# to gen function anyway, and then avoid the repetition
if not network:
with self.network() as network:
subnet = self._make_subnet(fmt, network, gateway, cidr)
subnet = self._make_subnet(fmt,
network,
gateway_ip,
cidr,
allocation_pools,
ip_version)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
else:
subnet = self._make_subnet(fmt, network, gateway, cidr)
subnet = self._make_subnet(fmt,
network,
gateway_ip,
cidr,
allocation_pools,
ip_version)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
def port(self, subnet=None, fmt='json'):
def port(self, subnet=None, fixed_ips=None, fmt='json'):
if not subnet:
with self.subnet() as subnet:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id)
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips)
yield port
self._delete('ports', port['port']['id'])
else:
net_id = subnet['subnet']['network_id']
port = self._make_port(fmt, net_id)
port = self._make_port(fmt, net_id, fixed_ips=fixed_ips)
yield port
self._delete('ports', port['port']['id'])
@ -434,7 +463,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
admin_status_up=True)
network2 = self.deserialize(fmt, res)
subnet2 = self._make_subnet(fmt, network2, "1.1.1.1",
"1.1.1.0/24", 4)
"1.1.1.0/24", ip_version=4)
net_id = port['port']['network_id']
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id':
@ -475,7 +504,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
def test_range_allocation(self):
fmt = 'json'
with self.subnet(gateway='10.0.0.3',
with self.subnet(gateway_ip='10.0.0.3',
cidr='10.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
@ -494,7 +523,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase):
self.assertEquals(ips[i]['ip_address'], alloc[i])
self.assertEquals(ips[i]['subnet_id'],
subnet['subnet']['id'])
with self.subnet(gateway='11.0.0.6',
with self.subnet(gateway_ip='11.0.0.6',
cidr='11.0.0.0/29') as subnet:
kwargs = {"fixed_ips":
[{'subnet_id': subnet['subnet']['id']},
@ -672,24 +701,120 @@ class TestNetworksV2(QuantumDbPluginV2TestCase):
class TestSubnetsV2(QuantumDbPluginV2TestCase):
def _test_create_subnet(self, **kwargs):
keys = kwargs.copy()
keys.setdefault('cidr', '10.0.0.0/24')
keys.setdefault('ip_version', 4)
with self.subnet(**keys) as subnet:
# verify the response has each key with the correct value
for k in keys:
self.assertIn(k, subnet['subnet'])
self.assertEquals(subnet['subnet'][k], keys[k])
return subnet
def test_create_subnet(self):
gateway = '10.0.0.1'
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
keys = [('ip_version', 4), ('gateway_ip', gateway),
('cidr', cidr)]
with self.subnet(gateway=gateway, cidr=cidr) as subnet:
for k, v in keys:
self.assertEquals(subnet['subnet'][k], v)
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr)
def test_create_subnet_defaults(self):
generated_gateway = '10.0.0.1'
gateway = '10.0.0.1'
cidr = '10.0.0.0/24'
keys = [('ip_version', 4), ('gateway_ip', generated_gateway),
('cidr', cidr)]
# intentionally not passing gateway in
with self.subnet(cidr=cidr) as subnet:
for k, v in keys:
self.assertEquals(subnet['subnet'][k], v)
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.254'}]
subnet = self._test_create_subnet()
# verify cidr & gw have been correctly generated
self.assertEquals(subnet['subnet']['cidr'], cidr)
self.assertEquals(subnet['subnet']['gateway_ip'], gateway)
self.assertEquals(subnet['subnet']['allocation_pools'],
allocation_pools)
def test_create_subnet_with_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_v6_allocation_pool(self):
gateway_ip = 'fe80::1'
cidr = 'fe80::0/80'
allocation_pools = [{'start': 'fe80::2',
'end': 'fe80::ffff:fffa:ffff'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_with_large_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/8'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.1.0.0',
'end': '10.200.0.100'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_multiple_allocation_pools(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.100'},
{'start': '10.0.0.110',
'end': '10.0.0.150'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
gateway_ip = '10.0.0.50'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.1',
'end': '10.0.0.100'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 409)
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.150'},
{'start': '10.0.0.140',
'end': '10.0.0.180'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 409)
def test_create_subnet_invalid_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.0.256'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 400)
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2',
'end': '10.0.1.6'}]
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr,
allocation_pools=allocation_pools)
self.assertEquals(ctx_manager.exception.code, 400)
def test_update_subnet(self):
with self.subnet() as subnet:
@ -715,9 +840,9 @@ class TestSubnetsV2(QuantumDbPluginV2TestCase):
# NOTE(jkoelker) This would be a good place to use contextlib.nested
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway='10.0.0.1',
with self.subnet(network=network, gateway_ip='10.0.0.1',
cidr='10.0.1.0/24') as subnet:
with self.subnet(network=network, gateway='10.0.1.1',
with self.subnet(network=network, gateway_ip='10.0.1.1',
cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request('subnets')
res = self.deserialize('json',