Add Prometheus Collector
* Add Prometheus Collector that supports Prometheus HTTP instant queries. * Due to the design of Cloudkitty, only instant queries are supported as the time window used by Prometheus can exceed the one used by Cloudkitty, using range queries, especially if coupled with function that makes calculation back in time from a supplied timestamp (e.g. increase(), delta()). Change-Id: I4ba137b0b079f5ae6bfb645372778698eaa391fc
This commit is contained in:
parent
2d455849b1
commit
49d18e168a
|
@ -0,0 +1,178 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Martin CAMEY
|
||||
#
|
||||
from decimal import Decimal
|
||||
from decimal import localcontext
|
||||
from decimal import ROUND_HALF_UP
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import requests
|
||||
from voluptuous import All
|
||||
from voluptuous import Length
|
||||
from voluptuous import Required
|
||||
from voluptuous import Schema
|
||||
|
||||
from cloudkitty import collector
|
||||
from cloudkitty import utils as ck_utils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
PROMETHEUS_COLLECTOR_OPTS = 'prometheus_collector'
|
||||
pcollector_collector_opts = [
|
||||
cfg.StrOpt(
|
||||
'prometheus_url',
|
||||
default='',
|
||||
help='Prometheus service URL',
|
||||
),
|
||||
]
|
||||
cfg.CONF.register_opts(pcollector_collector_opts, PROMETHEUS_COLLECTOR_OPTS)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
PROMETHEUS_EXTRA_SCHEMA = {
|
||||
Required('extra_args'): {
|
||||
Required('query'): All(str, Length(min=1)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class PrometheusClient(object):
|
||||
@classmethod
|
||||
def build_query(cls, source, query, start, end, period, metric_name):
|
||||
"""Build PromQL instant queries."""
|
||||
start = ck_utils.iso8601_from_timestamp(start)
|
||||
end = ck_utils.iso8601_from_timestamp(end)
|
||||
|
||||
if '$period' in query:
|
||||
try:
|
||||
query = ck_utils.template_str_substitute(
|
||||
query, {'period': str(period) + 's'},
|
||||
)
|
||||
except (KeyError, ValueError):
|
||||
raise collector.NoDataCollected(
|
||||
collector.collector_name,
|
||||
metric_name
|
||||
)
|
||||
|
||||
# Due to the design of Cloudkitty, only instant queries are supported.
|
||||
# In that case 'time' equals 'end' and
|
||||
# the window time is reprezented by the period.
|
||||
return source + '/query?query=' + query + '&time=' + end
|
||||
|
||||
@classmethod
|
||||
def get_data(cls, source, query, start, end, period, metric_name):
|
||||
url = cls.build_query(
|
||||
source,
|
||||
query,
|
||||
start,
|
||||
end,
|
||||
period,
|
||||
metric_name,
|
||||
)
|
||||
|
||||
return requests.get(url).json()
|
||||
|
||||
|
||||
class PrometheusCollector(collector.BaseCollector):
|
||||
collector_name = 'prometheus'
|
||||
|
||||
def __init__(self, transformers, **kwargs):
|
||||
super(PrometheusCollector, self).__init__(transformers, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def check_configuration(conf):
|
||||
"""Check metrics configuration."""
|
||||
conf = Schema(collector.CONF_BASE_SCHEMA)(conf)
|
||||
metric_schema = Schema(collector.METRIC_BASE_SCHEMA).extend(
|
||||
PROMETHEUS_EXTRA_SCHEMA,
|
||||
)
|
||||
|
||||
output = {}
|
||||
for metric_name, metric in conf['metrics'].items():
|
||||
output[metric_name] = metric_schema(metric)
|
||||
return output
|
||||
|
||||
def _format_data(self, metric_name, project_id, start, end, data):
|
||||
"""Formats Prometheus data format to Cloudkitty data format.
|
||||
|
||||
Returns metadata, groupby, qty
|
||||
"""
|
||||
metadata = {}
|
||||
for meta in self.conf[metric_name]['metadata']:
|
||||
metadata[meta] = data['metric'][meta]
|
||||
|
||||
groupby = {}
|
||||
for meta in self.conf[metric_name]['groupby']:
|
||||
groupby[meta] = data['metric'].get(meta, '')
|
||||
|
||||
with localcontext() as ctx:
|
||||
ctx.prec = 9
|
||||
ctx.rounding = ROUND_HALF_UP
|
||||
|
||||
qty = ck_utils.convert_unit(
|
||||
+Decimal(data['value'][1]),
|
||||
self.conf[metric_name]['factor'],
|
||||
self.conf[metric_name]['offset'],
|
||||
)
|
||||
|
||||
return metadata, groupby, qty
|
||||
|
||||
def fetch_all(self, metric_name, start, end, project_id, q_filter=None):
|
||||
"""Returns metrics to be valorized."""
|
||||
# NOTE(mc): Remove potential trailing '/' to avoid
|
||||
# url building problems
|
||||
url = CONF.prometheus_collector.prometheus_url
|
||||
if url.endswith('/'):
|
||||
url = url[:-1]
|
||||
|
||||
res = PrometheusClient.get_data(
|
||||
url,
|
||||
self.conf[metric_name]['extra_args']['query'],
|
||||
start,
|
||||
end,
|
||||
self.period,
|
||||
metric_name,
|
||||
)
|
||||
|
||||
# If the query returns an empty dataset,
|
||||
# raise a NoDataCollected exception.
|
||||
if not res['data']['result']:
|
||||
raise collector.NoDataCollected(self.collector_name, metric_name)
|
||||
|
||||
formatted_resources = []
|
||||
|
||||
for item in res['data']['result']:
|
||||
metadata, groupby, qty = self._format_data(
|
||||
metric_name,
|
||||
project_id,
|
||||
start,
|
||||
end,
|
||||
item,
|
||||
)
|
||||
|
||||
item = self.t_cloudkitty.format_item(
|
||||
groupby,
|
||||
metadata,
|
||||
self.conf[metric_name]['unit'],
|
||||
qty=qty,
|
||||
)
|
||||
|
||||
formatted_resources.append(item)
|
||||
|
||||
return formatted_resources
|
|
@ -0,0 +1,166 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 Objectif Libre
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# @author: Martin CAMEY
|
||||
#
|
||||
from decimal import Decimal
|
||||
import mock
|
||||
|
||||
from cloudkitty import collector
|
||||
from cloudkitty.collector import prometheus
|
||||
from cloudkitty import tests
|
||||
from cloudkitty.tests import samples
|
||||
from cloudkitty import transformer
|
||||
|
||||
|
||||
class PrometheusCollectorTest(tests.TestCase):
|
||||
def setUp(self):
|
||||
super(PrometheusCollectorTest, self).setUp()
|
||||
self._tenant_id = samples.TENANT
|
||||
args = {
|
||||
'period': 3600,
|
||||
'conf': {
|
||||
'metrics': {
|
||||
'http_requests_total': {
|
||||
'unit': 'instance',
|
||||
'extra_args': {
|
||||
'query': 'http_request_total[$period]',
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
transformers = transformer.get_transformers()
|
||||
self.collector = prometheus.PrometheusCollector(transformers, **args)
|
||||
|
||||
def test_format_data_instant_query(self):
|
||||
expected = ({}, {}, Decimal('7'))
|
||||
|
||||
params = {
|
||||
'metric_name': 'http_requests_total',
|
||||
'project_id': self._tenant_id,
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'data': samples.PROMETHEUS_RESP_INSTANT_QUERY['data']['result'][0],
|
||||
}
|
||||
actual = self.collector._format_data(**params)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_format_data_instant_query_2(self):
|
||||
expected = ({}, {}, Decimal('42'))
|
||||
|
||||
params = {
|
||||
'metric_name': 'http_requests_total',
|
||||
'project_id': self._tenant_id,
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'data': samples.PROMETHEUS_RESP_INSTANT_QUERY['data']['result'][1],
|
||||
}
|
||||
actual = self.collector._format_data(**params)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_format_retrieve(self):
|
||||
expected = {
|
||||
'http_requests_total': [
|
||||
{
|
||||
'desc': {},
|
||||
'groupby': {},
|
||||
'metadata': {},
|
||||
'vol': {
|
||||
'qty': Decimal('7'),
|
||||
'unit': 'instance'
|
||||
}
|
||||
},
|
||||
{
|
||||
'desc': {},
|
||||
'groupby': {},
|
||||
'metadata': {},
|
||||
'vol': {
|
||||
'qty': Decimal('42'),
|
||||
'unit': 'instance'
|
||||
}
|
||||
}
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
no_response = mock.patch(
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
|
||||
return_value=samples.PROMETHEUS_RESP_INSTANT_QUERY,
|
||||
)
|
||||
|
||||
with no_response:
|
||||
actual = self.collector.retrieve(
|
||||
metric_name='http_requests_total',
|
||||
start=samples.FIRST_PERIOD_BEGIN,
|
||||
end=samples.FIRST_PERIOD_END,
|
||||
project_id=samples.TENANT,
|
||||
q_filter=None,
|
||||
)
|
||||
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_format_retrieve_raise_NoDataCollected(self):
|
||||
no_response = mock.patch(
|
||||
'cloudkitty.collector.prometheus.PrometheusClient.get_data',
|
||||
return_value=samples.PROMETHEUS_EMPTY_RESP_INSTANT_QUERY,
|
||||
)
|
||||
|
||||
with no_response:
|
||||
self.assertRaises(
|
||||
collector.NoDataCollected,
|
||||
self.collector.retrieve,
|
||||
metric_name='http_requests_total',
|
||||
start=samples.FIRST_PERIOD_BEGIN,
|
||||
end=samples.FIRST_PERIOD_END,
|
||||
project_id=samples.TENANT,
|
||||
q_filter=None,
|
||||
)
|
||||
|
||||
|
||||
class PrometheusClientTest(tests.TestCase):
|
||||
def setUp(self):
|
||||
super(PrometheusClientTest, self).setUp()
|
||||
self.client = prometheus.PrometheusClient
|
||||
|
||||
def test_build_instant_query_first_period(self):
|
||||
expected = 'http://localhost:9090/api/v1/query?' \
|
||||
'query=increase(http_requests_total[3600s])' \
|
||||
'&time=2015-01-01T01:00:00Z'
|
||||
params = {
|
||||
'source': 'http://localhost:9090/api/v1',
|
||||
'query': 'increase(http_requests_total[$period])',
|
||||
'start': samples.FIRST_PERIOD_BEGIN,
|
||||
'end': samples.FIRST_PERIOD_END,
|
||||
'period': '3600',
|
||||
'metric_name': 'http_requests_total',
|
||||
}
|
||||
actual = self.client.build_query(**params)
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_build_instant_query_second_period(self):
|
||||
expected = 'http://localhost:9090/api/v1/query?' \
|
||||
'query=increase(http_requests_total[3600s])' \
|
||||
'&time=2015-01-01T02:00:00Z'
|
||||
params = {
|
||||
'source': 'http://localhost:9090/api/v1',
|
||||
'query': 'increase(http_requests_total[$period])',
|
||||
'start': samples.SECOND_PERIOD_BEGIN,
|
||||
'end': samples.SECOND_PERIOD_END,
|
||||
'period': '3600',
|
||||
'metric_name': 'http_requests_total',
|
||||
}
|
||||
actual = self.client.build_query(**params)
|
||||
self.assertEqual(expected, actual)
|
|
@ -245,3 +245,48 @@ STORED_DATA[1]['usage']['instance'][0]['rating'] = {
|
|||
STORED_DATA = split_storage_data(STORED_DATA)
|
||||
|
||||
METRICS_CONF = DEFAULT_METRICS_CONF
|
||||
|
||||
|
||||
PROMETHEUS_RESP_INSTANT_QUERY = {
|
||||
"status": "success",
|
||||
"data": {
|
||||
"resultType": "vector",
|
||||
"result": [
|
||||
{
|
||||
"metric": {
|
||||
"code": "200",
|
||||
"method": "get",
|
||||
"group": "prometheus_group",
|
||||
"instance": "localhost:9090",
|
||||
"job": "prometheus",
|
||||
},
|
||||
"value": [
|
||||
FIRST_PERIOD_END,
|
||||
"7",
|
||||
]
|
||||
},
|
||||
{
|
||||
"metric": {
|
||||
"code": "200",
|
||||
"method": "post",
|
||||
"group": "prometheus_group",
|
||||
"instance": "localhost:9090",
|
||||
"job": "prometheus",
|
||||
},
|
||||
"value": [
|
||||
FIRST_PERIOD_END,
|
||||
"42",
|
||||
]
|
||||
},
|
||||
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
PROMETHEUS_EMPTY_RESP_INSTANT_QUERY = {
|
||||
"status": "success",
|
||||
"data": {
|
||||
"resultType": "vector",
|
||||
"result": [],
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import fractions
|
|||
import math
|
||||
import shutil
|
||||
import six
|
||||
from string import Template
|
||||
import sys
|
||||
import tempfile
|
||||
import yaml
|
||||
|
@ -302,3 +303,14 @@ def flat_dict(item, parent=None):
|
|||
else:
|
||||
parent[k] = val
|
||||
return parent
|
||||
|
||||
|
||||
def template_str_substitute(string, replace_map):
|
||||
"""Returns a string with subtituted patterns."""
|
||||
try:
|
||||
tmp = Template(string)
|
||||
return tmp.substitute(replace_map)
|
||||
except (KeyError, ValueError) as e:
|
||||
LOG.error("Error when trying to substitute the string placeholders. \
|
||||
Please, check your metrics configuration.", e)
|
||||
raise
|
||||
|
|
|
@ -49,6 +49,7 @@ cloudkitty.collector.backends =
|
|||
gnocchi = cloudkitty.collector.gnocchi:GnocchiCollector
|
||||
monasca = cloudkitty.collector.monasca:MonascaCollector
|
||||
meta = cloudkitty.collector.meta:MetaCollector
|
||||
prometheus = cloudkitty.collector.prometheus:PrometheusCollector
|
||||
|
||||
cloudkitty.fetchers =
|
||||
fake = cloudkitty.fetcher.fake:FakeFetcher
|
||||
|
|
Loading…
Reference in New Issue