Really make the cron trigger execution interval configurable

Note: This is an update to I9060253bc416be28af4ef81f3edf694059d92066,
      which almost worked but due to a race condition with config loading
      the default value may have been used. This delays the decorator
      execution to allow the config to be fully loaded.

The cron_trigger subsystem in Mistral queries the database every second
to look for triggers that require execution. This can be very wasteful
if your deployment only has cron triggers that run infrequently (every
hour or day etc.). Letting operators configure this value reduces the
load and allows the cron triggers to be useful in more scenarios.

Operators do need to be aware that this configuration would limit the
frequency of execution. For example, if the value is set to 600 then cron
triggers configured to run every minute will only run every 10 minutes.

Related-Bug: #1747386
Change-Id: Ia12dcf7fa6b1784be226d223eb89b114c8e4b4f5
This commit is contained in:
Dougal Matthews 2018-02-07 16:50:27 +00:00
parent 7da5b880c9
commit 24be74635a
2 changed files with 65 additions and 56 deletions

View File

@ -34,66 +34,75 @@ CONF = cfg.CONF
_periodic_tasks = {}
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
@periodic_task.periodic_task(spacing=CONF.cron_trigger.execution_interval,
run_immediately=True)
def process_cron_triggers_v2(self, ctx):
LOG.debug("Processing cron triggers...")
def process_cron_triggers_v2(self, ctx):
LOG.debug("Processing cron triggers...")
for trigger in triggers.get_next_cron_triggers():
LOG.debug("Processing cron trigger: %s", trigger)
for trigger in triggers.get_next_cron_triggers():
LOG.debug("Processing cron trigger: %s", trigger)
try:
# Setup admin context before schedule triggers.
ctx = security.create_context(
trigger.trust_id,
trigger.project_id
try:
# Setup admin context before schedule triggers.
ctx = security.create_context(
trigger.trust_id,
trigger.project_id
)
auth_ctx.set_ctx(ctx)
LOG.debug("Cron trigger security context: %s", ctx)
# Try to advance the cron trigger next_execution_time and
# remaining_executions if relevant.
modified = advance_cron_trigger(trigger)
# If cron trigger was not already modified by another engine.
if modified:
LOG.debug(
"Starting workflow '%s' by cron trigger '%s'",
trigger.workflow.name,
trigger.name
)
auth_ctx.set_ctx(ctx)
LOG.debug("Cron trigger security context: %s", ctx)
# Try to advance the cron trigger next_execution_time and
# remaining_executions if relevant.
modified = advance_cron_trigger(trigger)
# If cron trigger was not already modified by another engine.
if modified:
LOG.debug(
"Starting workflow '%s' by cron trigger '%s'",
trigger.workflow.name,
trigger.name
)
description = {
"description": (
"Workflow execution created by cron"
" trigger '(%s)'." % trigger.id
),
"triggered_by": {
"type": "cron_trigger",
"id": trigger.id,
"name": trigger.name,
}
description = {
"description": (
"Workflow execution created by cron"
" trigger '(%s)'." % trigger.id
),
"triggered_by": {
"type": "cron_trigger",
"id": trigger.id,
"name": trigger.name,
}
}
rpc.get_engine_client().start_workflow(
trigger.workflow.name,
trigger.workflow.namespace,
None,
trigger.workflow_input,
description=json.dumps(description),
**trigger.workflow_params
)
except Exception:
# Log and continue to next cron trigger.
LOG.exception(
"Failed to process cron trigger %s",
str(trigger)
rpc.get_engine_client().start_workflow(
trigger.workflow.name,
trigger.workflow.namespace,
None,
trigger.workflow_input,
description=json.dumps(description),
**trigger.workflow_params
)
finally:
auth_ctx.set_ctx(None)
except Exception:
# Log and continue to next cron trigger.
LOG.exception(
"Failed to process cron trigger %s",
str(trigger)
)
finally:
auth_ctx.set_ctx(None)
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
def __init__(self, conf):
super(MistralPeriodicTasks, self).__init__(conf)
periodic_task_ = periodic_task.periodic_task(
spacing=CONF.cron_trigger.execution_interval,
run_immediately=True,
)
self.add_periodic_task(periodic_task_(process_cron_triggers_v2))
def advance_cron_trigger(t):

View File

@ -68,7 +68,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
next_trigger = triggers.get_next_cron_triggers()[0]
next_execution_time_before = next_trigger.next_execution_time
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
start_wf_mock = get_engine_client_mock.return_value.start_workflow
@ -121,7 +121,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
next_trigger = next_triggers[0]
next_execution_time_before = next_trigger.next_execution_time
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
next_triggers = triggers.get_next_cron_triggers()
@ -167,7 +167,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
cron_trigger.next_execution_time
)
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
periodic.process_cron_triggers_v2(None, None)
# After process_triggers context is set to None, need to reset it.
auth_ctx.set_ctx(self.ctx)