Authorize super admin actions on all projects

This allows admin super user (user with admin role in admin_project)
to do stack operations across all projects.

Change-Id: Ifbf56fde02b89248ee788e6a212ef9d11e665dc0
Partial-Bug: #1466694
This commit is contained in:
rabi 2016-05-16 10:31:10 +05:30 committed by Rabi Mishra
parent 9722a78840
commit ac86702172
7 changed files with 46 additions and 8 deletions

View File

@ -1,5 +1,5 @@
{
"context_is_admin": "role:admin",
"context_is_admin": "role:admin and auth_token_info.token.is_admin_project:True",
"deny_stack_user": "not role:heat_stack_user",
"deny_everybody": "!",

View File

@ -28,7 +28,7 @@ def policy_enforce(handler):
"""
@six.wraps(handler)
def handle_stack_method(controller, req, tenant_id, **kwargs):
if req.context.tenant_id != tenant_id:
if req.context.tenant_id != tenant_id and not req.context.is_admin:
raise exc.HTTPForbidden()
allowed = req.context.policy.enforce(context=req.context,
action=handler.__name__,

View File

@ -419,9 +419,10 @@ def stack_get(context, stack_id, show_deleted=False, tenant_safe=True,
# One exception to normal project scoping is users created by the
# stacks in the stack_user_project_id (in the heat stack user domain)
if (tenant_safe and result is not None and context is not None and
context.tenant_id not in (result.tenant,
result.stack_user_project_id)):
if (tenant_safe and result is not None
and context is not None and not context.is_admin
and context.tenant_id not in (result.tenant,
result.stack_user_project_id)):
return None
return result
@ -492,7 +493,7 @@ def _query_stack_get_all(context, tenant_safe=True, show_deleted=False,
context, models.Stack, show_deleted=show_deleted
).filter_by(owner_id=None)
if tenant_safe:
if tenant_safe and not context.is_admin:
query = query.filter_by(tenant=context.tenant_id)
query = query.options(orm.subqueryload("tags"))
@ -970,7 +971,7 @@ def software_config_get(context, config_id):
def software_config_get_all(context, limit=None, marker=None,
tenant_safe=True):
query = model_query(context, models.SoftwareConfig)
if tenant_safe:
if tenant_safe and not context.is_admin:
query = query.filter_by(tenant=context.tenant_id)
return _paginate_query(context, query, models.SoftwareConfig,
limit=limit, marker=marker).all()

View File

@ -486,7 +486,8 @@ class EngineService(service.Service):
raise exception.EntityNotFound(entity='Stack',
name=identity.stack_name)
if cnxt.tenant_id not in (identity.tenant, s.stack_user_project_id):
if not cnxt.is_admin and cnxt.tenant_id not in (
identity.tenant, s.stack_user_project_id):
# The DB API should not allow this, but sanity-check anyway..
raise exception.InvalidTenant(target=identity.tenant,
actual=cnxt.tenant_id)

View File

@ -110,6 +110,18 @@ class TestPolicyEnforce(common.HeatTestCase):
self.controller.an_action,
self.req, tenant_id='bar')
@mock.patch.object(policy.Enforcer, 'enforce')
def test_policy_enforce_tenant_mismatch_is_admin(self, mock_enforce):
self.req.context = context.RequestContext(tenant_id='foo',
is_admin=True)
mock_enforce.return_value = True
self.assertEqual('woot',
self.controller.an_action(self.req, 'foo'))
self.assertEqual('woot',
self.controller.an_action(self.req, 'bar'))
@mock.patch.object(policy.Enforcer, 'enforce')
def test_policy_enforce_policy_deny(self, mock_enforce):
mock_enforce.return_value = False

View File

@ -1847,11 +1847,19 @@ class DBAPIStackTest(common.HeatTestCase):
def test_stack_get_can_return_a_stack_from_different_tenant(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
self.ctx.tenant_id = 'abc'
# with tenant_safe = False
ret_stack = db_api.stack_get(self.ctx, stack.id,
show_deleted=False, tenant_safe=False)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
# with ctx.is_admin = True
self.ctx.is_admin = True
ret_stack = db_api.stack_get(self.ctx, stack.id,
show_deleted=False)
self.assertEqual(stack.id, ret_stack.id)
self.assertEqual('db_test_stack_name', ret_stack.name)
def test_stack_get_by_name(self):
stack = create_stack(self.ctx, self.template, self.user_creds)
ret_stack = db_api.stack_get_by_name(self.ctx, stack.name)
@ -1934,6 +1942,21 @@ class DBAPIStackTest(common.HeatTestCase):
stacks = db_api.stack_get_all(self.ctx, tenant_safe=False)
self.assertEqual(5, len(stacks))
def test_stack_get_all_with_admin_context(self):
values = [
{'tenant': UUID1},
{'tenant': UUID1},
{'tenant': UUID2},
{'tenant': UUID2},
{'tenant': UUID2},
]
[create_stack(self.ctx, self.template, self.user_creds,
**val) for val in values]
self.ctx.is_admin = True
stacks = db_api.stack_get_all(self.ctx)
self.assertEqual(5, len(stacks))
def test_stack_count_all_with_regular_tenant(self):
values = [
{'tenant': UUID1},

View File

@ -50,6 +50,7 @@ class EncryptionVolTypeTest(functional_base.FunctionalTestsBase):
# Temporarily switch to admin
self.conf.username = self.conf.admin_username
self.conf.password = self.conf.admin_password
self.conf.tenant_name = 'admin'
self.manager = clients.ClientManager(self.conf)
self.client = self.manager.orchestration_client
self.volume_client = self.manager.volume_client