diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 84691bf57..d5e2b660b 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -34,66 +34,75 @@ CONF = cfg.CONF _periodic_tasks = {} -class MistralPeriodicTasks(periodic_task.PeriodicTasks): - @periodic_task.periodic_task(spacing=CONF.cron_trigger.execution_interval, - run_immediately=True) - def process_cron_triggers_v2(self, ctx): - LOG.debug("Processing cron triggers...") +def process_cron_triggers_v2(self, ctx): + LOG.debug("Processing cron triggers...") - for trigger in triggers.get_next_cron_triggers(): - LOG.debug("Processing cron trigger: %s", trigger) + for trigger in triggers.get_next_cron_triggers(): + LOG.debug("Processing cron trigger: %s", trigger) - try: - # Setup admin context before schedule triggers. - ctx = security.create_context( - trigger.trust_id, - trigger.project_id + try: + # Setup admin context before schedule triggers. + ctx = security.create_context( + trigger.trust_id, + trigger.project_id + ) + + auth_ctx.set_ctx(ctx) + + LOG.debug("Cron trigger security context: %s", ctx) + + # Try to advance the cron trigger next_execution_time and + # remaining_executions if relevant. + modified = advance_cron_trigger(trigger) + + # If cron trigger was not already modified by another engine. + if modified: + LOG.debug( + "Starting workflow '%s' by cron trigger '%s'", + trigger.workflow.name, + trigger.name ) - auth_ctx.set_ctx(ctx) - - LOG.debug("Cron trigger security context: %s", ctx) - - # Try to advance the cron trigger next_execution_time and - # remaining_executions if relevant. - modified = advance_cron_trigger(trigger) - - # If cron trigger was not already modified by another engine. - if modified: - LOG.debug( - "Starting workflow '%s' by cron trigger '%s'", - trigger.workflow.name, - trigger.name - ) - - description = { - "description": ( - "Workflow execution created by cron" - " trigger '(%s)'." % trigger.id - ), - "triggered_by": { - "type": "cron_trigger", - "id": trigger.id, - "name": trigger.name, - } + description = { + "description": ( + "Workflow execution created by cron" + " trigger '(%s)'." % trigger.id + ), + "triggered_by": { + "type": "cron_trigger", + "id": trigger.id, + "name": trigger.name, } + } - rpc.get_engine_client().start_workflow( - trigger.workflow.name, - trigger.workflow.namespace, - None, - trigger.workflow_input, - description=json.dumps(description), - **trigger.workflow_params - ) - except Exception: - # Log and continue to next cron trigger. - LOG.exception( - "Failed to process cron trigger %s", - str(trigger) + rpc.get_engine_client().start_workflow( + trigger.workflow.name, + trigger.workflow.namespace, + None, + trigger.workflow_input, + description=json.dumps(description), + **trigger.workflow_params ) - finally: - auth_ctx.set_ctx(None) + except Exception: + # Log and continue to next cron trigger. + LOG.exception( + "Failed to process cron trigger %s", + str(trigger) + ) + finally: + auth_ctx.set_ctx(None) + + +class MistralPeriodicTasks(periodic_task.PeriodicTasks): + + def __init__(self, conf): + super(MistralPeriodicTasks, self).__init__(conf) + + periodic_task_ = periodic_task.periodic_task( + spacing=CONF.cron_trigger.execution_interval, + run_immediately=True, + ) + self.add_periodic_task(periodic_task_(process_cron_triggers_v2)) def advance_cron_trigger(t): diff --git a/mistral/tests/unit/engine/test_cron_trigger.py b/mistral/tests/unit/engine/test_cron_trigger.py index ffedaab0a..9089ac65b 100644 --- a/mistral/tests/unit/engine/test_cron_trigger.py +++ b/mistral/tests/unit/engine/test_cron_trigger.py @@ -68,7 +68,7 @@ class ProcessCronTriggerTest(base.EngineTestCase): next_trigger = triggers.get_next_cron_triggers()[0] next_execution_time_before = next_trigger.next_execution_time - periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None) + periodic.process_cron_triggers_v2(None, None) start_wf_mock = get_engine_client_mock.return_value.start_workflow @@ -121,7 +121,7 @@ class ProcessCronTriggerTest(base.EngineTestCase): next_trigger = next_triggers[0] next_execution_time_before = next_trigger.next_execution_time - periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None) + periodic.process_cron_triggers_v2(None, None) next_triggers = triggers.get_next_cron_triggers() @@ -167,7 +167,7 @@ class ProcessCronTriggerTest(base.EngineTestCase): cron_trigger.next_execution_time ) - periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None) + periodic.process_cron_triggers_v2(None, None) # After process_triggers context is set to None, need to reset it. auth_ctx.set_ctx(self.ctx)