Merge "Security Group support for Nova and Neutron"

This commit is contained in:
Jenkins 2017-03-31 12:06:23 +00:00 committed by Gerrit Code Review
commit 98af646b0c
11 changed files with 1561 additions and 31 deletions

View File

@ -114,6 +114,13 @@ class BaseHelper(object):
self.app = app
self.openstack_version = openstack_version
@staticmethod
def tenant_from_req(req):
try:
return req.environ["HTTP_X_PROJECT_ID"]
except KeyError:
raise exception.Forbidden(reason="Cannot find project ID")
def _get_req(self, req, method,
path=None,
content_type="application/json",
@ -198,17 +205,21 @@ class BaseHelper(object):
return self._get_req(req, path=path,
query_string=query_string, method="GET")
def _make_create_request(self, req, resource, parameters):
def _make_create_request(self, req, resource, parameters,
resource_object_name=None):
"""Create CREATE request
This method creates a CREATE Request instance
:param req: the incoming request
:param parameters: parameters with values
:param resource_object_name: in case resource name is different
to the response one.
"""
path = "/%s" % resource
single_resource = resource[:-1]
body = utils.make_body(single_resource, parameters)
if not resource_object_name:
resource_object_name = resource[:-1]
body = utils.make_body(resource_object_name, parameters)
return self._get_req(req, path=path,
content_type="application/json",
body=json.dumps(body), method="POST")
@ -245,13 +256,6 @@ class OpenStackHelper(BaseHelper):
}
}
@staticmethod
def tenant_from_req(req):
try:
return req.environ["HTTP_X_PROJECT_ID"]
except KeyError:
raise exception.Forbidden(reason="Cannot find project ID")
def _get_index_req(self, req):
tenant_id = self.tenant_from_req(req)
path = "/%s/servers" % tenant_id
@ -1009,3 +1013,237 @@ class OpenStackHelper(BaseHelper):
os_req = self._get_req(req, path=path, method="DELETE")
os_req.get_response(self.app)
return []
def _get_security_group(self, req, sec_id):
"""Retrieve info about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
return self.get_from_response(response, "security_group", [])
def get_security_group_details(self, req, sec_id):
"""Get details about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
net = self._get_security_group(req, sec_id)
ooi_sec = os_helpers.build_security_group_from_nova([net])
return ooi_sec[0]
def list_security_groups(self, req):
"""List security groups
:param req: the incoming request
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s" % (tenant_id, path)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
sec = self.get_from_response(response, "security_groups", [])
ooi_sec = os_helpers.build_security_group_from_nova(sec)
return ooi_sec
def create_security_group(self, req, name, description, rules):
"""Create security group
:param req: the incoming request
:param name: security group name
:param description: security group description
:param rules: security group rules
"""
try:
tenant_id = self.tenant_from_req(req)
path = "os-security-groups"
path = "/%s/%s" % (tenant_id, path)
param_group = {
"description": description,
"name": name,
}
body = utils.make_body('security_group', param_group)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
response_group = os_req.get_response(self.app)
secgroup = self.get_from_response(
response_group, "security_group", {})
sec_id = secgroup["id"]
secgroup["rules"] = []
for rule in rules:
port_min, port_max = os_helpers.security_group_rule_port(
rule["port"]
)
param_rules = {
"parent_group_id": sec_id,
"ip_protocol": rule["protocol"],
"from_port": port_min,
"to_port": port_max,
"cidr": rule.get("range", "0.0.0.0/0")
}
body_rules = utils.make_body('security_group_rule',
param_rules)
path = "/%s/os-security-group-rules" % (tenant_id)
os_req_rules = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body_rules),
method="POST")
response_rules = os_req_rules.get_response(self.app)
secrules = self.get_from_response(
response_rules, "security_group_rule", {})
secgroup["rules"].append(secrules)
ooi_sec = os_helpers.build_security_group_from_nova(
[secgroup]
)
return ooi_sec[0]
except Exception as ex:
raise ex
def delete_security_group(self, req, sec_id):
"""Delete info about a security group.
:param req: the incoming request
:param sec_id: security group id to delete
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/%s/%s" % (tenant_id, path, sec_id)
os_req = self._get_req(req, path=path,
method="DELETE")
os_req.get_response(self.app)
return []
def _get_server_security_group(self, req, server_id):
"""Get security group from a server
:param req: incoming request
:param server_id: server id
:return: information about the security group
"""
path = "os-security-groups"
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/%s" % (tenant_id,
server_id,
path
)
os_req = self._get_req(req, path=path,
method="GET")
response = os_req.get_response(self.app)
sec = self.get_from_response(response,
"security_groups", [])
ooi_sec = os_helpers.build_security_group_from_nova(sec)
return ooi_sec
def list_server_security_groups(self, req,
server_id=None):
"""List security groups associated to a server
:param req: incoming request
:param server_id: server id
:return: security groups associated to servers
"""
return self._get_server_security_group(
req, server_id)
def list_server_security_links(self, req, server_id=None):
"""List security groups associated to servers
:param req: incoming request
:param server_id: server id
:return: security groups associated to servers
"""
link_list = []
if server_id:
compute_list = [self.get_server(req, server_id)]
else:
compute_list = self.index(req)
for c in compute_list:
server_id = c["id"]
server_secgroups = self._get_server_security_group(
req, server_id)
for sec in server_secgroups:
link = {
"compute_id": server_id,
"securitygroup": sec
}
link_list.append(link)
return link_list
def get_server_security_link(self, req, server_id,
securitygroup_id):
"""Show security group link from a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: security group id
:return: information about the link of security group
"""
ooi_sec = self._get_server_security_group(req, server_id)
for sg in ooi_sec:
if sg["id"] == securitygroup_id:
link = {"compute_id": server_id,
"securitygroup": sg
}
return [link]
return None
def delete_server_security_link(self, req, server_id,
securitygroup_id):
"""Delete security group link from a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: segurity group id
:return: empty
"""
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/action" % (tenant_id, server_id)
sg = self._get_security_group(req, securitygroup_id)
if "name" not in sg:
raise exception.NotFound("Security group %s not found."
% securitygroup_id)
param = {"name": sg["name"]}
body = utils.make_body('removeSecurityGroup', param)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
os_req.get_response(self.app)
return []
def create_server_security_link(self, req, server_id,
securitygroup_id):
"""Create security group link in a server
:param req: incoming request
:param server_id: server id
:param securitygroup_id: segurity group id
:return: empty
"""
tenant_id = self.tenant_from_req(req)
path = "/%s/servers/%s/action" % (tenant_id, server_id)
sg = self._get_security_group(req, securitygroup_id)
if "name" not in sg:
raise exception.NotFound("Security group %s not found."
% securitygroup_id)
param = {"name": sg["name"]}
body = utils.make_body('addSecurityGroup', param)
os_req = self._get_req(req,
path=path,
content_type="application/json",
body=json.dumps(body),
method="POST")
os_req.get_response(self.app)
return []

