From 143a308b8c7d3dc7a7186520792b144772326027 Mon Sep 17 00:00:00 2001 From: ZhiQiang Fan Date: Wed, 9 Dec 2015 10:26:36 -0700 Subject: [PATCH] Implement complex query API Complex query computes the single complex query to multiple simple queries which only use AND operator, each simple query will invoke get_samples() method. Complex query collects all the results till limit constraint is satisfied. Change-Id: I0da398d4d3627fcbfe2c686acf5b0fa9a68492b7 --- .gitignore | 2 + ceilosca/ceilometer/storage/impl_monasca.py | 181 +++++++++++++++++- .../tests/unit/storage/test_impl_monasca.py | 94 ++++++++- 3 files changed, 266 insertions(+), 11 deletions(-) diff --git a/.gitignore b/.gitignore index 90d1058..d8d90fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.egg* *.pyc .tox +.testrepository +simulator diff --git a/ceilosca/ceilometer/storage/impl_monasca.py b/ceilosca/ceilometer/storage/impl_monasca.py index 2d8f2ad..6738a83 100644 --- a/ceilosca/ceilometer/storage/impl_monasca.py +++ b/ceilosca/ceilometer/storage/impl_monasca.py @@ -17,6 +17,7 @@ """ import datetime +import operator from monascaclient import exc as monasca_exc from oslo_config import cfg @@ -28,6 +29,7 @@ import ceilometer from ceilometer.i18n import _ from ceilometer import monasca_client from ceilometer.publisher.monasca_data_filter import MonascaDataFilter +from ceilometer import storage from ceilometer.storage import base from ceilometer.storage import models as api_models from ceilometer import utils @@ -53,7 +55,7 @@ AVAILABLE_CAPABILITIES = { 'groupby': False, 'query': {'simple': True, 'metadata': True, - 'complex': False}}, + 'complex': True}}, 'statistics': {'groupby': False, 'query': {'simple': True, 'metadata': False}, @@ -358,7 +360,11 @@ class Connection(base.Connection): user_id=sample_filter.user, project_id=sample_filter.project, resource_id=sample_filter.resource, - source=sample_filter.source + source=sample_filter.source, + # Dynamic sample filter attributes, these fields are useful for + # filtering result. + unit=getattr(sample_filter, 'unit', None), + type=getattr(sample_filter, 'type', None), ) _dimensions = {k: v for k, v in _dimensions.items() if v is not None} @@ -664,3 +670,174 @@ class Connection(base.Connection): groupby={u'': u''}, **stats_dict ) + + def _parse_to_filter_list(self, filter_expr): + """Parse complex query expression to simple filter list. + + For i.e. parse: + {"or":[{"=":{"meter":"cpu"}},{"=":{"meter":"memory"}}]} + to + [[{"=":{"counter_name":"cpu"}}], + [{"=":{"counter_name":"memory"}}]] + """ + op, nodes = filter_expr.items()[0] + msg = "%s operand is not supported" % op + + if op == 'or': + filter_list = [] + for node in nodes: + filter_list.extend(self._parse_to_filter_list(node)) + return filter_list + elif op == 'and': + filter_list_subtree = [] + for node in nodes: + filter_list_subtree.append(self._parse_to_filter_list(node)) + filter_list = [[]] + for filters in filter_list_subtree: + tmp = [] + for filter in filters: + for f in filter_list: + tmp.append(f + filter) + filter_list = tmp + return filter_list + elif op == 'not': + raise ceilometer.NotImplementedError(msg) + elif op in ("<", "<=", "=", ">=", ">", '!='): + return [[filter_expr]] + else: + raise ceilometer.NotImplementedError(msg) + + def _parse_to_sample_filter(self, simple_filters): + """Parse to simple filters to sample filter. + + For i.e.: parse + [{"=":{"counter_name":"cpu"}},{"=":{"counter_volume": 1}}] + to + SampleFilter(counter_name="cpu", counter_volume=1) + """ + equal_only_fields = ( + 'counter_name', + 'counter_unit', + 'counter_type', + 'project_id', + 'user_id', + 'source', + 'resource_id', + # These fields are supported by Ceilometer but cannot supported + # by Monasca. + # 'message_id', + # 'message_signature', + # 'recorded_at', + ) + field_map = { + "project_id": "project", + "user_id": "user", + "resource_id": "resource", + "counter_name": "meter", + "counter_type": "type", + "counter_unit": "unit", + } + msg = "operand %s cannot be applied to field %s" + kwargs = {'metaquery': {}} + for sf in simple_filters: + op = sf.keys()[0] + field, value = sf.values()[0].items()[0] + if field in equal_only_fields: + if op != '=': + raise ceilometer.NotImplementedError(msg % (op, field)) + field = field_map.get(field, field) + kwargs[field] = value + elif field == 'timestamp': + if op == '>=': + kwargs['start_timestamp'] = value + kwargs['start_timestamp_op'] = 'ge' + elif op == '<=': + kwargs['end_timestamp'] = value + kwargs['end_timestamp_op'] = 'le' + else: + raise ceilometer.NotImplementedError(msg % (op, field)) + elif field == 'counter_volume': + kwargs['volume'] = value + kwargs['volume_op'] = op + elif (field.startswith('resource_metadata.') or + field.startswith('metadata.')): + kwargs['metaquery'][field] = value + else: + ra_msg = "field %s is not supported" % field + raise ceilometer.NotImplementedError(ra_msg) + sample_type = kwargs.pop('type', None) + sample_unit = kwargs.pop('unit', None) + sample_volume = kwargs.pop('volume', None) + sample_volume_op = kwargs.pop('volume_op', None) + sample_filter = storage.SampleFilter(**kwargs) + # Add some dynamic attributes, type and unit attributes can be used + # when query Monasca API, volume and volime_op attributes can + # be used for volume comparison. + sample_filter.type = sample_type + sample_filter.unit = sample_unit + sample_filter.volume = sample_volume + sample_filter.volume_op = sample_volume_op + return sample_filter + + def _parse_to_sample_filters(self, filter_expr): + """Parse complex query expression to sample filter list.""" + filter_list = self._parse_to_filter_list(filter_expr) + sample_filters = [] + for filters in filter_list: + sf = self._parse_to_sample_filter(filters) + if sf: + sample_filters.append(sf) + return sample_filters + + def _validate_samples_by_volume(self, samples, sf): + if not sf.volume: + return samples + + op_func_map = { + '<': operator.lt, + '<=': operator.le, + '=': operator.eq, + '>=': operator.ge, + '>': operator.gt, + '!=': operator.ne, + } + + ret = [] + for s in samples: + op_func = op_func_map[sf.volume_op] + volume = getattr(s, 'volume', getattr(s, 'counter_volume', None)) + if op_func(volume, sf.volume): + ret.append(s) + return ret + + def query_samples(self, filter_expr=None, orderby=None, limit=None): + if not filter_expr: + msg = "fitler must be specified" + raise ceilometer.NotImplementedError(msg) + if orderby: + msg = "orderby is not supported" + raise ceilometer.NotImplementedError(msg) + if not limit: + msg = "limit must be specified" + raise ceilometer.NotImplementedError(msg) + + LOG.debug("filter_expr = %s", filter_expr) + sample_filters = self._parse_to_sample_filters(filter_expr) + LOG.debug("sample_filters = %s", sample_filters) + + ret = [] + for sf in sample_filters: + if not sf.volume: + samples = list(self.get_samples(sf, limit)) + else: + samples = self.get_samples(sf) + samples = list(self._validate_samples_by_volume(samples, sf)) + + if limit <= len(samples): + ret.extend(samples[0:limit]) + break + else: + ret.extend(samples) + limit -= len(samples) + + return ret diff --git a/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py b/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py index 2e819d9..0711b12 100644 --- a/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py +++ b/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py @@ -23,8 +23,9 @@ from oslotest import base import ceilometer from ceilometer.api.controllers.v2.meters import Aggregate -import ceilometer.storage as storage +from ceilometer import storage from ceilometer.storage import impl_monasca +from ceilometer.storage import models as storage_models class TestGetResources(base.BaseTestCase): @@ -460,10 +461,7 @@ class TestGetSamples(base.BaseTestCase): self.assertEqual(1, ml_mock.call_count) -class MeterStatisticsTest(base.BaseTestCase): - - Aggregate = collections.namedtuple("Aggregate", ['func', 'param']) - +class _BaseTestCase(base.BaseTestCase): def assertRaisesWithMessage(self, msg, exc_class, func, *args, **kwargs): try: func(*args, **kwargs) @@ -471,9 +469,14 @@ class MeterStatisticsTest(base.BaseTestCase): exc_class.__name__) except AssertionError: raise - except Exception as e: - self.assertIsInstance(e, exc_class) - self.assertEqual(e.message, msg) + # Only catch specific exception so we can get stack trace when fail + except exc_class as e: + self.assertEqual(msg, e.message) + + +class MeterStatisticsTest(_BaseTestCase): + + Aggregate = collections.namedtuple("Aggregate", ['func', 'param']) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_not_implemented_params(self, mock_mdf): @@ -678,6 +681,79 @@ class MeterStatisticsTest(base.BaseTestCase): stat.period_end.isoformat()) +class TestQuerySamples(_BaseTestCase): + def setUp(self): + super(TestQuerySamples, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([], project='ceilometer', validate_default_values=True) + + @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") + def test_query_samples_not_implemented_params(self, mdf_mock): + with mock.patch("ceilometer.monasca_client.Client"): + conn = impl_monasca.Connection("127.0.0.1:8080") + query = {'or': [{'=': {"project_id": "123"}}, + {'=': {"user_id": "456"}}]} + + self.assertRaisesWithMessage( + 'fitler must be specified', + ceilometer.NotImplementedError, + lambda: list(conn.query_samples())) + self.assertRaisesWithMessage( + 'limit must be specified', + ceilometer.NotImplementedError, + lambda: list(conn.query_samples(query))) + order_by = [{"timestamp": "desc"}] + self.assertRaisesWithMessage( + 'orderby is not supported', + ceilometer.NotImplementedError, + lambda: list(conn.query_samples(query, order_by))) + self.assertRaisesWithMessage( + 'Supply meter name at the least', + ceilometer.NotImplementedError, + lambda: list(conn.query_samples(query, None, 1))) + + @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") + def test_query_samples(self, mdf_mock): + SAMPLES = [[ + storage_models.Sample( + counter_name="instance", + counter_type="gauge", + counter_unit="instance", + counter_volume=1, + project_id="123", + user_id="456", + resource_id="789", + resource_metadata={}, + source="openstack", + recorded_at=timeutils.utcnow(), + timestamp=timeutils.utcnow(), + message_id="0", + message_signature='',) + ]] * 2 + samples = SAMPLES[:] + + def _get_samples(*args, **kwargs): + return samples.pop() + + with mock.patch("ceilometer.monasca_client.Client"): + conn = impl_monasca.Connection("127.0.0.1:8080") + with mock.patch.object(conn, 'get_samples') as gsm: + gsm.side_effect = _get_samples + + query = {'or': [{'=': {"project_id": "123"}}, + {'=': {"user_id": "456"}}]} + samples = conn.query_samples(query, None, 100) + self.assertEqual(2, len(samples)) + self.assertEqual(2, gsm.call_count) + + samples = SAMPLES[:] + query = {'and': [{'=': {"project_id": "123"}}, + {'>': {"counter_volume": 2}}]} + samples = conn.query_samples(query, None, 100) + self.assertEqual(0, len(samples)) + self.assertEqual(3, gsm.call_count) + + class CapabilitiesTest(base.BaseTestCase): def test_capabilities(self): @@ -704,7 +780,7 @@ class CapabilitiesTest(base.BaseTestCase): 'pagination': False, 'query': { - 'complex': False, + 'complex': True, 'metadata': True, 'simple': True }