sqlalchemy: switch to new oslo.db facade

The SQLAlchemyIndexer is a class with methods, which is not the
standalone-function calling style the new enginefacade was designed for.
Additionally, it does not use the global engine scope; it has its own, which
can even be disconnected (do we need that?). So an interim PerInstanceFacade
object is produced locally; ideally it should be replaced with first-class
patterns in oslo.db that support putting the transaction context on methods
taking "self", as well as self-local transaction facades (if we really need
that?).

Change-Id: I4e60cca3434e4290145a6a38ce6f27a6c7b1d2be
Co-Authored-By: Mike Bayer <mike_mp@zzzcomputing.com>
commit 67f43a2f7e
parent 57ac881e06
Julien Danjou, 2015-07-22 12:38:18 +02:00
3 changed files with 221 additions and 205 deletions
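
For context: the old code used oslo.db's legacy EngineFacade, which hands out
sessions and engines for the caller to manage; the new enginefacade scopes each
transaction with reader/writer context managers. A minimal sketch of the
difference (conf, SomeModel and the surrounding setup are assumed here, not
taken from the commit):

    import threading

    from oslo_db.sqlalchemy import enginefacade
    from oslo_db.sqlalchemy import session

    # Legacy pattern (removed below): the caller owns the session.
    facade = session.EngineFacade.from_config(conf)
    db = facade.get_session()
    rows = db.query(SomeModel).all()
    db.expunge_all()  # detach instances by hand before returning them

    # New pattern (introduced below): the context manager owns the
    # transaction and cleans the session up on exit.
    trans = enginefacade.transaction_context()
    trans.configure(**dict(conf.database.items()))
    local = threading.local()
    with trans.reader.using(local) as db:
        rows = db.query(SomeModel).all()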

File 1 of 3: the alembic migration environment

@@ -71,16 +71,16 @@ def run_migrations_online():
     conf = config.conf
     indexer = sqlalchemy.SQLAlchemyIndexer(conf)
     indexer.connect()
-    connectable = indexer.engine_facade.get_engine()
-    with connectable.connect() as connection:
-        context.configure(
-            connection=connection,
-            target_metadata=target_metadata
-        )
-        with context.begin_transaction():
-            context.run_migrations()
+    with indexer.facade.writer_connection() as connectable:
+        with connectable.connect() as connection:
+            context.configure(
+                connection=connection,
+                target_metadata=target_metadata
+            )
+            with context.begin_transaction():
+                context.run_migrations()
     indexer.disconnect()

File 2 of 3: the SQLAlchemy indexer driver

@@ -17,12 +17,13 @@ from __future__ import absolute_import
 import itertools
 import operator
 import os.path
+import threading
 import uuid
 
 import oslo_db.api
 from oslo_db import exception
+from oslo_db.sqlalchemy import enginefacade
 from oslo_db.sqlalchemy import models
-from oslo_db.sqlalchemy import session
 from oslo_db.sqlalchemy import utils as oslo_db_utils
 import six
 import sqlalchemy
@@ -63,6 +64,44 @@ def get_resource_mappers(ext):
             'history': resource_history_ext}
 
 
+class PerInstanceFacade(object):
+    def __init__(self, conf):
+        self.trans = enginefacade.transaction_context()
+        self.trans.configure(
+            **dict(conf.database.items())
+        )
+        self._context = threading.local()
+
+    def independent_writer(self):
+        return self.trans.independent.writer.using(self._context)
+
+    def independent_reader(self):
+        return self.trans.independent.reader.using(self._context)
+
+    def writer_connection(self):
+        return self.trans.connection.writer.using(self._context)
+
+    def reader_connection(self):
+        return self.trans.connection.reader.using(self._context)
+
+    def writer(self):
+        return self.trans.writer.using(self._context)
+
+    def reader(self):
+        return self.trans.reader.using(self._context)
+
+    def get_engine(self):
+        # TODO(mbayer): add get_engine() to enginefacade
+        if not self.trans._factory._started:
+            self.trans._factory._start()
+        return self.trans._factory._writer_engine
+
+    def dispose(self):
+        # TODO(mbayer): add dispose() to enginefacade
+        if self.trans._factory._started:
+            self.trans._factory._writer_engine.dispose()
+
+
 class SQLAlchemyIndexer(indexer.IndexerDriver):
     resources = extension.ExtensionManager('gnocchi.indexer.resources')
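
The interim facade above is a thin per-instance wrapper over those scopes.
Roughly how the rest of this file consumes it, as a sketch (conf, obj and
SomeModel are placeholders, not from the commit):

    facade = PerInstanceFacade(conf)

    with facade.writer() as session:
        session.add(obj)        # flushed and committed when the block exits

    with facade.independent_reader() as session:
        rows = session.query(SomeModel).all()

    # Core-level connection, e.g. for the migration environment above:
    with facade.writer_connection() as connectable:
        with connectable.connect() as connection:
            pass                # run DDL or migrations on `connection`

    facade.dispose()            # drop pooled connections when done
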
@@ -72,12 +111,10 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
     def __init__(self, conf):
         conf.set_override("connection", conf.indexer.url, "database")
         self.conf = conf
-
-    def connect(self):
-        self.engine_facade = session.EngineFacade.from_config(self.conf)
+        self.facade = PerInstanceFacade(conf)
 
     def disconnect(self):
-        self.engine_facade.get_engine().dispose()
+        self.facade.dispose()
 
     def _get_alembic_config(self):
         from alembic import config
@@ -88,6 +125,9 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
                            self.conf.database.connection)
         return cfg
 
+    def get_engine(self):
+        return self.facade.get_engine()
+
     def upgrade(self, nocreate=False):
         from alembic import command
         from alembic import migration
@@ -97,14 +137,14 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
         if nocreate:
             command.upgrade(cfg, "head")
         else:
-            engine = self.engine_facade.get_engine()
-            ctxt = migration.MigrationContext.configure(engine.connect())
-            current_version = ctxt.get_current_revision()
-            if current_version is None:
-                Base.metadata.create_all(engine)
-                command.stamp(cfg, "head")
-            else:
-                command.upgrade(cfg, "head")
+            with self.facade.writer_connection() as connection:
+                ctxt = migration.MigrationContext.configure(connection)
+                current_version = ctxt.get_current_revision()
+                if current_version is None:
+                    Base.metadata.create_all(connection)
+                    command.stamp(cfg, "head")
+                else:
+                    command.upgrade(cfg, "head")
 
     def _resource_type_to_class(self, resource_type, purpose="resource"):
         if resource_type not in self._RESOURCE_CLASS_MAPPER:
@@ -112,42 +152,36 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
         return self._RESOURCE_CLASS_MAPPER[resource_type][purpose]
 
     def list_archive_policies(self):
-        session = self.engine_facade.get_session()
-        aps = list(session.query(ArchivePolicy).all())
-        session.expunge_all()
-        return aps
+        with self.facade.independent_reader() as session:
+            return list(session.query(ArchivePolicy).all())
 
     def get_archive_policy(self, name):
-        session = self.engine_facade.get_session()
-        ap = session.query(ArchivePolicy).get(name)
-        session.expunge_all()
-        return ap
+        with self.facade.independent_reader() as session:
+            return session.query(ArchivePolicy).get(name)
 
     def delete_archive_policy(self, name):
-        session = self.engine_facade.get_session()
-        try:
-            if session.query(ArchivePolicy).filter(
-                    ArchivePolicy.name == name).delete() == 0:
-                raise indexer.NoSuchArchivePolicy(name)
-        except exception.DBReferenceError as e:
-            if (e.constraint ==
-               'fk_metric_archive_policy_name_archive_policy_name'):
-                raise indexer.ArchivePolicyInUse(name)
-            raise
+        with self.facade.writer() as session:
+            try:
+                if session.query(ArchivePolicy).filter(
+                        ArchivePolicy.name == name).delete() == 0:
+                    raise indexer.NoSuchArchivePolicy(name)
+            except exception.DBReferenceError as e:
+                if (e.constraint ==
+                   'fk_metric_archive_policy_name_archive_policy_name'):
+                    raise indexer.ArchivePolicyInUse(name)
+                raise
 
     def get_metrics(self, uuids, active_only=True, with_resource=False):
         if not uuids:
             return []
-        session = self.engine_facade.get_session()
-        query = session.query(Metric).filter(Metric.id.in_(uuids))
-        if active_only:
-            query = query.filter(Metric.status == 'active')
-        if with_resource:
-            query = query.options(sqlalchemy.orm.joinedload('resource'))
-        metrics = list(query.all())
-        session.expunge_all()
-        return metrics
+        with self.facade.independent_reader() as session:
+            query = session.query(Metric).filter(Metric.id.in_(uuids))
+            if active_only:
+                query = query.filter(Metric.status == 'active')
+            if with_resource:
+                query = query.options(sqlalchemy.orm.joinedload('resource'))
+            return list(query.all())
 
     def create_archive_policy(self, archive_policy):
         ap = ArchivePolicy(
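
A pattern repeated throughout this file: the explicit session.expunge_all()
calls disappear. The enginefacade block closes the session on exit, and since
oslo.db builds its sessions with expire_on_commit=False, objects returned from
inside the block remain usable as detached instances. A sketch:

    with self.facade.independent_reader() as session:
        ap = session.query(ArchivePolicy).get(name)
        return ap  # `ap` detaches cleanly: its attributes were loaded
                   # before the session closed and are not expired
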
@@ -156,33 +190,27 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
             definition=archive_policy.definition,
             aggregation_methods=list(archive_policy.aggregation_methods),
         )
-        session = self.engine_facade.get_session()
-        session.add(ap)
         try:
-            session.flush()
+            with self.facade.writer() as session:
+                session.add(ap)
         except exception.DBDuplicateEntry:
             raise indexer.ArchivePolicyAlreadyExists(archive_policy.name)
-        session.expunge_all()
         return ap
 
     def list_archive_policy_rules(self):
-        session = self.engine_facade.get_session()
-        aps = session.query(ArchivePolicyRule).order_by(
-            ArchivePolicyRule.metric_pattern.desc()).all()
-        session.expunge_all()
-        return aps
+        with self.facade.independent_reader() as session:
+            return session.query(ArchivePolicyRule).order_by(
+                ArchivePolicyRule.metric_pattern.desc()).all()
 
     def get_archive_policy_rule(self, name):
-        session = self.engine_facade.get_session()
-        ap = session.query(ArchivePolicyRule).get(name)
-        session.expunge_all()
-        return ap
+        with self.facade.independent_reader() as session:
+            return session.query(ArchivePolicyRule).get(name)
 
     def delete_archive_policy_rule(self, name):
-        session = self.engine_facade.get_session()
-        if session.query(ArchivePolicyRule).filter(
-                ArchivePolicyRule.name == name).delete() == 0:
-            raise indexer.NoSuchArchivePolicyRule(name)
+        with self.facade.writer() as session:
+            if session.query(ArchivePolicyRule).filter(
+                    ArchivePolicyRule.name == name).delete() == 0:
+                raise indexer.NoSuchArchivePolicyRule(name)
 
     def create_archive_policy_rule(self, name, metric_pattern,
                                    archive_policy_name):
@@ -191,13 +219,11 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
             archive_policy_name=archive_policy_name,
             metric_pattern=metric_pattern
         )
-        session = self.engine_facade.get_session()
-        session.add(apr)
         try:
-            session.flush()
+            with self.facade.writer() as session:
+                session.add(apr)
         except exception.DBDuplicateEntry:
             raise indexer.ArchivePolicyRuleAlreadyExists(name)
-        session.expunge_all()
         return apr
 
     def create_metric(self, id, created_by_user_id, created_by_project_id,
@@ -209,35 +235,31 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
                    archive_policy_name=archive_policy_name,
                    name=name,
                    resource_id=resource_id)
-        session = self.engine_facade.get_session()
-        session.add(m)
         try:
-            session.flush()
+            with self.facade.writer() as session:
+                session.add(m)
         except exception.DBReferenceError as e:
             if (e.constraint ==
                'fk_metric_archive_policy_name_archive_policy_name'):
                 raise indexer.NoSuchArchivePolicy(archive_policy_name)
             raise
-        session.expunge_all()
         return m
 
     def list_metrics(self, user_id=None, project_id=None, details=False,
                      status='active', **kwargs):
-        session = self.engine_facade.get_session()
-        q = session.query(Metric).filter(
-            Metric.status == status).order_by(Metric.id)
-        if user_id is not None:
-            q = q.filter(Metric.created_by_user_id == user_id)
-        if project_id is not None:
-            q = q.filter(Metric.created_by_project_id == project_id)
-        for attr in kwargs:
-            q = q.filter(getattr(Metric, attr) == kwargs[attr])
-        if details:
-            q = q.options(sqlalchemy.orm.joinedload('resource'))
-        metrics = list(q.all())
-        session.expunge_all()
-        return metrics
+        with self.facade.independent_reader() as session:
+            q = session.query(Metric).filter(
+                Metric.status == status).order_by(Metric.id)
+            if user_id is not None:
+                q = q.filter(Metric.created_by_user_id == user_id)
+            if project_id is not None:
+                q = q.filter(Metric.created_by_project_id == project_id)
+            for attr in kwargs:
+                q = q.filter(getattr(Metric, attr) == kwargs[attr])
+            if details:
+                q = q.options(sqlalchemy.orm.joinedload('resource'))
+            return list(q.all())
 
     def create_resource(self, resource_type, id,
                         created_by_user_id, created_by_project_id,
@@ -248,19 +270,19 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
         if (started_at is not None
                 and ended_at is not None
                 and started_at > ended_at):
-            raise ValueError("Start timestamp cannot be after end timestamp")
-        r = resource_cls(
-            id=id,
-            type=resource_type,
-            created_by_user_id=created_by_user_id,
-            created_by_project_id=created_by_project_id,
-            user_id=user_id,
-            project_id=project_id,
-            started_at=started_at,
-            ended_at=ended_at,
-            **kwargs)
-        session = self.engine_facade.get_session()
-        with session.begin():
+            raise ValueError(
+                "Start timestamp cannot be after end timestamp")
+        with self.facade.writer() as session:
+            r = resource_cls(
+                id=id,
+                type=resource_type,
+                created_by_user_id=created_by_user_id,
+                created_by_project_id=created_by_project_id,
+                user_id=user_id,
+                project_id=project_id,
+                started_at=started_at,
+                ended_at=ended_at,
+                **kwargs)
             session.add(r)
             try:
                 session.flush()
@@ -273,11 +295,10 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
             if metrics is not None:
                 self._set_metrics_for_resource(session, r, metrics)
 
-        # NOTE(jd) Force load of metrics :)
-        r.metrics
-
-        session.expunge_all()
-        return r
+            # NOTE(jd) Force load of metrics :)
+            r.metrics
+
+        return r
 
     @oslo_db.api.retry_on_deadlock
     def update_resource(self, resource_type,
@@ -288,9 +309,8 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
         resource_cls = self._resource_type_to_class(resource_type)
         resource_history_cls = self._resource_type_to_class(resource_type,
                                                             "history")
-        session = self.engine_facade.get_session()
-        try:
-            with session.begin():
+        with self.facade.writer() as session:
+            try:
                 # NOTE(sileht): We use FOR UPDATE that is not galera friendly,
                 # but they are no other way to cleanly patch a resource and
                 # store the history that safe when two concurrent calls are
@@ -315,7 +335,7 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
                 # Update the resource
                 if ended_at is not _marker:
                     # NOTE(jd) MySQL does not honor checks. I hate it.
-                    engine = self.engine_facade.get_engine()
+                    engine = session.connection()
                     if engine.dialect.name == "mysql":
                         if r.started_at is not None and ended_at is not None:
                             if r.started_at > ended_at:
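
Also note the swap from the global engine to session.connection(): the dialect
check now runs against the connection of the ongoing transaction instead of a
separately acquired engine. In sketch form:

    with self.facade.writer() as session:
        conn = session.connection()      # bound to this very transaction
        if conn.dialect.name == "mysql":
            ...                          # dialect-specific workaround
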
@@ -338,17 +358,18 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
                         Metric.status == 'active').update(
                             {"resource_id": None})
                     self._set_metrics_for_resource(session, r, metrics)
-        except exception.DBConstraintError as e:
-            if e.check_name == "ck_started_before_ended":
-                raise indexer.ResourceValueError(
-                    resource_type, "ended_at", ended_at)
-            raise
-
-        # NOTE(jd) Force load of metrics do it outside the session!
-        r.metrics
-
-        session.expunge_all()
-        return r
+                session.flush()
+            except exception.DBConstraintError as e:
+                if e.check_name == "ck_started_before_ended":
+                    raise indexer.ResourceValueError(
+                        resource_type, "ended_at", ended_at)
+                raise
+
+            # NOTE(jd) Force load of metrics do it outside the session!
+            r.metrics
+
+        return r
 
     @staticmethod
     def _set_metrics_for_resource(session, r, metrics):
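
The NOTE(sileht) comment above refers to SELECT ... FOR UPDATE row locking,
which SQLAlchemy exposes as Query.with_for_update(); a sketch of the pattern it
defends (hypothetical query):

    r = session.query(resource_cls).filter(
        resource_cls.id == resource_id).with_for_update().first()
    # Concurrent writers serialize on this row. On Galera the lock is only
    # taken on the local node, hence the "not galera friendly" caveat.
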
@@ -389,8 +410,7 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
             session.expire(r, ['metrics'])
 
     def delete_resource(self, resource_id):
-        session = self.engine_facade.get_session()
-        with session.begin():
+        with self.facade.writer() as session:
             # We are going to delete the resource; the on delete will set the
             # resource_id of the attached metrics to NULL, we just have to mark
             # their status as 'delete'
@@ -403,15 +423,13 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
 
     def get_resource(self, resource_type, resource_id, with_metrics=False):
         resource_cls = self._resource_type_to_class(resource_type)
-        session = self.engine_facade.get_session()
-        q = session.query(
-            resource_cls).filter(
-                resource_cls.id == resource_id)
-        if with_metrics:
-            q = q.options(sqlalchemy.orm.joinedload('metrics'))
-        r = q.first()
-        session.expunge_all()
-        return r
+        with self.facade.independent_reader() as session:
+            q = session.query(
+                resource_cls).filter(
+                    resource_cls.id == resource_id)
+            if with_metrics:
+                q = q.options(sqlalchemy.orm.joinedload('metrics'))
+            return q.first()
 
     def _get_history_result_mapper(self, resource_type):
         resource_cls = self._resource_type_to_class(resource_type)
@@ -461,97 +479,95 @@ class SQLAlchemyIndexer(indexer.IndexerDriver):
                        sorts=None):
         sorts = sorts or []
 
-        session = self.engine_facade.get_session()
-
         if history:
             target_cls = self._get_history_result_mapper(resource_type)
         else:
             target_cls = self._resource_type_to_class(resource_type)
 
-        q = session.query(target_cls)
-
-        if attribute_filter:
-            engine = self.engine_facade.get_engine()
-            try:
-                f = QueryTransformer.build_filter(engine.dialect.name,
-                                                  target_cls,
-                                                  attribute_filter)
-            except indexer.QueryAttributeError as e:
-                # NOTE(jd) The QueryAttributeError does not know about
-                # resource_type, so convert it
-                raise indexer.ResourceAttributeError(resource_type,
-                                                     e.attribute)
-
-            q = q.filter(f)
-
-        # transform the api-wg representation to the oslo.db one
-        sort_keys = []
-        sort_dirs = []
-        for sort in sorts:
-            sort_key, __, sort_dir = sort.partition(":")
-            sort_keys.append(sort_key.strip())
-            sort_dirs.append(sort_dir or 'asc')
-
-        # paginate_query require at list one uniq column
-        if 'id' not in sort_keys:
-            sort_keys.append('id')
-            sort_dirs.append('asc')
-
-        if marker:
-            resource_marker = self.get_resource(resource_type, marker)
-            if resource_marker is None:
-                raise indexer.InvalidPagination(
-                    "Invalid marker: `%s'" % marker)
-        else:
-            resource_marker = None
-
-        try:
-            q = oslo_db_utils.paginate_query(q, target_cls, limit=limit,
-                                             sort_keys=sort_keys,
-                                             marker=resource_marker,
-                                             sort_dirs=sort_dirs)
-        except (exception.InvalidSortKey, ValueError) as e:
-            raise indexer.InvalidPagination(e)
-
-        # Always include metrics
-        q = q.options(sqlalchemy.orm.joinedload("metrics"))
-        all_resources = q.all()
-
-        if details:
-            grouped_by_type = itertools.groupby(
-                all_resources, lambda r: (r.revision != -1, r.type))
-            all_resources = []
-            for (is_history, type), resources in grouped_by_type:
-                if type == 'generic':
-                    # No need for a second query
-                    all_resources.extend(resources)
-                else:
-                    if is_history:
-                        target_cls = self._resource_type_to_class(type,
-                                                                  "history")
-                        f = target_cls.revision.in_(
-                            [r.revision for r in resources])
-                    else:
-                        target_cls = self._resource_type_to_class(type)
-                        f = target_cls.id.in_([r.id for r in resources])
-
-                    q = session.query(target_cls).filter(f)
-                    # Always include metrics
-                    q = q.options(sqlalchemy.orm.joinedload('metrics'))
-                    all_resources.extend(q.all())
-
-        session.expunge_all()
-        return all_resources
+        with self.facade.independent_reader() as session:
+            q = session.query(target_cls)
+
+            if attribute_filter:
+                engine = session.connection()
+                try:
+                    f = QueryTransformer.build_filter(engine.dialect.name,
+                                                      target_cls,
+                                                      attribute_filter)
+                except indexer.QueryAttributeError as e:
+                    # NOTE(jd) The QueryAttributeError does not know about
+                    # resource_type, so convert it
+                    raise indexer.ResourceAttributeError(resource_type,
+                                                         e.attribute)
+
+                q = q.filter(f)
+
+            # transform the api-wg representation to the oslo.db one
+            sort_keys = []
+            sort_dirs = []
+            for sort in sorts:
+                sort_key, __, sort_dir = sort.partition(":")
+                sort_keys.append(sort_key.strip())
+                sort_dirs.append(sort_dir or 'asc')
+
+            # paginate_query require at list one uniq column
+            if 'id' not in sort_keys:
+                sort_keys.append('id')
+                sort_dirs.append('asc')
+
+            if marker:
+                resource_marker = self.get_resource(resource_type, marker)
+                if resource_marker is None:
+                    raise indexer.InvalidPagination(
+                        "Invalid marker: `%s'" % marker)
+            else:
+                resource_marker = None
+
+            try:
+                q = oslo_db_utils.paginate_query(q, target_cls, limit=limit,
+                                                 sort_keys=sort_keys,
+                                                 marker=resource_marker,
+                                                 sort_dirs=sort_dirs)
+            except (exception.InvalidSortKey, ValueError) as e:
+                raise indexer.InvalidPagination(e)
+
+            # Always include metrics
+            q = q.options(sqlalchemy.orm.joinedload("metrics"))
+            all_resources = q.all()
+
+            if details:
+                grouped_by_type = itertools.groupby(
+                    all_resources, lambda r: (r.revision != -1, r.type))
+                all_resources = []
+                for (is_history, type), resources in grouped_by_type:
+                    if type == 'generic':
+                        # No need for a second query
+                        all_resources.extend(resources)
+                    else:
+                        if is_history:
+                            target_cls = self._resource_type_to_class(
+                                type, "history")
+                            f = target_cls.revision.in_(
+                                [r.revision for r in resources])
+                        else:
+                            target_cls = self._resource_type_to_class(type)
+                            f = target_cls.id.in_([r.id for r in resources])
+
+                        q = session.query(target_cls).filter(f)
+                        # Always include metrics
+                        q = q.options(sqlalchemy.orm.joinedload('metrics'))
+                        all_resources.extend(q.all())
+
+            return all_resources
 
     def expunge_metric(self, id):
-        session = self.engine_facade.get_session()
-        if session.query(Metric).filter(Metric.id == id).delete() == 0:
-            raise indexer.NoSuchMetric(id)
+        with self.facade.writer() as session:
+            if session.query(Metric).filter(Metric.id == id).delete() == 0:
+                raise indexer.NoSuchMetric(id)
 
     def delete_metric(self, id):
-        session = self.engine_facade.get_session()
-        if session.query(Metric).filter(
-                Metric.id == id).update({"status": "delete"}) == 0:
-            raise indexer.NoSuchMetric(id)
+        with self.facade.writer() as session:
+            if session.query(Metric).filter(
+                    Metric.id == id).update({"status": "delete"}) == 0:
+                raise indexer.NoSuchMetric(id)
 
 
 class QueryTransformer(object):
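
The sort handling in list_resources() converts api-wg style "field:direction"
strings into the parallel lists that oslo_db_utils.paginate_query expects. A
worked example with hypothetical input:

    sorts = ["user_id:desc", "started_at"]
    sort_keys, sort_dirs = [], []
    for sort in sorts:
        sort_key, __, sort_dir = sort.partition(":")
        sort_keys.append(sort_key.strip())
        sort_dirs.append(sort_dir or 'asc')
    # sort_keys == ['user_id', 'started_at']
    # sort_dirs == ['desc', 'asc']
    # 'id'/'asc' is then appended as the unique tie-breaker column that
    # paginate_query requires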

File 3 of 3: the migration test (ModelsMigrationsSync)

@@ -40,7 +40,7 @@ class ModelsMigrationsSync(
         return sqlalchemy_base.Base.metadata
 
     def get_engine(self):
-        return self.index.engine_facade.get_engine()
+        return self.index.get_engine()
 
     @staticmethod
     def db_sync(engine):