# -*- encoding: utf-8 -*- # # Copyright © 2014-2015 eNovance # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from __future__ import absolute_import import itertools import operator import os.path import threading import uuid from alembic import migration from alembic import operations import oslo_db.api from oslo_db import exception from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import utils as oslo_db_utils from oslo_log import log try: import psycopg2 except ImportError: psycopg2 = None try: import pymysql.constants.ER import pymysql.err except ImportError: pymysql = None import six import sqlalchemy from sqlalchemy.engine import url as sqlalchemy_url import sqlalchemy.exc from sqlalchemy import types import sqlalchemy_utils from gnocchi import exceptions from gnocchi import indexer from gnocchi.indexer import sqlalchemy_base as base from gnocchi import utils Base = base.Base Metric = base.Metric ArchivePolicy = base.ArchivePolicy ArchivePolicyRule = base.ArchivePolicyRule Resource = base.Resource ResourceHistory = base.ResourceHistory ResourceType = base.ResourceType _marker = indexer._marker LOG = log.getLogger(__name__) def retry_on_deadlock(f): return oslo_db.api.wrap_db_retry(retry_on_deadlock=True, max_retries=20, retry_interval=0.1, max_retry_interval=2)(f) class PerInstanceFacade(object): def __init__(self, conf): self.trans = enginefacade.transaction_context() self.trans.configure( **dict(conf.database.items()) ) self._context = threading.local() def independent_writer(self): return self.trans.independent.writer.using(self._context) def independent_reader(self): return self.trans.independent.reader.using(self._context) def writer_connection(self): return self.trans.connection.writer.using(self._context) def reader_connection(self): return self.trans.connection.reader.using(self._context) def writer(self): return self.trans.writer.using(self._context) def reader(self): return self.trans.reader.using(self._context) def get_engine(self): # TODO(mbayer): add get_engine() to enginefacade if not self.trans._factory._started: self.trans._factory._start() return self.trans._factory._writer_engine def dispose(self): # TODO(mbayer): add dispose() to enginefacade if self.trans._factory._started: self.trans._factory._writer_engine.dispose() class ResourceClassMapper(object): def __init__(self): # FIXME(sileht): 3 attributes, perhaps we need a better structure. self._cache = {'generic': {'resource': base.Resource, 'history': base.ResourceHistory, 'updated_at': utils.utcnow()}} @staticmethod def _build_class_mappers(resource_type, baseclass=None): tablename = resource_type.tablename tables_args = {"extend_existing": True} tables_args.update(base.COMMON_TABLES_ARGS) # TODO(sileht): Add columns if not baseclass: baseclass = resource_type.to_baseclass() resource_ext = type( str("%s_resource" % tablename), (baseclass, base.ResourceExtMixin, base.Resource), {"__tablename__": tablename, "__table_args__": tables_args}) resource_history_ext = type( str("%s_history" % tablename), (baseclass, base.ResourceHistoryExtMixin, base.ResourceHistory), {"__tablename__": ("%s_history" % tablename), "__table_args__": tables_args}) return {'resource': resource_ext, 'history': resource_history_ext, 'updated_at': resource_type.updated_at} def get_classes(self, resource_type): # NOTE(sileht): We don't care about concurrency here because we allow # sqlalchemy to override its global object with extend_existing=True # this is safe because classname and tablename are uuid. try: mappers = self._cache[resource_type.tablename] # Cache is outdated if (resource_type.name != "generic" and resource_type.updated_at > mappers['updated_at']): for table_purpose in ['resource', 'history']: Base.metadata.remove(Base.metadata.tables[ mappers[table_purpose].__tablename__]) del self._cache[resource_type.tablename] raise KeyError return mappers except KeyError: mapper = self._build_class_mappers(resource_type) self._cache[resource_type.tablename] = mapper return mapper @retry_on_deadlock def map_and_create_tables(self, resource_type, facade): if resource_type.state != "creating": raise RuntimeError("map_and_create_tables must be called in state " "creating") mappers = self.get_classes(resource_type) tables = [Base.metadata.tables[mappers["resource"].__tablename__], Base.metadata.tables[mappers["history"].__tablename__]] try: with facade.writer_connection() as connection: Base.metadata.create_all(connection, tables=tables) except exception.DBError as e: if self._is_current_transaction_aborted(e): raise exception.RetryRequest(e) raise # NOTE(sileht): no need to protect the _cache with a lock # get_classes cannot be called in state creating self._cache[resource_type.tablename] = mappers @staticmethod def _is_current_transaction_aborted(exception): # HACK(jd) Sometimes, PostgreSQL raises an error such as "current # transaction is aborted, commands ignored until end of transaction # block" on its own catalog, so we need to retry, but this is not # caught by oslo.db as a deadlock. This is likely because when we use # Base.metadata.create_all(), sqlalchemy itself gets an error it does # not catch or something. So this is why this function exists. To # paperover I guess. inn_e = exception.inner_exception return (psycopg2 and isinstance(inn_e, sqlalchemy.exc.InternalError) and isinstance(inn_e.orig, psycopg2.InternalError) # current transaction is aborted and inn_e.orig.pgcode == '25P02') @retry_on_deadlock def unmap_and_delete_tables(self, resource_type, facade): if resource_type.state != "deleting": raise RuntimeError("unmap_and_delete_tables must be called in " "state deleting") mappers = self.get_classes(resource_type) del self._cache[resource_type.tablename] tables = [Base.metadata.tables[mappers['resource'].__tablename__], Base.metadata.tables[mappers['history'].__tablename__]] # NOTE(sileht): Base.metadata.drop_all doesn't # issue CASCADE stuffs correctly at least on postgresql # We drop foreign keys manually to not lock the destination # table for too long during drop table. # It's safe to not use a transaction since # the resource_type table is already cleaned and commited # so this code cannot be triggerred anymore for this # resource_type with facade.writer_connection() as connection: try: for table in tables: for fk in table.foreign_key_constraints: try: self._safe_execute( connection, sqlalchemy.schema.DropConstraint(fk)) except exception.DBNonExistentConstraint: pass for table in tables: try: self._safe_execute(connection, sqlalchemy.schema.DropTable(table)) except exception.DBNonExistentTable: pass except exception.DBError as e: if self._is_current_transaction_aborted(e): raise exception.RetryRequest(e) raise # NOTE(sileht): If something goes wrong here, we are currently # fucked, that why we expose the state to the superuser. # TODO(sileht): The idea is to make the delete resource_type more # like a cleanup method, I mean we should don't fail if the # constraint have already been dropped or the table have already # been deleted. So, when the superuser have fixed it's backend # issue, it can rerun 'DELETE ../resource_type/foobar' even the # state is already error and if we are sure all underlying # resources have been cleaned we really deleted the resource_type. # TODO(sileht): Remove this resource on other workers # by using expiration on cache ? for table in tables: Base.metadata.remove(table) @retry_on_deadlock def _safe_execute(self, connection, works): # NOTE(sileht): we create a transaction to ensure mysql # create locks on other transaction... trans = connection.begin() connection.execute(works) trans.commit() class SQLAlchemyIndexer(indexer.IndexerDriver): _RESOURCE_TYPE_MANAGER = ResourceClassMapper() @classmethod def _create_new_database(cls, url): """Used by testing to create a new database.""" purl = sqlalchemy_url.make_url( cls.dress_url( url)) purl.database = purl.database + str(uuid.uuid4()).replace('-', '') new_url = str(purl) sqlalchemy_utils.create_database(new_url) return new_url @staticmethod def dress_url(url): # If no explicit driver has been set, we default to pymysql if url.startswith("mysql://"): url = sqlalchemy_url.make_url(url) url.drivername = "mysql+pymysql" return str(url) return url def __init__(self, conf): conf.set_override("connection", self.dress_url(conf.indexer.url), "database") self.conf = conf self.facade = PerInstanceFacade(conf) def disconnect(self): self.facade.dispose() def _get_alembic_config(self): from alembic import config cfg = config.Config( "%s/alembic/alembic.ini" % os.path.dirname(__file__)) cfg.set_main_option('sqlalchemy.url', self.conf.database.connection) return cfg def get_engine(self): return self.facade.get_engine() def upgrade(self, nocreate=False, create_legacy_resource_types=False): from alembic import command from alembic import migration cfg = self._get_alembic_config() cfg.conf = self.conf if nocreate: command.upgrade(cfg, "head") else: with self.facade.writer_connection() as connection: ctxt = migration.MigrationContext.configure(connection) current_version = ctxt.get_current_revision() if current_version is None: Base.metadata.create_all(connection) command.stamp(cfg, "head") else: command.upgrade(cfg, "head") # TODO(sileht): generic shouldn't be a particular case # we must create a rt_generic and rt_generic_history table # like other type for rt in base.get_legacy_resource_types(): if not (rt.name == "generic" or create_legacy_resource_types): continue try: with self.facade.writer() as session: session.add(rt) except exception.DBDuplicateEntry: continue if rt.name != "generic": try: self._RESOURCE_TYPE_MANAGER.map_and_create_tables( rt, self.facade) except Exception: self._set_resource_type_state(rt.name, "creation_error") LOG.exception('Fail to create tables for ' 'resource_type "%s"', rt.name) continue self._set_resource_type_state(rt.name, "active") # NOTE(jd) We can have deadlock errors either here or later in # map_and_create_tables(). We can't decorate create_resource_type() # directly or each part might retry later on its own and cause a # duplicate. And it seems there's no way to use the same session for # both adding the resource_type in our table and calling # map_and_create_tables() :-( @retry_on_deadlock def _add_resource_type(self, resource_type): try: with self.facade.writer() as session: session.add(resource_type) except exception.DBDuplicateEntry: raise indexer.ResourceTypeAlreadyExists(resource_type.name) def create_resource_type(self, resource_type): # NOTE(sileht): mysql have a stupid and small length limitation on the # foreign key and index name, so we can't use the resource type name as # tablename, the limit is 64. The longest name we have is # fk__h_revision_rh_revision, # so 64 - 26 = 38 and 3 chars for rt_, 35 chars, uuid is 32, it's cool. tablename = "rt_%s" % uuid.uuid4().hex resource_type = ResourceType(name=resource_type.name, tablename=tablename, attributes=resource_type.attributes, state="creating") # NOTE(sileht): ensure the driver is able to store the request # resource_type resource_type.to_baseclass() self._add_resource_type(resource_type) try: self._RESOURCE_TYPE_MANAGER.map_and_create_tables(resource_type, self.facade) except Exception: # NOTE(sileht): We fail the DDL, we have no way to automatically # recover, just set a particular state self._set_resource_type_state(resource_type.name, "creation_error") raise self._set_resource_type_state(resource_type.name, "active") resource_type.state = "active" return resource_type def update_resource_type(self, name, add_attributes=None, del_attributes=None): if not add_attributes and not del_attributes: return self._set_resource_type_state(name, "updating", "active") try: with self.facade.independent_writer() as session: rt = self._get_resource_type(session, name) with self.facade.writer_connection() as connection: ctx = migration.MigrationContext.configure(connection) op = operations.Operations(ctx) with op.batch_alter_table(rt.tablename) as batch_op: for attr in del_attributes: batch_op.drop_column(attr) for attr in add_attributes: # TODO(sileht): When attr.required is True, we have # to pass a default. rest layer current protect us, # requied = True is not yet allowed batch_op.add_column(sqlalchemy.Column( attr.name, attr.satype, nullable=not attr.required)) rt.state = "active" rt.updated_at = utils.utcnow() rt.attributes.extend(add_attributes) for attr in list(rt.attributes): if attr.name in del_attributes: rt.attributes.remove(attr) # FIXME(sileht): yeah that's wierd but attributes is a custom # json column and 'extend' doesn't trigger sql update, this # enforce the update. I wonder if sqlalchemy provides something # on column description side. sqlalchemy.orm.attributes.flag_modified(rt, 'attributes') except Exception: # NOTE(sileht): We fail the DDL, we have no way to automatically # recover, just set a particular state # TODO(sileht): Create a repair REST endpoint that delete # columns not existing in the database but in the resource type # description. This will allow to pass wrong update_error to active # state, that currently not possible. self._set_resource_type_state(name, "updating_error") raise return rt def get_resource_type(self, name): with self.facade.independent_reader() as session: return self._get_resource_type(session, name) def _get_resource_type(self, session, name): resource_type = session.query(ResourceType).get(name) if not resource_type: raise indexer.NoSuchResourceType(name) return resource_type @retry_on_deadlock def _set_resource_type_state(self, name, state, expected_previous_state=None): with self.facade.writer() as session: q = session.query(ResourceType) q = q.filter(ResourceType.name == name) if expected_previous_state is not None: q = q.filter(ResourceType.state == expected_previous_state) update = q.update({'state': state}) if update == 0: if expected_previous_state is not None: rt = session.query(ResourceType).get(name) if rt: raise indexer.UnexpectedResourceTypeState( name, expected_previous_state, rt.state) raise indexer.IndexerException( "Fail to set resource type state of %s to %s" % (name, state)) @staticmethod def get_resource_type_schema(): return base.RESOURCE_TYPE_SCHEMA_MANAGER @staticmethod def get_resource_attributes_schemas(): return [ext.plugin.schema() for ext in ResourceType.RESOURCE_SCHEMAS] def list_resource_types(self): with self.facade.independent_reader() as session: return list(session.query(ResourceType).order_by( ResourceType.name.asc()).all()) # NOTE(jd) We can have deadlock errors either here or later in # map_and_create_tables(). We can't decorate delete_resource_type() # directly or each part might retry later on its own and cause a # duplicate. And it seems there's no way to use the same session for # both adding the resource_type in our table and calling # map_and_create_tables() :-( @retry_on_deadlock def _mark_as_deleting_resource_type(self, name): try: with self.facade.writer() as session: rt = self._get_resource_type(session, name) if rt.state != "active": raise indexer.UnexpectedResourceTypeState( name, "active", rt.state) session.delete(rt) # FIXME(sileht): Why do I need to flush here !!! # I want remove/add in the same transaction !!! session.flush() # NOTE(sileht): delete and recreate to: # * raise duplicate constraints # * ensure we do not create a new resource type # with the same name while we destroy the tables next rt = ResourceType(name=rt.name, tablename=rt.tablename, state="deleting", attributes=rt.attributes) session.add(rt) except exception.DBReferenceError as e: if (e.constraint in [ 'fk_resource_resource_type_name', 'fk_resource_history_resource_type_name', 'fk_rh_resource_type_name']): raise indexer.ResourceTypeInUse(name) raise return rt @retry_on_deadlock def _delete_resource_type(self, name): # Really delete the resource type, no resource can be linked to it # Because we cannot add a resource to a resource_type not in 'active' # state with self.facade.writer() as session: resource_type = self._get_resource_type(session, name) session.delete(resource_type) def delete_resource_type(self, name): if name == "generic": raise indexer.ResourceTypeInUse(name) rt = self._mark_as_deleting_resource_type(name) try: self._RESOURCE_TYPE_MANAGER.unmap_and_delete_tables( rt, self.facade) except Exception: # NOTE(sileht): We fail the DDL, we have no way to automatically # recover, just set a particular state self._set_resource_type_state(rt.name, "deletion_error") raise self._delete_resource_type(name) def _resource_type_to_mappers(self, session, name): resource_type = self._get_resource_type(session, name) if resource_type.state != "active": raise indexer.UnexpectedResourceTypeState( name, "active", resource_type.state) return self._RESOURCE_TYPE_MANAGER.get_classes(resource_type) def list_archive_policies(self): with self.facade.independent_reader() as session: return list(session.query(ArchivePolicy).all()) def get_archive_policy(self, name): with self.facade.independent_reader() as session: return session.query(ArchivePolicy).get(name) def update_archive_policy(self, name, ap_items): with self.facade.independent_writer() as session: ap = session.query(ArchivePolicy).get(name) if not ap: raise indexer.NoSuchArchivePolicy(name) current = sorted(ap.definition, key=operator.attrgetter('granularity')) new = sorted(ap_items, key=operator.attrgetter('granularity')) if len(current) != len(new): raise indexer.UnsupportedArchivePolicyChange( name, 'Cannot add or drop granularities') for c, n in zip(current, new): if c.granularity != n.granularity: raise indexer.UnsupportedArchivePolicyChange( name, '%s granularity interval was changed' % c.granularity) # NOTE(gordc): ORM doesn't update JSON column unless new ap.definition = ap_items return ap def delete_archive_policy(self, name): constraints = [ "fk_metric_ap_name_ap_name", "fk_apr_ap_name_ap_name"] with self.facade.writer() as session: try: if session.query(ArchivePolicy).filter( ArchivePolicy.name == name).delete() == 0: raise indexer.NoSuchArchivePolicy(name) except exception.DBReferenceError as e: if e.constraint in constraints: raise indexer.ArchivePolicyInUse(name) raise def create_archive_policy(self, archive_policy): ap = ArchivePolicy( name=archive_policy.name, back_window=archive_policy.back_window, definition=archive_policy.definition, aggregation_methods=list(archive_policy.aggregation_methods), ) try: with self.facade.writer() as session: session.add(ap) except exception.DBDuplicateEntry: raise indexer.ArchivePolicyAlreadyExists(archive_policy.name) return ap def list_archive_policy_rules(self): with self.facade.independent_reader() as session: return session.query(ArchivePolicyRule).order_by( ArchivePolicyRule.metric_pattern.desc()).all() def get_archive_policy_rule(self, name): with self.facade.independent_reader() as session: return session.query(ArchivePolicyRule).get(name) def delete_archive_policy_rule(self, name): with self.facade.writer() as session: if session.query(ArchivePolicyRule).filter( ArchivePolicyRule.name == name).delete() == 0: raise indexer.NoSuchArchivePolicyRule(name) def create_archive_policy_rule(self, name, metric_pattern, archive_policy_name): apr = ArchivePolicyRule( name=name, archive_policy_name=archive_policy_name, metric_pattern=metric_pattern ) try: with self.facade.writer() as session: session.add(apr) except exception.DBDuplicateEntry: raise indexer.ArchivePolicyRuleAlreadyExists(name) return apr @retry_on_deadlock def create_metric(self, id, created_by_user_id, created_by_project_id, archive_policy_name, name=None, unit=None, resource_id=None): m = Metric(id=id, created_by_user_id=created_by_user_id, created_by_project_id=created_by_project_id, archive_policy_name=archive_policy_name, name=name, unit=unit, resource_id=resource_id) try: with self.facade.writer() as session: session.add(m) except exception.DBReferenceError as e: if (e.constraint == 'fk_metric_ap_name_ap_name'): raise indexer.NoSuchArchivePolicy(archive_policy_name) raise return m @retry_on_deadlock def list_metrics(self, names=None, ids=None, details=False, status='active', limit=None, marker=None, sorts=None, **kwargs): sorts = sorts or [] if ids is not None and not ids: return [] with self.facade.independent_reader() as session: q = session.query(Metric).filter( Metric.status == status) if names is not None: q = q.filter(Metric.name.in_(names)) if ids is not None: q = q.filter(Metric.id.in_(ids)) for attr in kwargs: q = q.filter(getattr(Metric, attr) == kwargs[attr]) if details: q = q.options(sqlalchemy.orm.joinedload('resource')) sort_keys, sort_dirs = self._build_sort_keys(sorts) if marker: metric_marker = self.list_metrics(ids=[marker]) if metric_marker: metric_marker = metric_marker[0] else: raise indexer.InvalidPagination( "Invalid marker: `%s'" % marker) else: metric_marker = None try: q = oslo_db_utils.paginate_query(q, Metric, limit=limit, sort_keys=sort_keys, marker=metric_marker, sort_dirs=sort_dirs) except ValueError as e: raise indexer.InvalidPagination(e) except exception.InvalidSortKey as e: raise indexer.InvalidPagination(e) return list(q.all()) @retry_on_deadlock def create_resource(self, resource_type, id, created_by_user_id, created_by_project_id, user_id=None, project_id=None, started_at=None, ended_at=None, metrics=None, **kwargs): if (started_at is not None and ended_at is not None and started_at > ended_at): raise ValueError( "Start timestamp cannot be after end timestamp") with self.facade.writer() as session: resource_cls = self._resource_type_to_mappers( session, resource_type)['resource'] r = resource_cls( id=id, type=resource_type, created_by_user_id=created_by_user_id, created_by_project_id=created_by_project_id, user_id=user_id, project_id=project_id, started_at=started_at, ended_at=ended_at, **kwargs) session.add(r) try: session.flush() except exception.DBDuplicateEntry: raise indexer.ResourceAlreadyExists(id) except exception.DBReferenceError as ex: raise indexer.ResourceValueError(r.type, ex.key, getattr(r, ex.key)) if metrics is not None: self._set_metrics_for_resource(session, r, metrics) # NOTE(jd) Force load of metrics :) r.metrics return r @retry_on_deadlock def update_resource(self, resource_type, resource_id, ended_at=_marker, metrics=_marker, append_metrics=False, create_revision=True, **kwargs): with self.facade.writer() as session: mappers = self._resource_type_to_mappers(session, resource_type) resource_cls = mappers["resource"] resource_history_cls = mappers["history"] try: # NOTE(sileht): We use FOR UPDATE that is not galera friendly, # but they are no other way to cleanly patch a resource and # store the history that safe when two concurrent calls are # done. q = session.query(resource_cls).filter( resource_cls.id == resource_id).with_for_update() r = q.first() if r is None: raise indexer.NoSuchResource(resource_id) if create_revision: # Build history rh = resource_history_cls() for col in sqlalchemy.inspect(resource_cls).columns: setattr(rh, col.name, getattr(r, col.name)) now = utils.utcnow() rh.revision_end = now session.add(rh) r.revision_start = now # Update the resource if ended_at is not _marker: # NOTE(jd) MySQL does not honor checks. I hate it. engine = session.connection() if engine.dialect.name == "mysql": if r.started_at is not None and ended_at is not None: if r.started_at > ended_at: raise indexer.ResourceValueError( resource_type, "ended_at", ended_at) r.ended_at = ended_at if kwargs: for attribute, value in six.iteritems(kwargs): if hasattr(r, attribute): setattr(r, attribute, value) else: raise indexer.ResourceAttributeError( r.type, attribute) if metrics is not _marker: if not append_metrics: session.query(Metric).filter( Metric.resource_id == resource_id, Metric.status == 'active').update( {"resource_id": None}) self._set_metrics_for_resource(session, r, metrics) session.flush() except exception.DBConstraintError as e: if e.check_name == "ck_started_before_ended": raise indexer.ResourceValueError( resource_type, "ended_at", ended_at) raise # NOTE(jd) Force load of metrics – do it outside the session! r.metrics return r @staticmethod def _set_metrics_for_resource(session, r, metrics): for name, value in six.iteritems(metrics): if isinstance(value, uuid.UUID): try: update = session.query(Metric).filter( Metric.id == value, Metric.status == 'active', (Metric.created_by_user_id == r.created_by_user_id), (Metric.created_by_project_id == r.created_by_project_id), ).update({"resource_id": r.id, "name": name}) except exception.DBDuplicateEntry: raise indexer.NamedMetricAlreadyExists(name) if update == 0: raise indexer.NoSuchMetric(value) else: unit = value.get('unit') ap_name = value['archive_policy_name'] m = Metric(id=uuid.uuid4(), created_by_user_id=r.created_by_user_id, created_by_project_id=r.created_by_project_id, archive_policy_name=ap_name, name=name, unit=unit, resource_id=r.id) session.add(m) try: session.flush() except exception.DBDuplicateEntry: raise indexer.NamedMetricAlreadyExists(name) except exception.DBReferenceError as e: if (e.constraint == 'fk_metric_ap_name_ap_name'): raise indexer.NoSuchArchivePolicy(ap_name) raise session.expire(r, ['metrics']) @retry_on_deadlock def delete_resource(self, resource_id): with self.facade.writer() as session: # We are going to delete the resource; the on delete will set the # resource_id of the attached metrics to NULL, we just have to mark # their status as 'delete' session.query(Metric).filter( Metric.resource_id == resource_id).update( {"status": "delete"}) if session.query(Resource).filter( Resource.id == resource_id).delete() == 0: raise indexer.NoSuchResource(resource_id) @retry_on_deadlock def get_resource(self, resource_type, resource_id, with_metrics=False): with self.facade.independent_reader() as session: resource_cls = self._resource_type_to_mappers( session, resource_type)['resource'] q = session.query( resource_cls).filter( resource_cls.id == resource_id) if with_metrics: q = q.options(sqlalchemy.orm.joinedload('metrics')) return q.first() def _get_history_result_mapper(self, session, resource_type): mappers = self._resource_type_to_mappers(session, resource_type) resource_cls = mappers['resource'] history_cls = mappers['history'] resource_cols = {} history_cols = {} for col in sqlalchemy.inspect(history_cls).columns: history_cols[col.name] = col if col.name in ["revision", "revision_end"]: value = None if col.name == "revision_end" else -1 resource_cols[col.name] = sqlalchemy.bindparam( col.name, value, col.type).label(col.name) else: resource_cols[col.name] = getattr(resource_cls, col.name) s1 = sqlalchemy.select(history_cols.values()) s2 = sqlalchemy.select(resource_cols.values()) if resource_type != "generic": s1 = s1.where(history_cls.revision == ResourceHistory.revision) s2 = s2.where(resource_cls.id == Resource.id) union_stmt = sqlalchemy.union(s1, s2) stmt = union_stmt.alias("result") class Result(base.ResourceJsonifier, base.GnocchiBase): def __iter__(self): return iter((key, getattr(self, key)) for key in stmt.c.keys()) sqlalchemy.orm.mapper( Result, stmt, primary_key=[stmt.c.id, stmt.c.revision], properties={ 'metrics': sqlalchemy.orm.relationship( Metric, primaryjoin=sqlalchemy.and_( Metric.resource_id == stmt.c.id, Metric.status == 'active'), foreign_keys=Metric.resource_id) }) return Result @retry_on_deadlock def list_resources(self, resource_type='generic', attribute_filter=None, details=False, history=False, limit=None, marker=None, sorts=None): sorts = sorts or [] with self.facade.independent_reader() as session: if history: target_cls = self._get_history_result_mapper( session, resource_type) else: target_cls = self._resource_type_to_mappers( session, resource_type)["resource"] q = session.query(target_cls) if attribute_filter: engine = session.connection() try: f = QueryTransformer.build_filter(engine.dialect.name, target_cls, attribute_filter) except indexer.QueryAttributeError as e: # NOTE(jd) The QueryAttributeError does not know about # resource_type, so convert it raise indexer.ResourceAttributeError(resource_type, e.attribute) q = q.filter(f) sort_keys, sort_dirs = self._build_sort_keys(sorts) if marker: resource_marker = self.get_resource(resource_type, marker) if resource_marker is None: raise indexer.InvalidPagination( "Invalid marker: `%s'" % marker) else: resource_marker = None try: q = oslo_db_utils.paginate_query(q, target_cls, limit=limit, sort_keys=sort_keys, marker=resource_marker, sort_dirs=sort_dirs) except ValueError as e: raise indexer.InvalidPagination(e) except exception.InvalidSortKey as e: raise indexer.InvalidPagination(e) # Always include metrics q = q.options(sqlalchemy.orm.joinedload("metrics")) all_resources = q.all() if details: grouped_by_type = itertools.groupby( all_resources, lambda r: (r.revision != -1, r.type)) all_resources = [] for (is_history, type), resources in grouped_by_type: if type == 'generic': # No need for a second query all_resources.extend(resources) else: try: target_cls = self._resource_type_to_mappers( session, type)['history' if is_history else 'resource'] except (indexer.UnexpectedResourceTypeState, indexer.NoSuchResourceType): # NOTE(sileht): This resource_type have been # removed in the meantime. continue if is_history: f = target_cls.revision.in_([r.revision for r in resources]) else: f = target_cls.id.in_([r.id for r in resources]) q = session.query(target_cls).filter(f) # Always include metrics q = q.options(sqlalchemy.orm.joinedload('metrics')) try: all_resources.extend(q.all()) except sqlalchemy.exc.ProgrammingError as e: # NOTE(jd) This exception can happen when the # resources and their resource type have been # deleted in the meantime: # sqlalchemy.exc.ProgrammingError: # (pymysql.err.ProgrammingError) # (1146, "Table \'test.rt_f00\' doesn\'t exist") # In that case, just ignore those resources. if (not pymysql or not isinstance( e, sqlalchemy.exc.ProgrammingError) or not isinstance( e.orig, pymysql.err.ProgrammingError) or (e.orig.args[0] != pymysql.constants.ER.NO_SUCH_TABLE)): raise return all_resources def expunge_metric(self, id): with self.facade.writer() as session: if session.query(Metric).filter(Metric.id == id).delete() == 0: raise indexer.NoSuchMetric(id) def delete_metric(self, id): with self.facade.writer() as session: if session.query(Metric).filter( Metric.id == id, Metric.status == 'active').update( {"status": "delete"}) == 0: raise indexer.NoSuchMetric(id) @staticmethod def _build_sort_keys(sorts): # transform the api-wg representation to the oslo.db one sort_keys = [] sort_dirs = [] for sort in sorts: sort_key, __, sort_dir = sort.partition(":") sort_keys.append(sort_key.strip()) sort_dirs.append(sort_dir or 'asc') # paginate_query require at list one uniq column if 'id' not in sort_keys: sort_keys.append('id') sort_dirs.append('asc') return sort_keys, sort_dirs class QueryTransformer(object): unary_operators = { u"not": sqlalchemy.not_, } binary_operators = { u"=": operator.eq, u"==": operator.eq, u"eq": operator.eq, u"<": operator.lt, u"lt": operator.lt, u">": operator.gt, u"gt": operator.gt, u"<=": operator.le, u"≤": operator.le, u"le": operator.le, u">=": operator.ge, u"≥": operator.ge, u"ge": operator.ge, u"!=": operator.ne, u"≠": operator.ne, u"ne": operator.ne, u"in": lambda field_name, values: field_name.in_(values), u"like": lambda field, value: field.like(value), } multiple_operators = { u"or": sqlalchemy.or_, u"∨": sqlalchemy.or_, u"and": sqlalchemy.and_, u"∧": sqlalchemy.and_, } @classmethod def _handle_multiple_op(cls, engine, table, op, nodes): return op(*[ cls.build_filter(engine, table, node) for node in nodes ]) @classmethod def _handle_unary_op(cls, engine, table, op, node): return op(cls.build_filter(engine, table, node)) @staticmethod def _handle_binary_op(engine, table, op, nodes): try: field_name, value = list(nodes.items())[0] except Exception: raise indexer.QueryError() if field_name == "lifespan": attr = getattr(table, "ended_at") - getattr(table, "started_at") value = utils.to_timespan(value) if engine == "mysql": # NOTE(jd) So subtracting 2 timestamps in MySQL result in some # weird results based on string comparison. It's useless and it # does not work at all with seconds or anything. Just skip it. raise exceptions.NotImplementedError else: try: attr = getattr(table, field_name) except AttributeError: raise indexer.QueryAttributeError(table, field_name) if not hasattr(attr, "type"): # This is not a column raise indexer.QueryAttributeError(table, field_name) # Convert value to the right type if value is not None: converter = None if isinstance(attr.type, base.PreciseTimestamp): converter = utils.to_timestamp elif (isinstance(attr.type, sqlalchemy_utils.UUIDType) and not isinstance(value, uuid.UUID)): converter = utils.ResourceUUID elif isinstance(attr.type, types.String): converter = six.text_type elif isinstance(attr.type, types.Integer): converter = int elif isinstance(attr.type, types.Numeric): converter = float if converter: try: if isinstance(value, list): # we got a list for in_ operator value = [converter(v) for v in value] else: value = converter(value) except Exception: raise indexer.QueryValueError(value, field_name) return op(attr, value) @classmethod def build_filter(cls, engine, table, tree): try: operator, nodes = list(tree.items())[0] except Exception: raise indexer.QueryError() try: op = cls.multiple_operators[operator] except KeyError: try: op = cls.binary_operators[operator] except KeyError: try: op = cls.unary_operators[operator] except KeyError: raise indexer.QueryInvalidOperator(operator) return cls._handle_unary_op(engine, op, nodes) return cls._handle_binary_op(engine, table, op, nodes) return cls._handle_multiple_op(engine, table, op, nodes)