Merge "Implement complex query API"
This commit is contained in:
commit
1254809222
|
@ -1,3 +1,5 @@
|
||||||
*.egg*
|
*.egg*
|
||||||
*.pyc
|
*.pyc
|
||||||
.tox
|
.tox
|
||||||
|
.testrepository
|
||||||
|
simulator
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import operator
|
||||||
|
|
||||||
from monascaclient import exc as monasca_exc
|
from monascaclient import exc as monasca_exc
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
@ -28,6 +29,7 @@ import ceilometer
|
||||||
from ceilometer.i18n import _
|
from ceilometer.i18n import _
|
||||||
from ceilometer import monasca_client
|
from ceilometer import monasca_client
|
||||||
from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
|
from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
|
||||||
|
from ceilometer import storage
|
||||||
from ceilometer.storage import base
|
from ceilometer.storage import base
|
||||||
from ceilometer.storage import models as api_models
|
from ceilometer.storage import models as api_models
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
|
@ -53,7 +55,7 @@ AVAILABLE_CAPABILITIES = {
|
||||||
'groupby': False,
|
'groupby': False,
|
||||||
'query': {'simple': True,
|
'query': {'simple': True,
|
||||||
'metadata': True,
|
'metadata': True,
|
||||||
'complex': False}},
|
'complex': True}},
|
||||||
'statistics': {'groupby': False,
|
'statistics': {'groupby': False,
|
||||||
'query': {'simple': True,
|
'query': {'simple': True,
|
||||||
'metadata': False},
|
'metadata': False},
|
||||||
|
@ -358,7 +360,11 @@ class Connection(base.Connection):
|
||||||
user_id=sample_filter.user,
|
user_id=sample_filter.user,
|
||||||
project_id=sample_filter.project,
|
project_id=sample_filter.project,
|
||||||
resource_id=sample_filter.resource,
|
resource_id=sample_filter.resource,
|
||||||
source=sample_filter.source
|
source=sample_filter.source,
|
||||||
|
# Dynamic sample filter attributes, these fields are useful for
|
||||||
|
# filtering result.
|
||||||
|
unit=getattr(sample_filter, 'unit', None),
|
||||||
|
type=getattr(sample_filter, 'type', None),
|
||||||
)
|
)
|
||||||
|
|
||||||
_dimensions = {k: v for k, v in _dimensions.items() if v is not None}
|
_dimensions = {k: v for k, v in _dimensions.items() if v is not None}
|
||||||
|
@ -664,3 +670,174 @@ class Connection(base.Connection):
|
||||||
groupby={u'': u''},
|
groupby={u'': u''},
|
||||||
**stats_dict
|
**stats_dict
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _parse_to_filter_list(self, filter_expr):
|
||||||
|
"""Parse complex query expression to simple filter list.
|
||||||
|
|
||||||
|
For i.e. parse:
|
||||||
|
{"or":[{"=":{"meter":"cpu"}},{"=":{"meter":"memory"}}]}
|
||||||
|
to
|
||||||
|
[[{"=":{"counter_name":"cpu"}}],
|
||||||
|
[{"=":{"counter_name":"memory"}}]]
|
||||||
|
"""
|
||||||
|
op, nodes = filter_expr.items()[0]
|
||||||
|
msg = "%s operand is not supported" % op
|
||||||
|
|
||||||
|
if op == 'or':
|
||||||
|
filter_list = []
|
||||||
|
for node in nodes:
|
||||||
|
filter_list.extend(self._parse_to_filter_list(node))
|
||||||
|
return filter_list
|
||||||
|
elif op == 'and':
|
||||||
|
filter_list_subtree = []
|
||||||
|
for node in nodes:
|
||||||
|
filter_list_subtree.append(self._parse_to_filter_list(node))
|
||||||
|
filter_list = [[]]
|
||||||
|
for filters in filter_list_subtree:
|
||||||
|
tmp = []
|
||||||
|
for filter in filters:
|
||||||
|
for f in filter_list:
|
||||||
|
tmp.append(f + filter)
|
||||||
|
filter_list = tmp
|
||||||
|
return filter_list
|
||||||
|
elif op == 'not':
|
||||||
|
raise ceilometer.NotImplementedError(msg)
|
||||||
|
elif op in ("<", "<=", "=", ">=", ">", '!='):
|
||||||
|
return [[filter_expr]]
|
||||||
|
else:
|
||||||
|
raise ceilometer.NotImplementedError(msg)
|
||||||
|
|
||||||
|
def _parse_to_sample_filter(self, simple_filters):
|
||||||
|
"""Parse to simple filters to sample filter.
|
||||||
|
|
||||||
|
For i.e.: parse
|
||||||
|
[{"=":{"counter_name":"cpu"}},{"=":{"counter_volume": 1}}]
|
||||||
|
to
|
||||||
|
SampleFilter(counter_name="cpu", counter_volume=1)
|
||||||
|
"""
|
||||||
|
equal_only_fields = (
|
||||||
|
'counter_name',
|
||||||
|
'counter_unit',
|
||||||
|
'counter_type',
|
||||||
|
'project_id',
|
||||||
|
'user_id',
|
||||||
|
'source',
|
||||||
|
'resource_id',
|
||||||
|
# These fields are supported by Ceilometer but cannot supported
|
||||||
|
# by Monasca.
|
||||||
|
# 'message_id',
|
||||||
|
# 'message_signature',
|
||||||
|
# 'recorded_at',
|
||||||
|
)
|
||||||
|
field_map = {
|
||||||
|
"project_id": "project",
|
||||||
|
"user_id": "user",
|
||||||
|
"resource_id": "resource",
|
||||||
|
"counter_name": "meter",
|
||||||
|
"counter_type": "type",
|
||||||
|
"counter_unit": "unit",
|
||||||
|
}
|
||||||
|
msg = "operand %s cannot be applied to field %s"
|
||||||
|
kwargs = {'metaquery': {}}
|
||||||
|
for sf in simple_filters:
|
||||||
|
op = sf.keys()[0]
|
||||||
|
field, value = sf.values()[0].items()[0]
|
||||||
|
if field in equal_only_fields:
|
||||||
|
if op != '=':
|
||||||
|
raise ceilometer.NotImplementedError(msg % (op, field))
|
||||||
|
field = field_map.get(field, field)
|
||||||
|
kwargs[field] = value
|
||||||
|
elif field == 'timestamp':
|
||||||
|
if op == '>=':
|
||||||
|
kwargs['start_timestamp'] = value
|
||||||
|
kwargs['start_timestamp_op'] = 'ge'
|
||||||
|
elif op == '<=':
|
||||||
|
kwargs['end_timestamp'] = value
|
||||||
|
kwargs['end_timestamp_op'] = 'le'
|
||||||
|
else:
|
||||||
|
raise ceilometer.NotImplementedError(msg % (op, field))
|
||||||
|
elif field == 'counter_volume':
|
||||||
|
kwargs['volume'] = value
|
||||||
|
kwargs['volume_op'] = op
|
||||||
|
elif (field.startswith('resource_metadata.') or
|
||||||
|
field.startswith('metadata.')):
|
||||||
|
kwargs['metaquery'][field] = value
|
||||||
|
else:
|
||||||
|
ra_msg = "field %s is not supported" % field
|
||||||
|
raise ceilometer.NotImplementedError(ra_msg)
|
||||||
|
sample_type = kwargs.pop('type', None)
|
||||||
|
sample_unit = kwargs.pop('unit', None)
|
||||||
|
sample_volume = kwargs.pop('volume', None)
|
||||||
|
sample_volume_op = kwargs.pop('volume_op', None)
|
||||||
|
sample_filter = storage.SampleFilter(**kwargs)
|
||||||
|
# Add some dynamic attributes, type and unit attributes can be used
|
||||||
|
# when query Monasca API, volume and volime_op attributes can
|
||||||
|
# be used for volume comparison.
|
||||||
|
sample_filter.type = sample_type
|
||||||
|
sample_filter.unit = sample_unit
|
||||||
|
sample_filter.volume = sample_volume
|
||||||
|
sample_filter.volume_op = sample_volume_op
|
||||||
|
return sample_filter
|
||||||
|
|
||||||
|
def _parse_to_sample_filters(self, filter_expr):
|
||||||
|
"""Parse complex query expression to sample filter list."""
|
||||||
|
filter_list = self._parse_to_filter_list(filter_expr)
|
||||||
|
sample_filters = []
|
||||||
|
for filters in filter_list:
|
||||||
|
sf = self._parse_to_sample_filter(filters)
|
||||||
|
if sf:
|
||||||
|
sample_filters.append(sf)
|
||||||
|
return sample_filters
|
||||||
|
|
||||||
|
def _validate_samples_by_volume(self, samples, sf):
|
||||||
|
if not sf.volume:
|
||||||
|
return samples
|
||||||
|
|
||||||
|
op_func_map = {
|
||||||
|
'<': operator.lt,
|
||||||
|
'<=': operator.le,
|
||||||
|
'=': operator.eq,
|
||||||
|
'>=': operator.ge,
|
||||||
|
'>': operator.gt,
|
||||||
|
'!=': operator.ne,
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = []
|
||||||
|
for s in samples:
|
||||||
|
op_func = op_func_map[sf.volume_op]
|
||||||
|
volume = getattr(s, 'volume', getattr(s, 'counter_volume', None))
|
||||||
|
if op_func(volume, sf.volume):
|
||||||
|
ret.append(s)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||||
|
if not filter_expr:
|
||||||
|
msg = "fitler must be specified"
|
||||||
|
raise ceilometer.NotImplementedError(msg)
|
||||||
|
if orderby:
|
||||||
|
msg = "orderby is not supported"
|
||||||
|
raise ceilometer.NotImplementedError(msg)
|
||||||
|
if not limit:
|
||||||
|
msg = "limit must be specified"
|
||||||
|
raise ceilometer.NotImplementedError(msg)
|
||||||
|
|
||||||
|
LOG.debug("filter_expr = %s", filter_expr)
|
||||||
|
sample_filters = self._parse_to_sample_filters(filter_expr)
|
||||||
|
LOG.debug("sample_filters = %s", sample_filters)
|
||||||
|
|
||||||
|
ret = []
|
||||||
|
for sf in sample_filters:
|
||||||
|
if not sf.volume:
|
||||||
|
samples = list(self.get_samples(sf, limit))
|
||||||
|
else:
|
||||||
|
samples = self.get_samples(sf)
|
||||||
|
samples = list(self._validate_samples_by_volume(samples, sf))
|
||||||
|
|
||||||
|
if limit <= len(samples):
|
||||||
|
ret.extend(samples[0:limit])
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
ret.extend(samples)
|
||||||
|
limit -= len(samples)
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
|
@ -23,8 +23,9 @@ from oslotest import base
|
||||||
|
|
||||||
import ceilometer
|
import ceilometer
|
||||||
from ceilometer.api.controllers.v2.meters import Aggregate
|
from ceilometer.api.controllers.v2.meters import Aggregate
|
||||||
import ceilometer.storage as storage
|
from ceilometer import storage
|
||||||
from ceilometer.storage import impl_monasca
|
from ceilometer.storage import impl_monasca
|
||||||
|
from ceilometer.storage import models as storage_models
|
||||||
|
|
||||||
|
|
||||||
class TestGetResources(base.BaseTestCase):
|
class TestGetResources(base.BaseTestCase):
|
||||||
|
@ -460,10 +461,7 @@ class TestGetSamples(base.BaseTestCase):
|
||||||
self.assertEqual(1, ml_mock.call_count)
|
self.assertEqual(1, ml_mock.call_count)
|
||||||
|
|
||||||
|
|
||||||
class MeterStatisticsTest(base.BaseTestCase):
|
class _BaseTestCase(base.BaseTestCase):
|
||||||
|
|
||||||
Aggregate = collections.namedtuple("Aggregate", ['func', 'param'])
|
|
||||||
|
|
||||||
def assertRaisesWithMessage(self, msg, exc_class, func, *args, **kwargs):
|
def assertRaisesWithMessage(self, msg, exc_class, func, *args, **kwargs):
|
||||||
try:
|
try:
|
||||||
func(*args, **kwargs)
|
func(*args, **kwargs)
|
||||||
|
@ -471,9 +469,14 @@ class MeterStatisticsTest(base.BaseTestCase):
|
||||||
exc_class.__name__)
|
exc_class.__name__)
|
||||||
except AssertionError:
|
except AssertionError:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
# Only catch specific exception so we can get stack trace when fail
|
||||||
self.assertIsInstance(e, exc_class)
|
except exc_class as e:
|
||||||
self.assertEqual(e.message, msg)
|
self.assertEqual(msg, e.message)
|
||||||
|
|
||||||
|
|
||||||
|
class MeterStatisticsTest(_BaseTestCase):
|
||||||
|
|
||||||
|
Aggregate = collections.namedtuple("Aggregate", ['func', 'param'])
|
||||||
|
|
||||||
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||||
def test_not_implemented_params(self, mock_mdf):
|
def test_not_implemented_params(self, mock_mdf):
|
||||||
|
@ -678,6 +681,79 @@ class MeterStatisticsTest(base.BaseTestCase):
|
||||||
stat.period_end.isoformat())
|
stat.period_end.isoformat())
|
||||||
|
|
||||||
|
|
||||||
|
class TestQuerySamples(_BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestQuerySamples, self).setUp()
|
||||||
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
|
self.CONF([], project='ceilometer', validate_default_values=True)
|
||||||
|
|
||||||
|
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||||
|
def test_query_samples_not_implemented_params(self, mdf_mock):
|
||||||
|
with mock.patch("ceilometer.monasca_client.Client"):
|
||||||
|
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||||
|
query = {'or': [{'=': {"project_id": "123"}},
|
||||||
|
{'=': {"user_id": "456"}}]}
|
||||||
|
|
||||||
|
self.assertRaisesWithMessage(
|
||||||
|
'fitler must be specified',
|
||||||
|
ceilometer.NotImplementedError,
|
||||||
|
lambda: list(conn.query_samples()))
|
||||||
|
self.assertRaisesWithMessage(
|
||||||
|
'limit must be specified',
|
||||||
|
ceilometer.NotImplementedError,
|
||||||
|
lambda: list(conn.query_samples(query)))
|
||||||
|
order_by = [{"timestamp": "desc"}]
|
||||||
|
self.assertRaisesWithMessage(
|
||||||
|
'orderby is not supported',
|
||||||
|
ceilometer.NotImplementedError,
|
||||||
|
lambda: list(conn.query_samples(query, order_by)))
|
||||||
|
self.assertRaisesWithMessage(
|
||||||
|
'Supply meter name at the least',
|
||||||
|
ceilometer.NotImplementedError,
|
||||||
|
lambda: list(conn.query_samples(query, None, 1)))
|
||||||
|
|
||||||
|
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
|
||||||
|
def test_query_samples(self, mdf_mock):
|
||||||
|
SAMPLES = [[
|
||||||
|
storage_models.Sample(
|
||||||
|
counter_name="instance",
|
||||||
|
counter_type="gauge",
|
||||||
|
counter_unit="instance",
|
||||||
|
counter_volume=1,
|
||||||
|
project_id="123",
|
||||||
|
user_id="456",
|
||||||
|
resource_id="789",
|
||||||
|
resource_metadata={},
|
||||||
|
source="openstack",
|
||||||
|
recorded_at=timeutils.utcnow(),
|
||||||
|
timestamp=timeutils.utcnow(),
|
||||||
|
message_id="0",
|
||||||
|
message_signature='',)
|
||||||
|
]] * 2
|
||||||
|
samples = SAMPLES[:]
|
||||||
|
|
||||||
|
def _get_samples(*args, **kwargs):
|
||||||
|
return samples.pop()
|
||||||
|
|
||||||
|
with mock.patch("ceilometer.monasca_client.Client"):
|
||||||
|
conn = impl_monasca.Connection("127.0.0.1:8080")
|
||||||
|
with mock.patch.object(conn, 'get_samples') as gsm:
|
||||||
|
gsm.side_effect = _get_samples
|
||||||
|
|
||||||
|
query = {'or': [{'=': {"project_id": "123"}},
|
||||||
|
{'=': {"user_id": "456"}}]}
|
||||||
|
samples = conn.query_samples(query, None, 100)
|
||||||
|
self.assertEqual(2, len(samples))
|
||||||
|
self.assertEqual(2, gsm.call_count)
|
||||||
|
|
||||||
|
samples = SAMPLES[:]
|
||||||
|
query = {'and': [{'=': {"project_id": "123"}},
|
||||||
|
{'>': {"counter_volume": 2}}]}
|
||||||
|
samples = conn.query_samples(query, None, 100)
|
||||||
|
self.assertEqual(0, len(samples))
|
||||||
|
self.assertEqual(3, gsm.call_count)
|
||||||
|
|
||||||
|
|
||||||
class CapabilitiesTest(base.BaseTestCase):
|
class CapabilitiesTest(base.BaseTestCase):
|
||||||
|
|
||||||
def test_capabilities(self):
|
def test_capabilities(self):
|
||||||
|
@ -704,7 +780,7 @@ class CapabilitiesTest(base.BaseTestCase):
|
||||||
'pagination': False,
|
'pagination': False,
|
||||||
'query':
|
'query':
|
||||||
{
|
{
|
||||||
'complex': False,
|
'complex': True,
|
||||||
'metadata': True,
|
'metadata': True,
|
||||||
'simple': True
|
'simple': True
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue