diff --git a/ceilometer/keystone_client.py b/ceilometer/keystone_client.py index 835bf3fc84..7c44bf6a66 100644 --- a/ceilometer/keystone_client.py +++ b/ceilometer/keystone_client.py @@ -23,7 +23,7 @@ DEFAULT_GROUP = "service_credentials" # List of group that can set auth_section to use a different # credentials section -OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar'] +OVERRIDABLE_GROUPS = ['gnocchi', 'zaqar', 'monasca'] def get_session(conf, requests_session=None, group=None, timeout=None): diff --git a/ceilometer/monasca_client.py b/ceilometer/monasca_client.py new file mode 100644 index 0000000000..4835fdcc31 --- /dev/null +++ b/ceilometer/monasca_client.py @@ -0,0 +1,112 @@ +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from monascaclient import client +from monascaclient import exc +from oslo_log import log +import tenacity + +from ceilometer.i18n import _ +from ceilometer import keystone_client + +LOG = log.getLogger(__name__) + + +class MonascaException(Exception): + def __init__(self, message=''): + msg = 'An exception is raised from Monasca: ' + message + super(MonascaException, self).__init__(msg) + + +class MonascaServiceException(Exception): + def __init__(self, message=''): + msg = 'Monasca service is unavailable: ' + message + super(MonascaServiceException, self).__init__(msg) + + +class MonascaInvalidParametersException(Exception): + code = 400 + + def __init__(self, message=''): + msg = 'Request cannot be handled by Monasca: ' + message + super(MonascaInvalidParametersException, self).__init__(msg) + + +class Client(object): + """A client which gets information via python-monascaclient.""" + + def __init__(self, conf, parsed_url): + self.conf = conf + self._retry_interval = conf.monasca.client_retry_interval + self._max_retries = conf.monasca.client_max_retries or 1 + self._enable_api_pagination = conf.monasca.enable_api_pagination + # NOTE(zqfan): There are many concurrency requests while using + # Ceilosca, to save system resource, we don't retry too many times. + if self._max_retries < 0 or self._max_retries > 10: + LOG.warning('Reduce max retries from %s to 10', + self._max_retries) + self._max_retries = 10 + + monasca_auth_group = conf.monasca.auth_section + session = keystone_client.get_session(conf, group=monasca_auth_group) + + self._endpoint = parsed_url.netloc + parsed_url.path + LOG.info(_("monasca_client: using %s as Monasca endpoint") % + self._endpoint) + + self._get_client(session) + + def _get_client(self, session): + self._mon_client = client.Client(self.conf.monasca.clientapi_version, + endpoint=self._endpoint, + session=session) + + def call_func(self, func, **kwargs): + """General method for calling any Monasca API function.""" + @tenacity.retry( + wait=tenacity.wait_fixed(self._retry_interval), + stop=tenacity.stop_after_attempt(self._max_retries), + retry=(tenacity.retry_if_exception_type(MonascaServiceException) | + tenacity.retry_if_exception_type(MonascaException))) + def _inner(): + try: + return func(**kwargs) + except (exc.http.InternalServerError, + exc.http.ServiceUnavailable, + exc.http.BadGateway, + exc.connection.ConnectionError) as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + raise MonascaServiceException(msg) + except exc.http.HttpError as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + status_code = e.http_status + if not isinstance(status_code, int): + status_code = 500 + if 400 <= status_code < 500: + raise MonascaInvalidParametersException(msg) + else: + raise MonascaException(msg) + except Exception as e: + LOG.exception(e) + msg = '%s: %s' % (e.__class__.__name__, e) + raise MonascaException(msg) + + return _inner() + + def metrics_create(self, **kwargs): + return self.call_func(self._mon_client.metrics.create, + **kwargs) diff --git a/ceilometer/monasca_opts.py b/ceilometer/monasca_opts.py new file mode 100644 index 0000000000..2d9190f179 --- /dev/null +++ b/ceilometer/monasca_opts.py @@ -0,0 +1,92 @@ +# +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" All monasca ceilometer config opts""" + +from oslo_config import cfg + +OPTS = [ + + # from monasca_client + cfg.StrOpt('clientapi_version', + default='2_0', + help='Version of Monasca client to use while publishing.'), + cfg.BoolOpt('enable_api_pagination', + default=False, + help='Enable paging through monasca api resultset.'), + + # from monasca_data_filter + cfg.StrOpt('monasca_mappings', + default='/etc/ceilometer/monasca_field_definitions.yaml', + help='Monasca static and dynamic field mappings'), + + # from multi region opts + cfg.StrOpt('control_plane', + default='None', + help='The name of control plane'), + cfg.StrOpt('cluster', + default='None', + help='The name of cluster'), + cfg.StrOpt('cloud_name', + default='None', + help='The name of cloud'), + + # from publisher monasca + cfg.BoolOpt('batch_mode', + default=True, + help='Indicates whether samples are' + ' published in a batch.'), + cfg.IntOpt('batch_count', + default=1000, + help='Maximum number of samples in a batch.'), + cfg.IntOpt('batch_timeout', + default=15, + help='Maximum time interval(seconds) after which ' + 'samples are published in a batch.'), + cfg.IntOpt('batch_polling_interval', + default=5, + help='Frequency of checking if batch criteria is met.'), + cfg.BoolOpt('retry_on_failure', + default=False, + help='Indicates whether publisher retries publishing' + 'sample in case of failure. Only a few error cases ' + 'are queued for a retry.'), + # NOTE: the retry interval is hard coded for the periodicals decorator + cfg.IntOpt('batch_max_retries', + default=3, + help='Maximum number of retry attempts on a publishing ' + 'failure to Monasca API.'), + cfg.BoolOpt('archive_on_failure', + default=False, + help='When turned on, archives metrics in file system when ' + 'publish to Monasca fails or metric publish maxes out ' + 'retry attempts.'), + cfg.StrOpt('archive_path', + default='mon_pub_failures.txt', + help='File of metrics that failed to publish to ' + 'Monasca. These include metrics that failed to ' + 'publish on first attempt and failed metrics that' + ' maxed out their retries.'), + # For use with the monasca_client + cfg.IntOpt('client_max_retries', + default=3, + help='Maximum number of retry attempts of connecting to ' + 'Monasca API.'), + cfg.IntOpt('client_retry_interval', + default=60, + help='Frequency of attempting a retry connecting to Monasca ' + 'API.'), + +] diff --git a/ceilometer/opts.py b/ceilometer/opts.py index 8b844cc00a..e37461f981 100644 --- a/ceilometer/opts.py +++ b/ceilometer/opts.py @@ -30,6 +30,7 @@ import ceilometer.ipmi.platform.intel_node_manager import ceilometer.ipmi.pollsters import ceilometer.keystone_client import ceilometer.meter.notifications +import ceilometer.monasca_opts import ceilometer.neutron_client import ceilometer.notification import ceilometer.nova_client @@ -99,6 +100,7 @@ def list_opts(): itertools.chain(ceilometer.ipmi.platform.intel_node_manager.OPTS, ceilometer.ipmi.pollsters.OPTS)), ('meter', ceilometer.meter.notifications.OPTS), + ('monasca', ceilometer.monasca_opts.OPTS), ('notification', itertools.chain(ceilometer.notification.OPTS, ceilometer.notification.EXCHANGES_OPTS)), diff --git a/ceilometer/publisher/monasca.py b/ceilometer/publisher/monasca.py new file mode 100755 index 0000000000..37aee07ea1 --- /dev/null +++ b/ceilometer/publisher/monasca.py @@ -0,0 +1,250 @@ +# +# Copyright 2015 Hewlett Packard +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from futurist import periodics + +import os +import threading +import time + +from oslo_log import log +from six import moves + +import ceilometer +from ceilometer import monasca_client as mon_client +from ceilometer import publisher +from ceilometer.publisher.monasca_data_filter import MonascaDataFilter + +from monascaclient import exc +import traceback + +# Have to use constants rather than conf to satisfy @periodicals +BATCH_POLLING_INTERVAL = 5 +BATCH_RETRY_INTERVAL = 60 + +LOG = log.getLogger(__name__) + + +class MonascaPublisher(publisher.ConfigPublisherBase): + """Publisher to publish samples to monasca using monasca-client. + + Example URL to place in pipeline.yaml: + - monasca://http://192.168.10.4:8070/v2.0 + """ + def __init__(self, conf, parsed_url): + super(MonascaPublisher, self).__init__(conf, parsed_url) + + # list to hold metrics to be published in batch (behaves like queue) + self.metric_queue = [] + self.time_of_last_batch_run = time.time() + + self.mon_client = mon_client.Client(self.conf, parsed_url) + self.mon_filter = MonascaDataFilter(self.conf) + + # add flush_batch function to periodic callables + periodic_callables = [ + # The function to run + any automatically provided + # positional and keyword arguments to provide to it + # everytime it is activated. + (self.flush_batch, (), {}), + ] + + if self.conf.monasca.retry_on_failure: + # list to hold metrics to be re-tried (behaves like queue) + self.retry_queue = [] + # list to store retry attempts for metrics in retry_queue + self.retry_counter = [] + + # add retry_batch function to periodic callables + periodic_callables.append((self.retry_batch, (), {})) + + if self.conf.monasca.archive_on_failure: + archive_path = self.conf.monasca.archive_path + if not os.path.exists(archive_path): + archive_path = self.conf.find_file(archive_path) + + self.archive_handler = publisher.get_publisher( + self.conf, + 'file://' + + str(archive_path), + 'ceilometer.sample.publisher') + + # start periodic worker + self.periodic_worker = periodics.PeriodicWorker(periodic_callables) + self.periodic_thread = threading.Thread( + target=self.periodic_worker.start) + self.periodic_thread.daemon = True + self.periodic_thread.start() + + def _publish_handler(self, func, metrics, batch=False): + """Handles publishing and exceptions that arise.""" + + try: + metric_count = len(metrics) + if batch: + func(**{'jsonbody': metrics}) + else: + func(**metrics[0]) + LOG.info('Successfully published %d metric(s)' % metric_count) + except mon_client.MonascaServiceException: + # Assuming atomicity of create or failure - meaning + # either all succeed or all fail in a batch + LOG.error('Metric create failed for %(count)d metric(s) with' + ' name(s) %(names)s ' % + ({'count': len(metrics), + 'names': ','.join([metric['name'] + for metric in metrics])})) + if self.conf.monasca.retry_on_failure: + # retry payload in case of internal server error(500), + # service unavailable error(503),bad gateway (502) or + # Communication Error + + # append failed metrics to retry_queue + LOG.debug('Adding metrics to retry queue.') + self.retry_queue.extend(metrics) + # initialize the retry_attempt for the each failed + # metric in retry_counter + self.retry_counter.extend( + [0 * i for i in range(metric_count)]) + else: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + except Exception: + LOG.info(traceback.format_exc()) + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + + def publish_samples(self, samples): + """Main method called to publish samples.""" + + for sample in samples: + metric = self.mon_filter.process_sample_for_monasca(sample) + # In batch mode, push metric to queue, + # else publish the metric + if self.conf.monasca.batch_mode: + LOG.debug('Adding metric to queue.') + self.metric_queue.append(metric) + else: + LOG.info('Publishing metric with name %(name)s and' + ' timestamp %(ts)s to endpoint.' % + ({'name': metric['name'], + 'ts': metric['timestamp']})) + + self._publish_handler(self.mon_client.metrics_create, [metric]) + + def is_batch_ready(self): + """Method to check if batch is ready to trigger.""" + + previous_time = self.time_of_last_batch_run + current_time = time.time() + elapsed_time = current_time - previous_time + + if elapsed_time >= self.conf.monasca.batch_timeout and len(self. + metric_queue) > 0: + LOG.info('Batch timeout exceeded, triggering batch publish.') + return True + else: + if len(self.metric_queue) >= self.conf.monasca.batch_count: + LOG.info('Batch queue full, triggering batch publish.') + return True + else: + return False + + @periodics.periodic(BATCH_POLLING_INTERVAL) + def flush_batch(self): + """Method to flush the queued metrics.""" + # print "flush batch... %s" % str(time.time()) + if self.is_batch_ready(): + # publish all metrics in queue at this point + batch_count = len(self.metric_queue) + + LOG.info("batch is ready: batch_count %s" % str(batch_count)) + + self._publish_handler(self.mon_client.metrics_create, + self.metric_queue[:batch_count], + batch=True) + + self.time_of_last_batch_run = time.time() + # slice queue to remove metrics that + # published with success or failed and got queued on + # retry queue + self.metric_queue = self.metric_queue[batch_count:] + + def is_retry_ready(self): + """Method to check if retry batch is ready to trigger.""" + + if len(self.retry_queue) > 0: + LOG.info('Retry queue has items, triggering retry.') + return True + else: + return False + + @periodics.periodic(BATCH_RETRY_INTERVAL) + def retry_batch(self): + """Method to retry the failed metrics.""" + # print "retry batch...%s" % str(time.time()) + if self.is_retry_ready(): + retry_count = len(self.retry_queue) + + # Iterate over the retry_queue to eliminate + # metrics that have maxed out their retry attempts + for ctr in moves.xrange(retry_count): + if self.retry_counter[ctr] > self.conf.\ + monasca.batch_max_retries: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples( + None, + [self.retry_queue[ctr]]) + LOG.info('Removing metric %s from retry queue.' + ' Metric retry maxed out retry attempts' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + + # Iterate over the retry_queue to retry the + # publish for each metric. + # If an exception occurs, the retry count for + # the failed metric is incremented. + # If the retry succeeds, remove the metric and + # the retry count from the retry_queue and retry_counter resp. + ctr = 0 + while ctr < len(self.retry_queue): + try: + LOG.info('Retrying metric publish from retry queue.') + self.mon_client.metrics_create(**self.retry_queue[ctr]) + # remove from retry queue if publish was success + LOG.info('Retrying metric %s successful,' + ' removing metric from retry queue.' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + except exc.ClientException: + LOG.error('Exception encountered in retry. ' + 'Batch will be retried in next attempt.') + # if retry failed, increment the retry counter + self.retry_counter[ctr] += 1 + ctr += 1 + + def flush_to_file(self): + # TODO(persist maxed-out metrics to file) + pass + + def publish_events(self, events): + """Send an event message for publishing + + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/monasca_data_filter.py b/ceilometer/publisher/monasca_data_filter.py new file mode 100644 index 0000000000..62dd82339f --- /dev/null +++ b/ceilometer/publisher/monasca_data_filter.py @@ -0,0 +1,229 @@ +# +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime + +from jsonpath_rw_ext import parser +from oslo_log import log +from oslo_utils import timeutils +import six +import yaml + +from ceilometer import sample as sample_util + +LOG = log.getLogger(__name__) + + +class UnableToLoadMappings(Exception): + pass + + +class NoMappingsFound(Exception): + pass + + +class CeiloscaMappingDefinitionException(Exception): + def __init__(self, message, definition_cfg): + super(CeiloscaMappingDefinitionException, self).__init__(message) + self.message = message + self.definition_cfg = definition_cfg + + def __str__(self): + return '%s %s: %s' % (self.__class__.__name__, + self.definition_cfg, self.message) + + +class MonascaDataFilter(object): + JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser() + + def __init__(self, conf): + self.conf = conf + self._mapping = {} + self._mapping = self._get_mapping() + + def _get_mapping(self): + with open(self.conf.monasca.monasca_mappings, 'r') as f: + try: + return yaml.safe_load(f) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + errmsg = ("Invalid YAML syntax in Monasca Data " + "Filter file %(file)s at line: " + "%(line)s, column: %(column)s." + % dict(file=self.conf.monasca.monasca_mappings, + line=mark.line + 1, + column=mark.column + 1)) + else: + errmsg = ("YAML error reading Monasca Data Filter " + "file %(file)s" % + dict(file=self.conf.monasca.monasca_mappings)) + LOG.error(errmsg) + raise UnableToLoadMappings(err.message) + + def _convert_timestamp(self, timestamp): + if isinstance(timestamp, datetime.datetime): + ts = timestamp + else: + ts = timeutils.parse_isotime(timestamp) + tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo)) + # convert timestamp to milli seconds as Monasca expects + return int(tdelta.total_seconds() * 1000) + + def _convert_to_sample(self, s): + return sample_util.Sample( + name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source')).as_dict() + + def get_value_for_nested_dictionary(self, lst, dct): + val = dct + for element in lst: + if isinstance(val, dict) and element in val: + val = val.get(element) + else: + return + return val + + def parse_jsonpath(self, field): + try: + parts = self.JSONPATH_RW_PARSER.parse(field) + except Exception as e: + raise CeiloscaMappingDefinitionException( + "Parse error in JSONPath specification " + "'%(jsonpath)s': %(err)s" + % dict(jsonpath=field, err=e)) + return parts + + def _get_value_metadata_for_key(self, sample_meta, meta_key): + """Get the data for the given key, supporting JSONPath""" + if isinstance(meta_key, dict): + # extract key and jsonpath + # If following convention, dict will have one and only one + # element of the form : + if len(meta_key.keys()) == 1: + mon_key = list(meta_key.keys())[0] + else: + # If no keys or more keys than one + raise CeiloscaMappingDefinitionException( + "Field definition format mismatch, should " + "have only one key:value pair. %(meta_key)s" % + {'meta_key': meta_key}, meta_key) + json_path = meta_key[mon_key] + parts = self.parse_jsonpath(json_path) + val_matches = parts.find(sample_meta) + if len(val_matches) > 0: + # resolve the find to the first match and get value + val = val_matches[0].value + if not isinstance(val, (str, six.text_type)) \ + and not isinstance(val, int): + # Don't support lists or dicts or ... + raise CeiloscaMappingDefinitionException( + "Metadata format mismatch, value " + "should be a simple string. %(valuev)s" % + {'valuev': val}, meta_key) + else: + val = 'None' + return mon_key, val + else: + # simple string + val = sample_meta.get(meta_key, None) + if val is not None: + return meta_key, val + else: + # one more attempt using a dotted notation + # TODO(joadavis) Deprecate this . notation code + # in favor of jsonpath + if len(meta_key.split('.')) > 1: + val = self.get_value_for_nested_dictionary( + meta_key.split('.'), sample_meta) + if val is not None: + return meta_key, val + else: + return meta_key, 'None' + else: + return meta_key, 'None' + + def process_sample_for_monasca(self, sample_obj): + if not self._mapping: + raise NoMappingsFound("Unable to process the sample") + + dimensions = {} + dimensions['datasource'] = 'ceilometer' + # control_plane, cluster and cloud_name can be None, but we use + # literal 'None' for such case + dimensions['control_plane'] = self.conf.monasca.control_plane or 'None' + dimensions['cluster'] = self.conf.monasca.cluster or 'None' + dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None' + if isinstance(sample_obj, sample_util.Sample): + sample = sample_obj.as_dict() + elif isinstance(sample_obj, dict): + if 'counter_name' in sample_obj: + sample = self._convert_to_sample(sample_obj) + else: + sample = sample_obj + + for dim in self._mapping['dimensions']: + val = sample.get(dim, None) + if val: + dimensions[dim] = val + else: + dimensions[dim] = 'None' + + sample_meta = sample.get('resource_metadata', None) + value_meta = {} + + meter_name = sample.get('name') or sample.get('counter_name') + if sample_meta: + for meta_key in self._mapping['metadata']['common']: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + if meter_name in self._mapping['metadata'].keys(): + for meta_key in self._mapping['metadata'][meter_name]: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + meter_value = sample.get('volume') or sample.get('counter_volume') + if meter_value is None: + meter_value = 0 + + metric = dict( + name=meter_name, + timestamp=self._convert_timestamp(sample['timestamp']), + value=meter_value, + dimensions=dimensions, + value_meta=value_meta, + ) + + LOG.debug("Generated metric with name %(name)s," + " timestamp %(timestamp)s, value %(value)s," + " dimensions %(dimensions)s" % + {'name': metric['name'], + 'timestamp': metric['timestamp'], + 'value': metric['value'], + 'dimensions': metric['dimensions']}) + + return metric diff --git a/ceilometer/tests/unit/publisher/test_monasca_data_filter.py b/ceilometer/tests/unit/publisher/test_monasca_data_filter.py new file mode 100644 index 0000000000..8e6dfcae3b --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_monasca_data_filter.py @@ -0,0 +1,512 @@ +# +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import mock +from oslo_utils import timeutils +from oslotest import base + +from ceilometer import monasca_opts +from ceilometer.publisher import monasca_data_filter as mdf +from ceilometer import sample +from ceilometer import service + + +class TestMonUtils(base.BaseTestCase): + def setUp(self): + super(TestMonUtils, self).setUp() + self.CONF = service.prepare_service([], []) + self.CONF.register_opts(list(monasca_opts.OPTS), + 'monasca') + self._field_mappings = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending'], + 'image': ['size', 'status', 'image_meta.base_url', + 'image_meta.base_url2', 'image_meta.base_url3', + 'image_meta.base_url4'], + 'image.delete': ['size', 'status'], + 'image.size': ['size', 'status'], + 'image.update': ['size', 'status'], + 'image.upload': ['size', 'status'], + 'instance': ['state', 'state_description'], + 'snapshot': ['status'], + 'snapshot.size': ['status'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + self._field_mappings_cinder = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending', + 'arbitrary_new_field'], + 'volume.create.end': + ['size', 'status', + {'metering.prn_name': + "$.metadata[?(@.key = 'metering.prn_name')].value"}, + {'metering.prn_type': + "$.metadata[?(@.key = 'metering.prn_type')].value"}, + 'volume_type', 'created_at', + 'host'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + self._field_mappings_bad_format = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'source', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending', + 'arbitrary_new_field'], + 'volume.create.end': + ['size', 'status', + {'metering.prn_name': + "$.metadata[?(@.key = 'metering.prn_name')].value", + 'metering.prn_type': + "$.metadata[?(@.key = 'metering.prn_type')].value"}, + 'volume_type', 'created_at', + 'host'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + def test_process_sample(self): + s = sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('dimensions')) + self.assertIsNotNone(r.get('value_meta')) + self.assertIsNotNone(r.get('value')) + self.assertEqual(s.user_id, r['dimensions'].get('user_id')) + self.assertEqual(s.project_id, r['dimensions'].get('project_id')) + self.assertEqual(s.resource_id, r['dimensions'].get('resource_id')) + # 2015-04-07T20:07:06.156986 compare upto millisec + monasca_ts = \ + timeutils.iso8601_from_timestamp(r['timestamp'] / 1000.0, + microsecond=True)[:23] + self.assertEqual(s.timestamp[:23], monasca_ts) + + def test_process_sample_field_mappings(self): + s = sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ) + + field_map = self._field_mappings + field_map['dimensions'].remove('project_id') + field_map['dimensions'].remove('user_id') + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[field_map]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertIsNone(r['dimensions'].get('project_id')) + self.assertIsNone(r['dimensions'].get('user_id')) + + def convert_dict_to_list(self, dct, prefix=None, outlst={}): + prefix = prefix + '.' if prefix else "" + for k, v in dct.items(): + if type(v) is dict: + self.convert_dict_to_list(v, prefix + k, outlst) + else: + if v is not None: + outlst[prefix + k] = v + else: + outlst[prefix + k] = 'None' + return outlst + + def test_process_sample_metadata(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 1500}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + + def test_process_sample_metadata_with_empty_data(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertEqual(s.source, r['dimensions']['source']) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + + def test_process_sample_metadata_with_extendedKey(self): + s = sample.Sample( + name='image', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'notification', + 'status': 'active', + 'image_meta': {'base_url': 'http://image.url', + 'base_url2': '', + 'base_url3': None}, + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + with mock.patch(to_patch, side_effect=[self._field_mappings]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + self.assertTrue(set(self.convert_dict_to_list( + s.resource_metadata + ).items()).issubset(set(r['value_meta'].items()))) + self.assertEqual(r.get('value_meta')['image_meta.base_url'], + s.resource_metadata.get('image_meta') + ['base_url']) + self.assertEqual(r.get('value_meta')['image_meta.base_url2'], + s.resource_metadata.get('image_meta') + ['base_url2']) + self.assertEqual(r.get('value_meta')['image_meta.base_url3'], + str(s.resource_metadata.get('image_meta') + ['base_url3'])) + self.assertEqual(r.get('value_meta')['image_meta.base_url4'], + 'None') + + def test_process_sample_metadata_with_jsonpath(self): + """Test meter sample in a format produced by cinder.""" + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + # this "value: , key: " format is + # how cinder reports metadata + 'metadata': + [{'value': 'aaa0001', + 'key': 'metering.prn_name'}, + {'value': 'Cust001', + 'key': 'metering.prn_type'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 'aaa0001', + "Failed to extract a value " + "using specified jsonpath.") + + def test_process_sample_metadata_with_jsonpath_nomatch(self): + """Test meter sample in a format produced by cinder. + + Behavior when no matching element is found for the specified jsonpath + """ + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 'aaa0001', + 'key': 'metering.THISWONTMATCH'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 'None', "This metadata should fail to match " + "and then return 'None'.") + + def test_process_sample_metadata_with_jsonpath_value_not_str(self): + """Test where jsonpath is used but result is not a simple string""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': ['aaa0001', 'bbbb002'], + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + try: + # Don't assign to a variable, this should raise + data_filter.process_sample_for_monasca(s) + except mdf.CeiloscaMappingDefinitionException as e: + self.assertEqual( + 'Metadata format mismatch, value should be ' + 'a simple string. [\'aaa0001\', \'bbbb002\']', + e.message) + + def test_process_sample_metadata_with_jsonpath_value_is_int(self): + """Test meter sample where jsonpath result is an int.""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 13, + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the cinder specific mapping + with mock.patch(to_patch, side_effect=[self._field_mappings_cinder]): + data_filter = mdf.MonascaDataFilter(self.CONF) + r = data_filter.process_sample_for_monasca(s) + + self.assertEqual(s.name, r['name']) + self.assertIsNotNone(r.get('value_meta')) + # Using convert_dict_to_list is too simplistic for this + self.assertEqual(r.get('value_meta')['event_type'], + s.resource_metadata.get('event_type'), + "Failed to match common element.") + self.assertEqual(r.get('value_meta')['host'], + s.resource_metadata.get('host'), + "Failed to match meter specific element.") + self.assertEqual(r.get('value_meta')['size'], + s.resource_metadata.get('size'), + "Unable to handle an int.") + self.assertEqual(r.get('value_meta')['metering.prn_name'], + 13, + "Unable to handle an int " + "through the jsonpath processing") + + def test_process_sample_metadata_with_jsonpath_bad_format(self): + """Test handling of definition that is not written correctly""" + + s = sample.Sample( + name='volume.create.end', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + source='', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'event_type': 'volume.create.end', + 'status': 'available', + 'volume_type': None, + # 'created_at': '2017-03-21T21:05:44+00:00', + 'host': 'testhost', + 'metadata': [{'value': 13, + 'key': 'metering.prn_name'}], + 'size': 0}, + ) + + to_patch = ("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping") + # use the bad mapping + with mock.patch(to_patch, + side_effect=[self._field_mappings_bad_format]): + data_filter = mdf.MonascaDataFilter(self.CONF) + try: + # Don't assign to a variable as this should raise + data_filter.process_sample_for_monasca(s) + except mdf.CeiloscaMappingDefinitionException as e: + # Make sure we got the right kind of error + # Cannot check the whole message text, as python + # may reorder a dict when producing a string version + self.assertIn( + 'Field definition format mismatch, should ' + 'have only one key:value pair.', + e.message, + "Did raise exception but wrong message - %s" % e.message) diff --git a/ceilometer/tests/unit/publisher/test_monasca_publisher.py b/ceilometer/tests/unit/publisher/test_monasca_publisher.py new file mode 100755 index 0000000000..60095c27b4 --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_monasca_publisher.py @@ -0,0 +1,205 @@ +# +# Copyright 2015 Hewlett Packard +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/monasca.py +""" + +import datetime +import fixtures +import time + +import mock +from oslotest import base + +from ceilometer import monasca_client as mon_client +from ceilometer.publisher import monasca as mon_publisher +from ceilometer import sample +from ceilometer import service + + +class FakeResponse(object): + def __init__(self, status_code): + self.status_code = status_code + + +class TestMonascaPublisher(base.BaseTestCase): + + test_data = [ + sample.Sample( + name='test', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='test2', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] + + field_mappings = { + 'dimensions': ['resource_id', + 'project_id', + 'user_id', + 'geolocation', + 'region', + 'availability_zone'], + + 'metadata': { + 'common': ['event_type', + 'audit_period_beginning', + 'audit_period_ending'], + 'image': ['size', 'status'], + 'image.delete': ['size', 'status'], + 'image.size': ['size', 'status'], + 'image.update': ['size', 'status'], + 'image.upload': ['size', 'status'], + 'instance': ['state', 'state_description'], + 'snapshot': ['status'], + 'snapshot.size': ['status'], + 'volume': ['status'], + 'volume.size': ['status'], + } + } + + @staticmethod + def create_side_effect(exception_type, test_exception): + def side_effect(*args, **kwargs): + if test_exception.pop(): + raise exception_type + else: + return FakeResponse(204) + return side_effect + + def setUp(self): + super(TestMonascaPublisher, self).setUp() + + self.CONF = service.prepare_service([], []) + self.parsed_url = mock.MagicMock() + + def tearDown(self): + # For some reason, cfg.CONF is registered a required option named + # auth_url after these tests run, which occasionally blocks test + # case test_event_pipeline_endpoint_requeue_on_failure, so we + # unregister it here. + self.CONF.reset() + # self.CONF.unregister_opt(cfg.StrOpt('service_auth_url'), + # group='monasca') + super(TestMonascaPublisher, self).tearDown() + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_publish(self, mapping_patch): + self.CONF.set_override('batch_mode', False, group='monasca') + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.return_value = FakeResponse(204) + publisher.publish_samples(self.test_data) + self.assertEqual(3, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_batch(self, mapping_patch): + self.CONF.set_override('batch_mode', True, group='monasca') + self.CONF.set_override('batch_count', 3, group='monasca') + self.CONF.set_override('batch_polling_interval', 1, group='monasca') + + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.return_value = FakeResponse(204) + publisher.publish_samples(self.test_data) + time.sleep(10) + self.assertEqual(1, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_batch_retry(self, mapping_patch): + self.CONF.set_override('batch_mode', True, group='monasca') + self.CONF.set_override('batch_count', 3, group='monasca') + self.CONF.set_override('batch_polling_interval', 1, group='monasca') + self.CONF.set_override('retry_on_failure', True, group='monasca') + # Constant in code for @periodicals, can't override + # self.CONF.set_override('retry_interval', 2, group='monasca') + self.CONF.set_override('batch_max_retries', 1, group='monasca') + + publisher = mon_publisher.MonascaPublisher(self.CONF, self.parsed_url) + publisher.mon_client = mock.MagicMock() + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + raise_http_error = [False, False, False, True] + mock_create.side_effect = self.create_side_effect( + mon_client.MonascaServiceException, + raise_http_error) + publisher.publish_samples(self.test_data) + time.sleep(60) + self.assertEqual(4, mock_create.call_count) + self.assertEqual(1, mapping_patch.called) + + @mock.patch("ceilometer.publisher.monasca_data_filter." + "MonascaDataFilter._get_mapping", + side_effect=[field_mappings]) + def test_publisher_archival_on_failure(self, mapping_patch): + self.CONF.set_override('archive_on_failure', True, group='monasca') + self.CONF.set_override('batch_mode', False, group='monasca') + self.fake_publisher = mock.Mock() + + self.useFixture(fixtures.MockPatch( + 'ceilometer.publisher.file.FilePublisher', + return_value=self.fake_publisher)) + + publisher = mon_publisher.MonascaPublisher(self.CONF, + self.parsed_url) + publisher.mon_client = mock.MagicMock() + + with mock.patch.object(publisher.mon_client, + 'metrics_create') as mock_create: + mock_create.side_effect = Exception + metrics_archiver = self.fake_publisher.publish_samples + publisher.publish_samples(self.test_data) + self.assertEqual(1, metrics_archiver.called) + self.assertEqual(3, metrics_archiver.call_count) diff --git a/ceilometer/tests/unit/test_monascaclient.py b/ceilometer/tests/unit/test_monascaclient.py new file mode 100644 index 0000000000..33ce536ff3 --- /dev/null +++ b/ceilometer/tests/unit/test_monascaclient.py @@ -0,0 +1,148 @@ +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock + +from oslo_utils import netutils +from oslotest import base + +from ceilometer import monasca_client +from ceilometer import service + +from monascaclient import exc +import tenacity + + +class TestMonascaClient(base.BaseTestCase): + def setUp(self): + super(TestMonascaClient, self).setUp() + + self.CONF = service.prepare_service([], []) + self.CONF.set_override('client_max_retries', 0, 'monasca') + self.mc = self._get_client() + + def tearDown(self): + # For some reason, cfg.CONF is registered a required option named + # auth_url after these tests run, which occasionally blocks test + # case test_event_pipeline_endpoint_requeue_on_failure, so we + # unregister it here. + self.CONF.reset() + # self.CONF.unregister_opt(cfg.StrOpt('service_auth_url'), + # group='monasca') + super(TestMonascaClient, self).tearDown() + + @mock.patch('monascaclient.client.Client') + def _get_client(self, monclient_mock): + return monasca_client.Client( + self.CONF, + netutils.urlsplit("http://127.0.0.1:8080")) + + @mock.patch('monascaclient.client.Client') + def test_client_url_correctness(self, monclient_mock): + mon_client = monasca_client.Client( + self.CONF, + netutils.urlsplit("monasca://https://127.0.0.1:8080")) + self.assertEqual("https://127.0.0.1:8080", mon_client._endpoint) + + def test_metrics_create(self): + with mock.patch.object(self.mc._mon_client.metrics, 'create', + side_effect=[True]) as create_patch: + self.mc.metrics_create() + + self.assertEqual(1, create_patch.call_count) + + def test_metrics_create_exception(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.InternalServerError, True])\ + as create_patch: + e = self.assertRaises(tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIsInstance(original_ex, + monasca_client.MonascaServiceException) + self.assertEqual(1, create_patch.call_count) + + def test_metrics_create_unprocessable_exception(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.UnprocessableEntity, True])\ + as create_patch: + self.assertRaises(monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) + self.assertEqual(1, create_patch.call_count) + + def test_no_retry_on_invalid_parameter(self): + self.CONF.set_override('client_max_retries', 2, 'monasca') + self.CONF.set_override('client_retry_interval', 1, 'monasca') + self.mc = self._get_client() + + def _check(exception): + expected_exc = monasca_client.MonascaInvalidParametersException + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exception, True] + ) as mocked_metrics_list: + self.assertRaises(expected_exc, self.mc.metrics_create) + self.assertEqual(1, mocked_metrics_list.call_count) + + _check(exc.http.UnprocessableEntity) + _check(exc.http.BadRequest) + + def test_max_retries_not_too_much(self): + def _check(configured, expected): + self.CONF.set_override('client_max_retries', configured, + 'monasca') + self.mc = self._get_client() + self.assertEqual(expected, self.mc._max_retries) + + _check(-1, 10) + _check(11, 10) + _check(5, 5) + _check(None, 1) + + def test_meaningful_exception_message(self): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.InternalServerError, + exc.http.UnprocessableEntity, + KeyError]): + e = self.assertRaises( + tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIn('Monasca service is unavailable', + str(original_ex)) + + e = self.assertRaises( + monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) + self.assertIn('Request cannot be handled by Monasca', + str(e)) + + e = self.assertRaises( + tenacity.RetryError, + self.mc.metrics_create) + original_ex = e.last_attempt.exception() + self.assertIn('An exception is raised from Monasca', + str(original_ex)) + + @mock.patch.object(monasca_client.Client, '_get_client') + def test_metrics_create_with_401(self, rc_patch): + with mock.patch.object( + self.mc._mon_client.metrics, 'create', + side_effect=[exc.http.Unauthorized, True]): + self.assertRaises( + monasca_client.MonascaInvalidParametersException, + self.mc.metrics_create) diff --git a/doc/source/admin/telemetry-data-pipelines.rst b/doc/source/admin/telemetry-data-pipelines.rst index 30366d0489..48547ff675 100644 --- a/doc/source/admin/telemetry-data-pipelines.rst +++ b/doc/source/admin/telemetry-data-pipelines.rst @@ -213,6 +213,49 @@ The following customization options are available: default topic defined by ``metering_topic`` and ``event_topic`` options. This option can be used to support multiple consumers. +monasca +``````` + +The monasca publisher can be used to send measurements to the Monasca API, +where it will be stored with other metrics gathered by Monasca Agent. Data +is accessible through the Monasca API and be transformed like other Monasca +metrics. + +The pipeline sink is specified with a ``publishers:`` element of the form +``- monasca://https:///metrics/v2.0`` + +Monasca API connection information is configured in the ceilometer.conf +file in a [monasca] section:: + + [monasca] + auth_section = monasca_auth + enable_api_pagination = True + client_retry_interval = 60 + client_max_retries = 5 + monasca_mappings = + + [monasca_auth] + auth_url = https:///identity + auth_type = password + username = + password = + project_name = + project_domain_id = + user_domain_id = + verify = + region_name = + + +..note:: + The username specified should be for a Keystone user that has the + ``monasca_agent`` or ``monasca_user`` role enabled. For management purposes, + this may be the ceilometer user if the appropriate role is granted. + +For more detail and history of this publisher, see the +`Ceilosca Wiki `__ and +`monasca-ceilometer README +`__. + udp ``` diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst index 832c1d1def..7971dd087a 100644 --- a/doc/source/contributor/architecture.rst +++ b/doc/source/contributor/architecture.rst @@ -164,7 +164,7 @@ Publishing the data This figure shows how a sample can be published to multiple destinations. -Currently, processed data can be published using 8 different transports: +Currently, processed data can be published using different transport options: 1. gnocchi, which publishes samples/events to Gnocchi API; 2. notifier, a notification based publisher which pushes samples to a message @@ -174,8 +174,9 @@ Currently, processed data can be published using 8 different transports: 5. file, which publishes samples to a file with specified name and location; 6. zaqar, a multi-tenant cloud messaging and notification service for web and mobile developers; -7. https, which is http over SSL and targets a REST interface. -8. prometheus, which publishes samples to Prometheus Pushgateway +7. https, which is http over SSL and targets a REST interface; +8. prometheus, which publishes samples to Prometheus Pushgateway; +9. monasca, which publishes samples to the Monasca API. Storing/Accessing the data diff --git a/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf b/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf new file mode 100644 index 0000000000..de9f345e25 --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/example_ceilometer.conf @@ -0,0 +1,32 @@ +[DEFAULT] +debug = False + +### +# Skipped ceilometer-config-generator common sections +### + +[monasca] +auth_section = monasca_auth +monasca_mappings = /etc/ceilometer/monasca_field_definitions.yaml + +# NOTE: These are additional options which may be set +# retry_on_failure = False +# enable_api_pagination = True +# client_retry_interval = 60 +# client_max_retries = 5 + +[monasca_auth] +# NOTE: the values here may match the values in the [service_credentials] +# section +auth_url = https://:5000/v2.0 +password = +username = ceilometer +interface = internal +auth_type = password +# project_id may also be used +project_name = admin +project_domain_id = default +user_domain_id = default +region_name = RegionOne +# Specify a CA bundle to enable SSL connection +# verify = /somewhere/ca-bundle.pem diff --git a/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml b/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml new file mode 100644 index 0000000000..b86aefd78e --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/example_pipeline.yaml @@ -0,0 +1,44 @@ +# An example pipeline.yaml +# Reference https://docs.openstack.org/ceilometer/latest/admin/telemetry-measurements.html +# for other possible meters. +--- +sources: + - name: compute_source + interval: 30 + meters: + - "memory" + - "vcpus" + sinks: + - meter_sink + - name: network_source + interval: 30 + meters: + - "bandwidth" + sinks: + - meter_sink + - name: image_source + interval: 30 + meters: + - "image.size" + sinks: + - meter_sink + - name: volume_source + interval: 30 + meters: + - "volume.size" + - "snapshot.size" + sinks: + - meter_sink + - name: swift_source + interval: 3600 + meters: + - "storage.objects" + - "storage.objects.size" + - "storage.objects.containers" + sinks: + - meter_sink +sinks: + - name: meter_sink + transformers: + publishers: + - monasca://https://:8070/v2.0 diff --git a/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml b/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml new file mode 100644 index 0000000000..5274101745 --- /dev/null +++ b/etc/ceilometer/examples/monasca-publisher/monasca_field_definitions.yaml @@ -0,0 +1,58 @@ +dimensions: + - resource_id + - project_id + - user_id + - region + - type + - unit + - source +metadata: + common: + - event_type + - audit_period_beginning + - audit_period_ending + - availability_zone + memory: + - state + - memory_mb + - root_gb + - vcpus + - disk_gb + - ephemeral_gb + - host + - instance_flavor_id + - image_ref_url + - created_at + - deleted_at + - launched_at + vcpus: + - state + - memory_mb + - root_gb + - vcpus + - disk_gb + - ephemeral_gb + - host + - instance_flavor_id + - image_ref_url + - created_at + - deleted_at + - launched_at + image.size: + - size + - status + - is_public + - properties.image_type + snapshot.size: + - volume_size + - state + - status + - created_at + - host + volume.size: + - size + - volume_type + - status + - created_at + - host + - status diff --git a/lower-constraints.txt b/lower-constraints.txt index da538f7bf5..91046ffdfa 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -33,6 +33,7 @@ pysnmp==4.2.3 python-cinderclient==3.3.0 python-glanceclient==2.8.0 python-keystoneclient==3.15.0 +python-monascaclient==1.12.0 python-neutronclient==6.7.0 python-novaclient==9.1.0 python-swiftclient==3.2.0 diff --git a/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml b/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml new file mode 100644 index 0000000000..871025a7c2 --- /dev/null +++ b/releasenotes/notes/include-monasca-publisher-1f47dde52af50feb.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Include a publisher for the Monasca API. A ``monasca://`` pipeline sink + will send data to a Monasca instance, using credentials configured in + ceilometer.conf. This functionality was previously available in the + Ceilosca project + (https://github.com/openstack/monasca-ceilometer). diff --git a/requirements.txt b/requirements.txt index 46edc35604..8fedb54b53 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,4 +38,5 @@ tooz[zake]>=1.47.0 # Apache-2.0 os-xenapi>=0.3.3 # Apache-2.0 oslo.cache>=1.26.0 # Apache-2.0 gnocchiclient>=7.0.0 # Apache-2.0 +python-monascaclient>=1.12.0 # Apache-2.0 python-zaqarclient>=1.3.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 9488926e39..fba735beb1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -207,6 +207,7 @@ ceilometer.sample.publisher = http = ceilometer.publisher.http:HttpPublisher prometheus = ceilometer.publisher.prometheus:PrometheusPublisher https = ceilometer.publisher.http:HttpPublisher + monasca = ceilometer.publisher.monasca:MonascaPublisher gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher zaqar = ceilometer.publisher.zaqar:ZaqarPublisher