Merge "Fix Influx statistics and Alarm History behavior"

This commit is contained in:
Jenkins 2016-08-23 19:52:23 +00:00 committed by Gerrit Code Review
commit 34d7eeec2d
7 changed files with 166 additions and 102 deletions

View File

@ -13,17 +13,18 @@
*/
package monasca.api.infrastructure.persistence;
import monasca.api.ApiConfig;
import com.google.inject.Inject;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import monasca.api.ApiConfig;
public class PersistUtils {
private static final Logger logger = LoggerFactory.getLogger(PersistUtils.class);
@ -32,17 +33,7 @@ public class PersistUtils {
private final int DEFAULT_MAX_QUERY_LIMIT = 10000;
private final ThreadLocal<SimpleDateFormat> simpleDateFormatSpace = new ThreadLocal<>();
private static final String FORMAT_WITH_SPACE = "yyyy-MM-dd HH:mm:ss.SSSX";
private final ThreadLocal<SimpleDateFormat> simpleDateFormatSpaceOneDigitMilli = new ThreadLocal<>();
private static final String FORMAT_WITH_SPACE_ONE_DIGIT_MILLI = "yyyy-MM-dd HH:mm:ss.SX";
private final ThreadLocal<SimpleDateFormat> simpleDateFormatT = new ThreadLocal<>();
private static final String FORMAT_WITH_T = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
private final ThreadLocal<SimpleDateFormat> simpleDateFormatTOneDigitMilli = new ThreadLocal<>();
private static final String FORMAT_WITH_T_ONE_DIGIT_MILLI = "yyyy-MM-dd'T'HH:mm:ss.SX";
private DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime();
@Inject
public PersistUtils(ApiConfig config) {
@ -108,46 +99,7 @@ public class PersistUtils {
}
}
private Date parseForFormat(final String timeStamp,
final ThreadLocal<SimpleDateFormat> formatter,
final String format) throws ParseException {
if (formatter.get() == null) {
formatter.set(new SimpleDateFormat(format));
}
return formatter.get().parse(timeStamp);
}
public Date parseTimestamp(String timestampString) throws ParseException {
try {
// Handles 2 and 3 digit millis. '2016-01-01 01:01:01.12Z' or '2016-01-01 01:01:01.123Z'
return parseForFormat(timestampString, this.simpleDateFormatSpace, FORMAT_WITH_SPACE);
} catch (ParseException pe0) {
try {
// Handles 1 digit millis. '2016-01-01 01:01:01.1Z'
return parseForFormat(timestampString, this.simpleDateFormatSpaceOneDigitMilli,
FORMAT_WITH_SPACE_ONE_DIGIT_MILLI);
} catch (ParseException pe1) {
try {
// Handles 2 and 3 digit millis with 'T'. Comes from the Python Persister.
// '2016-01-01T01:01:01.12Z' or '2016-01-01T01:01:01.123Z'
return parseForFormat(timestampString, this.simpleDateFormatT, FORMAT_WITH_T);
} catch (ParseException pe2) {
// Handles 1 digit millis with 'T'. Comes from the Python Persister.
// '2016-01-01T01:01:01.1Z'
return parseForFormat(timestampString, this.simpleDateFormatTOneDigitMilli,
FORMAT_WITH_T_ONE_DIGIT_MILLI);
}
}
}
return isoFormat.parseDateTime(timestampString.trim().replace(' ', 'T')).toDate();
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014, 2016 Hewlett-Packard Development Company, L.P.
* (C) Copyright 2014, 2016 Hewlett-Packard Development LP
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@ -13,11 +13,14 @@
*/
package monasca.api.infrastructure.persistence.influxdb;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,7 +34,6 @@ import javax.annotation.Nullable;
import monasca.api.ApiConfig;
import monasca.api.domain.exception.MultipleMetricsException;
import monasca.api.domain.model.measurement.Measurements;
import monasca.api.domain.model.statistic.StatisticRepo;
import monasca.api.domain.model.statistic.Statistics;
@ -46,6 +48,8 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
private final InfluxV9RepoReader influxV9RepoReader;
private final InfluxV9Utils influxV9Utils;
private final InfluxV9MetricDefinitionRepo influxV9MetricDefinitionRepo;
private static final DateTimeFormatter ISO_8601_FORMATTER = ISODateTimeFormat
.dateOptionalTimeParser().withZoneUTC();
private final ObjectMapper objectMapper = new ObjectMapper();
@ -70,6 +74,18 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
List<String> statistics, int period, String offset, int limit,
Boolean mergeMetricsFlag, String groupBy) throws Exception {
String offsetTimePart = "";
if (!Strings.isNullOrEmpty(offset)) {
int indexOfUnderscore = offset.indexOf('_');
if (indexOfUnderscore > -1) {
offsetTimePart = offset.substring(indexOfUnderscore + 1);
// Add the period to the offset to ensure only the next group of points are returned
DateTime offsetDateTime = DateTime.parse(offsetTimePart).plusSeconds(period);
// leave out any ID, as influx doesn't understand it
offset = offsetDateTime.toString();
}
}
String q = buildQuery(tenantId, name, dimensions, startTime, endTime,
statistics, period, offset, limit, mergeMetricsFlag, groupBy);
@ -91,12 +107,18 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
String groupBy)
throws Exception {
String offsetTimePart = "";
if (!Strings.isNullOrEmpty(offset)) {
int indexOfUnderscore = offset.indexOf('_');
offsetTimePart = offset.substring(indexOfUnderscore + 1);
}
String q;
if (Boolean.TRUE.equals(mergeMetricsFlag)) {
q = String.format("select %1$s %2$s "
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s",
+ "where %3$s %4$s %5$s %6$s %7$s %8$s %9$s %10$s",
funcPart(statistics),
this.influxV9Utils.namePart(name, true),
this.influxV9Utils.privateTenantIdPart(tenantId),
@ -104,6 +126,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
this.influxV9Utils.startTimePart(startTime),
this.influxV9Utils.dimPart(dimensions),
this.influxV9Utils.endTimePart(endTime),
this.influxV9Utils.timeOffsetPart(offsetTimePart),
this.influxV9Utils.periodPart(period),
this.influxV9Utils.limitPart(limit));
@ -174,7 +197,7 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
List<Object> values = buildValsList(valueObjects);
if (((String) values.get(0)).compareTo(offsetTimestamp) > 0 || index > offsetId) {
if (((String) values.get(0)).compareTo(offsetTimestamp) >= 0 || index > offsetId) {
statistics.addMeasurement(values);
remaining_limit--;
}
@ -197,7 +220,14 @@ public class InfluxV9StatisticRepo implements StatisticRepo {
ArrayList<Object> valObjArryList = new ArrayList<>();
// First value is the timestamp.
valObjArryList.add(values[0]);
String timestamp = values[0].toString();
int index = timestamp.indexOf('.');
if (index > 0)
// In certain queries, timestamps will not align to second resolution,
// remove the sub-second values.
valObjArryList.add(timestamp.substring(0,index).concat("Z"));
else
valObjArryList.add(timestamp);
// All other values are doubles.
for (int i = 1; i < values.length; ++i) {

View File

@ -0,0 +1,54 @@
/*
* (C) Copyright 2016 Hewlett Packard Enterprise Development LP
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.api.infrastructure.persistence;
import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;
import java.text.ParseException;
@Test
public class PersistUtilsTest {
private final PersistUtils persistUtils = new PersistUtils();
public void test3DigitWithSpace() throws ParseException {
checkParseTimestamp("2016-01-01 01:01:01.123Z", "2016-01-01 01:01:01.123Z");
}
public void test2DigitWithSpace() throws ParseException {
checkParseTimestamp("2016-01-01 01:01:01.15Z", "2016-01-01 01:01:01.150Z");
}
public void test1DigitWithSpace() throws ParseException {
checkParseTimestamp("2016-01-01 01:01:01.1Z", "2016-01-01 01:01:01.100Z");
}
public void test3DigitWithT() throws ParseException {
checkParseTimestamp("2016-01-01T01:01:01.123Z", "2016-01-01T01:01:01.123Z");
}
public void test2DigitWithT() throws ParseException {
checkParseTimestamp("2016-01-01T01:01:01.15Z", "2016-01-01T01:01:01.150Z");
}
public void test1DigitWithT() throws ParseException {
checkParseTimestamp("2016-01-01T01:01:01.1Z", "2016-01-01T01:01:01.100Z");
}
private void checkParseTimestamp(final String start, final String expected) throws ParseException {
assertEquals(persistUtils.parseTimestamp(start).getTime(), persistUtils.parseTimestamp(expected).getTime());
}
}

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 Hewlett-Packard
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# Copyright 2015 Cray Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,8 +14,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
import hashlib
import json
@ -82,6 +82,15 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
region, start_timestamp,
end_timestamp)
if offset:
if '_' in offset:
tmp = datetime.strptime(str(offset).split('_')[1], "%Y-%m-%dT%H:%M:%SZ")
tmp = tmp + timedelta(seconds=int(period))
# Leave out any ID as influx doesn't understand it
offset = tmp.isoformat()
else:
tmp = datetime.strptime(offset, "%Y-%m-%dT%H:%M:%SZ")
offset = tmp + timedelta(seconds=int(period))
offset_clause = (" and time > '{}'".format(offset))
from_clause += offset_clause
@ -443,6 +452,10 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
stats_list = []
for stats in serie['values']:
# remove sub-second timestamp values (period can never be less than 1)
timestamp = stats[0]
if '.' in timestamp:
stats[0] = str(timestamp)[:19] + 'Z'
stats[1] = stats[1] or 0
stats_list.append(stats)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -11,6 +11,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six.moves.urllib.parse as urlparse
from tempest import config
from tempest.common import credentials_factory
import tempest.test
@ -76,3 +78,20 @@ class BaseMonascaTest(tempest.test.BaseTestCase):
id = element['id']
cls.monasca_client.delete_alarm(id)
cls.cred_provider.clear_creds()
def _get_offset(self, response_body):
next_link = None
self_link = None
for link in response_body['links']:
if link['rel'] == 'next':
next_link = link['href']
if link['rel'] == 'self':
self_link = link['href']
if not next_link:
query_parms = urlparse.parse_qs(urlparse.urlparse(self_link).query)
self.fail("No next link returned with query parameters: {}".format(query_parms))
query_params = urlparse.parse_qs(urlparse.urlparse(next_link).query)
if 'offset' not in query_params:
self.fail("No offset in next link: {}".format(next_link))
return query_params['offset'][0]

View File

@ -47,11 +47,8 @@ class TestAlarmsStateHistoryOneTransition(base.BaseMonascaTest):
# MIN_HISTORY number of Alarms State History are needed.
metric = helpers.create_metric(name="name-" + str(i + 1))
cls.monasca_client.create_metrics(metric)
# sleep 1 second between metrics to make sure timestamps
# are different in the second field. Influxdb has a bug
# where it does not sort properly by milliseconds. .014
# is sorted as greater than .138
time.sleep(1.0)
# Ensure alarms transition at different times
time.sleep(0.1)
resp, response_body = cls.monasca_client.\
list_alarms_state_history()
elements = response_body['elements']

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -22,6 +22,7 @@ from monasca_tempest_tests.tests.api import helpers
from tempest.common.utils import data_utils
from tempest import test
from tempest.lib import exceptions
from urllib import urlencode
NUM_MEASUREMENTS = 100
MIN_REQUIRED_MEASUREMENTS = 2
@ -198,6 +199,7 @@ class TestStatistics(base.BaseMonascaTest):
value=4)
]
num_metrics = len(metric)
self.monasca_client.create_metrics(metric)
query_parms = '?name=' + name
for i in xrange(constants.MAX_RETRIES):
@ -208,7 +210,7 @@ class TestStatistics(base.BaseMonascaTest):
break
else:
time.sleep(constants.RETRY_WAIT_SECS)
self._check_timeout(i, constants.MAX_RETRIES, elements, 4)
self._check_timeout(i, constants.MAX_RETRIES, elements, num_metrics)
start_time = helpers.timestamp_to_iso(start_timestamp)
end_timestamp = start_timestamp + 4000
@ -223,42 +225,35 @@ class TestStatistics(base.BaseMonascaTest):
query_parms = '?name=' + name + '&merge_metrics=true&statistics=avg'\
+ '&start_time=' + str(start_time) + '&end_time=' + \
str(end_time) + '&period=1' + '&limit=4'
str(end_time) + '&period=1' + '&limit=' + str(num_metrics)
resp, response_body = self.monasca_client.list_statistics(
query_parms)
self.assertEqual(200, resp.status)
elements = response_body['elements'][0]['statistics']
self.assertEqual(4, len(elements))
self.assertEqual(num_metrics, len(elements))
self.assertEqual(first_element, elements[0])
for index in xrange(1, 4):
max_limit = 4 - index
for limit in xrange(1, num_metrics):
start_index = 0
params = [('name', name),
('merge_metrics', 'true'),
('statistics', 'avg'),
('start_time', str(start_time)),
('end_time', str(end_time)),
('period', 1),
('limit', limit)
]
offset = None
while True:
num_expected_elements = limit
if (num_expected_elements + start_index) > num_metrics:
num_expected_elements = num_metrics - start_index
# Get first offset from api
query_parms = '?name=' + str(name) + \
'&merge_metrics=true&start_time=' + elements[index - 1][0] + \
'&end_time=' + end_time + \
'&limit=1'
resp, response_body = self.monasca_client.list_measurements(query_parms)
for link in response_body['links']:
if link['rel'] == 'next':
next_link = link['href']
if not next_link:
self.fail("No next link returned with query parameters: {}".formet(query_parms))
offset = helpers.get_query_param(next_link, "offset")
# python api returns exact timestamp, but the test needs a rounded number
offset_period_index = offset.find('.')
offset = offset[:offset_period_index] + 'Z'
for limit in xrange(1, max_limit):
expected_elements = [elem for elem in elements if elem[0] > offset]
expected_elements = expected_elements[:limit]
query_parms = '?name=' + name + '&merge_metrics=true' + \
'&statistics=avg' + '&start_time=' + \
str(start_time) + '&end_time=' + \
str(end_time) + '&period=1' + '&limit=' + \
str(limit) + '&offset=' + str(offset)
these_params = list(params)
# If not the first call, use the offset returned by the last call
if offset:
these_params.extend([('offset', str(offset))])
query_parms = '?' + urlencode(these_params)
resp, response_body = self.monasca_client.list_statistics(query_parms)
self.assertEqual(200, resp.status)
if not response_body['elements']:
@ -267,10 +262,14 @@ class TestStatistics(base.BaseMonascaTest):
self.fail("No statistics returned")
new_elements = response_body['elements'][0]['statistics']
self.assertEqual(limit, len(new_elements))
# bug in the python API causes limit 1 to not have matching timestamps
if limit > 1:
self.assertEqual(expected_elements, new_elements)
self.assertEqual(num_expected_elements, len(new_elements))
expected_elements = elements[start_index:start_index+limit]
self.assertEqual(expected_elements, new_elements)
start_index += num_expected_elements
if start_index >= num_metrics:
break
# Get the next set
offset = self._get_offset(response_body)
@test.attr(type="gate")