Merge "Running new workflow based on an existing execution."

This commit is contained in:
Zuul 2018-01-29 16:23:23 +00:00 committed by Gerrit Code Review
commit b70ba99e22
3 changed files with 50 additions and 7 deletions

View File

@ -32,6 +32,7 @@ from mistral import exceptions as exc
from mistral.rpc import clients as rpc
from mistral.services import workflows as wf_service
from mistral.utils import filter_utils
from mistral.utils import merge_dicts
from mistral.utils import rest_utils
from mistral.workflow import states
@ -220,6 +221,10 @@ class ExecutionsController(rest.RestController):
exec_id = exec_dict.get('id')
source_execution_id = exec_dict.get('source_execution_id')
source_exec_dict = None
if exec_id:
# If ID is present we need to check if such execution exists.
# If yes, the method just returns the object. If not, the ID
@ -229,8 +234,17 @@ class ExecutionsController(rest.RestController):
if wf_ex:
return resources.Execution.from_db_model(wf_ex)
if not (exec_dict.get('workflow_id')
or exec_dict.get('workflow_name')):
if source_execution_id:
# If source execution is present we will perform a lookup for
# previous workflow execution model and the information to start
# a new workflow based on that information.
source_exec_dict = db_api.get_workflow_execution(
source_execution_id).to_dict()
result_exec_dict = merge_dicts(source_exec_dict, exec_dict)
if not (result_exec_dict.get('workflow_id') or
result_exec_dict.get('workflow_name')):
raise exc.WorkflowException(
"Workflow ID or workflow name must be provided. Workflow ID is"
" recommended."
@ -239,12 +253,13 @@ class ExecutionsController(rest.RestController):
engine = rpc.get_engine_client()
result = engine.start_workflow(
exec_dict.get('workflow_id', exec_dict.get('workflow_name')),
exec_dict.get('workflow_namespace', ''),
result_exec_dict.get('workflow_id',
result_exec_dict.get('workflow_name')),
result_exec_dict.get('workflow_namespace', ''),
exec_id,
exec_dict.get('input'),
exec_dict.get('description', ''),
**exec_dict.get('params') or {}
result_exec_dict.get('input'),
result_exec_dict.get('description', ''),
**result_exec_dict.get('params') or {}
)
return resources.Execution.from_dict(result)

View File

@ -260,6 +260,11 @@ class Execution(resource.Resource):
root_execution_id = wtypes.text
"reference to the root execution"
source_execution_id = wtypes.text
"""reference to a workflow execution id which will signal the api to
perform a lookup of a current workflow_execution and create a replica
based on that workflow inputs and parameters"""
state = wtypes.text
"state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED"

View File

@ -570,6 +570,29 @@ class TestExecutionsController(base.APITest):
# corresponding object exists.
start_wf_func.assert_not_called()
@mock.patch.object(db_api,
'get_workflow_execution',
mock.MagicMock(return_value=WF_EX_JSON))
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_with_source_execution_id(self, wf_exec_mock):
wf_exec_mock.return_value = WF_EX.to_dict()
resp = self.app.post_json('/v2/executions/', WF_EX_JSON_WITH_DESC)
self.assertEqual(201, resp.status_int)
self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json)
exec_dict = WF_EX_JSON_WITH_DESC
wf_exec_mock.assert_called_once_with(
exec_dict['workflow_id'],
'',
exec_dict['id'],
json.loads(exec_dict['input']),
exec_dict['description'],
**json.loads(exec_dict['params'])
)
@mock.patch.object(
rpc_clients.EngineClient,
'start_workflow',