publisher: add a Prometheus Pushgateway publisher

This change adds a publisher to push metrics to Prometheus Pushgateway.

Change-Id: I7d5f39f036714ac1e36d3297ad58dd8498908a53
This commit is contained in:
Mehdi Abaakouk 2018-04-05 15:46:58 +02:00
parent e9b7abc871
commit 2b8052052d
7 changed files with 260 additions and 9 deletions

View File

@ -69,6 +69,8 @@ class HttpPublisher(publisher.ConfigPublisherBase):
"""
HEADERS = {'Content-type': 'application/json'}
def __init__(self, conf, parsed_url):
super(HttpPublisher, self).__init__(conf, parsed_url)
@ -81,14 +83,12 @@ class HttpPublisher(publisher.ConfigPublisherBase):
# is valid, if not, ValueError will be thrown.
parsed_url.port
self.headers = {'Content-type': 'application/json'}
# Handling other configuration options in the query string
params = urlparse.parse_qs(parsed_url.query)
self.timeout = self._get_param(params, 'timeout', 5, int)
self.max_retries = self._get_param(params, 'max_retries', 2, int)
self.poster = (
self._do_post if strutils.bool_from_string(self._get_param(
self._batch_post if strutils.bool_from_string(self._get_param(
params, 'batch', True)) else self._individual_post)
verify_ssl = self._get_param(params, 'verify_ssl', True)
try:
@ -124,10 +124,20 @@ class HttpPublisher(publisher.ConfigPublisherBase):
'pool_maxsize': conf.max_parallel_requests}
self.session = requests.Session()
if parsed_url.scheme in ["http", "https"]:
scheme = parsed_url.scheme
else:
ssl = self._get_param(params, 'ssl', False)
try:
ssl = strutils.bool_from_string(ssl, strict=True)
except ValueError:
ssl = (ssl or False)
scheme = "https" if ssl else "http"
# authentication & config params have been removed, so use URL with
# updated query string
self.target = urlparse.urlunsplit([
parsed_url.scheme,
scheme,
netloc,
parsed_url.path,
urlparse.urlencode(params),
@ -149,17 +159,19 @@ class HttpPublisher(publisher.ConfigPublisherBase):
def _individual_post(self, data):
for d in data:
self._do_post(d)
self._do_post(json.dumps(data))
def _do_post(self, data):
def _batch_post(self, data):
if not data:
LOG.debug('Data set is empty!')
return
data = json.dumps(data)
self._do_post(json.dumps(data))
def _do_post(self, data):
LOG.trace('Message: %s', data)
try:
res = self.session.post(self.target, data=data,
headers=self.headers, timeout=self.timeout,
headers=self.HEADERS, timeout=self.timeout,
auth=self.client_auth,
cert=self.client_cert,
verify=self.verify_ssl)

View File

@ -0,0 +1,78 @@
#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer.publisher import http
from ceilometer import sample
class PrometheusPublisher(http.HttpPublisher):
"""Publish metering data to Prometheus Pushgateway endpoint
This dispatcher inherits from all options of the http dispatcher.
To use this publisher for samples, add the following section to the
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
pipeline::
- name: meter_file
meters:
- "*"
publishers:
- prometheus://mypushgateway/metrics/job/ceilometer
"""
HEADERS = {'Content-type': 'plain/text'}
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
"""
if not samples:
return
data = ""
doc_done = set()
for s in samples:
# NOTE(sileht): delta can't be converted into prometheus data
# format so don't set the metric type for it
metric_type = None
if s.type == sample.TYPE_CUMULATIVE:
metric_type = "counter"
elif s.type == sample.TYPE_GAUGE:
metric_type = "gauge"
if metric_type and s.name not in doc_done:
data += "# TYPE %s %s\n" % (s.name, metric_type)
doc_done.add(s.name)
# NOTE(sileht): prometheus pushgateway doesn't allow to push
# timestamp_ms
#
# timestamp_ms = (
# s.get_iso_timestamp().replace(tzinfo=None) -
# datetime.utcfromtimestamp(0)
# ).total_seconds() * 1000
# data += '%s{resource_id="%s"} %s %d\n' % (
# s.name, s.resource_id, s.volume, timestamp_ms)
data += '%s{resource_id="%s"} %s\n' % (
s.name, s.resource_id, s.volume)
self._do_post(data)
@staticmethod
def publish_events(events):
raise NotImplementedError

View File

