Merge "[Default SG rules] Test to check if SG rules are created from template"

This commit is contained in:
Zuul 2023-11-17 12:57:20 +00:00 committed by Gerrit Code Review
commit c4802160f8
2 changed files with 88 additions and 18 deletions

View File

@ -28,7 +28,7 @@ RULE_KEYWORDS_TO_CHECK = [
]
class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
class DefaultSecurityGroupRuleTest(base.BaseAdminNetworkTest):
required_extensions = ['security-groups-default-rules']
credentials = ['primary', 'admin']
@ -38,19 +38,19 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
super(DefaultSecurityGroupRuleTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
def _filter_not_relevant_rule_keys(self, rule):
def _filter_not_relevant_rule_keys(self, rule, expected_keys=None):
expected_keys = expected_keys or RULE_KEYWORDS_TO_CHECK
new_rule = {}
rule_keys = list(rule.keys())
for k in rule_keys:
if k in RULE_KEYWORDS_TO_CHECK:
for k in rule.keys():
if k in expected_keys:
new_rule[k] = rule[k]
return new_rule
def _filter_not_relevant_rules_keys(self, rules):
return [self._filter_not_relevant_rule_keys(r) for r in rules]
def _filter_not_relevant_rules_keys(self, rules, keys=None):
keys = keys or RULE_KEYWORDS_TO_CHECK
return [self._filter_not_relevant_rule_keys(r, keys) for r in rules]
def _assert_rules_exists(self, expected_rules, actual_rules):
actual_rules = self._filter_not_relevant_rules_keys(actual_rules)
for expected_rule in expected_rules:
self.assertIn(expected_rule, actual_rules)
@ -111,8 +111,9 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
self._assert_rules_exists(expected_legacy_template_rules,
sg_rules_template)
self._assert_rules_exists(
expected_legacy_template_rules,
self._filter_not_relevant_rules_keys(sg_rules_template))
@decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
def test_default_security_group_rule_lifecycle(self):
@ -256,5 +257,57 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
self._assert_rules_exists(expected_rules,
sg_rules_template)
self._assert_rules_exists(
expected_rules,
self._filter_not_relevant_rules_keys(sg_rules_template))
def _validate_security_group_rules(self, sg, is_default_sg):
keys_to_check = [
'remote_group_id', 'direction', 'ethertype', 'protocol',
'remote_ip_prefix', 'remote_address_group_id', 'port_range_min',
'port_range_max']
if is_default_sg:
sg_rules_template = (
self.admin_client.list_default_security_group_rules(
used_in_default_sg=True)['default_security_group_rules'])
else:
sg_rules_template = (
self.admin_client.list_default_security_group_rules(
used_in_non_default_sg=True
)['default_security_group_rules'])
# NOTE(slaweq): We need to replace "PARENT" keyword in
# the "remote_group_id" attribute of every default sg rule template
# with actual SG ID
for rule in sg_rules_template:
if rule['remote_group_id'] == 'PARENT':
rule['remote_group_id'] = sg['id']
self._assert_rules_exists(
self._filter_not_relevant_rules_keys(
sg_rules_template, keys_to_check),
self._filter_not_relevant_rules_keys(
sg['security_group_rules'], keys_to_check))
@decorators.idempotent_id('29feedb1-6f04-4a1f-a778-2fae2c7b7dc8')
def test_security_group_rules_created_from_default_sg_rules_template(
self):
"""Test if default SG and custom new SG have got proper SG rules.
This test creates new project and checks if its default SG has SG
rules matching default SG rules for that kind of SG.
Next it creates new SG for the same project and checks if that SG also
have proper SG rules based on the default SG rules template.
"""
project = self.create_project()
# First check rules for default SG created automatically for each
# project
default_sg = self.admin_client.list_security_groups(
tenant_id=project['id'], name='default')['security_groups'][0]
self._validate_security_group_rules(default_sg, is_default_sg=True)
# And now create different SG for same project and check SG rules for
# such additional SG
sg = self.create_security_group(project=project)
self._validate_security_group_rules(sg, is_default_sg=False)

View File

@ -245,6 +245,14 @@ class StatelessSecGroupTest(BaseSecGroupTest):
class BaseSecGroupQuota(base.BaseAdminNetworkTest):
def setUp(self):
super().setUp()
# NOTE(slaweq): we don't know exactly how many rule templates may be
# created in the neutron db and used for every SG so, as in this test
# class we are checking quotas of SG, not SG rules, lets set quota for
# SG rules to -1
self._set_sg_rules_quota(-1)
def _create_max_allowed_sg_amount(self):
sg_amount = self._get_sg_amount()
sg_quota = self._get_sg_quota()
@ -270,17 +278,23 @@ class BaseSecGroupQuota(base.BaseAdminNetworkTest):
self.assertEqual(self._get_sg_quota(), new_sg_quota,
"Security group quota wasn't changed correctly")
def _set_sg_quota(self, val):
sg_quota = self._get_sg_quota()
def _set_quota(self, val, resource):
res_quota = self._get_quota(resource)
project_id = self.client.project_id
self.admin_client.update_quotas(project_id, **{'security_group': val})
self.admin_client.update_quotas(project_id, **{resource: val})
self.addCleanup(self.admin_client.update_quotas,
project_id, **{'security_group': sg_quota})
project_id, **{resource: res_quota})
def _get_sg_quota(self):
def _get_quota(self, resource):
project_id = self.client.project_id
quotas = self.admin_client.show_quotas(project_id)
return quotas['quota']['security_group']
return quotas['quota'][resource]
def _set_sg_quota(self, val):
return self._set_quota(val, 'security_group')
def _get_sg_quota(self):
return self._get_quota('security_group')
def _get_sg_amount(self):
project_id = self.client.project_id
@ -288,6 +302,9 @@ class BaseSecGroupQuota(base.BaseAdminNetworkTest):
security_groups = self.client.list_security_groups(**filter_query)
return len(security_groups['security_groups'])
def _set_sg_rules_quota(self, val):
return self._set_quota(val, 'security_group_rule')
class SecGroupQuotaTest(BaseSecGroupQuota):