View File

@ -539,3 +539,91 @@ class OpenStackNeutron(helpers.BaseHelper):
response = os_req.get_response()
if response.status_int != 202:
raise helpers.exception_from_response(response)
def get_security_group_details(self, req, sec_id):
"""Get info about a security group.
:param req: the incoming request
:param sec_id: security group id to show
"""
try:
secgroup = self.get_resource(req, 'security-groups', sec_id,
response_resource="security_group")
ooi_sec = os_helpers.build_security_group_from_neutron(
[secgroup]
)
return ooi_sec[0]
except Exception:
raise exception.NotFound()
def list_security_groups(self, req):
"""List security groups
:param req: the incoming request
"""
try:
secgroup = self.list_resources(req, 'security-groups',
response_resource="security_groups")
ooi_sec = os_helpers.build_security_group_from_neutron(
secgroup
)
return ooi_sec
except Exception:
raise exception.NotFound()
def create_security_group(self, req, name, description, rules):
"""Create security group
:param req: the incoming request
:param name: security group name
:param description: security group description
:param rules: security group rules
"""
try:
tenant_id = self.tenant_from_req(req)
param_group = {"tenant_id": tenant_id,
"description": description,
"name": name,
}
secgroup = self.create_resource(
req, 'security-groups', param_group,
response_resource="security_group")
sec_id = secgroup["id"]
secgroup["security_group_rules"] = []
for rule in rules:
port_min, port_max = os_helpers.security_group_rule_port(
rule["port"]
)
param_rule = {
"ethertype": rule.get("ipversion", "IPv4"),
"port_range_max": port_max,
"port_range_min": port_min,
"direction": os_helpers.security_group_rule_type(
rule["type"]),
"remote_ip_prefix": rule.get("range", "0.0.0.0/0"),
"protocol": rule["protocol"],
"security_group_id": sec_id,
}
secrule = self.create_resource(
req,
'security-group-rules', param_rule,
response_resource="security_group_rule")
secgroup["security_group_rules"].append(secrule)
ooi_sec = os_helpers.build_security_group_from_neutron(
[secgroup]
)
return ooi_sec[0]
except Exception as ex:
raise ex
def delete_security_group(self, req, sec_id):
"""Delete info about a security group.
:param req: the incoming request
:param sec_id: security group id to delete
"""
try:
secgroup = self.delete_resource(req, 'security-groups', sec_id)
return secgroup
except Exception:
raise exception.NotFound()

192
ooi/api/securitygroup.py Normal file
View File

@ -0,0 +1,192 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ooi.api import base
from ooi.api import helpers
from ooi.api import helpers_neutron
from ooi import exception
from ooi.occi.core import collection
from ooi.occi.infrastructure import securitygroup
from ooi.occi import validator as occi_validator
def parse_validate_schema(req, scheme=None,
required_attr=None):
"""Parse attributes and validate scheme
Returns attributes from request
If scheme is specified, it validates the OCCI scheme:
-Raises exception in case of being invalid
:param req: request
:param: scheme: scheme to validate
:param: required_attr: attributes required
"""
parser = req.get_parser()(req.headers, req.body.decode("utf8"))
if scheme:
attributes = parser.parse()
validator = occi_validator.Validator(attributes)
validator.validate(scheme)
validator.validate_attributes(required_attr)
else:
attributes = parser.parse_attributes(req.headers)
return attributes
def process_parameters(req, scheme=None,
required_attr=None):
"""Get attributes from request parameters
:param req: request
:param scheme: scheme to validate
:param required_attr: attributes required
"""
parameters = parse_validate_schema(req, scheme, required_attr)
try:
attributes = {}
if 'X_PROJECT_ID' in req.headers:
attributes["X_PROJECT_ID"] = req.headers["X_PROJECT_ID"]
if "attributes" in parameters:
for k, v in parameters.get("attributes", None).items():
attributes[k.strip()] = v
if not attributes:
attributes = None
except Exception:
raise exception.Invalid
return attributes
class Controller(base.Controller):
def __init__(self, app=None, openstack_version=None,
neutron_ooi_endpoint=None):
"""Security group controller initialization
:param app: application
:param: openstack_version: nova version
:param: neutron_ooi_endpoint: This parameter
indicates the Neutron endpoint to load the Neutron Helper.
If it is None, Nova api is used.
"""
super(Controller, self).__init__(
app=app,
openstack_version=openstack_version)
if neutron_ooi_endpoint:
self.os_helper = helpers_neutron.OpenStackNeutron(
neutron_ooi_endpoint
)
else:
self.os_helper = helpers.OpenStackHelper(
self.app,
self.openstack_version
)
@staticmethod
def _get_security_group_resources(securitygroup_list):
"""Create OCCI security group instances from list
:param securitygroup_list: security group objects
provides by the cloud infrastructure
:return occi security group list
"""
occi_securitygroup_resources = []
if securitygroup_list:
for s in securitygroup_list:
s_rules = s['rules']
s_id = s["id"]
s_name = s["title"]
s_summary = s["summary"]
s = securitygroup.SecurityGroupResource(title=s_name,
id=s_id,
rules=s_rules,
summary=s_summary)
occi_securitygroup_resources.append(s)
return occi_securitygroup_resources
def index(self, req):
"""List security groups
:param req: request object
"""
occi_sc = self.os_helper.list_security_groups(req)
occi_sc_resources = self._get_security_group_resources(
occi_sc)
return collection.Collection(
resources=occi_sc_resources)
def show(self, req, id):
"""Get security group details
:param req: request object
:param id: security group identification
"""
resp = self.os_helper.get_security_group_details(req, id)
occi_sc_resources = self._get_security_group_resources(
[resp])
return occi_sc_resources[0]
def create(self, req, body=None):
"""Create a network instance in the cloud
:param req: request object
:param body: body request (not used)
"""
scheme = {
"category": securitygroup.SecurityGroupResource.kind,
}
required = ["occi.core.title",
"occi.securitygroup.rules"
]
attributes = process_parameters(req, scheme, required)
name = attributes.get('occi.core.title')
description = attributes.get("occi.core.summary", "")
try:
rules = attributes.get('occi.securitygroup.rules')
except Exception:
raise exception.Invalid(
"Bad JSON format for occi.securitygroup.rules: %s"
% attributes.get(
'occi.securitygroup.rules'))
sec = self.os_helper.create_security_group(
req, name, description, rules
)
occi_sec_resources = self._get_security_group_resources([sec])
return collection.Collection(
resources=occi_sec_resources)
def delete(self, req, id):
"""delete security groups which satisfy the parameters
:param req: current request
:param id: identification
"""
response = self.os_helper.delete_security_group(req, id)
return response
def run_action(self, req, id, body):
"""Run action over the security group
:param req: current request
:param id: security group
:param body: body
"""
action = req.GET.get("action", None)
occi_actions = [a.term for a in
securitygroup.SecurityGroupResource.actions]
if action is None or action not in occi_actions:
raise exception.InvalidAction(action=action)
raise exception.NotImplemented("Security group actions are not implemented")

