Reworked SecurityGroups to work without legacy EC2.

Supports both Nova and Neutron.

Change-Id: Id9dbc4b72f861e2b7663201c01bbbfdff6d4953d
This commit is contained in:
alevine 2014-11-28 22:04:42 +04:00
parent 3c400fb115
commit f1e627bb29
4 changed files with 493 additions and 115 deletions

View File

@ -13,11 +13,15 @@
# limitations under the License.
from neutronclient.common import exceptions as neutron_exception
import copy
try:
from neutronclient.common import exceptions as neutron_exception
except ImportError:
pass # clients will log absense of neutronclient in this case
from oslo.config import cfg
from ec2api.api import clients
from ec2api.api import ec2client
from ec2api.api import ec2utils
from ec2api.api import utils
from ec2api.db import api as db_api
@ -44,26 +48,30 @@ FILTER_MAP = {'vpc-id': 'vpcId',
'group-id': 'groupId'}
def get_security_group_engine():
if clients.neutronclient:
return SecurityGroupEngineNeutron()
else:
return SecurityGroupEngineNova()
def create_security_group(context, group_name, group_description,
vpc_id=None):
if vpc_id is None:
ec2 = ec2client.ec2client(context)
return ec2.create_security_group(group_name=group_name,
group_description=group_description)
vpc = ec2utils.get_db_item(context, 'vpc', vpc_id)
neutron = clients.neutron(context)
nova = clients.nova(context)
with utils.OnCrashCleaner() as cleaner:
os_security_group = neutron.create_security_group(
{'security_group':
{'name': group_name,
'description': group_description}})['security_group']
cleaner.addCleanup(neutron.delete_security_group,
os_security_group['id'])
security_group = db_api.add_item(context, 'sg',
{'vpc_id': vpc['id'],
'os_id': os_security_group['id']})
return {'return': 'true',
'groupId': security_group['id']}
os_security_group = nova.security_groups.create(group_name,
group_description)
cleaner.addCleanup(nova.security_groups.delete,
os_security_group.id)
if vpc_id:
# NOTE(Alex) Check if such vpc exists
ec2utils.get_db_item(context, 'vpc', vpc_id)
security_group = db_api.add_item(context, 'sg',
{'vpc_id': vpc_id,
'os_id': os_security_group.id})
return {'return': 'true',
'groupId': security_group['id']}
return {'return': 'true'}
def _create_default_security_group(context, vpc):
@ -78,35 +86,14 @@ def _create_default_security_group(context, vpc):
def delete_security_group(context, group_name=None, group_id=None):
if group_id is None or not group_id.startswith('sg-'):
ec2 = ec2client.ec2client(context)
return ec2.delete_security_group(group_name=group_name,
group_id=group_id)
security_group = ec2utils.get_db_item(context, 'sg', group_id)
# TODO(Alex) Check dependencies - instances and other security groups
neutron = clients.neutron(context)
try:
neutron.delete_security_group(security_group['os_id'])
except neutron_exception.Conflict as ex:
# TODO(Alex): Instance ID is unknown here, report exception message
# in its place - looks readable.
raise exception.DependencyViolation(
obj1_id=group_id,
obj2_id=ex.message)
except neutron_exception.NeutronClientException as ex:
# TODO(Alex): do log error
# TODO(Alex): adjust caught exception classes to catch:
# the port doesn't exist
pass
db_api.delete_item(context, security_group['id'])
security_group_engine.delete_group(context, group_name, group_id)
return True
def describe_security_groups(context, group_name=None, group_id=None,
filter=None):
# TODO(Alex): implement filters
neutron = clients.neutron(context)
os_security_groups = neutron.list_security_groups()['security_groups']
os_security_groups = security_group_engine.get_os_groups(context)
security_groups = ec2utils.get_db_items(context, 'sg', group_id)
formatted_security_groups = []
for os_security_group in os_security_groups:
@ -128,41 +115,36 @@ def describe_security_groups(context, group_name=None, group_id=None,
def authorize_security_group_ingress(context, group_id,
group_name, ip_permissions):
if group_id is None or not group_id.startswith('sg-'):
ec2 = ec2client.ec2client(context)
return ec2.authorize_security_group_ingress(
group_name=group_name,
group_id=group_id,
ip_permissions=ip_permissions)
return _authorize_security_group(context, group_id, ip_permissions,
'ingress')
return _authorize_security_group(context, group_id, group_name,
ip_permissions, 'ingress')
def authorize_security_group_egress(context, group_id, ip_permissions):
return _authorize_security_group(context, group_id, ip_permissions,
'egress')
return _authorize_security_group(context, group_id, None,
ip_permissions, 'egress')
def _authorize_security_group(context, group_id, ip_permissions, direction):
rule_body = _build_rule(context, group_id, ip_permissions, direction)
neutron = clients.neutron(context)
try:
os_security_group_rule = neutron.create_security_group_rule(
{'security_group_rule': rule_body})['security_group_rule']
except neutron_exception.Conflict as ex:
raise exception.RuleAlreadyExists()
def _authorize_security_group(context, group_id, group_name,
ip_permissions, direction):
rules_bodies = _build_rules(context, group_id, group_name,
ip_permissions, direction)
for rule_body in rules_bodies:
security_group_engine.authorize_security_group(context, rule_body)
return True
def _build_rule(context, group_id, ip_permissions, direction):
security_group = ec2utils.get_db_item(context, 'sg', group_id)
os_security_group_rule_body = (
{'security_group_id': security_group['os_id'],
'direction': direction,
'ethertype': 'IPv4'})
def _build_rules(context, group_id, group_name, ip_permissions, direction):
os_security_group_id = security_group_engine.get_group_os_id(context,
group_id,
group_name)
os_security_group_rule_bodies = []
if ip_permissions is None:
ip_permissions = []
for rule in ip_permissions:
os_security_group_rule_body = (
{'security_group_id': os_security_group_id,
'direction': direction,
'ethertype': 'IPv4'})
if rule.get('ip_protocol', -1) != -1:
os_security_group_rule_body['protocol'] = rule['ip_protocol']
if rule.get('from_port', -1) != -1:
@ -177,28 +159,26 @@ def _build_rule(context, group_id, ip_permissions, direction):
# For now only 1 value is supported for either.
if rule.get('groups'):
os_security_group_rule_body['remote_group_id'] = (
ec2utils.get_db_item(context, 'sg',
rule['groups'][0]['group_id'])['os_id'])
security_group_engine.get_group_os_id(
context,
rule['groups'][0].get('group_id'),
rule['groups'][0].get('group_name')))
elif rule.get('ip_ranges'):
os_security_group_rule_body['remote_ip_prefix'] = (
rule['ip_ranges'][0]['cidr_ip'])
return os_security_group_rule_body
os_security_group_rule_bodies.append(os_security_group_rule_body)
return os_security_group_rule_bodies
def revoke_security_group_ingress(context, group_id,
group_name, ip_permissions):
if group_id is None or not group_id.startswith('sg-'):
ec2 = ec2client.ec2client(context)
return ec2.revoke_security_group_ingress(
group_name=group_name,
group_id=group_id,
ip_permissions=ip_permissions)
return _revoke_security_group(context, group_id, ip_permissions,
'ingress')
return _revoke_security_group(context, group_id, group_name,
ip_permissions, 'ingress')
def revoke_security_group_egress(context, group_id, ip_permissions):
return _revoke_security_group(context, group_id, ip_permissions, 'egress')
return _revoke_security_group(context, group_id, None,
ip_permissions, 'egress')
def _are_identical_rules(rule1, rule2):
@ -208,7 +188,7 @@ def _are_identical_rules(rule1, rule2):
for key, value in rule.items():
if (value is not None and value != -1 and
value != '0.0.0.0/0' and
key not in ['id', 'tenant_id']):
key not in ['id', 'tenant_id', 'security_group_id']):
dict[key] = str(value)
return dict
@ -217,19 +197,26 @@ def _are_identical_rules(rule1, rule2):
return r1 == r2
def _revoke_security_group(context, group_id, ip_permissions, direction):
rule_body = _build_rule(context, group_id, ip_permissions, direction)
neutron = clients.neutron(context)
os_security_group = neutron.show_security_group(
rule_body['security_group_id'])['security_group']
if not os_security_group.get('security_group_rules'):
def _revoke_security_group(context, group_id, group_name, ip_permissions,
direction):
rules_bodies = _build_rules(context, group_id, group_name,
ip_permissions, direction)
if not rules_bodies:
return True
for os_rule in os_security_group['security_group_rules']:
if _are_identical_rules(os_rule, rule_body):
neutron.delete_security_group_rule(
os_rule['id'])
return True
raise exception.InvalidPermissionNotFound()
os_rules = security_group_engine.get_os_group_rules(
context, rules_bodies[0]['security_group_id'])
os_rules_to_delete = []
for rule_body in rules_bodies:
for os_rule in os_rules:
if _are_identical_rules(rule_body, os_rule):
os_rules_to_delete.append(os_rule['id'])
if len(os_rules_to_delete) != len(rules_bodies):
raise exception.InvalidPermissionNotFound()
for os_rule_id in os_rules_to_delete:
security_group_engine.delete_os_group_rule(context, os_rule_id)
return True
def _format_security_groups_ids_names(context):
@ -259,7 +246,7 @@ def _format_security_group(context, security_group, os_security_group,
ec2_security_group['groupDescription'] = os_security_group['description']
ingress_permissions = []
egress_permissions = []
for os_rule in os_security_group['security_group_rules']:
for os_rule in os_security_group.get('security_group_rules', []):
# NOTE(Alex) We're skipping IPv6 rules because AWS doesn't support
# them.
if os_rule.get('ethertype', 'IPv4') == 'IPv6':
@ -291,11 +278,181 @@ def _format_security_group(context, security_group, os_security_group,
ec2_rule['groups'] = [ec2_remote_group]
elif os_rule['remote_ip_prefix'] is not None:
ec2_rule['ipRanges'] = [{'cidrIp': os_rule['remote_ip_prefix']}]
if os_rule['direction'] == 'egress':
if os_rule.get('direction') == 'egress':
egress_permissions.append(ec2_rule)
elif os_rule['direction'] == 'ingress':
ingress_permissions.append(ec2_rule)
else:
if security_group is None and os_rule['protocol'] is None:
for protocol, min_port, max_port in (('icmp', -1, -1),
('tcp', 1, 65535),
('udp', 1, 65535)):
ec2_rule['ipProtocol'] = protocol
ec2_rule['fromPort'] = min_port
ec2_rule['toPort'] = max_port
ingress_permissions.append(copy.deepcopy(ec2_rule))
else:
ingress_permissions.append(ec2_rule)
ec2_security_group['ipPermissions'] = ingress_permissions
ec2_security_group['ipPermissionsEgress'] = egress_permissions
if security_group is not None:
ec2_security_group['ipPermissionsEgress'] = egress_permissions
return ec2_security_group
class SecurityGroupEngineNeutron(object):
def delete_group(self, context, group_name=None, group_id=None):
neutron = clients.neutron(context)
if group_id is None or not group_id.startswith('sg-'):
return SecurityGroupEngineNova().delete_group(context,
group_name,
group_id)
security_group = ec2utils.get_db_item(context, 'sg', group_id)
try:
neutron.delete_security_group(security_group['os_id'])
except neutron_exception.Conflict as ex:
# TODO(Alex): Instance ID is unknown here, report exception message
# in its place - looks readable.
raise exception.DependencyViolation(
obj1_id=group_id,
obj2_id=ex.message)
except neutron_exception.NeutronClientException as ex:
# TODO(Alex): do log error
# TODO(Alex): adjust caught exception classes to catch:
# the port doesn't exist
pass
db_api.delete_item(context, group_id)
def get_os_groups(self, context):
neutron = clients.neutron(context)
return neutron.list_security_groups()['security_groups']
def authorize_security_group(self, context, rule_body):
neutron = clients.neutron(context)
try:
os_security_group_rule = neutron.create_security_group_rule(
{'security_group_rule': rule_body})['security_group_rule']
except neutron_exception.Conflict as ex:
raise exception.RuleAlreadyExists()
def get_os_group_rules(self, context, os_id):
neutron = clients.neutron(context)
os_security_group = (
neutron.show_security_group(os_id)['security_group'])
return os_security_group.get('security_group_rules')
def delete_os_group_rule(self, context, os_id):
neutron = clients.neutron(context)
neutron.delete_security_group_rule(os_id)
def get_group_os_id(self, context, group_id, group_name):
if group_name:
return SecurityGroupEngineNova().get_group_os_id(context,
group_id,
group_name)
return ec2utils.get_db_item(context, 'sg', group_id)['os_id']
class SecurityGroupEngineNova(object):
def delete_group(self, context, group_name=None, group_id=None):
nova = clients.nova(context)
try:
nova.security_groups.delete(self.get_group_os_id(context,
group_id,
group_name))
except Exception as ex:
# TODO(Alex): do log error
# nova doesn't differentiate Conflict exception like neutron does
pass
def get_os_groups(self, context):
nova = clients.nova(context)
return self.convert_groups_to_neutron_format(
context,
nova.security_groups.list())
def authorize_security_group(self, context, rule_body):
nova = clients.nova(context)
try:
os_security_group_rule = nova.security_group_rules.create(
rule_body['security_group_id'],
rule_body.get('protocol'),
rule_body.get('port_range_min', -1),
rule_body.get('port_range_max', -1),
rule_body.get('remote_ip_prefix'),
rule_body.get('remote_group_id'))
except Exception as ex:
# TODO(Alex) resolve Conflict exceptions
raise ex
def get_os_group_rules(self, context, os_id):
nova = clients.nova(context)
os_security_group = nova.security_groups.get(os_id)
os_rules = os_security_group.rules
neutron_rules = []
for os_rule in os_rules:
neutron_rules.append(
self.convert_rule_to_neutron(context,
os_rule,
nova.security_groups.list()))
return neutron_rules
def delete_os_group_rule(self, context, os_id):
nova = clients.nova(context)
nova.security_group_rules.delete(os_id)
def convert_groups_to_neutron_format(self, context, nova_security_groups):
neutron_security_groups = []
for nova_group in nova_security_groups:
neutron_group = {'id': nova_group.id,
'name': nova_group.name,
'description': nova_group.description,
'tenant_id': nova_group.tenant_id}
neutron_rules = []
for rule in nova_group.rules:
neutron_rules.append(
self.convert_rule_to_neutron(context,
rule, nova_security_groups))
if neutron_rules:
neutron_group['security_group_rules'] = neutron_rules
neutron_security_groups.append(neutron_group)
return neutron_security_groups
def convert_rule_to_neutron(self, context, nova_rule,
nova_security_groups=None):
neutron_rule = {'id': nova_rule['id'],
'protocol': nova_rule['ip_protocol'],
'port_range_min': nova_rule['from_port'],
'port_range_max': nova_rule['to_port'],
'remote_ip_prefix': (
nova_rule.get('ip_range') or {}).get('cidr'),
'remote_group_id': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'security_group_id': nova_rule['parent_group_id']}
if (nova_rule.get('group') or {}).get('name'):
neutron_rule['remote_group_id'] = (
self.get_group_os_id(context, None,
nova_rule['group']['name'],
nova_security_groups))
return neutron_rule
def get_group_os_id(self, context, group_id, group_name,
nova_security_groups=None):
nova_group = self.get_nova_group_by_name(context, group_name,
nova_security_groups)
return nova_group.id
def get_nova_group_by_name(self, context, group_name,
nova_security_groups=None):
if nova_security_groups is None:
nova = clients.nova(context)
nova_security_groups = nova.security_groups.list()
nova_group = next((g for g in nova_security_groups
if g.name == group_name), None)
if nova_group is None:
raise exception.InvalidGroupNotFound(sg_id=group_name)
return nova_group
security_group_engine = get_security_group_engine()

View File

@ -40,6 +40,9 @@ class ApiTestCase(test_base.BaseTestCase):
nova_mock = nova_patcher.start()
self.nova_servers = nova_mock.return_value.servers
self.nova_flavors = nova_mock.return_value.flavors
self.nova_security_groups = nova_mock.return_value.security_groups
self.nova_security_group_rules = (
nova_mock.return_value.security_group_rules)
self.addCleanup(nova_patcher.stop)
db_api_patcher = mock.patch('ec2api.db.api.IMPL')
self.db_api = db_api_patcher.start()

View File

@ -660,6 +660,16 @@ OS_FLOATING_IP_2 = {
# security group objects
class NovaSecurityGroup(object):
def __init__(self, nova_group_dict):
self.id = nova_group_dict['id']
self.name = nova_group_dict['name']
self.description = nova_group_dict['description']
self.tenant_id = ID_OS_PROJECT
self.rules = nova_group_dict['security_group_rules']
DB_SECURITY_GROUP_1 = {
'id': ID_EC2_SECURITY_GROUP_1,
'os_id': ID_OS_SECURITY_GROUP_1,
@ -727,6 +737,48 @@ OS_SECURITY_GROUP_2 = {
'description': 'Group description',
'tenant_id': ID_OS_PROJECT
}
NOVA_SECURITY_GROUP_RULE_1 = {
'id': random_os_id(),
'from_port': 10,
'to_port': 10,
'ip_protocol': 'tcp',
'group': None,
'ip_range': {'cidr': '192.168.1.0/24'},
'parent_group_id': ID_OS_SECURITY_GROUP_2
}
NOVA_SECURITY_GROUP_RULE_2 = {
'id': random_os_id(),
'from_port': None,
'to_port': None,
'ip_protocol': 'icmp',
'group': {'name': 'groupname'},
'ip_range': None,
'parent_group_id': ID_OS_SECURITY_GROUP_2
}
NOVA_SECURITY_GROUP_1 = {
'id': ID_OS_SECURITY_GROUP_1,
'name': 'groupname',
'security_group_rules':
[{'group': None,
'ip_range': None,
'ip_protocol': None,
'to_port': None,
'parent_group_id': ID_OS_SECURITY_GROUP_1,
'from_port': None,
'id': random_os_id()}],
'description': 'Group description',
'tenant_id': ID_OS_PROJECT
}
NOVA_SECURITY_GROUP_2 = {
'id': ID_OS_SECURITY_GROUP_2,
'name': 'groupname2',
'security_group_rules': [
NOVA_SECURITY_GROUP_RULE_1,
NOVA_SECURITY_GROUP_RULE_2
],
'description': 'Group description',
'tenant_id': ID_OS_PROJECT
}
EC2_SECURITY_GROUP_1 = {
'vpcId': ID_EC2_VPC_1,
'groupDescription': 'Group description',
@ -762,6 +814,41 @@ EC2_SECURITY_GROUP_2 = {
'ownerId': ID_OS_PROJECT,
'groupId': ID_EC2_SECURITY_GROUP_2
}
EC2_NOVA_SECURITY_GROUP_1 = {
'groupDescription': 'Group description',
'ipPermissions': None,
'groupName': 'groupname',
'ipPermissions':
[{'toPort': 65535,
'ipProtocol': 'tcp',
'fromPort': 1},
{'toPort': 65535,
'ipProtocol': 'udp',
'fromPort': 1},
{'toPort': -1,
'ipProtocol': 'icmp',
'fromPort': -1}],
'ownerId': ID_OS_PROJECT,
}
EC2_NOVA_SECURITY_GROUP_2 = {
'groupDescription': 'Group description',
'groupName': 'groupname2',
'ipPermissions':
[{'toPort': 10,
'ipProtocol': 'tcp',
'fromPort': 10,
'ipRanges':
[{'cidrIp': '192.168.1.0/24'}]
},
{'toPort': -1,
'ipProtocol': 'icmp',
'fromPort': -1,
'groups':
[{'groupName': 'groupname',
'userId': ID_OS_PROJECT}]
}],
'ownerId': ID_OS_PROJECT,
}
# route table objects

View File

@ -18,6 +18,7 @@ import copy
import mock
from neutronclient.common import exceptions as neutron_exception
from ec2api.api import security_group
from ec2api.tests import base
from ec2api.tests import fakes
from ec2api.tests import matchers
@ -27,45 +28,53 @@ from ec2api.tests import tools
class SecurityGroupTestCase(base.ApiTestCase):
def test_create_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.return_value = fakes.DB_VPC_1
self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_1
self.neutron.create_security_group.return_value = {
'security_group': fakes.OS_SECURITY_GROUP_1}
self.nova_security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1))
def check_response(resp, auto_ips=False):
self.assertEqual(200, resp['status'])
self.assertEqual(fakes.ID_EC2_SECURITY_GROUP_1, resp['groupId'])
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_1, ('id',)))
self.neutron.create_security_group.assert_called_once_with(
{'security_group':
{'name': 'groupname',
'description': 'Group description'}})
self.neutron.reset_mock()
self.db_api.reset_mock()
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.assertEqual(200, resp['status'])
self.nova_security_groups.create.assert_called_once_with(
'groupname', 'Group description')
self.nova_security_groups.reset_mock()
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
check_response(resp)
self.assertEqual(200, resp['status'])
self.assertEqual(fakes.ID_EC2_SECURITY_GROUP_1, resp['groupId'])
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_1, ('id',)))
self.nova_security_groups.create.assert_called_once_with(
'groupname', 'Group description')
def test_create_security_group_rollback(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.db_api.get_item_by_id.return_value = fakes.DB_VPC_1
self.neutron.create_security_group.return_value = {
'security_group': fakes.OS_SECURITY_GROUP_1}
self.db_api.add_item.side_effect = Exception()
self.nova_security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1))
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.neutron.delete_security_group.assert_called_once_with(
self.nova_security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1)
def test_delete_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.return_value = fakes.DB_SECURITY_GROUP_1
self.db_api.get_items.return_value = []
resp = self.execute(
@ -83,7 +92,24 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.neutron.delete_security_group.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1)
def test_delete_security_group_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2)])
resp = self.execute(
'DeleteSecurityGroup',
{'GroupName':
fakes.EC2_SECURITY_GROUP_1['groupName']})
self.assertEqual(200, resp['status'])
self.assertEqual(True, resp['return'])
self.nova_security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1)
def test_delete_security_group_no_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.return_value = None
resp = self.execute(
'DeleteSecurityGroup',
@ -95,6 +121,8 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertEqual(0, self.neutron.delete_port.call_count)
def test_delete_security_group_is_in_use(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.return_value = fakes.DB_SECURITY_GROUP_1
self.neutron.delete_security_group.side_effect = (
neutron_exception.Conflict())
@ -107,6 +135,8 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertEqual(0, self.db_api.delete_item.call_count)
def test_describe_security_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_items.return_value = [fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2]
self.neutron.list_security_groups.return_value = (
@ -118,9 +148,26 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertThat(resp['securityGroupInfo'],
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_1,
fakes.EC2_SECURITY_GROUP_2]))
fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
def test_describe_security_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
resp = self.execute('DescribeSecurityGroups', {})
self.assertEqual(200, resp['status'])
self.assertThat(resp['securityGroupInfo'],
matchers.ListMatches(
[fakes.EC2_NOVA_SECURITY_GROUP_1,
fakes.EC2_NOVA_SECURITY_GROUP_2],
orderless_lists=True))
def test_authorize_security_group_ingress_ip_ranges(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.side_effect = copy.deepcopy(
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SECURITY_GROUP_1: fakes.DB_SECURITY_GROUP_1,
@ -140,7 +187,29 @@ class SecurityGroupTestCase(base.ApiTestCase):
tools.purge_dict(fakes.OS_SECURITY_GROUP_RULE_1,
{'id', 'remote_group_id', 'tenant_id'})})
def test_authorize_security_group_ip_ranges_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_group_rules.create.return_value = (
{'security_group_rule': [fakes.NOVA_SECURITY_GROUP_RULE_1]})
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.FromPort': '10',
'IpPermissions.1.ToPort': '10',
'IpPermissions.1.IpProtocol': 'tcp',
'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
self.assertEqual(200, resp['status'])
self.nova_security_group_rules.create.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_2, 'tcp', 10, 10,
'192.168.1.0/24', None)
def test_authorize_security_group_egress_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.side_effect = copy.deepcopy(
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SECURITY_GROUP_1: fakes.DB_SECURITY_GROUP_1,
@ -161,7 +230,28 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'id', 'remote_ip_prefix', 'tenant_id',
'port_range_max'})})
def test_authorize_security_group_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_group_rules.create.return_value = (
{'security_group_rule': [fakes.NOVA_SECURITY_GROUP_RULE_2]})
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.Groups.1.GroupName':
fakes.EC2_NOVA_SECURITY_GROUP_1['groupName']})
self.assertEqual(200, resp['status'])
self.nova_security_group_rules.create.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_2, 'icmp', -1, -1,
None, fakes.ID_OS_SECURITY_GROUP_1)
def test_revoke_security_group_ingress_ip_ranges(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.side_effect = copy.deepcopy(
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SECURITY_GROUP_1: fakes.DB_SECURITY_GROUP_1,
@ -182,7 +272,29 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.neutron.delete_security_group_rule.assert_called_once_with(
fakes.OS_SECURITY_GROUP_RULE_1['id'])
def test_revoke_security_group_ingress_ip_ranges_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.nova_security_groups.get.return_value = (
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2))
self.nova_security_group_rules.delete.return_value = True
resp = self.execute(
'RevokeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.FromPort': '10',
'IpPermissions.1.ToPort': '10',
'IpPermissions.1.IpProtocol': 'tcp',
'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
self.assertEqual(200, resp['status'])
self.nova_security_group_rules.delete.assert_called_once_with(
fakes.NOVA_SECURITY_GROUP_RULE_1['id'])
def test_revoke_security_group_egress_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.side_effect = copy.deepcopy(
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SECURITY_GROUP_1: fakes.DB_SECURITY_GROUP_1,
@ -202,3 +314,22 @@ class SecurityGroupTestCase(base.ApiTestCase):
fakes.ID_OS_SECURITY_GROUP_2)
self.neutron.delete_security_group_rule.assert_called_once_with(
fakes.OS_SECURITY_GROUP_RULE_2['id'])
def test_revoke_security_group_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.nova_security_groups.get.return_value = (
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2))
self.nova_security_group_rules.delete.return_value = True
resp = self.execute(
'RevokeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.Groups.1.GroupName':
fakes.EC2_NOVA_SECURITY_GROUP_1['groupName']})
self.assertEqual(200, resp['status'])
self.nova_security_group_rules.delete.assert_called_once_with(
fakes.NOVA_SECURITY_GROUP_RULE_2['id'])