diff --git a/ceilometer/publisher/http.py b/ceilometer/publisher/http.py index e57370e319..e107fa9adf 100644 --- a/ceilometer/publisher/http.py +++ b/ceilometer/publisher/http.py @@ -69,6 +69,8 @@ class HttpPublisher(publisher.ConfigPublisherBase): """ + HEADERS = {'Content-type': 'application/json'} + def __init__(self, conf, parsed_url): super(HttpPublisher, self).__init__(conf, parsed_url) @@ -81,14 +83,12 @@ class HttpPublisher(publisher.ConfigPublisherBase): # is valid, if not, ValueError will be thrown. parsed_url.port - self.headers = {'Content-type': 'application/json'} - # Handling other configuration options in the query string params = urlparse.parse_qs(parsed_url.query) self.timeout = self._get_param(params, 'timeout', 5, int) self.max_retries = self._get_param(params, 'max_retries', 2, int) self.poster = ( - self._do_post if strutils.bool_from_string(self._get_param( + self._batch_post if strutils.bool_from_string(self._get_param( params, 'batch', True)) else self._individual_post) verify_ssl = self._get_param(params, 'verify_ssl', True) try: @@ -124,10 +124,20 @@ class HttpPublisher(publisher.ConfigPublisherBase): 'pool_maxsize': conf.max_parallel_requests} self.session = requests.Session() + if parsed_url.scheme in ["http", "https"]: + scheme = parsed_url.scheme + else: + ssl = self._get_param(params, 'ssl', False) + try: + ssl = strutils.bool_from_string(ssl, strict=True) + except ValueError: + ssl = (ssl or False) + scheme = "https" if ssl else "http" + # authentication & config params have been removed, so use URL with # updated query string self.target = urlparse.urlunsplit([ - parsed_url.scheme, + scheme, netloc, parsed_url.path, urlparse.urlencode(params), @@ -149,17 +159,19 @@ class HttpPublisher(publisher.ConfigPublisherBase): def _individual_post(self, data): for d in data: - self._do_post(d) + self._do_post(json.dumps(data)) - def _do_post(self, data): + def _batch_post(self, data): if not data: LOG.debug('Data set is empty!') return - data = json.dumps(data) + self._do_post(json.dumps(data)) + + def _do_post(self, data): LOG.trace('Message: %s', data) try: res = self.session.post(self.target, data=data, - headers=self.headers, timeout=self.timeout, + headers=self.HEADERS, timeout=self.timeout, auth=self.client_auth, cert=self.client_cert, verify=self.verify_ssl) diff --git a/ceilometer/publisher/prometheus.py b/ceilometer/publisher/prometheus.py new file mode 100644 index 0000000000..36f13a5edf --- /dev/null +++ b/ceilometer/publisher/prometheus.py @@ -0,0 +1,78 @@ +# +# Copyright 2016 IBM +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from ceilometer.publisher import http +from ceilometer import sample + + +class PrometheusPublisher(http.HttpPublisher): + """Publish metering data to Prometheus Pushgateway endpoint + + This dispatcher inherits from all options of the http dispatcher. + + To use this publisher for samples, add the following section to the + /etc/ceilometer/pipeline.yaml file or simply add it to an existing + pipeline:: + + - name: meter_file + meters: + - "*" + publishers: + - prometheus://mypushgateway/metrics/job/ceilometer + + """ + + HEADERS = {'Content-type': 'plain/text'} + + def publish_samples(self, samples): + """Send a metering message for publishing + + :param samples: Samples from pipeline after transformation + """ + if not samples: + return + + data = "" + doc_done = set() + for s in samples: + # NOTE(sileht): delta can't be converted into prometheus data + # format so don't set the metric type for it + metric_type = None + if s.type == sample.TYPE_CUMULATIVE: + metric_type = "counter" + elif s.type == sample.TYPE_GAUGE: + metric_type = "gauge" + + if metric_type and s.name not in doc_done: + data += "# TYPE %s %s\n" % (s.name, metric_type) + doc_done.add(s.name) + + # NOTE(sileht): prometheus pushgateway doesn't allow to push + # timestamp_ms + # + # timestamp_ms = ( + # s.get_iso_timestamp().replace(tzinfo=None) - + # datetime.utcfromtimestamp(0) + # ).total_seconds() * 1000 + # data += '%s{resource_id="%s"} %s %d\n' % ( + # s.name, s.resource_id, s.volume, timestamp_ms) + + data += '%s{resource_id="%s"} %s\n' % ( + s.name, s.resource_id, s.volume) + self._do_post(data) + + @staticmethod + def publish_events(events): + raise NotImplementedError diff --git a/ceilometer/tests/unit/publisher/test_prometheus.py b/ceilometer/tests/unit/publisher/test_prometheus.py new file mode 100644 index 0000000000..dc0f4ecf65 --- /dev/null +++ b/ceilometer/tests/unit/publisher/test_prometheus.py @@ -0,0 +1,132 @@ +# +# Copyright 2016 IBM +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for ceilometer/publisher/prometheus.py +""" + +import datetime +import mock +from oslotest import base +import requests +from six.moves.urllib import parse as urlparse +import uuid + +from ceilometer.publisher import prometheus +from ceilometer import sample +from ceilometer import service + + +class TestPrometheusPublisher(base.BaseTestCase): + + resource_id = str(uuid.uuid4()) + + sample_data = [ + sample.Sample( + name='alpha', + type=sample.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='beta', + type=sample.TYPE_DELTA, + unit='', + volume=3, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + sample.Sample( + name='gamma', + type=sample.TYPE_GAUGE, + unit='', + volume=5, + user_id='test', + project_id='test', + resource_id=resource_id, + timestamp=datetime.datetime.now().isoformat(), + resource_metadata={'name': 'TestPublish'}, + ), + ] + + def setUp(self): + super(TestPrometheusPublisher, self).setUp() + self.CONF = service.prepare_service([], []) + + def test_post_samples(self): + """Test publisher post.""" + parsed_url = urlparse.urlparse( + 'prometheus://localhost:90/metrics/job/os') + publisher = prometheus.PrometheusPublisher(self.CONF, parsed_url) + + res = requests.Response() + res.status_code = 200 + with mock.patch.object(requests.Session, 'post', + return_value=res) as m_req: + publisher.publish_samples(self.sample_data) + + data = """# TYPE alpha counter +alpha{resource_id="%s"} 1 +beta{resource_id="%s"} 3 +# TYPE gamma gauge +gamma{resource_id="%s"} 5 +""" % (self.resource_id, self.resource_id, self.resource_id) + + expected = [ + mock.call('http://localhost:90/metrics/job/os', + auth=None, + cert=None, + data=data, + headers={'Content-type': 'plain/text'}, + timeout=5, + verify=True) + ] + self.assertEqual(expected, m_req.mock_calls) + + def test_post_samples_ssl(self): + """Test publisher post.""" + parsed_url = urlparse.urlparse( + 'prometheus://localhost:90/metrics/job/os?ssl=1') + publisher = prometheus.PrometheusPublisher(self.CONF, parsed_url) + + res = requests.Response() + res.status_code = 200 + with mock.patch.object(requests.Session, 'post', + return_value=res) as m_req: + publisher.publish_samples(self.sample_data) + + data = """# TYPE alpha counter +alpha{resource_id="%s"} 1 +beta{resource_id="%s"} 3 +# TYPE gamma gauge +gamma{resource_id="%s"} 5 +""" % (self.resource_id, self.resource_id, self.resource_id) + + expected = [ + mock.call('https://localhost:90/metrics/job/os', + auth=None, + cert=None, + data=data, + headers={'Content-type': 'plain/text'}, + timeout=5, + verify=True) + ] + self.assertEqual(expected, m_req.mock_calls) diff --git a/doc/source/admin/telemetry-data-pipelines.rst b/doc/source/admin/telemetry-data-pipelines.rst index ebcac0c548..2f0a311a02 100644 --- a/doc/source/admin/telemetry-data-pipelines.rst +++ b/doc/source/admin/telemetry-data-pipelines.rst @@ -396,6 +396,29 @@ service. More details on how to enable and configure gnocchi can be found on its `official documentation page `__. +prometheus +`````````` + +Metering data can be send to the `pushgateway +`__ of Prometheus by using: + +``prometheus://pushgateway-host:9091/metrics/job/openstack-telemetry`` + +With this publisher, timestamp are not sent to Prometheus due to Prometheus +Pushgateway design. All timestamps are set at the time it scrapes the metrics +from the Pushgateway and not when the metric was polled on the OpenStack +services. + +In order to get timeseries in Prometheus that looks like the reality (but with +the lag added by the Prometheus scrapping mechanism). The `scrape_interval` for +the pushgateway must be lower and a multiple of the Ceilometer polling +interval. + +You can read more `here `__ + +Due to this, this is not recommended to use this publisher for billing purpose +as timestamps in Prometheus will not be exact. + panko ````` diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst index 249208675d..dfb367239a 100644 --- a/doc/source/contributor/architecture.rst +++ b/doc/source/contributor/architecture.rst @@ -185,7 +185,7 @@ Publishing the data This figure shows how a sample can be published to multiple destinations. -Currently, processed data can be published using 7 different transports: +Currently, processed data can be published using 8 different transports: 1. gnocchi, which publishes samples/events to Gnocchi API; 2. notifier, a notification based publisher which pushes samples to a message @@ -196,6 +196,7 @@ Currently, processed data can be published using 7 different transports: 6. zaqar, a multi-tenant cloud messaging and notification service for web and mobile developers; 7. https, which is http over SSL and targets a REST interface. +8. prometheus, which publishes samples to Prometheus Pushgateway Storing/Accessing the data diff --git a/releasenotes/notes/prometheus-bcb201cfe46d5778.yaml b/releasenotes/notes/prometheus-bcb201cfe46d5778.yaml new file mode 100644 index 0000000000..73a228b6c2 --- /dev/null +++ b/releasenotes/notes/prometheus-bcb201cfe46d5778.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + A new pulisher have been added to push data to Prometheus Pushgateway. diff --git a/setup.cfg b/setup.cfg index abef55e5ef..ea9cc16200 100644 --- a/setup.cfg +++ b/setup.cfg @@ -236,6 +236,7 @@ ceilometer.sample.publisher = udp = ceilometer.publisher.udp:UDPPublisher file = ceilometer.publisher.file:FilePublisher http = ceilometer.publisher.http:HttpPublisher + prometheus = ceilometer.publisher.prometheus:PrometheusPublisher https = ceilometer.publisher.http:HttpPublisher gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher zaqar = ceilometer.publisher.zaqar:ZaqarPublisher