View File

@ -0,0 +1,144 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ooi.api import base
import ooi.api.helpers
from ooi import exception
from ooi.occi.core import collection
from ooi.occi.infrastructure import compute
from ooi.occi.infrastructure import securitygroup
from ooi.occi.infrastructure import securitygroup_link
from ooi.occi import validator as occi_validator
def _get_security_link_resources(link_list):
"""Create OCCI security group instances from a list
:param link_list: provides by the cloud infrastructure
"""
occi_secgropu_resources = []
if link_list:
for l in link_list:
compute_id = l['compute_id']
sec = l['securitygroup']
sec_id = sec.get("id")
sec_name = sec.get("title", "")
sec_rules = sec.get("rules", [])
s = securitygroup.SecurityGroupResource(title=sec_name,
id=sec_id,
rules=sec_rules)
c = compute.ComputeResource(title="Compute",
id=compute_id
)
link = securitygroup_link.SecurityGroupLink(source=c,
target=s)
occi_secgropu_resources.append(link)
return occi_secgropu_resources
class Controller(base.Controller):
def __init__(self, *args, **kwargs):
super(Controller, self).__init__(*args, **kwargs)
# TODO(jorgesece): add neutron controller to list securitygroups
self.os_helper = ooi.api.helpers.OpenStackHelper(
self.app,
self.openstack_version
)
def _get_attachment_from_id(self, req, attachment_id):
try:
server_id, security_id = attachment_id.split('_', 1)
return {"server_id": server_id,
"securitygroup_id": security_id}
except ValueError:
raise exception.LinkNotFound(link_id=attachment_id)
def index(self, req):
"""List security group links
:param req: request object
"""
link_list = self.os_helper.list_server_security_links(req)
occi_link_resources = _get_security_link_resources(link_list)
return collection.Collection(resources=occi_link_resources)
def show(self, req, id):
"""Get security group details
:param req: request object
:param id: security group identification
"""
try:
link_info = self._get_attachment_from_id(req, id)
server_id = link_info["server_id"]
security_name = link_info["securitygroup_id"]
link = self.os_helper.get_server_security_link(
req, server_id, security_name
)
occi_instance = _get_security_link_resources(
link
)[0]
return occi_instance
except Exception:
raise exception.LinkNotFound(link_id=id)
def create(self, req, body=None):
"""Create a security group link
Creates a link between a server and a securitygroup.
:param req: request object
:param body: body request (not used)
"""
parser = req.get_parser()(req.headers, req.body)
scheme = {
"category": securitygroup_link.SecurityGroupLink.kind,
}
obj = parser.parse()
validator = occi_validator.Validator(obj)
validator.validate(scheme)
attrs = obj.get("attributes", {})
_, securitygroup_id = ooi.api.helpers.get_id_with_kind(
req,
attrs.get("occi.core.target"),
securitygroup.SecurityGroupResource.kind)
_, server_id = ooi.api.helpers.get_id_with_kind(
req,
attrs.get("occi.core.source"),
compute.ComputeResource.kind)
self.os_helper.create_server_security_link(
req, server_id,
securitygroup_id)
link = {"compute_id": server_id,
"securitygroup": {"id": securitygroup_id}
}
occi_instance = _get_security_link_resources([link])
return collection.Collection(resources=occi_instance)
def delete(self, req, id):
"""Delete security group link
:param req: current request
:param id: identification
"""
link_info = self._get_attachment_from_id(req, id)
server_id = link_info["server_id"]
security_id = link_info["securitygroup_id"]
try:
self.os_helper.delete_server_security_link(
req, server_id, security_id)
except Exception:
raise exception.LinkNotFound(link_id=id)
return []

View File

