Register sqlalchemy events through hook for UT cleanup

Register all sqlalchemy events through a new function in
neutron.db.api so we can keep track of active events and
ensure all are removed at the end of each test run.

Without this, an instance of a plugin may be left around
with the only reference to it existing in SQLAlchemy, where
it will receive events for tests unrelated to it and potentially
interfere.

Change-Id: I8e93eb4e8ef5a13f015db9cd20e44941cdcb72ef
This commit is contained in:
Kevin Benton 2017-01-11 17:32:43 -08:00
parent 0c05d49949
commit 553ab6d86e
13 changed files with 80 additions and 53 deletions

View File

@ -29,6 +29,8 @@ import osprofiler.sqlalchemy
from pecan import util as p_util
import six
import sqlalchemy
from sqlalchemy import event # noqa
from sqlalchemy import exc as sql_exc
from sqlalchemy.orm import exc
import traceback
@ -230,3 +232,32 @@ def autonested_transaction(sess):
session_context = sess.begin(subtransactions=True)
with session_context as tx:
yield tx
_REGISTERED_SQLA_EVENTS = []
def sqla_listen(*args):
"""Wrapper to track subscribers for test teardowns.
SQLAlchemy has no "unsubscribe all" option for its event listener
framework so we need to keep track of the subscribers by having
them call through here for test teardowns.
"""
event.listen(*args)
_REGISTERED_SQLA_EVENTS.append(args)
def sqla_remove(*args):
event.remove(*args)
_REGISTERED_SQLA_EVENTS.remove(args)
def sqla_remove_all():
for args in _REGISTERED_SQLA_EVENTS:
try:
event.remove(*args)
except sql_exc.InvalidRequestError:
# already removed
pass
del _REGISTERED_SQLA_EVENTS[:]

View File

@ -26,7 +26,6 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
from sqlalchemy import and_
from sqlalchemy import event
from sqlalchemy import not_
from neutron._i18n import _, _LE, _LI
@ -136,12 +135,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# NOTE(arosen) These event listeners are here to hook into when
# port status changes and notify nova about their change.
self.nova_notifier = nova.Notifier.get_instance()
event.listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
event.listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
event.listen(models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)
db_api.sqla_listen(models_v2.Port, 'after_insert',
self.nova_notifier.send_port_status)
db_api.sqla_listen(models_v2.Port, 'after_update',
self.nova_notifier.send_port_status)
db_api.sqla_listen(models_v2.Port.status, 'set',
self.nova_notifier.record_port_status_changed)
for e in (events.BEFORE_CREATE, events.BEFORE_UPDATE,
events.BEFORE_DELETE):
registry.subscribe(self.validate_network_rbac_policy_change,

View File

@ -16,7 +16,7 @@ from alembic import context
from neutron_lib.db import model_base
from oslo_config import cfg
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy import event # noqa
from neutron.db.migration.alembic_migrations import external
from neutron.db.migration import autogen

View File

@ -16,7 +16,7 @@ from neutron_lib.db import constants as db_const
from neutron_lib.db import model_base
from oslo_utils import timeutils
import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy import event # noqa
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext import declarative
from sqlalchemy.orm import session as se

View File

@ -389,6 +389,23 @@ def check_assertIsNone(logical_line, filename):
"sentences not allowed")
@flake8ext
def check_no_sqlalchemy_event_import(logical_line, filename, noqa):
"""N346 - Use neutron.db.api.sqla_listen instead of sqlalchemy event."""
if noqa:
return
is_import = (logical_line.startswith('import') or
logical_line.startswith('from'))
if not is_import:
return
for kw in ('sqlalchemy', 'event'):
if kw not in logical_line:
return
yield (0, "N346: Register sqlalchemy events through "
"neutron.db.api.sqla_listen so they can be cleaned up between "
"unit tests")
def factory(register):
register(validate_log_translations)
register(use_jsonutils)
@ -410,3 +427,4 @@ def factory(register):
register(check_no_imports_from_tests)
register(check_python3_no_filter)
register(check_assertIsNone)
register(check_no_sqlalchemy_event_import)

View File

@ -15,7 +15,6 @@
from oslo_config import cfg
from oslo_log import log
from oslo_utils import excutils
from sqlalchemy import event
from sqlalchemy import exc as sql_exc
from sqlalchemy.orm import session as se
@ -299,18 +298,19 @@ class TrackedResource(BaseResource):
self._model_class)
def register_events(self):
event.listen(self._model_class, 'after_insert', self._db_event_handler)
event.listen(self._model_class, 'after_delete', self._db_event_handler)
event.listen(se.Session, 'after_bulk_delete', self._except_bulk_delete)
listen = db_api.sqla_listen
listen(self._model_class, 'after_insert', self._db_event_handler)
listen(self._model_class, 'after_delete', self._db_event_handler)
listen(se.Session, 'after_bulk_delete', self._except_bulk_delete)
def unregister_events(self):
try:
event.remove(self._model_class, 'after_insert',
self._db_event_handler)
event.remove(self._model_class, 'after_delete',
self._db_event_handler)
event.remove(se.Session, 'after_bulk_delete',
self._except_bulk_delete)
db_api.sqla_remove(self._model_class, 'after_insert',
self._db_event_handler)
db_api.sqla_remove(self._model_class, 'after_delete',
self._db_event_handler)
db_api.sqla_remove(se.Session, 'after_bulk_delete',
self._except_bulk_delete)
except sql_exc.InvalidRequestError:
LOG.warning(_LW("No sqlalchemy event for resource %s found"),
self.name)

