Merge "Fix Influx statistics and Alarm History behavior"
This commit is contained in:
commit
34d7eeec2d
|
@ -13,17 +13,18 @@
|
|||
*/
|
||||
package monasca.api.infrastructure.persistence;
|
||||
|
||||
import monasca.api.ApiConfig;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
|
||||
import monasca.api.ApiConfig;
|
||||
|
||||
public class PersistUtils {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PersistUtils.class);
|
||||
|
@ -32,17 +33,7 @@ public class PersistUtils {
|
|||
|
||||
private final int DEFAULT_MAX_QUERY_LIMIT = 10000;
|
||||
|
||||
private final ThreadLocal<SimpleDateFormat> simpleDateFormatSpace = new ThreadLocal<>();
|
||||
private static final String FORMAT_WITH_SPACE = "yyyy-MM-dd HH:mm:ss.SSSX";
|
||||
|
||||
private final ThreadLocal<SimpleDateFormat> simpleDateFormatSpaceOneDigitMilli = new ThreadLocal<>();
|
||||
private static final String FORMAT_WITH_SPACE_ONE_DIGIT_MILLI = "yyyy-MM-dd HH:mm:ss.SX";
|
||||
|
||||
private final ThreadLocal<SimpleDateFormat> simpleDateFormatT = new ThreadLocal<>();
|
||||
private static final String FORMAT_WITH_T = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
|
||||
|
||||
private final ThreadLocal<SimpleDateFormat> simpleDateFormatTOneDigitMilli = new ThreadLocal<>();
|
||||
private static final String FORMAT_WITH_T_ONE_DIGIT_MILLI = "yyyy-MM-dd'T'HH:mm:ss.SX";
|
||||
private DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime();
|
||||
|
||||
@Inject
|
||||
public PersistUtils(ApiConfig config) {
|
||||
|
@ -108,46 +99,7 @@ public class PersistUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private Date parseForFormat(final String timeStamp,
|
||||
final ThreadLocal<SimpleDateFormat> formatter,
|
||||
final String format) throws ParseException {
|
||||
if (formatter.get() == null) {
|
||||
formatter.set(new SimpleDateFormat(format));
|
||||
}
|
||||
return formatter.get().parse(timeStamp);
|
||||
}
|
||||
|
||||
public Date parseTimestamp(String timestampString) throws ParseException {
|
||||
|
||||
try {
|
||||
|
||||
// Handles 2 and 3 digit millis. '2016-01-01 01:01:01.12Z' or '2016-01-01 01:01:01.123Z'
|
||||
return parseForFormat(timestampString, this.simpleDateFormatSpace, FORMAT_WITH_SPACE);
|
||||
|
||||
} catch (ParseException pe0) {
|
||||
|
||||
try {
|
||||
|
||||
// Handles 1 digit millis. '2016-01-01 01:01:01.1Z'
|
||||
return parseForFormat(timestampString, this.simpleDateFormatSpaceOneDigitMilli,
|
||||
FORMAT_WITH_SPACE_ONE_DIGIT_MILLI);
|
||||
|
||||
} catch (ParseException pe1) {
|
||||
|
||||
try {
|
||||
|
||||
// Handles 2 and 3 digit millis with 'T'. Comes from the Python Persister.
|
||||
// '2016-01-01T01:01:01.12Z' or '2016-01-01T01:01:01.123Z'
|
||||
return parseForFormat(timestampString, this.simpleDateFormatT, FORMAT_WITH_T);
|
||||
|
||||
} catch (ParseException pe2) {
|
||||
|
||||
// Handles 1 digit millis with 'T'. Comes from the Python Persister.
|
||||
// '2016-01-01T01:01:01.1Z'
|
||||
return parseForFormat(timestampString, this.simpleDateFormatTOneDigitMilli,
|
||||
FORMAT_WITH_T_ONE_DIGIT_MILLI);
|
||||
}
|
||||
}
|
||||
}
|
||||
return isoFormat.parseDateTime(timestampString.trim().replace(' ', 'T')).toDate();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2014, 2016 Hewlett-Packard Development Company, L.P.
|
||||
* (C) Copyright 2014, 2016 Hewlett-Packard Development LP
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
|
@ -13,11 +13,14 @@
|
|||
*/
|
||||
package monasca.api.infrastructure.persistence.influxdb;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -31,7 +34,6 @@ import javax.annotation.Nullable;
|
|||
|
||||
import monasca.api.ApiConfig;
|
||||
import monasca.api.domain.exception.MultipleMetricsException;
|
||||
import monasca.api.domain.model.measurement.Measurements;
|
||||
import monasca.api.domain.model.statistic.StatisticRepo;
|
||||
import monasca.api.domain.model.statistic.Statistics;
|
||||
|
||||
|
@ -46,6 +48,8 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
private final InfluxV9RepoReader influxV9RepoReader;
|
||||
private final InfluxV9Utils influxV9Utils;
|
||||
private final InfluxV9MetricDefinitionRepo influxV9MetricDefinitionRepo;
|
||||
private static final DateTimeFormatter ISO_8601_FORMATTER = ISODateTimeFormat
|
||||
.dateOptionalTimeParser().withZoneUTC();
|
||||
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
@ -70,6 +74,18 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
List<String> statistics, int period, String offset, int limit,
|
||||
Boolean mergeMetricsFlag, String groupBy) throws Exception {
|
||||
|
||||
String offsetTimePart = "";
|
||||
if (!Strings.isNullOrEmpty(offset)) {
|
||||
int indexOfUnderscore = offset.indexOf('_');
|
||||
if (indexOfUnderscore > -1) {
|
||||
offsetTimePart = offset.substring(indexOfUnderscore + 1);
|
||||
// Add the period to the offset to ensure only the next group of points are returned
|
||||
DateTime offsetDateTime = DateTime.parse(offsetTimePart).plusSeconds(period);
|
||||
// leave out any ID, as influx doesn't understand it
|
||||
offset = offsetDateTime.toString();
|
||||
}
|
||||
}
|
||||
|
||||
String q = buildQuery(tenantId, name, dimensions, startTime, endTime,
|
||||
statistics, period, offset, limit, mergeMetricsFlag, groupBy);
|
||||
|
||||
|
@ -91,12 +107,18 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
String groupBy)
|
||||
throws Exception {
|
||||
|
||||
String offsetTimePart = "";
|
||||
if (!Strings.isNullOrEmpty(offset)) {
|
||||
int indexOfUnderscore = offset.indexOf('_');
|
||||
offsetTimePart = offset.substring(indexOfUnderscore + 1);
|
||||
}
|
||||
|
||||
String q;
|
||||
|
||||
if (Boolean.TRUE.equals(mergeMetricsFlag)) {
|
||||
|
||||
q = String.format("select %1$s %2$s "
|
||||
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s",
|
||||
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s",
|
||||
funcPart(statistics),
|
||||
this.influxV9Utils.namePart(name, true),
|
||||
this.influxV9Utils.privateTenantIdPart(tenantId),
|
||||
|
@ -104,6 +126,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
this.influxV9Utils.startTimePart(startTime),
|
||||
this.influxV9Utils.dimPart(dimensions),
|
||||
this.influxV9Utils.endTimePart(endTime),
|
||||
this.influxV9Utils.timeOffsetPart(offsetTimePart),
|
||||
this.influxV9Utils.periodPart(period),
|
||||
this.influxV9Utils.limitPart(limit));
|
||||
|
||||
|
@ -174,7 +197,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
|
||||
List<Object> values = buildValsList(valueObjects);
|
||||
|
||||
if (((String) values.get(0)).compareTo(offsetTimestamp) > 0 || index > offsetId) {
|
||||
if (((String) values.get(0)).compareTo(offsetTimestamp) >= 0 || index > offsetId) {
|
||||
statistics.addMeasurement(values);
|
||||
remaining_limit--;
|
||||
}
|
||||
|
@ -197,7 +220,14 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
|
|||
ArrayList<Object> valObjArryList = new ArrayList<>();
|
||||
|
||||
// First value is the timestamp.
|
||||
valObjArryList.add(values[0]);
|
||||
String timestamp = values[0].toString();
|
||||
int index = timestamp.indexOf('.');
|
||||
if (index > 0)
|
||||
// In certain queries, timestamps will not align to second resolution,
|
||||
// remove the sub-second values.
|
||||
valObjArryList.add(timestamp.substring(0,index).concat("Z"));
|
||||
else
|
||||
valObjArryList.add(timestamp);
|
||||
|
||||
// All other values are doubles.
|
||||
for (int i = 1; i < values.length; ++i) {
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* (C) Copyright 2016 Hewlett Packard Enterprise Development LP
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package monasca.api.infrastructure.persistence;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.text.ParseException;
|
||||
|
||||
@Test
|
||||
public class PersistUtilsTest {
|
||||
private final PersistUtils persistUtils = new PersistUtils();
|
||||
|
||||
public void test3DigitWithSpace() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01 01:01:01.123Z", "2016-01-01 01:01:01.123Z");
|
||||
}
|
||||
|
||||
public void test2DigitWithSpace() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01 01:01:01.15Z", "2016-01-01 01:01:01.150Z");
|
||||
}
|
||||
|
||||
public void test1DigitWithSpace() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01 01:01:01.1Z", "2016-01-01 01:01:01.100Z");
|
||||
}
|
||||
|
||||
public void test3DigitWithT() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01T01:01:01.123Z", "2016-01-01T01:01:01.123Z");
|
||||
}
|
||||
|
||||
public void test2DigitWithT() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01T01:01:01.15Z", "2016-01-01T01:01:01.150Z");
|
||||
}
|
||||
|
||||
public void test1DigitWithT() throws ParseException {
|
||||
checkParseTimestamp("2016-01-01T01:01:01.1Z", "2016-01-01T01:01:01.100Z");
|
||||
}
|
||||
|
||||
private void checkParseTimestamp(final String start, final String expected) throws ParseException {
|
||||
assertEquals(persistUtils.parseTimestamp(start).getTime(), persistUtils.parseTimestamp(expected).getTime());
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014 Hewlett-Packard
|
||||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
|
||||
# Copyright 2015 Cray Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
|
@ -14,8 +14,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
|
@ -82,6 +82,15 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
|
|||
region, start_timestamp,
|
||||
end_timestamp)
|
||||
if offset:
|
||||
if '_' in offset:
|
||||
tmp = datetime.strptime(str(offset).split('_')[1], "%Y-%m-%dT%H:%M:%SZ")
|
||||
tmp = tmp + timedelta(seconds=int(period))
|
||||
# Leave out any ID as influx doesn't understand it
|
||||
offset = tmp.isoformat()
|
||||
else:
|
||||
tmp = datetime.strptime(offset, "%Y-%m-%dT%H:%M:%SZ")
|
||||
offset = tmp + timedelta(seconds=int(period))
|
||||
|
||||
offset_clause = (" and time > '{}'".format(offset))
|
||||
from_clause += offset_clause
|
||||
|
||||
|
@ -443,6 +452,10 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
|
|||
|
||||
stats_list = []
|
||||
for stats in serie['values']:
|
||||
# remove sub-second timestamp values (period can never be less than 1)
|
||||
timestamp = stats[0]
|
||||
if '.' in timestamp:
|
||||
stats[0] = str(timestamp)[:19] + 'Z'
|
||||
stats[1] = stats[1] or 0
|
||||
stats_list.append(stats)
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -11,6 +11,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six.moves.urllib.parse as urlparse
|
||||
from tempest import config
|
||||
from tempest.common import credentials_factory
|
||||
import tempest.test
|
||||
|
@ -76,3 +78,20 @@ class BaseMonascaTest(tempest.test.BaseTestCase):
|
|||
id = element['id']
|
||||
cls.monasca_client.delete_alarm(id)
|
||||
cls.cred_provider.clear_creds()
|
||||
|
||||
def _get_offset(self, response_body):
|
||||
next_link = None
|
||||
self_link = None
|
||||
for link in response_body['links']:
|
||||
if link['rel'] == 'next':
|
||||
next_link = link['href']
|
||||
if link['rel'] == 'self':
|
||||
self_link = link['href']
|
||||
if not next_link:
|
||||
query_parms = urlparse.parse_qs(urlparse.urlparse(self_link).query)
|
||||
self.fail("No next link returned with query parameters: {}".format(query_parms))
|
||||
query_params = urlparse.parse_qs(urlparse.urlparse(next_link).query)
|
||||
if 'offset' not in query_params:
|
||||
self.fail("No offset in next link: {}".format(next_link))
|
||||
|
||||
return query_params['offset'][0]
|
||||
|
|
|
@ -47,11 +47,8 @@ class TestAlarmsStateHistoryOneTransition(base.BaseMonascaTest):
|
|||
# MIN_HISTORY number of Alarms State History are needed.
|
||||
metric = helpers.create_metric(name="name-" + str(i + 1))
|
||||
cls.monasca_client.create_metrics(metric)
|
||||
# sleep 1 second between metrics to make sure timestamps
|
||||
# are different in the second field. Influxdb has a bug
|
||||
# where it does not sort properly by milliseconds. .014
|
||||
# is sorted as greater than .138
|
||||
time.sleep(1.0)
|
||||
# Ensure alarms transition at different times
|
||||
time.sleep(0.1)
|
||||
resp, response_body = cls.monasca_client.\
|
||||
list_alarms_state_history()
|
||||
elements = response_body['elements']
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
|
@ -22,6 +22,7 @@ from monasca_tempest_tests.tests.api import helpers
|
|||
from tempest.common.utils import data_utils
|
||||
from tempest import test
|
||||
from tempest.lib import exceptions
|
||||
from urllib import urlencode
|
||||
|
||||
NUM_MEASUREMENTS = 100
|
||||
MIN_REQUIRED_MEASUREMENTS = 2
|
||||
|
@ -198,6 +199,7 @@ class TestStatistics(base.BaseMonascaTest):
|
|||
value=4)
|
||||
]
|
||||
|
||||
num_metrics = len(metric)
|
||||
self.monasca_client.create_metrics(metric)
|
||||
query_parms = '?name=' + name
|
||||
for i in xrange(constants.MAX_RETRIES):
|
||||
|
@ -208,7 +210,7 @@ class TestStatistics(base.BaseMonascaTest):
|
|||
break
|
||||
else:
|
||||
time.sleep(constants.RETRY_WAIT_SECS)
|
||||
self._check_timeout(i, constants.MAX_RETRIES, elements, 4)
|
||||
self._check_timeout(i, constants.MAX_RETRIES, elements, num_metrics)
|
||||
|
||||
start_time = helpers.timestamp_to_iso(start_timestamp)
|
||||
end_timestamp = start_timestamp + 4000
|
||||
|
@ -223,42 +225,35 @@ class TestStatistics(base.BaseMonascaTest):
|
|||
|
||||
query_parms = '?name=' + name + '&merge_metrics=true&statistics=avg'\
|
||||
+ '&start_time=' + str(start_time) + '&end_time=' + \
|
||||
str(end_time) + '&period=1' + '&limit=4'
|
||||
str(end_time) + '&period=1' + '&limit=' + str(num_metrics)
|
||||
resp, response_body = self.monasca_client.list_statistics(
|
||||
query_parms)
|
||||
self.assertEqual(200, resp.status)
|
||||
elements = response_body['elements'][0]['statistics']
|
||||
self.assertEqual(4, len(elements))
|
||||
self.assertEqual(num_metrics, len(elements))
|
||||
self.assertEqual(first_element, elements[0])
|
||||
|
||||
for index in xrange(1, 4):
|
||||
max_limit = 4 - index
|
||||
for limit in xrange(1, num_metrics):
|
||||
start_index = 0
|
||||
params = [('name', name),
|
||||
('merge_metrics', 'true'),
|
||||
('statistics', 'avg'),
|
||||
('start_time', str(start_time)),
|
||||
('end_time', str(end_time)),
|
||||
('period', 1),
|
||||
('limit', limit)
|
||||
]
|
||||
offset = None
|
||||
while True:
|
||||
num_expected_elements = limit
|
||||
if (num_expected_elements + start_index) > num_metrics:
|
||||
num_expected_elements = num_metrics - start_index
|
||||
|
||||
# Get first offset from api
|
||||
query_parms = '?name=' + str(name) + \
|
||||
'&merge_metrics=true&start_time=' + elements[index - 1][0] + \
|
||||
'&end_time=' + end_time + \
|
||||
'&limit=1'
|
||||
resp, response_body = self.monasca_client.list_measurements(query_parms)
|
||||
for link in response_body['links']:
|
||||
if link['rel'] == 'next':
|
||||
next_link = link['href']
|
||||
if not next_link:
|
||||
self.fail("No next link returned with query parameters: {}".formet(query_parms))
|
||||
offset = helpers.get_query_param(next_link, "offset")
|
||||
# python api returns exact timestamp, but the test needs a rounded number
|
||||
offset_period_index = offset.find('.')
|
||||
offset = offset[:offset_period_index] + 'Z'
|
||||
|
||||
for limit in xrange(1, max_limit):
|
||||
expected_elements = [elem for elem in elements if elem[0] > offset]
|
||||
expected_elements = expected_elements[:limit]
|
||||
|
||||
query_parms = '?name=' + name + '&merge_metrics=true' + \
|
||||
'&statistics=avg' + '&start_time=' + \
|
||||
str(start_time) + '&end_time=' + \
|
||||
str(end_time) + '&period=1' + '&limit=' + \
|
||||
str(limit) + '&offset=' + str(offset)
|
||||
these_params = list(params)
|
||||
# If not the first call, use the offset returned by the last call
|
||||
if offset:
|
||||
these_params.extend([('offset', str(offset))])
|
||||
query_parms = '?' + urlencode(these_params)
|
||||
resp, response_body = self.monasca_client.list_statistics(query_parms)
|
||||
self.assertEqual(200, resp.status)
|
||||
if not response_body['elements']:
|
||||
|
@ -267,10 +262,14 @@ class TestStatistics(base.BaseMonascaTest):
|
|||
self.fail("No statistics returned")
|
||||
new_elements = response_body['elements'][0]['statistics']
|
||||
|
||||
self.assertEqual(limit, len(new_elements))
|
||||
# bug in the python API causes limit 1 to not have matching timestamps
|
||||
if limit > 1:
|
||||
self.assertEqual(expected_elements, new_elements)
|
||||
self.assertEqual(num_expected_elements, len(new_elements))
|
||||
expected_elements = elements[start_index:start_index+limit]
|
||||
self.assertEqual(expected_elements, new_elements)
|
||||
start_index += num_expected_elements
|
||||
if start_index >= num_metrics:
|
||||
break
|
||||
# Get the next set
|
||||
offset = self._get_offset(response_body)
|
||||
|
||||
|
||||
@test.attr(type="gate")
|
||||
|
|
Loading…
Reference in New Issue