Merge "Run revision bump operations en masse"

This commit is contained in:
Zuul 2019-02-20 10:52:57 +00:00 committed by Gerrit Code Review
commit fc3c0f9a09
3 changed files with 173 additions and 29 deletions

View File

@ -19,6 +19,7 @@ import sqlalchemy as sa
from sqlalchemy import event # noqa
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
from sqlalchemy.orm import session as se
from neutron._i18n import _
@ -76,6 +77,16 @@ class StandardAttribute(model_base.BASEV2):
return
self.revision_number += 1
def _set_updated_revision_number(self, revision_number, updated_at):
attributes.set_committed_value(
self, "revision_number", revision_number)
attributes.set_committed_value(
self, "updated_at", updated_at)
@property
def _effective_standard_attribute_id(self):
return self.id
class HasStandardAttributes(object):
@ -146,6 +157,10 @@ class HasStandardAttributes(object):
single_parent=True,
uselist=False)
@property
def _effective_standard_attribute_id(self):
return self.standard_attr_id
def __init__(self, *args, **kwargs):
standard_attr_keys = ['description', 'created_at',
'updated_at', 'revision_number']
@ -188,6 +203,10 @@ class HasStandardAttributes(object):
# modified (e.g. fixed_ips change should bump port revision)
self.standard_attr.bump_revision()
def _set_updated_revision_number(self, revision_number, updated_at):
self.standard_attr._set_updated_revision_number(
revision_number, updated_at)
def _resource_model_map_helper(rs_map, resource, subclass):
if resource in rs_map:

View File

