Neutron Security Group native support

blueprint quantum-security-group

Rule table view
* Add direction and ethertype columns (which are specific to Neutron)
  It may be better to hide "Direction" and "Ether Type" columns
  unless Quantum security group is enabled.
* Merge ip_protocol/from_port/to_port into one column for better view
* Use "::/0" for IPv6 ANY instead of "0.0.0.0/0"
* Rename "Source" column to "Remote".
  (The naming "source" does not fit egress rules)
* Display security group name in the title of rule detail view

Rule creation form
* New arguments 'direction' and 'ethertype' in security_group_rule_create()
* Set the default value of 'direction' to 'ingress' in forms.handle()
* Rename 'ip_protocol' to 'rule_menu' and 'source' to 'remote'
  Note that rule_menu is retrieved from rule.ip_protocol in the unit tests
  since they are tests for custom TCP/UDP/ICMP rules.

Network abstraction layer for security group management
* Move security group methods to api.network
* Add Neutron security group API implementation
* Move base classes for network abstraction to a separate module
  (api/network_base.py) to avoid circulated import between
  api.network and api.nova/api.neutron

Add a configuration parameter to control Neutron security group support
* Neutron security group support is enabled when Neutron is enabled and
  enable_security_group in OPENSTACK_NEUTRON_NETWORK in settings is True.
* Not all neutron plugins support security group, so we need a way
  to control neutron security group is enabled or not.
* It can be determined by supported extension list from Neutron
  and it is a possible future work.

Move get_int_or_uuid to openstack_dashboard/utils/filters.
* get_int_or_uuid is now used in security_group implementation as
  well as floating IP logics.
* In addition the depth of the directory tree becomes longer and
  it is hard to fit the import line in 80 chars.
  It is a good chance to move it to a common directory.

Add __repr__ to API**Wrapper to make it easier to debug.

Limitations:
Neutron supports per-port security group. security groups can be
associated with a port instead of an instace and each port can have
a different set of security groups. It is not a scope of this BP
and is a future work.

Change-Id: I5410e88043a364596037b9ebcc566cd50b317614
This commit is contained in:
Akihiro MOTOKI 2013-06-17 18:11:04 +09:00
parent c0ace11e82
commit 695bf560c0
27 changed files with 1433 additions and 514 deletions

View File

@ -24,6 +24,12 @@ def validate_port_range(port):
raise ValidationError("Not a valid port number")
def validate_ip_protocol(ip_proto):
if ip_proto not in range(-1, 256):
raise ValidationError("%s is not a valid ip protocol number" %
type(ip_proto))
def password_validator():
return conf.HORIZON_CONFIG["password_validator"]["regex"]

View File

@ -89,6 +89,12 @@ class APIResourceWrapper(object):
LOG.debug(exceptions.error_color(msg))
raise AttributeError(attr)
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__,
dict((attr,
getattr(self, attr))
for attr in self._attrs))
class APIDictWrapper(object):
""" Simple wrapper for api dictionaries
@ -125,6 +131,9 @@ class APIDictWrapper(object):
except AttributeError:
return default
def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self._apidict)
class Quota(object):
"""Wrapper for individual limits in a quota."""

View File

@ -17,120 +17,35 @@
"""Abstraction layer for networking functionalities.
Currently Nova and Neutron have duplicated features. This API layer is
introduced to astract the differences between them for seamless consumption by
introduced to abstract the differences between them for seamless consumption by
different dashboard implementations.
"""
import abc
from django.conf import settings
from openstack_dashboard.api import base
from openstack_dashboard.api import neutron
from openstack_dashboard.api import nova
class NetworkClient(object):
def __init__(self, request):
from openstack_dashboard import api
if api.base.is_service_enabled(request, 'network'):
self.floating_ips = api.neutron.FloatingIpManager(request)
neutron_enabled = base.is_service_enabled(request, 'network')
if neutron_enabled:
self.floating_ips = neutron.FloatingIpManager(request)
else:
self.floating_ips = api.nova.FloatingIpManager(request)
self.floating_ips = nova.FloatingIpManager(request)
class FloatingIpManager(object):
"""Abstract class to implement Floating IP methods
The FloatingIP object returned from methods in this class
must contains the following attributes:
* id: ID of Floating IP
* ip: Floating IP address
* pool: ID of Floating IP pool from which the address is allocated
* fixed_ip: Fixed IP address of a VIF associated with the address
* port_id: ID of a VIF associated with the address
(instance_id when Nova floating IP is used)
* instance_id: Instance ID of an associated with the Floating IP
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list_pools(self):
"""Fetches a list of all floating IP pools.
A list of FloatingIpPool objects is returned.
FloatingIpPool object is an APIResourceWrapper/APIDictWrapper
where 'id' and 'name' attributes are defined.
"""
pass
@abc.abstractmethod
def list(self):
"""Fetches a list all floating IPs.
A returned value is a list of FloatingIp object.
"""
pass
@abc.abstractmethod
def get(self, floating_ip_id):
"""Fetches the floating IP.
It returns a FloatingIp object corresponding to floating_ip_id.
"""
pass
@abc.abstractmethod
def allocate(self, pool=None):
"""Allocates a floating IP to the tenant.
You must provide a pool name or id for which you would like to
allocate an floating IP.
"""
pass
@abc.abstractmethod
def release(self, floating_ip_id):
"""Releases a floating IP specified."""
pass
@abc.abstractmethod
def associate(self, floating_ip_id, port_id):
"""Associates the floating IP to the port.
port_id is a fixed IP of a instance (Nova) or
a port_id attached to a VNIC of a instance.
"""
pass
@abc.abstractmethod
def disassociate(self, floating_ip_id, port_id):
"""Disassociates the floating IP from the port.
port_id is a fixed IP of a instance (Nova) or
a port_id attached to a VNIC of a instance.
"""
pass
@abc.abstractmethod
def list_targets(self):
"""Returns a list of association targets of instance VIFs.
Each association target is represented as FloatingIpTarget object.
FloatingIpTarget is a APIResourceWrapper/APIDictWrapper and
'id' and 'name' attributes must be defined in each object.
FloatingIpTarget.id can be passed as port_id in associate().
FloatingIpTarget.name is displayed in Floating Ip Association Form.
"""
pass
@abc.abstractmethod
def get_target_id_by_instance(self, instance_id):
"""Returns a target ID of floating IP association based on
a backend implementation.
"""
pass
@abc.abstractmethod
def is_simple_associate_supported(self):
"""Returns True if the default floating IP pool is enabled."""
pass
# Not all qunantum plugins support security group,
# so we have enable_security_group configuration parameter.
neutron_sg_enabled = getattr(settings,
'OPENSTACK_NEUTRON_NETWORK',
{}).get('enable_security_group', True)
if neutron_enabled and neutron_sg_enabled:
self.secgroups = neutron.SecurityGroupManager(request)
else:
self.secgroups = nova.SecurityGroupManager(request)
def floating_ip_pools_list(request):
@ -170,3 +85,45 @@ def floating_ip_target_list(request):
def floating_ip_target_get_by_instance(request, instance_id):
return NetworkClient(request).floating_ips.get_target_id_by_instance(
instance_id)
def security_group_list(request):
return NetworkClient(request).secgroups.list()
def security_group_get(request, sg_id):
return NetworkClient(request).secgroups.get(sg_id)
def security_group_create(request, name, desc):
return NetworkClient(request).secgroups.create(name, desc)
def security_group_delete(request, sg_id):
return NetworkClient(request).secgroups.delete(sg_id)
def security_group_rule_create(request, parent_group_id,
direction, ethertype,
ip_protocol, from_port, to_port,
cidr, group_id):
return NetworkClient(request).secgroups.rule_create(
parent_group_id, direction, ethertype, ip_protocol,
from_port, to_port, cidr, group_id)
def security_group_rule_delete(request, sgr_id):
return NetworkClient(request).secgroups.rule_delete(sgr_id)
def server_security_groups(request, instance_id):
return NetworkClient(request).secgroups.list_by_instance(instance_id)
def server_update_security_groups(request, instance_id, new_sgs):
return NetworkClient(request).secgroups.update_instance_security_group(
instance_id, new_sgs)
def security_group_backend(request):
return NetworkClient(request).secgroups.backend

View File

