Merge "Performance: remove unnecessary workflow execution update" into stable/rocky
This commit is contained in:
commit
f60d417164
|
@ -1147,6 +1147,7 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
def get_running_expired_sync_actions(expiration_time, session=None):
|
def get_running_expired_sync_actions(expiration_time, session=None):
|
||||||
query = b.model_query(models.ActionExecution)
|
query = b.model_query(models.ActionExecution)
|
||||||
|
|
||||||
query = query.filter(
|
query = query.filter(
|
||||||
models.ActionExecution.last_heartbeat < expiration_time
|
models.ActionExecution.last_heartbeat < expiration_time
|
||||||
)
|
)
|
||||||
|
|
|
@ -98,11 +98,20 @@ def _save_command_to_backlog(wf_ex, cmd):
|
||||||
|
|
||||||
|
|
||||||
def _poll_commands_from_backlog(wf_ex):
|
def _poll_commands_from_backlog(wf_ex):
|
||||||
backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, [])
|
# NOTE: We need to always use a guard condition that checks
|
||||||
|
# if a persistent structure is empty and, as in this case,
|
||||||
if not backlog_cmds:
|
# return immediately w/o doing any further manipulations.
|
||||||
|
# Otherwise, if we do pop() operation with a default value
|
||||||
|
# then the ORM framework will consider it a modification of
|
||||||
|
# the persistent object and generate a corresponding SQL
|
||||||
|
# UPDATE operation. In this particular case it will increase
|
||||||
|
# contention for workflow executions table drastically and
|
||||||
|
# decrease performance.
|
||||||
|
if not wf_ex.runtime_context.get(BACKLOG_KEY):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY)
|
||||||
|
|
||||||
return [
|
return [
|
||||||
commands.restore_command_from_dict(wf_ex, cmd_dict)
|
commands.restore_command_from_dict(wf_ex, cmd_dict)
|
||||||
for cmd_dict in backlog_cmds
|
for cmd_dict in backlog_cmds
|
||||||
|
|
|
@ -34,25 +34,32 @@ SCHEDULER_KEY = 'handle_expired_actions_key'
|
||||||
def handle_expired_actions():
|
def handle_expired_actions():
|
||||||
LOG.debug("Running heartbeat checker...")
|
LOG.debug("Running heartbeat checker...")
|
||||||
|
|
||||||
try:
|
interval = CONF.action_heartbeat.check_interval
|
||||||
interval = CONF.action_heartbeat.check_interval
|
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
|
||||||
exp_date = utils.utc_now_sec() - datetime.timedelta(
|
|
||||||
seconds=max_missed * interval
|
|
||||||
)
|
|
||||||
|
|
||||||
|
exp_date = utils.utc_now_sec() - datetime.timedelta(
|
||||||
|
seconds=max_missed * interval
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
action_exs = db_api.get_running_expired_sync_actions(exp_date)
|
action_exs = db_api.get_running_expired_sync_actions(exp_date)
|
||||||
|
|
||||||
LOG.debug("Found {} running and expired actions.".format(
|
LOG.debug("Found {} running and expired actions.".format(
|
||||||
len(action_exs))
|
len(action_exs))
|
||||||
)
|
)
|
||||||
|
|
||||||
if action_exs:
|
if action_exs:
|
||||||
LOG.info("Actions executions to transit to error, because "
|
LOG.info(
|
||||||
"heartbeat wasn't received: {}".format(action_exs))
|
"Actions executions to transit to error, because "
|
||||||
|
"heartbeat wasn't received: {}".format(action_exs)
|
||||||
|
)
|
||||||
|
|
||||||
for action_ex in action_exs:
|
for action_ex in action_exs:
|
||||||
result = mistral_lib.Result(
|
result = mistral_lib.Result(
|
||||||
error="Heartbeat wasn't received."
|
error="Heartbeat wasn't received."
|
||||||
)
|
)
|
||||||
|
|
||||||
action_handler.on_action_complete(action_ex, result)
|
action_handler.on_action_complete(action_ex, result)
|
||||||
finally:
|
finally:
|
||||||
schedule(interval)
|
schedule(interval)
|
||||||
|
@ -62,14 +69,19 @@ def setup():
|
||||||
interval = CONF.action_heartbeat.check_interval
|
interval = CONF.action_heartbeat.check_interval
|
||||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||||
enabled = interval and max_missed
|
enabled = interval and max_missed
|
||||||
|
|
||||||
if not enabled:
|
if not enabled:
|
||||||
LOG.info("Action heartbeat reporting disabled.")
|
LOG.info("Action heartbeat reporting disabled.")
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
wait_time = interval * max_missed
|
wait_time = interval * max_missed
|
||||||
LOG.debug("First run of action execution checker, wait before "
|
|
||||||
"checking to make sure executors have time to send "
|
LOG.debug(
|
||||||
"heartbeats. ({} seconds)".format(wait_time))
|
"First run of action execution checker, wait before "
|
||||||
|
"checking to make sure executors have time to send "
|
||||||
|
"heartbeats. ({} seconds)".format(wait_time)
|
||||||
|
)
|
||||||
|
|
||||||
schedule(wait_time)
|
schedule(wait_time)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Eliminated an unnecessary update of the workflow execution object
|
||||||
|
when processing "on_action_complete" operation. W/o this fix all
|
||||||
|
such transactions would have to compete for the workflow executions
|
||||||
|
table that causes lots of DB deadlocks (on MySQL) and transaction
|
||||||
|
retries. In some cases the number of retries even exceeds the limit
|
||||||
|
(currently hardcoded 50) and such tasks can be fixed only with the
|
||||||
|
integrity checker over time.
|
Loading…
Reference in New Issue