db: Replace use of LegacyEngineFacade

We aren't passing context objects around, so this looks more like
ironic's use of oslo.db than e.g. nova's or cinder's. Nothing
complicated though.

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: Ia0368cc128d495011776e80a27af04776e876fa9
Stephen Finucane 2023-09-19 12:05:31 +01:00
parent 64e0f7c895
commit b710d3fdad
9 changed files with 145 additions and 123 deletions
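
For reference, here is a minimal, self-contained sketch of the
enginefacade pattern this change adopts. It is illustrative only, not
Aodh code: the Alarm model, the in-memory SQLite URL and the option
values are placeholders.

    # Sketch of the oslo.db enginefacade pattern: a module-level
    # thread-local stands in for a request context, and the reader/writer
    # context managers hand out sessions that commit automatically on exit.
    import threading

    from oslo_db.sqlalchemy import enginefacade
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()


    class Alarm(Base):
        # Illustrative table only; not the real Aodh schema.
        __tablename__ = 'alarm'
        id = Column(Integer, primary_key=True)
        name = Column(String(255))


    _CONTEXT = threading.local()


    def _session_for_read():
        return enginefacade.reader.using(_CONTEXT)


    def _session_for_write():
        return enginefacade.writer.using(_CONTEXT)


    if __name__ == '__main__':
        # Configure the global facade once, rather than building a
        # LegacyEngineFacade per Connection instance.
        enginefacade.configure(connection='sqlite://', max_retries=0)

        Base.metadata.create_all(enginefacade.writer.get_engine())

        with _session_for_write() as session:
            session.add(Alarm(name='high_cpu'))  # committed on context exit

        with _session_for_read() as session:
            print(session.query(Alarm).count())

The engine is created lazily on first use, so configure() only needs to
run once per process, which is what the DB_CONFIGURED guard in
impl_sqlalchemy below takes care of.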

@ -72,7 +72,7 @@ def get_connection_from_config(conf):
reraise=True)
def _get_connection():
"""Return an open connection to the database."""
return mgr.driver(conf, url)
return mgr.driver(conf)
return _get_connection()

@ -81,7 +81,7 @@ class Connection(object):
'storage': {'production_ready': False},
}
def __init__(self, conf, url):
def __init__(self, conf):
pass
@staticmethod

@ -11,9 +11,11 @@
# under the License.
"""SQLAlchemy storage backend."""
import copy
import datetime
import os.path
import threading
from alembic import command
from alembic import config
@ -37,9 +39,11 @@ from aodh.storage import models as alarm_api_models
from aodh.storage.sqlalchemy import models
from aodh.storage.sqlalchemy import utils as sql_utils
DB_CONFIGURED = False
osprofiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
_CONTEXT = threading.local()
LOG = log.getLogger(__name__)
AVAILABLE_CAPABILITIES = {
@ -48,8 +52,6 @@ AVAILABLE_CAPABILITIES = {
'history': {'query': {'simple': True,
'complex': True}}},
}
AVAILABLE_STORAGE_CAPABILITIES = {
'storage': {'production_ready': True},
}
@ -90,6 +92,20 @@ def apply_filters(query, model, **filters):
return query
def _session_for_read():
session = enginefacade.reader.using(_CONTEXT)
if osprofiler_sqlalchemy:
session = osprofiler_sqlalchemy.wrap_session(sqlalchemy, session)
return session
def _session_for_write():
session = enginefacade.writer.using(_CONTEXT)
if osprofiler_sqlalchemy:
session = osprofiler_sqlalchemy.wrap_session(sqlalchemy, session)
return session
class Connection(base.Connection):
"""Put the data into a SQLAlchemy database. """
CAPABILITIES = base.update_nested(base.Connection.CAPABILITIES,
@ -99,28 +115,29 @@ class Connection(base.Connection):
AVAILABLE_STORAGE_CAPABILITIES,
)
def __init__(self, conf, url):
def __init__(self, conf):
# Set max_retries to 0, since oslo.db in certain cases may attempt
# to retry making the db connection retried max_retries ^ 2 times
# in failure case and db reconnection has already been implemented
# in storage.__init__.get_connection_from_config function
options = dict(conf.database.items())
options['max_retries'] = 0
# FIXME(stephenfin): Remove this (and ideally use of
# LegacyEngineFacade) asap since it's not compatible with SQLAlchemy
# 2.0
options['autocommit'] = True
# oslo.db doesn't support options defined by Aodh
for opt in storage.OPTS:
options.pop(opt.name, None)
self._engine_facade = enginefacade.LegacyEngineFacade(
self.dress_url(url),
**options)
global DB_CONFIGURED
if not DB_CONFIGURED:
options = dict(conf.database.items())
options['connection'] = self.dress_url(conf.database.connection)
options['max_retries'] = 0
options['sqlite_fk'] = True
# FIXME(stephenfin): Remove this asap since it's not compatible
# with SQLAlchemy 2.0
options['__autocommit'] = True
# oslo.db doesn't support options defined by Aodh
for opt in storage.OPTS:
options.pop(opt.name, None)
enginefacade.configure(**options)
DB_CONFIGURED = True
if osprofiler_sqlalchemy:
osprofiler_sqlalchemy.add_tracing(sqlalchemy,
self._engine_facade.get_engine(),
'db')
self.conf = conf
@staticmethod
@ -132,9 +149,6 @@ class Connection(base.Connection):
return str(url)
return url
def disconnect(self):
self._engine_facade.get_engine().dispose()
def _get_alembic_config(self):
cfg = config.Config(
"%s/sqlalchemy/alembic/alembic.ini" % os.path.dirname(__file__))
@ -148,7 +162,7 @@ class Connection(base.Connection):
if nocreate:
command.upgrade(cfg, "head")
else:
engine = self._engine_facade.get_engine()
engine = enginefacade.writer.get_engine()
ctxt = migration.MigrationContext.configure(engine.connect())
current_version = ctxt.get_current_revision()
if current_version is None:
@ -158,7 +172,7 @@ class Connection(base.Connection):
command.upgrade(cfg, "head")
def clear(self):
engine = self._engine_facade.get_engine()
engine = enginefacade.writer.get_engine()
for table in reversed(models.Base.metadata.sorted_tables):
engine.execute(table.delete())
engine.dispose()
@ -167,9 +181,10 @@ class Connection(base.Connection):
if limit == 0:
return []
session = self._engine_facade.get_session()
engine = self._engine_facade.get_engine()
query = session.query(table)
engine = enginefacade.reader.get_engine()
with _session_for_read() as session:
query = session.query(table)
transformer = sql_utils.QueryTransformer(table, query,
dialect=engine.dialect.name)
if filter_expr is not None:
@ -246,13 +261,17 @@ class Connection(base.Connection):
def get_alarms(self, meter=None, pagination=None, **kwargs):
"""Yields a lists of alarms that match filters."""
pagination = pagination or {}
session = self._engine_facade.get_session()
query = session.query(models.Alarm)
query = apply_filters(query, models.Alarm, **kwargs)
query = self._get_pagination_query(
session, query, pagination, alarm_api_models.Alarm, models.Alarm)
alarms = self._retrieve_alarms(query)
with _session_for_read() as session:
query = session.query(models.Alarm)
query = apply_filters(query, models.Alarm, **kwargs)
query = self._get_pagination_query(
session,
query,
pagination,
alarm_api_models.Alarm,
models.Alarm,
)
alarms = [self._row_to_alarm_model(x) for x in query.all()]
# TODO(cmart): improve this by using sqlalchemy.func factory
if meter is not None:
@ -267,8 +286,7 @@ class Connection(base.Connection):
:param alarm: The alarm to create.
"""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
alarm_row = models.Alarm(alarm_id=alarm.alarm_id)
alarm_row.update(alarm.as_dict())
session.add(alarm_row)
@ -280,13 +298,14 @@ class Connection(base.Connection):
:param alarm: the new Alarm to update
"""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
count = session.query(models.Alarm).filter(
models.Alarm.alarm_id == alarm.alarm_id).update(
alarm.as_dict())
if not count:
raise storage.AlarmNotFound(alarm.alarm_id)
models.Alarm.alarm_id == alarm.alarm_id,
).update(alarm.as_dict())
if not count:
raise storage.AlarmNotFound(alarm.alarm_id)
return alarm
def delete_alarm(self, alarm_id):
@ -294,13 +313,14 @@ class Connection(base.Connection):
:param alarm_id: ID of the alarm to delete
"""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
session.query(models.Alarm).filter(
models.Alarm.alarm_id == alarm_id).delete()
models.Alarm.alarm_id == alarm_id,
).delete()
# FIXME(liusheng): we should use delete cascade
session.query(models.AlarmChange).filter(
models.AlarmChange.alarm_id == alarm_id).delete()
models.AlarmChange.alarm_id == alarm_id,
).delete()
@staticmethod
def _row_to_alarm_change_model(row):
@ -359,45 +379,46 @@ class Connection(base.Connection):
:param pagination: Pagination query parameters.
"""
pagination = pagination or {}
session = self._engine_facade.get_session()
query = session.query(models.AlarmChange)
query = query.filter(models.AlarmChange.alarm_id == alarm_id)
if on_behalf_of is not None:
query = query.filter(
models.AlarmChange.on_behalf_of == on_behalf_of)
if user is not None:
query = query.filter(models.AlarmChange.user_id == user)
if project is not None:
query = query.filter(models.AlarmChange.project_id == project)
if alarm_type is not None:
query = query.filter(models.AlarmChange.type == alarm_type)
if severity is not None:
query = query.filter(models.AlarmChange.severity == severity)
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(
models.AlarmChange.timestamp > start_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(
models.AlarmChange.timestamp <= end_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp < end_timestamp)
with _session_for_read() as session:
query = session.query(models.AlarmChange)
query = query.filter(models.AlarmChange.alarm_id == alarm_id)
query = self._get_pagination_query(
session, query, pagination, alarm_api_models.AlarmChange,
models.AlarmChange)
return self._retrieve_alarm_history(query)
if on_behalf_of is not None:
query = query.filter(
models.AlarmChange.on_behalf_of == on_behalf_of)
if user is not None:
query = query.filter(models.AlarmChange.user_id == user)
if project is not None:
query = query.filter(models.AlarmChange.project_id == project)
if alarm_type is not None:
query = query.filter(models.AlarmChange.type == alarm_type)
if severity is not None:
query = query.filter(models.AlarmChange.severity == severity)
if start_timestamp:
if start_timestamp_op == 'gt':
query = query.filter(
models.AlarmChange.timestamp > start_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp >= start_timestamp)
if end_timestamp:
if end_timestamp_op == 'le':
query = query.filter(
models.AlarmChange.timestamp <= end_timestamp)
else:
query = query.filter(
models.AlarmChange.timestamp < end_timestamp)
query = self._get_pagination_query(
session, query, pagination, alarm_api_models.AlarmChange,
models.AlarmChange)
return (self._row_to_alarm_change_model(x) for x in query.all())
def record_alarm_change(self, alarm_change):
"""Record alarm change event."""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
alarm_change_row = models.AlarmChange(
event_id=alarm_change['event_id'])
alarm_change_row.update(alarm_change)
@ -411,8 +432,7 @@ class Connection(base.Connection):
:param ttl: Number of seconds to keep alarm history records for.
:param max_count: Number of records to delete.
"""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
alarm_history_q = (session.query(models.AlarmChange.event_id)
.filter(models.AlarmChange.timestamp < end))
@ -420,22 +440,23 @@ class Connection(base.Connection):
deleted_rows = session.query(models.AlarmChange).filter(
models.AlarmChange.event_id.in_(event_ids)
).delete(synchronize_session="fetch")
LOG.info("%d alarm histories are removed from database",
deleted_rows)
LOG.info("%d alarm histories are removed from database", deleted_rows)
def conditional_update(self, model, values, expected_values, filters=None):
"""Compare-and-swap conditional update SQLAlchemy implementation."""
filters = filters or {}
filters.update(expected_values)
session = self._engine_facade.get_session()
query = session.query(model)
if filters:
query = query.filter_by(**filters)
with _session_for_write() as session:
query = session.query(model)
if filters:
query = query.filter_by(**filters)
update_args = {'synchronize_session': False}
update_args = {'synchronize_session': False}
result = query.update(values, **update_args)
result = query.update(values, **update_args)
return 0 != result
@staticmethod
@ -446,21 +467,16 @@ class Connection(base.Connection):
limit=row.limit,
)
def _retrieve_quotas(self, query):
return [self._row_to_quota_model(x) for x in query.all()]
def get_quotas(self, project_id):
"""Get resource quota for the given project."""
filters = {'project_id': project_id}
session = self._engine_facade.get_session()
query = session.query(models.Quota).filter_by(**filters)
return self._retrieve_quotas(query)
with _session_for_read() as session:
query = session.query(models.Quota).filter_by(**filters)
return [self._row_to_quota_model(x) for x in query.all()]
def set_quotas(self, project_id, quotas):
"""Set resource quota for the given user."""
session = self._engine_facade.get_session()
with session.begin():
with _session_for_write() as session:
for q in quotas:
values = {
'project_id': project_id,
@ -477,11 +493,11 @@ class Connection(base.Connection):
values['limit'] = q['limit']
quota.update(values.copy())
filters = {'project_id': project_id}
query = session.query(models.Quota).filter_by(**filters)
return self._retrieve_quotas(query)
filters = {'project_id': project_id}
query = session.query(models.Quota).filter_by(**filters)
return [self._row_to_quota_model(x) for x in query.all()]
def delete_quotas(self, project_id):
filters = {'project_id': project_id}
session = self._engine_facade.get_session()
session.query(models.Quota).filter_by(**filters).delete()
with _session_for_write() as session:
session.query(models.Quota).filter_by(**filters).delete()

@ -12,10 +12,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from alembic import context
from logging.config import fileConfig
from aodh.storage import impl_sqlalchemy
from alembic import context
from oslo_db.sqlalchemy import enginefacade
from aodh.storage.sqlalchemy import models
@ -66,11 +68,8 @@ def run_migrations_online():
and associate a connection with the context.
"""
conf = config.conf
conn = impl_sqlalchemy.Connection(conf, conf.database.connection)
connectable = conn._engine_facade.get_engine()
with connectable.connect() as connection:
engine = enginefacade.writer.get_engine()
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
@ -78,13 +77,13 @@ def run_migrations_online():
with context.begin_transaction():
context.run_migrations()
conn.disconnect()
if not hasattr(config, "conf"):
from aodh import service
config.conf = service.prepare_service([])
if context.is_offline_mode():
run_migrations_offline()
else:

@ -80,10 +80,12 @@ class TestBase(test_base.BaseTestCase,
def setUp(self):
super(TestBase, self).setUp()
db_url = os.environ.get(
'AODH_TEST_STORAGE_URL',
'sqlite://').replace(
"mysql://", "mysql+pymysql://")
'AODH_TEST_STORAGE_URL', 'sqlite://',
).replace(
"mysql://", "mysql+pymysql://",
)
engine = urlparse.urlparse(db_url).scheme
# In case some drivers have additional specification, for example:
# PyMySQL will have scheme mysql+pymysql.

@ -1,4 +1,3 @@
#
# Copyright 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -12,9 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from unittest import mock
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import test_migrations
from aodh.storage.sqlalchemy import models
@ -39,7 +40,7 @@ class ModelsMigrationsSync(tests_db.TestBase,
return models.Base.metadata
def get_engine(self):
return self.alarm_conn._engine_facade.get_engine()
return enginefacade.writer.get_engine()
def db_sync(self, engine):
pass

@ -61,7 +61,7 @@ class ConnectionRetryTest(base.BaseTestCase):
class ConnectionError(Exception):
pass
def x(a, b):
def x(conf):
raise ConnectionError
log_init.side_effect = x

@ -1,4 +1,3 @@
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -12,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslotest import base
@ -21,4 +21,4 @@ from aodh.storage import impl_log
class ConnectionTest(base.BaseTestCase):
@staticmethod
def test_get_connection():
impl_log.Connection(cfg.CONF, None)
impl_log.Connection(cfg.CONF)

@ -66,7 +66,11 @@ class BinTestCase(base.BaseTestCase):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, __ = subp.communicate()
self.assertEqual(0, subp.poll())
self.assertEqual(
0,
subp.poll(),
f'Failed with stdout:\n{out.decode()}',
)
msg = "Dropping alarm history 10 data with TTL 1"
msg = msg.encode('utf-8')
self.assertIn(msg, out)