Added session.flush() before update_on_match()

* It turns out that update_on_match() from oslo_db breaks fields
  of a persistent object (attached to the session) in some cases.
  For example, when we use PostgreSQL as a DB it doesn't properly
  merge fields of the given specimen onto the fields of the object
  attached to the SQLAlchemy session. For some reason, it cleans up
  the history of ORM events registered before this operation so
  that if some update took place in this session and they are not
  flushed yet to DB they simply get lost. Hence adding flush()
  on the session right before this operation helps.
* NOTE: this is rather a workaround than a fix of the root cause.
  Root cause still needs to be found.

Change-Id: I546badd8b1c96b68567287fc9d38b07738272503
Closes-Bug: #1736821
This commit is contained in:
Renat Akhmerov 2018-01-22 15:50:09 +07:00
parent 96f4d3f2d0
commit 05252da835
3 changed files with 53 additions and 53 deletions

View File

@ -129,6 +129,7 @@ MISTRAL_TITLE = """
|| \\/ || \\\ || || || \\\ ||
|| || || \\\ || || || /\\\ ||
|| || || __// ||_// || \\\__// \\\_ ||
Mistral Workflow Service, version %s
""" % version.version_string()

View File

@ -143,6 +143,51 @@ def _lock_entity(model, id):
return _secure_query(model).with_for_update().filter(model.id == id).one()
@b.session_aware()
def update_on_match(id, specimen, values, session=None):
"""Updates a model with the given values if it matches the given specimen.
:param id: ID of a persistent model.
:param specimen: Specimen used to match the
:param values: Values to set to the model if fields of the object
match the specimen.
:param session: Session.
:return: Persistent object attached to the session.
"""
assert id is not None
assert specimen is not None
# We need to flush the session because when we do update_on_match()
# it doesn't always update the state of the persistent object properly
# when it merges a specimen state into it. Some fields get wiped out from
# the history of ORM events that must be flushed later. For example, it
# doesn't work well in case of Postgres.
# See https://bugs.launchpad.net/mistral/+bug/1736821
session.flush()
model = None
model_class = type(specimen)
# Use WHERE clause to exclude possible conflicts if the state has
# already been changed.
try:
model = b.model_query(model_class).update_on_match(
specimen=specimen,
surrogate_key='id',
values=values
)
except oslo_sqlalchemy.update_match.NoRowsMatched:
LOG.info(
"Can't change state of persistent object "
"because it has already been changed. [model_class=%, id=%s, "
"specimen=%s, values=%s]",
model_class, id, specimen, values
)
return model
def _secure_query(model, *columns):
query = b.model_query(model, columns)
@ -831,32 +876,10 @@ def delete_workflow_executions(session=None, **kwargs):
return _delete_all(models.WorkflowExecution, **kwargs)
@b.session_aware()
def update_workflow_execution_state(id, cur_state, state, session=None):
wf_ex = None
def update_workflow_execution_state(id, cur_state, state):
specimen = models.WorkflowExecution(id=id, state=cur_state)
# Use WHERE clause to exclude possible conflicts if the state has
# already been changed.
try:
specimen = models.WorkflowExecution(
id=id,
state=cur_state
)
wf_ex = b.model_query(
models.WorkflowExecution).update_on_match(
specimen=specimen,
surrogate_key='id',
values={'state': state}
)
except oslo_sqlalchemy.update_match.NoRowsMatched:
LOG.info(
"Can't change workflow execution state from %s to %s, "
"because it has already been changed. [execution_id=%s]",
cur_state, state, id
)
return wf_ex
return update_on_match(id, specimen, {'state': state})
# Tasks executions.
@ -987,32 +1010,10 @@ def delete_task_executions(session=None, **kwargs):
return _delete_all(models.TaskExecution, **kwargs)
@b.session_aware()
def update_task_execution_state(id, cur_state, state, session=None):
wf_ex = None
def update_task_execution_state(id, cur_state, state):
specimen = models.TaskExecution(id=id, state=cur_state)
# Use WHERE clause to exclude possible conflicts if the state has
# already been changed.
try:
specimen = models.TaskExecution(
id=id,
state=cur_state
)
wf_ex = b.model_query(
models.TaskExecution).update_on_match(
specimen=specimen,
surrogate_key='id',
values={'state': state}
)
except oslo_sqlalchemy.update_match.NoRowsMatched:
LOG.info(
"Can't change task execution state from %s to %s, "
"because it has already been changed. [execution_id=%s]",
cur_state, state, id
)
return wf_ex
return update_on_match(id, specimen, {'state': state})
# Delayed calls.

View File

@ -285,9 +285,7 @@ def add_openstack_data_to_context(wf_ex):
def add_execution_to_context(wf_ex):
wf_ex.context = wf_ex.context or {}
wf_ex.context['__execution'] = {
'id': wf_ex.id
}
wf_ex.context['__execution'] = {'id': wf_ex.id}
def add_environment_to_context(wf_ex):