Functional test IPAM DB operation

Add a functional test to check that the IPAM logic is correct and
that the data stored in the DB are sane.

Partial-bug: #1331564
Change-Id: I7776a50824184663abd37e210d08a63eb5519cea
This commit is contained in:
rossella 2014-09-23 11:02:20 +02:00
parent c6eed9564b
commit c86b8f2248
1 changed files with 222 additions and 0 deletions

View File

@ -0,0 +1,222 @@
# Copyright 2015 SUSE Linux Products GmbH
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base
import testtools
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db import model_base
from neutron.db import models_v2
from neutron.tests import base
def get_admin_test_context(db_url):
"""
get_admin_test_context is used to provide a test context. A new session is
created using the db url specified
"""
ctx = context.Context(user_id=None,
tenant_id=None,
is_admin=True,
read_deleted="no",
load_admin_roles=True,
overwrite=False)
facade = session.EngineFacade(db_url, mysql_sql_mode='STRICT_ALL_TABLES')
ctx._session = facade.get_session(autocommit=False, expire_on_commit=True)
return ctx
class IpamTestCase(object):
"""
Base class for tests that aim to test ip allocation.
"""
def configure_test(self):
model_base.BASEV2.metadata.create_all(self.engine)
cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
self.plugin = base_plugin.NeutronDbPluginV2()
self.cxt = get_admin_test_context(self.engine.url)
self.addCleanup(self.cxt._session.close)
self.tenant_id = 'test_tenant'
self.network_id = 'test_net_id'
self.subnet_id = 'test_sub_id'
self.port_id = 'test_p_id'
self._create_network()
self._create_subnet()
def result_set_to_dicts(self, resultset, keys):
dicts = []
for item in resultset:
item_dict = dict((x, item[x]) for x in keys)
dicts.append(item_dict)
return dicts
def assert_ip_alloc_matches(self, expected):
result_set = self.cxt.session.query(models_v2.IPAllocation).all()
keys = ['port_id', 'ip_address', 'subnet_id', 'network_id']
actual = self.result_set_to_dicts(result_set, keys)
self.assertEqual(expected, actual)
def assert_ip_avail_range_matches(self, expected):
result_set = self.cxt.session.query(
models_v2.IPAvailabilityRange).all()
keys = ['first_ip', 'last_ip']
actual = self.result_set_to_dicts(result_set, keys)
self.assertEqual(expected, actual)
def assert_ip_alloc_pool_matches(self, expected):
result_set = self.cxt.session.query(models_v2.IPAllocationPool).all()
keys = ['first_ip', 'last_ip', 'subnet_id']
actual = self.result_set_to_dicts(result_set, keys)
self.assertEqual(expected, actual)
def _create_network(self):
network = {'tenant_id': self.tenant_id,
'id': self.network_id,
'name': 'test-net',
'admin_state_up': True,
'shared': False,
'status': constants.NET_STATUS_ACTIVE}
return self.plugin.create_network(self.cxt, {'network': network})
def _create_subnet(self):
subnet = {'tenant_id': self.tenant_id,
'id': self.subnet_id,
'name': 'test_sub',
'network_id': self.network_id,
'ip_version': 4,
'cidr': '10.10.10.0/29',
'enable_dhcp': False,
'gateway_ip': '10.10.10.1',
'shared': False,
'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
'host_routes': attributes.ATTR_NOT_SPECIFIED}
return self.plugin.create_subnet(self.cxt, {'subnet': subnet})
def _create_port(self, port_id, fixed_ips=None):
port_fixed_ips = (fixed_ips if fixed_ips else
attributes.ATTR_NOT_SPECIFIED)
port = {'tenant_id': self.tenant_id,
'name': 'test_port',
'id': port_id,
'network_id': self.network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': True,
'status': constants.PORT_STATUS_ACTIVE,
'device_id': 'test_dev_id',
'device_owner': 'compute',
'fixed_ips': port_fixed_ips}
self.plugin.create_port(self.cxt, {'port': port})
def test_allocate_fixed_ip(self):
fixed_ip = [{'ip_address': "10.10.10.3", 'subnet_id': self.subnet_id}]
self._create_port(self.port_id, fixed_ip)
ip_alloc_expected = [{'port_id': self.port_id,
'ip_address': fixed_ip[0].get('ip_address'),
'subnet_id': self.subnet_id,
'network_id': self.network_id}]
ip_avail_ranges_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.2'},
{'first_ip': '10.10.10.4',
'last_ip': '10.10.10.6'}]
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_matches(ip_alloc_expected)
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
self.assert_ip_avail_range_matches(
ip_avail_ranges_expected)
def test_allocate_first_available_ip(self):
self._create_port(self.port_id)
ip_alloc_expected = [{'port_id': self.port_id,
'ip_address': '10.10.10.2',
'subnet_id': self.subnet_id,
'network_id': self.network_id}]
ip_avail_ranges_expected = [{'first_ip': '10.10.10.3',
'last_ip': '10.10.10.6'}]
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_matches(ip_alloc_expected)
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
self.assert_ip_avail_range_matches(
ip_avail_ranges_expected)
def test_allocate_ip_exausted_pool(self):
# available from .2 up to .6 -> 5
for i in range(1, 6):
self._create_port(self.port_id + str(i))
ip_avail_ranges_expected = []
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
self.assert_ip_avail_range_matches(
ip_avail_ranges_expected)
# Create another port
with testtools.ExpectedException(n_exc.IpAddressGenerationFailure):
self._create_port(self.port_id)
def test_rebuild_availability_range(self):
for i in range(1, 6):
self._create_port(self.port_id + str(i))
ip_avail_ranges_expected = []
ip_alloc_pool_expected = [{'first_ip': '10.10.10.2',
'last_ip': '10.10.10.6',
'subnet_id': self.subnet_id}]
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
self.assert_ip_avail_range_matches(
ip_avail_ranges_expected)
# Delete some ports, this will free the first two IPs
for i in range(1, 3):
self.plugin.delete_port(self.cxt, self.port_id + str(i))
# Create another port, this will trigger the rebuilding of the
# availability ranges
self._create_port(self.port_id)
ip_avail_ranges_expected = [{'first_ip': '10.10.10.3',
'last_ip': '10.10.10.3'}]
ip_alloc = self.cxt.session.query(models_v2.IPAllocation).all()
self.assertEqual(4, len(ip_alloc))
self.assert_ip_alloc_pool_matches(ip_alloc_pool_expected)
self.assert_ip_avail_range_matches(
ip_avail_ranges_expected)
class TestIpamMySql(test_base.MySQLOpportunisticTestCase, base.BaseTestCase,
IpamTestCase):
def setUp(self):
super(TestIpamMySql, self).setUp()
self.configure_test()
class TestIpamPsql(test_base.PostgreSQLOpportunisticTestCase,
base.BaseTestCase, IpamTestCase):
def setUp(self):
super(TestIpamPsql, self).setUp()
self.configure_test()