Add Prometheus Collector

* Add Prometheus Collector that supports Prometheus HTTP instant queries.

* Due to the design of Cloudkitty, only instant queries are supported as
  the time window used by Prometheus can exceed the one used by Cloudkitty,
  using range queries, especially if coupled with function that
  makes calculation back in time from a supplied timestamp
  (e.g. increase(), delta()).

Change-Id: I4ba137b0b079f5ae6bfb645372778698eaa391fc
This commit is contained in:
Martin CAMEY 2018-08-16 17:04:01 +02:00
parent 2d455849b1
commit 49d18e168a
5 changed files with 402 additions and 0 deletions

View File

@ -0,0 +1,178 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Martin CAMEY
#
from decimal import Decimal
from decimal import localcontext
from decimal import ROUND_HALF_UP
from oslo_config import cfg
from oslo_log import log
import requests
from voluptuous import All
from voluptuous import Length
from voluptuous import Required
from voluptuous import Schema
from cloudkitty import collector
from cloudkitty import utils as ck_utils
LOG = log.getLogger(__name__)
PROMETHEUS_COLLECTOR_OPTS = 'prometheus_collector'
pcollector_collector_opts = [
cfg.StrOpt(
'prometheus_url',
default='',
help='Prometheus service URL',
),
]
cfg.CONF.register_opts(pcollector_collector_opts, PROMETHEUS_COLLECTOR_OPTS)
CONF = cfg.CONF
PROMETHEUS_EXTRA_SCHEMA = {
Required('extra_args'): {
Required('query'): All(str, Length(min=1)),
}
}
class PrometheusClient(object):
@classmethod
def build_query(cls, source, query, start, end, period, metric_name):
"""Build PromQL instant queries."""
start = ck_utils.iso8601_from_timestamp(start)
end = ck_utils.iso8601_from_timestamp(end)
if '$period' in query:
try:
query = ck_utils.template_str_substitute(
query, {'period': str(period) + 's'},
)
except (KeyError, ValueError):
raise collector.NoDataCollected(
collector.collector_name,
metric_name
)
# Due to the design of Cloudkitty, only instant queries are supported.
# In that case 'time' equals 'end' and
# the window time is reprezented by the period.
return source + '/query?query=' + query + '&time=' + end
@classmethod
def get_data(cls, source, query, start, end, period, metric_name):
url = cls.build_query(
source,
query,
start,
end,
period,
metric_name,
)
return requests.get(url).json()
class PrometheusCollector(collector.BaseCollector):
collector_name = 'prometheus'
def __init__(self, transformers, **kwargs):
super(PrometheusCollector, self).__init__(transformers, **kwargs)
@staticmethod
def check_configuration(conf):
"""Check metrics configuration."""
conf = Schema(collector.CONF_BASE_SCHEMA)(conf)
metric_schema = Schema(collector.METRIC_BASE_SCHEMA).extend(
PROMETHEUS_EXTRA_SCHEMA,
)
output = {}
for metric_name, metric in conf['metrics'].items():
output[metric_name] = metric_schema(metric)
return output
def _format_data(self, metric_name, project_id, start, end, data):
"""Formats Prometheus data format to Cloudkitty data format.
Returns metadata, groupby, qty
"""
metadata = {}
for meta in self.conf[metric_name]['metadata']:
metadata[meta] = data['metric'][meta]
groupby = {}
for meta in self.conf[metric_name]['groupby']:
groupby[meta] = data['metric'].get(meta, '')
with localcontext() as ctx:
ctx.prec = 9
ctx.rounding = ROUND_HALF_UP
qty = ck_utils.convert_unit(
+Decimal(data['value'][1]),
self.conf[metric_name]['factor'],
self.conf[metric_name]['offset'],
)
return metadata, groupby, qty
def fetch_all(self, metric_name, start, end, project_id, q_filter=None):
"""Returns metrics to be valorized."""
# NOTE(mc): Remove potential trailing '/' to avoid
# url building problems
url = CONF.prometheus_collector.prometheus_url
if url.endswith('/'):
url = url[:-1]
res = PrometheusClient.get_data(
url,
self.conf[metric_name]['extra_args']['query'],
start,
end,
self.period,
metric_name,
)
# If the query returns an empty dataset,
# raise a NoDataCollected exception.
if not res['data']['result']:
raise collector.NoDataCollected(self.collector_name, metric_name)
formatted_resources = []
for item in res['data']['result']:
metadata, groupby, qty = self._format_data(
metric_name,
project_id,
start,
end,
item,
)
item = self.t_cloudkitty.format_item(
groupby,
metadata,
self.conf[metric_name]['unit'],
qty=qty,
)
formatted_resources.append(item)
return formatted_resources

View File

