diff --git a/neutron_tempest_plugin/api/admin/test_extension_driver_port_security_admin.py b/neutron_tempest_plugin/api/admin/test_extension_driver_port_security_admin.py index d449eadb..048a1e57 100644 --- a/neutron_tempest_plugin/api/admin/test_extension_driver_port_security_admin.py +++ b/neutron_tempest_plugin/api/admin/test_extension_driver_port_security_admin.py @@ -17,11 +17,9 @@ from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from neutron_tempest_plugin.api import base -from neutron_tempest_plugin.api import base_security_groups as base_security -class PortSecurityAdminTests(base_security.BaseSecGroupTest, - base.BaseAdminNetworkTest): +class PortSecurityAdminTests(base.BaseAdminNetworkTest): required_extensions = ['port-security'] diff --git a/neutron_tempest_plugin/api/admin/test_quotas_negative.py b/neutron_tempest_plugin/api/admin/test_quotas_negative.py index cd64e5ce..9c37d923 100644 --- a/neutron_tempest_plugin/api/admin/test_quotas_negative.py +++ b/neutron_tempest_plugin/api/admin/test_quotas_negative.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron_lib import constants from tempest.common import utils from tempest.lib.common.utils import data_utils from tempest.lib import decorators @@ -110,54 +111,38 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase): @decorators.idempotent_id('5c924ff7-b7a9-474f-92a3-dbe0f976ec13') @utils.requires_ext(extension="security-group", service="network") def test_create_security_group_when_quotas_is_full(self): - tenant_id = self.create_project()['id'] - sg_args = {'tenant_id': tenant_id} - # avoid a number that is made by default - sg_list = self.admin_client.list_security_groups( - tenant_id=tenant_id)['security_groups'] - num = len(sg_list) + 1 + project = self.create_project() - new_quotas = {'security_group': num} - self._setup_quotas(tenant_id, **new_quotas) + # Set quotas to allow to create only one more security group + security_groups = self.admin_client.list_security_groups( + tenant_id=project['id'])['security_groups'] + self._setup_quotas(project['id'], + security_group=len(security_groups) + 1) - sg = self.admin_client.create_security_group( - **sg_args)['security_group'] - self.addCleanup(self.admin_client.delete_security_group, sg['id']) - - self.assertRaises(lib_exc.Conflict, - self.admin_client.create_security_group, **sg_args) + self.create_security_group(project=project) + self.assertRaises(lib_exc.Conflict, self.create_security_group, + project=project) @decorators.attr(type='negative') @decorators.idempotent_id('b7143480-6118-4ed4-be38-1b6f15f30d05') @utils.requires_ext(extension="security-group", service="network") def test_create_security_group_rule_when_quotas_is_full(self): - tenant_id = self.create_project()['id'] - sg_args = {'tenant_id': tenant_id} + project = self.create_project() + security_group = self.create_security_group(project=project) - sg = self.admin_client.create_security_group( - **sg_args)['security_group'] - self.addCleanup(self.admin_client.delete_security_group, sg['id']) + # Set quotas to allow to create only one more security group rule + security_group_rules = self.admin_client.list_security_group_rules( + tenant_id=project['id'])['security_group_rules'] + self._setup_quotas(project['id'], + security_group_rule=len(security_group_rules) + 1) - # avoid a number that is made by default - sg_rule_list = self.admin_client.list_security_group_rules( - tenant_id=tenant_id)['security_group_rules'] - num = len(sg_rule_list) + 1 - - new_quotas = {'security_group_rule': num} - self._setup_quotas(tenant_id, **new_quotas) - - sg_rule_args = {'tenant_id': tenant_id, - 'security_group_id': sg['id'], - 'direction': 'ingress'} - sg_rule = self.admin_client.create_security_group_rule( - **sg_rule_args)['security_group_rule'] - self.addCleanup( - self.admin_client.delete_security_group_rule, sg_rule['id']) - - sg_rule_args['direction'] = 'egress' + self.create_security_group_rule( + project=project, security_group=security_group, + direction=constants.INGRESS_DIRECTION) self.assertRaises(lib_exc.Conflict, - self.admin_client.create_security_group_rule, - **sg_rule_args) + self.create_security_group_rule, + project=project, security_group=security_group, + direction=constants.EGRESS_DIRECTION) @decorators.attr(type='negative') @decorators.idempotent_id('d00fe5bb-9db8-4e1a-9c31-490f52897e6f') diff --git a/neutron_tempest_plugin/api/admin/test_security_groups.py b/neutron_tempest_plugin/api/admin/test_security_groups.py index de7e7d22..d79b0eee 100644 --- a/neutron_tempest_plugin/api/admin/test_security_groups.py +++ b/neutron_tempest_plugin/api/admin/test_security_groups.py @@ -14,10 +14,10 @@ from tempest.lib import decorators -from neutron_tempest_plugin.api import base_security_groups as base +from neutron_tempest_plugin.api import base -class SecGroupAdminTest(base.BaseSecGroupTest): +class SecGroupAdminTest(base.BaseNetworkTest): required_extensions = ['security-group'] credentials = ['primary', 'admin'] diff --git a/neutron_tempest_plugin/api/base_security_groups.py b/neutron_tempest_plugin/api/base_security_groups.py index 127bbd90..ca2c17a3 100644 --- a/neutron_tempest_plugin/api/base_security_groups.py +++ b/neutron_tempest_plugin/api/base_security_groups.py @@ -14,9 +14,6 @@ # under the License. from neutron_lib import constants -from tempest.lib.common.utils import data_utils - -from neutron_tempest_plugin.api import base # NOTE(yamamoto): The list of protocols here is what we had in Ocata. @@ -45,9 +42,13 @@ V4_PROTOCOL_NAMES = { 'udplite', 'vrrp', } -V4_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items() if - k in V4_PROTOCOL_NAMES) -V6_PROTOCOL_LEGACY = set([constants.PROTO_NAME_IPV6_ICMP_LEGACY]) + +V4_PROTOCOL_INTS = {v + for k, v in constants.IP_PROTOCOL_MAP.items() + if k in V4_PROTOCOL_NAMES} + +V6_PROTOCOL_LEGACY = {constants.PROTO_NAME_IPV6_ICMP_LEGACY} + V6_PROTOCOL_NAMES = { 'ipv6-encap', 'ipv6-frag', @@ -56,66 +57,7 @@ V6_PROTOCOL_NAMES = { 'ipv6-opts', 'ipv6-route', } -V6_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items() if - k in (V6_PROTOCOL_NAMES | V6_PROTOCOL_LEGACY)) - -class BaseSecGroupTest(base.BaseNetworkTest): - - def _create_security_group(self, **kwargs): - # Create a security group - name = data_utils.rand_name('secgroup-') - group_create_body = self.client.create_security_group(name=name, - **kwargs) - self.addCleanup(self._delete_security_group, - group_create_body['security_group']['id']) - self.assertEqual(group_create_body['security_group']['name'], name) - return group_create_body, name - - def _delete_security_group(self, secgroup_id): - self.client.delete_security_group(secgroup_id) - # Asserting that the security group is not found in the list - # after deletion - list_body = self.client.list_security_groups() - secgroup_list = list() - for secgroup in list_body['security_groups']: - secgroup_list.append(secgroup['id']) - self.assertNotIn(secgroup_id, secgroup_list) - - def _create_security_group_rule(self, **kwargs): - rule_create_body = self.client.create_security_group_rule(**kwargs) - # List rules and verify created rule is in response - rule_list_body = ( - self.client.list_security_group_rules()) - rule_list = [rule['id'] - for rule in rule_list_body['security_group_rules']] - self.assertIn(rule_create_body['security_group_rule']['id'], - rule_list) - self.addCleanup(self._delete_security_group_rule, - rule_create_body['security_group_rule']['id']) - return rule_create_body - - def _show_security_group_rule(self, **kwargs): - show_rule_body = self.client.show_security_group_rule(kwargs['id']) - for key, value in kwargs.items(): - self.assertEqual(value, - show_rule_body['security_group_rule'][key], - "%s does not match." % key) - - def _delete_security_group_rule(self, secgroup_rule_id): - self.client.delete_security_group_rule(secgroup_rule_id) - rule_list_body = self.client.list_security_group_rules() - rule_list = [rule['id'] - for rule in rule_list_body['security_group_rules']] - self.assertNotIn(secgroup_rule_id, rule_list) - - def _test_create_show_delete_security_group_rule(self, **kwargs): - # The security group rule is deleted by the cleanup call in - # _create_security_group_rule. - rule_create_body = ( - self._create_security_group_rule(**kwargs)['security_group_rule']) - self._show_security_group_rule( - id=rule_create_body['id'], - protocol=rule_create_body['protocol'], - direction=rule_create_body['direction'], - ethertype=rule_create_body['ethertype']) +V6_PROTOCOL_INTS = {v + for k, v in constants.IP_PROTOCOL_MAP.items() + if k in (V6_PROTOCOL_NAMES | V6_PROTOCOL_LEGACY)} diff --git a/neutron_tempest_plugin/api/test_extension_driver_port_security.py b/neutron_tempest_plugin/api/test_extension_driver_port_security.py index 8a8c4f2a..6b055577 100644 --- a/neutron_tempest_plugin/api/test_extension_driver_port_security.py +++ b/neutron_tempest_plugin/api/test_extension_driver_port_security.py @@ -19,15 +19,13 @@ from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from neutron_tempest_plugin.api import base -from neutron_tempest_plugin.api import base_security_groups as base_security FAKE_IP = '10.0.0.1' FAKE_MAC = '00:25:64:e8:19:dd' @ddt.ddt -class PortSecTest(base_security.BaseSecGroupTest, - base.BaseNetworkTest): +class PortSecTest(base.BaseNetworkTest): @decorators.idempotent_id('7c338ddf-e64e-4118-bd33-e49a1f2f1495') @utils.requires_ext(extension='port-security', service='network') @@ -76,7 +74,7 @@ class PortSecTest(base_security.BaseSecGroupTest, network = self.create_network() self.create_subnet(network) - sec_group_body, _ = self._create_security_group() + security_group = self.create_security_group() port = self.create_port(network) # Exception when set port-sec to False with sec-group defined @@ -88,7 +86,7 @@ class PortSecTest(base_security.BaseSecGroupTest, self.assertEmpty(port['security_groups']) self.assertFalse(port['port_security_enabled']) port = self.update_port( - port, security_groups=[sec_group_body['security_group']['id']], + port, security_groups=[security_group['id']], port_security_enabled=True) self.assertNotEmpty(port['security_groups']) @@ -102,11 +100,11 @@ class PortSecTest(base_security.BaseSecGroupTest, def test_port_sec_update_pass(self): network = self.create_network() self.create_subnet(network) - sec_group, _ = self._create_security_group() - sec_group_id = sec_group['security_group']['id'] - port = self.create_port(network, security_groups=[sec_group_id], - port_security_enabled=True) + security_group = self.create_security_group() + port = self.create_port(network, + security_groups=[security_group['id']], + port_security_enabled=True) self.assertNotEmpty(port['security_groups']) self.assertTrue(port['port_security_enabled']) @@ -114,7 +112,7 @@ class PortSecTest(base_security.BaseSecGroupTest, self.assertEmpty(port['security_groups']) self.assertTrue(port['port_security_enabled']) - port = self.update_port(port, security_groups=[sec_group_id]) + port = self.update_port(port, security_groups=[security_group['id']]) self.assertNotEmpty(port['security_groups']) port = self.update_port(port, security_groups=[], port_security_enabled=False) diff --git a/neutron_tempest_plugin/api/test_revisions.py b/neutron_tempest_plugin/api/test_revisions.py index b03285d9..0d590f63 100644 --- a/neutron_tempest_plugin/api/test_revisions.py +++ b/neutron_tempest_plugin/api/test_revisions.py @@ -12,16 +12,16 @@ import netaddr +from neutron_lib import constants from tempest.common import utils from tempest.lib import decorators from tempest.lib import exceptions from neutron_tempest_plugin.api import base -from neutron_tempest_plugin.api import base_security_groups as bsg from neutron_tempest_plugin import config -class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest): +class TestRevisions(base.BaseAdminNetworkTest): required_extensions = ['standard-attr-revisions'] @@ -111,46 +111,51 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest): @decorators.idempotent_id('6c256f71-c929-4200-b3dc-4e1843506be5') @utils.requires_ext(extension="security-group", service="network") def test_update_sg_group_bumps_revision(self): - sg, name = self._create_security_group() - self.assertIn('revision_number', sg['security_group']) - update_body = self.client.update_security_group( - sg['security_group']['id'], name='new_sg_name') - self.assertGreater(update_body['security_group']['revision_number'], - sg['security_group']['revision_number']) + security_group = self.create_security_group() + self.assertIn('revision_number', security_group) + updated_security_group = self.client.update_security_group( + security_group['id'], name='new_sg_name')['security_group'] + self.assertGreater(updated_security_group['revision_number'], + security_group['revision_number']) @decorators.idempotent_id('6489632f-8550-4453-a674-c98849742967') @utils.requires_ext(extension="security-group", service="network") def test_update_port_sg_binding_bumps_revision(self): - net = self.create_network() - self.addCleanup(self.client.delete_network, net['id']) - port = self.create_port(net) - self.addCleanup(self.client.delete_port, port['id']) - sg = self._create_security_group()[0] - self.client.update_port( - port['id'], security_groups=[sg['security_group']['id']]) - updated = self.client.show_port(port['id']) - updated2 = self.client.update_port(port['id'], security_groups=[]) - self.assertGreater(updated['port']['revision_number'], + network = self.create_network() + port = self.create_port(network) + + security_group = self.create_security_group() + updated_port = self.client.update_port( + port['id'], security_groups=[security_group['id']])['port'] + self.assertGreater(updated_port['revision_number'], port['revision_number']) - self.assertGreater(updated2['port']['revision_number'], - updated['port']['revision_number']) + + updated_port2 = self.client.update_port( + port['id'], security_groups=[])['port'] + self.assertGreater(updated_port2['revision_number'], + updated_port['revision_number']) @decorators.idempotent_id('29c7ab2b-d1d8-425d-8cec-fcf632960f22') @utils.requires_ext(extension="security-group", service="network") def test_update_sg_rule_bumps_sg_revision(self): - sg, name = self._create_security_group() - rule = self.client.create_security_group_rule( - security_group_id=sg['security_group']['id'], - protocol='tcp', direction='ingress', ethertype=self.ethertype, - port_range_min=60, port_range_max=70) - updated = self.client.show_security_group(sg['security_group']['id']) - self.assertGreater(updated['security_group']['revision_number'], - sg['security_group']['revision_number']) - self.client.delete_security_group_rule( - rule['security_group_rule']['id']) - updated2 = self.client.show_security_group(sg['security_group']['id']) - self.assertGreater(updated2['security_group']['revision_number'], - updated['security_group']['revision_number']) + security_group = self.create_security_group() + + security_group_rule = self.create_security_group_rule( + security_group=security_group, + protocol=constants.PROTO_NAME_TCP, + direction=constants.INGRESS_DIRECTION, + port_range_min=60, + port_range_max=70) + updated_security_group = self.client.show_security_group( + security_group['id'])['security_group'] + self.assertGreater(updated_security_group['revision_number'], + security_group['revision_number']) + + self.client.delete_security_group_rule(security_group_rule['id']) + updated_security_group2 = self.client.show_security_group( + security_group['id'])['security_group'] + self.assertGreater(updated_security_group2['revision_number'], + updated_security_group['revision_number']) @decorators.idempotent_id('db70c285-0365-4fac-9f55-2a0ad8cf55a8') @utils.requires_ext(extension="allowed-address-pairs", service="network") diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py index 299a62e7..b6d344db 100644 --- a/neutron_tempest_plugin/api/test_security_groups.py +++ b/neutron_tempest_plugin/api/test_security_groups.py @@ -17,39 +17,40 @@ from neutron_lib import constants from tempest.lib.common.utils import data_utils from tempest.lib import decorators -from neutron_tempest_plugin.api import base_security_groups as base +from neutron_tempest_plugin.api import base +from neutron_tempest_plugin.api import base_security_groups -class SecGroupTest(base.BaseSecGroupTest): +class SecGroupTest(base.BaseNetworkTest): required_extensions = ['security-group'] @decorators.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802') def test_create_list_update_show_delete_security_group(self): - group_create_body, name = self._create_security_group() + security_group = self.create_security_group() # List security groups and verify if created group is there in response - list_body = self.client.list_security_groups() - secgroup_list = list() - for secgroup in list_body['security_groups']: - secgroup_list.append(secgroup['id']) - self.assertIn(group_create_body['security_group']['id'], secgroup_list) + security_groups = self.client.list_security_groups()['security_groups'] + self.assertIn(security_group['id'], + {sg['id'] for sg in security_groups}) + # Update the security group new_name = data_utils.rand_name('security') new_description = data_utils.rand_name('security-description') - update_body = self.client.update_security_group( - group_create_body['security_group']['id'], - name=new_name, - description=new_description) + updated_security_group = self.client.update_security_group( + security_group['id'], name=new_name, + description=new_description)['security_group'] + # Verify if security group is updated - self.assertEqual(update_body['security_group']['name'], new_name) - self.assertEqual(update_body['security_group']['description'], + self.assertEqual(updated_security_group['name'], new_name) + self.assertEqual(updated_security_group['description'], new_description) + # Show details of the updated security group - show_body = self.client.show_security_group( - group_create_body['security_group']['id']) - self.assertEqual(show_body['security_group']['name'], new_name) - self.assertEqual(show_body['security_group']['description'], + observed_security_group = self.client.show_security_group( + security_group['id'])['security_group'] + self.assertEqual(observed_security_group['name'], new_name) + self.assertEqual(observed_security_group['description'], new_description) @decorators.idempotent_id('7c0ecb10-b2db-11e6-9b14-000c29248b0d') @@ -67,58 +68,48 @@ class SecGroupTest(base.BaseSecGroupTest): self.assertIsNotNone(secgrp['id']) -class SecGroupProtocolTest(base.BaseSecGroupTest): +class SecGroupProtocolTest(base.BaseNetworkTest): + + protocol_names = base_security_groups.V4_PROTOCOL_NAMES + protocol_ints = base_security_groups.V4_PROTOCOL_INTS @decorators.idempotent_id('282e3681-aa6e-42a7-b05c-c341aa1e3cdf') - def test_create_show_delete_security_group_rule_names(self): - group_create_body, _ = self._create_security_group() - for protocol in base.V4_PROTOCOL_NAMES: - self._test_create_show_delete_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, + def test_security_group_rule_protocol_names(self): + self._test_security_group_rule_protocols(protocols=self.protocol_names) + + @decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3') + def test_security_group_rule_protocol_ints(self): + self._test_security_group_rule_protocols(protocols=self.protocol_ints) + + def _test_security_group_rule_protocols(self, protocols): + security_group = self.create_security_group() + for protocol in protocols: + self._test_security_group_rule( + security_group=security_group, + protocol=str(protocol), direction=constants.INGRESS_DIRECTION, ethertype=self.ethertype) - @decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3') - def test_create_show_delete_security_group_rule_integers(self): - group_create_body, _ = self._create_security_group() - for protocol in base.V4_PROTOCOL_INTS: - self._test_create_show_delete_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, - direction=constants.INGRESS_DIRECTION, - ethertype=self.ethertype) + def _test_security_group_rule(self, security_group, **kwargs): + security_group_rule = self.create_security_group_rule( + security_group=security_group, **kwargs) + observed_security_group_rule = self.client.show_security_group_rule( + security_group_rule['id'])['security_group_rule'] + for key, value in kwargs.items(): + self.assertEqual(value, security_group_rule[key], + "{!r} does not match.".format(key)) + self.assertEqual(value, observed_security_group_rule[key], + "{!r} does not match.".format(key)) class SecGroupProtocolIPv6Test(SecGroupProtocolTest): - _ip_version = constants.IP_VERSION_6 - @decorators.idempotent_id('1f7cc9f5-e0d5-487c-8384-3d74060ab530') - def test_create_security_group_rule_with_ipv6_protocol_names(self): - group_create_body, _ = self._create_security_group() - for protocol in base.V6_PROTOCOL_NAMES: - self._test_create_show_delete_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, - direction=constants.INGRESS_DIRECTION, - ethertype=self.ethertype) + _ip_version = constants.IP_VERSION_6 + protocol_names = base_security_groups.V6_PROTOCOL_NAMES + protocol_ints = base_security_groups.V6_PROTOCOL_INTS + protocol_legacy_names = base_security_groups.V6_PROTOCOL_LEGACY @decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa') - def test_create_security_group_rule_with_ipv6_protocol_legacy_names(self): - group_create_body, _ = self._create_security_group() - for protocol in base.V6_PROTOCOL_LEGACY: - self._test_create_show_delete_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, - direction=constants.INGRESS_DIRECTION, - ethertype=self.ethertype) - - @decorators.idempotent_id('bcfce0b7-bc96-40ae-9b08-3f6774ee0260') - def test_create_security_group_rule_with_ipv6_protocol_integers(self): - group_create_body, _ = self._create_security_group() - for protocol in base.V6_PROTOCOL_INTS: - self._test_create_show_delete_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, - direction=constants.INGRESS_DIRECTION, - ethertype=self.ethertype) + def test_security_group_rule_protocol_legacy_names(self): + self._test_security_group_rule_protocols( + protocols=self.protocol_legacy_names) diff --git a/neutron_tempest_plugin/api/test_security_groups_negative.py b/neutron_tempest_plugin/api/test_security_groups_negative.py index c4276914..1fcbd18d 100644 --- a/neutron_tempest_plugin/api/test_security_groups_negative.py +++ b/neutron_tempest_plugin/api/test_security_groups_negative.py @@ -18,12 +18,14 @@ from neutron_lib.db import constants as db_const from tempest.lib import decorators from tempest.lib import exceptions as lib_exc -from neutron_tempest_plugin.api import base_security_groups as base +from neutron_tempest_plugin.api import base +from neutron_tempest_plugin.api import base_security_groups + LONG_NAME_NG = 'x' * (db_const.NAME_FIELD_SIZE + 1) -class NegativeSecGroupTest(base.BaseSecGroupTest): +class NegativeSecGroupTest(base.BaseNetworkTest): required_extensions = ['security-group'] @@ -36,72 +38,68 @@ class NegativeSecGroupTest(base.BaseSecGroupTest): @decorators.idempotent_id('594edfa8-9a5b-438e-9344-49aece337d49') def test_create_security_group_with_too_long_name(self): self.assertRaises(lib_exc.BadRequest, - self.client.create_security_group, + self.create_security_group, name=LONG_NAME_NG) @decorators.attr(type='negative') @decorators.idempotent_id('b6b79838-7430-4d3f-8e07-51dfb61802c2') def test_create_security_group_with_boolean_type_name(self): self.assertRaises(lib_exc.BadRequest, - self.client.create_security_group, + self.create_security_group, name=True) @decorators.attr(type='negative') @decorators.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c') def test_update_default_security_group_name(self): - sg_list = self.client.list_security_groups(name='default') - sg = sg_list['security_groups'][0] + security_group = self.client.list_security_groups(name='default')[ + 'security_groups'][0] self.assertRaises(lib_exc.Conflict, self.client.update_security_group, - sg['id'], name='test') + security_group['id'], name='test') @decorators.attr(type='negative') @decorators.idempotent_id('c8510dd8-c3a8-4df9-ae44-24354db50960') def test_update_security_group_with_too_long_name(self): - sg_list = self.client.list_security_groups(name='default') - sg = sg_list['security_groups'][0] + security_group = self.client.list_security_groups(name='default')[ + 'security_groups'][0] self.assertRaises(lib_exc.BadRequest, self.client.update_security_group, - sg['id'], name=LONG_NAME_NG) + security_group['id'], name=LONG_NAME_NG) @decorators.attr(type='negative') @decorators.idempotent_id('d9a14917-f66f-4eca-ab72-018563917f1b') def test_update_security_group_with_boolean_type_name(self): - sg_list = self.client.list_security_groups(name='default') - sg = sg_list['security_groups'][0] + security_group = self.client.list_security_groups(name='default')[ + 'security_groups'][0] self.assertRaises(lib_exc.BadRequest, self.client.update_security_group, - sg['id'], name=True) + security_group['id'], name=True) @decorators.attr(type='negative') @decorators.idempotent_id('3200b1a8-d73b-48e9-b03f-e891a4abe2d3') def test_delete_in_use_sec_group(self): - sgroup = self.os_primary.network_client.create_security_group( - name='sgroup') - self.security_groups.append(sgroup['security_group']) - port = self.client.create_port( - network_id=self.network['id'], - security_groups=[sgroup['security_group']['id']]) - self.ports.append(port['port']) + security_group = self.create_security_group() + self.create_port(network=self.network, + security_groups=[security_group['id']]) self.assertRaises(lib_exc.Conflict, self.os_primary.network_client.delete_security_group, - security_group_id=sgroup['security_group']['id']) + security_group_id=security_group['id']) class NegativeSecGroupIPv6Test(NegativeSecGroupTest): _ip_version = constants.IP_VERSION_6 -class NegativeSecGroupProtocolTest(base.BaseSecGroupTest): +class NegativeSecGroupProtocolTest(base.BaseNetworkTest): def _test_create_security_group_rule_with_bad_protocols(self, protocols): - group_create_body, _ = self._create_security_group() + security_group = self.create_security_group() # bad protocols can include v6 protocols because self.ethertype is v4 for protocol in protocols: self.assertRaises( lib_exc.BadRequest, self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], + security_group_id=security_group['id'], protocol=protocol, direction=constants.INGRESS_DIRECTION, ethertype=self.ethertype) @@ -109,10 +107,10 @@ class NegativeSecGroupProtocolTest(base.BaseSecGroupTest): @decorators.idempotent_id('cccbb0f3-c273-43ed-b3fc-1efc48833810') def test_create_security_group_rule_with_ipv6_protocol_names(self): self._test_create_security_group_rule_with_bad_protocols( - base.V6_PROTOCOL_NAMES) + base_security_groups.V6_PROTOCOL_NAMES) @decorators.attr(type=['negative']) @decorators.idempotent_id('8aa636bd-7060-4fdf-b722-cdae28e2f1ef') def test_create_security_group_rule_with_ipv6_protocol_integers(self): self._test_create_security_group_rule_with_bad_protocols( - base.V6_PROTOCOL_INTS) + base_security_groups.V6_PROTOCOL_INTS) diff --git a/neutron_tempest_plugin/api/test_timestamp.py b/neutron_tempest_plugin/api/test_timestamp.py index f5888f97..9ec982d9 100644 --- a/neutron_tempest_plugin/api/test_timestamp.py +++ b/neutron_tempest_plugin/api/test_timestamp.py @@ -11,14 +11,15 @@ # under the License. import copy +import time +from neutron_lib import constants from tempest.common import utils from tempest.lib.common.utils import data_utils from tempest.lib import decorators from neutron_tempest_plugin.api import base from neutron_tempest_plugin.api import base_routers -from neutron_tempest_plugin.api import base_security_groups from neutron_tempest_plugin import config CONF = config.CONF @@ -276,7 +277,7 @@ class TestTimeStampWithL3(base_routers.BaseRouterTest): show_fip['updated_at']) -class TestTimeStampWithSecurityGroup(base_security_groups.BaseSecGroupTest): +class TestTimeStampWithSecurityGroup(base.BaseNetworkTest): required_extensions = ['standard-attr-timestamp'] @@ -287,66 +288,66 @@ class TestTimeStampWithSecurityGroup(base_security_groups.BaseSecGroupTest): @decorators.idempotent_id('a3150a7b-d31a-423a-abf3-45e71c97cbac') def test_create_sg_with_timestamp(self): - sg, _ = self._create_security_group() + security_group = self.create_security_group() # Verifies body contains timestamp fields - self.assertIsNotNone(sg['security_group']['created_at']) - self.assertIsNotNone(sg['security_group']['updated_at']) + self.assertIsNotNone(security_group['created_at']) + self.assertIsNotNone(security_group['updated_at']) @decorators.idempotent_id('432ae0d3-32b4-413e-a9b3-091ac76da31b') def test_update_sg_with_timestamp(self): - sgc, _ = self._create_security_group() - sg = sgc['security_group'] - origin_updated_at = sg['updated_at'] - update_body = {'name': sg['name'] + 'new'} - body = self.client.update_security_group(sg['id'], **update_body) - updated_sg = body['security_group'] - new_updated_at = updated_sg['updated_at'] - self.assertEqual(sg['created_at'], updated_sg['created_at']) - # Verify that origin_updated_at is not same with new_updated_at - self.assertIsNot(origin_updated_at, new_updated_at) + security_group = self.create_security_group() + + # Make sure update time will be different + time.sleep(2.) + updated_security_group = self.client.update_security_group( + security_group['id'], name=security_group['name'] + 'new')[ + 'security_group'] + + # Verify that created_at hasn't changed + self.assertEqual(security_group['created_at'], + updated_security_group['created_at']) + # Verify that updated_at has changed + self.assertNotEqual(security_group['updated_at'], + updated_security_group['updated_at']) @decorators.idempotent_id('521e6723-43d6-12a6-8c3d-f5042ad9fc32') def test_show_sg_attribute_with_timestamp(self): - sg, _ = self._create_security_group() - body = self.client.show_security_group(sg['security_group']['id']) - show_sg = body['security_group'] - # verify the timestamp from creation and showed is same - self.assertEqual(sg['security_group']['created_at'], - show_sg['created_at']) - self.assertEqual(sg['security_group']['updated_at'], - show_sg['updated_at']) + security_group = self.create_security_group() + observed_security_group = self.client.show_security_group( + security_group['id'])['security_group'] - def _prepare_sgrule_test(self): - sg, _ = self._create_security_group() - sg_id = sg['security_group']['id'] - direction = 'ingress' - protocol = 'tcp' - port_range_min = 77 - port_range_max = 77 - rule_create_body = self.client.create_security_group_rule( - security_group_id=sg_id, - direction=direction, - ethertype=self.ethertype, - protocol=protocol, - port_range_min=port_range_min, - port_range_max=port_range_max, - remote_group_id=None, - remote_ip_prefix=None - ) - return rule_create_body['security_group_rule'] + # Verify that created_at hasn't changed + self.assertEqual(security_group['created_at'], + observed_security_group['created_at']) + # Verify that updated_at hasn't changed + self.assertEqual(security_group['updated_at'], + observed_security_group['updated_at']) + + def _create_security_group_rule(self): + security_group = self.create_security_group() + return self.create_security_group_rule( + security_group=security_group, + direction=constants.INGRESS_DIRECTION, + protocol=constants.PROTO_NAME_TCP, + port_range_min=77, + port_range_max=77) @decorators.idempotent_id('83e8bd32-43e0-a3f0-1af3-12a5733c653e') def test_create_sgrule_with_timestamp(self): - sgrule = self._prepare_sgrule_test() + security_group_rule = self._create_security_group_rule() # Verifies body contains timestamp fields - self.assertIsNotNone(sgrule['created_at']) - self.assertIsNotNone(sgrule['updated_at']) + self.assertIn('created_at', security_group_rule) + self.assertIn('updated_at', security_group_rule) @decorators.idempotent_id('143da0e6-ba17-43ad-b3d7-03aa759c3cb4') def test_show_sgrule_attribute_with_timestamp(self): - sgrule = self._prepare_sgrule_test() - body = self.client.show_security_group_rule(sgrule['id']) - show_sgrule = body['security_group_rule'] - # verify the timestamp from creation and showed is same - self.assertEqual(sgrule['created_at'], show_sgrule['created_at']) - self.assertEqual(sgrule['updated_at'], show_sgrule['updated_at']) + security_group_rule = self._create_security_group_rule() + + observed_security_group_rule = self.client.show_security_group_rule( + security_group_rule['id'])['security_group_rule'] + + # Verify the time stamp from creation and showed are equal + self.assertEqual(security_group_rule['created_at'], + observed_security_group_rule['created_at']) + self.assertEqual(security_group_rule['updated_at'], + observed_security_group_rule['updated_at']) diff --git a/neutron_tempest_plugin/scenario/admin/test_floatingip.py b/neutron_tempest_plugin/scenario/admin/test_floatingip.py index 1af55029..511452c2 100644 --- a/neutron_tempest_plugin/scenario/admin/test_floatingip.py +++ b/neutron_tempest_plugin/scenario/admin/test_floatingip.py @@ -14,7 +14,6 @@ # under the License. from tempest.common import utils from tempest.common import waiters -from tempest.lib.common.utils import data_utils from tempest.lib import decorators from neutron_tempest_plugin.common import ssh @@ -38,22 +37,17 @@ class FloatingIpTestCasesAdmin(base.BaseTempestTestCase): cls.create_router_interface(router['id'], cls.subnets[0]['id']) # Create keypair with admin privileges cls.keypair = cls.create_keypair(client=cls.os_admin.keypairs_client) - # Create security group with admin privileges - cls.secgroup = cls.os_admin.network_client.create_security_group( - name=data_utils.rand_name('secgroup'))['security_group'] - # Execute funcs to achieve ssh and ICMP capabilities - funcs = [cls.create_loginable_secgroup_rule, - cls.create_pingable_secgroup_rule] - for func in funcs: - func(secgroup_id=cls.secgroup['id'], - client=cls.os_admin.network_client) - @classmethod - def resource_cleanup(cls): - # Cleanup for security group - cls.os_admin.network_client.delete_security_group( - security_group_id=cls.secgroup['id']) - super(FloatingIpTestCasesAdmin, cls).resource_cleanup() + # Create security group with admin privileges + network_client = cls.os_admin.network_client + cls.secgroup = cls.create_security_group( + client=cls.os_admin.network_client) + cls.create_loginable_secgroup_rule( + secgroup_id=cls.secgroup['id'], + client=network_client) + cls.create_pingable_secgroup_rule( + secgroup_id=cls.secgroup['id'], + client=network_client), def _list_hypervisors(self): # List of hypervisors