Merge "Update test cases after security group menthods improvements."
This commit is contained in:
commit
326c82c828
|
@ -17,11 +17,9 @@ from tempest.lib import decorators
|
|||
from tempest.lib import exceptions as lib_exc
|
||||
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_security_groups as base_security
|
||||
|
||||
|
||||
class PortSecurityAdminTests(base_security.BaseSecGroupTest,
|
||||
base.BaseAdminNetworkTest):
|
||||
class PortSecurityAdminTests(base.BaseAdminNetworkTest):
|
||||
|
||||
required_extensions = ['port-security']
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutron_lib import constants
|
||||
from tempest.common import utils
|
||||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib import decorators
|
||||
|
@ -110,54 +111,38 @@ class QuotasAdminNegativeTestJSON(test_quotas.QuotasTestBase):
|
|||
@decorators.idempotent_id('5c924ff7-b7a9-474f-92a3-dbe0f976ec13')
|
||||
@utils.requires_ext(extension="security-group", service="network")
|
||||
def test_create_security_group_when_quotas_is_full(self):
|
||||
tenant_id = self.create_project()['id']
|
||||
sg_args = {'tenant_id': tenant_id}
|
||||
# avoid a number that is made by default
|
||||
sg_list = self.admin_client.list_security_groups(
|
||||
tenant_id=tenant_id)['security_groups']
|
||||
num = len(sg_list) + 1
|
||||
project = self.create_project()
|
||||
|
||||
new_quotas = {'security_group': num}
|
||||
self._setup_quotas(tenant_id, **new_quotas)
|
||||
# Set quotas to allow to create only one more security group
|
||||
security_groups = self.admin_client.list_security_groups(
|
||||
tenant_id=project['id'])['security_groups']
|
||||
self._setup_quotas(project['id'],
|
||||
security_group=len(security_groups) + 1)
|
||||
|
||||
sg = self.admin_client.create_security_group(
|
||||
**sg_args)['security_group']
|
||||
self.addCleanup(self.admin_client.delete_security_group, sg['id'])
|
||||
|
||||
self.assertRaises(lib_exc.Conflict,
|
||||
self.admin_client.create_security_group, **sg_args)
|
||||
self.create_security_group(project=project)
|
||||
self.assertRaises(lib_exc.Conflict, self.create_security_group,
|
||||
project=project)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('b7143480-6118-4ed4-be38-1b6f15f30d05')
|
||||
@utils.requires_ext(extension="security-group", service="network")
|
||||
def test_create_security_group_rule_when_quotas_is_full(self):
|
||||
tenant_id = self.create_project()['id']
|
||||
sg_args = {'tenant_id': tenant_id}
|
||||
project = self.create_project()
|
||||
security_group = self.create_security_group(project=project)
|
||||
|
||||
sg = self.admin_client.create_security_group(
|
||||
**sg_args)['security_group']
|
||||
self.addCleanup(self.admin_client.delete_security_group, sg['id'])
|
||||
# Set quotas to allow to create only one more security group rule
|
||||
security_group_rules = self.admin_client.list_security_group_rules(
|
||||
tenant_id=project['id'])['security_group_rules']
|
||||
self._setup_quotas(project['id'],
|
||||
security_group_rule=len(security_group_rules) + 1)
|
||||
|
||||
# avoid a number that is made by default
|
||||
sg_rule_list = self.admin_client.list_security_group_rules(
|
||||
tenant_id=tenant_id)['security_group_rules']
|
||||
num = len(sg_rule_list) + 1
|
||||
|
||||
new_quotas = {'security_group_rule': num}
|
||||
self._setup_quotas(tenant_id, **new_quotas)
|
||||
|
||||
sg_rule_args = {'tenant_id': tenant_id,
|
||||
'security_group_id': sg['id'],
|
||||
'direction': 'ingress'}
|
||||
sg_rule = self.admin_client.create_security_group_rule(
|
||||
**sg_rule_args)['security_group_rule']
|
||||
self.addCleanup(
|
||||
self.admin_client.delete_security_group_rule, sg_rule['id'])
|
||||
|
||||
sg_rule_args['direction'] = 'egress'
|
||||
self.create_security_group_rule(
|
||||
project=project, security_group=security_group,
|
||||
direction=constants.INGRESS_DIRECTION)
|
||||
self.assertRaises(lib_exc.Conflict,
|
||||
self.admin_client.create_security_group_rule,
|
||||
**sg_rule_args)
|
||||
self.create_security_group_rule,
|
||||
project=project, security_group=security_group,
|
||||
direction=constants.EGRESS_DIRECTION)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('d00fe5bb-9db8-4e1a-9c31-490f52897e6f')
|
||||
|
|
|
@ -14,10 +14,10 @@
|
|||
|
||||
from tempest.lib import decorators
|
||||
|
||||
from neutron_tempest_plugin.api import base_security_groups as base
|
||||
from neutron_tempest_plugin.api import base
|
||||
|
||||
|
||||
class SecGroupAdminTest(base.BaseSecGroupTest):
|
||||
class SecGroupAdminTest(base.BaseNetworkTest):
|
||||
required_extensions = ['security-group']
|
||||
credentials = ['primary', 'admin']
|
||||
|
||||
|
|
|
@ -14,9 +14,6 @@
|
|||
# under the License.
|
||||
|
||||
from neutron_lib import constants
|
||||
from tempest.lib.common.utils import data_utils
|
||||
|
||||
from neutron_tempest_plugin.api import base
|
||||
|
||||
|
||||
# NOTE(yamamoto): The list of protocols here is what we had in Ocata.
|
||||
|
@ -45,9 +42,13 @@ V4_PROTOCOL_NAMES = {
|
|||
'udplite',
|
||||
'vrrp',
|
||||
}
|
||||
V4_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items() if
|
||||
k in V4_PROTOCOL_NAMES)
|
||||
V6_PROTOCOL_LEGACY = set([constants.PROTO_NAME_IPV6_ICMP_LEGACY])
|
||||
|
||||
V4_PROTOCOL_INTS = {v
|
||||
for k, v in constants.IP_PROTOCOL_MAP.items()
|
||||
if k in V4_PROTOCOL_NAMES}
|
||||
|
||||
V6_PROTOCOL_LEGACY = {constants.PROTO_NAME_IPV6_ICMP_LEGACY}
|
||||
|
||||
V6_PROTOCOL_NAMES = {
|
||||
'ipv6-encap',
|
||||
'ipv6-frag',
|
||||
|
@ -56,66 +57,7 @@ V6_PROTOCOL_NAMES = {
|
|||
'ipv6-opts',
|
||||
'ipv6-route',
|
||||
}
|
||||
V6_PROTOCOL_INTS = set(v for k, v in constants.IP_PROTOCOL_MAP.items() if
|
||||
k in (V6_PROTOCOL_NAMES | V6_PROTOCOL_LEGACY))
|
||||
|
||||
|
||||
class BaseSecGroupTest(base.BaseNetworkTest):
|
||||
|
||||
def _create_security_group(self, **kwargs):
|
||||
# Create a security group
|
||||
name = data_utils.rand_name('secgroup-')
|
||||
group_create_body = self.client.create_security_group(name=name,
|
||||
**kwargs)
|
||||
self.addCleanup(self._delete_security_group,
|
||||
group_create_body['security_group']['id'])
|
||||
self.assertEqual(group_create_body['security_group']['name'], name)
|
||||
return group_create_body, name
|
||||
|
||||
def _delete_security_group(self, secgroup_id):
|
||||
self.client.delete_security_group(secgroup_id)
|
||||
# Asserting that the security group is not found in the list
|
||||
# after deletion
|
||||
list_body = self.client.list_security_groups()
|
||||
secgroup_list = list()
|
||||
for secgroup in list_body['security_groups']:
|
||||
secgroup_list.append(secgroup['id'])
|
||||
self.assertNotIn(secgroup_id, secgroup_list)
|
||||
|
||||
def _create_security_group_rule(self, **kwargs):
|
||||
rule_create_body = self.client.create_security_group_rule(**kwargs)
|
||||
# List rules and verify created rule is in response
|
||||
rule_list_body = (
|
||||
self.client.list_security_group_rules())
|
||||
rule_list = [rule['id']
|
||||
for rule in rule_list_body['security_group_rules']]
|
||||
self.assertIn(rule_create_body['security_group_rule']['id'],
|
||||
rule_list)
|
||||
self.addCleanup(self._delete_security_group_rule,
|
||||
rule_create_body['security_group_rule']['id'])
|
||||
return rule_create_body
|
||||
|
||||
def _show_security_group_rule(self, **kwargs):
|
||||
show_rule_body = self.client.show_security_group_rule(kwargs['id'])
|
||||
for key, value in kwargs.items():
|
||||
self.assertEqual(value,
|
||||
show_rule_body['security_group_rule'][key],
|
||||
"%s does not match." % key)
|
||||
|
||||
def _delete_security_group_rule(self, secgroup_rule_id):
|
||||
self.client.delete_security_group_rule(secgroup_rule_id)
|
||||
rule_list_body = self.client.list_security_group_rules()
|
||||
rule_list = [rule['id']
|
||||
for rule in rule_list_body['security_group_rules']]
|
||||
self.assertNotIn(secgroup_rule_id, rule_list)
|
||||
|
||||
def _test_create_show_delete_security_group_rule(self, **kwargs):
|
||||
# The security group rule is deleted by the cleanup call in
|
||||
# _create_security_group_rule.
|
||||
rule_create_body = (
|
||||
self._create_security_group_rule(**kwargs)['security_group_rule'])
|
||||
self._show_security_group_rule(
|
||||
id=rule_create_body['id'],
|
||||
protocol=rule_create_body['protocol'],
|
||||
direction=rule_create_body['direction'],
|
||||
ethertype=rule_create_body['ethertype'])
|
||||
V6_PROTOCOL_INTS = {v
|
||||
for k, v in constants.IP_PROTOCOL_MAP.items()
|
||||
if k in (V6_PROTOCOL_NAMES | V6_PROTOCOL_LEGACY)}
|
||||
|
|
|
@ -19,15 +19,13 @@ from tempest.lib import decorators
|
|||
from tempest.lib import exceptions as lib_exc
|
||||
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_security_groups as base_security
|
||||
|
||||
FAKE_IP = '10.0.0.1'
|
||||
FAKE_MAC = '00:25:64:e8:19:dd'
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class PortSecTest(base_security.BaseSecGroupTest,
|
||||
base.BaseNetworkTest):
|
||||
class PortSecTest(base.BaseNetworkTest):
|
||||
|
||||
@decorators.idempotent_id('7c338ddf-e64e-4118-bd33-e49a1f2f1495')
|
||||
@utils.requires_ext(extension='port-security', service='network')
|
||||
|
@ -76,7 +74,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
|
|||
network = self.create_network()
|
||||
self.create_subnet(network)
|
||||
|
||||
sec_group_body, _ = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
port = self.create_port(network)
|
||||
|
||||
# Exception when set port-sec to False with sec-group defined
|
||||
|
@ -88,7 +86,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
|
|||
self.assertEmpty(port['security_groups'])
|
||||
self.assertFalse(port['port_security_enabled'])
|
||||
port = self.update_port(
|
||||
port, security_groups=[sec_group_body['security_group']['id']],
|
||||
port, security_groups=[security_group['id']],
|
||||
port_security_enabled=True)
|
||||
|
||||
self.assertNotEmpty(port['security_groups'])
|
||||
|
@ -102,11 +100,11 @@ class PortSecTest(base_security.BaseSecGroupTest,
|
|||
def test_port_sec_update_pass(self):
|
||||
network = self.create_network()
|
||||
self.create_subnet(network)
|
||||
sec_group, _ = self._create_security_group()
|
||||
sec_group_id = sec_group['security_group']['id']
|
||||
port = self.create_port(network, security_groups=[sec_group_id],
|
||||
port_security_enabled=True)
|
||||
security_group = self.create_security_group()
|
||||
|
||||
port = self.create_port(network,
|
||||
security_groups=[security_group['id']],
|
||||
port_security_enabled=True)
|
||||
self.assertNotEmpty(port['security_groups'])
|
||||
self.assertTrue(port['port_security_enabled'])
|
||||
|
||||
|
@ -114,7 +112,7 @@ class PortSecTest(base_security.BaseSecGroupTest,
|
|||
self.assertEmpty(port['security_groups'])
|
||||
self.assertTrue(port['port_security_enabled'])
|
||||
|
||||
port = self.update_port(port, security_groups=[sec_group_id])
|
||||
port = self.update_port(port, security_groups=[security_group['id']])
|
||||
self.assertNotEmpty(port['security_groups'])
|
||||
port = self.update_port(port, security_groups=[],
|
||||
port_security_enabled=False)
|
||||
|
|
|
@ -12,16 +12,16 @@
|
|||
|
||||
import netaddr
|
||||
|
||||
from neutron_lib import constants
|
||||
from tempest.common import utils
|
||||
from tempest.lib import decorators
|
||||
from tempest.lib import exceptions
|
||||
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_security_groups as bsg
|
||||
from neutron_tempest_plugin import config
|
||||
|
||||
|
||||
class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
|
||||
class TestRevisions(base.BaseAdminNetworkTest):
|
||||
|
||||
required_extensions = ['standard-attr-revisions']
|
||||
|
||||
|
@ -111,46 +111,51 @@ class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
|
|||
@decorators.idempotent_id('6c256f71-c929-4200-b3dc-4e1843506be5')
|
||||
@utils.requires_ext(extension="security-group", service="network")
|
||||
def test_update_sg_group_bumps_revision(self):
|
||||
sg, name = self._create_security_group()
|
||||
self.assertIn('revision_number', sg['security_group'])
|
||||
update_body = self.client.update_security_group(
|
||||
sg['security_group']['id'], name='new_sg_name')
|
||||
self.assertGreater(update_body['security_group']['revision_number'],
|
||||
sg['security_group']['revision_number'])
|
||||
security_group = self.create_security_group()
|
||||
self.assertIn('revision_number', security_group)
|
||||
updated_security_group = self.client.update_security_group(
|
||||
security_group['id'], name='new_sg_name')['security_group']
|
||||
self.assertGreater(updated_security_group['revision_number'],
|
||||
security_group['revision_number'])
|
||||
|
||||
@decorators.idempotent_id('6489632f-8550-4453-a674-c98849742967')
|
||||
@utils.requires_ext(extension="security-group", service="network")
|
||||
def test_update_port_sg_binding_bumps_revision(self):
|
||||
net = self.create_network()
|
||||
self.addCleanup(self.client.delete_network, net['id'])
|
||||
port = self.create_port(net)
|
||||
self.addCleanup(self.client.delete_port, port['id'])
|
||||
sg = self._create_security_group()[0]
|
||||
self.client.update_port(
|
||||
port['id'], security_groups=[sg['security_group']['id']])
|
||||
updated = self.client.show_port(port['id'])
|
||||
updated2 = self.client.update_port(port['id'], security_groups=[])
|
||||
self.assertGreater(updated['port']['revision_number'],
|
||||
network = self.create_network()
|
||||
port = self.create_port(network)
|
||||
|
||||
security_group = self.create_security_group()
|
||||
updated_port = self.client.update_port(
|
||||
port['id'], security_groups=[security_group['id']])['port']
|
||||
self.assertGreater(updated_port['revision_number'],
|
||||
port['revision_number'])
|
||||
self.assertGreater(updated2['port']['revision_number'],
|
||||
updated['port']['revision_number'])
|
||||
|
||||
updated_port2 = self.client.update_port(
|
||||
port['id'], security_groups=[])['port']
|
||||
self.assertGreater(updated_port2['revision_number'],
|
||||
updated_port['revision_number'])
|
||||
|
||||
@decorators.idempotent_id('29c7ab2b-d1d8-425d-8cec-fcf632960f22')
|
||||
@utils.requires_ext(extension="security-group", service="network")
|
||||
def test_update_sg_rule_bumps_sg_revision(self):
|
||||
sg, name = self._create_security_group()
|
||||
rule = self.client.create_security_group_rule(
|
||||
security_group_id=sg['security_group']['id'],
|
||||
protocol='tcp', direction='ingress', ethertype=self.ethertype,
|
||||
port_range_min=60, port_range_max=70)
|
||||
updated = self.client.show_security_group(sg['security_group']['id'])
|
||||
self.assertGreater(updated['security_group']['revision_number'],
|
||||
sg['security_group']['revision_number'])
|
||||
self.client.delete_security_group_rule(
|
||||
rule['security_group_rule']['id'])
|
||||
updated2 = self.client.show_security_group(sg['security_group']['id'])
|
||||
self.assertGreater(updated2['security_group']['revision_number'],
|
||||
updated['security_group']['revision_number'])
|
||||
security_group = self.create_security_group()
|
||||
|
||||
security_group_rule = self.create_security_group_rule(
|
||||
security_group=security_group,
|
||||
protocol=constants.PROTO_NAME_TCP,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
port_range_min=60,
|
||||
port_range_max=70)
|
||||
updated_security_group = self.client.show_security_group(
|
||||
security_group['id'])['security_group']
|
||||
self.assertGreater(updated_security_group['revision_number'],
|
||||
security_group['revision_number'])
|
||||
|
||||
self.client.delete_security_group_rule(security_group_rule['id'])
|
||||
updated_security_group2 = self.client.show_security_group(
|
||||
security_group['id'])['security_group']
|
||||
self.assertGreater(updated_security_group2['revision_number'],
|
||||
updated_security_group['revision_number'])
|
||||
|
||||
@decorators.idempotent_id('db70c285-0365-4fac-9f55-2a0ad8cf55a8')
|
||||
@utils.requires_ext(extension="allowed-address-pairs", service="network")
|
||||
|
|
|
@ -17,39 +17,40 @@ from neutron_lib import constants
|
|||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib import decorators
|
||||
|
||||
from neutron_tempest_plugin.api import base_security_groups as base
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_security_groups
|
||||
|
||||
|
||||
class SecGroupTest(base.BaseSecGroupTest):
|
||||
class SecGroupTest(base.BaseNetworkTest):
|
||||
|
||||
required_extensions = ['security-group']
|
||||
|
||||
@decorators.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
|
||||
def test_create_list_update_show_delete_security_group(self):
|
||||
group_create_body, name = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
|
||||
# List security groups and verify if created group is there in response
|
||||
list_body = self.client.list_security_groups()
|
||||
secgroup_list = list()
|
||||
for secgroup in list_body['security_groups']:
|
||||
secgroup_list.append(secgroup['id'])
|
||||
self.assertIn(group_create_body['security_group']['id'], secgroup_list)
|
||||
security_groups = self.client.list_security_groups()['security_groups']
|
||||
self.assertIn(security_group['id'],
|
||||
{sg['id'] for sg in security_groups})
|
||||
|
||||
# Update the security group
|
||||
new_name = data_utils.rand_name('security')
|
||||
new_description = data_utils.rand_name('security-description')
|
||||
update_body = self.client.update_security_group(
|
||||
group_create_body['security_group']['id'],
|
||||
name=new_name,
|
||||
description=new_description)
|
||||
updated_security_group = self.client.update_security_group(
|
||||
security_group['id'], name=new_name,
|
||||
description=new_description)['security_group']
|
||||
|
||||
# Verify if security group is updated
|
||||
self.assertEqual(update_body['security_group']['name'], new_name)
|
||||
self.assertEqual(update_body['security_group']['description'],
|
||||
self.assertEqual(updated_security_group['name'], new_name)
|
||||
self.assertEqual(updated_security_group['description'],
|
||||
new_description)
|
||||
|
||||
# Show details of the updated security group
|
||||
show_body = self.client.show_security_group(
|
||||
group_create_body['security_group']['id'])
|
||||
self.assertEqual(show_body['security_group']['name'], new_name)
|
||||
self.assertEqual(show_body['security_group']['description'],
|
||||
observed_security_group = self.client.show_security_group(
|
||||
security_group['id'])['security_group']
|
||||
self.assertEqual(observed_security_group['name'], new_name)
|
||||
self.assertEqual(observed_security_group['description'],
|
||||
new_description)
|
||||
|
||||
@decorators.idempotent_id('7c0ecb10-b2db-11e6-9b14-000c29248b0d')
|
||||
|
@ -67,58 +68,48 @@ class SecGroupTest(base.BaseSecGroupTest):
|
|||
self.assertIsNotNone(secgrp['id'])
|
||||
|
||||
|
||||
class SecGroupProtocolTest(base.BaseSecGroupTest):
|
||||
class SecGroupProtocolTest(base.BaseNetworkTest):
|
||||
|
||||
protocol_names = base_security_groups.V4_PROTOCOL_NAMES
|
||||
protocol_ints = base_security_groups.V4_PROTOCOL_INTS
|
||||
|
||||
@decorators.idempotent_id('282e3681-aa6e-42a7-b05c-c341aa1e3cdf')
|
||||
def test_create_show_delete_security_group_rule_names(self):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
for protocol in base.V4_PROTOCOL_NAMES:
|
||||
self._test_create_show_delete_security_group_rule(
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
protocol=protocol,
|
||||
def test_security_group_rule_protocol_names(self):
|
||||
self._test_security_group_rule_protocols(protocols=self.protocol_names)
|
||||
|
||||
@decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3')
|
||||
def test_security_group_rule_protocol_ints(self):
|
||||
self._test_security_group_rule_protocols(protocols=self.protocol_ints)
|
||||
|
||||
def _test_security_group_rule_protocols(self, protocols):
|
||||
security_group = self.create_security_group()
|
||||
for protocol in protocols:
|
||||
self._test_security_group_rule(
|
||||
security_group=security_group,
|
||||
protocol=str(protocol),
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
|
||||
@decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3')
|
||||
def test_create_show_delete_security_group_rule_integers(self):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
for protocol in base.V4_PROTOCOL_INTS:
|
||||
self._test_create_show_delete_security_group_rule(
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
protocol=protocol,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
def _test_security_group_rule(self, security_group, **kwargs):
|
||||
security_group_rule = self.create_security_group_rule(
|
||||
security_group=security_group, **kwargs)
|
||||
observed_security_group_rule = self.client.show_security_group_rule(
|
||||
security_group_rule['id'])['security_group_rule']
|
||||
for key, value in kwargs.items():
|
||||
self.assertEqual(value, security_group_rule[key],
|
||||
"{!r} does not match.".format(key))
|
||||
self.assertEqual(value, observed_security_group_rule[key],
|
||||
"{!r} does not match.".format(key))
|
||||
|
||||
|
||||
class SecGroupProtocolIPv6Test(SecGroupProtocolTest):
|
||||
_ip_version = constants.IP_VERSION_6
|
||||
|
||||
@decorators.idempotent_id('1f7cc9f5-e0d5-487c-8384-3d74060ab530')
|
||||
def test_create_security_group_rule_with_ipv6_protocol_names(self):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
for protocol in base.V6_PROTOCOL_NAMES:
|
||||
self._test_create_show_delete_security_group_rule(
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
protocol=protocol,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
_ip_version = constants.IP_VERSION_6
|
||||
protocol_names = base_security_groups.V6_PROTOCOL_NAMES
|
||||
protocol_ints = base_security_groups.V6_PROTOCOL_INTS
|
||||
protocol_legacy_names = base_security_groups.V6_PROTOCOL_LEGACY
|
||||
|
||||
@decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa')
|
||||
def test_create_security_group_rule_with_ipv6_protocol_legacy_names(self):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
for protocol in base.V6_PROTOCOL_LEGACY:
|
||||
self._test_create_show_delete_security_group_rule(
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
protocol=protocol,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
|
||||
@decorators.idempotent_id('bcfce0b7-bc96-40ae-9b08-3f6774ee0260')
|
||||
def test_create_security_group_rule_with_ipv6_protocol_integers(self):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
for protocol in base.V6_PROTOCOL_INTS:
|
||||
self._test_create_show_delete_security_group_rule(
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
protocol=protocol,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
def test_security_group_rule_protocol_legacy_names(self):
|
||||
self._test_security_group_rule_protocols(
|
||||
protocols=self.protocol_legacy_names)
|
||||
|
|
|
@ -18,12 +18,14 @@ from neutron_lib.db import constants as db_const
|
|||
from tempest.lib import decorators
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
|
||||
from neutron_tempest_plugin.api import base_security_groups as base
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_security_groups
|
||||
|
||||
|
||||
LONG_NAME_NG = 'x' * (db_const.NAME_FIELD_SIZE + 1)
|
||||
|
||||
|
||||
class NegativeSecGroupTest(base.BaseSecGroupTest):
|
||||
class NegativeSecGroupTest(base.BaseNetworkTest):
|
||||
|
||||
required_extensions = ['security-group']
|
||||
|
||||
|
@ -36,72 +38,68 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
|
|||
@decorators.idempotent_id('594edfa8-9a5b-438e-9344-49aece337d49')
|
||||
def test_create_security_group_with_too_long_name(self):
|
||||
self.assertRaises(lib_exc.BadRequest,
|
||||
self.client.create_security_group,
|
||||
self.create_security_group,
|
||||
name=LONG_NAME_NG)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('b6b79838-7430-4d3f-8e07-51dfb61802c2')
|
||||
def test_create_security_group_with_boolean_type_name(self):
|
||||
self.assertRaises(lib_exc.BadRequest,
|
||||
self.client.create_security_group,
|
||||
self.create_security_group,
|
||||
name=True)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c')
|
||||
def test_update_default_security_group_name(self):
|
||||
sg_list = self.client.list_security_groups(name='default')
|
||||
sg = sg_list['security_groups'][0]
|
||||
security_group = self.client.list_security_groups(name='default')[
|
||||
'security_groups'][0]
|
||||
self.assertRaises(lib_exc.Conflict, self.client.update_security_group,
|
||||
sg['id'], name='test')
|
||||
security_group['id'], name='test')
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('c8510dd8-c3a8-4df9-ae44-24354db50960')
|
||||
def test_update_security_group_with_too_long_name(self):
|
||||
sg_list = self.client.list_security_groups(name='default')
|
||||
sg = sg_list['security_groups'][0]
|
||||
security_group = self.client.list_security_groups(name='default')[
|
||||
'security_groups'][0]
|
||||
self.assertRaises(lib_exc.BadRequest,
|
||||
self.client.update_security_group,
|
||||
sg['id'], name=LONG_NAME_NG)
|
||||
security_group['id'], name=LONG_NAME_NG)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('d9a14917-f66f-4eca-ab72-018563917f1b')
|
||||
def test_update_security_group_with_boolean_type_name(self):
|
||||
sg_list = self.client.list_security_groups(name='default')
|
||||
sg = sg_list['security_groups'][0]
|
||||
security_group = self.client.list_security_groups(name='default')[
|
||||
'security_groups'][0]
|
||||
self.assertRaises(lib_exc.BadRequest,
|
||||
self.client.update_security_group,
|
||||
sg['id'], name=True)
|
||||
security_group['id'], name=True)
|
||||
|
||||
@decorators.attr(type='negative')
|
||||
@decorators.idempotent_id('3200b1a8-d73b-48e9-b03f-e891a4abe2d3')
|
||||
def test_delete_in_use_sec_group(self):
|
||||
sgroup = self.os_primary.network_client.create_security_group(
|
||||
name='sgroup')
|
||||
self.security_groups.append(sgroup['security_group'])
|
||||
port = self.client.create_port(
|
||||
network_id=self.network['id'],
|
||||
security_groups=[sgroup['security_group']['id']])
|
||||
self.ports.append(port['port'])
|
||||
security_group = self.create_security_group()
|
||||
self.create_port(network=self.network,
|
||||
security_groups=[security_group['id']])
|
||||
self.assertRaises(lib_exc.Conflict,
|
||||
self.os_primary.network_client.delete_security_group,
|
||||
security_group_id=sgroup['security_group']['id'])
|
||||
security_group_id=security_group['id'])
|
||||
|
||||
|
||||
class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
|
||||
_ip_version = constants.IP_VERSION_6
|
||||
|
||||
|
||||
class NegativeSecGroupProtocolTest(base.BaseSecGroupTest):
|
||||
class NegativeSecGroupProtocolTest(base.BaseNetworkTest):
|
||||
|
||||
def _test_create_security_group_rule_with_bad_protocols(self, protocols):
|
||||
group_create_body, _ = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
|
||||
# bad protocols can include v6 protocols because self.ethertype is v4
|
||||
for protocol in protocols:
|
||||
self.assertRaises(
|
||||
lib_exc.BadRequest,
|
||||
self.client.create_security_group_rule,
|
||||
security_group_id=group_create_body['security_group']['id'],
|
||||
security_group_id=security_group['id'],
|
||||
protocol=protocol, direction=constants.INGRESS_DIRECTION,
|
||||
ethertype=self.ethertype)
|
||||
|
||||
|
@ -109,10 +107,10 @@ class NegativeSecGroupProtocolTest(base.BaseSecGroupTest):
|
|||
@decorators.idempotent_id('cccbb0f3-c273-43ed-b3fc-1efc48833810')
|
||||
def test_create_security_group_rule_with_ipv6_protocol_names(self):
|
||||
self._test_create_security_group_rule_with_bad_protocols(
|
||||
base.V6_PROTOCOL_NAMES)
|
||||
base_security_groups.V6_PROTOCOL_NAMES)
|
||||
|
||||
@decorators.attr(type=['negative'])
|
||||
@decorators.idempotent_id('8aa636bd-7060-4fdf-b722-cdae28e2f1ef')
|
||||
def test_create_security_group_rule_with_ipv6_protocol_integers(self):
|
||||
self._test_create_security_group_rule_with_bad_protocols(
|
||||
base.V6_PROTOCOL_INTS)
|
||||
base_security_groups.V6_PROTOCOL_INTS)
|
||||
|
|
|
@ -11,14 +11,15 @@
|
|||
# under the License.
|
||||
|
||||
import copy
|
||||
import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from tempest.common import utils
|
||||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib import decorators
|
||||
|
||||
from neutron_tempest_plugin.api import base
|
||||
from neutron_tempest_plugin.api import base_routers
|
||||
from neutron_tempest_plugin.api import base_security_groups
|
||||
from neutron_tempest_plugin import config
|
||||
|
||||
CONF = config.CONF
|
||||
|
@ -276,7 +277,7 @@ class TestTimeStampWithL3(base_routers.BaseRouterTest):
|
|||
show_fip['updated_at'])
|
||||
|
||||
|
||||
class TestTimeStampWithSecurityGroup(base_security_groups.BaseSecGroupTest):
|
||||
class TestTimeStampWithSecurityGroup(base.BaseNetworkTest):
|
||||
|
||||
required_extensions = ['standard-attr-timestamp']
|
||||
|
||||
|
@ -287,66 +288,66 @@ class TestTimeStampWithSecurityGroup(base_security_groups.BaseSecGroupTest):
|
|||
|
||||
@decorators.idempotent_id('a3150a7b-d31a-423a-abf3-45e71c97cbac')
|
||||
def test_create_sg_with_timestamp(self):
|
||||
sg, _ = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
# Verifies body contains timestamp fields
|
||||
self.assertIsNotNone(sg['security_group']['created_at'])
|
||||
self.assertIsNotNone(sg['security_group']['updated_at'])
|
||||
self.assertIsNotNone(security_group['created_at'])
|
||||
self.assertIsNotNone(security_group['updated_at'])
|
||||
|
||||
@decorators.idempotent_id('432ae0d3-32b4-413e-a9b3-091ac76da31b')
|
||||
def test_update_sg_with_timestamp(self):
|
||||
sgc, _ = self._create_security_group()
|
||||
sg = sgc['security_group']
|
||||
origin_updated_at = sg['updated_at']
|
||||
update_body = {'name': sg['name'] + 'new'}
|
||||
body = self.client.update_security_group(sg['id'], **update_body)
|
||||
updated_sg = body['security_group']
|
||||
new_updated_at = updated_sg['updated_at']
|
||||
self.assertEqual(sg['created_at'], updated_sg['created_at'])
|
||||
# Verify that origin_updated_at is not same with new_updated_at
|
||||
self.assertIsNot(origin_updated_at, new_updated_at)
|
||||
security_group = self.create_security_group()
|
||||
|
||||
# Make sure update time will be different
|
||||
time.sleep(2.)
|
||||
updated_security_group = self.client.update_security_group(
|
||||
security_group['id'], name=security_group['name'] + 'new')[
|
||||
'security_group']
|
||||
|
||||
# Verify that created_at hasn't changed
|
||||
self.assertEqual(security_group['created_at'],
|
||||
updated_security_group['created_at'])
|
||||
# Verify that updated_at has changed
|
||||
self.assertNotEqual(security_group['updated_at'],
|
||||
updated_security_group['updated_at'])
|
||||
|
||||
@decorators.idempotent_id('521e6723-43d6-12a6-8c3d-f5042ad9fc32')
|
||||
def test_show_sg_attribute_with_timestamp(self):
|
||||
sg, _ = self._create_security_group()
|
||||
body = self.client.show_security_group(sg['security_group']['id'])
|
||||
show_sg = body['security_group']
|
||||
# verify the timestamp from creation and showed is same
|
||||
self.assertEqual(sg['security_group']['created_at'],
|
||||
show_sg['created_at'])
|
||||
self.assertEqual(sg['security_group']['updated_at'],
|
||||
show_sg['updated_at'])
|
||||
security_group = self.create_security_group()
|
||||
observed_security_group = self.client.show_security_group(
|
||||
security_group['id'])['security_group']
|
||||
|
||||
def _prepare_sgrule_test(self):
|
||||
sg, _ = self._create_security_group()
|
||||
sg_id = sg['security_group']['id']
|
||||
direction = 'ingress'
|
||||
protocol = 'tcp'
|
||||
port_range_min = 77
|
||||
port_range_max = 77
|
||||
rule_create_body = self.client.create_security_group_rule(
|
||||
security_group_id=sg_id,
|
||||
direction=direction,
|
||||
ethertype=self.ethertype,
|
||||
protocol=protocol,
|
||||
port_range_min=port_range_min,
|
||||
port_range_max=port_range_max,
|
||||
remote_group_id=None,
|
||||
remote_ip_prefix=None
|
||||
)
|
||||
return rule_create_body['security_group_rule']
|
||||
# Verify that created_at hasn't changed
|
||||
self.assertEqual(security_group['created_at'],
|
||||
observed_security_group['created_at'])
|
||||
# Verify that updated_at hasn't changed
|
||||
self.assertEqual(security_group['updated_at'],
|
||||
observed_security_group['updated_at'])
|
||||
|
||||
def _create_security_group_rule(self):
|
||||
security_group = self.create_security_group()
|
||||
return self.create_security_group_rule(
|
||||
security_group=security_group,
|
||||
direction=constants.INGRESS_DIRECTION,
|
||||
protocol=constants.PROTO_NAME_TCP,
|
||||
port_range_min=77,
|
||||
port_range_max=77)
|
||||
|
||||
@decorators.idempotent_id('83e8bd32-43e0-a3f0-1af3-12a5733c653e')
|
||||
def test_create_sgrule_with_timestamp(self):
|
||||
sgrule = self._prepare_sgrule_test()
|
||||
security_group_rule = self._create_security_group_rule()
|
||||
# Verifies body contains timestamp fields
|
||||
self.assertIsNotNone(sgrule['created_at'])
|
||||
self.assertIsNotNone(sgrule['updated_at'])
|
||||
self.assertIn('created_at', security_group_rule)
|
||||
self.assertIn('updated_at', security_group_rule)
|
||||
|
||||
@decorators.idempotent_id('143da0e6-ba17-43ad-b3d7-03aa759c3cb4')
|
||||
def test_show_sgrule_attribute_with_timestamp(self):
|
||||
sgrule = self._prepare_sgrule_test()
|
||||
body = self.client.show_security_group_rule(sgrule['id'])
|
||||
show_sgrule = body['security_group_rule']
|
||||
# verify the timestamp from creation and showed is same
|
||||
self.assertEqual(sgrule['created_at'], show_sgrule['created_at'])
|
||||
self.assertEqual(sgrule['updated_at'], show_sgrule['updated_at'])
|
||||
security_group_rule = self._create_security_group_rule()
|
||||
|
||||
observed_security_group_rule = self.client.show_security_group_rule(
|
||||
security_group_rule['id'])['security_group_rule']
|
||||
|
||||
# Verify the time stamp from creation and showed are equal
|
||||
self.assertEqual(security_group_rule['created_at'],
|
||||
observed_security_group_rule['created_at'])
|
||||
self.assertEqual(security_group_rule['updated_at'],
|
||||
observed_security_group_rule['updated_at'])
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
# under the License.
|
||||
from tempest.common import utils
|
||||
from tempest.common import waiters
|
||||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib import decorators
|
||||
|
||||
from neutron_tempest_plugin.common import ssh
|
||||
|
@ -38,22 +37,17 @@ class FloatingIpTestCasesAdmin(base.BaseTempestTestCase):
|
|||
cls.create_router_interface(router['id'], cls.subnets[0]['id'])
|
||||
# Create keypair with admin privileges
|
||||
cls.keypair = cls.create_keypair(client=cls.os_admin.keypairs_client)
|
||||
# Create security group with admin privileges
|
||||
cls.secgroup = cls.os_admin.network_client.create_security_group(
|
||||
name=data_utils.rand_name('secgroup'))['security_group']
|
||||
# Execute funcs to achieve ssh and ICMP capabilities
|
||||
funcs = [cls.create_loginable_secgroup_rule,
|
||||
cls.create_pingable_secgroup_rule]
|
||||
for func in funcs:
|
||||
func(secgroup_id=cls.secgroup['id'],
|
||||
client=cls.os_admin.network_client)
|
||||
|
||||
@classmethod
|
||||
def resource_cleanup(cls):
|
||||
# Cleanup for security group
|
||||
cls.os_admin.network_client.delete_security_group(
|
||||
security_group_id=cls.secgroup['id'])
|
||||
super(FloatingIpTestCasesAdmin, cls).resource_cleanup()
|
||||
# Create security group with admin privileges
|
||||
network_client = cls.os_admin.network_client
|
||||
cls.secgroup = cls.create_security_group(
|
||||
client=cls.os_admin.network_client)
|
||||
cls.create_loginable_secgroup_rule(
|
||||
secgroup_id=cls.secgroup['id'],
|
||||
client=network_client)
|
||||
cls.create_pingable_secgroup_rule(
|
||||
secgroup_id=cls.secgroup['id'],
|
||||
client=network_client),
|
||||
|
||||
def _list_hypervisors(self):
|
||||
# List of hypervisors
|
||||
|
|
Loading…
Reference in New Issue