Add a way to save action executions that run synchronously
When using the command `mistral run-action`, by default it will run the action
synchronously unless the action can only be used asynchronously
(action.is_sync returns False). When it runs synchronously the result of the
action is not saved. When it is ran asynchronously the result is saved, as you
need to retrieve it from Mistral afterwards.
There is a argument on the command `--save-result` that can be used, it causes
the action to be ran asynchronously and the result is saved. There is no way
to have the action run synchronously and have the result be saved.
This patch adds a new API parameter `run_sync` which will be exposed by the
CLI as `--run-sync`. This new argument is intended to be used with `--save-
result` but can be be used independently to ensure an action isn't ran
synchronously by mistake. With the new argument the behaviour of the command
is now as follows:
* `mistral run-action` This behaves as it did before, it runs synchronously if
it can, or it schedules for later and saves the action.
* `mistral run-action --save-result` Again, this is the same as before, it
schedules the action to run later and the action is saved.
* `mistral run-action --run-sync` This is similar to having no argument
passed, however, if you try to run an action that can't be used
synchronously it will be rejected and an error is returned.
* `mistral run-action --run-sync --save-result` The combination of the two
arguments runs the actions synchronously and saves the result. If the action
can't be ran synchronously then an error is returned.
(This commit message uses the CLI to demonstrate the API usage, the new
argument is added in in a mistralclient patch. It can of course be used
directly via the API also.)
Change-Id: I4417750fd5ff47016357655370410e9e7348cc25
(cherry picked from commit 0ed4f05d63
)
This commit is contained in:
parent
c0a450128e
commit
6356bce814
|
@ -373,7 +373,7 @@ class ActionExecution(resource.Resource):
|
|||
output={'some_output': 'Hello, John Doe!'},
|
||||
created_at='1970-01-01T00:00:00.000000',
|
||||
updated_at='1970-01-01T00:00:00.000000',
|
||||
params={'save_result': True}
|
||||
params={'save_result': True, "run_sync": False}
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ from mistral.db.v2.sqlalchemy import models as db_models
|
|||
from mistral.engine import action_handler
|
||||
from mistral.engine import base
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import exceptions
|
||||
from mistral import utils as u
|
||||
from mistral.workflow import states
|
||||
|
||||
|
@ -62,27 +63,48 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
|
||||
action.validate_input(action_input)
|
||||
|
||||
sync = params.get('run_sync')
|
||||
save = params.get('save_result')
|
||||
target = params.get('target')
|
||||
|
||||
if save or not action.is_sync(action_input):
|
||||
is_action_sync = action.is_sync(action_input)
|
||||
|
||||
if sync and not is_action_sync:
|
||||
raise exceptions.InputException(
|
||||
"Action does not support synchronous execution.")
|
||||
|
||||
if not sync and (save or not is_action_sync):
|
||||
action.schedule(action_input, target)
|
||||
|
||||
return action.action_ex.get_clone()
|
||||
|
||||
output = action.run(action_input, target, save=save)
|
||||
output = action.run(action_input, target, save=False)
|
||||
|
||||
state = states.SUCCESS if output.is_success() else states.ERROR
|
||||
|
||||
# Action execution is not created but we need to return similar
|
||||
# object to a client anyway.
|
||||
return db_models.ActionExecution(
|
||||
name=action_name,
|
||||
description=description,
|
||||
input=action_input,
|
||||
output=output.to_dict(),
|
||||
state=state
|
||||
)
|
||||
if not save:
|
||||
# Action execution is not created but we need to return similar
|
||||
# object to a client anyway.
|
||||
return db_models.ActionExecution(
|
||||
name=action_name,
|
||||
description=description,
|
||||
input=action_input,
|
||||
output=output.to_dict(),
|
||||
state=state
|
||||
)
|
||||
|
||||
action_ex_id = u.generate_unicode_uuid()
|
||||
|
||||
values = {
|
||||
'id': action_ex_id,
|
||||
'name': action_name,
|
||||
'description': description,
|
||||
'input': action_input,
|
||||
'output': output.to_dict(),
|
||||
'state': state,
|
||||
}
|
||||
|
||||
return db_api.create_action_execution(values)
|
||||
|
||||
@u.log_exec(LOG)
|
||||
@profiler.trace('engine-on-action-complete')
|
||||
|
|
|
@ -189,7 +189,7 @@ class TestActionExecutionsController(base.APITest):
|
|||
{
|
||||
'name': 'std.echo',
|
||||
'input': "{}",
|
||||
'params': '{"save_result": true}'
|
||||
'params': '{"save_result": true, "run_sync": true}'
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -204,7 +204,8 @@ class TestActionExecutionsController(base.APITest):
|
|||
action_exec['name'],
|
||||
json.loads(action_exec['input']),
|
||||
description=None,
|
||||
save_result=True
|
||||
save_result=True,
|
||||
run_sync=True
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc.EngineClient, 'start_action')
|
||||
|
|
|
@ -126,6 +126,39 @@ class RunActionEngineTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, action_ex.state)
|
||||
self.assertEqual({'result': 'Hello!'}, action_ex.output)
|
||||
|
||||
def test_run_action_run_sync(self):
|
||||
# Start action.
|
||||
action_ex = self.engine.start_action(
|
||||
'std.echo',
|
||||
{'output': 'Hello!'},
|
||||
run_sync=True
|
||||
)
|
||||
|
||||
self.assertEqual('Hello!', action_ex.output['result'])
|
||||
self.assertEqual(states.SUCCESS, action_ex.state)
|
||||
|
||||
def test_run_action_save_result_and_run_sync(self):
|
||||
# Start action.
|
||||
action_ex = self.engine.start_action(
|
||||
'std.echo',
|
||||
{'output': 'Hello!'},
|
||||
save_result=True,
|
||||
run_sync=True
|
||||
)
|
||||
|
||||
self.assertEqual('Hello!', action_ex.output['result'])
|
||||
self.assertEqual(states.SUCCESS, action_ex.state)
|
||||
|
||||
db_action_ex = db_api.get_action_execution(action_ex.id)
|
||||
self.assertEqual(states.SUCCESS, db_action_ex.state)
|
||||
self.assertEqual({'result': 'Hello!'}, db_action_ex.output)
|
||||
|
||||
def test_run_action_run_sync_error(self):
|
||||
# Start action.
|
||||
self.assertRaises(
|
||||
exc.InputException,
|
||||
self.engine.start_action, 'std.async_noop', {}, run_sync=True)
|
||||
|
||||
def test_run_action_async(self):
|
||||
action_ex = self.engine.start_action('std.async_noop', {})
|
||||
|
||||
|
@ -281,5 +314,5 @@ class RunActionEngineTest(base.EngineTestCase):
|
|||
run_mock.assert_called_once_with(
|
||||
{'input': 'Hello'},
|
||||
None,
|
||||
save=None
|
||||
save=False
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue