Improve security groups management.

- Add create_security_group_rule method to base class
  to make easier creating rules for IPv6 test cases.
- Add delete_security_group method.
- Make sure segurity groups are deleted using the
  client that has been used to create them.
- Improve security group client handling.

Change-Id: I50858d5198d8a70a2bb9fb278786c433d7cb12ca
This commit is contained in:
Federico Ressi 2018-10-10 14:01:08 +02:00
parent 6f9bb77d98
commit 4c590d7cff
3 changed files with 95 additions and 32 deletions

View File

@ -189,15 +189,15 @@ class BaseNetworkTest(test.BaseTestCase):
network['id'])
# Clean up security groups
for secgroup in cls.security_groups:
cls._try_delete_resource(cls.client.delete_security_group,
secgroup['id'])
for security_group in cls.security_groups:
cls._try_delete_resource(cls.delete_security_group,
security_group)
# Clean up admin security groups
for secgroup in cls.admin_security_groups:
cls._try_delete_resource(
cls.admin_client.delete_security_group,
secgroup['id'])
for security_group in cls.admin_security_groups:
cls._try_delete_resource(cls.delete_security_group,
security_group,
client=cls.admin_client)
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
@ -717,18 +717,78 @@ class BaseNetworkTest(test.BaseTestCase):
description=test_description)['project']
cls.projects.append(project)
# Create a project will create a default security group.
# We make these security groups into admin_security_groups.
sgs_list = cls.admin_client.list_security_groups(
tenant_id=project['id'])['security_groups']
for sg in sgs_list:
cls.admin_security_groups.append(sg)
for security_group in sgs_list:
# Make sure delete_security_group method will use
# the admin client for this group
security_group['client'] = cls.admin_client
cls.security_groups.append(security_group)
return project
@classmethod
def create_security_group(cls, name, **kwargs):
body = cls.client.create_security_group(name=name, **kwargs)
cls.security_groups.append(body['security_group'])
return body['security_group']
def create_security_group(cls, name=None, project=None, client=None,
**kwargs):
if project:
client = client or cls.admin_client
project_id = kwargs.setdefault('project_id', project['id'])
tenant_id = kwargs.setdefault('tenant_id', project['id'])
if project_id != project['id'] or tenant_id != project['id']:
raise ValueError('Project ID specified multiple times')
else:
client = client or cls.client
name = name or data_utils.rand_name(cls.__name__)
security_group = client.create_security_group(name=name, **kwargs)[
'security_group']
security_group['client'] = client
cls.security_groups.append(security_group)
return security_group
@classmethod
def delete_security_group(cls, security_group, client=None):
client = client or security_group.get('client') or cls.client
client.delete_security_group(security_group['id'])
@classmethod
def create_security_group_rule(cls, security_group=None, project=None,
client=None, ip_version=None, **kwargs):
if project:
client = client or cls.admin_client
project_id = kwargs.setdefault('project_id', project['id'])
tenant_id = kwargs.setdefault('tenant_id', project['id'])
if project_id != project['id'] or tenant_id != project['id']:
raise ValueError('Project ID specified multiple times')
if 'security_group_id' not in kwargs:
security_group = (security_group or
cls.get_security_group(client=client))
if security_group:
client = client or security_group.get('client')
security_group_id = kwargs.setdefault('security_group_id',
security_group['id'])
if security_group_id != security_group['id']:
raise ValueError('Security group ID specified multiple times.')
ip_version = ip_version or cls._ip_version
default_params = (
constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
for key, value in default_params.items():
kwargs.setdefault(key, value)
client = client or cls.client
return client.create_security_group_rule(**kwargs)[
'security_group_rule']
@classmethod
def get_security_group(cls, name='default', client=None):
client = client or cls.client
security_groups = client.list_security_groups()['security_groups']
for security_group in security_groups:
if security_group['name'] == name:
return security_group
raise ValueError("No such security group named {!r}".format(name))
@classmethod
def create_keypair(cls, client=None, name=None, **kwargs):

View File

@ -171,3 +171,11 @@ VALID_FLOATINGIP_STATUS = (lib_constants.FLOATINGIP_STATUS_ACTIVE,
# Possible types of values (e.g. in QoS rule types)
VALUES_TYPE_CHOICES = "choices"
VALUES_TYPE_RANGE = "range"
# Security group parameters values mapped by IP version
DEFAULT_SECURITY_GROUP_RULE_PARAMS = {
lib_constants.IP_VERSION_4: {'ethertype': lib_constants.IPv4,
'remote_ip_prefix': lib_constants.IPv4_ANY},
lib_constants.IP_VERSION_6: {'ethertype': lib_constants.IPv6,
'remote_ip_prefix': lib_constants.IPv6_ANY},
}

View File

@ -122,29 +122,24 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
Setting a group_id would only permit traffic from ports
belonging to the same security group.
"""
rule_list = [{'protocol': 'tcp',
'direction': 'ingress',
'port_range_min': 22,
'port_range_max': 22,
'remote_ip_prefix': '0.0.0.0/0'}]
client = client or cls.os_primary.network_client
cls.create_secgroup_rules(rule_list, client=client,
secgroup_id=secgroup_id)
return cls.create_security_group_rule(
security_group_id=secgroup_id,
client=client,
protocol=neutron_lib_constants.PROTO_NAME_TCP,
direction=neutron_lib_constants.INGRESS_DIRECTION,
port_range_min=22,
port_range_max=22)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
client=None):
"""This rule is intended to permit inbound ping"""
"""This rule is intended to permit inbound ping
rule_list = [{'protocol': 'icmp',
'direction': 'ingress',
'port_range_min': 8, # type
'port_range_max': 0, # code
'remote_ip_prefix': '0.0.0.0/0'}]
client = client or cls.os_primary.network_client
cls.create_secgroup_rules(rule_list, client=client,
secgroup_id=secgroup_id)
"""
return cls.create_security_group_rule(
security_group_id=secgroup_id, client=client,
protocol=neutron_lib_constants.PROTO_NAME_ICMP,
direction=neutron_lib_constants.INGRESS_DIRECTION)
@classmethod
def create_router_by_client(cls, is_admin=False, **kwargs):