Role based resource access control - update workflows

- Admin user could update other user's workflow.
- Admin user could update system workflow. Before this change, it is
  impossible to update system workflow.

TODO:
- Implement delete workflow by admin
- Implement for other resources(workfbook/execution/task/action, etc.)

Partially implements: blueprint mistral-rbac

Change-Id: Ia9372c9a4ffe1904e489513912596b9052ab3142
This commit is contained in:
Lingxian Kong 2017-01-17 23:07:39 +13:00
parent 05d76ca80b
commit 002ba71a18
2 changed files with 46 additions and 2 deletions

View File

@ -28,6 +28,7 @@ import sqlalchemy as sa
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
from mistral import context as auth_ctx
from mistral.db.sqlalchemy import base as b
from mistral.db.sqlalchemy import model_base as mb
from mistral.db.sqlalchemy import sqlite_lock
@ -487,14 +488,15 @@ def create_workflow_definition(values, session=None):
@b.session_aware()
def update_workflow_definition(identifier, values, session=None):
wf_def = get_workflow_definition(identifier)
ctx = auth_ctx.ctx()
if wf_def.project_id != security.get_project_id():
if not ctx.is_admin and wf_def.project_id != security.get_project_id():
raise exc.NotAllowedException(
"Can not update workflow of other tenants. "
"[workflow_identifier=%s]" % identifier
)
if wf_def.is_system:
if not ctx.is_admin and wf_def.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system workflow: %s" % identifier
)

View File

@ -615,6 +615,48 @@ class WorkflowDefinitionTest(SQLAlchemyTest):
{'definition': 'my new definition', 'scope': 'private'}
)
def test_update_other_project_workflow_by_admin(self):
created = db_api.create_workflow_definition(WF_DEFINITIONS[0])
# Switch to admin.
auth_context.set_ctx(test_base.get_context(default=False, admin=True))
updated = db_api.update_workflow_definition(
created['id'],
{
'definition': 'my new definition',
'scope': 'public',
}
)
self.assertEqual('my new definition', updated.definition)
# Switch back.
auth_context.set_ctx(test_base.get_context())
fetched = db_api.get_workflow_definition(created['id'])
self.assertEqual(updated, fetched)
def test_update_system_workflow_by_admin(self):
system_workflow = copy.deepcopy(WF_DEFINITIONS[0])
system_workflow['is_system'] = True
created = db_api.create_workflow_definition(system_workflow)
# Switch to admin.
auth_context.set_ctx(test_base.get_context(default=False, admin=True))
updated = db_api.update_workflow_definition(
created['id'],
{
'definition': 'my new definition',
'scope': 'public'
}
)
self.assertEqual('my new definition', updated.definition)
def test_create_or_update_workflow_definition(self):
name = WF_DEFINITIONS[0]['name']