From ff9bbb979aa264e1969ebad13442b81432a21b75 Mon Sep 17 00:00:00 2001 From: Rohit Jaiswal Date: Wed, 28 Oct 2015 19:14:14 +0000 Subject: [PATCH] get_resources and get_samples uses iterative approach Implements get_resources and get_samples using iterative approach with limit, monasca measurements api accepts a limit, but cannot be used here since user can query for measurements from different metrics. Change-Id: I87d070c60af3435d20917ffc5d07adf8ed749ee2 --- ceilosca/ceilometer/storage/impl_monasca.py | 227 +++++------------- .../tests/unit/storage/test_impl_monasca.py | 193 +++++++-------- 2 files changed, 146 insertions(+), 274 deletions(-) diff --git a/ceilosca/ceilometer/storage/impl_monasca.py b/ceilosca/ceilometer/storage/impl_monasca.py index 9716f92..2d8f2ad 100644 --- a/ceilosca/ceilometer/storage/impl_monasca.py +++ b/ceilosca/ceilometer/storage/impl_monasca.py @@ -17,16 +17,13 @@ """ import datetime + from monascaclient import exc as monasca_exc from oslo_config import cfg from oslo_log import log -from oslo_service import service as os_service from oslo_utils import netutils from oslo_utils import timeutils -import eventlet -from eventlet.queue import Empty - import ceilometer from ceilometer.i18n import _ from ceilometer import monasca_client @@ -41,10 +38,6 @@ OPTS = [ default=300, help='Default period (in seconds) to use for querying stats ' 'in case no period specified in the stats API call.'), - cfg.IntOpt('query_concurrency_limit', - default=30, - help='Number of concurrent queries to use for querying ' - 'Monasca API'), ] cfg.CONF.register_opts(OPTS, group='monasca') @@ -120,8 +113,7 @@ class Connection(base.Connection): :param value_meta: metadata from monasca :returns: True for matched, False for not matched """ - if (len(query) > 0 and - (len(value_meta) == 0 or + if (query and (len(value_meta) == 0 or not set(query.items()).issubset(set(value_meta.items())))): return False else: @@ -183,7 +175,6 @@ class Connection(base.Connection): """ if limit == 0: return - # TODO(Implement limit correctly) q = {} if metaquery: @@ -222,6 +213,7 @@ class Connection(base.Connection): _search_args = {k: v for k, v in _search_args.items() if v is not None} + result_count = 0 for metric in self.mc.metrics_list( **dict(dimensions=dims_filter)): _search_args['name'] = metric['name'] @@ -235,6 +227,8 @@ class Connection(base.Connection): if not self._match_metaquery_to_value_meta(q, vm): continue if d.get('resource_id'): + result_count += 1 + yield api_models.Resource( resource_id=d.get('resource_id'), first_sample_timestamp=( @@ -243,8 +237,12 @@ class Connection(base.Connection): project_id=d.get('project_id'), source=d.get('source'), user_id=d.get('user_id'), - metadata=m['value_meta'], + metadata=m['value_meta'] ) + + if result_count == limit: + return + except monasca_exc.HTTPConflict: pass @@ -296,125 +294,13 @@ class Connection(base.Connection): source=metric['dimensions'].get('source'), user_id=metric['dimensions'].get('user_id')) - def get_measurements(self, result_queue, metric_name, metric_dimensions, - meta_q, start_ts, end_ts, start_op, end_op, limit): - - start_ts = timeutils.isotime(start_ts) - end_ts = timeutils.isotime(end_ts) - - _search_args = dict(name=metric_name, - start_time=start_ts, - start_timestamp_op=start_op, - end_time=end_ts, - end_timestamp_op=end_op, - merge_metrics=False, - limit=limit, - dimensions=metric_dimensions) - - _search_args = {k: v for k, v in _search_args.items() - if v is not None} - - for sample in self.mc.measurements_list(**_search_args): - LOG.debug(_('Retrieved sample: %s'), sample) - - d = sample['dimensions'] - for measurement in sample['measurements']: - meas_dict = self._convert_to_dict(measurement, - sample['columns']) - vm = meas_dict['value_meta'] - if not self._match_metaquery_to_value_meta(meta_q, vm): - continue - result_queue.put(api_models.Sample( - source=d.get('source'), - counter_name=sample['name'], - counter_type=d.get('type'), - counter_unit=d.get('unit'), - counter_volume=meas_dict['value'], - user_id=d.get('user_id'), - project_id=d.get('project_id'), - resource_id=d.get('resource_id'), - timestamp=timeutils.parse_isotime(meas_dict['timestamp']), - resource_metadata=meas_dict['value_meta'], - message_id=sample['id'], - message_signature='', - recorded_at=( - timeutils.parse_isotime(meas_dict['timestamp'])))) - - def get_next_time_delta(self, start, end, delta): - # Gets next time window - curr = start - while curr < end: - next = curr + delta - yield curr, next - curr = next - - def get_next_task_args(self, start_timestamp=None, end_timestamp=None, - delta=None, **kwargs): - - # Yields next set of measurement related args - metrics = self.mc.metrics_list(**kwargs) - has_ts = start_timestamp and end_timestamp and delta - if has_ts: - for start, end in self.get_next_time_delta( - start_timestamp, - end_timestamp, - delta): - for metric in metrics: - task = {'metric': metric['name'], - 'dimension': metric['dimensions'], - 'start_ts': start, - 'end_ts': end} - LOG.debug(_('next task is : %s'), task) - yield task - else: - for metric in metrics: - task = {'metric': metric['name'], - 'dimension': metric['dimensions'] - } - LOG.debug(_('next task is : %s'), task) - yield task - - def has_more_results(self, result_queue, t_pool): - if result_queue.empty() and t_pool.pool.running() == 0: - return False - return True - - def fetch_from_queue(self, result_queue, t_pool): - # Fetches result from queue in non-blocking way - try: - result = result_queue.get_nowait() - LOG.debug(_('Retrieved result : %s'), result) - return result - except Empty: - # if no data in queue, yield to work threads - # to give them a chance - if t_pool.pool.running() > 0: - eventlet.sleep(0) - - def get_results(self, result_queue, t_pool, limit=None, result_count=None): - # Inspect and yield results - if limit: - while result_count < limit: - if not self.has_more_results(result_queue, t_pool): - break - result = self.fetch_from_queue(result_queue, t_pool) - if result: - yield result - result_count += 1 - else: - while True: - if not self.has_more_results(result_queue, t_pool): - break - result = self.fetch_from_queue(result_queue, t_pool) - if result: - yield result - def get_samples(self, sample_filter, limit=None): """Return an iterable of dictionaries containing sample information. { 'source': source of the resource, - 'counter_name': name of the resource, + 'counter_name': name of the resource,if groupby: + raise ceilometer.NotImplementedError('Groupby not implemented') 'counter_type': type of the sample (gauge, delta, cumulative), 'counter_unit': unit of the sample, 'counter_volume': volume of the sample, @@ -431,10 +317,9 @@ class Connection(base.Connection): :param sample_filter: constraints for the sample search. :param limit: Maximum number of results to return. """ - # Initialize pool of green work threads and queue to handle results - thread_pool = os_service.threadgroup.ThreadGroup( - thread_pool_size=cfg.CONF.monasca.query_concurrency_limit) - result_queue = eventlet.queue.Queue() + + if limit == 0: + return if not sample_filter or not sample_filter.meter: raise ceilometer.NotImplementedError( @@ -469,9 +354,6 @@ class Connection(base.Connection): if not sample_filter.end_timestamp: sample_filter.end_timestamp = datetime.datetime.utcnow() - delta = sample_filter.end_timestamp - sample_filter.start_timestamp - delta = delta / cfg.CONF.monasca.query_concurrency_limit - _dimensions = dict( user_id=sample_filter.user, project_id=sample_filter.project, @@ -484,36 +366,51 @@ class Connection(base.Connection): _metric_args = dict(name=sample_filter.meter, dimensions=_dimensions) - if limit: - result_count = 0 + start_ts = timeutils.isotime(sample_filter.start_timestamp) + end_ts = timeutils.isotime(sample_filter.end_timestamp) - for task_cnt, task in enumerate(self.get_next_task_args( - sample_filter.start_timestamp, sample_filter.end_timestamp, - delta, **_metric_args)): - # Spawn query_concurrency_limit number of green threads - # simultaneously to fetch measurements - thread_pool.add_thread(self.get_measurements, - result_queue, - task['metric'], - task['dimension'], - q, - task['start_ts'], - task['end_ts'], - sample_filter.start_timestamp_op, - sample_filter.end_timestamp_op, - limit) - # For every query_conncurrency_limit set of tasks, - # consume data from queue and yield before moving on to - # next set of tasks. - if (task_cnt + 1) % cfg.CONF.monasca.query_concurrency_limit == 0: - for result in self.get_results(result_queue, thread_pool, - limit, - result_count=result_count if - limit else None): - yield result + _search_args = dict( + start_time=start_ts, + start_timestamp_op=sample_filter.start_timestamp_op, + end_time=end_ts, + end_timestamp_op=sample_filter.end_timestamp_op, + merge_metrics=False + ) - # Shutdown threadpool - thread_pool.stop() + result_count = 0 + for metric in self.mc.metrics_list( + **_metric_args): + _search_args['name'] = metric['name'] + _search_args['dimensions'] = metric['dimensions'] + _search_args = {k: v for k, v in _search_args.items() + if v is not None} + + for sample in self.mc.measurements_list(**_search_args): + d = sample['dimensions'] + for meas in sample['measurements']: + m = self._convert_to_dict( + meas, sample['columns']) + vm = m['value_meta'] + if not self._match_metaquery_to_value_meta(q, vm): + continue + result_count += 1 + yield api_models.Sample( + source=d.get('source'), + counter_name=sample['name'], + counter_type=d.get('type'), + counter_unit=d.get('unit'), + counter_volume=m['value'], + user_id=d.get('user_id'), + project_id=d.get('project_id'), + resource_id=d.get('resource_id'), + timestamp=timeutils.parse_isotime(m['timestamp']), + resource_metadata=m['value_meta'], + message_id=sample['id'], + message_signature='', + recorded_at=(timeutils.parse_isotime(m['timestamp']))) + + if result_count == limit: + return def get_meter_statistics(self, filter, period=None, groupby=None, aggregate=None): @@ -610,12 +507,10 @@ class Connection(base.Connection): dimensions=dims_filter) group_stats_list = [] - for task_cnt, task in enumerate( - self.get_next_task_args(**_metric_args)): - + for metric in self.mc.metrics_list(**_metric_args): _search_args = dict( - name=task['metric'], - dimensions=task['dimension'], + name=metric['name'], + dimensions=metric['dimensions'], start_time=filter.start_timestamp, end_time=filter.end_timestamp, period=period, diff --git a/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py b/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py index 5bf50f3..2e819d9 100644 --- a/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py +++ b/ceilosca/ceilometer/tests/unit/storage/test_impl_monasca.py @@ -28,6 +28,19 @@ from ceilometer.storage import impl_monasca class TestGetResources(base.BaseTestCase): + + dummy_get_resources_mocked_return_value = ( + [{u'dimensions': {}, + u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}]], + u'id': u'2015-04-14T18:42:31Z', + u'columns': [u'timestamp', u'value', u'value_meta'], + u'name': u'image'}]) + + def setUp(self): + super(TestGetResources, self).setUp() + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([], project='ceilometer', validate_default_values=True) + @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_not_implemented_params(self, mock_mdf): with mock.patch("ceilometer.monasca_client.Client"): @@ -69,9 +82,10 @@ class TestGetResources(base.BaseTestCase): 'dimensions': {}} ] kwargs = dict(source='openstack') - list(conn.get_resources(**kwargs)) - ml_mock = mock_client().measurements_list + ml_mock.return_value = ( + TestGetResources.dummy_get_resources_mocked_return_value) + list(conn.get_resources(**kwargs)) self.assertEqual(2, ml_mock.call_count) self.assertEqual(dict(dimensions={}, name='metric1', @@ -79,6 +93,42 @@ class TestGetResources(base.BaseTestCase): start_time='1970-01-01T00:00:00Z'), ml_mock.call_args_list[0][1]) + @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") + def test_get_resources_limit(self, mdf_mock): + with mock.patch("ceilometer.monasca_client.Client") as mock_client: + conn = impl_monasca.Connection("127.0.0.1:8080") + + mnl_mock = mock_client().metrics_list + mnl_mock.return_value = [{'name': 'metric1', + 'dimensions': {'resource_id': 'abcd'}}, + {'name': 'metric2', + 'dimensions': {'resource_id': 'abcd'}} + ] + + dummy_get_resources_mocked_return_value = ( + [{u'dimensions': {u'resource_id': u'abcd'}, + u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}], + [u'2015-04-15T17:52:31Z', 2.0, {}], + [u'2015-04-16T17:52:31Z', 3.0, {}]], + u'id': u'2015-04-14T18:42:31Z', + u'columns': [u'timestamp', u'value', u'value_meta'], + u'name': u'image'}]) + + ml_mock = mock_client().measurements_list + ml_mock.return_value = ( + TestGetSamples.dummy_metrics_mocked_return_value + ) + ml_mock = mock_client().measurements_list + ml_mock.return_value = ( + dummy_get_resources_mocked_return_value) + + sample_filter = storage.SampleFilter( + meter='specific meter', end_timestamp='2015-04-20T00:00:00Z') + resources = list(conn.get_resources(sample_filter, limit=2)) + self.assertEqual(2, len(resources)) + self.assertEqual(True, ml_mock.called) + self.assertEqual(2, ml_mock.call_count) + @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_resources_simple_metaquery(self, mock_mdf): with mock.patch("ceilometer.monasca_client.Client") as mock_client: @@ -92,8 +142,12 @@ class TestGetResources(base.BaseTestCase): 'value_meta': {'key': 'value2'}}, ] kwargs = dict(metaquery={'metadata.key': 'value1'}) - list(conn.get_resources(**kwargs)) + ml_mock = mock_client().measurements_list + ml_mock.return_value = ( + TestGetResources.dummy_get_resources_mocked_return_value) + list(conn.get_resources(**kwargs)) + self.assertEqual(2, ml_mock.call_count) self.assertEqual(dict(dimensions={}, name='metric2', @@ -155,7 +209,6 @@ class TestGetSamples(base.BaseTestCase): super(TestGetSamples, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF([], project='ceilometer', validate_default_values=True) - self.CONF.set_override('query_concurrency_limit', 3, group='monasca') @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_not_implemented_params(self, mdf_mock): @@ -177,37 +230,10 @@ class TestGetSamples(base.BaseTestCase): self.assertRaises(ceilometer.NotImplementedError, lambda: list(conn.get_samples(sample_filter))) - def get_concurrent_task_args(self, conn, start_time, end_time, - sample_filter, dimensions=None, limit=None): - - delta = ((end_time - start_time) / - self.CONF.monasca.query_concurrency_limit) - expected_args_list = [] - for start, end in impl_monasca.Connection.get_next_time_delta( - conn, start_time, end_time, delta): - if limit: - expected_args_list.append(dict( - dimensions=dimensions if dimensions else {}, - start_time=timeutils.isotime(start), - start_timestamp_op=sample_filter.start_timestamp_op, - merge_metrics=False, name='specific meter', - limit=limit, - end_time=timeutils.isotime(end))) - else: - expected_args_list.append(dict( - dimensions=dimensions if dimensions else {}, - start_time=timeutils.isotime(start), - start_timestamp_op=sample_filter.start_timestamp_op, - merge_metrics=False, name='specific meter', - end_time=timeutils.isotime(end))) - - return expected_args_list - @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_name(self, mdf_mock): with mock.patch("ceilometer.monasca_client.Client") as mock_client: conn = impl_monasca.Connection("127.0.0.1:8080") - metrics_list_mock = mock_client().metrics_list metrics_list_mock.return_value = ( TestGetSamples.dummy_metrics_mocked_return_value @@ -215,22 +241,17 @@ class TestGetSamples(base.BaseTestCase): ml_mock = mock_client().measurements_list ml_mock.return_value = ( TestGetSamples.dummy_get_samples_mocked_return_value) - - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime(2015, 4, 20) - sample_filter = storage.SampleFilter( - meter='specific meter', - end_timestamp=timeutils.isotime(end_time)) - + meter='specific meter', end_timestamp='2015-04-20T00:00:00Z') list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args( - conn, start_time, end_time, sample_filter) - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(dict( + dimensions={}, + start_time='1970-01-01T00:00:00Z', + merge_metrics=False, name='specific meter', + end_time='2015-04-20T00:00:00Z'), + ml_mock.call_args[1]) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_start_timestamp_filter(self, mdf_mock): @@ -246,7 +267,6 @@ class TestGetSamples(base.BaseTestCase): TestGetSamples.dummy_get_samples_mocked_return_value) start_time = datetime.datetime(2015, 3, 20) - end_time = datetime.datetime.utcnow() sample_filter = storage.SampleFilter( meter='specific meter', @@ -254,14 +274,7 @@ class TestGetSamples(base.BaseTestCase): start_timestamp_op='ge') list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args(conn, - start_time, - end_time, - sample_filter) - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_limit(self, mdf_mock): @@ -269,30 +282,29 @@ class TestGetSamples(base.BaseTestCase): conn = impl_monasca.Connection("127.0.0.1:8080") metrics_list_mock = mock_client().metrics_list + + dummy_get_samples_mocked_return_value = ( + [{u'dimensions': {}, + u'measurements': [[u'2015-04-14T17:52:31Z', 1.0, {}], + [u'2015-04-15T17:52:31Z', 2.0, {}], + [u'2015-04-16T17:52:31Z', 3.0, {}]], + u'id': u'2015-04-14T18:42:31Z', + u'columns': [u'timestamp', u'value', u'value_meta'], + u'name': u'image'}]) + metrics_list_mock.return_value = ( TestGetSamples.dummy_metrics_mocked_return_value ) ml_mock = mock_client().measurements_list ml_mock.return_value = ( - TestGetSamples.dummy_get_samples_mocked_return_value) - - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime(2015, 4, 20) + dummy_get_samples_mocked_return_value) sample_filter = storage.SampleFilter( meter='specific meter', end_timestamp='2015-04-20T00:00:00Z') - list(conn.get_samples(sample_filter, limit=50)) + samples = list(conn.get_samples(sample_filter, limit=2)) + self.assertEqual(2, len(samples)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args(conn, - start_time, - end_time, - sample_filter, - limit=50) - - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_project_filter(self, mock_mdf): @@ -309,20 +321,11 @@ class TestGetSamples(base.BaseTestCase): ml_mock.return_value = ( TestGetSamples.dummy_get_samples_mocked_return_value) - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime.utcnow() sample_filter = storage.SampleFilter(meter='specific meter', project='specific project') list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args( - conn, start_time, end_time, sample_filter, - dimensions=dict(project_id=sample_filter.project)) - - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_resource_filter(self, mock_mdf): @@ -338,20 +341,11 @@ class TestGetSamples(base.BaseTestCase): ml_mock.return_value = ( TestGetSamples.dummy_get_samples_mocked_return_value) - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime.utcnow() sample_filter = storage.SampleFilter(meter='specific meter', resource='specific resource') list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args( - conn, start_time, end_time, sample_filter, - dimensions=dict(resource_id=sample_filter.resource)) - - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_source_filter(self, mdf_mock): @@ -367,20 +361,11 @@ class TestGetSamples(base.BaseTestCase): ml_mock.return_value = ( TestGetSamples.dummy_get_samples_mocked_return_value) - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime.utcnow() sample_filter = storage.SampleFilter(meter='specific meter', source='specific source') list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args( - conn, start_time, end_time, sample_filter, - dimensions=dict(source=sample_filter.source)) - - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_simple_metaquery(self, mdf_mock): @@ -394,20 +379,12 @@ class TestGetSamples(base.BaseTestCase): ml_mock.return_value = ( TestGetSamples.dummy_get_samples_mocked_return_value) - start_time = datetime.datetime(1970, 1, 1) - end_time = datetime.datetime.utcnow() sample_filter = storage.SampleFilter( meter='specific meter', metaquery={'metadata.key': u'value'}) list(conn.get_samples(sample_filter)) self.assertEqual(True, ml_mock.called) - - expected_args_list = self.get_concurrent_task_args( - conn, start_time, end_time, sample_filter) - - self.assertEqual(3, ml_mock.call_count) - (self.assertIn(call_arg[1], expected_args_list) - for call_arg in ml_mock.call_args_list) + self.assertEqual(1, ml_mock.call_count) @mock.patch("ceilometer.storage.impl_monasca.MonascaDataFilter") def test_get_samples_results(self, mdf_mock): @@ -480,7 +457,7 @@ class TestGetSamples(base.BaseTestCase): get('measurements')[0][0])) self.assertEqual(results[0].user_id, None) - self.assertEqual(3, ml_mock.call_count) + self.assertEqual(1, ml_mock.call_count) class MeterStatisticsTest(base.BaseTestCase):