Merge "Style changes in policies and commands"

This commit is contained in:
Jenkins 2014-09-24 17:58:55 +00:00 committed by Gerrit Code Review
commit 37abf53a27
2 changed files with 24 additions and 23 deletions

View File

@ -209,7 +209,5 @@ RESERVED_COMMANDS = {
def get_reserved_command(cmd_name):
if cmd_name not in RESERVED_COMMANDS:
return None
return RESERVED_COMMANDS[cmd_name]()
return RESERVED_COMMANDS[cmd_name]() if cmd_name in RESERVED_COMMANDS \
else None

View File

@ -26,27 +26,29 @@ from mistral.services import scheduler
from mistral.workflow import utils
WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name)
_ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client'
def _log_task_delay(task_name, state_from, delay_sec):
wf_trace_msg = ("Task '%s' [%s -> %s, delay = %s sec]" %
(task_name, state_from,
states.DELAYED, delay_sec))
WORKFLOW_TRACE.info(wf_trace_msg)
WF_TRACE.info(
"Task '%s' [%s -> %s, delay = %s sec]" %
(task_name, state_from, states.DELAYED, delay_sec)
)
def build_policies(policies_spec, wf_spec):
task_defaults = wf_spec.get_task_defaults()
wf_policies = task_defaults.get_policies() if task_defaults else None
if not (policies_spec or wf_policies):
return []
return construct_policies_list(policies_spec, wf_policies)
def get_factories_list():
def get_policy_factories():
return [
build_wait_before_policy,
build_wait_after_policy,
@ -56,10 +58,9 @@ def get_factories_list():
def construct_policies_list(policies_spec, wf_policies):
factories = get_factories_list()
policies = []
for factory in factories:
for factory in get_policy_factories():
policy = factory(policies_spec)
if wf_policies and not policy:
@ -262,25 +263,27 @@ class TimeoutPolicy(base.TaskPolicy):
self.delay = timeout_sec
def before_task_start(self, task_db, task_spec):
WORKFLOW_TRACE.info("Task %s is waiting completeness in %s seconds."
% (task_db.name, self.delay))
fail_task_func_path = ('mistral.engine1.policies.'
'fail_task_if_incomplete')
scheduler.schedule_call(
None,
fail_task_func_path,
'mistral.engine1.policies.fail_task_if_incomplete',
self.delay,
task_id=task_db.id,
timeout=self.delay
)
WF_TRACE.info("Timeout check scheduled [task=%s, timeout(s)=%s]."
% (task_db.id, self.delay))
def fail_task_if_incomplete(task_id, timeout):
task_db = db_api.get_task(task_id)
if not states.is_finished(task_db.state):
msg = "Task failed: Timeout exceeded for %s seconds." % timeout
WORKFLOW_TRACE.info(msg)
result = utils.TaskResult(error=msg)
rpc.get_engine_client().on_task_result(task_id, result)
msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout)
WF_TRACE.info(msg)
rpc.get_engine_client().on_task_result(
task_id,
utils.TaskResult(error=msg)
)