View File

@ -12,11 +12,11 @@
# under the License.
from oslo_log import log as logging
from sqlalchemy import event
from sqlalchemy.orm import exc
from sqlalchemy.orm import session as se
from neutron._i18n import _, _LW
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import standard_attr
from neutron.services import service_base
@ -34,7 +34,7 @@ class RevisionPlugin(service_base.ServicePluginBase):
for resource in standard_attr.get_standard_attr_resource_model_map():
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
resource, [self.extend_resource_dict_revision])
event.listen(se.Session, 'before_flush', self.bump_revisions)
db_api.sqla_listen(se.Session, 'before_flush', self.bump_revisions)
def bump_revisions(self, session, context, instances):
# bump revision number for any updated objects in the session

View File

@ -15,11 +15,10 @@
from neutron_lib import exceptions as n_exc
from oslo_log import log
from oslo_utils import timeutils
from sqlalchemy import event
from sqlalchemy import exc as sql_exc
from sqlalchemy.orm import session as se
from neutron._i18n import _LW
from neutron.db import api as db_api
from neutron.db import standard_attr
LOG = log.getLogger(__name__)
@ -68,22 +67,10 @@ class TimeStamp_db_mixin(object):
obj.updated_at = timeutils.utcnow()
def register_db_events(self):
event.listen(standard_attr.StandardAttribute, 'before_insert',
self._add_timestamp)
event.listen(se.Session, 'before_flush', self.update_timestamp)
def unregister_db_events(self):
self._unregister_db_event(standard_attr.StandardAttribute,
'before_insert', self._add_timestamp)
self._unregister_db_event(se.Session, 'before_flush',
self.update_timestamp)
def _unregister_db_event(self, listen_obj, listened_event, listen_hander):
try:
event.remove(listen_obj, listened_event, listen_hander)
except sql_exc.InvalidRequestError:
LOG.warning(_LW("No sqlalchemy event for resource %s found"),
listen_obj)
listen = db_api.sqla_listen
listen(standard_attr.StandardAttribute, 'before_insert',
self._add_timestamp)
listen(se.Session, 'before_flush', self.update_timestamp)
def _format_timestamp(self, resource_db, result):
result['created_at'] = (resource_db.created_at.

View File

@ -47,6 +47,7 @@ from neutron.callbacks import registry
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.db import agentschedulers_db
from neutron.db import api as db_api
from neutron import manager
from neutron import policy
from neutron.quota import resource_registry
@ -295,6 +296,7 @@ class BaseTestCase(DietTestCase):
policy.init()
self.addCleanup(policy.reset)
self.addCleanup(resource_registry.unregister_all_resources)
self.addCleanup(db_api.sqla_remove_all)
self.addCleanup(rpc_consumer_reg.clear)
def get_new_temp_dir(self):

View File

@ -23,7 +23,7 @@ from oslo_db.sqlalchemy import test_migrations
from oslotest import base as oslotest_base
import six
import sqlalchemy
from sqlalchemy import event
from sqlalchemy import event # noqa
from sqlalchemy.sql import ddl as sqla_ddl
import subprocess

View File

@ -31,7 +31,6 @@ from oslo_utils import importutils
from oslo_utils import netutils
from oslo_utils import uuidutils
import six
from sqlalchemy import event
from sqlalchemy import orm
import testtools
from testtools import matchers
@ -6467,9 +6466,7 @@ class DbOperationBoundMixin(object):
self._db_execute_count += 1
engine = db_api.context_manager.writer.get_engine()
event.listen(engine, 'after_execute', _event_incrementer)
self.addCleanup(event.remove, engine, 'after_execute',
_event_incrementer)
db_api.sqla_listen(engine, 'after_execute', _event_incrementer)
def _get_context(self):
if self.admin:

View File

@ -56,8 +56,6 @@ class TimeStampChangedsinceTestCase(test_db_base_plugin_v2.
ext_mgr = TimeStampExtensionManager()
super(TimeStampChangedsinceTestCase, self).setUp(plugin=self.plugin,
ext_mgr=ext_mgr)
self.addCleanup(
directory.get_plugin('timestamp').unregister_db_events)
self.addCleanup(manager.NeutronManager.clear_instance)
def setup_coreplugin(self, core_plugin=None, load_plugins=True):

View File

@ -1148,7 +1148,6 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
do_request)
def _test_operation_resillient_to_ipallocation_failure(self, func):
from sqlalchemy import event
class IPAllocationsGrenade(object):
insert_ip_called = False
@ -1167,12 +1166,8 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
listener = IPAllocationsGrenade()
engine = db_api.context_manager.writer.get_engine()
event.listen(engine, 'before_cursor_execute', listener.execute)
event.listen(engine, 'commit', listener.commit)
self.addCleanup(event.remove, engine, 'before_cursor_execute',
listener.execute)
self.addCleanup(event.remove, engine, 'commit',
listener.commit)
db_api.sqla_listen(engine, 'before_cursor_execute', listener.execute)
db_api.sqla_listen(engine, 'commit', listener.commit)
func()
# make sure that the grenade went off during the commit
self.assertTrue(listener.except_raised)