Propagated a task timeout to a action execution

It shall be possible to specify timeout for Mistral actions in order
to cancel some long-performed action so that to provide predictable
execution time for client service.
Currently Mistral allows configure timeout on task and automatically
changes task status to error. However mistral don't interrupt action
execution.
We need Mistral to terminate timed out action execution, because there
might be the following issues:
* several the same action executions can run at the same time breaking
data consistency
* stale action executions may lead to the massive resources
consumption (memory, cpu..)

Change-Id: I2a960110663627a54b8150917fd01eec68e8933d
Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
Vitalii Solodilov 2018-01-30 12:50:57 +04:00
parent 540f8d67e7
commit b79f91e9ec
14 changed files with 145 additions and 37 deletions

View File

@ -57,7 +57,8 @@ def _process_queue(queue):
for operation, args in queue:
if operation == _RUN_ACTION:
action_ex, action_def, target, execution_context = args
action_ex, action_def, target, execution_context, \
timeout = args
executor.run_action(
action_ex.id,
@ -67,6 +68,7 @@ def _process_queue(queue):
action_ex.runtime_context.get('safe_rerun', False),
execution_context,
target=target,
timeout=timeout
)
elif operation == _ON_ACTION_COMPLETE:
action_ex_id, result, wf_action = args
@ -120,8 +122,9 @@ def process(func):
return decorate
def schedule_run_action(action_ex, action_def, target, execution_context):
args = (action_ex, action_def, target, execution_context)
def schedule_run_action(action_ex, action_def, target, execution_context,
timeout):
args = (action_ex, action_def, target, execution_context, timeout)
_get_queue().append((_RUN_ACTION, args))

View File

@ -91,7 +91,8 @@ class Action(object):
self.action_ex.state = state
@abc.abstractmethod
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
"""Schedule action run.
This method is needed to schedule action run so its result can
@ -100,6 +101,8 @@ class Action(object):
executor asynchrony when executor doesn't immediately send a
result).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
@ -111,13 +114,15 @@ class Action(object):
@abc.abstractmethod
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
safe_rerun=False, timeout=None):
"""Immediately run action.
This method runs method w/o scheduling its run for a later time.
From engine perspective action will be processed in synchronous
mode.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
@ -225,7 +230,8 @@ class PythonAction(Action):
self._log_result(prev_state, result)
@profiler.trace('action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
assert not self.action_ex
# Assign the action execution ID here to minimize database calls.
@ -248,11 +254,12 @@ class PythonAction(Action):
self.action_def,
target,
execution_context,
timeout=timeout
)
@profiler.trace('action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
safe_rerun=False, timeout=None):
assert not self.action_ex
input_dict = self._prepare_input(input_dict)
@ -284,7 +291,8 @@ class PythonAction(Action):
safe_rerun,
execution_context,
target=target,
async_=False
async_=False,
timeout=timeout
)
return self._prepare_output(result)
@ -512,7 +520,8 @@ class WorkflowAction(Action):
pass
@profiler.trace('workflkow-action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
assert not self.action_ex
parent_wf_ex = self.task_ex.workflow_execution
@ -564,7 +573,7 @@ class WorkflowAction(Action):
@profiler.trace('workflow-action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=True):
safe_rerun=True, timeout=None):
raise NotImplementedError('Does not apply to this WorkflowAction.')
def is_sync(self, input_dict):

View File

@ -76,6 +76,7 @@ class DefaultEngine(base.Engine):
sync = params.get('run_sync')
save = params.get('save_result')
target = params.get('target')
timeout = params.get('timeout')
is_action_sync = action.is_sync(action_input)
@ -84,11 +85,12 @@ class DefaultEngine(base.Engine):
"Action does not support synchronous execution.")
if not sync and (save or not is_action_sync):
action.schedule(action_input, target)
action.schedule(action_input, target, timeout=timeout)
return action.action_ex.get_clone()
output = action.run(action_input, target, save=False)
output = action.run(action_input, target, save=False,
timeout=timeout)
state = states.SUCCESS if output.is_success() else states.ERROR

View File