@ -45,4 +45,119 @@ def network_status(neutron_status):
if neutron_status == "ACTIVE":
return "active"
else:
return "inactive"
return "inactive"
def security_group_rule_type(neutron_type):
"""Translate neutron rule type.
Translate to/from openstack - occi
:param neutron_type: neutron status
"""
if neutron_type == "ingress":
return "inbound"
elif neutron_type == "egress":
return "outbound"
elif neutron_type == "inbound":
return "ingress"
elif neutron_type == "outbound":
return "egress"
else:
return None
def security_group_rule_port(os_port):
"""Translate openstack rule port
Translate to/from openstack - occi
:param neutron_type: neutron status
"""
ports = str(os_port).split('-')
if ports.__len__() == 1:
port_min = port_max = ports[0]
elif ports.__len__() == 2:
port_min, port_max = ports
else:
raise Exception("Port value")
return port_min, port_max
def build_security_group_from_neutron(sec_groups):
"""Translate neutron security group
Translate to the ooi a standard security group format.
:param sec_groups: array of security groups
"""
sec_list = []
for sec in sec_groups:
ooi_sec = {}
rules_list = []
ooi_sec["id"] = sec["id"]
ooi_sec["title"] = sec.get("name", None)
ooi_sec["summary"] = sec.get("description", "")
for rule in sec["security_group_rules"]:
ipversion = rule.get("ethertype", "IPv4")
rule_type = security_group_rule_type(
rule["direction"]
)
rule_protocol = rule.get("protocol", None)
port_min = rule["port_range_min"]
port_max = rule["port_range_max"]
if port_min and (port_min != port_max):
rule_port = "%s-%s" % (port_min,
port_max
)
else:
rule_port = port_min
rule_range = str(rule["remote_ip_prefix"])
rules_list.append({"type": rule_type,
"protocol": rule_protocol,
"port": rule_port,
"range": rule_range,
"ipversion": ipversion}
)
ooi_sec["rules"] = rules_list
sec_list.append(ooi_sec)
return sec_list
def build_security_group_from_nova(sec_groups):
"""Translate nova security group
Translate to the ooi a standard security group format.
:param sec_groups: array of security groups
"""
sec_list = []
for sec in sec_groups:
ooi_sec = {}
rules_list = []
ooi_sec["id"] = sec["id"]
ooi_sec["title"] = sec.get("name", None)
ooi_sec["summary"] = sec.get("description", "")
for rule in sec["rules"]:
ipversion = "IPv4"
rule_protocol = rule.get("ip_protocol", None)
port_min = rule["from_port"]
port_max = rule["to_port"]
if port_min and (port_min != port_max):
rule_port = "%s-%s" % (port_min,
port_max
)
else:
rule_port = port_min
rule_range = str(rule["ip_range"].get("cidr", ""))
# BUG(jorgesce): type is alwayns inbound because nova
# does not provide that information.
rules_list.append({"type": "inbound",
"protocol": rule_protocol,
"port": rule_port,
"range": rule_range,
"ipversion": ipversion}
)
ooi_sec["rules"] = rules_list
sec_list.append(ooi_sec)
return sec_list

View File

@ -194,6 +194,10 @@ servers = {
"flavor": {"id": flavors[1]["id"]},
"image": {"id": images["foo"]["id"]},
"status": "ACTIVE",
"security_groups":[
{"name": "group1"},
{"name": "group2"}
]
},
{
"id": uuid.uuid4().hex,
@ -201,6 +205,9 @@ servers = {
"flavor": {"id": flavors[2]["id"]},
"image": {"id": images["bar"]["id"]},
"status": "SHUTOFF",
"security_groups":[
{"name": "group1"}
]
},
{
"id": uuid.uuid4().hex,
@ -208,6 +215,9 @@ servers = {
"flavor": {"id": flavors[1]["id"]},
"image": {"id": images["bar"]["id"]},
"status": "ERROR",
"security_groups":[
{"name": "group2"}
]
},
],
tenants["bar"]["id"]: [],
@ -237,7 +247,10 @@ servers = {
"OS-EXT-IPS:type": "floating",
"OS-EXT-IPS-MAC:mac_addr": "1234"},
]
}
},
"security_groups":[
{"name": "group1"}
]
}
],
}
@ -265,6 +278,54 @@ volumes[tenants["baz"]["id"]][1]["attachments"] = [{
"id": volumes[tenants["baz"]["id"]][0]["id"],
}]
security_groups = {
tenants["foo"]["id"]: [],
tenants["baz"]["id"]: [
{
"name": "group1",
"id": uuid.uuid4().hex,
"description": "group one",
"rules": [
{"from_port": 443,
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "1000",
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
"ip_protocol": "udp"},
]
},
{
"name": "group2",
"id": uuid.uuid4().hex,
"description": "group two",
"rules": [
{"from_port": 80,
"to_port": 80, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "4000",
"to_port": 7000, "ip_range": {"cidr": "13.0.0.0/32"},
"ip_protocol": "udp"},
]
}
],
tenants["bar"]["id"]: [
{
"name": "group3",
"id": uuid.uuid4().hex,
"description": "group three",
"rules": [
{"from_port": 443,
"to_port": 443, "ip_range": {"cidr": "10.0.0.0/32"},
"ip_protocol": "tcp"},
{"from_port": "1000",
"to_port": 2000, "ip_range": {"cidr": "11.0.0.0/32"},
"ip_protocol": "udp"},
]
},
]
}
def fake_query_results():
cats = []
@ -485,8 +546,17 @@ class FakeApp(object):
"os-floating-ip-pools")
self._populate(path, "floating_ip", floating_ips[tenant["id"]],
"os-floating-ips")
self._populate_ports(path, servers[tenant["id"]],
ports[tenant["id"]])
self._populate_server_links(path, "os-interface",
"interfaceAttachments",
servers[tenant["id"]],
ports[tenant["id"]])
self._populate_server_links(path, "os-security-groups",
"security_groups",
servers[tenant["id"]],
security_groups[tenant["id"]])
self._populate(path, "security_group",
security_groups[tenant["id"]],
"os-security-groups")
# NOTE(aloga): dict_values un Py3 is not serializable in JSON
self._populate(path, "image", list(images.values()))
self._populate(path, "flavor", list(flavors.values()))
@ -530,16 +600,20 @@ class FakeApp(object):
self.routes[obj_path] = create_fake_json_resp(
{"volumeAttachment": attach})
def _populate_ports(self, path, servers_list, ports_list):
def _populate_server_links(self, path, resource, obj,
servers_list, link_list):
if servers_list:
for p in ports_list:
for s in servers_list:
list_obj = []
path_base = "%s/servers/%s/%s" % (
path,
servers_list[0]["id"],
"os-interface"
s["id"],
resource
)
for l in link_list:
list_obj.append(l)
self.routes[path_base] = create_fake_json_resp(
{"interfaceAttachments": [p]})
{obj: list_obj})
@webob.dec.wsgify()
def __call__(self, req):
@ -619,7 +693,9 @@ class FakeApp(object):
body = req.json_body.copy()
action = body.popitem()
if action[0] in ["os-start", "os-stop", "reboot",
"addFloatingIp", "removeFloatingIp"]:
"addFloatingIp", "removeFloatingIp",
"removeSecurityGroup",
"addSecurityGroup"]:
return self._get_from_routes(req)
elif req.path_info.endswith("os-volume_attachments"):
return self._do_create_attachment(req)