@ -37,7 +37,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def __init__(self):
super(RevisionPlugin, self).__init__()
# background on these event hooks:
# https://docs.sqlalchemy.org/en/latest/orm/session_events.html
db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions)
db_api.sqla_listen(
se.Session, "after_flush_postexec",
self._emit_related_revision_bumps)
db_api.sqla_listen(se.Session, 'after_commit',
self._clear_rev_bumped_flags)
db_api.sqla_listen(se.Session, 'after_rollback',
@ -46,9 +51,12 @@ class RevisionPlugin(service_base.ServicePluginBase):
def bump_revisions(self, session, context, instances):
self._enforce_if_match_constraints(session)
# bump revision number for any updated objects in the session
for obj in session.dirty:
if isinstance(obj, standard_attr.HasStandardAttributes):
self._bump_obj_revision(session, obj)
self._bump_obj_revisions(
session,
[
obj for obj in session.dirty
if isinstance(obj, standard_attr.HasStandardAttributes)]
)
# see if any created/updated/deleted objects bump the revision
# of another object
@ -56,26 +64,44 @@ class RevisionPlugin(service_base.ServicePluginBase):
o for o in session.deleted | session.dirty | session.new
if getattr(o, 'revises_on_change', ())
]
for obj in objects_with_related_revisions:
self._bump_related_revisions(session, obj)
collected = session.info.setdefault('_related_bumped', set())
self._collect_related_tobump(
session, objects_with_related_revisions, collected)
def _bump_related_revisions(self, session, obj):
for revises_col in getattr(obj, 'revises_on_change', ()):
def _emit_related_revision_bumps(self, session, context):
# within after_flush_postexec, emit an UPDATE statement to increment
# revision flags for related objects that were located in the
# before_flush phase.
#
# note that this event isn't called if the flush fails;
# in that case, the transaction is rolled back and the
# after_rollback event will invoke self._clear_rev_bumped_flags
# to clean out state.
collected = session.info.get('_related_bumped', None)
if collected:
try:
related_obj = self._find_related_obj(session, obj, revises_col)
self._bump_obj_revisions(
session, collected, version_check=False)
finally:
collected.clear()
def _collect_related_tobump(self, session, objects, collected):
for obj in objects:
if obj in collected:
continue
for revises_col in getattr(obj, 'revises_on_change', ()):
related_obj = self._find_related_obj(obj, revises_col)
if not related_obj:
LOG.warning("Could not find related %(col)s for "
"resource %(obj)s to bump revision.",
{'obj': obj, 'col': revises_col})
continue
# if related object revises others, bump those as well
self._bump_related_revisions(session, related_obj)
self._collect_related_tobump(session, [related_obj], collected)
# no need to bump revisions on related objects being deleted
if related_obj not in session.deleted:
self._bump_obj_revision(session, related_obj)
except exc.ObjectDeletedError:
# object was in session but another writer deleted it
pass
collected.add(related_obj)
return collected
def get_plugin_type(self):
return "revision_plugin"
@ -89,14 +115,19 @@ class RevisionPlugin(service_base.ServicePluginBase):
def extend_resource_dict_revision(resource_res, resource_db):
resource_res['revision_number'] = resource_db.revision_number
def _find_related_obj(self, session, obj, relationship_col):
def _find_related_obj(self, obj, relationship_col):
"""Gets a related object off of a relationship.
Raises a runtime error if the relationship isn't configured correctly
for revision bumping.
"""
# first check to see if it's directly attached to the object already
related_obj = getattr(obj, relationship_col)
try:
related_obj = getattr(obj, relationship_col)
except exc.ObjectDeletedError:
# object was in session but another writer deleted it
return None
if related_obj:
return related_obj
for rel in sqlalchemy.inspect(obj).mapper.relationships:
@ -110,24 +141,101 @@ class RevisionPlugin(service_base.ServicePluginBase):
def _clear_rev_bumped_flags(self, session):
"""This clears all flags on commit/rollback to enable rev bumps."""
session.info.pop('_related_bumped', None)
for inst in session:
setattr(inst, '_rev_bumped', False)
def _bump_obj_revision(self, session, obj):
"""Increment object revision in compare and swap fashion.
def _bump_obj_revisions(self, session, objects, version_check=True):
"""Increment object revisions.
If version_check=True, uses SQLAlchemy ORM's compare-and-swap feature
(known as "version_id_col" in the ORM mapping), which is part of the
StandardAttribute class.
If version_check=False, runs an UPDATE statement directly against
the set of all StandardAttribute objects at once, without using
any compare and swap logic.
If a revision number constraint rule was associated with the Session,
this is retrieved and each object is tested to see if it matches
this condition; if so, the constraint is enforced.
Before the increment, this checks and enforces any revision number
constraints.
"""
if getattr(obj, '_rev_bumped', False):
# we've already bumped the revision of this object in this txn
# filter objects for which we've already bumped the revision
to_bump = [
obj for obj in objects if not getattr(obj, '_rev_bumped', False)]
if not to_bump:
return
self._run_constrained_instance_match_check(session, to_bump)
if not version_check:
# this UPDATE statement could alternatively be written to run
# as an UPDATE-per-object with Python-generated revision numbers
# as parameters.
session.query(standard_attr.StandardAttribute).filter(
standard_attr.StandardAttribute.id.in_(
[obj._effective_standard_attribute_id for obj in to_bump]
)
).update({
# note that SQLAlchemy runs the onupdate function for
# the updated_at column and applies it to the SET clause as
# well.
standard_attr.StandardAttribute.revision_number:
standard_attr.StandardAttribute.revision_number + 1},
synchronize_session=False)
# run a SELECT to get back the new values we just generated.
# if MySQL supported RETURNING, we could get these numbers
# back from the UPDATE without running another SELECT.
retrieve_revision_numbers = {
row.id: (row.revision_number, row.updated_at)
for row in
session.query(
standard_attr.StandardAttribute.id,
standard_attr.StandardAttribute.revision_number,
standard_attr.StandardAttribute.updated_at,
).filter(
standard_attr.StandardAttribute.id.in_(
[
obj._effective_standard_attribute_id
for obj in to_bump
]
)
)
}
for obj in to_bump:
if version_check:
# full version check, run the ORM routine to UPDATE
# the row with a WHERE clause
obj.bump_revision()
else:
# no version check - get back what we did in our one-step
# UPDATE statement and set it without causing change in
# ORM flush state
try:
new_version_id, new_updated_at = retrieve_revision_numbers[
obj._effective_standard_attribute_id
]
except KeyError:
# in case the object was deleted concurrently
LOG.warning(
"No standard attr row found for resource: %(obj)s",
{'obj': obj})
else:
obj._set_updated_revision_number(
new_version_id, new_updated_at)
setattr(obj, '_rev_bumped', True)
def _run_constrained_instance_match_check(self, session, objects):
instance, match = self._get_constrained_instance_match(session)
if instance and instance == obj:
# one last check before bumping revision
self._enforce_if_match_constraints(session)
obj.bump_revision()
setattr(obj, '_rev_bumped', True)
for obj in objects:
if instance and instance == obj:
# one last check before bumping revision
self._enforce_if_match_constraints(session)
def _find_instance_by_column_value(self, session, model, column, value):
"""Lookup object in session or from DB based on a column's value."""

View File

@ -78,10 +78,27 @@ class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase):
other_ctx.session.delete(
other_ctx.session.query(models_v2.Port).first()
)
# expire the port so the revision bumping code will trigger a
# lookup on its attributes and encounter an ObjectDeletedError
other_ctx.session.flush()
# ensure no attribute lookups are attempted on an
# object deleted from the session when doing related
# bumps
self.ctx.session.expire(port)
rp._bump_related_revisions(self.ctx.session, ipal_obj)
collected = rp._collect_related_tobump(
self.ctx.session, [ipal_obj], set())
rp._bump_obj_revisions(
self.ctx.session, collected, version_check=False)
def test_shared_network_create(self):
# this test intends to run db_base_plugin_v2 -> create_network_db,
# which in turn creates a Network and then a NetworkRBAC object.
# An issue was observed with the revision_plugin which would interfere
# with the flush process that occurs with these two connected objects,
# creating two copies of the Network object in the Session and putting
# it into an invalid state.
with self.network(shared=True):
pass
def test_port_name_update_revises(self):
with self.port() as port: