Create and run a workflow within a namespace

Allow adding many workflows with the same name.
The way this works is by adding the new workflows under another
namespace.

Notice that:
  1. Namespaces are not part of the mistral DSL.
  2. When executing a workflow, the namespace passes down to the
     subworkflow.
  3. When searching for the sub-workflow definition -
     If no workflow was found in the given namespace, than the defualt
     namespace will be used.
  4. The default namespace is an empty string.
  5. The namespace property or the workflow execution will be under
     params

Partially-Implements: blueprint create-and-run-workflows-within-a-namespace

Change-Id: Id248ec5906a0899d188675d002b45f6249d36d90
This commit is contained in:
Michal Gershenzon 2017-07-19 18:30:38 +00:00 committed by Dougal Matthews
parent d30ff30efa
commit 0e2962f679
8 changed files with 264 additions and 34 deletions

View File

@ -72,8 +72,8 @@ class MistralClientBase(rest_client.RestClient):
self.action_executions = []
self.event_triggers = []
def get_list_obj(self, name):
resp, body = self.get(name)
def get_list_obj(self, url_path):
resp, body = self.get(url_path)
return resp, json.loads(body)

View File

@ -24,21 +24,30 @@ CONF = config.CONF
class MistralClientV2(base.MistralClientBase):
def post_request(self, url, file_name):
def post_request(self, url_path, file_name):
headers = {"headers": "Content-Type:text/plain"}
return self.post(url, base.get_resource(file_name), headers=headers)
return self.post(
url_path,
base.get_resource(file_name),
headers=headers
)
def post_json(self, url, obj, extra_headers={}):
def get_request(self, url_path):
headers = {"headers": "Content-Type:application/json"}
return self.get(url_path, headers=headers)
def post_json(self, url_path, obj, extra_headers={}):
headers = {"Content-Type": "application/json"}
headers = dict(headers, **extra_headers)
return self.post(url, json.dumps(obj), headers=headers)
return self.post(url_path, json.dumps(obj), headers=headers)
def update_request(self, url, file_name):
def update_request(self, url_path, file_name):
headers = {"headers": "Content-Type:text/plain"}
resp, body = self.put(
url,
url_path,
base.get_resource(file_name),
headers=headers
)
@ -64,26 +73,61 @@ class MistralClientV2(base.MistralClientBase):
return resp, json.loads(body)
def create_workflow(self, yaml_file, scope=None):
def create_workflow(self, yaml_file, scope=None, namespace=None):
url_path = 'workflows?'
if scope:
resp, body = self.post_request('workflows?scope=public', yaml_file)
else:
resp, body = self.post_request('workflows', yaml_file)
url_path += 'scope=public&'
if namespace:
url_path += 'namespace=' + namespace
resp, body = self.post_request(url_path, yaml_file)
for wf in json.loads(body)['workflows']:
self.workflows.append(wf['name'])
identifier = wf['id'] if namespace else wf['name']
self.workflows.append(identifier)
return resp, json.loads(body)
def get_workflow(self, wf_identifier, namespace=None):
url_path = 'workflows/' + wf_identifier
if namespace:
url_path += 'namespace=' + namespace
resp, body = self.get_request(url_path)
return resp, json.loads(body)
def update_workflow(self, file_name, namespace=None):
url_path = "workflows?"
if namespace:
url_path += 'namespace=' + namespace
return self.update_request(url_path, file_name=file_name)
def get_action_execution(self, action_execution_id):
return self.get('action_executions/%s' % action_execution_id)
def create_execution(self, identifier, wf_input=None, params=None):
def get_action_executions(self, task_id=None):
url_path = 'action_executions'
if task_id:
url_path += '?task_execution_id=%s' % task_id
return self.get_list_obj(url_path)
def create_execution(self, identifier, wf_namespace=None, wf_input=None,
params=None):
if uuidutils.is_uuid_like(identifier):
body = {"workflow_id": "%s" % identifier}
else:
body = {"workflow_name": "%s" % identifier}
if wf_namespace:
body.update({'workflow_namespace': wf_namespace})
if wf_input:
body.update({'input': json.dumps(wf_input)})
if params:
@ -100,6 +144,23 @@ class MistralClientV2(base.MistralClientBase):
return resp, json.loads(body)
def get_execution(self, execution_id):
return self.get('executions/%s' % execution_id)
def get_executions(self, task_id):
url_path = 'executions'
if task_id:
url_path += '?task_execution_id=%s' % task_id
return self.get_list_obj(url_path)
def get_tasks(self, execution_id=None):
url_path = 'tasks'
if execution_id:
url_path += '?workflow_execution_id=%s' % execution_id
return self.get_list_obj(url_path)
def create_cron_trigger(self, name, wf_name, wf_input=None, pattern=None,
first_time=None, count=None):
post_body = {
@ -133,11 +194,8 @@ class MistralClientV2(base.MistralClientBase):
return [t for t in all_tasks if t['workflow_name'] == wf_name]
def create_action_execution(self, request_body, extra_headers={}):
resp, body = self.post_json(
'action_executions',
request_body,
extra_headers
)
resp, body = self.post_json('action_executions', request_body,
extra_headers)
params = json.loads(request_body.get('params', '{}'))
if params.get('save_result', False):

View File

@ -226,3 +226,31 @@ class ActionExecutionTestsV2(base.TestCase):
self.assertEqual(201, resp.status)
output = json.loads(body['output'])
self.assertEqual(200, output['result']['status'])
@decorators.idempotent_id('9438e195-031c-4502-b216-6d72941ec281')
@decorators.attr(type='sanity')
def test_action_execution_of_workflow_within_namespace(self):
resp, body = self.client.create_workflow('wf_v2.yaml', namespace='abc')
wf_name = body['workflows'][0]['name']
wf_namespace = body['workflows'][0]['namespace']
self.assertEqual(201, resp.status)
resp, body = self.client.create_execution(
wf_name,
wf_namespace=wf_namespace
)
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj('tasks')
self.assertEqual(200, resp.status)
task_id = body['tasks'][0]['id']
resp, body = self.client.get_list_obj(
'action_executions?include_output=true&task_execution_id=%s' %
task_id)
self.assertEqual(200, resp.status)
action_execution = body['action_executions'][0]
self.assertEqual(200, resp.status)
action_execution = body['action_executions'][0]
self.assertEqual(wf_namespace, action_execution['workflow_namespace'])

View File

@ -133,9 +133,7 @@ class ActionTestsV2(base.TestCase):
resp, body = self.client.create_action('action_v2.yaml')
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj(
'actions?is_system=False'
)
resp, body = self.client.get_list_obj('actions?is_system=False')
self.assertEqual(200, resp.status)
self.assertNotEmpty(body['actions'])
@ -149,9 +147,7 @@ class ActionTestsV2(base.TestCase):
resp, body = self.client.create_action('action_v2.yaml')
self.assertEqual(201, resp.status)
resp, body = self.client.get_list_obj(
'actions?is_system=neq:False'
)
resp, body = self.client.get_list_obj('actions?is_system=neq:False')
self.assertEqual(200, resp.status)
self.assertNotEmpty(body['actions'])
@ -169,8 +165,7 @@ class ActionTestsV2(base.TestCase):
_, body = self.client.get_object('actions', created_acts[0])
time = body['created_at']
resp, body = self.client.get_list_obj(
'actions?created_at=in:' + time.replace(' ', '%20')
)
'actions?created_at=in:' + time.replace(' ', '%20'))
self.assertEqual(200, resp.status)
action_names = [action['name'] for action in body['actions']]