@ -28,6 +28,8 @@ import six
_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
_FAIL_IF_INCOMPLETE_TASK_PATH = \
'mistral.engine.policies._fail_task_if_incomplete'
def _log_task_delay(task_ex, delay_sec):
@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy):
scheduler.schedule_call(
None,
'mistral.engine.policies._fail_task_if_incomplete',
_FAIL_IF_INCOMPLETE_TASK_PATH,
self.delay,
task_ex_id=task_ex.id,
timeout=self.delay

View File

@ -424,7 +424,8 @@ class RegularTask(Task):
action.schedule(
input_dict,
target,
safe_rerun=self.task_spec.get_safe_rerun()
safe_rerun=self.task_spec.get_safe_rerun(),
timeout=self._get_timeout()
)
@profiler.trace('regular-task-get-target', hide_args=True)
@ -502,6 +503,22 @@ class RegularTask(Task):
return actions.PythonAction(action_def, task_ex=self.task_ex)
def _get_timeout(self):
timeout = self.task_spec.get_policies().get_timeout()
if not isinstance(timeout, (int, float)):
wf_ex = self.task_ex.workflow_execution
ctx_view = data_flow.ContextView(
self.task_ex.in_context,
wf_ex.context,
wf_ex.input
)
timeout = expr.evaluate_recursively(data=timeout, context=ctx_view)
return timeout if timeout > 0 else None
class WithItemsTask(RegularTask):
"""With-items task.
@ -588,7 +605,8 @@ class WithItemsTask(RegularTask):
input_dict,
target,
index=i,
safe_rerun=self.task_spec.get_safe_rerun()
safe_rerun=self.task_spec.get_safe_rerun(),
timeout=self._get_timeout()
)
self._decrease_capacity(1)

View File

@ -50,9 +50,11 @@ class Executor(object):
@abc.abstractmethod
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Runs action.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param action_ex_id: Corresponding action execution id.
:param action_cls_str: Path to action class in dot notation.
:param action_cls_attrs: Attributes of action class which

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import timeout as ev_timeout
from mistral_lib import actions as mistral_lib
from oslo_log import log as logging
from osprofiler import profiler
@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor):
@profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Runs action.
:param action_ex_id: Action execution id.
@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor):
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:return: Action result.
"""
@ -102,14 +105,15 @@ class DefaultExecutor(base.Executor):
# Run action.
try:
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
# that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
action_ctx = context.create_action_context(execution_context)
result = action.run(action_ctx)
else:
result = action.run()
with ev_timeout.Timeout(seconds=timeout):
# NOTE(d0ugal): If the action is a subclass of mistral-lib we
# know that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
action_ctx = context.create_action_context(
execution_context)
result = action.run(action_ctx)
else:
result = action.run()
# Note: it's made for backwards compatibility with already
# existing Mistral actions which don't return result as
@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor):
if not isinstance(result, mistral_lib.Result):
result = mistral_lib.Result(data=result)
except Exception as e:
except BaseException as e:
msg = (
"Failed to run action [action_ex_id=%s, action_cls='%s', "
"attributes='%s', params='%s']\n %s" % (

View File

@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService):
self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
action_cls_attrs, params, safe_rerun, execution_context):
action_cls_attrs, params, safe_rerun, execution_context,
timeout):
"""Receives calls over RPC to run action on executor.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param execution_context: A dict of values providing information about
the current execution.
:param rpc_ctx: RPC request context dictionary.
:param action_ex_id: Action execution id.
:param action_cls_str: Action class name.
@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService):
LOG.info(
"Received RPC request 'run_action'[action_ex_id=%s, "
"action_cls_str=%s, action_cls_attrs=%s, params=%s]",
"action_cls_str=%s, action_cls_attrs=%s, params=%s, "
"timeout=%s]",
action_ex_id,
action_cls_str,
action_cls_attrs,
utils.cut(params)
utils.cut(params),
timeout
)
redelivered = rpc_ctx.redelivered or False
@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService):
params,
safe_rerun,
execution_context,
redelivered
redelivered,
timeout=timeout
)

View File

@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor):
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Sends a request to run action to executor.
:param action_ex_id: Action execution id.
@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor):
:param params: Action input parameters.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:param execution_context: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:return: Action result.
"""
@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor):
'params': params,
'safe_rerun': safe_rerun,
'execution_context': execution_context,
'timeout': timeout
}
rpc_client_method = (self._client.async_call

View File

@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest):
run_sync=True
)
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_with_timeout(self, f):
f.return_value = ACTION_EX_DB.to_dict()
resp = self.app.post_json(
'/v2/action_executions',
{
'name': 'std.echo',
'input': "{}",
'params': '{"timeout": 2}'
}
)
self.assertEqual(201, resp.status_int)
action_exec = copy.deepcopy(ACTION_EX)
del action_exec['task_name']
self.assertDictEqual(action_exec, resp.json)
f.assert_called_once_with(
action_exec['name'],
json.loads(action_exec['input']),
description=None,
timeout=2
)
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_json(self, f):
f.return_value = ACTION_EX_DB.to_dict()

View File

@ -80,8 +80,7 @@ workflows:
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, target=None,
async_=True):
async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
action_cls_attrs,
params,
safe_rerun,
execution_context=execution_context
execution_context=execution_context,
target=target,
async_=async_,
timeout=timeout
)
@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase):
'workflow_name': wf1_ex.name,
'action_execution_id': a_ex.id,
},
target=TARGET
target=TARGET,
timeout=None
)
def test_subworkflow_env_task_input(self):

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import timeout
import mock
from oslo_config import cfg
import requests
@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
def test_action_timeout(self):
wb = """---
version: '2.0'
wf1:
tasks:
task1:
action: std.sleep seconds=10
timeout: 2
"""
wf_service.create_workflows(wb)
wf_ex = self.engine.start_workflow('wf1')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
action_ex = task_ex.action_executions[0]
with timeout.Timeout(8):
self.await_workflow_error(wf_ex.id)
self.await_task_error(task_ex.id)
self.await_action_error(action_ex.id)
def test_pause_before_policy(self):
wb_service.create_workbook_v2(PAUSE_BEFORE_WB)

View File

@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase):
run_mock.assert_called_once_with(
{'input': 'Hello'},
None,
save=False
save=False,
timeout=None
)

View File

@ -26,7 +26,7 @@ from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, safe_rerun, execution_context, target=None,
async_=True):
async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()