diff --git a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py index 826e2acd..604dbea5 100644 --- a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py +++ b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py @@ -28,7 +28,7 @@ RULE_KEYWORDS_TO_CHECK = [ ] -class DefaultSecurityGroupRuleTest(base.BaseNetworkTest): +class DefaultSecurityGroupRuleTest(base.BaseAdminNetworkTest): required_extensions = ['security-groups-default-rules'] credentials = ['primary', 'admin'] @@ -38,19 +38,19 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest): super(DefaultSecurityGroupRuleTest, cls).setup_clients() cls.admin_client = cls.os_admin.network_client - def _filter_not_relevant_rule_keys(self, rule): + def _filter_not_relevant_rule_keys(self, rule, expected_keys=None): + expected_keys = expected_keys or RULE_KEYWORDS_TO_CHECK new_rule = {} - rule_keys = list(rule.keys()) - for k in rule_keys: - if k in RULE_KEYWORDS_TO_CHECK: + for k in rule.keys(): + if k in expected_keys: new_rule[k] = rule[k] return new_rule - def _filter_not_relevant_rules_keys(self, rules): - return [self._filter_not_relevant_rule_keys(r) for r in rules] + def _filter_not_relevant_rules_keys(self, rules, keys=None): + keys = keys or RULE_KEYWORDS_TO_CHECK + return [self._filter_not_relevant_rule_keys(r, keys) for r in rules] def _assert_rules_exists(self, expected_rules, actual_rules): - actual_rules = self._filter_not_relevant_rules_keys(actual_rules) for expected_rule in expected_rules: self.assertIn(expected_rule, actual_rules) @@ -111,8 +111,9 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest): self.admin_client.list_default_security_group_rules()[ 'default_security_group_rules' ]) - self._assert_rules_exists(expected_legacy_template_rules, - sg_rules_template) + self._assert_rules_exists( + expected_legacy_template_rules, + self._filter_not_relevant_rules_keys(sg_rules_template)) @decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f') def test_default_security_group_rule_lifecycle(self): @@ -256,5 +257,57 @@ class DefaultSecurityGroupRuleTest(base.BaseNetworkTest): self.admin_client.list_default_security_group_rules()[ 'default_security_group_rules' ]) - self._assert_rules_exists(expected_rules, - sg_rules_template) + self._assert_rules_exists( + expected_rules, + self._filter_not_relevant_rules_keys(sg_rules_template)) + + def _validate_security_group_rules(self, sg, is_default_sg): + keys_to_check = [ + 'remote_group_id', 'direction', 'ethertype', 'protocol', + 'remote_ip_prefix', 'remote_address_group_id', 'port_range_min', + 'port_range_max'] + + if is_default_sg: + sg_rules_template = ( + self.admin_client.list_default_security_group_rules( + used_in_default_sg=True)['default_security_group_rules']) + else: + sg_rules_template = ( + self.admin_client.list_default_security_group_rules( + used_in_non_default_sg=True + )['default_security_group_rules']) + # NOTE(slaweq): We need to replace "PARENT" keyword in + # the "remote_group_id" attribute of every default sg rule template + # with actual SG ID + for rule in sg_rules_template: + if rule['remote_group_id'] == 'PARENT': + rule['remote_group_id'] = sg['id'] + + self._assert_rules_exists( + self._filter_not_relevant_rules_keys( + sg_rules_template, keys_to_check), + self._filter_not_relevant_rules_keys( + sg['security_group_rules'], keys_to_check)) + + @decorators.idempotent_id('29feedb1-6f04-4a1f-a778-2fae2c7b7dc8') + def test_security_group_rules_created_from_default_sg_rules_template( + self): + """Test if default SG and custom new SG have got proper SG rules. + + This test creates new project and checks if its default SG has SG + rules matching default SG rules for that kind of SG. + Next it creates new SG for the same project and checks if that SG also + have proper SG rules based on the default SG rules template. + """ + + project = self.create_project() + # First check rules for default SG created automatically for each + # project + default_sg = self.admin_client.list_security_groups( + tenant_id=project['id'], name='default')['security_groups'][0] + self._validate_security_group_rules(default_sg, is_default_sg=True) + + # And now create different SG for same project and check SG rules for + # such additional SG + sg = self.create_security_group(project=project) + self._validate_security_group_rules(sg, is_default_sg=False) diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py index 14e0c666..85e4763d 100644 --- a/neutron_tempest_plugin/api/test_security_groups.py +++ b/neutron_tempest_plugin/api/test_security_groups.py @@ -245,6 +245,14 @@ class StatelessSecGroupTest(BaseSecGroupTest): class BaseSecGroupQuota(base.BaseAdminNetworkTest): + def setUp(self): + super().setUp() + # NOTE(slaweq): we don't know exactly how many rule templates may be + # created in the neutron db and used for every SG so, as in this test + # class we are checking quotas of SG, not SG rules, lets set quota for + # SG rules to -1 + self._set_sg_rules_quota(-1) + def _create_max_allowed_sg_amount(self): sg_amount = self._get_sg_amount() sg_quota = self._get_sg_quota() @@ -270,17 +278,23 @@ class BaseSecGroupQuota(base.BaseAdminNetworkTest): self.assertEqual(self._get_sg_quota(), new_sg_quota, "Security group quota wasn't changed correctly") - def _set_sg_quota(self, val): - sg_quota = self._get_sg_quota() + def _set_quota(self, val, resource): + res_quota = self._get_quota(resource) project_id = self.client.project_id - self.admin_client.update_quotas(project_id, **{'security_group': val}) + self.admin_client.update_quotas(project_id, **{resource: val}) self.addCleanup(self.admin_client.update_quotas, - project_id, **{'security_group': sg_quota}) + project_id, **{resource: res_quota}) - def _get_sg_quota(self): + def _get_quota(self, resource): project_id = self.client.project_id quotas = self.admin_client.show_quotas(project_id) - return quotas['quota']['security_group'] + return quotas['quota'][resource] + + def _set_sg_quota(self, val): + return self._set_quota(val, 'security_group') + + def _get_sg_quota(self): + return self._get_quota('security_group') def _get_sg_amount(self): project_id = self.client.project_id @@ -288,6 +302,9 @@ class BaseSecGroupQuota(base.BaseAdminNetworkTest): security_groups = self.client.list_security_groups(**filter_query) return len(security_groups['security_groups']) + def _set_sg_rules_quota(self, val): + return self._set_quota(val, 'security_group_rule') + class SecGroupQuotaTest(BaseSecGroupQuota):