Running new workflow based on an existing execution.
This change would allow an operator to start a new workflow execution based on an existing one, it would also recall all parameters needed to start the workflow. Implements blueprint: mistral-run-workflow-from-execution Change-Id: Iec3d9f3a71a98952860b972acd09ce80e0a849ff
This commit is contained in:
parent
b448f88c46
commit
0562dad4bf
|
@ -32,6 +32,7 @@ from mistral import exceptions as exc
|
||||||
from mistral.rpc import clients as rpc
|
from mistral.rpc import clients as rpc
|
||||||
from mistral.services import workflows as wf_service
|
from mistral.services import workflows as wf_service
|
||||||
from mistral.utils import filter_utils
|
from mistral.utils import filter_utils
|
||||||
|
from mistral.utils import merge_dicts
|
||||||
from mistral.utils import rest_utils
|
from mistral.utils import rest_utils
|
||||||
from mistral.workflow import states
|
from mistral.workflow import states
|
||||||
|
|
||||||
|
@ -220,6 +221,10 @@ class ExecutionsController(rest.RestController):
|
||||||
|
|
||||||
exec_id = exec_dict.get('id')
|
exec_id = exec_dict.get('id')
|
||||||
|
|
||||||
|
source_execution_id = exec_dict.get('source_execution_id')
|
||||||
|
|
||||||
|
source_exec_dict = None
|
||||||
|
|
||||||
if exec_id:
|
if exec_id:
|
||||||
# If ID is present we need to check if such execution exists.
|
# If ID is present we need to check if such execution exists.
|
||||||
# If yes, the method just returns the object. If not, the ID
|
# If yes, the method just returns the object. If not, the ID
|
||||||
|
@ -229,8 +234,17 @@ class ExecutionsController(rest.RestController):
|
||||||
if wf_ex:
|
if wf_ex:
|
||||||
return resources.Execution.from_db_model(wf_ex)
|
return resources.Execution.from_db_model(wf_ex)
|
||||||
|
|
||||||
if not (exec_dict.get('workflow_id')
|
if source_execution_id:
|
||||||
or exec_dict.get('workflow_name')):
|
# If source execution is present we will perform a lookup for
|
||||||
|
# previous workflow execution model and the information to start
|
||||||
|
# a new workflow based on that information.
|
||||||
|
source_exec_dict = db_api.get_workflow_execution(
|
||||||
|
source_execution_id).to_dict()
|
||||||
|
|
||||||
|
result_exec_dict = merge_dicts(source_exec_dict, exec_dict)
|
||||||
|
|
||||||
|
if not (result_exec_dict.get('workflow_id') or
|
||||||
|
result_exec_dict.get('workflow_name')):
|
||||||
raise exc.WorkflowException(
|
raise exc.WorkflowException(
|
||||||
"Workflow ID or workflow name must be provided. Workflow ID is"
|
"Workflow ID or workflow name must be provided. Workflow ID is"
|
||||||
" recommended."
|
" recommended."
|
||||||
|
@ -239,12 +253,13 @@ class ExecutionsController(rest.RestController):
|
||||||
engine = rpc.get_engine_client()
|
engine = rpc.get_engine_client()
|
||||||
|
|
||||||
result = engine.start_workflow(
|
result = engine.start_workflow(
|
||||||
exec_dict.get('workflow_id', exec_dict.get('workflow_name')),
|
result_exec_dict.get('workflow_id',
|
||||||
exec_dict.get('workflow_namespace', ''),
|
result_exec_dict.get('workflow_name')),
|
||||||
|
result_exec_dict.get('workflow_namespace', ''),
|
||||||
exec_id,
|
exec_id,
|
||||||
exec_dict.get('input'),
|
result_exec_dict.get('input'),
|
||||||
exec_dict.get('description', ''),
|
result_exec_dict.get('description', ''),
|
||||||
**exec_dict.get('params') or {}
|
**result_exec_dict.get('params') or {}
|
||||||
)
|
)
|
||||||
|
|
||||||
return resources.Execution.from_dict(result)
|
return resources.Execution.from_dict(result)
|
||||||
|
|
|
@ -249,6 +249,11 @@ class Execution(resource.Resource):
|
||||||
root_execution_id = wtypes.text
|
root_execution_id = wtypes.text
|
||||||
"reference to the root execution"
|
"reference to the root execution"
|
||||||
|
|
||||||
|
source_execution_id = wtypes.text
|
||||||
|
"""reference to a workflow execution id which will signal the api to
|
||||||
|
perform a lookup of a current workflow_execution and create a replica
|
||||||
|
based on that workflow inputs and parameters"""
|
||||||
|
|
||||||
state = wtypes.text
|
state = wtypes.text
|
||||||
"state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED"
|
"state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED"
|
||||||
|
|
||||||
|
|
|
@ -570,6 +570,29 @@ class TestExecutionsController(base.APITest):
|
||||||
# corresponding object exists.
|
# corresponding object exists.
|
||||||
start_wf_func.assert_not_called()
|
start_wf_func.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(db_api,
|
||||||
|
'get_workflow_execution',
|
||||||
|
mock.MagicMock(return_value=WF_EX_JSON))
|
||||||
|
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
|
||||||
|
def test_post_with_source_execution_id(self, wf_exec_mock):
|
||||||
|
wf_exec_mock.return_value = WF_EX.to_dict()
|
||||||
|
|
||||||
|
resp = self.app.post_json('/v2/executions/', WF_EX_JSON_WITH_DESC)
|
||||||
|
|
||||||
|
self.assertEqual(201, resp.status_int)
|
||||||
|
self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json)
|
||||||
|
|
||||||
|
exec_dict = WF_EX_JSON_WITH_DESC
|
||||||
|
|
||||||
|
wf_exec_mock.assert_called_once_with(
|
||||||
|
exec_dict['workflow_id'],
|
||||||
|
'',
|
||||||
|
exec_dict['id'],
|
||||||
|
json.loads(exec_dict['input']),
|
||||||
|
exec_dict['description'],
|
||||||
|
**json.loads(exec_dict['params'])
|
||||||
|
)
|
||||||
|
|
||||||
@mock.patch.object(
|
@mock.patch.object(
|
||||||
rpc_clients.EngineClient,
|
rpc_clients.EngineClient,
|
||||||
'start_workflow',
|
'start_workflow',
|
||||||
|
|
Loading…
Reference in New Issue