View File

@ -29,6 +29,8 @@ application_url = "https://foo.example.org:8774/ooiv1"
tenants = {
"foo": {"id": uuid.uuid4().hex,
"name": "foo"},
"baz": {"id": uuid.uuid4().hex,
"name": "foo"},
"bar": {"id": uuid.uuid4().hex,
"name": "bar"},
"public": {"id": uuid.uuid4().hex,
@ -153,6 +155,40 @@ network_links = {
],
}
security_groups = {
tenants["bar"]["id"]: [],
tenants["foo"]["id"]: [],
tenants["baz"]["id"]: [
{
"name": "group1",
"id": uuid.uuid4().hex,
"description": "group one",
"security_group_rules": [
{"ethertype": "IPv4", "port_range_min": 443,
"port_range_max": 443, "remote_ip_prefix": "10.0.0.0/32",
"protocol": "tcp", "direction": "ingress"},
{"ethertype": "IPv4", "port_range_min": "8000",
"port_range_max": 9000, "remote_ip_prefix": "11.0.0.0/24",
"protocol": "udp", "direction": "egress"}
]
},
{
"name": "group2",
"id": uuid.uuid4().hex,
"description": "group two",
"security_group_rules": [
{"ethertype": "IPv4", "port_range_min": 80,
"port_range_max": 80, "remote_ip_prefix": "10.0.0.0/32",
"protocol": "tcp", "direction": "ingress"},
{"ethertype": "IPv4", "port_range_min": "5000",
"port_range_max": 6000, "remote_ip_prefix": "11.0.0.0/24",
"protocol": "udp", "direction": "egress"}
]
}
]
}
def create_fake_json_resp(data, status=200):
r = webob.Response()
@ -201,6 +237,22 @@ def create_header(params, schemes, project=None):
return headers
def create_req_json_occi(params, category, method="POST"):
headers = create_headers(category,
content_type="application/occi+json")
body = {}
for c in category:
body["kind"] = "%s%s" % (
c.scheme, c.term)
body["attributes"] = params
req = webob.Request.blank(path="")
req.headers = headers
req.method = method
req.body = json.dumps(body).encode("utf8")
return wsgi.Request(req.environ)
def create_req_test_occi(params, category):
headers = create_header_occi(params, category)
req = webob.Request.blank(path="")
@ -209,7 +261,7 @@ def create_req_test_occi(params, category):
def create_header_occi(params, category, project=None):
headers = {}
headers = create_headers(category, project)
att = ""
if params is not None:
for k, v in params.items():
@ -218,6 +270,13 @@ def create_header_occi(params, category, project=None):
else:
att = "%s, %s=%s" % (att, k, v)
headers["X_OCCI_Attribute"] = att
return headers
def create_headers(category, content_type=None,
project=None):
headers = {}
if category is not None:
cat = ""
for c in category:
@ -227,6 +286,8 @@ def create_header_occi(params, category, project=None):
headers['Category'] = cat[:-1]
if project is not None:
headers['X_PROJECT_ID'] = project
if content_type is not None:
headers['Content-Type'] = content_type
return headers
@ -376,3 +437,35 @@ def build_occi_nova(network):
for l in links:
result.append(("Link", l))
return result
def build_occi_securitygroup(secgroup):
name = secgroup["title"]
secgroup_id = secgroup["id"]
rules = secgroup["rules"]
summary = secgroup["summary"]
app_url = application_url
cats = []
cats.append('securitygroup; '
'scheme='
'"http://schemas.ogf.org/occi/infrastructure#";'
' class="kind"; title="securitygroup resource";'
' rel='
'"http://schemas.ogf.org/occi/core#resource";'
' location="%s/securitygroup/"' % app_url)
links = []
attrs = [
'occi.core.id="%s"' % secgroup_id,
'occi.core.title="%s"' % name,
'occi.core.summary="%s"' % summary,
'occi.securitygroup.rules="%s"' % json.dumps(rules).replace('"', "'"),
]
result = []
for c in cats:
result.append(("Category", c))
for a in attrs:
result.append(("X-OCCI-Attribute", a))
for l in links:
result.append(("Link", l))
return result

View File

@ -17,8 +17,10 @@ import uuid
import mock
from ooi.api import helpers
from ooi.api import helpers_neutron
from ooi import exception
from ooi.openstack import helpers as openstack_helper
from ooi.tests import base
from ooi.tests import fakes_network as fakes
from ooi import utils
@ -593,3 +595,80 @@ class TestNetOpenStackHelper(base.TestCase):
self.helper.delete_port,
None,
iface)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "create_resource")
@mock.patch.object(helpers.BaseHelper, "tenant_from_req")
def test_create_security_groups(self, m_tenant, m_create):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id][0]
expected = openstack_helper.build_security_group_from_neutron(
[sec_group])[0]
m_tenant.return_value = uuid.uuid4().hex
group_info = {"name": sec_group["name"], "id": sec_group["id"],
"description": sec_group["description"]}
rules_out_1 = sec_group["security_group_rules"][0]
rules_out_2 = sec_group["security_group_rules"][1]
m_create.side_effect = [group_info, rules_out_1, rules_out_2]
ret = self.helper.create_security_group(None, expected["title"],
expected["summary"],
expected["rules"])
self.assertEqual(expected, ret)
self.assertEqual(3, m_create.call_count)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
def test_list_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id]
expected = openstack_helper.build_security_group_from_neutron(
sec_group)
m_list.return_value = sec_group
ret = self.helper.list_security_groups(None)
self.assertEqual(2, ret.__len__())
self.assertEqual(expected, ret)
m_list.assert_called_with(None, 'security-groups',
response_resource="security_groups")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "list_resources")
def test_list_security_group_empty(self, m_list):
tenant_id = fakes.tenants["bar"]["id"]
m_list.return_value = fakes.security_groups[tenant_id]
ret = self.helper.list_security_groups(None)
self.assertEqual(0, ret.__len__())
m_list.assert_called_with(None, 'security-groups',
response_resource="security_groups")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = fakes.security_groups[tenant_id][0]
expected = openstack_helper.build_security_group_from_neutron(
[sec_group])[0]
list_sec = sec_group
m_list.return_value = list_sec
ret = self.helper.get_security_group_details(None, None)
self.assertEqual(expected, ret)
m_list.assert_called_with(None, 'security-groups', None,
response_resource="security_group")
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group_not_found(self, m_list):
m_list.return_value = []
self.assertRaises(exception.NotFound,
self.helper.get_security_group_details,
None,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group(self, m_list):
m_list.return_value = None
ret = self.helper.delete_security_group(None, None)
self.assertIsNone(ret)
m_list.assert_called_with(None, 'security-groups', None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group_not_found(self, m_list):
m_list.side_effect = exception.OCCIException()
self.assertRaises(exception.NotFound,
self.helper.delete_security_group,
None,
None)

View File

@ -18,8 +18,10 @@ import uuid
import mock
from ooi.api import helpers
from ooi.openstack import helpers as os_helpers
from ooi.tests import base
from ooi.tests import fakes_network as fakes
from ooi.tests import fakes as fakes_nova
from ooi.tests import fakes_network
from ooi import utils
@ -33,10 +35,12 @@ class TestNovaNetOpenStackHelper(base.TestCase):
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_networks_with_public(self, m_t, m_rq):
id = uuid.uuid4().hex
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]},
200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_float = fakes.create_fake_json_resp(
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": [{"id": id}]}, 200
)
req_mock_float = mock.MagicMock()
@ -49,10 +53,11 @@ class TestNovaNetOpenStackHelper(base.TestCase):
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_networks_with_no_public(self, m_t, m_rq):
id = uuid.uuid4().hex
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]}, 200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_float = fakes.create_fake_json_resp(
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": []}, 204
)
req_mock_float = mock.MagicMock()
@ -67,8 +72,9 @@ class TestNovaNetOpenStackHelper(base.TestCase):
id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
m_t.return_value = tenant_id
resp = fakes.create_fake_json_resp({"networks": [{"id": id}]}, 200)
resp_float = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"networks": [{"id": id}]}, 200)
resp_float = fakes_network.create_fake_json_resp(
{"floating_ip_pools": [{"id": id}]}, 200
)
req_mock = mock.MagicMock()
@ -103,7 +109,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
label = "network11"
tenant_id = uuid.uuid4().hex
m_t.return_value = tenant_id
resp = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"network": {"id": id, "label": label,
"cidr": address,
"gateway": gateway}}, 200
@ -134,7 +140,7 @@ class TestNovaNetOpenStackHelper(base.TestCase):
"cidr": cidr,
"gateway": gateway
}
resp = fakes.create_fake_json_resp(
resp = fakes_network.create_fake_json_resp(
{"network": {"id": net_id, "label": name,
"cidr": cidr,
"gateway": gateway}}, 200
@ -173,4 +179,250 @@ class TestNovaNetOpenStackHelper(base.TestCase):
m_rq.assert_called_with(
None, method="DELETE",
path="/%s/os-networks/%s" % (tenant_id, net_id),
)
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_list_security_groups(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_groups = fakes_nova.security_groups[tenant_id]
resp = fakes_network.create_fake_json_resp(
{"security_groups": sc_groups}, 200)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
m_rq.side_effect = [req_mock]
ret = self.helper.list_security_groups(None)
cont = 0
for sc in sc_groups:
self.assertEqual(sc['id'], ret[cont]['id'])
cont = cont + 1
self.assertEqual(
{'method': 'GET',
'path': '/%s/os-security-groups' % (tenant_id)},
m_rq.call_args_list[0][1]
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_delete_security_groups(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_id = fakes_nova.security_groups[tenant_id][0]['id']
req_mock = mock.MagicMock()
req_mock.get_response.return_value = []
m_rq.return_value = req_mock
ret = self.helper.delete_security_group(None, sc_id)
self.assertEqual(ret, [])
m_rq.assert_called_with(
None, method="DELETE",
path="/%s/os-security-groups/%s" % (tenant_id, sc_id),
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_get_security_group(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
id = sc_group['id']
m_t.return_value = tenant_id
resp = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
m_rq.return_value = req_mock
ret = self.helper.get_security_group_details(None, id)
self.assertEqual(sc_group['id'], ret["id"])
self.assertEqual(sc_group['description'], ret["summary"])
occi_os_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)[0]
cont = 0
for r in ret["rules"]:
self.assertEqual(
occi_os_group['rules'][cont]['protocol'], r["protocol"])
self.assertEqual(
occi_os_group['rules'][cont]['range'], r["range"])
self.assertEqual(
occi_os_group['rules'][cont]['port'], r["port"])
self.assertEqual(
occi_os_group['rules'][cont]['type'], r["type"])
cont += 1
m_rq.assert_called_with(
None, method="GET",
path="/%s/os-security-groups/%s" % (tenant_id, id),
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_create_security_group(self, m_t, m_rq):
tenant_id = fakes_nova.tenants["baz"]["id"]
m_t.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
occi_os_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)[0]
resp = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
resp_rule1 = fakes_network.create_fake_json_resp(
{"security_group_rule": sc_group['rules'][0]}, 200
)
req_mock_rule1 = mock.MagicMock()
req_mock_rule1.get_response.return_value = resp_rule1
resp_rule2 = fakes_network.create_fake_json_resp(
{"security_group_rule": sc_group['rules'][1]}, 200
)
req_mock_rule2 = mock.MagicMock()
req_mock_rule2.get_response.return_value = resp_rule2
m_rq.side_effect = [req_mock, req_mock_rule1, req_mock_rule2]
ret = self.helper.create_security_group(
None,
name=occi_os_group['title'],
description=occi_os_group['summary'],
rules=occi_os_group['rules']
)
cont = 0
for r in ret["rules"]:
self.assertEqual(
occi_os_group['rules'][cont]['protocol'], r["protocol"]
)
self.assertEqual(
occi_os_group['rules'][cont]['range'], r["range"]
)
self.assertEqual(
occi_os_group['rules'][cont]['port'], r["port"])
self.assertEqual(
occi_os_group['rules'][cont]['type'], r["type"])
cont += 1
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_get_server_security_group(self, mock_tenant, mock_get):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sc_group = fakes_nova.security_groups[tenant_id]
mock_tenant.return_value = tenant_id
resp = fakes_network.create_fake_json_resp(
{"security_groups": sc_group}, 200
)
req_mock = mock.MagicMock()
req_mock.get_response.return_value = resp
mock_get.return_value = req_mock
ret = self.helper._get_server_security_group(None, server_id)
segroup = os_helpers.build_security_group_from_nova(
sc_group
)
cont = 0
for s in segroup:
self.assertEqual(s, ret[cont])
cont += 1
mock_get.assert_called_with(
None, method="GET",
path="/%s/servers/%s/os-security-groups" % (tenant_id,
server_id),
)
@mock.patch.object(helpers.OpenStackHelper, "index")
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
def test_list_server_security_links(self, mock_get, mock_list):
tenant_id = fakes_nova.tenants["baz"]["id"]
servers = fakes_nova.servers[tenant_id]
mock_list.return_value = servers
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
mock_get.return_value = [segroup]
ret = self.helper.list_server_security_links(None)
cont = 0
for server in servers:
self.assertEqual(server["id"],
ret[cont]['compute_id'])
self.assertEqual(segroup["title"],
ret[cont]['securitygroup']["title"])
cont += 1
@mock.patch.object(helpers.OpenStackHelper, "_get_server_security_group")
def test_get_server_security_link(self, mock_get):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
mock_get.return_value = [segroup]
ret = self.helper.get_server_security_link(None, server_id,
segroup["id"])
self.assertEqual(server_id,
ret[0]['compute_id'])
self.assertEqual(segroup["title"],
ret[0]['securitygroup']["title"])
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_delete_server_security_link(self, mock_tenant, mock_req):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg_name = "baz"
mock_tenant.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
sg_name = sc_group["name"]
resp_get = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock_get = mock.MagicMock()
req_mock_get.get_response.return_value = resp_get
resp_cre = fakes_network.create_fake_json_resp(
{}, 204
)
req_mock_del = mock.MagicMock()
req_mock_del.get_response.return_value = resp_cre
mock_req.side_effect = [req_mock_get, req_mock_del]
ret = self.helper.delete_server_security_link(None,
server_id,
sg_name)
self.assertEqual([], ret)
mock_req.assert_called_with(
None, method="POST",
path="/%s/servers/%s/action" % (tenant_id,
server_id),
body='{"removeSecurityGroup": {"name": "%s"}}' % sg_name,
content_type='application/json'
)
@mock.patch.object(helpers.OpenStackHelper, "_get_req")
@mock.patch.object(helpers.OpenStackHelper, "tenant_from_req")
def test_create_server_security_link(self, mock_tenant, mock_req):
tenant_id = fakes_nova.tenants["baz"]["id"]
server_id = uuid.uuid4().hex
sg_id = "baz"
mock_tenant.return_value = tenant_id
sc_group = fakes_nova.security_groups[tenant_id][0]
resp_get = fakes_network.create_fake_json_resp(
{"security_group": sc_group}, 200
)
req_mock_get = mock.MagicMock()
req_mock_get.get_response.return_value = resp_get
resp_create = fakes_network.create_fake_json_resp(
{}, 204
)
req_mock_cre = mock.MagicMock()
req_mock_cre.get_response.return_value = resp_create
mock_req.side_effect = [req_mock_get, req_mock_cre]
ret = self.helper.create_server_security_link(None,
server_id,
sg_id)
self.assertEqual([], ret)
sg_name = sc_group["name"]
mock_req.assert_called_with(
None, method="POST",
path="/%s/servers/%s/action" % (tenant_id,
server_id),
body='{"addSecurityGroup": {"name": "%s"}}' % sg_name,
content_type='application/json'
)

View File

@ -0,0 +1,149 @@
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from ooi.api import helpers_neutron
from ooi.api import securitygroup as security_group_api
from ooi import exception
from ooi.occi.infrastructure import securitygroup as occi_security_group
from ooi.openstack import helpers as openstack_helper
from ooi.tests import base
from ooi.tests import fakes_network as fakes
class TestSecurityGroupControllerNeutron(base.TestController):
def setUp(self):
super(TestSecurityGroupControllerNeutron, self).setUp()
self.controller = security_group_api.Controller(
neutron_ooi_endpoint="ff")
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"list_security_groups")
def test_list_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group
result = self.controller.index(req)
expected = self.controller._get_security_group_resources(sec_group)
self.assertEqual(result.resources.__len__(),
expected.__len__())
for r in result.resources:
self.assertIsInstance(r, occi_security_group.SecurityGroupResource)
m_list.assert_called_with(req)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"list_security_groups")
def test_list_security_group_empty(self, m_list):
tenant_id = fakes.tenants["foo"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group
result = self.controller.index(req)
self.assertEqual(result.resources.__len__(), 0)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"get_security_group_details")
def test_show_security_group(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
[fakes.security_groups[tenant_id][0]]
)
req = fakes.create_req_test(None, None)
m_list.return_value = sec_group[0]
result = self.controller.show(req, None)
expected = self.controller._get_security_group_resources(sec_group)[0]
self.assertIsInstance(
result,
occi_security_group.SecurityGroupResource)
self.assertEqual(result, expected)
m_list.assert_called_with(req, None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "get_resource")
def test_show_security_group_not_found(self, m_list):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)
m_list.return_value = sec_group
req = fakes.create_req_test(None, None)
self.assertRaises(exception.NotFound,
self.controller.show,
req,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron, "delete_resource")
def test_delete_security_group(self, m_list):
m_list.return_value = None
ret = self.controller.delete(None, None)
self.assertIsNone(ret)
m_list.assert_called_with(None, 'security-groups', None)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"delete_security_group")
def test_delete_security_group_not_found(self, m_list):
m_list.side_effect = exception.NotFound
req = fakes.create_req_test(None, None)
self.assertRaises(exception.NotFound,
self.controller.delete,
req,
None)
@mock.patch.object(helpers_neutron.OpenStackNeutron,
"create_security_group")
def test_create_security_groups(self, m_create):
tenant_id = fakes.tenants["baz"]["id"]
sec_group = openstack_helper.build_security_group_from_neutron(
fakes.security_groups[tenant_id]
)[0]
params = {"occi.core.title": sec_group["title"],
"occi.securitygroup.rules": sec_group["rules"],
"occi.core.summary": sec_group["summary"]
}
categories = {occi_security_group.SecurityGroupResource.kind}
req = fakes.create_req_json_occi(params, categories)
m_create.return_value = sec_group
ret = self.controller.create(req, params)
expected = self.controller._get_security_group_resources(
[sec_group])
self.assertIsInstance(ret.resources[0],
occi_security_group.SecurityGroupResource)
self.assertEqual(expected[0], ret.resources[0])
m_create.assert_called_with(req, sec_group["title"],
sec_group["summary"],
sec_group["rules"])
def test_create_error(self):
test_networks = fakes.networks[fakes.tenants["foo"]["id"]]
schema1 = occi_security_group.SecurityGroupResource.kind.scheme
net = test_networks[0]
schemes = {schema1: net}
parameters = {"occi.core.title": "name"}
req = fakes.create_req_test(parameters, schemes)
self.assertRaises(exception.Invalid, self.controller.create, req)
def test_create_invalid_param_rule(self):
params = {"occi.core.title": "group",
"occi.securitygroup.rules": "{'wrong': 'value'}]"
}
categories = {occi_security_group.SecurityGroupResource.kind}
req = fakes.create_req_test_occi(params, categories)
self.assertRaises(exception.Invalid, self.controller.create, req)

View File

@ -0,0 +1,104 @@
# Copyright 2015 Spanish National Research Council
# Copyright 2015 LIP - INDIGO-DataCloud
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import mock
from ooi.api import helpers
from ooi.api import securitygroup_link as securitygroup_link_api
from ooi.occi.core import collection
from ooi.occi.infrastructure import securitygroup_link
from ooi.openstack import helpers as os_helpers
from ooi.tests import base
from ooi.tests import fakes as fakes_nova
class TestNetworkLinkController(base.TestController):
def setUp(self):
super(TestNetworkLinkController, self).setUp()
self.controller = securitygroup_link_api.Controller(
mock.MagicMock(), None)
@mock.patch.object(helpers.OpenStackHelper, "list_server_security_links")
def test_index(self, mock_list):
tenant_id = fakes_nova.tenants['bar']["id"]
servers = fakes_nova.servers[tenant_id]
sg = fakes_nova.security_groups[tenant_id]
segroup = os_helpers.build_security_group_from_nova(sg)[0]
links = []
for server in servers:
link = {
"compute_id": server["id"],
"securitygroup": segroup
}
links.append(link)
mock_list.return_value = links
ret = self.controller.index(None)
self.assertIsInstance(ret, collection.Collection)
@mock.patch.object(helpers.OpenStackHelper, "get_server_security_link")
@mock.patch.object(helpers.OpenStackHelper, "list_security_groups")
def test_show(self, mock_list, mock_get):
tenant_id = fakes_nova.tenants['baz']["id"]
server = fakes_nova.servers[tenant_id][0]
server_id = server['id']
secgroup_name = server['security_groups'][0]["name"]
link_id = '%s_%s' % (server_id, secgroup_name)
sec_group = os_helpers.build_security_group_from_nova(
fakes_nova.security_groups[tenant_id]
)
link = {
"compute_id": server_id,
"securitygroup": sec_group[0]
}
mock_get.return_value = [link]
mock_list.return_value = sec_group
ret = self.controller.show(None, link_id)
self.assertIsInstance(ret, securitygroup_link.SecurityGroupLink)
@mock.patch.object(helpers.OpenStackHelper, "delete_server_security_link")
def test_delete(self, mock_del):
tenant_id = fakes_nova.tenants['baz']["id"]
server = fakes_nova.servers[tenant_id][0]
server_id = server['id']
secgroup_name = server['security_groups'][0]["name"]
link_id = '%s_%s' % (server_id, secgroup_name)
mock_del.return_value = []
ret = self.controller.delete(None, link_id)
self.assertEqual([], ret)
@mock.patch.object(helpers.OpenStackHelper, "create_server_security_link")
@mock.patch("ooi.occi.validator.Validator")
@mock.patch("ooi.api.helpers.get_id_with_kind")
def test_create(self, m_get_id, m_validator, m_create):
compute_id = uuid.uuid4().hex
sec_id = uuid.uuid4().hex
obj = {
"attributes": {
"occi.core.target": sec_id,
"occi.core.source": compute_id
}
}
req = self._build_req(uuid.uuid4().hex)
req.get_parser = mock.MagicMock()
req.get_parser.return_value.return_value.parse.return_value = obj
m_validator.validate.return_value = True
m_get_id.side_effect = [('', compute_id), ('', sec_id)]
m_create.return_value = []
ret = self.controller.create(req, None)
link = ret.resources.pop()
self.assertIsInstance(link, securitygroup_link.SecurityGroupLink)