@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Martin CAMEY
#
from decimal import Decimal
import mock
from cloudkitty import collector
from cloudkitty.collector import prometheus
from cloudkitty import tests
from cloudkitty.tests import samples
from cloudkitty import transformer
class PrometheusCollectorTest(tests.TestCase):
def setUp(self):
super(PrometheusCollectorTest, self).setUp()
self._tenant_id = samples.TENANT
args = {
'period': 3600,
'conf': {
'metrics': {
'http_requests_total': {
'unit': 'instance',
'extra_args': {
'query': 'http_request_total[$period]',
},
},
}
}
}
transformers = transformer.get_transformers()
self.collector = prometheus.PrometheusCollector(transformers, **args)
def test_format_data_instant_query(self):
expected = ({}, {}, Decimal('7'))
params = {
'metric_name': 'http_requests_total',
'project_id': self._tenant_id,
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'data': samples.PROMETHEUS_RESP_INSTANT_QUERY['data']['result'][0],
}
actual = self.collector._format_data(**params)
self.assertEqual(expected, actual)
def test_format_data_instant_query_2(self):
expected = ({}, {}, Decimal('42'))
params = {
'metric_name': 'http_requests_total',
'project_id': self._tenant_id,
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'data': samples.PROMETHEUS_RESP_INSTANT_QUERY['data']['result'][1],
}
actual = self.collector._format_data(**params)
self.assertEqual(expected, actual)
def test_format_retrieve(self):
expected = {
'http_requests_total': [
{
'desc': {},
'groupby': {},
'metadata': {},
'vol': {
'qty': Decimal('7'),
'unit': 'instance'
}
},
{
'desc': {},
'groupby': {},
'metadata': {},
'vol': {
'qty': Decimal('42'),
'unit': 'instance'
}
}
]
}
no_response = mock.patch(
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
return_value=samples.PROMETHEUS_RESP_INSTANT_QUERY,
)
with no_response:
actual = self.collector.retrieve(
metric_name='http_requests_total',
start=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
project_id=samples.TENANT,
q_filter=None,
)
self.assertEqual(expected, actual)
def test_format_retrieve_raise_NoDataCollected(self):
no_response = mock.patch(
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
return_value=samples.PROMETHEUS_EMPTY_RESP_INSTANT_QUERY,
)
with no_response:
self.assertRaises(
collector.NoDataCollected,
self.collector.retrieve,
metric_name='http_requests_total',
start=samples.FIRST_PERIOD_BEGIN,
end=samples.FIRST_PERIOD_END,
project_id=samples.TENANT,
q_filter=None,
)
class PrometheusClientTest(tests.TestCase):
def setUp(self):
super(PrometheusClientTest, self).setUp()
self.client = prometheus.PrometheusClient
def test_build_instant_query_first_period(self):
expected = 'http://localhost:9090/api/v1/query?' \
'query=increase(http_requests_total[3600s])' \
'&time=2015-01-01T01:00:00Z'
params = {
'source': 'http://localhost:9090/api/v1',
'query': 'increase(http_requests_total[$period])',
'start': samples.FIRST_PERIOD_BEGIN,
'end': samples.FIRST_PERIOD_END,
'period': '3600',
'metric_name': 'http_requests_total',
}
actual = self.client.build_query(**params)
self.assertEqual(expected, actual)
def test_build_instant_query_second_period(self):
expected = 'http://localhost:9090/api/v1/query?' \
'query=increase(http_requests_total[3600s])' \
'&time=2015-01-01T02:00:00Z'
params = {
'source': 'http://localhost:9090/api/v1',
'query': 'increase(http_requests_total[$period])',
'start': samples.SECOND_PERIOD_BEGIN,
'end': samples.SECOND_PERIOD_END,
'period': '3600',
'metric_name': 'http_requests_total',
}
actual = self.client.build_query(**params)
self.assertEqual(expected, actual)

View File

@ -245,3 +245,48 @@ STORED_DATA[1]['usage']['instance'][0]['rating'] = {
STORED_DATA = split_storage_data(STORED_DATA)
METRICS_CONF = DEFAULT_METRICS_CONF
PROMETHEUS_RESP_INSTANT_QUERY = {
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"code": "200",
"method": "get",
"group": "prometheus_group",
"instance": "localhost:9090",
"job": "prometheus",
},
"value": [
FIRST_PERIOD_END,
"7",
]
},
{
"metric": {
"code": "200",
"method": "post",
"group": "prometheus_group",
"instance": "localhost:9090",
"job": "prometheus",
},
"value": [
FIRST_PERIOD_END,
"42",
]
},
]
}
}
PROMETHEUS_EMPTY_RESP_INSTANT_QUERY = {
"status": "success",
"data": {
"resultType": "vector",
"result": [],
}
}

View File

@ -29,6 +29,7 @@ import fractions
import math
import shutil
import six
from string import Template
import sys
import tempfile
import yaml
@ -302,3 +303,14 @@ def flat_dict(item, parent=None):
else:
parent[k] = val
return parent
def template_str_substitute(string, replace_map):
"""Returns a string with subtituted patterns."""
try:
tmp = Template(string)
return tmp.substitute(replace_map)
except (KeyError, ValueError) as e:
LOG.error("Error when trying to substitute the string placeholders. \
Please, check your metrics configuration.", e)
raise

View File

@ -49,6 +49,7 @@ cloudkitty.collector.backends =
gnocchi = cloudkitty.collector.gnocchi:GnocchiCollector
monasca = cloudkitty.collector.monasca:MonascaCollector
meta = cloudkitty.collector.meta:MetaCollector
prometheus = cloudkitty.collector.prometheus:PrometheusCollector
cloudkitty.fetchers =
fake = cloudkitty.fetcher.fake:FakeFetcher