Add support for InfluxDB v2 as a storage backend

This patch allows CloudKitty to use InfluxDB v2 with Flux queries. This type of query needs less CPU and RAM in the InfluxDB backend than the equivalent InfluxQL processing.

Change-Id: I8ee3c92776aa69afbede353981a5fcd65dd7d099
Depends-On: https://review.opendev.org/c/openstack/requirements/+/895629
Story: 2010863
Task: 48539
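For context, here is a minimal sketch of the two query styles involved; the queries are illustrative and are not taken verbatim from the patch:

# Illustrative only: both aggregate rated data from the "dataframes"
# measurement that CloudKitty writes.

# InfluxDB v1 (InfluxQL), the style used by the existing InfluxClient:
influxql = ('SELECT SUM("price") FROM "dataframes" '
            'WHERE time >= \'2023-01-01T00:00:00Z\' GROUP BY "type"')

# InfluxDB v2 (Flux), the style generated by the new InfluxClientV2:
flux = '''
from(bucket: "cloudkitty")
  |> range(start: 2023-01-01T00:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "dataframes")
  |> filter(fn: (r) => r["_field"] == "price")
  |> group(columns: ["type"])
  |> sum()
'''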
parent 6414edca51
commit 101a410739
.zuul.yaml (19 changed lines)
@@ -76,12 +76,25 @@
     name: cloudkitty-tempest-full-v2-storage-influxdb
     parent: base-cloudkitty-v2-api-tempest-job
     description: |
-      Job testing cloudkitty installation on devstack with python 3 and the
-      InfluxDB v2 storage driver and running tempest tests
+      Job testing cloudkitty installation on devstack with python 3, InfluxDB
+      v1 and the InfluxDB v2 storage driver and running tempest tests
     vars:
       devstack_localrc:
         CLOUDKITTY_STORAGE_BACKEND: influxdb
         CLOUDKITTY_STORAGE_VERSION: 2
+        CLOUDKITTY_INFLUX_VERSION: 1
+
+- job:
+    name: cloudkitty-tempest-full-v2-storage-influxdb-v2
+    parent: base-cloudkitty-v2-api-tempest-job
+    description: |
+      Job testing cloudkitty installation on devstack with python 3, InfluxDB
+      v2 and the InfluxDB v2 storage driver and running tempest tests
+    vars:
+      devstack_localrc:
+        CLOUDKITTY_STORAGE_BACKEND: influxdb
+        CLOUDKITTY_STORAGE_VERSION: 2
+        CLOUDKITTY_INFLUX_VERSION: 2
 
 - job:
     name: cloudkitty-tempest-full-v2-storage-elasticsearch
@@ -139,6 +152,7 @@
     check:
       jobs:
         - cloudkitty-tempest-full-v2-storage-influxdb
+        - cloudkitty-tempest-full-v2-storage-influxdb-v2
         - cloudkitty-tempest-full-v2-storage-elasticsearch:
            voting: false
         - cloudkitty-tempest-full-v2-storage-opensearch:
@@ -150,5 +164,6 @@
     gate:
       jobs:
         - cloudkitty-tempest-full-v2-storage-influxdb
+        - cloudkitty-tempest-full-v2-storage-influxdb-v2
        - cloudkitty-tempest-full-v1-storage-sqlalchemy
        - cloudkitty-tempest-full-ipv6-only
cloudkitty/storage/v2/influx.py
@@ -12,11 +12,18 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 #
+import csv
 import datetime
 import influxdb
+import io
+import json
+import re
 
+from influxdb_client.client.write_api import SYNCHRONOUS
+from influxdb_client import InfluxDBClient
 from oslo_config import cfg
 from oslo_log import log
+import requests
 
 from cloudkitty import dataframe
 from cloudkitty.storage import v2 as v2_storage
@@ -56,6 +63,29 @@ influx_storage_opts = [
         help='Path of the CA certificate to trust for HTTPS connections',
         default=None
     ),
+    cfg.IntOpt('version', help='InfluxDB version', default=1),
+    cfg.IntOpt('query_timeout', help='Flux query timeout in milliseconds',
+               default=3600000),
+    cfg.StrOpt(
+        'token',
+        help='InfluxDB API token for version 2 authentication',
+        default=None
+    ),
+    cfg.StrOpt(
+        'org',
+        help='InfluxDB 2 org',
+        default="openstack"
+    ),
+    cfg.StrOpt(
+        'bucket',
+        help='InfluxDB 2 bucket',
+        default="cloudkitty"
+    ),
+    cfg.StrOpt(
+        'url',
+        help='InfluxDB 2 URL',
+        default=None
+    )
 ]
 
 CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP)
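Given these defaults, a v2 deployment would end up with a cloudkitty.conf section along the lines of the sketch below, on top of the existing v1 options. The values are illustrative; the token, org and bucket must match what was created in InfluxDB:

[storage_influxdb]
version = 2
url = http://localhost:8086
token = <influxdb-api-token>
org = openstack
bucket = cloudkitty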
@@ -192,7 +222,7 @@ class InfluxClient(object):
         return " AND " + InfluxClient._get_filter("type", types)
 
     def get_total(self, types, begin, end, custom_fields,
-                  groupby=None, filters=None):
+                  groupby=None, filters=None, limit=None):
 
         self.validate_custom_fields(custom_fields)
@@ -232,11 +262,8 @@ class InfluxClient(object):
                       " clauses are not allowed [%s].",
                       field, forbidden_clauses)
 
-    def retrieve(self,
-                 types,
-                 filters,
-                 begin, end,
-                 offset=0, limit=1000, paginate=True):
+    def retrieve(self, types, filters, begin, end, offset=0, limit=1000,
+                 paginate=True):
         query = 'SELECT * FROM "dataframes"'
         query += self._get_time_query(begin, end)
         query += self._get_filter_query(filters)
@@ -282,15 +309,448 @@ class InfluxClient(object):
 
         self._conn.query(query)
 
+    def _get_total_elem(self, begin, end, groupby, series_groupby, point):
+        if groupby and 'time' in groupby:
+            begin = tzutils.dt_from_iso(point['time'])
+            period = point.get(PERIOD_FIELD_NAME) or self._default_period
+            end = tzutils.add_delta(begin, datetime.timedelta(seconds=period))
+        output = {
+            'begin': begin,
+            'end': end,
+        }
+
+        for key in point.keys():
+            if "time" != key:
+                output[key] = point[key]
+
+        if groupby:
+            for group in _sanitized_groupby(groupby):
+                output[group] = series_groupby.get(group, '')
+        return output
+
+    def process_total(self, total, begin, end, groupby, *args):
+        output = []
+        for (series_name, series_groupby), points in total.items():
+            for point in points:
+                # NOTE(peschk_l): InfluxDB returns all timestamps for a given
+                # period and interval, even those with no data. This filters
+                # out periods with no data.
+
+                # NOTE(rafaelweingartner): the summary get API allows users
+                # to customize the report. Therefore, we only ignore data
+                # points if all of their entries have None values.
+                # Otherwise, they are presented to the user.
+                if [k for k in point.keys() if point[k]]:
+                    output.append(self._get_total_elem(
+                        tzutils.utc_to_local(begin),
+                        tzutils.utc_to_local(end),
+                        groupby,
+                        series_groupby,
+                        point))
+        return output
+
+
+class InfluxClientV2(InfluxClient):
+    """Class used to facilitate interaction with InfluxDB v2.
+
+    custom_fields_rgx: regex used to parse the input custom_fields and
+                       retrieve the field name, the desired alias and
+                       the aggregation function to use. It allows us to
+                       keep the same custom_fields representation for
+                       both InfluxQL and Flux queries.
+    """
+
+    custom_fields_rgx = r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) ' \
+                        r'\\?"?([\w_ \\]+)"?,? ?'
+
+    class FluxResponseHandler(object):
+        """Class used to process the response of Flux queries.
+
+        As the Flux response splits its result set by the
+        requested fields, we need to merge them into a single
+        one based on their groups (tags).
+
+        Using this approach we keep the response data
+        compatible with the InfluxQL result set, where we
+        already have the multiple result sets for each field
+        merged into a single one.
+        """
+
+        def __init__(self, response, groupby, fields, begin, end,
+                     field_filters):
+            self.data = response
+            self.field_filters = field_filters
+            self.response = {}
+            self.begin = begin
+            self.end = end
+            self.groupby = groupby
+            self.fields = fields
+            self.process()
+
+        def process(self):
+            """This method merges all the Flux result sets into a single one.
+
+            To make sure the field filtering complies with the user's
+            request, we need to remove merged entries that have a None
+            value for filtered fields. We do that because processing
+            fields one by one in Flux queries performs better than
+            processing all fields together, but it causes problems when
+            we want to filter some data. E.g.:
+
+            We want the fields A and B, grouped by C and D, and the field
+            A must be 2. Imagine this query for the following dataset:
+
+            A : C : D        B : C : D
+            1 : 1 : 1        5 : 1 : 1
+            2 : 2 : 2        6 : 2 : 2
+            2 : 3 : 3        7 : 3 : 3
+            2 : 4 : 4
+
+            The result set is going to be like:
+
+            A : C : D        B : C : D
+            2 : 2 : 2        5 : 1 : 1
+            2 : 3 : 3        6 : 2 : 2
+            2 : 4 : 4        7 : 3 : 3
+
+            And the merged value is going to be like:
+
+            A    : B    : C : D
+            None : 5    : 1 : 1
+            2    : 6    : 2 : 2
+            2    : 7    : 3 : 3
+            2    : None : 4 : 4
+
+            So, we need to remove the first undesired entry to get the
+            correct result:
+
+            A    : B    : C : D
+            2    : 6    : 2 : 2
+            2    : 7    : 3 : 3
+            2    : None : 4 : 4
+            """
+
+            LOG.debug("Using fields %s to process InfluxDB V2 response.",
+                      self.fields)
+            LOG.debug("Start processing data [%s] of InfluxDB V2 API.",
+                      self.data)
+            if self.fields == ["*"] and not self.groupby:
+                self.process_data_wildcard()
+            else:
+                self.process_data_with_fields()
+
+            LOG.debug("Data processed by the InfluxDB V2 backend with "
+                      "result [%s].", self.response)
+            LOG.debug("Start sanitizing the response of Influx V2 API.")
+            self.sanitize_filtered_entries()
+            LOG.debug("Response sanitized [%s] for InfluxDB V2 API.",
+                      self.response)
+
+        def process_data_wildcard(self):
+            LOG.debug("Processing wildcard response for InfluxDB V2 API.")
+            found_fields = set()
+            for r in self.data:
+                if self.is_header_entry(r):
+                    LOG.debug("Skipping header entry: [%s].", r)
+                    continue
+                r_key = ''.join(sorted(r.values()))
+                found_fields.add(r['_field'])
+                r_value = r
+                r_value['begin'] = self.begin
+                r_value['end'] = self.end
+                self.response.setdefault(
+                    r_key, r_value)[r['result']] = float(r['_value'])
+
+        def process_data_with_fields(self):
+            for r in self.data:
+                if self.is_header_entry(r):
+                    LOG.debug("Skipping header entry: [%s].", r)
+                    continue
+                r_key = ''
+                r_value = {f: None for f in self.fields}
+                r_value['begin'] = self.begin
+                r_value['end'] = self.end
+                for g in (self.groupby or []):
+                    val = r.get(g)
+                    r_key += val or ''
+                    r_value[g] = val
+
+                self.response.setdefault(
+                    r_key, r_value)[r['result']] = float(r['_value'])
+
+        @staticmethod
+        def is_header_entry(entry):
+            """Check for header entries.
+
+            As the response contains multiple result sets, each entry
+            in the response CSV has its own header, which is the same
+            for all the result sets, but the CSV parser does not ignore
+            it and processes every header except the first as a dict
+            entry. In these cases, each dict value is the same as the
+            dict key, so we pick one column and, if that is the case,
+            we skip the entry.
+            """
+
+            return entry.get('_start') == '_start'
+
+        def sanitize_filtered_entries(self):
+            """Removes entries where filtered fields have None as value."""
+
+            for d in self.field_filters or []:
+                for k in list(self.response.keys()):
+                    if self.response[k][d] is None:
+                        self.response.pop(k, None)
+
+    def __init__(self, default_period=None):
+        super().__init__(default_period=default_period)
+        self.client = InfluxDBClient(
+            url=CONF.storage_influxdb.url,
+            timeout=CONF.storage_influxdb.query_timeout,
+            token=CONF.storage_influxdb.token,
+            org=CONF.storage_influxdb.org)
+        self._conn = self.client
+
+    def retrieve(self, types, filters, begin, end, offset=0, limit=1000,
+                 paginate=True):
+
+        query = self.get_query(begin, end, '*', filters=filters)
+        response = self.query(query)
+        output = self.process_total(
+            response, begin, end, None, '*', filters)
+        LOG.debug("Retrieved output %s", output)
+        results = {'results': output[
+            offset:offset + limit] if paginate else output}
+        return len(output), results
+
+    def delete(self, begin, end, filters):
+        predicate = '_measurement="dataframes"'
+        f = self.get_group_filters_query(
+            filters, fmt=lambda x: '"' + str(x) + '"')
+        if f:
+            f = f.replace('==', '=').replace('and', 'AND')
+            predicate += f'{f}'
+
+        LOG.debug("InfluxDB v2 deleting elements filtering by [%s] and "
+                  "with [begin=%s, end=%s].", predicate, begin, end)
+        delete_api = self.client.delete_api()
+        delete_api.delete(begin, end, bucket=CONF.storage_influxdb.bucket,
+                          predicate=predicate)
+
+    def process_total(self, total, begin, end, groupby, custom_fields,
+                      filters):
+        cf = self.get_custom_fields(custom_fields)
+        fields = list(map(lambda f: f[2], cf))
+        c_fields = {f[1]: f[2] for f in cf}
+        field_filters = [c_fields[f] for f in filters if f in c_fields]
+        handler = self.FluxResponseHandler(total, groupby, fields, begin, end,
+                                           field_filters)
+        return list(handler.response.values())
+
+    def commit(self):
+        total_points = len(self._points)
+        if len(self._points) < 1:
+            return
+        LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
+        self.write_points(self._points,
+                          retention_policy=self._retention_policy)
+        self._points = []
+
+    def write_points(self, points, retention_policy=None):
+        write_api = self.client.write_api(write_options=SYNCHRONOUS)
+        [write_api.write(
+            bucket=CONF.storage_influxdb.bucket, record=p)
+            for p in points]
+
+    def _get_filter_query(self, filters):
+        if not filters:
+            return ''
+        return ' and ' + ' and '.join(
+            self._get_filter(k, v) for k, v in filters.items())
+
+    def get_custom_fields(self, custom_fields):
+
+        if not custom_fields:
+            return []
+
+        if custom_fields.strip() == '*':
+            return [('*', '*', '*')]
+
+        groups = [list(i.groups()) for i in re.finditer(
+            self.custom_fields_rgx, custom_fields)]
+
+        # Remove the "AS|as" capture group, which is not needed.
+        if groups:
+            for g in groups:
+                del g[2]
+
+        return groups
+
+    def get_group_filters_query(self, group_filters, fmt=lambda x: f'r.{x}'):
+        if not group_filters:
+            return ''
+
+        get_val = (lambda x: x if isinstance(v, (int, float)) or
+                   x.isnumeric() else f'"{x}"')
+
+        f = ''
+        for k, v in group_filters.items():
+            if isinstance(v, (list, tuple)):
+                if len(v) == 1:
+                    f += f' and {fmt(k)}=={get_val(v[0])}'
+                    continue
+
+                f += ' and (%s)' % ' or '.join([f'{fmt(k)}=={get_val(val)}'
+                                                for val in v])
+                continue
+
+            f += f' and {fmt(k)}=={get_val(v)}'
+
+        return f
+
+    def get_field_filters_query(self, field_filters,
+                                fmt=lambda x: 'r["_value"]'):
+        return self.get_group_filters_query(field_filters, fmt)
+
+    def get_custom_fields_query(self, custom_fields, query, field_filters,
+                                group_filters, limit=None, groupby=None):
+        if not groupby:
+            groupby = []
+        if not custom_fields:
+            custom_fields = 'sum(price) AS price,sum(qty) AS qty'
+        columns_to_keep = ', '.join(map(lambda g: f'"{g}"', groupby))
+        columns_to_keep += ', "_field", "_value", "_start", "_stop"'
+        new_query = ''
+        LOG.debug("Custom fields: %s", custom_fields)
+        LOG.debug("Custom fields processed: %s",
+                  self.get_custom_fields(custom_fields))
+        for operation, field, alias in self.get_custom_fields(custom_fields):
+            LOG.debug("Generating query with operation [%s],"
+                      " field [%s] and alias [%s]", operation, field, alias)
+            field_filter = {}
+            if field_filters and field in field_filters:
+                field_filter = {field: field_filters[field]}
+
+            if field == '*':
+                group_filter = self.get_group_filters_query(
+                    group_filters).replace(" and ", "", 1)
+                filter_to_replace = f'|> filter(fn: (r) => {group_filter})'
+                new_query += query.replace(
+                    '<placeholder-filter>',
+                    filter_to_replace).replace(
+                    '<placeholder-operations>',
+                    f'''|> drop(columns: ["_time"])
+                        {'|> limit(n: ' + str(limit) + ')' if limit else ''}
+                        |> yield(name: "result")''')
+                continue
+
+            new_query += query.replace(
+                '<placeholder-filter>',
+                f'|> filter(fn: (r) => r["_field"] == '
+                f'"{field}" {self.get_group_filters_query(group_filters)} '
+                f'{self.get_field_filters_query(field_filter)})'
+            ).replace(
+                '<placeholder-operations>',
+                f'''|> {operation.lower()}()
+                    |> keep(columns: [{columns_to_keep}])
+                    |> set(key: "_field", value: "{alias}")
+                    |> yield(name: "{alias}")''')
+        return new_query
+
+    def get_groupby(self, groupby):
+        if not groupby:
+            return "|> group()"
+        return f'''|> group(columns: [{','.join([f'"{g}"'
+                                                 for g in groupby])}])'''
+
+    def get_time_range(self, begin, end):
+        if not begin or not end:
+            return ''
+        return f'|> range(start: {begin.isoformat()}, stop: {end.isoformat()})'
+
+    def get_query(self, begin, end, custom_fields, groupby=None, filters=None,
+                  limit=None):
+
+        custom_fields_processed = list(
+            map(lambda x: x[1], self.get_custom_fields(custom_fields)))
+        field_filters = dict(filter(
+            lambda f: f[0] in custom_fields_processed, filters.items()))
+        group_filters = dict(filter(
+            lambda f: f[0] not in field_filters, filters.items()))
+
+        query = f'''
+        from(bucket:"{CONF.storage_influxdb.bucket}")
+        {self.get_time_range(begin, end)}
+        |> filter(fn: (r) => r["_measurement"] == "dataframes")
+        <placeholder-filter>
+        {self.get_groupby(groupby)}
+        <placeholder-operations>
+        '''
+
+        LOG.debug("Field Filters: %s", field_filters)
+        LOG.debug("Group Filters: %s", group_filters)
+        query = self.get_custom_fields_query(custom_fields, query,
+                                             field_filters, group_filters,
+                                             limit, groupby)
+        return query
+
+    def query(self, query):
+        url_base = CONF.storage_influxdb.url
+        org = CONF.storage_influxdb.org
+        url = f'{url_base}/api/v2/query?org={org}'
+        response = requests.post(
+            url=url,
+            headers={
+                'Content-type': 'application/json',
+                'Authorization': f'Token {CONF.storage_influxdb.token}'},
+            data=json.dumps({
+                'query': query}))
+        response_text = response.text
+        LOG.debug("Raw Response: [%s].", response_text)
+        handled_response = []
+        for csv_tables in response_text.split(',result,table,'):
+            csv_tables = ',result,table,' + csv_tables
+            LOG.debug("Processing CSV [%s].", csv_tables)
+            processed = list(csv.DictReader(io.StringIO(csv_tables)))
+            LOG.debug("Processed CSV in dict [%s]", processed)
+            handled_response.extend(processed)
+        return handled_response
+
+    def get_total(self, types, begin, end, custom_fields,
+                  groupby=None, filters=None, limit=None):
+
+        LOG.debug("Query types: %s", types)
+        if types:
+            if not filters:
+                filters = {}
+            filters['type'] = types
+
+        LOG.debug("Query filters: %s", filters)
+        query = self.get_query(begin, end, custom_fields,
+                               groupby, filters, limit)
+
+        LOG.debug("Executing the Flux query [%s].", query)
+
+        return self.query(query)
+
+
 class InfluxStorage(v2_storage.BaseStorage):
 
     def __init__(self, *args, **kwargs):
         super(InfluxStorage, self).__init__(*args, **kwargs)
         self._default_period = kwargs.get('period') or CONF.collect.period
-        self._conn = InfluxClient(default_period=self._default_period)
+        if CONF.storage_influxdb.version == 2:
+            self._conn = InfluxClientV2(default_period=self._default_period)
+        else:
+            self._conn = InfluxClient(default_period=self._default_period)
 
     def init(self):
+        if CONF.storage_influxdb.version == 2:
+            return
         policy = CONF.storage_influxdb.retention_policy
         database = CONF.storage_influxdb.database
         if not self._conn.retention_policy_exists(database, policy):
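As an editorial aside, here is a runnable sketch of what the custom_fields parsing above yields. The regex is copied from InfluxClientV2.custom_fields_rgx and the input is the default custom_fields value used by InfluxStorage.total(); everything else in the snippet is illustration:

import re

# Copied from InfluxClientV2.custom_fields_rgx above.
custom_fields_rgx = (r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) '
                     r'\\?"?([\w_ \\]+)"?,? ?')

groups = [list(m.groups()) for m in re.finditer(
    custom_fields_rgx, 'SUM(qty) AS qty, SUM(price) AS rate')]
for g in groups:
    del g[2]  # drop the "AS|as" capture, as get_custom_fields() does

# groups == [['SUM', 'qty', 'qty'], ['SUM', 'price', 'rate']]: one
# (operation, field, alias) triple per aggregated field, which
# get_custom_fields_query() turns into one Flux sub-query each.
print(groups)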
@@ -371,25 +831,6 @@ class InfluxStorage(v2_storage.BaseStorage):
     def delete(self, begin=None, end=None, filters=None):
         self._conn.delete(begin, end, filters)
 
-    def _get_total_elem(self, begin, end, groupby, series_groupby, point):
-        if groupby and 'time' in groupby:
-            begin = tzutils.dt_from_iso(point['time'])
-            period = point.get(PERIOD_FIELD_NAME) or self._default_period
-            end = tzutils.add_delta(begin, datetime.timedelta(seconds=period))
-        output = {
-            'begin': begin,
-            'end': end,
-        }
-
-        for key in point.keys():
-            if "time" != key:
-                output[key] = point[key]
-
-        if groupby:
-            for group in _sanitized_groupby(groupby):
-                output[group] = series_groupby.get(group, '')
-        return output
-
     def total(self, groupby=None, begin=None, end=None, metric_types=None,
               filters=None, offset=0, limit=1000, paginate=True,
               custom_fields="SUM(qty) AS qty, SUM(price) AS rate"):
@@ -398,30 +839,14 @@ class InfluxStorage(v2_storage.BaseStorage):
         groupby = self.parse_groupby_syntax_to_groupby_elements(groupby)
 
         total = self._conn.get_total(metric_types, begin, end,
-                                     custom_fields, groupby, filters)
+                                     custom_fields, groupby, filters, limit)
 
-        output = []
-        for (series_name, series_groupby), points in total.items():
-            for point in points:
-                # NOTE(peschk_l): InfluxDB returns all timestamps for a given
-                # period and interval, even those with no data. This filters
-                # out periods with no data
-
-                # NOTE (rafaelweingartner): the summary get API is allowing
-                # users to customize the report. Therefore, we only ignore
-                # data points, if all of the entries have None values.
-                # Otherwise, they are presented to the user.
-                if [k for k in point.keys() if point[k]]:
-                    output.append(self._get_total_elem(
-                        tzutils.utc_to_local(begin),
-                        tzutils.utc_to_local(end),
-                        groupby,
-                        series_groupby,
-                        point))
+        output = self._conn.process_total(
+            total, begin, end, groupby, custom_fields, filters)
 
         groupby = _sanitized_groupby(groupby)
         if groupby:
-            output.sort(key=lambda x: [x[group] for group in groupby])
+            output.sort(key=lambda x: [x[group] or "" for group in groupby])
 
         return {
             'total': len(output),
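To make the Flux response handling above concrete, here is a small editorial sketch (synthetic CSV, not taken from the patch) of what InfluxClientV2.query() does with the raw API output: each yield() in the generated Flux script comes back as its own annotated CSV table, the text is split on the shared header prefix, the prefix is re-added, and each piece is parsed with csv.DictReader:

import csv
import io

# Two result sets ("qty" and "price"), as two yield() calls would return.
raw = (',result,table,_start,_value,type\n'
       ',qty,0,2023-01-01T00:00:00Z,42,instance\n'
       ',result,table,_start,_value,type\n'
       ',price,0,2023-01-01T00:00:00Z,1.5,instance\n')

rows = []
for chunk in raw.split(',result,table,'):
    chunk = ',result,table,' + chunk
    rows.extend(csv.DictReader(io.StringIO(chunk)))

# Drop residual repeated header rows (none in this toy data), as
# is_header_entry() does on real responses.
rows = [r for r in rows if r.get('_start') != '_start']

# Each row names its result set in the 'result' column; the
# FluxResponseHandler then merges rows on the groupby tags ('type' here)
# into single entries like {'qty': 42.0, 'price': 1.5, 'type': 'instance'}.
print(rows)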
@@ -90,7 +90,7 @@ class FakeInfluxClient(InfluxClient):
         return target_serie
 
     def get_total(self, types, begin, end, custom_fields, groupby=None,
-                  filters=None):
+                  filters=None, limit=None):
         total = copy.deepcopy(self.total_sample)
         series = []
@@ -209,3 +209,233 @@ class TestInfluxClient(unittest.TestCase):
         self._storage.delete(end=datetime(2019, 1, 2))
         m.assert_called_once_with("""DELETE FROM "dataframes" WHERE """
                                   """time < '2019-01-02T00:00:00';""")
+
+    def test_process_total(self):
+        begin = datetime(2019, 1, 2, 10)
+        end = datetime(2019, 1, 2, 11)
+        groupby = ['valA', 'time']
+        points_1 = [
+            {
+                'qty': 42,
+                'price': 1.0,
+                'time': begin.isoformat()
+            }
+        ]
+        series_groupby_1 = {
+            'valA': '1'
+        }
+        points_2 = [
+            {
+                'qty': 12,
+                'price': 2.0,
+                'time': begin.isoformat()
+            }
+        ]
+        series_groupby_2 = {
+            'valA': '2'
+        }
+        points_3 = [
+            {
+                'qty': None,
+                'price': None,
+                'time': None
+            }
+        ]
+        series_groupby_3 = {
+            'valA': None
+        }
+        series_name = 'dataframes'
+        items = [((series_name, series_groupby_1), points_1),
+                 ((series_name, series_groupby_2), points_2),
+                 ((series_name, series_groupby_3), points_3)]
+        total = FakeResultSet(items=items)
+        result = self.client.process_total(total=total, begin=begin, end=end,
+                                           groupby=groupby)
+        expected = [{'begin': tzutils.utc_to_local(begin),
+                     'end': tzutils.utc_to_local(end),
+                     'qty': 42,
+                     'price': 1.0,
+                     'valA': '1'},
+                    {'begin': tzutils.utc_to_local(begin),
+                     'end': tzutils.utc_to_local(end),
+                     'qty': 12,
+                     'price': 2.0,
+                     'valA': '2'}]
+        self.assertEqual(expected, result)
+
+
+class TestInfluxClientV2(unittest.TestCase):
+
+    @mock.patch('cloudkitty.storage.v2.influx.InfluxDBClient')
+    def setUp(self, client_mock):
+        self.period_begin = tzutils.local_to_utc(
+            tzutils.get_month_start())
+        self.period_end = tzutils.local_to_utc(
+            tzutils.get_next_month())
+        self.client = influx.InfluxClientV2()
+
+    @mock.patch('cloudkitty.storage.v2.influx.requests')
+    def test_query(self, mock_request):
+        static_vals = ['', 'result', 'table', '_start', '_value']
+        custom_fields = 'last(f1) AS f1, last(f2) AS f2, last(f3) AS f3'
+        groups = ['g1', 'g2', 'g3']
+        data = [
+            static_vals + groups,
+            ['', 'f1', 0, 1, 1, 1, 2, 3],
+            ['', 'f2', 0, 1, 2, 1, 2, 3],
+            ['', 'f3', 0, 1, 3, 1, 2, 3],
+            static_vals + groups,
+            ['', 'f1', 0, 1, 3, 3, 1, 2],
+            ['', 'f2', 0, 1, 1, 3, 1, 2],
+            ['', 'f3', 0, 1, 2, 3, 1, 2],
+            static_vals + groups,
+            ['', 'f1', 0, 1, 2, 2, 3, 1],
+            ['', 'f2', 0, 1, 3, 2, 3, 1],
+            ['', 'f3', 0, 1, 1, 2, 3, 1]
+        ]
+
+        expected_value = [
+            {'f1': 1.0, 'f2': 2.0, 'f3': 3.0, 'begin': self.period_begin,
+             'end': self.period_end, 'g1': '1', 'g2': '2', 'g3': '3'},
+            {'f1': 3.0, 'f2': 1.0, 'f3': 2.0, 'begin': self.period_begin,
+             'end': self.period_end, 'g1': '3', 'g2': '1', 'g3': '2'},
+            {'f1': 2.0, 'f2': 3.0, 'f3': 1.0, 'begin': self.period_begin,
+             'end': self.period_end, 'g1': '2', 'g2': '3', 'g3': '1'}
+        ]
+
+        data_csv = '\n'.join([','.join(map(str, d)) for d in data])
+        mock_request.post.return_value = mock.Mock(text=data_csv)
+        response = self.client.get_total(
+            None, self.period_begin, self.period_end, custom_fields,
+            filters={}, groupby=groups)
+
+        result = self.client.process_total(
+            response, self.period_begin, self.period_end,
+            groups, custom_fields, {})
+
+        self.assertEqual(result, expected_value)
+
+    def test_query_build(self):
+        custom_fields = 'last(field1) AS F1, sum(field2) AS F2'
+        groupby = ['group1', 'group2', 'group3']
+        filters = {
+            'filter1': '10',
+            'filter2': 'filter2_filter'
+        }
+        beg = self.period_begin.isoformat()
+        end = self.period_end.isoformat()
+        expected = ('\n'
+                    ' from(bucket:"cloudkitty")\n'
+                    f' |> range(start: {beg}, stop: {end})\n'
+                    ' |> filter(fn: (r) => r["_measurement"] == '
+                    '"dataframes")\n'
+                    ' |> filter(fn: (r) => r["_field"] == "field1"'
+                    ' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
+                    ' |> group(columns: ["group1","group2",'
+                    '"group3"])\n'
+                    ' |> last()\n'
+                    ' |> keep(columns: ["group1", "group2",'
+                    ' "group3", "_field", "_value", "_start", "_stop"])\n'
+                    ' |> set(key: "_field", value: "F1")\n'
+                    ' |> yield(name: "F1")\n'
+                    ' \n'
+                    ' from(bucket:"cloudkitty")\n'
+                    f' |> range(start: {beg}, stop: {end})\n'
+                    ' |> filter(fn: (r) => r["_measurement"] == '
+                    '"dataframes")\n'
+                    ' |> filter(fn: (r) => r["_field"] == "field2"'
+                    ' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
+                    ' |> group(columns: ["group1","group2",'
+                    '"group3"])\n'
+                    ' |> sum()\n'
+                    ' |> keep(columns: ["group1", "group2", '
+                    '"group3", "_field", "_value", "_start", "_stop"])\n'
+                    ' |> set(key: "_field", value: "F2")\n'
+                    ' |> yield(name: "F2")\n'
+                    ' ')
+
+        query = self.client.get_query(begin=self.period_begin,
+                                      end=self.period_end,
+                                      custom_fields=custom_fields,
+                                      filters=filters,
+                                      groupby=groupby)
+
+        self.assertEqual(query, expected)
+
+    def test_query_build_no_custom_fields(self):
+        custom_fields = None
+        groupby = ['group1', 'group2', 'group3']
+        filters = {
+            'filter1': '10',
+            'filter2': 'filter2_filter'
+        }
+        beg = self.period_begin.isoformat()
+        end = self.period_end.isoformat()
+        self.maxDiff = None
+        expected = ('\n'
+                    ' from(bucket:"cloudkitty")\n'
+                    f' |> range(start: {beg}, stop: {end})\n'
+                    ' |> filter(fn: (r) => r["_measurement"] == '
+                    '"dataframes")\n'
+                    ' |> filter(fn: (r) => r["_field"] == "price"'
+                    ' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
+                    ' |> group(columns: ["group1","group2",'
+                    '"group3"])\n'
+                    ' |> sum()\n'
+                    ' |> keep(columns: ["group1", "group2",'
+                    ' "group3", "_field", "_value", "_start", "_stop"])\n'
+                    ' |> set(key: "_field", value: "price")\n'
+                    ' |> yield(name: "price")\n'
+                    ' \n'
+                    ' from(bucket:"cloudkitty")\n'
+                    f' |> range(start: {beg}, stop: {end})\n'
+                    ' |> filter(fn: (r) => r["_measurement"] == '
+                    '"dataframes")\n'
+                    ' |> filter(fn: (r) => r["_field"] == "qty"'
+                    ' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
+                    ' |> group(columns: ["group1","group2",'
+                    '"group3"])\n'
+                    ' |> sum()\n'
+                    ' |> keep(columns: ["group1", "group2", '
+                    '"group3", "_field", "_value", "_start", "_stop"])\n'
+                    ' |> set(key: "_field", value: "qty")\n'
+                    ' |> yield(name: "qty")\n'
+                    ' ')
+
+        query = self.client.get_query(begin=self.period_begin,
+                                      end=self.period_end,
+                                      custom_fields=custom_fields,
+                                      filters=filters,
+                                      groupby=groupby)
+
+        self.assertEqual(query, expected)
+
+    def test_query_build_all_custom_fields(self):
+        custom_fields = '*'
+        groupby = ['group1', 'group2', 'group3']
+        filters = {
+            'filter1': '10',
+            'filter2': 'filter2_filter'
+        }
+        beg = self.period_begin.isoformat()
+        end = self.period_end.isoformat()
+        expected = (f'''
+            from(bucket:"cloudkitty")
+            |> range(start: {beg}, stop: {end})
+            |> filter(fn: (r) => r["_measurement"] == "dataframes")
+            |> filter(fn: (r) => r.filter1==10 and r.filter2=="filter2_filter")
+            |> group(columns: ["group1","group2","group3"])
+            |> drop(columns: ["_time"])
+            |> yield(name: "result")'''.replace(
+            ' ', '').replace('\n', '').replace('\t', ''))
+
+        query = self.client.get_query(begin=self.period_begin,
+                                      end=self.period_end,
+                                      custom_fields=custom_fields,
+                                      filters=filters,
+                                      groupby=groupby).replace(
+            ' ', '').replace('\n', '').replace('\t', '')
+
+        self.assertEqual(query, expected)
devstack/plugin.sh
@@ -165,7 +165,7 @@ function configure_cloudkitty {
         iniset $CLOUDKITTY_CONF fetcher_keystone keystone_version 3
     fi
 
-    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then
+    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} user ${CLOUDKITTY_INFLUXDB_USER}
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} password ${CLOUDKITTY_INFLUXDB_PASSWORD}
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} database ${CLOUDKITTY_INFLUXDB_DATABASE}
@@ -173,6 +173,14 @@ function configure_cloudkitty {
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT}
     fi
 
+    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
+        iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_INFLUXDB_HOST}
+        iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT}
+        iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} url "http://${CLOUDKITTY_INFLUXDB_HOST}:${CLOUDKITTY_INFLUXDB_PORT}"
+        iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} token ${CLOUDKITTY_INFLUXDB_PASSWORD}
+        iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} version 2
+    fi
+
     if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_ELASTICSEARCH_HOST}
         iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_ELASTICSEARCH_INDEX}
@@ -242,9 +250,13 @@ function create_cloudkitty_data_dir {
 }
 
 function create_influxdb_database {
-    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then
+    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
         influx -execute "CREATE DATABASE ${CLOUDKITTY_INFLUXDB_DATABASE}"
     fi
+    if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
+        influx setup --username ${CLOUDKITTY_INFLUXDB_USER} --password ${CLOUDKITTY_INFLUXDB_PASSWORD} --token ${CLOUDKITTY_INFLUXDB_PASSWORD} --org openstack --bucket cloudkitty --force
+    fi
 
 }
 
 function create_elasticsearch_index {
@@ -296,11 +308,27 @@ function install_influx_ubuntu {
     sudo dpkg -i --skip-same-version ${influxdb_file}
 }
 
+function install_influx_v2_ubuntu {
+    local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb)
+    sudo dpkg -i --skip-same-version ${influxdb_file}
+    local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz)
+    tar xvzf ${influxcli_file}
+    sudo cp ./influx /usr/local/bin/
+}
+
 function install_influx_fedora {
     local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb-1.6.3.x86_64.rpm)
     sudo yum localinstall -y ${influxdb_file}
 }
 
+function install_influx_v2_fedora {
+    local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.5-1.x86_64.rpm)
+    sudo yum localinstall -y ${influxdb_file}
+    local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz)
+    tar xvzf ${influxcli_file}
+    sudo cp ./influx /usr/local/bin/
+}
+
 function install_influx {
     if is_ubuntu; then
         install_influx_ubuntu
@@ -313,6 +341,19 @@ function install_influx {
     sudo systemctl start influxdb || sudo systemctl restart influxdb
 }
 
+
+function install_influx_v2 {
+    if is_ubuntu; then
+        install_influx_v2_ubuntu
+    elif is_fedora; then
+        install_influx_v2_fedora
+    else
+        die $LINENO "Distribution must be Debian or Fedora-based"
+    fi
+    sudo cp -f "${CLOUDKITTY_DIR}"/devstack/files/influxdb.conf /etc/influxdb/influxdb.conf
+    sudo systemctl start influxdb || sudo systemctl restart influxdb
+}
+
 function install_elasticsearch_ubuntu {
     local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.deb)
     sudo dpkg -i --skip-same-version ${opensearch_file}
@@ -367,9 +408,10 @@ function install_opensearch {
 function install_cloudkitty {
     git_clone $CLOUDKITTY_REPO $CLOUDKITTY_DIR $CLOUDKITTY_BRANCH
     setup_develop $CLOUDKITTY_DIR
 
-    if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ]; then
+    if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
         install_influx
+    elif [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
+        install_influx_v2
     elif [ $CLOUDKITTY_STORAGE_BACKEND == 'elasticsearch' ]; then
         install_elasticsearch
     elif [ $CLOUDKITTY_STORAGE_BACKEND == 'opensearch' ]; then

devstack/settings
@@ -50,6 +50,7 @@ CLOUDKITTY_METRICS_CONF=metrics.yml
 # Set CloudKitty storage info
 CLOUDKITTY_STORAGE_BACKEND=${CLOUDKITTY_STORAGE_BACKEND:-"influxdb"}
 CLOUDKITTY_STORAGE_VERSION=${CLOUDKITTY_STORAGE_VERSION:-"2"}
+CLOUDKITTY_INFLUX_VERSION=${CLOUDKITTY_INFLUX_VERSION:-1}
 
 # Set CloudKitty output info
 CLOUDKITTY_OUTPUT_BACKEND=${CLOUDKITTY_OUTPUT_BACKEND:-"cloudkitty.backend.file.FileBackend"}

New release note:
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Add support for the InfluxDB v2 database as a storage backend.

requirements.txt
@@ -34,6 +34,7 @@ stevedore>=3.2.2 # Apache-2.0
 tooz>=2.7.1 # Apache-2.0
 voluptuous>=0.12.0 # BSD License
 influxdb>=5.3.1 # MIT
+influxdb-client>=1.36.0 # MIT
 Flask>=2.0.0 # BSD
 Flask-RESTful>=0.3.9 # BSD
 cotyledon>=1.7.3 # Apache-2.0