ML2: Add tests to validate quota usage tracking

Ensure that event handlers are invoked upon completion of
ML2 operations which add or remove tracked resources.
Also validate that the event handlers are called for the
appropriate resources and that quota usage's dirty bit
is set and unset as expected.

These are not unit tests, but added in the unit test tree
as they leverage code both from the DB unit test and the ML2
unit test framework. This module has indeed been added to
the 'exclusion list' in check_unit_test_structure.sh, and
should be moved to the functional test tree together with
the other modules.

Closes-Bug: #1499358

Change-Id: I78c432c35f3f3339607cd533019ae6d0fa2a5cd6
This commit is contained in:
Salvatore Orlando 2015-09-21 16:40:55 -07:00
parent ad26d26987
commit cdd049e4c4
3 changed files with 303 additions and 3 deletions

View File

@ -18,8 +18,9 @@ from oslo_db import exception as oslo_db_exception
from oslo_log import log
from oslo_utils import excutils
from sqlalchemy import event
from sqlalchemy import exc as sql_exc
from neutron._i18n import _LE
from neutron._i18n import _LE, _LW
from neutron.db import api as db_api
from neutron.db.quota import api as quota_api
@ -300,5 +301,11 @@ class TrackedResource(BaseResource):
event.listen(self._model_class, 'after_delete', self._db_event_handler)
def unregister_events(self):
event.remove(self._model_class, 'after_insert', self._db_event_handler)
event.remove(self._model_class, 'after_delete', self._db_event_handler)
try:
event.remove(self._model_class, 'after_insert',
self._db_event_handler)
event.remove(self._model_class, 'after_delete',
self._db_event_handler)
except sql_exc.InvalidRequestError:
LOG.warning(_LW("No sqlalchemy event for resource %s found"),
self.name)

View File

