Merge "Really make the cron trigger execution interval configurable"

This commit is contained in:
Zuul 2018-02-09 11:42:01 +00:00 committed by Gerrit Code Review
commit 0727c35bc1
2 changed files with 65 additions and 56 deletions

View File

@ -34,66 +34,75 @@ CONF = cfg.CONF
_periodic_tasks = {}
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=CONF.cron_trigger.execution_interval,
run_immediately=True)
def process_cron_triggers_v2(self, ctx):
LOG.debug("Processing cron triggers...")
def process_cron_triggers_v2(self, ctx):
LOG.debug("Processing cron triggers...")
for trigger in triggers.get_next_cron_triggers():
LOG.debug("Processing cron trigger: %s", trigger)
for trigger in triggers.get_next_cron_triggers():
LOG.debug("Processing cron trigger: %s", trigger)
try:
# Setup admin context before schedule triggers.
ctx = security.create_context(
trigger.trust_id,
trigger.project_id
try:
# Setup admin context before schedule triggers.
ctx = security.create_context(
trigger.trust_id,
trigger.project_id
)
auth_ctx.set_ctx(ctx)
LOG.debug("Cron trigger security context: %s", ctx)
# Try to advance the cron trigger next_execution_time and
# remaining_executions if relevant.
modified = advance_cron_trigger(trigger)
# If cron trigger was not already modified by another engine.
if modified:
LOG.debug(
"Starting workflow '%s' by cron trigger '%s'",
trigger.workflow.name,
trigger.name
)
auth_ctx.set_ctx(ctx)
LOG.debug("Cron trigger security context: %s", ctx)
# Try to advance the cron trigger next_execution_time and
# remaining_executions if relevant.
modified = advance_cron_trigger(trigger)
# If cron trigger was not already modified by another engine.
if modified:
LOG.debug(
"Starting workflow '%s' by cron trigger '%s'",
trigger.workflow.name,
trigger.name
)
description = {
"description": (
"Workflow execution created by cron"
" trigger '(%s)'." % trigger.id
),
"triggered_by": {
"type": "cron_trigger",
"id": trigger.id,
"name": trigger.name,
}
description = {
"description": (
"Workflow execution created by cron"
" trigger '(%s)'." % trigger.id
),
"triggered_by": {
"type": "cron_trigger",
"id": trigger.id,
"name": trigger.name,
}
}
rpc.get_engine_client().start_workflow(
trigger.workflow.name,
trigger.workflow.namespace,
None,
trigger.workflow_input,
description=json.dumps(description),
**trigger.workflow_params
)
except Exception:
# Log and continue to next cron trigger.
LOG.exception(
"Failed to process cron trigger %s",
str(trigger)
rpc.get_engine_client().start_workflow(
trigger.workflow.name,
trigger.workflow.namespace,
None,
trigger.workflow_input,
description=json.dumps(description),
**trigger.workflow_params
)
finally:
auth_ctx.set_ctx(None)
except Exception:
# Log and continue to next cron trigger.
LOG.exception(
"Failed to process cron trigger %s",
str(trigger)
)
finally:
auth_ctx.set_ctx(None)
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self, conf):
super(MistralPeriodicTasks, self).__init__(conf)
periodic_task_ = periodic_task.periodic_task(
spacing=CONF.cron_trigger.execution_interval,
run_immediately=True,
)
self.add_periodic_task(periodic_task_(process_cron_triggers_v2))
def advance_cron_trigger(t):

View File

@ -68,7 +68,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
next_trigger = triggers.get_next_cron_triggers()[0]
next_execution_time_before = next_trigger.next_execution_time
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
start_wf_mock = get_engine_client_mock.return_value.start_workflow
@ -121,7 +121,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
next_trigger = next_triggers[0]
next_execution_time_before = next_trigger.next_execution_time
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
next_triggers = triggers.get_next_cron_triggers()
@ -167,7 +167,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
cron_trigger.next_execution_time
)
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
# After process_triggers context is set to None, need to reset it.
auth_ctx.set_ctx(self.ctx)