Add RBAC policy for ec2 API security groups calls

The authorize_security_group_ingress, revoke_security_group_ingress, and
delete_security_group calls in the ec2 API were not restricted by policy
checks. This prevented a deployer from restricting their usage via
roles or other checks. Checks have been added for these calls.

Based on commit d4056f8723 but modified
for the backport.

Closes-Bug: #1290537
Change-Id: I4bf681bedd68ed2216b429d34db735823e0a6189
This commit is contained in:
Andrew Laski 2014-04-09 09:27:44 -04:00
parent 17445aa7f2
commit dbb7dd03fe
2 changed files with 54 additions and 0 deletions


@ -30,6 +30,7 @@ from oslo.config import cfg
from nova.api.ec2 import ec2utils
from nova.api.ec2 import inst_state
from nova.api.metadata import password
from nova.api.openstack import extensions
from nova.api import validator
from nova import availability_zones
from nova import block_device
@ -85,6 +86,9 @@ LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
security_group_authorizer = extensions.extension_authorizer('compute',
'security_groups')
def validate_ec2_id(val):
if not validator.validate_str()(val):
@ -631,6 +635,8 @@ class CloudController(object):
security_group = self.security_group_api.get(context, group_name,
group_id)
security_group_authorizer(context, security_group)
prevalues = kwargs.get('ip_permissions', [kwargs])
rule_ids = []
@ -665,6 +671,8 @@ class CloudController(object):
security_group = self.security_group_api.get(context, group_name,
group_id)
security_group_authorizer(context, security_group)
prevalues = kwargs.get('ip_permissions', [kwargs])
postvalues = []
for values in prevalues:
@ -737,6 +745,8 @@ class CloudController(object):
security_group = self.security_group_api.get(context, group_name,
group_id)
security_group_authorizer(context, security_group)
self.security_group_api.destroy(context, security_group)
return True
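Review bot: The same `self.security_group_api.get(...)` followed by `security_group_authorizer(context, security_group)` pair is added verbatim in the hunks at 631, 665 and 737, and in each the policy check runs only after the lookup has resolved the name or id, so a caller the rule denies still drives the lookup and whatever error path it has. That ordering is what a target-based rule such as `project_id:%(project_id)s` (the one the new tests use) requires, because the policy target is the dict `get` returns rather than the request, and that contract deserves a comment on the module-level authorizer at 86, e.g. `# Enforces compute_extension:security_groups against the fetched group dict, so rules may reference its keys such as project_id.` Folding the pair into one CloudController helper would keep the check from being dropped at a future call site; a sketch follows. Each of the three enclosing methods starts and ends outside its hunk.
```python
    def _get_authorized_security_group(self, context, group_name, group_id):
        # Resolve the group first: a project-scoped policy rule needs the
        # stored group (and its project_id) as the policy target.
        security_group = self.security_group_api.get(context, group_name,
                                                     group_id)
        security_group_authorizer(context, security_group)
        return security_group
```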


@ -23,6 +23,7 @@ import copy
import datetime
import functools
import iso8601
import mock
import os
import string
import tempfile
@ -47,6 +48,7 @@ from nova.image import s3
from nova.network import api as network_api
from nova.network import neutronv2
from nova.openstack.common import log as logging
from nova.openstack.common import policy as common_policy
from nova.openstack.common import timeutils
from nova import test
from nova.tests.api.openstack.compute.contrib import (
@ -471,6 +473,34 @@ class CloudTestCase(test.TestCase):
delete = self.cloud.delete_security_group
self.assertRaises(exception.MissingParameter, delete, self.context)
def test_delete_security_group_policy_not_allowed(self):
rules = common_policy.Rules(
{'compute_extension:security_groups':
common_policy.parse_rule('project_id:%(project_id)s')})
common_policy.set_rules(rules)
with mock.patch.object(self.cloud.security_group_api,
'get') as get:
get.return_value = {'project_id': 'invalid'}
self.assertRaises(exception.PolicyNotAuthorized,
self.cloud.delete_security_group, self.context,
'fake-name', 'fake-id')
def test_authorize_security_group_ingress_policy_not_allowed(self):
rules = common_policy.Rules(
{'compute_extension:security_groups':
common_policy.parse_rule('project_id:%(project_id)s')})
common_policy.set_rules(rules)
with mock.patch.object(self.cloud.security_group_api,
'get') as get:
get.return_value = {'project_id': 'invalid'}
self.assertRaises(exception.PolicyNotAuthorized,
self.cloud.authorize_security_group_ingress, self.context,
'fake-name', 'fake-id')
def test_authorize_security_group_ingress(self):
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
sec = db.security_group_create(self.context, kwargs)
@ -575,6 +605,20 @@ class CloudTestCase(test.TestCase):
db.security_group_destroy(self.context, sec2['id'])
db.security_group_destroy(self.context, sec1['id'])
def test_revoke_security_group_ingress_policy_not_allowed(self):
rules = common_policy.Rules(
{'compute_extension:security_groups':
common_policy.parse_rule('project_id:%(project_id)s')})
common_policy.set_rules(rules)
with mock.patch.object(self.cloud.security_group_api,
'get') as get:
get.return_value = {'project_id': 'invalid'}
self.assertRaises(exception.PolicyNotAuthorized,
self.cloud.revoke_security_group_ingress, self.context,
'fake-name', 'fake-id')
def test_revoke_security_group_ingress(self):
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
sec = db.security_group_create(self.context, kwargs)
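Review bot: The three new tests in the hunks at 473 and 605 call `common_policy.set_rules(rules)` and never restore the previous rules, so unless the base TestCase's policy fixture already resets them on cleanup, the restrictive rule leaks into tests that run later in the same process. Registering the undo next to the change, `self.addCleanup(common_policy.reset)`, keeps each test self-contained either way. The three bodies differ only in the method under test, so a helper taking the bound method removes the duplication, as sketched below. The mocked `get` returning `{'project_id': 'invalid'}` is what makes the `project_id:%(project_id)s` rule fail, and a one-line comment saying so would help the next reader.
```python
    def _assert_security_group_call_denied(self, method):
        # Only the owning project may act on a group; the mocked lookup
        # returns a group owned by another project, so the rule fails.
        rules = common_policy.Rules(
            {'compute_extension:security_groups':
             common_policy.parse_rule('project_id:%(project_id)s')})
        common_policy.set_rules(rules)
        self.addCleanup(common_policy.reset)
        with mock.patch.object(self.cloud.security_group_api,
                               'get') as get:
            get.return_value = {'project_id': 'invalid'}
            self.assertRaises(exception.PolicyNotAuthorized,
                              method, self.context, 'fake-name', 'fake-id')

    def test_delete_security_group_policy_not_allowed(self):
        self._assert_security_group_call_denied(
            self.cloud.delete_security_group)

    # … unchanged: the authorize/revoke tests call the same helper …
```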