@ -0,0 +1,292 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import uuidutils
from neutron import context
from neutron.db.quota import api as quota_db_api
from neutron.tests.unit.extensions import test_securitygroup
from neutron.tests.unit.plugins.ml2 import test_plugin
class SgTestCaseWrapper(test_securitygroup.SecurityGroupDBTestCase):
# This wrapper class enables Ml2PluginV2TestCase to correctly call the
# setup method in SecurityGroupDBTestCase which does not accept the
# service_plugins keyword parameter.
def setUp(self, plugin, **kwargs):
super(SgTestCaseWrapper, self).setUp(plugin)
class BaseTestTrackedResources(test_plugin.Ml2PluginV2TestCase,
SgTestCaseWrapper):
def setUp(self):
self.ctx = context.get_admin_context()
# Prevent noise from default security group operations
def_sec_group_patch = mock.patch(
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
'_ensure_default_security_group')
def_sec_group_patch.start()
get_sec_group_port_patch = mock.patch(
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
'_get_security_groups_on_port')
get_sec_group_port_patch.start()
super(BaseTestTrackedResources, self).setUp()
self._tenant_id = uuidutils.generate_uuid()
def _test_init(self, resource_name):
quota_db_api.set_quota_usage(
self.ctx, resource_name, self._tenant_id)
class TestTrackedResourcesEventHandler(BaseTestTrackedResources):
def setUp(self):
handler_patch = mock.patch(
'neutron.quota.resource.TrackedResource._db_event_handler')
self.handler_mock = handler_patch.start()
super(TestTrackedResourcesEventHandler, self).setUp()
def _verify_event_handler_calls(self, data, expected_call_count=1):
if not hasattr(data, '__iter__') or isinstance(data, dict):
data = [data]
self.assertEqual(expected_call_count, self.handler_mock.call_count)
call_idx = -1
for item in data:
if item:
model = self.handler_mock.call_args_list[call_idx][0][-1]
self.assertEqual(model['id'], item['id'])
self.assertEqual(model['tenant_id'], item['tenant_id'])
call_idx = call_idx - 1
def test_create_delete_network_triggers_event(self):
self._test_init('network')
net = self._make_network('json', 'meh', True)['network']
self._verify_event_handler_calls(net)
self._delete('networks', net['id'])
self._verify_event_handler_calls(net, expected_call_count=2)
def test_create_delete_port_triggers_event(self):
self._test_init('port')
net = self._make_network('json', 'meh', True)['network']
port = self._make_port('json', net['id'])['port']
# Expecting 2 calls - 1 for the network, 1 for the port
self._verify_event_handler_calls(port, expected_call_count=2)
self._delete('ports', port['id'])
self._verify_event_handler_calls(port, expected_call_count=3)
def test_create_delete_subnet_triggers_event(self):
self._test_init('subnet')
net = self._make_network('json', 'meh', True)
subnet = self._make_subnet('json', net, '10.0.0.1',
'10.0.0.0/24')['subnet']
# Expecting 2 calls - 1 for the network, 1 for the subnet
self._verify_event_handler_calls([subnet, net['network']],
expected_call_count=2)
self._delete('subnets', subnet['id'])
self._verify_event_handler_calls(subnet, expected_call_count=3)
def test_create_delete_network_with_subnet_triggers_event(self):
self._test_init('network')
self._test_init('subnet')
net = self._make_network('json', 'meh', True)
subnet = self._make_subnet('json', net, '10.0.0.1',
'10.0.0.0/24')['subnet']
# Expecting 2 calls - 1 for the network, 1 for the subnet
self._verify_event_handler_calls([subnet, net['network']],
expected_call_count=2)
self._delete('networks', net['network']['id'])
# Expecting 2 more calls - 1 for the network, 1 for the subnet
self._verify_event_handler_calls([net['network'], subnet],
expected_call_count=4)
def test_create_delete_subnetpool_triggers_event(self):
self._test_init('subnetpool')
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
name='meh',
tenant_id=self._tenant_id)['subnetpool']
self._verify_event_handler_calls(pool)
self._delete('subnetpools', pool['id'])
self._verify_event_handler_calls(pool, expected_call_count=2)
def test_create_delete_securitygroup_triggers_event(self):
self._test_init('security_group')
sec_group = self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
# When a security group is created it also creates 2 rules, therefore
# there will be three calls and we need to verify the first
self._verify_event_handler_calls([None, None, sec_group],
expected_call_count=3)
self._delete('security-groups', sec_group['id'])
# When a security group is deleted it also removes the 2 rules
# generated upon creation
self._verify_event_handler_calls(sec_group, expected_call_count=6)
def test_create_delete_securitygrouprule_triggers_event(self):
self._test_init('security_group_rule')
sec_group = self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
rule_req = self._build_security_group_rule(
sec_group['id'], 'ingress', 'TCP', tenant_id=self._tenant_id)
sec_group_rule = self._make_security_group_rule(
'json', rule_req)['security_group_rule']
# When a security group is created it also creates 2 rules, therefore
# there will be four calls in total to the event handler
self._verify_event_handler_calls(sec_group_rule, expected_call_count=4)
self._delete('security-group-rules', sec_group_rule['id'])
self._verify_event_handler_calls(sec_group_rule, expected_call_count=5)
class TestTrackedResources(BaseTestTrackedResources):
def _verify_dirty_bit(self, resource_name, expected_value=True):
usage = quota_db_api.get_quota_usage_by_resource_and_tenant(
self.ctx, resource_name, self._tenant_id)
self.assertEqual(expected_value, usage.dirty)
def test_create_delete_network_marks_dirty(self):
self._test_init('network')
net = self._make_network('json', 'meh', True)['network']
self._verify_dirty_bit('network')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'network', self._tenant_id, dirty=False)
self._delete('networks', net['id'])
self._verify_dirty_bit('network')
def test_list_networks_clears_dirty(self):
self._test_init('network')
net = self._make_network('json', 'meh', True)['network']
self.ctx.tenant_id = net['tenant_id']
self._list('networks', neutron_context=self.ctx)
self._verify_dirty_bit('network', expected_value=False)
def test_create_delete_port_marks_dirty(self):
self._test_init('port')
net = self._make_network('json', 'meh', True)['network']
port = self._make_port('json', net['id'])['port']
self._verify_dirty_bit('port')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'port', self._tenant_id, dirty=False)
self._delete('ports', port['id'])
self._verify_dirty_bit('port')
def test_list_ports_clears_dirty(self):
self._test_init('port')
net = self._make_network('json', 'meh', True)['network']
port = self._make_port('json', net['id'])['port']
self.ctx.tenant_id = port['tenant_id']
self._list('ports', neutron_context=self.ctx)
self._verify_dirty_bit('port', expected_value=False)
def test_create_delete_subnet_marks_dirty(self):
self._test_init('subnet')
net = self._make_network('json', 'meh', True)
subnet = self._make_subnet('json', net, '10.0.0.1',
'10.0.0.0/24')['subnet']
self._verify_dirty_bit('subnet')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'subnet', self._tenant_id, dirty=False)
self._delete('subnets', subnet['id'])
self._verify_dirty_bit('subnet')
def test_create_delete_network_with_subnet_marks_dirty(self):
self._test_init('network')
self._test_init('subnet')
net = self._make_network('json', 'meh', True)
self._make_subnet('json', net, '10.0.0.1',
'10.0.0.0/24')['subnet']
self._verify_dirty_bit('subnet')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'subnet', self._tenant_id, dirty=False)
self._delete('networks', net['network']['id'])
self._verify_dirty_bit('network')
self._verify_dirty_bit('subnet')
def test_list_subnets_clears_dirty(self):
self._test_init('subnet')
net = self._make_network('json', 'meh', True)
subnet = self._make_subnet('json', net, '10.0.0.1',
'10.0.0.0/24')['subnet']
self.ctx.tenant_id = subnet['tenant_id']
self._list('subnets', neutron_context=self.ctx)
self._verify_dirty_bit('subnet', expected_value=False)
def test_create_delete_subnetpool_marks_dirty(self):
self._test_init('subnetpool')
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
name='meh',
tenant_id=self._tenant_id)['subnetpool']
self._verify_dirty_bit('subnetpool')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'subnetpool', self._tenant_id, dirty=False)
self._delete('subnetpools', pool['id'])
self._verify_dirty_bit('subnetpool')
def test_list_subnetpools_clears_dirty(self):
self._test_init('subnetpool')
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
name='meh',
tenant_id=self._tenant_id)['subnetpool']
self.ctx.tenant_id = pool['tenant_id']
self._list('subnetpools', neutron_context=self.ctx)
self._verify_dirty_bit('subnetpool', expected_value=False)
def test_create_delete_securitygroup_marks_dirty(self):
self._test_init('security_group')
sec_group = self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
self._verify_dirty_bit('security_group')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'security_group', self._tenant_id, dirty=False)
self._delete('security-groups', sec_group['id'])
self._verify_dirty_bit('security_group')
def test_list_securitygroups_clears_dirty(self):
self._test_init('security_group')
self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
self.ctx.tenant_id = self._tenant_id
self._list('security-groups', neutron_context=self.ctx)
self._verify_dirty_bit('security_group', expected_value=False)
def test_create_delete_securitygrouprule_marks_dirty(self):
self._test_init('security_group_rule')
sec_group = self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
rule_req = self._build_security_group_rule(
sec_group['id'], 'ingress', 'TCP', tenant_id=self._tenant_id)
sec_group_rule = self._make_security_group_rule(
'json', rule_req)['security_group_rule']
self._verify_dirty_bit('security_group_rule')
# Clear the dirty bit
quota_db_api.set_quota_usage_dirty(
self.ctx, 'security_group_rule', self._tenant_id, dirty=False)
self._delete('security-group-rules', sec_group_rule['id'])
self._verify_dirty_bit('security_group_rule')
def test_list_securitygrouprules_clears_dirty(self):
self._test_init('security_group_rule')
self._make_security_group(
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
# As the security group create operation also creates 2 security group
# rules there is no need to explicitly create any rule
self.ctx.tenant_id = self._tenant_id
self._list('security-group-rules', neutron_context=self.ctx)
self._verify_dirty_bit('security_group_rule', expected_value=False)

View File

@ -23,6 +23,7 @@ ignore_regexes=(
"^plugins/ml2/test_extension_driver_api.py$"
"^plugins/ml2/test_ext_portsecurity.py$"
"^plugins/ml2/test_agent_scheduler.py$"
"^plugins/ml2/test_tracked_resources.py$"
"^plugins/ml2/drivers/openvswitch/agent/test_agent_scheduler.py$"
"^plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py$"
"^plugins/openvswitch/test_agent_scheduler.py$"