From b710d3fdad64d1be4e4638cc6717ca83a43f30e4 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Tue, 19 Sep 2023 12:05:31 +0100 Subject: [PATCH] db: Replace use of LegacyEngineFacade We aren't passing context objects around, hence this looks more like ironic's use of oslo.db than e.g. nova or cinder. Nothing complicated though. Signed-off-by: Stephen Finucane Change-Id: Ia0368cc128d495011776e80a27af04776e876fa9 --- aodh/storage/__init__.py | 2 +- aodh/storage/base.py | 2 +- aodh/storage/impl_sqlalchemy.py | 224 ++++++++++-------- aodh/storage/sqlalchemy/alembic/env.py | 15 +- aodh/tests/functional/db.py | 8 +- .../storage/sqlalchemy/test_migrations.py | 5 +- .../functional/storage/test_get_connection.py | 2 +- .../tests/functional/storage/test_impl_log.py | 4 +- aodh/tests/unit/test_bin.py | 6 +- 9 files changed, 145 insertions(+), 123 deletions(-) diff --git a/aodh/storage/__init__.py b/aodh/storage/__init__.py index 1dfd23242..9fa4a10aa 100644 --- a/aodh/storage/__init__.py +++ b/aodh/storage/__init__.py @@ -72,7 +72,7 @@ def get_connection_from_config(conf): reraise=True) def _get_connection(): """Return an open connection to the database.""" - return mgr.driver(conf, url) + return mgr.driver(conf) return _get_connection() diff --git a/aodh/storage/base.py b/aodh/storage/base.py index 02bf8151e..64764804c 100644 --- a/aodh/storage/base.py +++ b/aodh/storage/base.py @@ -81,7 +81,7 @@ class Connection(object): 'storage': {'production_ready': False}, } - def __init__(self, conf, url): + def __init__(self, conf): pass @staticmethod diff --git a/aodh/storage/impl_sqlalchemy.py b/aodh/storage/impl_sqlalchemy.py index 6a9d02b74..1eb5e17ef 100644 --- a/aodh/storage/impl_sqlalchemy.py +++ b/aodh/storage/impl_sqlalchemy.py @@ -11,9 +11,11 @@ # under the License. """SQLAlchemy storage backend.""" + import copy import datetime import os.path +import threading from alembic import command from alembic import config @@ -37,9 +39,11 @@ from aodh.storage import models as alarm_api_models from aodh.storage.sqlalchemy import models from aodh.storage.sqlalchemy import utils as sql_utils +DB_CONFIGURED = False osprofiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy') +_CONTEXT = threading.local() LOG = log.getLogger(__name__) AVAILABLE_CAPABILITIES = { @@ -48,8 +52,6 @@ AVAILABLE_CAPABILITIES = { 'history': {'query': {'simple': True, 'complex': True}}}, } - - AVAILABLE_STORAGE_CAPABILITIES = { 'storage': {'production_ready': True}, } @@ -90,6 +92,20 @@ def apply_filters(query, model, **filters): return query +def _session_for_read(): + session = enginefacade.reader.using(_CONTEXT) + if osprofiler_sqlalchemy: + session = osprofiler_sqlalchemy.wrap_session(sqlalchemy, session) + return session + + +def _session_for_write(): + session = enginefacade.writer.using(_CONTEXT) + if osprofiler_sqlalchemy: + session = osprofiler_sqlalchemy.wrap_session(sqlalchemy, session) + return session + + class Connection(base.Connection): """Put the data into a SQLAlchemy database. """ CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES, @@ -99,28 +115,29 @@ class Connection(base.Connection): AVAILABLE_STORAGE_CAPABILITIES, ) - def __init__(self, conf, url): + def __init__(self, conf): # Set max_retries to 0, since oslo.db in certain cases may attempt # to retry making the db connection retried max_retries ^ 2 times # in failure case and db reconnection has already been implemented # in storage.__init__.get_connection_from_config function - options = dict(conf.database.items()) - options['max_retries'] = 0 - # FIXME(stephenfin): Remove this (and ideally use of - # LegacyEngineFacade) asap since it's not compatible with SQLAlchemy - # 2.0 - options['autocommit'] = True - # oslo.db doesn't support options defined by Aodh - for opt in storage.OPTS: - options.pop(opt.name, None) - self._engine_facade = enginefacade.LegacyEngineFacade( - self.dress_url(url), - **options) + global DB_CONFIGURED + + if not DB_CONFIGURED: + options = dict(conf.database.items()) + options['connection'] = self.dress_url(conf.database.connection) + options['max_retries'] = 0 + options['sqlite_fk'] = True + # FIXME(stephenfin): Remove this asap since it's not compatible + # with SQLAlchemy 2.0 + options['__autocommit'] = True + # oslo.db doesn't support options defined by Aodh + for opt in storage.OPTS: + options.pop(opt.name, None) + + enginefacade.configure(**options) + + DB_CONFIGURED = True - if osprofiler_sqlalchemy: - osprofiler_sqlalchemy.add_tracing(sqlalchemy, - self._engine_facade.get_engine(), - 'db') self.conf = conf @staticmethod @@ -132,9 +149,6 @@ class Connection(base.Connection): return str(url) return url - def disconnect(self): - self._engine_facade.get_engine().dispose() - def _get_alembic_config(self): cfg = config.Config( "%s/sqlalchemy/alembic/alembic.ini" % os.path.dirname(__file__)) @@ -148,7 +162,7 @@ class Connection(base.Connection): if nocreate: command.upgrade(cfg, "head") else: - engine = self._engine_facade.get_engine() + engine = enginefacade.writer.get_engine() ctxt = migration.MigrationContext.configure(engine.connect()) current_version = ctxt.get_current_revision() if current_version is None: @@ -158,7 +172,7 @@ class Connection(base.Connection): command.upgrade(cfg, "head") def clear(self): - engine = self._engine_facade.get_engine() + engine = enginefacade.writer.get_engine() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) engine.dispose() @@ -167,9 +181,10 @@ class Connection(base.Connection): if limit == 0: return [] - session = self._engine_facade.get_session() - engine = self._engine_facade.get_engine() - query = session.query(table) + engine = enginefacade.reader.get_engine() + with _session_for_read() as session: + query = session.query(table) + transformer = sql_utils.QueryTransformer(table, query, dialect=engine.dialect.name) if filter_expr is not None: @@ -246,13 +261,17 @@ class Connection(base.Connection): def get_alarms(self, meter=None, pagination=None, **kwargs): """Yields a lists of alarms that match filters.""" pagination = pagination or {} - session = self._engine_facade.get_session() - query = session.query(models.Alarm) - query = apply_filters(query, models.Alarm, **kwargs) - query = self._get_pagination_query( - session, query, pagination, alarm_api_models.Alarm, models.Alarm) - - alarms = self._retrieve_alarms(query) + with _session_for_read() as session: + query = session.query(models.Alarm) + query = apply_filters(query, models.Alarm, **kwargs) + query = self._get_pagination_query( + session, + query, + pagination, + alarm_api_models.Alarm, + models.Alarm, + ) + alarms = [self._row_to_alarm_model(x) for x in query.all()] # TODO(cmart): improve this by using sqlalchemy.func factory if meter is not None: @@ -267,8 +286,7 @@ class Connection(base.Connection): :param alarm: The alarm to create. """ - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: alarm_row = models.Alarm(alarm_id=alarm.alarm_id) alarm_row.update(alarm.as_dict()) session.add(alarm_row) @@ -280,13 +298,14 @@ class Connection(base.Connection): :param alarm: the new Alarm to update """ - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: count = session.query(models.Alarm).filter( - models.Alarm.alarm_id == alarm.alarm_id).update( - alarm.as_dict()) - if not count: - raise storage.AlarmNotFound(alarm.alarm_id) + models.Alarm.alarm_id == alarm.alarm_id, + ).update(alarm.as_dict()) + + if not count: + raise storage.AlarmNotFound(alarm.alarm_id) + return alarm def delete_alarm(self, alarm_id): @@ -294,13 +313,14 @@ class Connection(base.Connection): :param alarm_id: ID of the alarm to delete """ - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: session.query(models.Alarm).filter( - models.Alarm.alarm_id == alarm_id).delete() + models.Alarm.alarm_id == alarm_id, + ).delete() # FIXME(liusheng): we should use delete cascade session.query(models.AlarmChange).filter( - models.AlarmChange.alarm_id == alarm_id).delete() + models.AlarmChange.alarm_id == alarm_id, + ).delete() @staticmethod def _row_to_alarm_change_model(row): @@ -359,45 +379,46 @@ class Connection(base.Connection): :param pagination: Pagination query parameters. """ pagination = pagination or {} - session = self._engine_facade.get_session() - query = session.query(models.AlarmChange) - query = query.filter(models.AlarmChange.alarm_id == alarm_id) - if on_behalf_of is not None: - query = query.filter( - models.AlarmChange.on_behalf_of == on_behalf_of) - if user is not None: - query = query.filter(models.AlarmChange.user_id == user) - if project is not None: - query = query.filter(models.AlarmChange.project_id == project) - if alarm_type is not None: - query = query.filter(models.AlarmChange.type == alarm_type) - if severity is not None: - query = query.filter(models.AlarmChange.severity == severity) - if start_timestamp: - if start_timestamp_op == 'gt': - query = query.filter( - models.AlarmChange.timestamp > start_timestamp) - else: - query = query.filter( - models.AlarmChange.timestamp >= start_timestamp) - if end_timestamp: - if end_timestamp_op == 'le': - query = query.filter( - models.AlarmChange.timestamp <= end_timestamp) - else: - query = query.filter( - models.AlarmChange.timestamp < end_timestamp) + with _session_for_read() as session: + query = session.query(models.AlarmChange) + query = query.filter(models.AlarmChange.alarm_id == alarm_id) - query = self._get_pagination_query( - session, query, pagination, alarm_api_models.AlarmChange, - models.AlarmChange) - return self._retrieve_alarm_history(query) + if on_behalf_of is not None: + query = query.filter( + models.AlarmChange.on_behalf_of == on_behalf_of) + if user is not None: + query = query.filter(models.AlarmChange.user_id == user) + if project is not None: + query = query.filter(models.AlarmChange.project_id == project) + if alarm_type is not None: + query = query.filter(models.AlarmChange.type == alarm_type) + if severity is not None: + query = query.filter(models.AlarmChange.severity == severity) + if start_timestamp: + if start_timestamp_op == 'gt': + query = query.filter( + models.AlarmChange.timestamp > start_timestamp) + else: + query = query.filter( + models.AlarmChange.timestamp >= start_timestamp) + if end_timestamp: + if end_timestamp_op == 'le': + query = query.filter( + models.AlarmChange.timestamp <= end_timestamp) + else: + query = query.filter( + models.AlarmChange.timestamp < end_timestamp) + + query = self._get_pagination_query( + session, query, pagination, alarm_api_models.AlarmChange, + models.AlarmChange) + + return (self._row_to_alarm_change_model(x) for x in query.all()) def record_alarm_change(self, alarm_change): """Record alarm change event.""" - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: alarm_change_row = models.AlarmChange( event_id=alarm_change['event_id']) alarm_change_row.update(alarm_change) @@ -411,8 +432,7 @@ class Connection(base.Connection): :param ttl: Number of seconds to keep alarm history records for. :param max_count: Number of records to delete. """ - session = self._engine_facade.get_session() - with session.begin(): + with _session_for_write() as session: end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) alarm_history_q = (session.query(models.AlarmChange.event_id) .filter(models.AlarmChange.timestamp < end)) @@ -420,22 +440,23 @@ class Connection(base.Connection): deleted_rows = session.query(models.AlarmChange).filter( models.AlarmChange.event_id.in_(event_ids) ).delete(synchronize_session="fetch") - LOG.info("%d alarm histories are removed from database", - deleted_rows) + + LOG.info("%d alarm histories are removed from database", deleted_rows) def conditional_update(self, model, values, expected_values, filters=None): """Compare-and-swap conditional update SQLAlchemy implementation.""" filters = filters or {} filters.update(expected_values) - session = self._engine_facade.get_session() - query = session.query(model) - if filters: - query = query.filter_by(**filters) + with _session_for_write() as session: + query = session.query(model) + if filters: + query = query.filter_by(**filters) - update_args = {'synchronize_session': False} + update_args = {'synchronize_session': False} + + result = query.update(values, **update_args) - result = query.update(values, **update_args) return 0 != result @staticmethod @@ -446,21 +467,16 @@ class Connection(base.Connection): limit=row.limit, ) - def _retrieve_quotas(self, query): - return [self._row_to_quota_model(x) for x in query.all()] - def get_quotas(self, project_id): """Get resource quota for the given project.""" filters = {'project_id': project_id} - session = self._engine_facade.get_session() - query = session.query(models.Quota).filter_by(**filters) - return self._retrieve_quotas(query) + with _session_for_read() as session: + query = session.query(models.Quota).filter_by(**filters) + return [self._row_to_quota_model(x) for x in query.all()] def set_quotas(self, project_id, quotas): """Set resource quota for the given user.""" - session = self._engine_facade.get_session() - - with session.begin(): + with _session_for_write() as session: for q in quotas: values = { 'project_id': project_id, @@ -477,11 +493,11 @@ class Connection(base.Connection): values['limit'] = q['limit'] quota.update(values.copy()) - filters = {'project_id': project_id} - query = session.query(models.Quota).filter_by(**filters) - return self._retrieve_quotas(query) + filters = {'project_id': project_id} + query = session.query(models.Quota).filter_by(**filters) + return [self._row_to_quota_model(x) for x in query.all()] def delete_quotas(self, project_id): filters = {'project_id': project_id} - session = self._engine_facade.get_session() - session.query(models.Quota).filter_by(**filters).delete() + with _session_for_write() as session: + session.query(models.Quota).filter_by(**filters).delete() diff --git a/aodh/storage/sqlalchemy/alembic/env.py b/aodh/storage/sqlalchemy/alembic/env.py index 6fe2e71e5..11b207831 100644 --- a/aodh/storage/sqlalchemy/alembic/env.py +++ b/aodh/storage/sqlalchemy/alembic/env.py @@ -12,10 +12,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from alembic import context + from logging.config import fileConfig -from aodh.storage import impl_sqlalchemy +from alembic import context +from oslo_db.sqlalchemy import enginefacade + from aodh.storage.sqlalchemy import models @@ -66,11 +68,8 @@ def run_migrations_online(): and associate a connection with the context. """ - conf = config.conf - conn = impl_sqlalchemy.Connection(conf, conf.database.connection) - connectable = conn._engine_facade.get_engine() - - with connectable.connect() as connection: + engine = enginefacade.writer.get_engine() + with engine.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata @@ -78,13 +77,13 @@ def run_migrations_online(): with context.begin_transaction(): context.run_migrations() - conn.disconnect() if not hasattr(config, "conf"): from aodh import service config.conf = service.prepare_service([]) + if context.is_offline_mode(): run_migrations_offline() else: diff --git a/aodh/tests/functional/db.py b/aodh/tests/functional/db.py index 632d3ecf1..1d578dbf2 100644 --- a/aodh/tests/functional/db.py +++ b/aodh/tests/functional/db.py @@ -80,10 +80,12 @@ class TestBase(test_base.BaseTestCase, def setUp(self): super(TestBase, self).setUp() + db_url = os.environ.get( - 'AODH_TEST_STORAGE_URL', - 'sqlite://').replace( - "mysql://", "mysql+pymysql://") + 'AODH_TEST_STORAGE_URL', 'sqlite://', + ).replace( + "mysql://", "mysql+pymysql://", + ) engine = urlparse.urlparse(db_url).scheme # In case some drivers have additional specification, for example: # PyMySQL will have scheme mysql+pymysql. diff --git a/aodh/tests/functional/storage/sqlalchemy/test_migrations.py b/aodh/tests/functional/storage/sqlalchemy/test_migrations.py index 68ff7f1c0..d840ff2ef 100644 --- a/aodh/tests/functional/storage/sqlalchemy/test_migrations.py +++ b/aodh/tests/functional/storage/sqlalchemy/test_migrations.py @@ -1,4 +1,3 @@ -# # Copyright 2015 Huawei Technologies Co., Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -12,9 +11,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + import abc from unittest import mock +from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import test_migrations from aodh.storage.sqlalchemy import models @@ -39,7 +40,7 @@ class ModelsMigrationsSync(tests_db.TestBase, return models.Base.metadata def get_engine(self): - return self.alarm_conn._engine_facade.get_engine() + return enginefacade.writer.get_engine() def db_sync(self, engine): pass diff --git a/aodh/tests/functional/storage/test_get_connection.py b/aodh/tests/functional/storage/test_get_connection.py index efd6bd624..7c2719d87 100644 --- a/aodh/tests/functional/storage/test_get_connection.py +++ b/aodh/tests/functional/storage/test_get_connection.py @@ -61,7 +61,7 @@ class ConnectionRetryTest(base.BaseTestCase): class ConnectionError(Exception): pass - def x(a, b): + def x(conf): raise ConnectionError log_init.side_effect = x diff --git a/aodh/tests/functional/storage/test_impl_log.py b/aodh/tests/functional/storage/test_impl_log.py index 9a6c71a7b..b6f5dd10b 100644 --- a/aodh/tests/functional/storage/test_impl_log.py +++ b/aodh/tests/functional/storage/test_impl_log.py @@ -1,4 +1,3 @@ -# # Copyright 2012 New Dream Network, LLC (DreamHost) # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -12,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + from oslo_config import cfg from oslotest import base @@ -21,4 +21,4 @@ from aodh.storage import impl_log class ConnectionTest(base.BaseTestCase): @staticmethod def test_get_connection(): - impl_log.Connection(cfg.CONF, None) + impl_log.Connection(cfg.CONF) diff --git a/aodh/tests/unit/test_bin.py b/aodh/tests/unit/test_bin.py index 47371961f..28bbb2500 100644 --- a/aodh/tests/unit/test_bin.py +++ b/aodh/tests/unit/test_bin.py @@ -66,7 +66,11 @@ class BinTestCase(base.BaseTestCase): stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, __ = subp.communicate() - self.assertEqual(0, subp.poll()) + self.assertEqual( + 0, + subp.poll(), + f'Failed with stdout:\n{out.decode()}', + ) msg = "Dropping alarm history 10 data with TTL 1" msg = msg.encode('utf-8') self.assertIn(msg, out)