Really make the cron trigger execution interval configurable
Note: This is an update to I9060253bc416be28af4ef81f3edf694059d92066, which almost worked but due to a race condition with config loading the default value may have been used. This delays the decorator execution to allow the config to be fully loaded. The cron_trigger subsystem in Mistral queries the database every second to look for triggers that require execution. This can be very wasteful if your deployment only has cron triggers that run infrequently (every hour or day etc.). Letting operators configure this value reduces the load and allows the cron triggers to be useful in more scenarios. Operators do need to be aware that this configuration would limit the frequency of execution. For example, if the value is set to 600 then cron triggers configured to run every minute will only run every 10 minutes. Related-Bug: #1747386 Change-Id: Ia12dcf7fa6b1784be226d223eb89b114c8e4b4f5
This commit is contained in:
parent
7da5b880c9
commit
24be74635a
|
@ -34,66 +34,75 @@ CONF = cfg.CONF
|
|||
_periodic_tasks = {}
|
||||
|
||||
|
||||
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
@periodic_task.periodic_task(spacing=CONF.cron_trigger.execution_interval,
|
||||
run_immediately=True)
|
||||
def process_cron_triggers_v2(self, ctx):
|
||||
LOG.debug("Processing cron triggers...")
|
||||
def process_cron_triggers_v2(self, ctx):
|
||||
LOG.debug("Processing cron triggers...")
|
||||
|
||||
for trigger in triggers.get_next_cron_triggers():
|
||||
LOG.debug("Processing cron trigger: %s", trigger)
|
||||
for trigger in triggers.get_next_cron_triggers():
|
||||
LOG.debug("Processing cron trigger: %s", trigger)
|
||||
|
||||
try:
|
||||
# Setup admin context before schedule triggers.
|
||||
ctx = security.create_context(
|
||||
trigger.trust_id,
|
||||
trigger.project_id
|
||||
try:
|
||||
# Setup admin context before schedule triggers.
|
||||
ctx = security.create_context(
|
||||
trigger.trust_id,
|
||||
trigger.project_id
|
||||
)
|
||||
|
||||
auth_ctx.set_ctx(ctx)
|
||||
|
||||
LOG.debug("Cron trigger security context: %s", ctx)
|
||||
|
||||
# Try to advance the cron trigger next_execution_time and
|
||||
# remaining_executions if relevant.
|
||||
modified = advance_cron_trigger(trigger)
|
||||
|
||||
# If cron trigger was not already modified by another engine.
|
||||
if modified:
|
||||
LOG.debug(
|
||||
"Starting workflow '%s' by cron trigger '%s'",
|
||||
trigger.workflow.name,
|
||||
trigger.name
|
||||
)
|
||||
|
||||
auth_ctx.set_ctx(ctx)
|
||||
|
||||
LOG.debug("Cron trigger security context: %s", ctx)
|
||||
|
||||
# Try to advance the cron trigger next_execution_time and
|
||||
# remaining_executions if relevant.
|
||||
modified = advance_cron_trigger(trigger)
|
||||
|
||||
# If cron trigger was not already modified by another engine.
|
||||
if modified:
|
||||
LOG.debug(
|
||||
"Starting workflow '%s' by cron trigger '%s'",
|
||||
trigger.workflow.name,
|
||||
trigger.name
|
||||
)
|
||||
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by cron"
|
||||
" trigger '(%s)'." % trigger.id
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "cron_trigger",
|
||||
"id": trigger.id,
|
||||
"name": trigger.name,
|
||||
}
|
||||
description = {
|
||||
"description": (
|
||||
"Workflow execution created by cron"
|
||||
" trigger '(%s)'." % trigger.id
|
||||
),
|
||||
"triggered_by": {
|
||||
"type": "cron_trigger",
|
||||
"id": trigger.id,
|
||||
"name": trigger.name,
|
||||
}
|
||||
}
|
||||
|
||||
rpc.get_engine_client().start_workflow(
|
||||
trigger.workflow.name,
|
||||
trigger.workflow.namespace,
|
||||
None,
|
||||
trigger.workflow_input,
|
||||
description=json.dumps(description),
|
||||
**trigger.workflow_params
|
||||
)
|
||||
except Exception:
|
||||
# Log and continue to next cron trigger.
|
||||
LOG.exception(
|
||||
"Failed to process cron trigger %s",
|
||||
str(trigger)
|
||||
rpc.get_engine_client().start_workflow(
|
||||
trigger.workflow.name,
|
||||
trigger.workflow.namespace,
|
||||
None,
|
||||
trigger.workflow_input,
|
||||
description=json.dumps(description),
|
||||
**trigger.workflow_params
|
||||
)
|
||||
finally:
|
||||
auth_ctx.set_ctx(None)
|
||||
except Exception:
|
||||
# Log and continue to next cron trigger.
|
||||
LOG.exception(
|
||||
"Failed to process cron trigger %s",
|
||||
str(trigger)
|
||||
)
|
||||
finally:
|
||||
auth_ctx.set_ctx(None)
|
||||
|
||||
|
||||
class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(MistralPeriodicTasks, self).__init__(conf)
|
||||
|
||||
periodic_task_ = periodic_task.periodic_task(
|
||||
spacing=CONF.cron_trigger.execution_interval,
|
||||
run_immediately=True,
|
||||
)
|
||||
self.add_periodic_task(periodic_task_(process_cron_triggers_v2))
|
||||
|
||||
|
||||
def advance_cron_trigger(t):
|
||||
|
|
|
@ -68,7 +68,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
|||
next_trigger = triggers.get_next_cron_triggers()[0]
|
||||
next_execution_time_before = next_trigger.next_execution_time
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
start_wf_mock = get_engine_client_mock.return_value.start_workflow
|
||||
|
||||
|
@ -121,7 +121,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
|||
next_trigger = next_triggers[0]
|
||||
next_execution_time_before = next_trigger.next_execution_time
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
next_triggers = triggers.get_next_cron_triggers()
|
||||
|
||||
|
@ -167,7 +167,7 @@ class ProcessCronTriggerTest(base.EngineTestCase):
|
|||
cron_trigger.next_execution_time
|
||||
)
|
||||
|
||||
periodic.MistralPeriodicTasks(cfg.CONF).process_cron_triggers_v2(None)
|
||||
periodic.process_cron_triggers_v2(None, None)
|
||||
|
||||
# After process_triggers context is set to None, need to reset it.
|
||||
auth_ctx.set_ctx(self.ctx)
|
||||
|
|
Loading…
Reference in New Issue