Merge "get_resources and get_samples uses iterative approach"

This commit is contained in:
Jenkins 2015-12-18 19:44:56 +00:00 committed by Gerrit Code Review
commit 34b63f8cbe
2 changed files with 146 additions and 274 deletions
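In short, the change removes the eventlet green-thread pool and result queue that previously fanned out Monasca measurement queries, and replaces them with a plain loop over metrics_list()/measurements_list() that stops once limit results have been yielded. A minimal sketch of the new pattern, assuming a monascaclient-style client exposing metrics_list() and measurements_list() (simplified; the real driver also applies metaquery filtering and timestamp operators):

    def iter_measurements(mc, name, dimensions, limit=None):
        # Simplified illustration of the iterative pattern adopted by
        # get_samples()/get_resources(): walk each matching metric, then
        # each of its measurements, counting results against limit.
        count = 0
        for metric in mc.metrics_list(name=name, dimensions=dimensions):
            search_args = dict(name=metric['name'],
                               dimensions=metric['dimensions'],
                               merge_metrics=False)
            for result in mc.measurements_list(**search_args):
                for measurement in result['measurements']:
                    count += 1
                    yield measurement
                    if count == limit:
                        return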


@ -17,16 +17,13 @@
"""
import datetime
from monascaclient import exc as monasca_exc
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as os_service
from oslo_utils import netutils
from oslo_utils import timeutils
import eventlet
from eventlet.queue import Empty
import ceilometer
from ceilometer.i18n import _
from ceilometer import monasca_client
@ -41,10 +38,6 @@ OPTS = [
default=300,
help='Default period (in seconds) to use for querying stats '
'in case no period specified in the stats API call.'),
cfg.IntOpt('query_concurrency_limit',
default=30,
help='Number of concurrent queries to use for querying '
'Monasca API'),
]
cfg.CONF.register_opts(OPTS, group='monasca')
@ -120,8 +113,7 @@ class Connection(base.Connection):
:param value_meta: metadata from monasca
:returns: True for matched, False for not matched
"""
if (len(query) > 0 and
(len(value_meta) == 0 or
if (query and (len(value_meta) == 0 or
not set(query.items()).issubset(set(value_meta.items())))):
return False
else:
@ -183,7 +175,6 @@ class Connection(base.Connection):
"""
if limit == 0:
return
# TODO(Implement limit correctly)
q = {}
if metaquery:
@ -222,6 +213,7 @@ class Connection(base.Connection):
_search_args = {k: v for k, v in _search_args.items()
if v is not None}
result_count = 0
for metric in self.mc.metrics_list(
**dict(dimensions=dims_filter)):
_search_args['name'] = metric['name']
@ -235,6 +227,8 @@ class Connection(base.Connection):
if not self._match_metaquery_to_value_meta(q, vm):
continue
if d.get('resource_id'):
result_count += 1
yield api_models.Resource(
resource_id=d.get('resource_id'),
first_sample_timestamp=(
@ -243,8 +237,12 @@ class Connection(base.Connection):
project_id=d.get('project_id'),
source=d.get('source'),
user_id=d.get('user_id'),
metadata=m['value_meta'],
metadata=m['value_meta']
)
if result_count == limit:
return
except monasca_exc.HTTPConflict:
pass
@ -296,125 +294,13 @@ class Connection(base.Connection):
source=metric['dimensions'].get('source'),
user_id=metric['dimensions'].get('user_id'))
def get_measurements(self, result_queue, metric_name, metric_dimensions,
meta_q, start_ts, end_ts, start_op, end_op, limit):
start_ts = timeutils.isotime(start_ts)
end_ts = timeutils.isotime(end_ts)
_search_args = dict(name=metric_name,
start_time=start_ts,
start_timestamp_op=start_op,
end_time=end_ts,
end_timestamp_op=end_op,
merge_metrics=False,
limit=limit,
dimensions=metric_dimensions)
_search_args = {k: v for k, v in _search_args.items()
if v is not None}
for sample in self.mc.measurements_list(**_search_args):
LOG.debug(_('Retrieved sample: %s'), sample)
d = sample['dimensions']
for measurement in sample['measurements']:
meas_dict = self._convert_to_dict(measurement,
sample['columns'])
vm = meas_dict['value_meta']
if not self._match_metaquery_to_value_meta(meta_q, vm):
continue
result_queue.put(api_models.Sample(
source=d.get('source'),
counter_name=sample['name'],
counter_type=d.get('type'),
counter_unit=d.get('unit'),
counter_volume=meas_dict['value'],
user_id=d.get('user_id'),
project_id=d.get('project_id'),
resource_id=d.get('resource_id'),
timestamp=timeutils.parse_isotime(meas_dict['timestamp']),
resource_metadata=meas_dict['value_meta'],
message_id=sample['id'],
message_signature='',
recorded_at=(
timeutils.parse_isotime(meas_dict['timestamp']))))
def get_next_time_delta(self, start, end, delta):
# Gets next time window
curr = start
while curr < end:
next = curr + delta
yield curr, next
curr = next
def get_next_task_args(self, start_timestamp=None, end_timestamp=None,
delta=None, **kwargs):
# Yields next set of measurement related args
metrics = self.mc.metrics_list(**kwargs)
has_ts = start_timestamp and end_timestamp and delta
if has_ts:
for start, end in self.get_next_time_delta(
start_timestamp,
end_timestamp,
delta):
for metric in metrics:
task = {'metric': metric['name'],
'dimension': metric['dimensions'],
'start_ts': start,
'end_ts': end}
LOG.debug(_('next task is : %s'), task)
yield task
else:
for metric in metrics:
task = {'metric': metric['name'],
'dimension': metric['dimensions']
}
LOG.debug(_('next task is : %s'), task)
yield task
def has_more_results(self, result_queue, t_pool):
if result_queue.empty() and t_pool.pool.running() == 0:
return False
return True
def fetch_from_queue(self, result_queue, t_pool):
# Fetches result from queue in non-blocking way
try:
result = result_queue.get_nowait()
LOG.debug(_('Retrieved result : %s'), result)
return result
except Empty:
# if no data in queue, yield to work threads
# to give them a chance
if t_pool.pool.running() > 0:
eventlet.sleep(0)
def get_results(self, result_queue, t_pool, limit=None, result_count=None):
# Inspect and yield results
if limit:
while result_count < limit:
if not self.has_more_results(result_queue, t_pool):
break
result = self.fetch_from_queue(result_queue, t_pool)
if result:
yield result
result_count += 1
else:
while True:
if not self.has_more_results(result_queue, t_pool):
break
result = self.fetch_from_queue(result_queue, t_pool)
if result:
yield result
def get_samples(self, sample_filter, limit=None):
"""Return an iterable of dictionaries containing sample information.
{
'source': source of the resource,
'counter_name': name of the resource,
'counter_type': type of the sample (gauge, delta, cumulative),
'counter_unit': unit of the sample,
'counter_volume': volume of the sample,
@ -431,10 +317,9 @@ class Connection(base.Connection):
:param sample_filter: constraints for the sample search.
:param limit: Maximum number of results to return.
"""
# Initialize pool of green work threads and queue to handle results
thread_pool = os_service.threadgroup.ThreadGroup(
thread_pool_size=cfg.CONF.monasca.query_concurrency_limit)
result_queue = eventlet.queue.Queue()
if limit == 0:
return
if not sample_filter or not sample_filter.meter:
raise ceilometer.NotImplementedError(
@ -469,9 +354,6 @@ class Connection(base.Connection):
if not sample_filter.end_timestamp:
sample_filter.end_timestamp = datetime.datetime.utcnow()
delta = sample_filter.end_timestamp - sample_filter.start_timestamp
delta = delta / cfg.CONF.monasca.query_concurrency_limit
_dimensions = dict(
user_id=sample_filter.user,
project_id=sample_filter.project,
@ -484,36 +366,51 @@ class Connection(base.Connection):
_metric_args = dict(name=sample_filter.meter,
dimensions=_dimensions)
if limit:
result_count = 0
start_ts = timeutils.isotime(sample_filter.start_timestamp)
end_ts = timeutils.isotime(sample_filter.end_timestamp)
for task_cnt, task in enumerate(self.get_next_task_args(
sample_filter.start_timestamp, sample_filter.end_timestamp,
delta, **_metric_args)):
# Spawn query_concurrency_limit number of green threads
# simultaneously to fetch measurements
thread_pool.add_thread(self.get_measurements,
result_queue,
task['metric'],
task['dimension'],
q,
task['start_ts'],
task['end_ts'],
sample_filter.start_timestamp_op,
sample_filter.end_timestamp_op,
limit)
# For every query_concurrency_limit set of tasks,
# consume data from queue and yield before moving on to
# next set of tasks.
if (task_cnt + 1) % cfg.CONF.monasca.query_concurrency_limit == 0:
for result in self.get_results(result_queue, thread_pool,
limit,
result_count=result_count if
limit else None):
yield result
_search_args = dict(
start_time=start_ts,
start_timestamp_op=sample_filter.start_timestamp_op,
end_time=end_ts,
end_timestamp_op=sample_filter.end_timestamp_op,
merge_metrics=False
)
# Shutdown threadpool
thread_pool.stop()
result_count = 0
for metric in self.mc.metrics_list(
**_metric_args):
_search_args['name'] = metric['name']
_search_args['dimensions'] = metric['dimensions']
_search_args = {k: v for k, v in _search_args.items()
if v is not None}
for sample in self.mc.measurements_list(**_search_args):
d = sample['dimensions']
for meas in sample['measurements']:
m = self._convert_to_dict(
meas, sample['columns'])
vm = m['value_meta']
if not self._match_metaquery_to_value_meta(q, vm):
continue
result_count += 1
yield api_models.Sample(
source=d.get('source'),
counter_name=sample['name'],
counter_type=d.get('type'),
counter_unit=d.get('unit'),
counter_volume=m['value'],
user_id=d.get('user_id'),
project_id=d.get('project_id'),
resource_id=d.get('resource_id'),
timestamp=timeutils.parse_isotime(m['timestamp']),
resource_metadata=m['value_meta'],
message_id=sample['id'],
message_signature='',
recorded_at=(timeutils.parse_isotime(m['timestamp'])))
if result_count == limit:
return
def get_meter_statistics(self, filter, period=None, groupby=None,
aggregate=None):
@ -610,12 +507,10 @@ class Connection(base.Connection):
dimensions=dims_filter)
group_stats_list = []
for task_cnt, task in enumerate(
self.get_next_task_args(**_metric_args)):
for metric in self.mc.metrics_list(**_metric_args):
_search_args = dict(
name=task['metric'],
dimensions=task['dimension'],
name=metric['name'],
dimensions=metric['dimensions'],
start_time=filter.start_timestamp,
end_time=filter.end_timestamp,
period=period,
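As an aside on the _match_metaquery_to_value_meta() change earlier in this file: matching reduces to a subset test between the (flattened) query items and a measurement's value_meta items. A small standalone illustration, using hypothetical keys and values:

    query = {'key': 'value1'}                      # hypothetical flattened metaquery
    value_meta = {'key': 'value1', 'other': 'x'}   # hypothetical value_meta from a measurement

    # A non-empty query matches only when every query item appears in value_meta.
    assert set(query.items()).issubset(set(value_meta.items()))

    # A non-empty query never matches an empty value_meta, while an empty
    # query matches anything.
    assert not set(query.items()).issubset(set())
    assert set().issubset(set(value_meta.items()))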


@ -28,6 +28,19 @@ from ceilometer.storage import impl_monasca
class TestGetResources(base.BaseTestCase):
dummy_get_resources_mocked_return_value = (
[{u'dimensions': {},
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}]],
u'id': u'2015-04-14T18:42:31Z',
u'columns': [u'timestamp', u'value', u'value_meta'],
u'name': u'image'}])
def setUp(self):
super(TestGetResources, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([], project='ceilometer', validate_default_values=True)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_not_implemented_params(self, mock_mdf):
with mock.patch("ceilometer.monasca_client.Client"):
@ -69,9 +82,10 @@ class TestGetResources(base.BaseTestCase):
'dimensions': {}}
]
kwargs = dict(source='openstack')
list(conn.get_resources(**kwargs))
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetResources.dummy_get_resources_mocked_return_value)
list(conn.get_resources(**kwargs))
self.assertEqual(2, ml_mock.call_count)
self.assertEqual(dict(dimensions={},
name='metric1',
@ -79,6 +93,42 @@ class TestGetResources(base.BaseTestCase):
start_time='1970-01-01T00:00:00Z'),
ml_mock.call_args_list[0][1])
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_resources_limit(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
mnl_mock = mock_client().metrics_list
mnl_mock.return_value = [{'name': 'metric1',
'dimensions': {'resource_id': 'abcd'}},
{'name': 'metric2',
'dimensions': {'resource_id': 'abcd'}}
]
dummy_get_resources_mocked_return_value = (
[{u'dimensions': {u'resource_id': u'abcd'},
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}],
[u'2015-04-15T17:52:31Z', 2.0, {}],
[u'2015-04-16T17:52:31Z', 3.0, {}]],
u'id': u'2015-04-14T18:42:31Z',
u'columns': [u'timestamp', u'value', u'value_meta'],
u'name': u'image'}])
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
dummy_get_resources_mocked_return_value)
sample_filter = storage.SampleFilter(
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
resources = list(conn.get_resources(sample_filter, limit=2))
self.assertEqual(2, len(resources))
self.assertEqual(True, ml_mock.called)
self.assertEqual(2, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_resources_simple_metaquery(self, mock_mdf):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
@ -92,8 +142,12 @@ class TestGetResources(base.BaseTestCase):
'value_meta': {'key': 'value2'}},
]
kwargs = dict(metaquery={'metadata.key': 'value1'})
list(conn.get_resources(**kwargs))
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetResources.dummy_get_resources_mocked_return_value)
list(conn.get_resources(**kwargs))
self.assertEqual(2, ml_mock.call_count)
self.assertEqual(dict(dimensions={},
name='metric2',
@ -155,7 +209,6 @@ class TestGetSamples(base.BaseTestCase):
super(TestGetSamples, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF([], project='ceilometer', validate_default_values=True)
self.CONF.set_override('query_concurrency_limit', 3, group='monasca')
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_not_implemented_params(self, mdf_mock):
@ -177,37 +230,10 @@ class TestGetSamples(base.BaseTestCase):
self.assertRaises(ceilometer.NotImplementedError,
lambda: list(conn.get_samples(sample_filter)))
def get_concurrent_task_args(self, conn, start_time, end_time,
sample_filter, dimensions=None, limit=None):
delta = ((end_time - start_time) /
self.CONF.monasca.query_concurrency_limit)
expected_args_list = []
for start, end in impl_monasca.Connection.get_next_time_delta(
conn, start_time, end_time, delta):
if limit:
expected_args_list.append(dict(
dimensions=dimensions if dimensions else {},
start_time=timeutils.isotime(start),
start_timestamp_op=sample_filter.start_timestamp_op,
merge_metrics=False, name='specific meter',
limit=limit,
end_time=timeutils.isotime(end)))
else:
expected_args_list.append(dict(
dimensions=dimensions if dimensions else {},
start_time=timeutils.isotime(start),
start_timestamp_op=sample_filter.start_timestamp_op,
merge_metrics=False, name='specific meter',
end_time=timeutils.isotime(end)))
return expected_args_list
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_name(self, mdf_mock):
with mock.patch("ceilometer.monasca_client.Client") as mock_client:
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
@ -215,22 +241,17 @@ class TestGetSamples(base.BaseTestCase):
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime(2015, 4, 20)
sample_filter = storage.SampleFilter(
meter='specific meter',
end_timestamp=timeutils.isotime(end_time))
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(dict(
dimensions={},
start_time='1970-01-01T00:00:00Z',
merge_metrics=False, name='specific meter',
end_time='2015-04-20T00:00:00Z'),
ml_mock.call_args[1])
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_start_timestamp_filter(self, mdf_mock):
@ -246,7 +267,6 @@ class TestGetSamples(base.BaseTestCase):
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(2015, 3, 20)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(
meter='specific meter',
@ -254,14 +274,7 @@ class TestGetSamples(base.BaseTestCase):
start_timestamp_op='ge')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(conn,
start_time,
end_time,
sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_limit(self, mdf_mock):
@ -269,30 +282,29 @@ class TestGetSamples(base.BaseTestCase):
conn = impl_monasca.Connection("127.0.0.1:8080")
metrics_list_mock = mock_client().metrics_list
dummy_get_samples_mocked_return_value = (
[{u'dimensions': {},
u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}],
[u'2015-04-15T17:52:31Z', 2.0, {}],
[u'2015-04-16T17:52:31Z', 3.0, {}]],
u'id': u'2015-04-14T18:42:31Z',
u'columns': [u'timestamp', u'value', u'value_meta'],
u'name': u'image'}])
metrics_list_mock.return_value = (
TestGetSamples.dummy_metrics_mocked_return_value
)
ml_mock = mock_client().measurements_list
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime(2015, 4, 20)
dummy_get_samples_mocked_return_value)
sample_filter = storage.SampleFilter(
meter='specific meter', end_timestamp='2015-04-20T00:00:00Z')
list(conn.get_samples(sample_filter, limit=50))
samples = list(conn.get_samples(sample_filter, limit=2))
self.assertEqual(2, len(samples))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(conn,
start_time,
end_time,
sample_filter,
limit=50)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_project_filter(self, mock_mdf):
@ -309,20 +321,11 @@ class TestGetSamples(base.BaseTestCase):
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
project='specific project')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(project_id=sample_filter.project))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_resource_filter(self, mock_mdf):
@ -338,20 +341,11 @@ class TestGetSamples(base.BaseTestCase):
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
resource='specific resource')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(resource_id=sample_filter.resource))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_source_filter(self, mdf_mock):
@ -367,20 +361,11 @@ class TestGetSamples(base.BaseTestCase):
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(meter='specific meter',
source='specific source')
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter,
dimensions=dict(source=sample_filter.source))
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_simple_metaquery(self, mdf_mock):
@ -394,20 +379,12 @@ class TestGetSamples(base.BaseTestCase):
ml_mock.return_value = (
TestGetSamples.dummy_get_samples_mocked_return_value)
start_time = datetime.datetime(1970, 1, 1)
end_time = datetime.datetime.utcnow()
sample_filter = storage.SampleFilter(
meter='specific meter',
metaquery={'metadata.key': u'value'})
list(conn.get_samples(sample_filter))
self.assertEqual(True, ml_mock.called)
expected_args_list = self.get_concurrent_task_args(
conn, start_time, end_time, sample_filter)
self.assertEqual(3, ml_mock.call_count)
(self.assertIn(call_arg[1], expected_args_list)
for call_arg in ml_mock.call_args_list)
self.assertEqual(1, ml_mock.call_count)
@mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter")
def test_get_samples_results(self, mdf_mock):
@ -480,7 +457,7 @@ class TestGetSamples(base.BaseTestCase):
get('measurements')[0][0]))
self.assertEqual(results[0].user_id, None)
self.assertEqual(3, ml_mock.call_count)
self.assertEqual(1, ml_mock.call_count)
class MeterStatisticsTest(base.BaseTestCase):