Role based resource access control - get workflows

We already supported role based api access control, this series patches
will implement resource access control for mistral, so that
administrator could define the rules of resource accessibility, e.g.
admin user could get/delete/update the workflows of other tenants
according to the policy.

TODO:
- Implement update workflow by admin
- Implement delete workflow by admin
- Implement for other resources(workfbook/execution/task/action, etc.)

Partially implements: blueprint mistral-rbac

Change-Id: I8b00e8a260a74457ad037ee7322a7cba9ae34fab
This commit is contained in:
Lingxian Kong 2016-12-21 13:28:45 +13:00
parent 9490cd0eba
commit 9f18358774
2 changed files with 62 additions and 2 deletions

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_concurrency.fixture import lockutils
from tempest.lib import exceptions
@ -42,6 +43,62 @@ class WorkflowTestsV2(base.TestCase):
self.assertNotIn('next', body)
@test.attr(type='smoke')
def test_get_list_workflows_by_admin(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.admin_client.get('workflows?all_projects=true')
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertIn(name, names)
@test.attr(type='smoke')
def test_get_list_workflows_with_project_by_admin(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.admin_client.get(
'workflows?project_id=%s' %
self.client.auth_provider.credentials.tenant_id
)
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertIn(name, names)
@test.attr(type='smoke')
def test_get_list_other_project_private_workflows(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.alt_client.get(
'workflows?project_id=%s' %
self.client.auth_provider.credentials.tenant_id
)
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertNotIn(name, names)
@test.attr(type='smoke')
def test_get_list_workflows_with_fields(self):
resp, body = self.client.get_list_obj('workflows?fields=name')

View File

@ -25,7 +25,7 @@ CONF = config.CONF
class TestCase(test.BaseTestCase):
credentials = ['primary', 'alt']
credentials = ['admin', 'primary', 'alt']
@classmethod
def skip_checks(cls):
@ -46,12 +46,15 @@ class TestCase(test.BaseTestCase):
if 'WITHOUT_AUTH' in os.environ:
cls.mgr = mock.MagicMock()
cls.mgr.auth_provider = service_base.AuthProv()
cls.alt_mgr = cls.mgr
cls.admin_mgr = cls.alt_mgr = cls.mgr
else:
cls.admin_mgr = cls.admin_manager
cls.mgr = cls.manager
cls.alt_mgr = cls.alt_manager
if cls._service == 'workflowv2':
cls.admin_client = mistral_client.MistralClientV2(
cls.admin_mgr.auth_provider, cls._service)
cls.client = mistral_client.MistralClientV2(
cls.mgr.auth_provider, cls._service)
cls.alt_client = mistral_client.MistralClientV2(