From 5be3101c141d9ecb4ce40a849e561c44ed6554e0 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Tue, 23 Sep 2014 12:52:25 -0700 Subject: [PATCH] Style changes in policies and commands Change-Id: I74142da1c20b18acf5dd0a5618f84dcf6671e5f3 --- mistral/engine1/commands.py | 6 ++---- mistral/engine1/policies.py | 41 ++++++++++++++++++++----------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/mistral/engine1/commands.py b/mistral/engine1/commands.py index 2ad5ad3a4..eb9e7f5ed 100644 --- a/mistral/engine1/commands.py +++ b/mistral/engine1/commands.py @@ -209,7 +209,5 @@ RESERVED_COMMANDS = { def get_reserved_command(cmd_name): - if cmd_name not in RESERVED_COMMANDS: - return None - - return RESERVED_COMMANDS[cmd_name]() + return RESERVED_COMMANDS[cmd_name]() if cmd_name in RESERVED_COMMANDS \ + else None diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py index e0fbdab4d..84ee7705d 100644 --- a/mistral/engine1/policies.py +++ b/mistral/engine1/policies.py @@ -26,27 +26,29 @@ from mistral.services import scheduler from mistral.workflow import utils -WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) +WF_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) + _ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client' def _log_task_delay(task_name, state_from, delay_sec): - wf_trace_msg = ("Task '%s' [%s -> %s, delay = %s sec]" % - (task_name, state_from, - states.DELAYED, delay_sec)) - WORKFLOW_TRACE.info(wf_trace_msg) + WF_TRACE.info( + "Task '%s' [%s -> %s, delay = %s sec]" % + (task_name, state_from, states.DELAYED, delay_sec) + ) def build_policies(policies_spec, wf_spec): task_defaults = wf_spec.get_task_defaults() wf_policies = task_defaults.get_policies() if task_defaults else None + if not (policies_spec or wf_policies): return [] return construct_policies_list(policies_spec, wf_policies) -def get_factories_list(): +def get_policy_factories(): return [ build_wait_before_policy, build_wait_after_policy, @@ -56,10 +58,9 @@ def get_factories_list(): def construct_policies_list(policies_spec, wf_policies): - factories = get_factories_list() policies = [] - for factory in factories: + for factory in get_policy_factories(): policy = factory(policies_spec) if wf_policies and not policy: @@ -261,25 +262,27 @@ class TimeoutPolicy(base.TaskPolicy): self.delay = timeout_sec def before_task_start(self, task_db, task_spec): - WORKFLOW_TRACE.info("Task %s is waiting completeness in %s seconds." - % (task_db.name, self.delay)) - - fail_task_func_path = ('mistral.engine1.policies.' - 'fail_task_if_incomplete') - scheduler.schedule_call( None, - fail_task_func_path, + 'mistral.engine1.policies.fail_task_if_incomplete', self.delay, task_id=task_db.id, timeout=self.delay ) + WF_TRACE.info("Timeout check scheduled [task=%s, timeout(s)=%s]." + % (task_db.id, self.delay)) + def fail_task_if_incomplete(task_id, timeout): task_db = db_api.get_task(task_id) + if not states.is_finished(task_db.state): - msg = "Task failed: Timeout exceeded for %s seconds." % timeout - WORKFLOW_TRACE.info(msg) - result = utils.TaskResult(error=msg) - rpc.get_engine_client().on_task_result(task_id, result) + msg = "Task timed out [task=%s, timeout(s)=%s]." % (task_id, timeout) + + WF_TRACE.info(msg) + + rpc.get_engine_client().on_task_result( + task_id, + utils.TaskResult(error=msg) + )