Merge "Create and run a workflow within a namespace"

This commit is contained in:
Jenkins 2017-08-01 20:02:02 +00:00 committed by Gerrit Code Review
commit 3ba7b739d4
8 changed files with 264 additions and 34 deletions

View File

@ -72,8 +72,8 @@ class MistralClientBase(rest_client.RestClient):
self.action_executions = []
self.event_triggers = []
def get_list_obj(self, name):
resp, body = self.get(name)
def get_list_obj(self, url_path):
resp, body = self.get(url_path)
return resp, json.loads(body)

View File

@ -24,21 +24,30 @@ CONF = config.CONF
class MistralClientV2(base.MistralClientBase):
def post_request(self, url, file_name):
def post_request(self, url_path, file_name):
headers = {"headers": "Content-Type:text/plain"}
return self.post(url, base.get_resource(file_name), headers=headers)
return self.post(
url_path,
base.get_resource(file_name),
headers=headers
)
def post_json(self, url, obj, extra_headers={}):
def get_request(self, url_path):
headers = {"headers": "Content-Type:application/json"}
return self.get(url_path, headers=headers)
def post_json(self, url_path, obj, extra_headers={}):
headers = {"Content-Type": "application/json"}
headers = dict(headers, **extra_headers)
return self.post(url, json.dumps(obj), headers=headers)
return self.post(url_path, json.dumps(obj), headers=headers)
def update_request(self, url, file_name):
def update_request(self, url_path, file_name):
headers = {"headers": "Content-Type:text/plain"}
resp, body = self.put(
url,
url_path,
base.get_resource(file_name),
headers=headers
)
@ -64,26 +73,61 @@ class MistralClientV2(base.MistralClientBase):
return resp, json.loads(body)
def create_workflow(self, yaml_file, scope=None):
def create_workflow(self, yaml_file, scope=None, namespace=None):
url_path = 'workflows?'
if scope:
resp, body = self.post_request('workflows?scope=public', yaml_file)
else:
resp, body = self.post_request('workflows', yaml_file)
url_path += 'scope=public&'
if namespace:
url_path += 'namespace=' + namespace
resp, body = self.post_request(url_path, yaml_file)
for wf in json.loads(body)['workflows']:
self.workflows.append(wf['name'])
identifier = wf['id'] if namespace else wf['name']
self.workflows.append(identifier)
return resp, json.loads(body)
def get_workflow(self, wf_identifier, namespace=None):
url_path = 'workflows/' + wf_identifier
if namespace:
url_path += 'namespace=' + namespace
resp, body = self.get_request(url_path)
return resp, json.loads(body)
def update_workflow(self, file_name, namespace=None):
url_path = "workflows?"
if namespace:
url_path += 'namespace=' + namespace
return self.update_request(url_path, file_name=file_name)
def get_action_execution(self, action_execution_id):
return self.get('action_executions/%s' % action_execution_id)
def create_execution(self, identifier, wf_input=None, params=None):
def get_action_executions(self, task_id=None):
url_path = 'action_executions'
if task_id:
url_path += '?task_execution_id=%s' % task_id
return self.get_list_obj(url_path)
def create_execution(self, identifier, wf_namespace=None, wf_input=None,
params=None):
if uuidutils.is_uuid_like(identifier):
body = {"workflow_id": "%s" % identifier}
else:
body = {"workflow_name": "%s" % identifier}
if wf_namespace:
body.update({'workflow_namespace': wf_namespace})
if wf_input:
body.update({'input': json.dumps(wf_input)})
if params:
@ -100,6 +144,23 @@ class MistralClientV2(base.MistralClientBase):
return resp, json.loads(body)
def get_execution(self, execution_id):
return self.get('executions/%s' % execution_id)
def get_executions(self, task_id):
url_path = 'executions'
if task_id:
url_path += '?task_execution_id=%s' % task_id
return self.get_list_obj(url_path)
def get_tasks(self, execution_id=None):
url_path = 'tasks'
if execution_id:
url_path += '?workflow_execution_id=%s' % execution_id
return self.get_list_obj(url_path)
def create_cron_trigger(self, name, wf_name, wf_input=None, pattern=None,
first_time=None, count=None):
post_body = {
@ -133,11 +194,8 @@ class MistralClientV2(base.MistralClientBase):
return [t for t in all_tasks if t['workflow_name'] == wf_name]
def create_action_execution(self, request_body, extra_headers={}):
resp, body = self.post_json(
'action_executions',
request_body,
extra_headers
)
resp, body = self.post_json('action_executions', request_body,
extra_headers)
params = json.loads(request_body.get('params', '{}'))
if params.get('save_result', False):

View File

@ -226,3 +226,31 @@ class ActionExecutionTestsV2(base.TestCase):
self.assertEqual(201, resp.status)
output = json.loads(body['output'])
self.assertEqual(200, output['result']['status'])
@decorators.idempotent_id('9438e195-031c-4502-b216-6d72941ec281')
@decorators.attr(type='sanity')
def test_action_execution_of_workflow_within_namespace(self):
resp, body = self.client.create_workflow('wf_v2.yaml', namespace='abc')
wf_name = body['workflows'][0]['name']
wf_namespace = body['workflows'][0]['namespace']
self.assertEqual(201, resp.status)
resp, body = self.client.create_execution(
wf_name,
wf_namespace=wf_namespace
)
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj('tasks')
self.assertEqual(200, resp.status)
task_id = body['tasks'][0]['id']
resp, body = self.client.get_list_obj(
'action_executions?include_output=true&task_execution_id=%s' %
task_id)
self.assertEqual(200, resp.status)
action_execution = body['action_executions'][0]
self.assertEqual(200, resp.status)
action_execution = body['action_executions'][0]
self.assertEqual(wf_namespace, action_execution['workflow_namespace'])

View File

@ -133,9 +133,7 @@ class ActionTestsV2(base.TestCase):
resp, body = self.client.create_action('action_v2.yaml')
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj(
'actions?is_system=False'
)
resp, body = self.client.get_list_obj('actions?is_system=False')
self.assertEqual(200, resp.status)
self.assertNotEmpty(body['actions'])
@ -149,9 +147,7 @@ class ActionTestsV2(base.TestCase):
resp, body = self.client.create_action('action_v2.yaml')
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj(
'actions?is_system=neq:False'
)
resp, body = self.client.get_list_obj('actions?is_system=neq:False')
self.assertEqual(200, resp.status)
self.assertNotEmpty(body['actions'])
@ -169,8 +165,7 @@ class ActionTestsV2(base.TestCase):
_, body = self.client.get_object('actions', created_acts[0])
time = body['created_at']
resp, body = self.client.get_list_obj(
'actions?created_at=in:' + time.replace(' ', '%20')
)
'actions?created_at=in:' + time.replace(' ', '%20'))
self.assertEqual(200, resp.status)
action_names = [action['name'] for action in body['actions']]