@ -0,0 +1,216 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 NEC Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Abstraction layer for networking functionalities.
This module defines internal APIs for duplicated features between OpenStack
Compute and OpenStack Networking. The networking abstraction layer expects
methods defined in this module.
"""
import abc
class FloatingIpManager(object):
"""Abstract class to implement Floating IP methods
The FloatingIP object returned from methods in this class
must contains the following attributes:
* id: ID of Floating IP
* ip: Floating IP address
* pool: ID of Floating IP pool from which the address is allocated
* fixed_ip: Fixed IP address of a VIF associated with the address
* port_id: ID of a VIF associated with the address
(instance_id when Nova floating IP is used)
* instance_id: Instance ID of an associated with the Floating IP
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list_pools(self):
"""Fetches a list of all floating IP pools.
A list of FloatingIpPool objects is returned.
FloatingIpPool object is an APIResourceWrapper/APIDictWrapper
where 'id' and 'name' attributes are defined.
"""
pass
@abc.abstractmethod
def list(self):
"""Fetches a list all floating IPs.
A returned value is a list of FloatingIp object.
"""
pass
@abc.abstractmethod
def get(self, floating_ip_id):
"""Fetches the floating IP.
It returns a FloatingIp object corresponding to floating_ip_id.
"""
pass
@abc.abstractmethod
def allocate(self, pool=None):
"""Allocates a floating IP to the tenant.
You must provide a pool name or id for which you would like to
allocate an floating IP.
"""
pass
@abc.abstractmethod
def release(self, floating_ip_id):
"""Releases a floating IP specified."""
pass
@abc.abstractmethod
def associate(self, floating_ip_id, port_id):
"""Associates the floating IP to the port.
port_id is a fixed IP of a instance (Nova) or
a port_id attached to a VNIC of a instance.
"""
pass
@abc.abstractmethod
def disassociate(self, floating_ip_id, port_id):
"""Disassociates the floating IP from the port.
port_id is a fixed IP of a instance (Nova) or
a port_id attached to a VNIC of a instance.
"""
pass
@abc.abstractmethod
def list_targets(self):
"""Returns a list of association targets of instance VIFs.
Each association target is represented as FloatingIpTarget object.
FloatingIpTarget is a APIResourceWrapper/APIDictWrapper and
'id' and 'name' attributes must be defined in each object.
FloatingIpTarget.id can be passed as port_id in associate().
FloatingIpTarget.name is displayed in Floating Ip Association Form.
"""
pass
@abc.abstractmethod
def get_target_id_by_instance(self, instance_id):
"""Returns a target ID of floating IP association based on
a backend implementation.
"""
pass
@abc.abstractmethod
def is_simple_associate_supported(self):
"""Returns True if the default floating IP pool is enabled."""
pass
class SecurityGroupManager(object):
"""Abstract class to implement Security Group methods
SecurityGroup object returned from methods in this class
must contains the following attributes:
- id : ID of Security Group (int for Nova, uuid for Neutron)
- name
- description
- tenant_id
- rules : A list of SecurityGroupRule objects
SecurityGroupRule object should have the following attributes:
The attribute names and their formats are borrowed from nova
security group implementation.
- id
- direction
- ethertype
- parent_group_id : security group the rule belongs to
- ip_protocol
- from_port : lower limit of allowed port range (inclusive)
- to_port : upper limit of allowed port range (inclusive)
- ip_range : remote IP CIDR (source for ingress, dest for egress)
The value should be a format of "{'cidr': <cidr>}"
- group : remote security group
The value should be a format of "{'name': <secgroup_name>}"
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list(self):
"""Fetches a list all security groups.
A returned value is a list of SecurityGroup object.
"""
pass
@abc.abstractmethod
def get(self, sg_id):
"""Fetches the security group.
It returns a SecurityGroup object corresponding to sg_id.
"""
pass
@abc.abstractmethod
def create(self, name, desc):
"""Create a new security group.
It returns a SecurityGroup object created.
"""
pass
@abc.abstractmethod
def delete(self, sg_id):
"""Delete the specified security group."""
pass
@abc.abstractmethod
def rule_create(self, parent_group_id,
direction=None, ethertype=None,
ip_protocol=None, from_port=None, to_port=None,
cidr=None, group_id=None):
"""Create a new security group rule.
:param parent_group_id: security group id a rule is created to
:param direction: ingress or egress
:param ethertype: ipv4, ipv6, ...
:param ip_protocol: tcp, udp, icmp
:param from_port: L4 port range min
:param to_port: L4 port range max
:param cidr: Source IP CIDR
:param group_id: ID of Source Security Group
"""
pass
@abc.abstractmethod
def rule_delete(self, sgr_id):
"""Delete the specified security group rule."""
pass
@abc.abstractmethod
def list_by_instance(self, instance_id):
"""Get security groups of an instance."""
pass
@abc.abstractmethod
def update_instance_security_group(self, instance_id, new_sgs):
"""Update security groups of a specified instance."""
pass

View File

@ -25,10 +25,11 @@ import logging
from django.conf import settings
from django.utils.datastructures import SortedDict
from django.utils.translation import ugettext_lazy as _
from openstack_dashboard.api.base import APIDictWrapper
from openstack_dashboard.api.base import url_for
from openstack_dashboard.api import network
from openstack_dashboard.api import network_base
from openstack_dashboard.api import nova
from neutronclient.v2_0 import client as neutron_client
@ -93,6 +94,174 @@ class Router(NeutronAPIDictWrapper):
super(Router, self).__init__(apiresource)
class SecurityGroup(NeutronAPIDictWrapper):
# Required attributes: id, name, description, tenant_id, rules
def __init__(self, sg, sg_dict=None):
if sg_dict is None:
sg_dict = {sg['id']: sg['name']}
sg['rules'] = [SecurityGroupRule(rule, sg_dict)
for rule in sg['security_group_rules']]
super(SecurityGroup, self).__init__(sg)
class SecurityGroupRule(NeutronAPIDictWrapper):
# Required attributes:
# id, parent_group_id
# ip_protocol, from_port, to_port, ip_range, group
# ethertype, direction (Neutron specific)
def _get_secgroup_name(self, sg_id, sg_dict):
if sg_id:
if sg_dict is None:
sg_dict = {}
# If sg name not found in sg_dict,
# first two parts of UUID is used as sg name.
return sg_dict.get(sg_id, sg_id[:13])
else:
return u''
def __init__(self, sgr, sg_dict=None):
# In Neutron, if both remote_ip_prefix and remote_group_id are None,
# it means all remote IP range is allowed, i.e., 0.0.0.0/0 or ::/0.
if not sgr['remote_ip_prefix'] and not sgr['remote_group_id']:
if sgr['ethertype'] == 'IPv6':
sgr['remote_ip_prefix'] = '::/0'
else:
sgr['remote_ip_prefix'] = '0.0.0.0/0'
rule = {
'id': sgr['id'],
'parent_group_id': sgr['security_group_id'],
'direction': sgr['direction'],
'ethertype': sgr['ethertype'],
'ip_protocol': sgr['protocol'],
'from_port': sgr['port_range_min'],
'to_port': sgr['port_range_max'],
}
cidr = sgr['remote_ip_prefix']
rule['ip_range'] = {'cidr': cidr} if cidr else {}
group = self._get_secgroup_name(sgr['remote_group_id'], sg_dict)
rule['group'] = {'name': group} if group else {}
super(SecurityGroupRule, self).__init__(rule)
def __unicode__(self):
if 'name' in self.group:
remote = self.group['name']
elif 'cidr' in self.ip_range:
remote = self.ip_range['cidr']
else:
remote = 'ANY'
direction = 'to' if self.direction == 'egress' else 'from'
if self.from_port:
if self.from_port == self.to_port:
proto_port = ("%s/%s" %
(self.from_port, self.ip_protocol.lower()))
else:
proto_port = ("%s-%s/%s" %
(self.from_port, self.to_port,
self.ip_protocol.lower()))
elif self.ip_protocol:
try:
ip_proto = int(self.ip_protocol)
proto_port = "ip_proto=%d" % ip_proto
except:
# well-defined IP protocol name like TCP, UDP, ICMP.
proto_port = self.ip_protocol
else:
proto_port = ''
return (_('ALLOW %(ethertype)s %(proto_port)s '
'%(direction)s %(remote)s') %
{'ethertype': self.ethertype,
'proto_port': proto_port,
'remote': remote,
'direction': direction})
class SecurityGroupManager(network_base.SecurityGroupManager):
backend = 'neutron'
def __init__(self, request):
self.request = request
self.client = neutronclient(request)
def _list(self, **filters):
secgroups = self.client.list_security_groups(**filters)
return [SecurityGroup(sg) for sg in secgroups.get('security_groups')]
def list(self):
tenant_id = self.request.user.tenant_id
return self._list(tenant_id=tenant_id)
def _sg_name_dict(self, sg_id, rules):
"""Create a mapping dict from secgroup id to its name."""
related_ids = set([sg_id])
related_ids |= set(filter(None, [r['remote_group_id'] for r in rules]))
related_sgs = self.client.list_security_groups(id=related_ids,
fields=['id', 'name'])
related_sgs = related_sgs.get('security_groups')
return dict((sg['id'], sg['name']) for sg in related_sgs)
def get(self, sg_id):
secgroup = self.client.show_security_group(sg_id).get('security_group')
sg_dict = self._sg_name_dict(sg_id, secgroup['security_group_rules'])
return SecurityGroup(secgroup, sg_dict)
def create(self, name, desc):
body = {'security_group': {'name': name,
'description': desc}}
secgroup = self.client.create_security_group(body)
return SecurityGroup(secgroup.get('security_group'))
def delete(self, sg_id):
self.client.delete_security_group(sg_id)
def rule_create(self, parent_group_id,
direction=None, ethertype=None,
ip_protocol=None, from_port=None, to_port=None,
cidr=None, group_id=None):
if not cidr:
cidr = None
if from_port < 0:
from_port = None
if to_port < 0:
to_port = None
if isinstance(ip_protocol, int) and ip_protocol < 0:
ip_protocol = None
body = {'security_group_rule':
{'security_group_id': parent_group_id,
'direction': direction,
'ethertype': ethertype,
'protocol': ip_protocol,
'port_range_min': from_port,
'port_range_max': to_port,
'remote_ip_prefix': cidr,
'remote_group_id': group_id}}
rule = self.client.create_security_group_rule(body)
rule = rule.get('security_group_rule')
sg_dict = self._sg_name_dict(parent_group_id, [rule])
return SecurityGroupRule(rule, sg_dict)
def rule_delete(self, sgr_id):
self.client.delete_security_group_rule(sgr_id)
def list_by_instance(self, instance_id):
"""Gets security groups of an instance."""
ports = port_list(self.request, device_id=instance_id)
sg_ids = []
for p in ports:
sg_ids += p.security_groups
return self._list(id=set(sg_ids))
def update_instance_security_group(self, instance_id, new_sgs):
ports = port_list(self.request, device_id=instance_id)
for p in ports:
params = {'security_groups': new_sgs}
port_modify(self.request, p.id, **params)
class FloatingIp(APIDictWrapper):
_attrs = ['id', 'ip', 'fixed_ip', 'port_id', 'instance_id', 'pool']
@ -111,7 +280,7 @@ class FloatingIpTarget(APIDictWrapper):
pass
class FloatingIpManager(network.FloatingIpManager):
class FloatingIpManager(network_base.FloatingIpManager):
def __init__(self, request):
self.request = request
self.client = neutronclient(request)

View File

@ -39,7 +39,7 @@ from openstack_dashboard.api.base import APIDictWrapper
from openstack_dashboard.api.base import APIResourceWrapper
from openstack_dashboard.api.base import QuotaSet
from openstack_dashboard.api.base import url_for
from openstack_dashboard.api import network
from openstack_dashboard.api import network_base
LOG = logging.getLogger(__name__)
@ -152,14 +152,11 @@ class SecurityGroup(APIResourceWrapper):
"""Wraps transmitted rule info in the novaclient rule class."""
if "_rules" not in self.__dict__:
manager = nova_rules.SecurityGroupRuleManager(None)
self._rules = [nova_rules.SecurityGroupRule(manager, rule)
for rule in self._apiresource.rules]
rule_objs = [nova_rules.SecurityGroupRule(manager, rule)
for rule in self._apiresource.rules]
self._rules = [SecurityGroupRule(rule) for rule in rule_objs]
return self.__dict__['_rules']
@rules.setter
def rules(self, value):
self._rules = value
class SecurityGroupRule(APIResourceWrapper):
""" Wrapper for individual rules in a SecurityGroup. """
@ -177,6 +174,97 @@ class SecurityGroupRule(APIResourceWrapper):
'cidr': self.ip_range['cidr']}
return _('ALLOW %(from)s:%(to)s from %(cidr)s') % vals
# The following attributes are defined to keep compatibility with Neutron
@property
def ethertype(self):
return None
@property
def direction(self):
return 'ingress'
class SecurityGroupManager(network_base.SecurityGroupManager):
backend = 'nova'
def __init__(self, request):
self.request = request
self.client = novaclient(request)
def list(self):
return [SecurityGroup(g) for g
in self.client.security_groups.list()]
def get(self, sg_id):
return SecurityGroup(self.client.security_groups.get(sg_id))
def create(self, name, desc):
return SecurityGroup(self.client.security_groups.create(name, desc))
def delete(self, security_group_id):
self.client.security_groups.delete(security_group_id)
def rule_create(self, parent_group_id,
direction=None, ethertype=None,
ip_protocol=None, from_port=None, to_port=None,
cidr=None, group_id=None):
# Nova Security Group API does not use direction and ethertype fields.
sg = self.client.security_group_rules.create(parent_group_id,
ip_protocol,
from_port,
to_port,
cidr,
group_id)
return SecurityGroupRule(sg)
def rule_delete(self, security_group_rule_id):
self.client.security_group_rules.delete(security_group_rule_id)
def list_by_instance(self, instance_id):
"""Gets security groups of an instance."""
# TODO(gabriel): This needs to be moved up to novaclient, and should
# be removed once novaclient supports this call.
security_groups = []
nclient = self.client
resp, body = nclient.client.get('/servers/%s/os-security-groups'
% instance_id)
if body:
# Wrap data in SG objects as novaclient would.
sg_objs = [NovaSecurityGroup(nclient.security_groups, sg,
loaded=True)
for sg in body.get('security_groups', [])]
# Then wrap novaclient's object with our own. Yes, sadly wrapping
# with two layers of objects is necessary.
security_groups = [SecurityGroup(sg) for sg in sg_objs]
return security_groups
def update_instance_security_group(self, instance_id, new_sgs):
wanted_groups = set(new_sgs)
try:
current_groups = self.list_by_instance(instance_id)
except Exception:
raise Exception(_("Couldn't get current security group "
"list for instance %s.")
% instance_id)
current_group_names = set(map(lambda g: g.id, current_groups))
groups_to_add = wanted_groups - current_group_names
groups_to_remove = current_group_names - wanted_groups
num_groups_to_modify = len(groups_to_add | groups_to_remove)
try:
for group in groups_to_add:
self.client.servers.add_security_group(instance_id, group)
num_groups_to_modify -= 1
for group in groups_to_remove:
self.client.servers.remove_security_group(instance_id, group)
num_groups_to_modify -= 1
except Exception:
raise Exception(_('Failed to modify %d instance security groups.')
% num_groups_to_modify)
return True
class FlavorExtraSpec(object):
def __init__(self, flavor_id, key, val):
@ -208,7 +296,7 @@ class FloatingIpTarget(APIDictWrapper):
super(FloatingIpTarget, self).__init__(server_dict)
class FloatingIpManager(network.FloatingIpManager):
class FloatingIpManager(network_base.FloatingIpManager):
def __init__(self, request):
self.request = request
self.client = novaclient(request)
@ -398,39 +486,6 @@ def server_console_output(request, instance_id, tail_length=None):
length=tail_length)
def server_security_groups(request, instance_id):
"""Gets security groups of an instance."""
# TODO(gabriel): This needs to be moved up to novaclient, and should
# be removed once novaclient supports this call.
security_groups = []
nclient = novaclient(request)
resp, body = nclient.client.get('/servers/%s/os-security-groups'
% instance_id)
if body:
# Wrap data in SG objects as novaclient would.
sg_objs = [NovaSecurityGroup(nclient.security_groups, sg, loaded=True)
for sg in body.get('security_groups', [])]
# Then wrap novaclient's object with our own. Yes, sadly wrapping
# with two layers of objects is necessary.
security_groups = [SecurityGroup(sg) for sg in sg_objs]
# Package up the rules, as well.
for sg in security_groups:
rule_objects = [SecurityGroupRule(rule) for rule in sg.rules]
sg.rules = rule_objects
return security_groups
def server_add_security_group(request, instance_id, security_group_name):
return novaclient(request).servers.add_security_group(instance_id,
security_group_name)
def server_remove_security_group(request, instance_id, security_group_name):
return novaclient(request).servers.remove_security_group(
instance_id,
security_group_name)
def server_pause(request, instance_id):
novaclient(request).servers.pause(instance_id)
@ -506,40 +561,6 @@ def usage_list(request, start, end):
novaclient(request).usage.list(start, end, True)]
def security_group_list(request):
return [SecurityGroup(g) for g
in novaclient(request).security_groups.list()]
def security_group_get(request, sg_id):
return SecurityGroup(novaclient(request).security_groups.get(sg_id))
def security_group_create(request, name, desc):
return SecurityGroup(novaclient(request).security_groups.create(name,
desc))
def security_group_delete(request, security_group_id):
novaclient(request).security_groups.delete(security_group_id)
def security_group_rule_create(request, parent_group_id, ip_protocol=None,
from_port=None, to_port=None, cidr=None,
group_id=None):
sg = novaclient(request).security_group_rules.create(parent_group_id,
ip_protocol,
from_port,
to_port,
cidr,
group_id)
return SecurityGroupRule(sg)
def security_group_rule_delete(request, security_group_rule_id):
novaclient(request).security_group_rules.delete(security_group_rule_id)
def virtual_interfaces_list(request, instance_id):
return novaclient(request).virtual_interfaces.list(instance_id)

View File

@ -27,8 +27,7 @@ from horizon import messages
from horizon import tables
from openstack_dashboard import api
from .utils import get_int_or_uuid
from openstack_dashboard.utils.filters import get_int_or_uuid
LOG = logging.getLogger(__name__)

View File

@ -19,8 +19,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from django.core.urlresolvers import reverse
from django import http
@ -29,8 +27,6 @@ from mox import IsA
from openstack_dashboard import api
from openstack_dashboard.test import helpers as test
from .utils import get_int_or_uuid
from horizon.workflows.views import WorkflowView
@ -181,24 +177,3 @@ class FloatingIpNeutronViewTests(FloatingIpViewTests):
def tearDown(self):
self.floating_ips = self._floating_ips_orig
super(FloatingIpViewTests, self).tearDown()
class FloatingIpUtilsTests(test.TestCase):
def test_accept_valid_integer(self):
val = 100
ret = get_int_or_uuid(val)
self.assertEqual(val, ret)
def test_accept_valid_integer_string(self):
val = '100'
ret = get_int_or_uuid(val)
self.assertEqual(int(val), ret)
def test_accept_valid_uuid(self):
val = str(uuid.uuid4())
ret = get_int_or_uuid(val)
self.assertEqual(val, ret)
def test_reject_random_string(self):
val = '55WbJTpJDf'
self.assertRaises(ValueError, get_int_or_uuid, val)

View File

@ -23,8 +23,7 @@ from horizon import forms
from horizon import workflows
from openstack_dashboard import api
from .utils import get_int_or_uuid
from openstack_dashboard.utils.filters import get_int_or_uuid
ALLOCATE_URL = "horizon:project:access_and_security:floating_ips:allocate"

View File

@ -18,6 +18,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import netaddr
from django.conf import settings
from django.core.urlresolvers import reverse
from django.core import validators
@ -28,11 +32,14 @@ from horizon import exceptions
from horizon import forms
from horizon import messages
from horizon.utils import fields
from horizon.utils.validators import validate_ip_protocol
from horizon.utils.validators import validate_port_range
from openstack_dashboard import api
from openstack_dashboard.utils.filters import get_int_or_uuid
from ..floating_ips.utils import get_int_or_uuid
LOG = logging.getLogger(__name__)
class CreateGroup(forms.SelfHandlingForm):
@ -46,9 +53,9 @@ class CreateGroup(forms.SelfHandlingForm):
def handle(self, request, data):
try:
sg = api.nova.security_group_create(request,
data['name'],
data['description'])
sg = api.network.security_group_create(request,
data['name'],
data['description'])
messages.success(request,
_('Successfully created security group: %s')
% data['name'])
@ -62,20 +69,49 @@ class CreateGroup(forms.SelfHandlingForm):
class AddRule(forms.SelfHandlingForm):
id = forms.CharField(widget=forms.HiddenInput())
ip_protocol = forms.ChoiceField(label=_('Rule'),
widget=forms.Select(attrs={
'class': 'switchable',
'data-slug': 'protocol'}))
rule_menu = forms.ChoiceField(label=_('Rule'),
widget=forms.Select(attrs={
'class': 'switchable',
'data-slug': 'rule_menu'}))
port_or_range = forms.ChoiceField(label=_('Open'),
choices=[('port', _('Port')),
('range', _('Port Range'))],
widget=forms.Select(attrs={
'class': 'switchable switched',
'data-slug': 'range',
'data-switch-on': 'protocol',
'data-protocol-tcp': _('Open'),
'data-protocol-udp': _('Open')}))
# "direction" field is enabled only when custom mode.
# It is because most common rules in local_settings.py is meaningful
# when its direction is 'ingress'.
direction = forms.ChoiceField(
label=_('Direction'),
required=False,
widget=forms.Select(attrs={
'class': 'switched',
'data-switch-on': 'rule_menu',
'data-rule_menu-tcp': _('Direction'),
'data-rule_menu-udp': _('Direction'),
'data-rule_menu-icmp': _('Direction'),
'data-rule_menu-custom': _('Direction'),
'data-rule_menu-all_tcp': _('Direction'),
'data-rule_menu-all_udp': _('Direction'),
'data-rule_menu-all_icmp': _('Direction'),
}))
ip_protocol = forms.IntegerField(
label=_('IP Protocol'), required=False,
help_text=_("Enter an integer value between 0 and 255 "
"(or -1 which means wildcard)."),
validators=[validate_ip_protocol],
widget=forms.TextInput(attrs={
'class': 'switched',
'data-switch-on': 'rule_menu',
'data-rule_menu-custom': _('IP Protocol')}))
port_or_range = forms.ChoiceField(
label=_('Open Port'),
choices=[('port', _('Port')),
('range', _('Port Range'))],
widget=forms.Select(attrs={
'class': 'switchable switched',
'data-slug': 'range',
'data-switch-on': 'rule_menu',
'data-rule_menu-tcp': _('Open Port'),
'data-rule_menu-udp': _('Open Port')}))
port = forms.IntegerField(label=_("Port"),
required=False,
@ -113,8 +149,8 @@ class AddRule(forms.SelfHandlingForm):
"in the range (-1: 255)"),
widget=forms.TextInput(attrs={
'class': 'switched',
'data-switch-on': 'protocol',
'data-protocol-icmp': _('Type')}),
'data-switch-on': 'rule_menu',
'data-rule_menu-icmp': _('Type')}),
validators=[validate_port_range])
icmp_code = forms.IntegerField(label=_("Code"),
@ -123,11 +159,11 @@ class AddRule(forms.SelfHandlingForm):
"in the range (-1: 255)"),
widget=forms.TextInput(attrs={
'class': 'switched',
'data-switch-on': 'protocol',
'data-protocol-icmp': _('Code')}),
'data-switch-on': 'rule_menu',
'data-rule_menu-icmp': _('Code')}),
validators=[validate_port_range])
source = forms.ChoiceField(label=_('Source'),
remote = forms.ChoiceField(label=_('Remote'),
choices=[('cidr', _('CIDR')),
('sg', _('Security Group'))],
help_text=_('To specify an allowed IP '
@ -138,7 +174,7 @@ class AddRule(forms.SelfHandlingForm):
'Group".'),
widget=forms.Select(attrs={
'class': 'switchable',
'data-slug': 'source'}))
'data-slug': 'remote'}))
cidr = fields.IPField(label=_("CIDR"),
required=False,
@ -149,44 +185,71 @@ class AddRule(forms.SelfHandlingForm):
mask=True,
widget=forms.TextInput(
attrs={'class': 'switched',
'data-switch-on': 'source',
'data-source-cidr': _('CIDR')}))
'data-switch-on': 'remote',
'data-remote-cidr': _('CIDR')}))
security_group = forms.ChoiceField(label=_('Security Group'),
required=False,
widget=forms.Select(attrs={
'class': 'switched',
'data-switch-on': 'source',
'data-source-sg': _('Security '
'data-switch-on': 'remote',
'data-remote-sg': _('Security '
'Group')}))
# When cidr is used ethertype is determined from IP version of cidr.
# When source group, ethertype needs to be specified explicitly.
ethertype = forms.ChoiceField(label=_('Ether Type'),
required=False,
choices=[('IPv4', _('IPv4')),
('IPv6', _('IPv6'))],
widget=forms.Select(attrs={
'class': 'switched',
'data-slug': 'ethertype',
'data-switch-on': 'remote',
'data-remote-sg': _('Ether Type')}))
def __init__(self, *args, **kwargs):
sg_list = kwargs.pop('sg_list', [])
super(AddRule, self).__init__(*args, **kwargs)
# Determine if there are security groups available for the
# source group option; add the choices and enable the option if so.
# remote group option; add the choices and enable the option if so.
if sg_list:
security_groups_choices = sg_list
else:
security_groups_choices = [("", _("No security groups available"))]
self.fields['security_group'].choices = security_groups_choices
rules_dict = getattr(settings, 'SECURITY_GROUP_RULES', {})
backend = api.network.security_group_backend(self.request)
rules_dict = getattr(settings, 'SECURITY_GROUP_RULES', [])
common_rules = [(k, _(rules_dict[k]['name']))
for k in rules_dict]
for k in rules_dict
if rules_dict[k].get('backend', backend) == backend]
common_rules.sort()
custom_rules = [('tcp', _('Custom TCP Rule')),
('udp', _('Custom UDP Rule')),
('icmp', _('Custom ICMP Rule'))]
self.fields['ip_protocol'].choices = custom_rules + common_rules
if backend == 'neutron':
custom_rules.append(('custom', _('Other Protocol')))
self.fields['rule_menu'].choices = custom_rules + common_rules
self.rules = rules_dict
if backend == 'neutron':
self.fields['direction'].choices = [('ingress', _('Ingress')),
('egress', _('Egress'))]
else:
# direction and ethertype are not supported in Nova secgroup.
self.fields['direction'].widget = forms.HiddenInput()
self.fields['ethertype'].widget = forms.HiddenInput()
# ip_protocol field is to specify arbitrary protocol number
# and it is available only for neutron security group.
self.fields['ip_protocol'].widget = forms.HiddenInput()
def clean(self):
cleaned_data = super(AddRule, self).clean()
ip_proto = cleaned_data.get('ip_protocol')
rule_menu = cleaned_data.get('rule_menu')
port_or_range = cleaned_data.get("port_or_range")
source = cleaned_data.get("source")
remote = cleaned_data.get("remote")
icmp_type = cleaned_data.get("icmp_type", None)
icmp_code = cleaned_data.get("icmp_code", None)
@ -195,7 +258,8 @@ class AddRule(forms.SelfHandlingForm):
to_port = cleaned_data.get("to_port", None)
port = cleaned_data.get("port", None)
if ip_proto == 'icmp':
if rule_menu == 'icmp':
cleaned_data['ip_protocol'] = rule_menu
if icmp_type is None:
msg = _('The ICMP type is invalid.')
raise ValidationError(msg)
@ -210,7 +274,8 @@ class AddRule(forms.SelfHandlingForm):
raise ValidationError(msg)
cleaned_data['from_port'] = icmp_type
cleaned_data['to_port'] = icmp_code
elif ip_proto == 'tcp' or ip_proto == 'udp':
elif rule_menu == 'tcp' or rule_menu == 'udp':
cleaned_data['ip_protocol'] = rule_menu
if port_or_range == "port":
cleaned_data["from_port"] = port
cleaned_data["to_port"] = port
@ -228,23 +293,51 @@ class AddRule(forms.SelfHandlingForm):
msg = _('The "to" port number must be greater than '
'or equal to the "from" port number.')
raise ValidationError(msg)
elif rule_menu == 'custom':
pass
else:
cleaned_data['ip_protocol'] = self.rules[ip_proto]['ip_protocol']
cleaned_data['from_port'] = int(self.rules[ip_proto]['from_port'])
cleaned_data['to_port'] = int(self.rules[ip_proto]['to_port'])
cleaned_data['ip_protocol'] = self.rules[rule_menu]['ip_protocol']
cleaned_data['from_port'] = int(self.rules[rule_menu]['from_port'])
cleaned_data['to_port'] = int(self.rules[rule_menu]['to_port'])
cleaned_data['direction'] = self.rules[rule_menu].get('direction')
if source == "cidr":
# NOTE(amotoki): There are two cases where cleaned_data['direction']
# is empty: (1) Nova Security Group is used. Since "direction" is
# HiddenInput, direction field exists but its value is ''.
# (2) Template is used. In this case, the default value is None.
# To make sure 'direction' field has 'ingress' or 'egress',
# fill this field here if it is not specified.
if not cleaned_data['direction']:
cleaned_data['direction'] = 'ingress'
if remote == "cidr":
cleaned_data['security_group'] = None
else:
cleaned_data['cidr'] = None
# If cleaned_data does not contain cidr, cidr is already marked
# as invalid, so skip the further validation for cidr.
# In addition cleaned_data['cidr'] is None means source_group is used.
if 'cidr' in cleaned_data and cleaned_data['cidr'] is not None:
cidr = cleaned_data['cidr']
if not cidr:
msg = _('CIDR must be specified.')
self._errors['cidr'] = self.error_class([msg])
else:
# If cidr is specified, ethertype is determined from IP address
# version. It is used only when Neutron is enabled.
ip_ver = netaddr.IPNetwork(cidr).version
cleaned_data['ethertype'] = 'IPv6' if ip_ver == 6 else 'IPv4'
return cleaned_data
def handle(self, request, data):
try:
rule = api.nova.security_group_rule_create(
rule = api.network.security_group_rule_create(
request,
get_int_or_uuid(data['id']),
data['direction'],
data['ethertype'],
data['ip_protocol'],
data['from_port'],
data['to_port'],

View File

@ -23,8 +23,7 @@ from django.utils.translation import ugettext_lazy as _
from horizon import tables
from openstack_dashboard import api
from ..floating_ips.utils import get_int_or_uuid
from openstack_dashboard.utils.filters import get_int_or_uuid
LOG = logging.getLogger(__name__)
@ -40,7 +39,7 @@ class DeleteGroup(tables.DeleteAction):
return security_group.name != 'default'
def delete(self, request, obj_id):
api.nova.security_group_delete(request, obj_id)
api.network.security_group_delete(request, obj_id)
class CreateGroup(tables.LinkAction):
@ -86,7 +85,7 @@ class DeleteRule(tables.DeleteAction):
data_type_plural = _("Rules")
def delete(self, request, obj_id):
api.nova.security_group_rule_delete(request, obj_id)
api.network.security_group_rule_delete(request, obj_id)
def get_success_url(self, request):
sg_id = self.table.kwargs['security_group_id']
@ -94,30 +93,49 @@ class DeleteRule(tables.DeleteAction):
"security_groups:detail", args=[sg_id])
def get_source(rule):
def get_remote(rule):
if 'cidr' in rule.ip_range:
if rule.ip_range['cidr'] is None:
return '0.0.0.0/0 (CIDR)'
range = '::/0' if rule.ethertype == 'IPv6' else '0.0.0.0/0'
else:
return rule.ip_range['cidr'] + ' (CIDR)'
range = rule.ip_range['cidr']
return range + ' (CIDR)'
elif 'name' in rule.group:
return rule.group['name']
else:
return None
def get_port_range(rule):
ip_proto = rule.ip_protocol
if rule.from_port == rule.to_port:
return check_rule_template(rule.from_port, ip_proto)
else:
return (u"%(from)s - %(to)s" %
{'from': check_rule_template(rule.from_port, ip_proto),
'to': check_rule_template(rule.to_port, ip_proto)})
def filter_direction(direction):
if direction is None or direction.lower() == 'ingress':
return _('Ingress')
else:
return _('Egress')
def filter_protocol(protocol):
if protocol is None:
return _('Any')
return unicode.upper(protocol)
def check_rule_template(port):
def check_rule_template(port, ip_proto):
rules_dict = getattr(settings, 'SECURITY_GROUP_RULES', {})
if not rules_dict:
return port
templ_rule = filter(lambda rule: str(port) == rule['from_port']
and str(port) == rule['to_port'],
and str(port) == rule['to_port']
and ip_proto == rule['ip_protocol'],
[rule for rule in rules_dict.values()])
if templ_rule:
return u"%(from_port)s (%(name)s)" % templ_rule[0]
@ -125,14 +143,17 @@ def check_rule_template(port):
class RulesTable(tables.DataTable):
direction = tables.Column("direction",
verbose_name=_("Direction"),
filters=(filter_direction,))
ethertype = tables.Column("ethertype",
verbose_name=_("Ether Type"))
protocol = tables.Column("ip_protocol",
verbose_name=_("IP Protocol"),
filters=(filter_protocol,))
from_port = tables.Column("from_port", verbose_name=_("From Port"),
filters=(check_rule_template,))
to_port = tables.Column("to_port", verbose_name=_("To Port"),
filters=(check_rule_template,))
source = tables.Column(get_source, verbose_name=_("Source"))
port_range = tables.Column(get_port_range,
verbose_name=_("Port Range"))
remote = tables.Column(get_remote, verbose_name=_("Remote"))
def sanitize_id(self, obj_id):
return get_int_or_uuid(obj_id)

View File

@ -18,6 +18,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import cgi
from django.conf import settings
from django.core.urlresolvers import reverse
from django import http
@ -41,6 +43,8 @@ def strip_absolute_base(uri):
class SecurityGroupsViewTests(test.TestCase):
secgroup_backend = 'nova'
def setUp(self):
super(SecurityGroupsViewTests, self).setUp()
sec_group = self.security_groups.first()
@ -56,10 +60,10 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertTemplateUsed(res,
'project/access_and_security/security_groups/create.html')
@test.create_stubs({api.network: ('security_group_create',)})
def test_create_security_groups_post(self):
sec_group = self.security_groups.first()
self.mox.StubOutWithMock(api.nova, 'security_group_create')
api.nova.security_group_create(IsA(http.HttpRequest),
api.network.security_group_create(IsA(http.HttpRequest),
sec_group.name,
sec_group.description) \
.AndReturn(sec_group)
@ -71,10 +75,10 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.post(SG_CREATE_URL, formData)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_create',)})
def test_create_security_groups_post_exception(self):
sec_group = self.security_groups.first()
self.mox.StubOutWithMock(api.nova, 'security_group_create')
api.nova.security_group_create(IsA(http.HttpRequest),
api.network.security_group_create(IsA(http.HttpRequest),
sec_group.name,
sec_group.description) \
.AndRaise(self.exceptions.nova)
@ -87,9 +91,9 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertMessageCount(error=1)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_create',)})
def test_create_security_groups_post_wrong_name(self):
sec_group = self.security_groups.first()
self.mox.StubOutWithMock(api.nova, 'security_group_create')
fail_name = sec_group.name + ' invalid'
self.mox.ReplayAll()
@ -101,22 +105,22 @@ class SecurityGroupsViewTests(test.TestCase):
'project/access_and_security/security_groups/create.html')
self.assertContains(res, "ASCII")
@test.create_stubs({api.network: ('security_group_get',)})
def test_detail_get(self):
sec_group = self.security_groups.first()
self.mox.StubOutWithMock(api.nova, 'security_group_get')
api.nova.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
api.network.security_group_get(IsA(http.HttpRequest),
sec_group.id).AndReturn(sec_group)
self.mox.ReplayAll()
res = self.client.get(self.detail_url)
self.assertTemplateUsed(res,
'project/access_and_security/security_groups/detail.html')
@test.create_stubs({api.network: ('security_group_get',)})
def test_detail_get_exception(self):
sec_group = self.security_groups.first()
self.mox.StubOutWithMock(api.nova, 'security_group_get')
api.nova.security_group_get(IsA(http.HttpRequest),
api.network.security_group_get(IsA(http.HttpRequest),
sec_group.id) \
.AndRaise(self.exceptions.nova)
@ -125,21 +129,25 @@ class SecurityGroupsViewTests(test.TestCase):
res = self.client.get(self.detail_url)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_cidr(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_create')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.nova.security_group_list(
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
'ingress', 'IPv4',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -147,55 +155,68 @@ class SecurityGroupsViewTests(test.TestCase):
'id': sec_group.id,
'port_or_range': 'port',
'port': rule.from_port,
'ip_protocol': rule.ip_protocol,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_cidr_with_template(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_create')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.nova.security_group_list(
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id,
'ingress', 'IPv4',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'ip_protocol': 'http',
'rule_menu': 'http',
'port_or_range': 'port',
'cidr': rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
def _get_source_group_rule(self):
return self.security_group_rules.get(id=3)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_self_as_source_group(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.get(id=3)
rule = self._get_source_group_rule()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_create')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_rule_create(
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'ingress',
# ethertype is empty for source_group of Nova Security Group
'',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.nova.security_group_list(
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -203,50 +224,84 @@ class SecurityGroupsViewTests(test.TestCase):
'id': sec_group.id,
'port_or_range': 'port',
'port': rule.from_port,
'ip_protocol': rule.ip_protocol,
'rule_menu': rule.ip_protocol,
'cidr': '0.0.0.0/0',
'security_group': sec_group.id,
'source': 'sg'}
'remote': 'sg'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_self_as_source_group_with_template(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.get(id=3)
rule = self._get_source_group_rule()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_create')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_rule_create(
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'ingress',
# ethertype is empty for source_group of Nova Security Group
'',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.nova.security_group_list(
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'ip_protocol': 'http',
'rule_menu': 'http',
'port_or_range': 'port',
'cidr': '0.0.0.0/0',
'security_group': sec_group.id,
'source': 'sg'}
'remote': 'sg'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_list',
'security_group_backend')})
def test_detail_invalid_port(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'port',
'port': None,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "The specified port is invalid")
@test.create_stubs({api.network: ('security_group_list',
'security_group_backend')})
def test_detail_invalid_port_range(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
for i in range(3):
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
@ -254,35 +309,53 @@ class SecurityGroupsViewTests(test.TestCase):
'port_or_range': 'range',
'from_port': rule.from_port,
'to_port': int(rule.from_port) - 1,
'ip_protocol': rule.ip_protocol,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "greater than or equal to")
@test.create_stubs({api.nova: ('security_group_get',
'security_group_list')})
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'range',
'from_port': None,
'to_port': rule.to_port,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, cgi.escape('"from" port number is invalid',
quote=True))
formData = {'method': 'AddRule',
'id': sec_group.id,
'port_or_range': 'range',
'from_port': rule.from_port,
'to_port': None,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, cgi.escape('"to" port number is invalid',
quote=True))
@test.create_stubs({api.network: ('security_group_get',
'security_group_list',
'security_group_backend')})
def test_detail_invalid_icmp_rule(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
icmp_rule = self.security_group_rules.list()[1]
# 1st Test
api.nova.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
# 2nd Test
api.nova.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
# 3rd Test
api.nova.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
# 4th Test
api.nova.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
# Call POST 4 times
for i in range(4):
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -291,9 +364,9 @@ class SecurityGroupsViewTests(test.TestCase):
'port_or_range': 'port',
'icmp_type': 256,
'icmp_code': icmp_rule.to_port,
'ip_protocol': icmp_rule.ip_protocol,
'rule_menu': icmp_rule.ip_protocol,
'cidr': icmp_rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "The ICMP type not in range (-1, 255)")
@ -303,9 +376,9 @@ class SecurityGroupsViewTests(test.TestCase):
'port_or_range': 'port',
'icmp_type': icmp_rule.from_port,
'icmp_code': 256,
'ip_protocol': icmp_rule.ip_protocol,
'rule_menu': icmp_rule.ip_protocol,
'cidr': icmp_rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "The ICMP code not in range (-1, 255)")
@ -315,9 +388,9 @@ class SecurityGroupsViewTests(test.TestCase):
'port_or_range': 'port',
'icmp_type': icmp_rule.from_port,
'icmp_code': None,
'ip_protocol': icmp_rule.ip_protocol,
'rule_menu': icmp_rule.ip_protocol,
'cidr': icmp_rule.ip_range['cidr'],
'source_group': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "The ICMP code is invalid")
@ -327,29 +400,32 @@ class SecurityGroupsViewTests(test.TestCase):
'port_or_range': 'port',
'icmp_type': None,
'icmp_code': icmp_rule.to_port,
'ip_protocol': icmp_rule.ip_protocol,
'rule_menu': icmp_rule.ip_protocol,
'cidr': icmp_rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertNoMessages()
self.assertContains(res, "The ICMP type is invalid")
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_exception(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_create')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
api.nova.security_group_rule_create(
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
sec_group.id, 'ingress', 'IPv4',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
rule.ip_range['cidr'],
None).AndRaise(self.exceptions.nova)
api.nova.security_group_list(
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
@ -357,18 +433,18 @@ class SecurityGroupsViewTests(test.TestCase):
'id': sec_group.id,
'port_or_range': 'port',
'port': rule.from_port,
'ip_protocol': rule.ip_protocol,
'rule_menu': rule.ip_protocol,
'cidr': rule.ip_range['cidr'],
'source': 'cidr'}
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_delete',)})
def test_detail_delete_rule(self):
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_delete')
api.nova.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
api.network.security_group_rule_delete(IsA(http.HttpRequest), rule.id)
self.mox.ReplayAll()
form_data = {"action": "rules__delete__%s" % rule.id}
@ -379,12 +455,12 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_delete',)})
def test_detail_delete_rule_exception(self):
sec_group = self.security_groups.first()
rule = self.security_group_rules.first()
self.mox.StubOutWithMock(api.nova, 'security_group_rule_delete')
api.nova.security_group_rule_delete(
api.network.security_group_rule_delete(
IsA(http.HttpRequest),
rule.id).AndRaise(self.exceptions.nova)
self.mox.ReplayAll()
@ -397,11 +473,11 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
self.detail_url)
@test.create_stubs({api.network: ('security_group_delete',)})
def test_delete_group(self):
sec_group = self.security_groups.get(name="other_group")
self.mox.StubOutWithMock(api.nova, 'security_group_delete')
api.nova.security_group_delete(IsA(http.HttpRequest), sec_group.id)
api.network.security_group_delete(IsA(http.HttpRequest), sec_group.id)
self.mox.ReplayAll()
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
@ -411,11 +487,11 @@ class SecurityGroupsViewTests(test.TestCase):
self.assertEqual(strip_absolute_base(handled['location']),
INDEX_URL)
@test.create_stubs({api.network: ('security_group_delete',)})
def test_delete_group_exception(self):
sec_group = self.security_groups.get(name="other_group")
self.mox.StubOutWithMock(api.nova, 'security_group_delete')
api.nova.security_group_delete(
api.network.security_group_delete(
IsA(http.HttpRequest),
sec_group.id).AndRaise(self.exceptions.nova)
@ -430,9 +506,11 @@ class SecurityGroupsViewTests(test.TestCase):
INDEX_URL)
class SecurityGroupsNeutronTests(SecurityGroupsViewTests):
class SecurityGroupsNovaNeutronDriverTests(SecurityGroupsViewTests):
secgroup_backend = 'nova'
def setUp(self):
super(SecurityGroupsNeutronTests, self).setUp()
super(SecurityGroupsNovaNeutronDriverTests, self).setUp()
self._sec_groups_orig = self.security_groups
self.security_groups = self.security_groups_uuid
@ -448,7 +526,137 @@ class SecurityGroupsNeutronTests(SecurityGroupsViewTests):
'security_groups:add_rule',
args=[sec_group.id])
def tearDown(self):
self.security_groups = self._sec_groups_orig
self.security_group_rules = self._sec_group_rules_orig
super(SecurityGroupsNovaNeutronDriverTests, self).tearDown()
class SecurityGroupsNeutronTests(SecurityGroupsViewTests):
secgroup_backend = 'neutron'
def setUp(self):
super(SecurityGroupsNeutronTests, self).setUp()
self._sec_groups_orig = self.security_groups
self.security_groups = self.q_secgroups
self._sec_group_rules_orig = self.security_group_rules
self.security_group_rules = self.q_secgroup_rules
sec_group = self.security_groups.first()
self.detail_url = reverse('horizon:project:access_and_security:'
'security_groups:detail',
args=[sec_group.id])
self.edit_url = reverse('horizon:project:access_and_security:'
'security_groups:add_rule',
args=[sec_group.id])
def tearDown(self):
self.security_groups = self._sec_groups_orig
self.security_group_rules = self._sec_group_rules_orig
super(SecurityGroupsNeutronTests, self).tearDown()
def _get_source_group_rule(self):
for rule in self.security_group_rules.list():
if rule.group:
return rule
raise Exception("No matches found.")
# Additional tests for Neutron Security Group original features
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_custom_protocol(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'ingress', 'IPv6',
37, None, None, 'fe80::/48',
None).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'rule_menu': 'custom',
'direction': 'ingress',
'port_or_range': 'port',
'ip_protocol': 37,
'cidr': 'fe80::/48',
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_egress(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self.security_group_rules.first()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(IsA(http.HttpRequest),
sec_group.id, 'egress', 'IPv4',
'udp', 80, 80, '10.1.1.0/24',
None).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'direction': 'egress',
'rule_menu': 'udp',
'port_or_range': 'port',
'port': 80,
'cidr': '10.1.1.0/24',
'remote': 'cidr'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)
@test.create_stubs({api.network: ('security_group_rule_create',
'security_group_list',
'security_group_backend')})
def test_detail_add_rule_source_group_with_direction_ethertype(self):
sec_group = self.security_groups.first()
sec_group_list = self.security_groups.list()
rule = self._get_source_group_rule()
api.network.security_group_backend(
IsA(http.HttpRequest)).AndReturn(self.secgroup_backend)
api.network.security_group_rule_create(
IsA(http.HttpRequest),
sec_group.id,
'egress',
# ethertype is empty for source_group of Nova Security Group
'IPv6',
rule.ip_protocol,
int(rule.from_port),
int(rule.to_port),
None,
u'%s' % sec_group.id).AndReturn(rule)
api.network.security_group_list(
IsA(http.HttpRequest)).AndReturn(sec_group_list)
self.mox.ReplayAll()
formData = {'method': 'AddRule',
'id': sec_group.id,
'direction': 'egress',
'port_or_range': 'port',
'port': rule.from_port,
'rule_menu': rule.ip_protocol,
'cidr': '0.0.0.0/0',
'security_group': sec_group.id,
'remote': 'sg',
'ethertype': 'IPv6'}
res = self.client.post(self.edit_url, formData)
self.assertRedirectsNoFollow(res, self.detail_url)

View File

@ -32,8 +32,8 @@ from horizon import forms
from horizon import tables
from openstack_dashboard import api
from openstack_dashboard.utils.filters import get_int_or_uuid
from ..floating_ips.utils import get_int_or_uuid
from .forms import AddRule
from .forms import CreateGroup
from .tables import RulesTable
@ -46,19 +46,25 @@ class DetailView(tables.DataTableView):
table_class = RulesTable
template_name = 'project/access_and_security/security_groups/detail.html'
def _get_data(self):
if not hasattr(self, '_sg'):
sg_id = get_int_or_uuid(self.kwargs['security_group_id'])
try:
self._sg = api.network.security_group_get(self.request, sg_id)
except:
redirect = reverse('horizon:project:access_and_security:index')
exceptions.handle(self.request,
_('Unable to retrieve security group.'),
redirect=redirect)
return self._sg
def get_data(self):
security_group_id = get_int_or_uuid(self.kwargs['security_group_id'])
try:
self.object = api.nova.security_group_get(self.request,
security_group_id)
rules = [api.nova.SecurityGroupRule(rule) for
rule in self.object.rules]
except:
redirect = reverse('horizon:project:access_and_security:index')
exceptions.handle(self.request,
_('Unable to retrieve security group.'),
redirect=redirect)
return rules
return self._get_data().rules
def get_context_data(self, **kwargs):
context = super(DetailView, self).get_context_data(**kwargs)
context["security_group"] = self._get_data()
return context
class AddRuleView(forms.ModalFormView):
@ -82,7 +88,7 @@ class AddRuleView(forms.ModalFormView):
kwargs = super(AddRuleView, self).get_form_kwargs()
try:
groups = api.nova.security_group_list(self.request)
groups = api.network.security_group_list(self.request)
except:
groups = []
exceptions.handle(self.request,

View File

@ -43,7 +43,7 @@ class SecurityGroupsTab(tabs.TableTab):
def get_security_groups_data(self):
try:
security_groups = nova.security_group_list(self.request)
security_groups = network.security_group_list(self.request)
except:
security_groups = []
exceptions.handle(self.request,

View File

@ -19,7 +19,7 @@
<p>{% blocktrans %}Rules define which traffic is allowed to instances assigned to the security group. A security group rule consists of three main parts:{% endblocktrans %}</p>
<p><strong>{% trans "Rule" %}</strong>: {% blocktrans %}You can specify the desired rule template or use custom rules, the options are Custom TCP Rule, Custom UDP Rule, or Custom ICMP Rule.{% endblocktrans %}</p>
<p><strong>{% trans "Open Port/Port Range" %}</strong>: {% blocktrans %}For TCP and UDP rules you may choose to open either a single port or a range of ports. Selecting the "Port Range" option will provide you with space to provide both the starting and ending ports for the range. For ICMP rules you instead specify an ICMP type and code in the spaces provided.{% endblocktrans %}</p>
<p><strong>{% trans "Source" %}</strong>: {% blocktrans %}You must specify the source of the traffic to be allowed via this rule. You may do so either in the form of an IP address block (CIDR) or via a source group (Security Group). Selecting a security group as the source will allow any other instance in that security group access to any other instance via this rule.{% endblocktrans %}</p>
<p><strong>{% trans "Remote" %}</strong>: {% blocktrans %}You must specify the source of the traffic to be allowed via this rule. You may do so either in the form of an IP address block (CIDR) or via a source group (Security Group). Selecting a security group as the source will allow any other instance in that security group access to any other instance via this rule.{% endblocktrans %}</p>
</div>
{% endblock %}

View File

@ -3,7 +3,7 @@
{% block title %}{% trans "Edit Security Group Rules" %}{% endblock %}
{% block page_header %}
{% include "horizon/common/_page_header.html" with title=_("Edit Security Group Rules") %}
{% include "horizon/common/_page_header.html" with title=_("Edit Security Group Rules: ")|add:security_group.name %}
{% endblock page_header %}
{% block main %}

View File

@ -40,7 +40,7 @@ class AccessAndSecurityTests(test.TestCase):
sec_groups = self.security_groups.list()
floating_ips = self.floating_ips.list()
self.mox.StubOutWithMock(api.network, 'tenant_floating_ip_list')
self.mox.StubOutWithMock(api.nova, 'security_group_list')
self.mox.StubOutWithMock(api.network, 'security_group_list')
self.mox.StubOutWithMock(api.nova, 'keypair_list')
self.mox.StubOutWithMock(api.nova, 'server_list')
@ -50,7 +50,7 @@ class AccessAndSecurityTests(test.TestCase):
api.nova.keypair_list(IsA(http.HttpRequest)).AndReturn(keypairs)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(floating_ips)
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(sec_groups)
self.mox.ReplayAll()

View File

@ -419,8 +419,8 @@ class InstanceTests(test.TestCase):
@test.create_stubs({api.nova: ("server_get",
"instance_volumes_list",
"flavor_get",
"server_security_groups")})
"flavor_get"),
api.network: ("server_security_groups",)})
def test_instance_details_volumes(self):
server = self.servers.first()
volumes = [self.volumes.list()[1]]
@ -430,7 +430,7 @@ class InstanceTests(test.TestCase):
server.id).AndReturn(volumes)
api.nova.flavor_get(IsA(http.HttpRequest), server.flavor['id']) \
.AndReturn(self.flavors.first())
api.nova.server_security_groups(IsA(http.HttpRequest), server.id) \
api.network.server_security_groups(IsA(http.HttpRequest), server.id) \
.AndReturn(self.security_groups.first())
self.mox.ReplayAll()
@ -443,8 +443,8 @@ class InstanceTests(test.TestCase):
@test.create_stubs({api.nova: ("server_get",
"instance_volumes_list",
"flavor_get",
"server_security_groups")})
"flavor_get"),
api.network: ("server_security_groups",)})
def test_instance_details_volume_sorting(self):
server = self.servers.first()
volumes = self.volumes.list()[1:3]
@ -454,7 +454,7 @@ class InstanceTests(test.TestCase):
server.id).AndReturn(volumes)
api.nova.flavor_get(IsA(http.HttpRequest), server.flavor['id']) \
.AndReturn(self.flavors.first())
api.nova.server_security_groups(IsA(http.HttpRequest), server.id) \
api.network.server_security_groups(IsA(http.HttpRequest), server.id) \
.AndReturn(self.security_groups.first())
self.mox.ReplayAll()
@ -471,8 +471,8 @@ class InstanceTests(test.TestCase):
@test.create_stubs({api.nova: ("server_get",
"instance_volumes_list",
"flavor_get",
"server_security_groups",)})
"flavor_get"),
api.network: ("server_security_groups",)})
def test_instance_details_metadata(self):
server = self.servers.first()
@ -481,7 +481,7 @@ class InstanceTests(test.TestCase):
server.id).AndReturn([])
api.nova.flavor_get(IsA(http.HttpRequest), server.flavor['id']) \
.AndReturn(self.flavors.first())
api.nova.server_security_groups(IsA(http.HttpRequest), server.id) \
api.network.server_security_groups(IsA(http.HttpRequest), server.id) \
.AndReturn(self.security_groups.list())
self.mox.ReplayAll()
@ -643,19 +643,19 @@ class InstanceTests(test.TestCase):
self.assertRedirects(res, redir_url)
instance_update_get_stubs = {
api.nova: ('server_get',
'security_group_list',
'server_security_groups',)}
api.nova: ('server_get',),
api.network: ('security_group_list',
'server_security_groups',)}
@test.create_stubs(instance_update_get_stubs)
def test_instance_update_get(self):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
self.mox.ReplayAll()
@ -688,64 +688,33 @@ class InstanceTests(test.TestCase):
return self.client.post(url, formData)
instance_update_post_stubs = {
api.nova: ('server_get', 'server_update',
'security_group_list',
'server_security_groups',
'server_add_security_group',
'server_remove_security_group')}
api.nova: ('server_get', 'server_update'),
api.network: ('security_group_list',
'server_security_groups',
'server_update_security_groups')}
@test.create_stubs(instance_update_post_stubs)
def test_instance_update_post(self):
server = self.servers.first()
secgroups = self.security_groups.list()[:3]
new_name = 'manuel'
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.nova.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(secgroups)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.nova.server_update(IsA(http.HttpRequest),
server.id,
new_name).AndReturn(server)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
self.mox.ReplayAll()
res = self._instance_update_post(server.id, new_name, [])
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, INDEX_URL)
@test.create_stubs(instance_update_post_stubs)
def test_instance_update_secgroup_post(self):
server = self.servers.first()
secgroups = self.security_groups.list()[:3]
server_groups = [secgroups[0], secgroups[1]]
wanted_groups = [secgroups[1].name, secgroups[2].name]
expect_add = secgroups[2].name
expect_rm = secgroups[0].name
wanted_groups = [secgroups[1].id, secgroups[2].id]
expect_add = secgroups[2].id
expect_rm = secgroups[0].id
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(secgroups)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn(server_groups)
api.network.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn(server_groups)
api.nova.server_update(IsA(http.HttpRequest),
server.id,
server.name).AndReturn(server)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn(server_groups)
api.nova.server_add_security_group(IsA(http.HttpRequest),
server.id,
expect_add).AndReturn(server)
api.nova.server_remove_security_group(IsA(http.HttpRequest),
server.id,
expect_rm).AndReturn(server)
api.network.server_update_security_groups(IsA(http.HttpRequest),
server.id,
wanted_groups)
self.mox.ReplayAll()
@ -758,16 +727,15 @@ class InstanceTests(test.TestCase):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.nova.server_update(IsA(http.HttpRequest), server.id, server.name) \
.AndRaise(self.exceptions.nova)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id) \
.AndRaise(self.exceptions.nova)
api.network.server_update_security_groups(
IsA(http.HttpRequest), server.id, [])
self.mox.ReplayAll()
@ -779,17 +747,17 @@ class InstanceTests(test.TestCase):
server = self.servers.first()
api.nova.server_get(IsA(http.HttpRequest), server.id).AndReturn(server)
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn([])
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.network.server_security_groups(IsA(http.HttpRequest),
server.id).AndReturn([])
api.nova.server_update(IsA(http.HttpRequest),
server.id,
server.name).AndReturn(server)
api.nova.server_security_groups(IsA(http.HttpRequest),
server.id) \
.AndRaise(self.exceptions.nova)
api.network.server_update_security_groups(
IsA(http.HttpRequest),
server.id, []).AndRaise(self.exceptions.nova)
self.mox.ReplayAll()
@ -798,9 +766,9 @@ class InstanceTests(test.TestCase):
@test.create_stubs({api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'tenant_absolute_limits',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_snapshot_list',
'volume_list',),
api.neutron: ('network_list',),
@ -835,7 +803,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -863,9 +831,9 @@ class InstanceTests(test.TestCase):
api.neutron: ('network_list',),
api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'availability_zone_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_instance_post(self):
@ -882,7 +850,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -943,9 +911,9 @@ class InstanceTests(test.TestCase):
api.nova: ('flavor_list',
'tenant_absolute_limits',
'keypair_list',
'security_group_list',
'availability_zone_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_instance_post_boot_from_volume_with_image(self):
@ -983,7 +951,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1023,9 +991,9 @@ class InstanceTests(test.TestCase):
api.neutron: ('network_list',),
api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'availability_zone_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_instance_post_boot_from_volume(self):
@ -1045,7 +1013,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1107,8 +1075,8 @@ class InstanceTests(test.TestCase):
api.nova: ('server_create',
'flavor_list',
'keypair_list',
'security_group_list',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_instance_post_no_images_available_boot_from_volume(self):
@ -1128,7 +1096,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1192,8 +1160,8 @@ class InstanceTests(test.TestCase):
api.nova: ('flavor_list',
'keypair_list',
'availability_zone_list',
'security_group_list',
'tenant_absolute_limits',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_instance_post_no_images_available(self):
@ -1227,7 +1195,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1263,9 +1231,9 @@ class InstanceTests(test.TestCase):
api.neutron: ('network_list',),
cinder: ('volume_list',
'volume_snapshot_list',),
api.network: ('security_group_list',),
api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'tenant_absolute_limits',
'availability_zone_list',)})
def test_launch_flavorlist_error(self):
@ -1296,7 +1264,7 @@ class InstanceTests(test.TestCase):
.AndRaise(self.exceptions.nova)
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1312,9 +1280,9 @@ class InstanceTests(test.TestCase):
api.neutron: ('network_list',),
api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'availability_zone_list',
'server_create',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_form_keystone_exception(self):
@ -1331,7 +1299,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.volumes.list())
api.nova.flavor_list(IgnoreArg()).AndReturn(self.flavors.list())
api.nova.keypair_list(IgnoreArg()).AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1391,9 +1359,9 @@ class InstanceTests(test.TestCase):
api.neutron: ('network_list',),
api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'tenant_absolute_limits',
'availability_zone_list',),
api.network: ('security_group_list',),
cinder: ('volume_list',
'volume_snapshot_list',)})
def test_launch_form_instance_count_error(self):
@ -1412,7 +1380,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn(self.keypairs.list())
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())
@ -1512,9 +1480,9 @@ class InstanceTests(test.TestCase):
@test.create_stubs({api.nova: ('flavor_list',
'keypair_list',
'security_group_list',
'availability_zone_list',
'tenant_absolute_limits',),
api.network: ('security_group_list',),
cinder: ('volume_snapshot_list',
'volume_list',),
api.neutron: ('network_list',),
@ -1550,7 +1518,7 @@ class InstanceTests(test.TestCase):
.AndReturn(self.flavors.list())
api.nova.keypair_list(IsA(http.HttpRequest)) \
.AndReturn([keypair])
api.nova.security_group_list(IsA(http.HttpRequest)) \
api.network.security_group_list(IsA(http.HttpRequest)) \
.AndReturn(self.security_groups.list())
api.nova.availability_zone_list(IsA(http.HttpRequest)) \
.AndReturn(self.availability_zones.list())

View File

@ -196,7 +196,7 @@ class DetailView(tabs.TabView):
instance.volumes.sort(key=lambda vol: vol.device)
instance.full_flavor = api.nova.flavor_get(
self.request, instance.flavor["id"])
instance.security_groups = api.nova.server_security_groups(
instance.security_groups = api.network.server_security_groups(
self.request, instance_id)
except:
redirect = reverse('horizon:project:instances:index')

View File

@ -377,7 +377,7 @@ class SetAccessControlsAction(workflows.Action):
def populate_groups_choices(self, request, context):
try:
groups = api.nova.security_group_list(request)
groups = api.network.security_group_list(request)
security_group_list = [(sg.name, sg.name) for sg in groups]
except:
exceptions.handle(request,

View File

@ -26,6 +26,7 @@ from horizon import forms
from horizon import workflows
from openstack_dashboard import api
from openstack_dashboard.utils.filters import get_int_or_uuid
INDEX_URL = "horizon:projects:instances:index"
@ -50,57 +51,30 @@ class UpdateInstanceSecurityGroupsAction(workflows.Action):
# Get list of available security groups
all_groups = []
try:
all_groups = api.nova.security_group_list(request)
all_groups = api.network.security_group_list(request)
except:
exceptions.handle(request, err_msg)
groups_list = [(group.name, group.name) for group in all_groups]
groups_list = [(group.id, group.name) for group in all_groups]
instance_groups = []
try:
instance_groups = api.nova.server_security_groups(request,
instance_id)
instance_groups = api.network.server_security_groups(request,
instance_id)
except Exception:
exceptions.handle(request, err_msg)
self.fields['role_member'].choices = groups_list
self.fields['role_member'].initial = [group.name
self.fields['role_member'].initial = [group.id
for group in instance_groups]
def handle(self, request, data):
instance_id = data['instance_id']
# update instance security groups
wanted_groups = set(data['wanted_groups'])
wanted_groups = map(get_int_or_uuid, data['wanted_groups'])
try:
current_groups = api.nova.server_security_groups(request,
instance_id)
except:
exceptions.handle(request, _("Couldn't get current security group "
"list for instance %s."
% instance_id))
api.network.server_update_security_groups(request, instance_id,
wanted_groups)
except Exception as e:
exceptions.handle(request, e.message)
return False
current_group_names = set(map(lambda g: g.name, current_groups))
groups_to_add = wanted_groups - current_group_names
groups_to_remove = current_group_names - wanted_groups
num_groups_to_modify = len(groups_to_add | groups_to_remove)
try:
for group in groups_to_add:
api.nova.server_add_security_group(request,
instance_id,
group)
num_groups_to_modify -= 1
for group in groups_to_remove:
api.nova.server_remove_security_group(request,
instance_id,
group)
num_groups_to_modify -= 1
except Exception:
exceptions.handle(request, _('Failed to modify %d instance '
'security groups.'
% num_groups_to_modify))
return False
return True
class Meta:

View File

@ -158,7 +158,8 @@ OPENSTACK_HYPERVISOR_FEATURES = {
# services provided by neutron. Currently only the load balancer service
# is available.
OPENSTACK_NEUTRON_NETWORK = {
'enable_lb': False
'enable_security_group': True,
'enable_lb': False,
}
# OPENSTACK_ENDPOINT_TYPE specifies the endpoint type to use for the endpoints

View File

@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import itertools
import uuid
from django import http
from mox import IsA
@ -23,13 +27,15 @@ from openstack_dashboard import api
from openstack_dashboard.test import helpers as test
class NetworkApiNovaFloatingIpTests(test.APITestCase):
class NetworkApiNovaTestBase(test.APITestCase):
def setUp(self):
super(NetworkApiNovaFloatingIpTests, self).setUp()
super(NetworkApiNovaTestBase, self).setUp()
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(False)
class NetworkApiNovaFloatingIpTests(NetworkApiNovaTestBase):
def test_floating_ip_pools_list(self):
pool_names = ['pool1', 'pool2']
pools = [FloatingIPPool(None, {'name': pool}) for pool in pool_names]
@ -142,14 +148,174 @@ class NetworkApiNovaFloatingIpTests(test.APITestCase):
self.assertEqual(instance_id, ret)
class NetworkApiNeutronFloatingIpTests(test.APITestCase):
class NetworkApiNeutronTestBase(test.APITestCase):
def setUp(self):
super(NetworkApiNeutronFloatingIpTests, self).setUp()
super(NetworkApiNeutronTestBase, self).setUp()
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
self.qclient = self.stub_neutronclient()
class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
def setUp(self):
super(NetworkApiNeutronSecurityGroupTests, self).setUp()
self.sg_dict = dict([(sg['id'], sg['name']) for sg
in self.api_q_secgroups.list()])
def _cmp_sg_rule(self, exprule, retrule):
self.assertEqual(exprule['id'], retrule.id)
self.assertEqual(exprule['security_group_id'],
retrule.parent_group_id)
self.assertEqual(exprule['direction'], retrule.direction)
self.assertEqual(exprule['ethertype'], retrule.ethertype)
self.assertEqual(exprule['port_range_min'], retrule.from_port)
self.assertEqual(exprule['port_range_max'], retrule.to_port)
if (exprule['remote_ip_prefix'] is None and
exprule['remote_group_id'] is None):
expcidr = ('::/0' if exprule['ethertype'] == 'IPv6'
else '0.0.0.0/0')
else:
expcidr = exprule['remote_ip_prefix']
self.assertEqual(expcidr, retrule.ip_range.get('cidr'))
self.assertEqual(self.sg_dict.get(exprule['remote_group_id']),
retrule.group.get('name'))
def _cmp_sg(self, exp_sg, ret_sg):
self.assertEqual(exp_sg['id'], ret_sg.id)
self.assertEqual(exp_sg['name'], ret_sg.name)
exp_rules = exp_sg['security_group_rules']
self.assertEqual(len(exp_rules), len(ret_sg.rules))
for (exprule, retrule) in itertools.izip(exp_rules, ret_sg.rules):
self._cmp_sg_rule(exprule, retrule)
def test_security_group_list(self):
sgs = self.api_q_secgroups.list()
tenant_id = self.request.user.tenant_id
# use deepcopy to ensure self.api_q_secgroups is not modified.
self.qclient.list_security_groups(tenant_id=tenant_id) \
.AndReturn({'security_groups': copy.deepcopy(sgs)})
self.mox.ReplayAll()
rets = api.network.security_group_list(self.request)
self.assertEqual(len(sgs), len(rets))
for (exp, ret) in itertools.izip(sgs, rets):
self._cmp_sg(exp, ret)
def test_security_group_get(self):
secgroup = self.api_q_secgroups.first()
sg_ids = set([secgroup['id']] +
[rule['remote_group_id'] for rule
in secgroup['security_group_rules']
if rule['remote_group_id']])
related_sgs = [sg for sg in self.api_q_secgroups.list()
if sg['id'] in sg_ids]
# use deepcopy to ensure self.api_q_secgroups is not modified.
self.qclient.show_security_group(secgroup['id']) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.qclient.list_security_groups(id=sg_ids, fields=['id', 'name']) \
.AndReturn({'security_groups': related_sgs})
self.mox.ReplayAll()
ret = api.network.security_group_get(self.request, secgroup['id'])
self._cmp_sg(secgroup, ret)
def test_security_group_create(self):
secgroup = self.api_q_secgroups.list()[1]
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description']}}
self.qclient.create_security_group(body) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.mox.ReplayAll()
ret = api.network.security_group_create(self.request, secgroup['name'],
secgroup['description'])
self._cmp_sg(secgroup, ret)
def test_security_group_delete(self):
secgroup = self.api_q_secgroups.first()
self.qclient.delete_security_group(secgroup['id'])
self.mox.ReplayAll()
api.network.security_group_delete(self.request, secgroup['id'])
def test_security_group_rule_create(self):
sg_rule = [r for r in self.api_q_secgroup_rules.list()
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
sg_id = sg_rule['security_group_id']
secgroup = [sg for sg in self.api_q_secgroups.list()
if sg['id'] == sg_id][0]
post_rule = copy.deepcopy(sg_rule)
del post_rule['id']
del post_rule['tenant_id']
post_body = {'security_group_rule': post_rule}
self.qclient.create_security_group_rule(post_body) \
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
self.qclient.list_security_groups(id=set([sg_id]),
fields=['id', 'name']) \
.AndReturn({'security_groups': [copy.deepcopy(secgroup)]})
self.mox.ReplayAll()
ret = api.network.security_group_rule_create(
self.request, sg_rule['security_group_id'],
sg_rule['direction'], sg_rule['ethertype'], sg_rule['protocol'],
sg_rule['port_range_min'], sg_rule['port_range_max'],
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
self._cmp_sg_rule(sg_rule, ret)
def test_security_group_rule_delete(self):
sg_rule = self.api_q_secgroup_rules.first()
self.qclient.delete_security_group_rule(sg_rule['id'])
self.mox.ReplayAll()
api.network.security_group_rule_delete(self.request, sg_rule['id'])
def _get_instance(self, cur_sg_ids):
instance_port = [p for p in self.api_ports.list()
if p['device_owner'].startswith('compute:')][0]
instance_id = instance_port['device_id']
# Emulate an intance with two ports
instance_ports = []
for _i in range(2):
p = copy.deepcopy(instance_port)
p['id'] = str(uuid.uuid4())
p['security_groups'] = cur_sg_ids
instance_ports.append(p)
return (instance_id, instance_ports)
def test_server_security_groups(self):
cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
secgroups = copy.deepcopy(self.api_q_secgroups.list())
self.qclient.list_security_groups(id=set(cur_sg_ids)) \
.AndReturn({'security_groups': secgroups})
self.mox.ReplayAll()
ret = api.network.server_security_groups(self.request, instance_id)
def test_server_update_security_groups(self):
cur_sg_ids = [self.api_q_secgroups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
for p in instance_ports:
body = {'port': {'security_groups': new_sg_ids}}
self.qclient.update_port(p['id'], body=body).AndReturn({'port': p})
self.mox.ReplayAll()
ret = api.network.server_update_security_groups(
self.request, instance_id, new_sg_ids)
def test_security_group_backend(self):
self.mox.ReplayAll()
self.assertEqual(api.network.security_group_backend(self.request),
'neutron')
class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def test_floating_ip_pools_list(self):
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()

View File

@ -13,6 +13,7 @@
# under the License.
import copy
import uuid
from openstack_dashboard.api.lbaas import Member
from openstack_dashboard.api.lbaas import Pool
@ -23,6 +24,8 @@ from openstack_dashboard.api.neutron import FloatingIp
from openstack_dashboard.api.neutron import Network
from openstack_dashboard.api.neutron import Port
from openstack_dashboard.api.neutron import Router
from openstack_dashboard.api.neutron import SecurityGroup
from openstack_dashboard.api.neutron import SecurityGroupRule
from openstack_dashboard.api.neutron import Subnet
from .utils import TestDataContainer
@ -35,6 +38,8 @@ def data(TEST):
TEST.ports = TestDataContainer()
TEST.routers = TestDataContainer()
TEST.q_floating_ips = TestDataContainer()
TEST.q_secgroups = TestDataContainer()
TEST.q_secgroup_rules = TestDataContainer()
TEST.pools = TestDataContainer()
TEST.vips = TestDataContainer()
TEST.members = TestDataContainer()
@ -46,6 +51,8 @@ def data(TEST):
TEST.api_ports = TestDataContainer()
TEST.api_routers = TestDataContainer()
TEST.api_q_floating_ips = TestDataContainer()
TEST.api_q_secgroups = TestDataContainer()
TEST.api_q_secgroup_rules = TestDataContainer()
TEST.api_pools = TestDataContainer()
TEST.api_vips = TestDataContainer()
TEST.api_members = TestDataContainer()
@ -252,6 +259,88 @@ def data(TEST):
TEST.api_q_floating_ips.add(fip_dict)
TEST.q_floating_ips.add(FloatingIp(fip_dict))
#------------------------------------------------------------
# security group
sec_group_1 = {'tenant_id': '1',
'description': 'default',
'id': 'faad7c80-3b62-4440-967c-13808c37131d',
'name': 'default'}
sec_group_2 = {'tenant_id': '1',
'description': 'NotDefault',
'id': '27a5c9a1-bdbb-48ac-833a-2e4b5f54b31d',
'name': 'other_group'}
sec_group_3 = {'tenant_id': '1',
'description': 'NotDefault',
'id': '443a4d7a-4bd2-4474-9a77-02b35c9f8c95',
'name': 'another_group'}
def add_rule_to_group(secgroup, default_only=True):
rule_egress_ipv4 = {
'id': str(uuid.uuid4()),
'direction': u'egress', 'ethertype': u'IPv4',
'port_range_min': None, 'port_range_max': None,
'protocol': None, 'remote_group_id': None,
'remote_ip_prefix': None,
'security_group_id': secgroup['id'],
'tenant_id': secgroup['tenant_id']}
rule_egress_ipv6 = {
'id': str(uuid.uuid4()),
'direction': u'egress', 'ethertype': u'IPv6',
'port_range_min': None, 'port_range_max': None,
'protocol': None, 'remote_group_id': None,
'remote_ip_prefix': None,
'security_group_id': secgroup['id'],
'tenant_id': secgroup['tenant_id']}
rule_tcp_80 = {
'id': str(uuid.uuid4()),
'direction': u'ingress', 'ethertype': u'IPv4',
'port_range_min': 80, 'port_range_max': 80,
'protocol': u'tcp', 'remote_group_id': None,
'remote_ip_prefix': u'0.0.0.0/0',
'security_group_id': secgroup['id'],
'tenant_id': secgroup['tenant_id']}
rule_icmp = {
'id': str(uuid.uuid4()),
'direction': u'ingress', 'ethertype': u'IPv4',
'port_range_min': 5, 'port_range_max': 8,
'protocol': u'icmp', 'remote_group_id': None,
'remote_ip_prefix': u'0.0.0.0/0',
'security_group_id': secgroup['id'],
'tenant_id': secgroup['tenant_id']}
rule_group = {
'id': str(uuid.uuid4()),
'direction': u'ingress', 'ethertype': u'IPv4',
'port_range_min': 80, 'port_range_max': 80,
'protocol': u'tcp', 'remote_group_id': sec_group_1['id'],
'remote_ip_prefix': None,
'security_group_id': secgroup['id'],
'tenant_id': secgroup['tenant_id']}
rules = []
if not default_only:
rules += [rule_tcp_80, rule_icmp, rule_group]
rules += [rule_egress_ipv4, rule_egress_ipv6]
secgroup['security_group_rules'] = rules
add_rule_to_group(sec_group_1, default_only=False)
add_rule_to_group(sec_group_2)
add_rule_to_group(sec_group_3)
groups = [sec_group_1, sec_group_2, sec_group_3]
sg_name_dict = dict([(sg['id'], sg['name']) for sg in groups])
for sg in groups:
# Neutron API
TEST.api_q_secgroups.add(sg)
for rule in sg['security_group_rules']:
TEST.api_q_secgroup_rules.add(copy.copy(rule))
# OpenStack Dashboard internaly API
TEST.q_secgroups.add(SecurityGroup(copy.deepcopy(sg), sg_name_dict))
for rule in sg['security_group_rules']:
TEST.q_secgroup_rules.add(
SecurityGroupRule(copy.copy(rule), sg_name_dict))
#------------------------------------------------------------
# LBaaS

View File

@ -0,0 +1,42 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack_dashboard.test import helpers as test
from openstack_dashboard.utils.filters import get_int_or_uuid
class UtilsFilterTests(test.TestCase):
def test_accept_valid_integer(self):
val = 100
ret = get_int_or_uuid(val)
self.assertEqual(val, ret)
def test_accept_valid_integer_string(self):
val = '100'
ret = get_int_or_uuid(val)
self.assertEqual(int(val), ret)
def test_accept_valid_uuid(self):
val = str(uuid.uuid4())
ret = get_int_or_uuid(val)
self.assertEqual(val, ret)
def test_reject_random_string(self):
val = '55WbJTpJDf'
self.assertRaises(ValueError, get_int_or_uuid, val)

View File