@ -0,0 +1,132 @@
#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/publisher/prometheus.py
"""
import datetime
import mock
from oslotest import base
import requests
from six.moves.urllib import parse as urlparse
import uuid
from ceilometer.publisher import prometheus
from ceilometer import sample
from ceilometer import service
class TestPrometheusPublisher(base.BaseTestCase):
resource_id = str(uuid.uuid4())
sample_data = [
sample.Sample(
name='alpha',
type=sample.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='beta',
type=sample.TYPE_DELTA,
unit='',
volume=3,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
sample.Sample(
name='gamma',
type=sample.TYPE_GAUGE,
unit='',
volume=5,
user_id='test',
project_id='test',
resource_id=resource_id,
timestamp=datetime.datetime.now().isoformat(),
resource_metadata={'name': 'TestPublish'},
),
]
def setUp(self):
super(TestPrometheusPublisher, self).setUp()
self.CONF = service.prepare_service([], [])
def test_post_samples(self):
"""Test publisher post."""
parsed_url = urlparse.urlparse(
'prometheus://localhost:90/metrics/job/os')
publisher = prometheus.PrometheusPublisher(self.CONF, parsed_url)
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
data = """# TYPE alpha counter
alpha{resource_id="%s"} 1
beta{resource_id="%s"} 3
# TYPE gamma gauge
gamma{resource_id="%s"} 5
""" % (self.resource_id, self.resource_id, self.resource_id)
expected = [
mock.call('http://localhost:90/metrics/job/os',
auth=None,
cert=None,
data=data,
headers={'Content-type': 'plain/text'},
timeout=5,
verify=True)
]
self.assertEqual(expected, m_req.mock_calls)
def test_post_samples_ssl(self):
"""Test publisher post."""
parsed_url = urlparse.urlparse(
'prometheus://localhost:90/metrics/job/os?ssl=1')
publisher = prometheus.PrometheusPublisher(self.CONF, parsed_url)
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
data = """# TYPE alpha counter
alpha{resource_id="%s"} 1
beta{resource_id="%s"} 3
# TYPE gamma gauge
gamma{resource_id="%s"} 5
""" % (self.resource_id, self.resource_id, self.resource_id)
expected = [
mock.call('https://localhost:90/metrics/job/os',
auth=None,
cert=None,
data=data,
headers={'Content-type': 'plain/text'},
timeout=5,
verify=True)
]
self.assertEqual(expected, m_req.mock_calls)

View File

@ -396,6 +396,29 @@ service.
More details on how to enable and configure gnocchi can be found on its
`official documentation page <http://gnocchi.xyz>`__.
prometheus
``````````
Metering data can be send to the `pushgateway
<https://github.com/prometheus/pushgateway>`__ of Prometheus by using:
``prometheus://pushgateway-host:9091/metrics/job/openstack-telemetry``
With this publisher, timestamp are not sent to Prometheus due to Prometheus
Pushgateway design. All timestamps are set at the time it scrapes the metrics
from the Pushgateway and not when the metric was polled on the OpenStack
services.
In order to get timeseries in Prometheus that looks like the reality (but with
the lag added by the Prometheus scrapping mechanism). The `scrape_interval` for
the pushgateway must be lower and a multiple of the Ceilometer polling
interval.
You can read more `here <https://github.com/prometheus/pushgateway#about-timestamps>`__
Due to this, this is not recommended to use this publisher for billing purpose
as timestamps in Prometheus will not be exact.
panko
`````

View File

@ -185,7 +185,7 @@ Publishing the data
This figure shows how a sample can be published to multiple destinations.
Currently, processed data can be published using 7 different transports:
Currently, processed data can be published using 8 different transports:
1. gnocchi, which publishes samples/events to Gnocchi API;
2. notifier, a notification based publisher which pushes samples to a message
@ -196,6 +196,7 @@ Currently, processed data can be published using 7 different transports:
6. zaqar, a multi-tenant cloud messaging and notification service for web and
mobile developers;
7. https, which is http over SSL and targets a REST interface.
8. prometheus, which publishes samples to Prometheus Pushgateway
Storing/Accessing the data

View File

@ -0,0 +1,4 @@
---
features:
- |
A new pulisher have been added to push data to Prometheus Pushgateway.

View File

@ -236,6 +236,7 @@ ceilometer.sample.publisher =
udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher
http = ceilometer.publisher.http:HttpPublisher
prometheus = ceilometer.publisher.prometheus:PrometheusPublisher
https = ceilometer.publisher.http:HttpPublisher
gnocchi = ceilometer.publisher.gnocchi:GnocchiPublisher
zaqar = ceilometer.publisher.zaqar:ZaqarPublisher