Merge "Role based resource access control - get workflows"

This commit is contained in:
Jenkins 2017-01-10 11:54:03 +00:00 committed by Gerrit Code Review
commit 2ab8dc9c37
2 changed files with 62 additions and 2 deletions

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from oslo_concurrency.fixture import lockutils
from tempest.lib import exceptions
@ -42,6 +43,62 @@ class WorkflowTestsV2(base.TestCase):
self.assertNotIn('next', body)
@test.attr(type='smoke')
def test_get_list_workflows_by_admin(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.admin_client.get('workflows?all_projects=true')
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertIn(name, names)
@test.attr(type='smoke')
def test_get_list_workflows_with_project_by_admin(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.admin_client.get(
'workflows?project_id=%s' %
self.client.auth_provider.credentials.tenant_id
)
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertIn(name, names)
@test.attr(type='smoke')
def test_get_list_other_project_private_workflows(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
_, body = self.client.create_workflow('wf_v2.yaml')
name = body['workflows'][0]['name']
resp, raw_body = self.alt_client.get(
'workflows?project_id=%s' %
self.client.auth_provider.credentials.tenant_id
)
body = json.loads(raw_body)
self.assertEqual(200, resp.status)
names = [wf['name'] for wf in body['workflows']]
self.assertNotIn(name, names)
@test.attr(type='smoke')
def test_get_list_workflows_with_fields(self):
resp, body = self.client.get_list_obj('workflows?fields=name')

View File

@ -25,7 +25,7 @@ CONF = config.CONF
class TestCase(test.BaseTestCase):
credentials = ['primary', 'alt']
credentials = ['admin', 'primary', 'alt']
@classmethod
def skip_checks(cls):
@ -46,12 +46,15 @@ class TestCase(test.BaseTestCase):
if 'WITHOUT_AUTH' in os.environ:
cls.mgr = mock.MagicMock()
cls.mgr.auth_provider = service_base.AuthProv()
cls.alt_mgr = cls.mgr
cls.admin_mgr = cls.alt_mgr = cls.mgr
else:
cls.admin_mgr = cls.admin_manager
cls.mgr = cls.manager
cls.alt_mgr = cls.alt_manager
if cls._service == 'workflowv2':
cls.admin_client = mistral_client.MistralClientV2(
cls.admin_mgr.auth_provider, cls._service)
cls.client = mistral_client.MistralClientV2(
cls.mgr.auth_provider, cls._service)
cls.alt_client = mistral_client.MistralClientV2(