From b7faf8d4fcdf694fa5c96f1f89c311a2af5a007d Mon Sep 17 00:00:00 2001
From: Renat Akhmerov
Date: Fri, 31 Aug 2018 15:11:00 +0700
Subject: [PATCH] Performance: remove unnecessary workflow execution update

* This fix drastically improves performance since it eliminates an
  unnecessary update of the workflow execution object when processing
  the "on_action_complete" operation. W/o this fix all such
  transactions would have to compete for the workflow executions
  table, which causes lots of DB deadlocks (on MySQL) and transaction
  retries. In some cases the number of retries even exceeds the limit
  (currently hardcoded 50) and such tasks can be fixed only with the
  integrity checker over time. See the code comment in the
  "dispatcher.py" module that explains why this specific change
  eliminates the update of the workflow execution object.
* Style changes in api.py and action_execution_checker.py

Closes-Bug: #1790079
Change-Id: I08cb561e252d31e35fcfb61984d87a7bfc387a4d
(cherry picked from commit 8f40b01cb6bb01d81437aae795e7f34fcee66abb)
---
 mistral/db/v2/sqlalchemy/api.py               |  1 +
 mistral/engine/dispatcher.py                  | 15 ++++++--
 mistral/services/action_execution_checker.py  | 34 +++++++++++++------
 ...low_execution_update-bdc9526bd39539c4.yaml | 10 ++++++
 4 files changed, 46 insertions(+), 14 deletions(-)
 create mode 100644 releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml

diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index dddf7bb01..1b5435ce1 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -1147,6 +1147,7 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
 @b.session_aware()
 def get_running_expired_sync_actions(expiration_time, session=None):
     query = b.model_query(models.ActionExecution)
+
     query = query.filter(
         models.ActionExecution.last_heartbeat < expiration_time
     )
diff --git a/mistral/engine/dispatcher.py b/mistral/engine/dispatcher.py
index 4099f9db0..c4d18e031 100644
--- a/mistral/engine/dispatcher.py
+++ b/mistral/engine/dispatcher.py
@@ -98,11 +98,20 @@ def _save_command_to_backlog(wf_ex, cmd):
 
 
 def _poll_commands_from_backlog(wf_ex):
-    backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY, [])
-
-    if not backlog_cmds:
+    # NOTE: We need to always use a guard condition that checks
+    # if a persistent structure is empty and, as in this case,
+    # return immediately w/o doing any further manipulations.
+    # Otherwise, if we do pop() operation with a default value
+    # then the ORM framework will consider it a modification of
+    # the persistent object and generate a corresponding SQL
+    # UPDATE operation. In this particular case it will increase
+    # contention for workflow executions table drastically and
+    # decrease performance.
+    if not wf_ex.runtime_context.get(BACKLOG_KEY):
         return []
 
+    backlog_cmds = wf_ex.runtime_context.pop(BACKLOG_KEY)
+
     return [
         commands.restore_command_from_dict(wf_ex, cmd_dict)
         for cmd_dict in backlog_cmds
diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py
index 2b69299cd..3cf4fcbf8 100644
--- a/mistral/services/action_execution_checker.py
+++ b/mistral/services/action_execution_checker.py
@@ -34,25 +34,32 @@ SCHEDULER_KEY = 'handle_expired_actions_key'
 def handle_expired_actions():
     LOG.debug("Running heartbeat checker...")
 
-    try:
-        interval = CONF.action_heartbeat.check_interval
-        max_missed = CONF.action_heartbeat.max_missed_heartbeats
-        exp_date = utils.utc_now_sec() - datetime.timedelta(
-            seconds=max_missed * interval
-        )
+    interval = CONF.action_heartbeat.check_interval
+    max_missed = CONF.action_heartbeat.max_missed_heartbeats
+    exp_date = utils.utc_now_sec() - datetime.timedelta(
+        seconds=max_missed * interval
+    )
+
+    try:
         with db_api.transaction():
             action_exs = db_api.get_running_expired_sync_actions(exp_date)
+
             LOG.debug("Found {} running and expired actions.".format(
                 len(action_exs))
             )
+
             if action_exs:
-                LOG.info("Actions executions to transit to error, because "
-                         "heartbeat wasn't received: {}".format(action_exs))
+                LOG.info(
+                    "Actions executions to transit to error, because "
+                    "heartbeat wasn't received: {}".format(action_exs)
+                )
+
                 for action_ex in action_exs:
                     result = mistral_lib.Result(
                         error="Heartbeat wasn't received."
                     )
+
                     action_handler.on_action_complete(action_ex, result)
     finally:
         schedule(interval)
@@ -62,14 +69,19 @@ def setup():
     interval = CONF.action_heartbeat.check_interval
     max_missed = CONF.action_heartbeat.max_missed_heartbeats
     enabled = interval and max_missed
+
     if not enabled:
         LOG.info("Action heartbeat reporting disabled.")
+
         return
 
     wait_time = interval * max_missed
-    LOG.debug("First run of action execution checker, wait before "
-              "checking to make sure executors have time to send "
-              "heartbeats. ({} seconds)".format(wait_time))
+
+    LOG.debug(
+        "First run of action execution checker, wait before "
+        "checking to make sure executors have time to send "
+        "heartbeats. ({} seconds)".format(wait_time)
+    )
 
     schedule(wait_time)
diff --git a/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml b/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml
new file mode 100644
index 000000000..d51e49d0e
--- /dev/null
+++ b/releasenotes/notes/remove_unnecessary_workflow_execution_update-bdc9526bd39539c4.yaml
@@ -0,0 +1,10 @@
+---
+fixes:
+  - |
+    Eliminated an unnecessary update of the workflow execution object
+    when processing "on_action_complete" operation. W/o this fix all
+    such transactions would have to compete for the workflow executions
+    table that causes lots of DB deadlocks (on MySQL) and transaction
+    retries. In some cases the number of retries even exceeds the limit
+    (currently hardcoded 50) and such tasks can be fixed only with the
+    integrity checker over time.
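
Note (illustration only, not part of the patch): the dispatcher.py comment above explains that pop() with a default on an ORM-tracked dict is treated as a modification and produces a SQL UPDATE. Below is a minimal sketch of that behaviour, assuming a SQLAlchemy MutableDict-backed JSON column similar in spirit to Mistral's runtime_context; the model, table and column names are hypothetical.

# A minimal sketch of the ORM behaviour described in the dispatcher.py
# comment, assuming a SQLAlchemy MutableDict-backed JSON column. Model,
# table and column names here are made up for illustration.
import sqlalchemy as sa
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class WorkflowExecutionDemo(Base):
    __tablename__ = 'workflow_executions_demo'

    id = sa.Column(sa.Integer, primary_key=True)
    # MutableDict reports every mutating call (pop, __setitem__, ...) to the
    # ORM, which then schedules an UPDATE of the row at flush time.
    runtime_context = sa.Column(MutableDict.as_mutable(sa.JSON), default=dict)


engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)

session = Session(engine)
wf_ex = WorkflowExecutionDemo(runtime_context={})
session.add(wf_ex)
session.commit()

# Read-only access: nothing is flagged as modified, so no UPDATE is emitted.
wf_ex.runtime_context.get('backlog_commands')
print(session.is_modified(wf_ex))  # False

# pop() with a default is still a mutating call, even though the key is
# absent and the dict is left unchanged: the attribute is flagged as
# modified and the next flush issues an UPDATE of the row, i.e. exactly the
# extra contention on the workflow executions table that the patch removes.
wf_ex.runtime_context.pop('backlog_commands', [])
print(session.is_modified(wf_ex))  # True

Guarding with get() first, as _poll_commands_from_backlog() now does, keeps the common "backlog is empty" path read-only, so those transactions no longer compete for the workflow execution row.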