View File

@ -107,7 +107,7 @@ class EventTriggerTestsV2(base.TestCase):
@decorators.attr(type='negative')
@decorators.idempotent_id('56b90a90-9ff3-42f8-a9eb-04a77198710e')
def test_get_nonexistent_event_trigger(self):
fake_id = '123e4567-e89b-12d3-a456-426655440000'
fake_id = '3771c152-d1a7-4a82-8a50-c79d122012dc'
self.assertRaises(exceptions.NotFound,
self.client.get_object,

View File

@ -19,6 +19,8 @@ from tempest.lib import exceptions
from mistral import utils
from mistral_tempest_tests.tests import base
import json
class ExecutionTestsV2(base.TestCase):
@ -72,8 +74,7 @@ class ExecutionTestsV2(base.TestCase):
self.assertIn(exec_id_2, [ex['id'] for ex in body['executions']])
resp, body = self.client.get_list_obj(
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc'
)
'executions?limit=1&sort_keys=workflow_name&sort_dirs=asc')
self.assertEqual(200, resp.status)
self.assertEqual(1, len(body['executions']))
@ -127,8 +128,8 @@ class ExecutionTestsV2(base.TestCase):
def test_create_execution_for_reverse_wf(self):
resp, body = self.client.create_execution(
self.reverse_wf['name'],
{self.reverse_wf['input']: "Bye"},
{"task_name": "goodbye"})
wf_input={self.reverse_wf['input']: "Bye"},
params={"task_name": "goodbye"})
exec_id = body['id']
self.assertEqual(201, resp.status)
@ -327,3 +328,91 @@ class ExecutionTestsV2(base.TestCase):
'executions',
exec_id
)
@decorators.idempotent_id('a882876b-7565-4f7f-9714-d99032ffaabb')
@decorators.attr(type='sanity')
def test_workflow_execution_of_nested_workflows_within_namespace(self):
low_wf = 'for_wf_namespace/lowest_level_wf.yaml'
middle_wf = 'for_wf_namespace/middle_wf.yaml'
top_wf = 'for_wf_namespace/top_level_wf.yaml'
resp, wf = self.client.create_workflow(low_wf)
self.assertEqual(201, resp.status)
namespace = 'abc'
resp, wf = self.client.create_workflow(low_wf, namespace=namespace)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(middle_wf)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(top_wf)
self.assertEqual(201, resp.status)
resp, wf = self.client.create_workflow(top_wf, namespace=namespace)
self.assertEqual(201, resp.status)
wf_name = wf['workflows'][0]['name']
resp, top_execution = self.client.create_execution(wf_name, namespace)
self.assertEqual(201, resp.status)
self.assertEqual('RUNNING', top_execution['state'])
self.assertEqual(wf_name, top_execution['workflow_name'])
self.assertEqual(wf_name, top_execution['workflow_name'])
self.assertEqual(namespace, top_execution['workflow_namespace'])
self.client.wait_execution(top_execution, target_state='SUCCESS')
self.assertEqual(
namespace,
json.loads(top_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(top_execution['id'])
top_task = tasks['tasks'][0]
self.assertEqual(wf_name, top_task['workflow_name'])
self.assertEqual(namespace, top_task['workflow_namespace'])
resp, executions = self.client.get_executions(top_task['id'])
middle_execution = executions['executions'][0]
self.assertEqual('middle_wf', middle_execution['workflow_name'])
self.assertEqual('', middle_execution['workflow_namespace'])
self.assertEqual(
namespace,
json.loads(middle_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(middle_execution['id'])
middle_task = tasks['tasks'][0]
self.assertEqual('middle_wf', middle_task['workflow_name'])
self.assertEqual('', middle_task['workflow_namespace'])
resp, executions = self.client.get_executions(middle_task['id'])
lowest_execution = executions['executions'][0]
self.assertEqual('lowest_level_wf', lowest_execution['workflow_name'])
self.assertEqual(namespace, lowest_execution['workflow_namespace'])
self.assertEqual(
namespace,
json.loads(lowest_execution['params'])['namespace']
)
resp, tasks = self.client.get_tasks(lowest_execution['id'])
lowest_task = tasks['tasks'][0]
self.assertEqual('lowest_level_wf', lowest_task['workflow_name'])
self.assertEqual(namespace, lowest_task['workflow_namespace'])
resp, action_executions = self.client.get_action_executions(
lowest_task['id']
)
action_execution = action_executions['action_executions'][0]
self.assertEqual('lowest_level_wf', action_execution['workflow_name'])
self.assertEqual(namespace, action_execution['workflow_namespace'])

View File

@ -63,8 +63,7 @@ class TasksTestsV2(base.TestCase):
@decorators.idempotent_id('3230d694-40fd-4094-ad12-024f40a21b94')
def test_get_tasks_of_execution(self):
resp, body = self.client.get_list_obj(
'tasks?workflow_execution_id=%s' % self.execution_id
)
'tasks?workflow_execution_id=%s' % self.execution_id)
self.assertEqual(200, resp.status)
self.assertEqual(

View File

@ -242,6 +242,58 @@ class WorkflowTestsV2(base.TestCase):
self.assertEqual(200, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
@decorators.attr(type='sanity')
@decorators.idempotent_id('42f5d135-a2b8-4a31-8135-c5ce8c5f1ed5')
def test_workflow_within_namespace(self):
self.useFixture(lockutils.LockFixture('mistral-workflow'))
namespace = 'abc'
resp, body = self.client.create_workflow(
'single_wf.yaml',
namespace=namespace
)
name = body['workflows'][0]['name']
id = body['workflows'][0]['id']
self.assertEqual(201, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
resp, body = self.client.get_workflow(
id
)
self.assertEqual(namespace, body['namespace'])
resp, body = self.client.update_workflow('single_wf.yaml', namespace)
self.assertEqual(200, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
self.assertEqual(namespace, body['workflows'][0]['namespace'])
namespace = 'abc2'
resp, body = self.client.create_workflow(
'single_wf.yaml',
namespace=namespace
)
name = body['workflows'][0]['name']
id = body['workflows'][0]['id']
self.assertEqual(201, resp.status)
self.assertEqual(name, body['workflows'][0]['name'])
resp, body = self.client.get_workflow(id)
self.assertEqual(namespace, body['namespace'])
self.assertRaises(exceptions.NotFound, self.client.get_workflow, name)
self.client.create_workflow(
'single_wf.yaml'
)
resp, body = self.client.get_workflow(id)
self.assertEqual(200, resp.status)
@decorators.attr(type='sanity')
@decorators.idempotent_id('02bc1fc3-c31a-4e37-bb3d-eda46818505c')
def test_get_workflow_definition(self):
@ -280,6 +332,15 @@ class WorkflowTestsV2(base.TestCase):
self.assertRaises(exceptions.NotFound, self.client.get_object,
'workflows', 'nonexist')
exception = self.assertRaises(
exceptions.NotFound,
self.client.get_workflow,
'nonexist_wf',
'nonexist_namespace'
)
self.assertIn('nonexist_wf', str(exception))
self.assertIn('nonexist_namespace', str(exception))
@decorators.attr(type='negative')
@decorators.idempotent_id('6b917213-7f11-423a-8fe0-55795dcf0fb2')
def test_double_create_workflows(self):