Merge "Style changes in policies and commands"
This commit is contained in:
commit
37abf53a27
|
@ -209,7 +209,5 @@ RESERVED_COMMANDS = {
|
|||
|
||||
|
||||
def get_reserved_command(cmd_name):
|
||||
if cmd_name not in RESERVED_COMMANDS:
|
||||
return None
|
||||
|
||||
return RESERVED_COMMANDS[cmd_name]()
|
||||
return RESERVED_COMMANDS[cmd_name]() if cmd_name in RESERVED_COMMANDS \
|
||||
else None
|
||||
|
|
|
@ -26,27 +26,29 @@ from mistral.services import scheduler
|
|||
from mistral.workflow import utils
|
||||
|
||||
|
||||
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
|
||||
|
||||
_ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client'
|
||||
|
||||
|
||||
def _log_task_delay(task_name, state_from, delay_sec):
|
||||
wf_trace_msg = ("Task '%s' [%s -> %s, delay = %s sec]" %
|
||||
(task_name, state_from,
|
||||
states.DELAYED, delay_sec))
|
||||
WORKFLOW_TRACE.info(wf_trace_msg)
|
||||
WF_TRACE.info(
|
||||
"Task '%s' [%s -> %s, delay = %s sec]" %
|
||||
(task_name, state_from, states.DELAYED, delay_sec)
|
||||
)
|
||||
|
||||
|
||||
def build_policies(policies_spec, wf_spec):
|
||||
task_defaults = wf_spec.get_task_defaults()
|
||||
wf_policies = task_defaults.get_policies() if task_defaults else None
|
||||
|
||||
if not (policies_spec or wf_policies):
|
||||
return []
|
||||
|
||||
return construct_policies_list(policies_spec, wf_policies)
|
||||
|
||||
|
||||
def get_factories_list():
|
||||
def get_policy_factories():
|
||||
return [
|
||||
build_wait_before_policy,
|
||||
build_wait_after_policy,
|
||||
|
@ -56,10 +58,9 @@ def get_factories_list():
|
|||
|
||||
|
||||
def construct_policies_list(policies_spec, wf_policies):
|
||||
factories = get_factories_list()
|
||||
policies = []
|
||||
|
||||
for factory in factories:
|
||||
for factory in get_policy_factories():
|
||||
policy = factory(policies_spec)
|
||||
|
||||
if wf_policies and not policy:
|
||||
|
@ -262,25 +263,27 @@ class TimeoutPolicy(base.TaskPolicy):
|
|||
self.delay = timeout_sec
|
||||
|
||||
def before_task_start(self, task_db, task_spec):
|
||||
WORKFLOW_TRACE.info("Task %s is waiting completeness in %s seconds."
|
||||
% (task_db.name, self.delay))
|
||||
|
||||
fail_task_func_path = ('mistral.engine1.policies.'
|
||||
'fail_task_if_incomplete')
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
fail_task_func_path,
|
||||
'mistral.engine1.policies.fail_task_if_incomplete',
|
||||
self.delay,
|
||||
task_id=task_db.id,
|
||||
timeout=self.delay
|
||||
)
|
||||
|
||||
WF_TRACE.info("Timeout check scheduled [task=%s, timeout(s)=%s]."
|
||||
% (task_db.id, self.delay))
|
||||
|
||||
|
||||
def fail_task_if_incomplete(task_id, timeout):
|
||||
task_db = db_api.get_task(task_id)
|
||||
|
||||
if not states.is_finished(task_db.state):
|
||||
msg = "Task failed: Timeout exceeded for %s seconds." % timeout
|
||||
WORKFLOW_TRACE.info(msg)
|
||||
result = utils.TaskResult(error=msg)
|
||||
rpc.get_engine_client().on_task_result(task_id, result)
|
||||
msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout)
|
||||
|
||||
WF_TRACE.info(msg)
|
||||
|
||||
rpc.get_engine_client().on_task_result(
|
||||
task_id,
|
||||
utils.TaskResult(error=msg)
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue