From af1bfe7d0e7dbfb7ca15c60c2f107d162623a7b7 Mon Sep 17 00:00:00 2001 From: Ildiko Vancsa Date: Sat, 15 Feb 2014 18:52:14 +0100 Subject: [PATCH] Remove code duplication Remove code duplication from impl_mongodb and impl_db2 as both of these drivers use pymongo. This is a multiple step process as there are several differences between the implementation of these drivers. The first step is about to move the identical functions to a common base file, called pymongo_base with a Connection class, from which the Connection class of both drivers will be inherited. Change-Id: Ib3c250213cda5450c96c6e495f56623ed7ac0490 --- ceilometer/storage/impl_db2.py | 256 +------------------------- ceilometer/storage/impl_mongodb.py | 265 ++------------------------- ceilometer/storage/pymongo_base.py | 282 +++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+), 497 deletions(-) create mode 100644 ceilometer/storage/pymongo_base.py diff --git a/ceilometer/storage/impl_db2.py b/ceilometer/storage/impl_db2.py index c10c5c6f2c..ef7e28b038 100644 --- a/ceilometer/storage/impl_db2.py +++ b/ceilometer/storage/impl_db2.py @@ -26,7 +26,6 @@ import copy import datetime import itertools import sys -import weakref import bson.code import bson.objectid @@ -38,6 +37,7 @@ from ceilometer.openstack.common import timeutils from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models +from ceilometer.storage import pymongo_base LOG = log.getLogger(__name__) @@ -74,101 +74,11 @@ class DB2Storage(base.StorageEngine): return Connection(conf) -def make_timestamp_range(start, end, - start_timestamp_op=None, end_timestamp_op=None): - """Given two possible datetimes and their operations, create the query - document to find timestamps within that range. - By default, using $gte for the lower bound and $lt for the - upper bound. - """ - ts_range = {} - - if start: - if start_timestamp_op == 'gt': - start_timestamp_op = '$gt' - else: - start_timestamp_op = '$gte' - ts_range[start_timestamp_op] = start - - if end: - if end_timestamp_op == 'le': - end_timestamp_op = '$lte' - else: - end_timestamp_op = '$lt' - ts_range[end_timestamp_op] = end - return ts_range - - -def make_query_from_filter(sample_filter, require_meter=True): - """Return a query dictionary based on the settings in the filter. - - :param filter: SampleFilter instance - :param require_meter: If true and the filter does not have a meter, - raise an error. - """ - q = {} - - if sample_filter.user: - q['user_id'] = sample_filter.user - if sample_filter.project: - q['project_id'] = sample_filter.project - - if sample_filter.meter: - q['counter_name'] = sample_filter.meter - elif require_meter: - raise RuntimeError('Missing required meter specifier') - - ts_range = make_timestamp_range(sample_filter.start, sample_filter.end, - sample_filter.start_timestamp_op, - sample_filter.end_timestamp_op) - if ts_range: - q['timestamp'] = ts_range - - if sample_filter.resource: - q['resource_id'] = sample_filter.resource - if sample_filter.source: - q['source'] = sample_filter.source - if sample_filter.message_id: - q['message_id'] = sample_filter.message_id - - # so the samples call metadata resource_metadata, so we convert - # to that. - q.update(dict(('resource_%s' % k, v) - for (k, v) in sample_filter.metaquery.iteritems())) - return q - - -class ConnectionPool(object): - - def __init__(self): - self._pool = {} - - def connect(self, url): - connection_options = pymongo.uri_parser.parse_uri(url) - del connection_options['database'] - del connection_options['username'] - del connection_options['password'] - del connection_options['collection'] - pool_key = tuple(connection_options) - - if pool_key in self._pool: - client = self._pool.get(pool_key)() - if client: - return client - LOG.info(_('Connecting to DB2 on %s'), - connection_options['nodelist']) - client = pymongo.MongoClient( - url, - safe=True) - self._pool[pool_key] = weakref.ref(client) - return client - - -class Connection(base.Connection): +class Connection(pymongo_base.Connection): """DB2 connection. """ - CONNECTION_POOL = ConnectionPool() + CONNECTION_POOL = pymongo_base.ConnectionPool() GROUP = {'_id': '$counter_name', 'unit': {'$min': '$counter_unit'}, @@ -352,32 +262,6 @@ class Connection(base.Connection): record['_id'] = str(bson.objectid.ObjectId()) self.db.meter.insert(record) - def get_users(self, source=None): - """Return an iterable of user id strings. - - :param source: Optional source filter. - """ - q = {} - if source is not None: - q['source'] = source - - return (doc['_id'] for doc in - self.db.user.find(q, fields=['_id'], - sort=[('_id', pymongo.ASCENDING)])) - - def get_projects(self, source=None): - """Return an iterable of project id strings. - - :param source: Optional source filter. - """ - q = {} - if source is not None: - q['source'] = source - - return (doc['_id'] for doc in - self.db.project.find(q, fields=['_id'], - sort=[('_id', pymongo.ASCENDING)])) - def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, @@ -415,9 +299,10 @@ class Connection(base.Connection): # Look for resources matching the above criteria and with # samples in the time range we care about, then change the # resource query to return just those resources by id. - ts_range = make_timestamp_range(start_timestamp, end_timestamp, - start_timestamp_op, - end_timestamp_op) + ts_range = pymongo_base.make_timestamp_range(start_timestamp, + end_timestamp, + start_timestamp_op, + end_timestamp_op) if ts_range: q['timestamp'] = ts_range @@ -444,46 +329,6 @@ class Connection(base.Connection): user_id=latest_meter['user_id'], metadata=latest_meter['resource_metadata']) - def get_meters(self, user=None, project=None, resource=None, source=None, - metaquery={}, pagination=None): - """Return an iterable of models.Meter instances - - :param user: Optional ID for user that owns the resource. - :param project: Optional ID for project that owns the resource. - :param resource: Optional resource filter. - :param source: Optional source filter. - :param metaquery: Optional dict with metadata to match on. - :param pagination: Optional pagination query. - """ - - if pagination: - raise NotImplementedError(_('Pagination not implemented')) - - q = {} - if user is not None: - q['user_id'] = user - if project is not None: - q['project_id'] = project - if resource is not None: - q['_id'] = resource - if source is not None: - q['source'] = source - q.update(metaquery) - - for r in self.db.resource.find(q): - for r_meter in r['meter']: - yield models.Meter( - name=r_meter['counter_name'], - type=r_meter['counter_type'], - # Return empty string if 'counter_unit' is not valid for - # backward compatibility. - unit=r_meter.get('counter_unit', ''), - resource_id=r['_id'], - project_id=r['project_id'], - source=r['source'], - user_id=r['user_id'], - ) - def get_samples(self, sample_filter, limit=None): """Return an iterable of model.Sample instances. @@ -492,7 +337,8 @@ class Connection(base.Connection): """ if limit == 0: return - q = make_query_from_filter(sample_filter, require_meter=False) + q = pymongo_base.make_query_from_filter(sample_filter, + require_meter=False) if limit: samples = self.db.meter.find( @@ -521,7 +367,7 @@ class Connection(base.Connection): 'resource_id', 'source'])): raise NotImplementedError("Unable to group by these fields") - q = make_query_from_filter(sample_filter) + q = pymongo_base.make_query_from_filter(sample_filter) if period: if sample_filter.start: @@ -591,67 +437,6 @@ class Connection(base.Connection): stat.period_end = stat.duration_end yield stat - @staticmethod - def _decode_matching_metadata(matching_metadata): - if isinstance(matching_metadata, dict): - #note(sileht): keep compatibility with old db format - return matching_metadata - else: - new_matching_metadata = {} - for elem in matching_metadata: - new_matching_metadata[elem['key']] = elem['value'] - return new_matching_metadata - - @classmethod - def _ensure_encapsulated_rule_format(cls, alarm): - """This ensure the alarm returned by the storage have the correct - format. The previous format looks like: - { - 'alarm_id': '0ld-4l3rt', - 'enabled': True, - 'name': 'old-alert', - 'description': 'old-alert', - 'timestamp': None, - 'meter_name': 'cpu', - 'user_id': 'me', - 'project_id': 'and-da-boys', - 'comparison_operator': 'lt', - 'threshold': 36, - 'statistic': 'count', - 'evaluation_periods': 1, - 'period': 60, - 'state': "insufficient data", - 'state_timestamp': None, - 'ok_actions': [], - 'alarm_actions': ['http://nowhere/alarms'], - 'insufficient_data_actions': [], - 'repeat_actions': False, - 'matching_metadata': {'key': 'value'} - # or 'matching_metadata': [{'key': 'key', 'value': 'value'}] - } - """ - - if isinstance(alarm.get('rule'), dict): - return - - alarm['type'] = 'threshold' - alarm['rule'] = {} - alarm['matching_metadata'] = cls._decode_matching_metadata( - alarm['matching_metadata']) - for field in ['period', 'evaluation_period', 'threshold', - 'statistic', 'comparison_operator', 'meter_name']: - if field in alarm: - alarm['rule'][field] = alarm[field] - del alarm[field] - - query = [] - for key in alarm['matching_metadata']: - query.append({'field': key, - 'op': 'eq', - 'value': alarm['matching_metadata'][key]}) - del alarm['matching_metadata'] - alarm['rule']['query'] = query - def get_alarms(self, name=None, user=None, project=None, enabled=None, alarm_id=None, pagination=None): """Yields a lists of alarms that match filters @@ -685,24 +470,3 @@ class Connection(base.Connection): del a['_id'] self._ensure_encapsulated_rule_format(a) yield models.Alarm(**a) - - def update_alarm(self, alarm): - """update alarm - """ - data = alarm.as_dict() - self.db.alarm.update( - {'alarm_id': alarm.alarm_id}, - {'$set': data}, - upsert=True) - - stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0] - del stored_alarm['_id'] - self._ensure_encapsulated_rule_format(stored_alarm) - return models.Alarm(**stored_alarm) - - create_alarm = update_alarm - - def delete_alarm(self, alarm_id): - """Delete an alarm - """ - self.db.alarm.remove({'alarm_id': alarm_id}) diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 05d4633f53..5b5d92a023 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -25,7 +25,6 @@ import copy import json import operator import uuid -import weakref import bson.code import bson.objectid @@ -39,6 +38,7 @@ from ceilometer.openstack.common import timeutils from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models +from ceilometer.storage import pymongo_base cfg.CONF.import_opt('time_to_live', 'ceilometer.storage', group="database") @@ -78,101 +78,11 @@ class MongoDBStorage(base.StorageEngine): return Connection(conf) -def make_timestamp_range(start, end, - start_timestamp_op=None, end_timestamp_op=None): - """Given two possible datetimes and their operations, create the query - document to find timestamps within that range. - By default, using $gte for the lower bound and $lt for the - upper bound. - """ - ts_range = {} - - if start: - if start_timestamp_op == 'gt': - start_timestamp_op = '$gt' - else: - start_timestamp_op = '$gte' - ts_range[start_timestamp_op] = start - - if end: - if end_timestamp_op == 'le': - end_timestamp_op = '$lte' - else: - end_timestamp_op = '$lt' - ts_range[end_timestamp_op] = end - return ts_range - - -def make_query_from_filter(sample_filter, require_meter=True): - """Return a query dictionary based on the settings in the filter. - - :param filter: SampleFilter instance - :param require_meter: If true and the filter does not have a meter, - raise an error. - """ - q = {} - - if sample_filter.user: - q['user_id'] = sample_filter.user - if sample_filter.project: - q['project_id'] = sample_filter.project - - if sample_filter.meter: - q['counter_name'] = sample_filter.meter - elif require_meter: - raise RuntimeError('Missing required meter specifier') - - ts_range = make_timestamp_range(sample_filter.start, sample_filter.end, - sample_filter.start_timestamp_op, - sample_filter.end_timestamp_op) - if ts_range: - q['timestamp'] = ts_range - - if sample_filter.resource: - q['resource_id'] = sample_filter.resource - if sample_filter.source: - q['source'] = sample_filter.source - if sample_filter.message_id: - q['message_id'] = sample_filter.message_id - - # so the samples call metadata resource_metadata, so we convert - # to that. - q.update(dict(('resource_%s' % k, v) - for (k, v) in sample_filter.metaquery.iteritems())) - return q - - -class ConnectionPool(object): - - def __init__(self): - self._pool = {} - - def connect(self, url): - connection_options = pymongo.uri_parser.parse_uri(url) - del connection_options['database'] - del connection_options['username'] - del connection_options['password'] - del connection_options['collection'] - pool_key = tuple(connection_options) - - if pool_key in self._pool: - client = self._pool.get(pool_key)() - if client: - return client - LOG.info(_('Connecting to MongoDB on %s'), - connection_options['nodelist']) - client = pymongo.MongoClient( - url, - safe=True) - self._pool[pool_key] = weakref.ref(client) - return client - - -class Connection(base.Connection): +class Connection(pymongo_base.Connection): """MongoDB connection. """ - CONNECTION_POOL = ConnectionPool() + CONNECTION_POOL = pymongo_base.ConnectionPool() REDUCE_GROUP_CLEAN = bson.code.Code(""" function ( curr, result ) { @@ -610,32 +520,6 @@ class Connection(base.Connection): limit = 0 return db_collection.find(q, limit=limit, sort=all_sort) - def get_users(self, source=None): - """Return an iterable of user id strings. - - :param source: Optional source filter. - """ - q = {} - if source is not None: - q['source'] = source - - return (doc['_id'] for doc in - self.db.user.find(q, fields=['_id'], - sort=[('_id', pymongo.ASCENDING)])) - - def get_projects(self, source=None): - """Return an iterable of project id strings. - - :param source: Optional source filter. - """ - q = {} - if source is not None: - q['source'] = source - - return (doc['_id'] for doc in - self.db.project.find(q, fields=['_id'], - sort=[('_id', pymongo.ASCENDING)])) - def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, @@ -677,9 +561,10 @@ class Connection(base.Connection): # Look for resources matching the above criteria and with # samples in the time range we care about, then change the # resource query to return just those resources by id. - ts_range = make_timestamp_range(start_timestamp, end_timestamp, - start_timestamp_op, - end_timestamp_op) + ts_range = pymongo_base.make_timestamp_range(start_timestamp, + end_timestamp, + start_timestamp_op, + end_timestamp_op) if ts_range: q['timestamp'] = ts_range @@ -710,45 +595,6 @@ class Connection(base.Connection): finally: self.db[out].drop() - def get_meters(self, user=None, project=None, resource=None, source=None, - metaquery={}, pagination=None): - """Return an iterable of models.Meter instances - - :param user: Optional ID for user that owns the resource. - :param project: Optional ID for project that owns the resource. - :param resource: Optional resource filter. - :param source: Optional source filter. - :param metaquery: Optional dict with metadata to match on. - :param pagination: Optional pagination query. - """ - if pagination: - raise NotImplementedError(_('Pagination not implemented')) - - q = {} - if user is not None: - q['user_id'] = user - if project is not None: - q['project_id'] = project - if resource is not None: - q['_id'] = resource - if source is not None: - q['source'] = source - q.update(metaquery) - - for r in self.db.resource.find(q): - for r_meter in r['meter']: - yield models.Meter( - name=r_meter['counter_name'], - type=r_meter['counter_type'], - # Return empty string if 'counter_unit' is not valid for - # backward compatibility. - unit=r_meter.get('counter_unit', ''), - resource_id=r['_id'], - project_id=r['project_id'], - source=r['source'], - user_id=r['user_id'], - ) - def _retrieve_samples(self, query, orderby, limit): if limit is not None: samples = self.db.meter.find(query, @@ -775,7 +621,8 @@ class Connection(base.Connection): """ if limit == 0: return [] - q = make_query_from_filter(sample_filter, require_meter=False) + q = pymongo_base.make_query_from_filter(sample_filter, + require_meter=False) return self._retrieve_samples(q, [("timestamp", pymongo.DESCENDING)], @@ -851,7 +698,7 @@ class Connection(base.Connection): 'resource_id', 'source'])): raise NotImplementedError("Unable to group by these fields") - q = make_query_from_filter(sample_filter) + q = pymongo_base.make_query_from_filter(sample_filter) if period: if sample_filter.start: @@ -889,69 +736,6 @@ class Connection(base.Connection): (models.Statistics(**(r['value'])) for r in results['results']), key=operator.attrgetter('period_start')) - @staticmethod - def _decode_matching_metadata(matching_metadata): - if isinstance(matching_metadata, dict): - #note(sileht): keep compatibility with alarm - #with matching_metadata as a dict - return matching_metadata - else: - new_matching_metadata = {} - for elem in matching_metadata: - new_matching_metadata[elem['key']] = elem['value'] - return new_matching_metadata - - @classmethod - def _ensure_encapsulated_rule_format(cls, alarm): - """This ensure the alarm returned by the storage have the correct - format. The previous format looks like: - { - 'alarm_id': '0ld-4l3rt', - 'enabled': True, - 'name': 'old-alert', - 'description': 'old-alert', - 'timestamp': None, - 'meter_name': 'cpu', - 'user_id': 'me', - 'project_id': 'and-da-boys', - 'comparison_operator': 'lt', - 'threshold': 36, - 'statistic': 'count', - 'evaluation_periods': 1, - 'period': 60, - 'state': "insufficient data", - 'state_timestamp': None, - 'ok_actions': [], - 'alarm_actions': ['http://nowhere/alarms'], - 'insufficient_data_actions': [], - 'repeat_actions': False, - 'matching_metadata': {'key': 'value'} - # or 'matching_metadata': [{'key': 'key', 'value': 'value'}] - } - """ - - if isinstance(alarm.get('rule'), dict): - return - - alarm['type'] = 'threshold' - alarm['rule'] = {} - alarm['matching_metadata'] = cls._decode_matching_metadata( - alarm['matching_metadata']) - for field in ['period', 'evaluation_periods', 'threshold', - 'statistic', 'comparison_operator', 'meter_name']: - if field in alarm: - alarm['rule'][field] = alarm[field] - del alarm[field] - - query = [] - for key in alarm['matching_metadata']: - query.append({'field': key, - 'op': 'eq', - 'value': alarm['matching_metadata'][key], - 'type': 'string'}) - del alarm['matching_metadata'] - alarm['rule']['query'] = query - def _retrieve_alarms(self, query_filter, orderby, limit): if limit is not None: alarms = self.db.alarm.find(query_filter, @@ -995,28 +779,6 @@ class Connection(base.Connection): return self._retrieve_alarms(q, [], None) - def update_alarm(self, alarm): - """update alarm - """ - data = alarm.as_dict() - - self.db.alarm.update( - {'alarm_id': alarm.alarm_id}, - {'$set': data}, - upsert=True) - - stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0] - del stored_alarm['_id'] - self._ensure_encapsulated_rule_format(stored_alarm) - return models.Alarm(**stored_alarm) - - create_alarm = update_alarm - - def delete_alarm(self, alarm_id): - """Delete an alarm - """ - self.db.alarm.remove({'alarm_id': alarm_id}) - def _retrieve_alarm_changes(self, query_filter, orderby, limit): if limit is not None: alarms_history = self.db.alarm_history.find(query_filter, @@ -1069,9 +831,10 @@ class Connection(base.Connection): if type is not None: q['type'] = type if start_timestamp or end_timestamp: - ts_range = make_timestamp_range(start_timestamp, end_timestamp, - start_timestamp_op, - end_timestamp_op) + ts_range = pymongo_base.make_timestamp_range(start_timestamp, + end_timestamp, + start_timestamp_op, + end_timestamp_op) if ts_range: q['timestamp'] = ts_range diff --git a/ceilometer/storage/pymongo_base.py b/ceilometer/storage/pymongo_base.py new file mode 100644 index 0000000000..0238e693f5 --- /dev/null +++ b/ceilometer/storage/pymongo_base.py @@ -0,0 +1,282 @@ +# -*- encoding: utf-8 -*- +# +# Copyright Ericsson AB 2013. All rights reserved +# +# Authors: Ildiko Vancsa +# Balazs Gibizer +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Common functions for MongoDB and DB2 backends +""" + +import pymongo +import weakref + +from ceilometer.openstack.common.gettextutils import _ # noqa +from ceilometer.openstack.common import log +from ceilometer.openstack.common import network_utils +from ceilometer.storage import base +from ceilometer.storage import models + +LOG = log.getLogger(__name__) + + +def make_timestamp_range(start, end, + start_timestamp_op=None, end_timestamp_op=None): + + """Given two possible datetimes and their operations, create the query + document to find timestamps within that range. + By default, using $gte for the lower bound and $lt for the + upper bound. + """ + ts_range = {} + + if start: + if start_timestamp_op == 'gt': + start_timestamp_op = '$gt' + else: + start_timestamp_op = '$gte' + ts_range[start_timestamp_op] = start + + if end: + if end_timestamp_op == 'le': + end_timestamp_op = '$lte' + else: + end_timestamp_op = '$lt' + ts_range[end_timestamp_op] = end + return ts_range + + +def make_query_from_filter(sample_filter, require_meter=True): + """Return a query dictionary based on the settings in the filter. + + :param filter: SampleFilter instance + :param require_meter: If true and the filter does not have a meter, + raise an error. + """ + q = {} + + if sample_filter.user: + q['user_id'] = sample_filter.user + if sample_filter.project: + q['project_id'] = sample_filter.project + + if sample_filter.meter: + q['counter_name'] = sample_filter.meter + elif require_meter: + raise RuntimeError('Missing required meter specifier') + + ts_range = make_timestamp_range(sample_filter.start, + sample_filter.end, + sample_filter.start_timestamp_op, + sample_filter.end_timestamp_op) + + if ts_range: + q['timestamp'] = ts_range + + if sample_filter.resource: + q['resource_id'] = sample_filter.resource + if sample_filter.source: + q['source'] = sample_filter.source + if sample_filter.message_id: + q['message_id'] = sample_filter.message_id + + # so the samples call metadata resource_metadata, so we convert + # to that. + q.update(dict(('resource_%s' % k, v) + for (k, v) in sample_filter.metaquery.iteritems())) + return q + + +class ConnectionPool(object): + + def __init__(self): + self._pool = {} + + def connect(self, url): + connection_options = pymongo.uri_parser.parse_uri(url) + del connection_options['database'] + del connection_options['username'] + del connection_options['password'] + del connection_options['collection'] + pool_key = tuple(connection_options) + + if pool_key in self._pool: + client = self._pool.get(pool_key)() + if client: + return client + splitted_url = network_utils.urlsplit(url) + log_data = {'db': splitted_url.scheme, + 'nodelist': connection_options['nodelist']} + LOG.info(_('Connecting to %(db)s on %(nodelist)s') % log_data) + client = pymongo.MongoClient( + url, + safe=True) + self._pool[pool_key] = weakref.ref(client) + return client + + +class Connection(base.Connection): + """Base Connection class for MongoDB and DB2 drivers. + """ + + def get_users(self, source=None): + """Return an iterable of user id strings. + + :param source: Optional source filter. + """ + q = {} + if source is not None: + q['source'] = source + + return (doc['_id'] for doc in + self.db.user.find(q, fields=['_id'], + sort=[('_id', pymongo.ASCENDING)])) + + def get_projects(self, source=None): + """Return an iterable of project id strings. + + :param source: Optional source filter. + """ + q = {} + if source is not None: + q['source'] = source + + return (doc['_id'] for doc in + self.db.project.find(q, fields=['_id'], + sort=[('_id', pymongo.ASCENDING)])) + + def get_meters(self, user=None, project=None, resource=None, source=None, + metaquery={}, pagination=None): + """Return an iterable of models.Meter instances + + :param user: Optional ID for user that owns the resource. + :param project: Optional ID for project that owns the resource. + :param resource: Optional resource filter. + :param source: Optional source filter. + :param metaquery: Optional dict with metadata to match on. + :param pagination: Optional pagination query. + """ + + if pagination: + raise NotImplementedError(_('Pagination not implemented')) + + q = {} + if user is not None: + q['user_id'] = user + if project is not None: + q['project_id'] = project + if resource is not None: + q['_id'] = resource + if source is not None: + q['source'] = source + q.update(metaquery) + + for r in self.db.resource.find(q): + for r_meter in r['meter']: + yield models.Meter( + name=r_meter['counter_name'], + type=r_meter['counter_type'], + # Return empty string if 'counter_unit' is not valid for + # backward compatibility. + unit=r_meter.get('counter_unit', ''), + resource_id=r['_id'], + project_id=r['project_id'], + source=r['source'], + user_id=r['user_id'], + ) + + def update_alarm(self, alarm): + """update alarm + """ + data = alarm.as_dict() + + self.db.alarm.update( + {'alarm_id': alarm.alarm_id}, + {'$set': data}, + upsert=True) + + stored_alarm = self.db.alarm.find({'alarm_id': alarm.alarm_id})[0] + del stored_alarm['_id'] + self._ensure_encapsulated_rule_format(stored_alarm) + return models.Alarm(**stored_alarm) + + create_alarm = update_alarm + + def delete_alarm(self, alarm_id): + """Delete an alarm + """ + self.db.alarm.remove({'alarm_id': alarm_id}) + + @classmethod + def _ensure_encapsulated_rule_format(cls, alarm): + """This ensure the alarm returned by the storage have the correct + format. The previous format looks like: + { + 'alarm_id': '0ld-4l3rt', + 'enabled': True, + 'name': 'old-alert', + 'description': 'old-alert', + 'timestamp': None, + 'meter_name': 'cpu', + 'user_id': 'me', + 'project_id': 'and-da-boys', + 'comparison_operator': 'lt', + 'threshold': 36, + 'statistic': 'count', + 'evaluation_periods': 1, + 'period': 60, + 'state': "insufficient data", + 'state_timestamp': None, + 'ok_actions': [], + 'alarm_actions': ['http://nowhere/alarms'], + 'insufficient_data_actions': [], + 'repeat_actions': False, + 'matching_metadata': {'key': 'value'} + # or 'matching_metadata': [{'key': 'key', 'value': 'value'}] + } + """ + + if isinstance(alarm.get('rule'), dict): + return + + alarm['type'] = 'threshold' + alarm['rule'] = {} + alarm['matching_metadata'] = cls._decode_matching_metadata( + alarm['matching_metadata']) + for field in ['period', 'evaluation_periods', 'threshold', + 'statistic', 'comparison_operator', 'meter_name']: + if field in alarm: + alarm['rule'][field] = alarm[field] + del alarm[field] + + query = [] + for key in alarm['matching_metadata']: + query.append({'field': key, + 'op': 'eq', + 'value': alarm['matching_metadata'][key], + 'type': 'string'}) + del alarm['matching_metadata'] + alarm['rule']['query'] = query + + @staticmethod + def _decode_matching_metadata(matching_metadata): + if isinstance(matching_metadata, dict): + #note(sileht): keep compatibility with alarm + #with matching_metadata as a dict + return matching_metadata + else: + new_matching_metadata = {} + for elem in matching_metadata: + new_matching_metadata[elem['key']] = elem['value'] + return new_matching_metadata