View File

@ -107,7 +107,7 @@ class EventTriggerTestsV2(base.TestCase):
@decorators.attr(type='negative')
@decorators.idempotent_id('56b90a90-9ff3-42f8-a9eb-04a77198710e')
def test_get_nonexistent_event_trigger(self):
fake_id = '123e4567-e89b-12d3-a456-426655440000'
fake_id = '3771c152-d1a7-4a82-8a50-c79d122012dc'
self.assertRaises(exceptions.NotFound,
self.client.get_object,

View File

@ -19,6 +19,8 @@ from tempest.lib import exceptions
from mistral import utils
from mistral_tempest_tests.tests import base
import json
class ExecutionTestsV2(base.TestCase):
@ -72,8 +74,7 @@ class ExecutionTestsV2(base.TestCase):
self.assertIn(exec_id_2, [ex['id'] for ex in body['executions']])
resp, body = self.client.get_list_obj(
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc'
)
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc')
self.assertEqual(200, resp.status)
self.assertEqual(1, len(body['executions']))
@ -127,8 +128,8 @@ class ExecutionTestsV2(base.TestCase):
def test_create_execution_for_reverse_wf(self):
resp, body = self.client.create_execution(
self.reverse_wf['name'],
{self.reverse_wf['input']: "Bye"},
{"task_name": "goodbye"})
wf_input={self.reverse_wf['input']: "Bye"},
params={"task_name": "goodbye"})
exec_id = body['id']
self.assertEqual(201, resp.status)
@ -327,3 +328,91 @@ class ExecutionTestsV2(base.TestCase):
'executions',
exec_id
)
@decorators.idempotent_id('a882876b-7565-4f7f-9714-d99032ffaabb')
@decorators.attr(type='sanity')
def test_workflow_execution_of_nested_workflows_within_namespace(self):
low_wf = 'for_wf_namespace/lowest_level_wf.yaml'
middle_wf = 'for_wf_namespace/middle_wf.yaml'
top_wf = 'for_wf_namespace/top_level_wf.yaml'
resp, wf = self.client.create_workflow(low_wf)
self.assertEqual(201, resp.status)
namespace = 'abc'
resp, wf = self.client.create_workflow(low_wf, namespace=namespace)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(middle_wf)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(top_wf)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(top_wf, namespace=namespace)
self.assertEqual(201, resp.status)
wf_name = wf['workflows'][0]['name']
resp, top_execution = self.client.create_execution(wf_name, namespace)
self.assertEqual(201, resp.status)
self.assertEqual('RUNNING', top_execution['state'])
self.assertEqual(wf_name, top_execution['workflow_name'])
self.assertEqual(wf_name, top_execution['workflow_name'])
self.assertEqual(namespace, top_execution['workflow_namespace'])
self.client.wait_execution(top_execution, target_state='SUCCESS')
self.assertEqual(
namespace,
json.loads(top_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(top_execution['id'])
top_task = tasks['tasks'][0]
self.assertEqual(wf_name, top_task['workflow_name'])
self.assertEqual(namespace, top_task['workflow_namespace'])
resp, executions = self.client.get_executions(top_task['id'])
middle_execution = executions['executions'][0]
self.assertEqual('middle_wf', middle_execution['workflow_name'])
self.assertEqual('', middle_execution['workflow_namespace'])
self.assertEqual(
namespace,
json.loads(middle_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(middle_execution['id'])
middle_task = tasks['tasks'][0]
self.assertEqual('middle_wf', middle_task['workflow_name'])
self.assertEqual('', middle_task['workflow_namespace'])
resp, executions = self.client.get_executions(middle_task['id'])
lowest_execution = executions['executions'][0]
self.assertEqual('lowest_level_wf', lowest_execution['workflow_name'])
self.assertEqual(namespace, lowest_execution['workflow_namespace'])
self.assertEqual(
namespace,
json.loads(lowest_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(lowest_execution['id'])
lowest_task = tasks['tasks'][0]
self.assertEqual('lowest_level_wf', lowest_task['workflow_name'])
self.assertEqual(namespace, lowest_task['workflow_namespace'])
resp, action_executions = self.client.get_action_executions(
lowest_task['id']
)
action_execution = action_executions['action_executions'][0]
self.assertEqual('lowest_level_wf', action_execution['workflow_name'])
self.assertEqual(namespace, action_execution['workflow_namespace'])

View File

@ -63,8 +63,7 @@ class TasksTestsV2(base.TestCase):
@decorators.idempotent_id('3230d694-40fd-4094-ad12-024f40a21b94')
def test_get_tasks_of_execution(self):
resp, body = self.client.get_list_obj(
'tasks?workflow_execution_id=%s' % self.execution_id
)
'tasks?workflow_execution_id=%s' % self.execution_id)
self.assertEqual(200, resp.status)
self.assertEqual(

View File

@ -242,6 +242,58 @@ class WorkflowTestsV2(base.TestCase):
self.assertEqual(200, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
@decorators.attr(type='sanity')
@decorators.idempotent_id('42f5d135-a2b8-4a31-8135-c5ce8c5f1ed5')
def test_workflow_within_namespace(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
namespace = 'abc'
resp, body = self.client.create_workflow(
'single_wf.yaml',
namespace=namespace
)
name = body['workflows'][0]['name']
id = body['workflows'][0]['id']
self.assertEqual(201, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
resp, body = self.client.get_workflow(
id
)
self.assertEqual(namespace, body['namespace'])
resp, body = self.client.update_workflow('single_wf.yaml', namespace)
self.assertEqual(200, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
self.assertEqual(namespace, body['workflows'][0]['namespace'])
namespace = 'abc2'
resp, body = self.client.create_workflow(
'single_wf.yaml',
namespace=namespace
)
name = body['workflows'][0]['name']
id = body['workflows'][0]['id']
self.assertEqual(201, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
resp, body = self.client.get_workflow(id)
self.assertEqual(namespace, body['namespace'])
self.assertRaises(exceptions.NotFound, self.client.get_workflow, name)
self.client.create_workflow(
'single_wf.yaml'
)
resp, body = self.client.get_workflow(id)
self.assertEqual(200, resp.status)
@decorators.attr(type='sanity')
@decorators.idempotent_id('02bc1fc3-c31a-4e37-bb3d-eda46818505c')
def test_get_workflow_definition(self):
@ -280,6 +332,15 @@ class WorkflowTestsV2(base.TestCase):
self.assertRaises(exceptions.NotFound, self.client.get_object,
'workflows', 'nonexist')
exception = self.assertRaises(
exceptions.NotFound,
self.client.get_workflow,
'nonexist_wf',
'nonexist_namespace'
)
self.assertIn('nonexist_wf', str(exception))
self.assertIn('nonexist_namespace', str(exception))
@decorators.attr(type='negative')
@decorators.idempotent_id('6b917213-7f11-423a-8fe0-55795dcf0fb2')
def test_double_create_workflows(self):