Merge "Propagated a task timeout to a action execution"

This commit is contained in:
Zuul 2018-02-07 10:57:04 +00:00 committed by Gerrit Code Review
commit 23828f6a33
14 changed files with 145 additions and 37 deletions

View File

@ -57,7 +57,8 @@ def _process_queue(queue):
for operation, args in queue:
if operation == _RUN_ACTION:
action_ex, action_def, target, execution_context = args
action_ex, action_def, target, execution_context, \
timeout = args
executor.run_action(
action_ex.id,
@ -67,6 +68,7 @@ def _process_queue(queue):
action_ex.runtime_context.get('safe_rerun', False),
execution_context,
target=target,
timeout=timeout
)
elif operation == _ON_ACTION_COMPLETE:
action_ex_id, result, wf_action = args
@ -120,8 +122,9 @@ def process(func):
return decorate
def schedule_run_action(action_ex, action_def, target, execution_context):
args = (action_ex, action_def, target, execution_context)
def schedule_run_action(action_ex, action_def, target, execution_context,
timeout):
args = (action_ex, action_def, target, execution_context, timeout)
_get_queue().append((_RUN_ACTION, args))

View File

@ -91,7 +91,8 @@ class Action(object):
self.action_ex.state = state
@abc.abstractmethod
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
"""Schedule action run.
This method is needed to schedule action run so its result can
@ -100,6 +101,8 @@ class Action(object):
executor asynchrony when executor doesn't immediately send a
result).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
@ -111,13 +114,15 @@ class Action(object):
@abc.abstractmethod
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
safe_rerun=False, timeout=None):
"""Immediately run action.
This method runs method w/o scheduling its run for a later time.
From engine perspective action will be processed in synchronous
mode.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param input_dict: Action input.
:param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types.
@ -225,7 +230,8 @@ class PythonAction(Action):
self._log_result(prev_state, result)
@profiler.trace('action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
assert not self.action_ex
# Assign the action execution ID here to minimize database calls.
@ -248,11 +254,12 @@ class PythonAction(Action):
self.action_def,
target,
execution_context,
timeout=timeout
)
@profiler.trace('action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
safe_rerun=False, timeout=None):
assert not self.action_ex
input_dict = self._prepare_input(input_dict)
@ -284,7 +291,8 @@ class PythonAction(Action):
safe_rerun,
execution_context,
target=target,
async_=False
async_=False,
timeout=timeout
)
return self._prepare_output(result)
@ -512,7 +520,8 @@ class WorkflowAction(Action):
pass
@profiler.trace('workflkow-action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
assert not self.action_ex
parent_wf_ex = self.task_ex.workflow_execution
@ -564,7 +573,7 @@ class WorkflowAction(Action):
@profiler.trace('workflow-action-run', hide_args=True)
def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=True):
safe_rerun=True, timeout=None):
raise NotImplementedError('Does not apply to this WorkflowAction.')
def is_sync(self, input_dict):

View File

@ -76,6 +76,7 @@ class DefaultEngine(base.Engine):
sync = params.get('run_sync')
save = params.get('save_result')
target = params.get('target')
timeout = params.get('timeout')
is_action_sync = action.is_sync(action_input)
@ -84,11 +85,12 @@ class DefaultEngine(base.Engine):
"Action does not support synchronous execution.")
if not sync and (save or not is_action_sync):
action.schedule(action_input, target)
action.schedule(action_input, target, timeout=timeout)
return action.action_ex.get_clone()
output = action.run(action_input, target, save=False)
output = action.run(action_input, target, save=False,
timeout=timeout)
state = states.SUCCESS if output.is_success() else states.ERROR

View File

@ -28,6 +28,8 @@ import six
_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
_FAIL_IF_INCOMPLETE_TASK_PATH = \
'mistral.engine.policies._fail_task_if_incomplete'
def _log_task_delay(task_ex, delay_sec):
@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy):
scheduler.schedule_call(
None,
'mistral.engine.policies._fail_task_if_incomplete',
_FAIL_IF_INCOMPLETE_TASK_PATH,
self.delay,
task_ex_id=task_ex.id,
timeout=self.delay

View File

@ -424,7 +424,8 @@ class RegularTask(Task):
action.schedule(
input_dict,
target,
safe_rerun=self.task_spec.get_safe_rerun()
safe_rerun=self.task_spec.get_safe_rerun(),
timeout=self._get_timeout()
)
@profiler.trace('regular-task-get-target', hide_args=True)
@ -502,6 +503,22 @@ class RegularTask(Task):
return actions.PythonAction(action_def, task_ex=self.task_ex)
def _get_timeout(self):
timeout = self.task_spec.get_policies().get_timeout()
if not isinstance(timeout, (int, float)):
wf_ex = self.task_ex.workflow_execution
ctx_view = data_flow.ContextView(
self.task_ex.in_context,
wf_ex.context,
wf_ex.input
)
timeout = expr.evaluate_recursively(data=timeout, context=ctx_view)
return timeout if timeout > 0 else None
class WithItemsTask(RegularTask):
"""With-items task.
@ -588,7 +605,8 @@ class WithItemsTask(RegularTask):
input_dict,
target,
index=i,
safe_rerun=self.task_spec.get_safe_rerun()
safe_rerun=self.task_spec.get_safe_rerun(),
timeout=self._get_timeout()
)
self._decrease_capacity(1)

View File

@ -50,9 +50,11 @@ class Executor(object):
@abc.abstractmethod
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Runs action.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param action_ex_id: Corresponding action execution id.
:param action_cls_str: Path to action class in dot notation.
:param action_cls_attrs: Attributes of action class which

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import timeout as ev_timeout
from mistral_lib import actions as mistral_lib
from oslo_log import log as logging
from osprofiler import profiler
@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor):
@profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Runs action.
:param action_ex_id: Action execution id.
@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor):
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:return: Action result.
"""
@ -102,14 +105,15 @@ class DefaultExecutor(base.Executor):
# Run action.
try:
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
# that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
action_ctx = context.create_action_context(execution_context)
result = action.run(action_ctx)
else:
result = action.run()
with ev_timeout.Timeout(seconds=timeout):
# NOTE(d0ugal): If the action is a subclass of mistral-lib we
# know that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
action_ctx = context.create_action_context(
execution_context)
result = action.run(action_ctx)
else:
result = action.run()
# Note: it's made for backwards compatibility with already
# existing Mistral actions which don't return result as
@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor):
if not isinstance(result, mistral_lib.Result):
result = mistral_lib.Result(data=result)
except Exception as e:
except BaseException as e:
msg = (
"Failed to run action [action_ex_id=%s, action_cls='%s', "
"attributes='%s', params='%s']\n %s" % (

View File

@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService):
self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
action_cls_attrs, params, safe_rerun, execution_context):
action_cls_attrs, params, safe_rerun, execution_context,
timeout):
"""Receives calls over RPC to run action on executor.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param execution_context: A dict of values providing information about
the current execution.
:param rpc_ctx: RPC request context dictionary.
:param action_ex_id: Action execution id.
:param action_cls_str: Action class name.
@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService):
LOG.info(
"Received RPC request 'run_action'[action_ex_id=%s, "
"action_cls_str=%s, action_cls_attrs=%s, params=%s]",
"action_cls_str=%s, action_cls_attrs=%s, params=%s, "
"timeout=%s]",
action_ex_id,
action_cls_str,
action_cls_attrs,
utils.cut(params)
utils.cut(params),
timeout
)
redelivered = rpc_ctx.redelivered or False
@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService):
params,
safe_rerun,
execution_context,
redelivered
redelivered,
timeout=timeout
)

View File

@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor):
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True):
target=None, async_=True, timeout=None):
"""Sends a request to run action to executor.
:param action_ex_id: Action execution id.
@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor):
:param params: Action input parameters.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:param execution_context: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:return: Action result.
"""
@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor):
'params': params,
'safe_rerun': safe_rerun,
'execution_context': execution_context,
'timeout': timeout
}
rpc_client_method = (self._client.async_call

View File

@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest):
run_sync=True
)
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_with_timeout(self, f):
f.return_value = ACTION_EX_DB.to_dict()
resp = self.app.post_json(
'/v2/action_executions',
{
'name': 'std.echo',
'input': "{}",
'params': '{"timeout": 2}'
}
)
self.assertEqual(201, resp.status_int)
action_exec = copy.deepcopy(ACTION_EX)
del action_exec['task_name']
self.assertDictEqual(action_exec, resp.json)
f.assert_called_once_with(
action_exec['name'],
json.loads(action_exec['input']),
description=None,
timeout=2
)
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_json(self, f):
f.return_value = ACTION_EX_DB.to_dict()

View File

@ -80,8 +80,7 @@ workflows:
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, target=None,
async_=True):
async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
action_cls_attrs,
params,
safe_rerun,
execution_context=execution_context
execution_context=execution_context,
target=target,
async_=async_,
timeout=timeout
)
@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase):
'workflow_name': wf1_ex.name,
'action_execution_id': a_ex.id,
},
target=TARGET
target=TARGET,
timeout=None
)
def test_subworkflow_env_task_input(self):

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import timeout
import mock
from oslo_config import cfg
import requests
@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase):
self.await_workflow_error(wf_ex.id)
def test_action_timeout(self):
wb = """---
version: '2.0'
wf1:
tasks:
task1:
action: std.sleep seconds=10
timeout: 2
"""
wf_service.create_workflows(wb)
wf_ex = self.engine.start_workflow('wf1')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
action_ex = task_ex.action_executions[0]
with timeout.Timeout(8):
self.await_workflow_error(wf_ex.id)
self.await_task_error(task_ex.id)
self.await_action_error(action_ex.id)
def test_pause_before_policy(self):
wb_service.create_workbook_v2(PAUSE_BEFORE_WB)

View File

@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase):
run_mock.assert_called_once_with(
{'input': 'Hello'},
None,
save=False
save=False,
timeout=None
)

View File

@ -26,7 +26,7 @@ from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, safe_rerun, execution_context, target=None,
async_=True):
async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()