Implement the Last Function

The Alarm state is driven by the last measurement with the newest
timestamp. Use the value even if the measurement is older than the
oldest bucket. This ensures the measurement will be used when the
Threshold Engine is started if the measurement
is received while the Threshold Engine is stopped

Never evaluate subAlarm with function Last except on receiving of
a measurement.

Add tests to ensure this works.

The change is dependent on the monasca-common change and the
change to monasca-api to add the state field to sub_alarm.

Change-Id: Ib5123ed035018757a50d9ebeb7335fbca48054f2
Implements: Blueprint last-value
This commit is contained in:
Craig Bryant 2016-07-27 14:30:12 -06:00
parent 865816dd78
commit 2acdd58dc3
14 changed files with 620 additions and 162 deletions

View File

@ -1,5 +1,5 @@
/*
* (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP.
* (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -159,7 +159,7 @@ public class TopologyModule extends AbstractModule {
// Filtering / Event / Alarm Creation -> Aggregation
builder
.setBolt("aggregation-bolt",
new MetricAggregationBolt(config), config.aggregationBoltThreads)
new MetricAggregationBolt(config, config.database), config.aggregationBoltThreads)
.fieldsGrouping("filtering-bolt", new Fields(MetricFilteringBolt.FIELDS[0]))
.allGrouping("filtering-bolt", MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM)
.fieldsGrouping("filtering-bolt", AlarmCreationBolt.ALARM_CREATION_STREAM,

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -197,8 +197,11 @@ public class SubAlarm extends AbstractEntity implements Serializable {
return true;
}
public boolean canEvaluateImmediately() {
public boolean canEvaluateAlarmImmediately() {
switch (this.getExpression().getFunction()) {
// LAST must be evaluated immediately
case LAST:
return true;
// MIN never gets larger so if the operator is < or <=,
// then they can be immediately evaluated
case MIN:
@ -227,6 +230,50 @@ public class SubAlarm extends AbstractEntity implements Serializable {
}
}
public boolean canEvaluateOkImmediately() {
switch (this.getExpression().getFunction()) {
// LAST must be evaluated immediately
case LAST:
return true;
// MIN never gets larger so if the operator is > or >=,
// then they can be immediately evaluated
case MIN:
switch(this.getExpression().getOperator()) {
case GT:
case GTE:
return true;
default:
return false;
}
// These two never get smaller so if the operator is < or <=,
// then they can be immediately evaluated
case MAX:
case COUNT:
switch(this.getExpression().getOperator()) {
case LT:
case LTE:
return true;
default:
return false;
}
// SUM can increase on a positive measurement or decrease on a negative
// AVG can't be computed until all the metrics have come in
default:
return false;
}
}
public boolean onlyImmediateEvaluation() {
switch (this.getExpression().getFunction()) {
// LAST must be evaluated immediately
case LAST:
return true;
// All others at this time can't be evaluated immediately
default:
return false;
}
}
/**
* Computes initial state for an {@link SubAlarm} based on
* underlying {@link SubExpression}.

View File

@ -1,5 +1,5 @@
/*
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP.
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -101,6 +101,11 @@ public class SubAlarmStats {
return stats;
}
public boolean addValue(double value, long timestamp) {
return this.getStats().addValue(value, timestamp,
this.getSubAlarm().onlyImmediateEvaluation());
}
/**
* Returns the SubAlarm.
*/
@ -124,19 +129,28 @@ public class SubAlarmStats {
final boolean shouldEvaluate = this.stats.shouldEvaluate(now, alarmDelay);
final AlarmState newState;
final boolean immediateAlarmTransition;
if (immediateAlarmEvaluate()) {
newState = AlarmState.ALARM;
} else {
immediateAlarmTransition = true;
}
else if (immediateOkEvaluate()) {
newState = AlarmState.OK;
immediateAlarmTransition = true;
}
else {
if (!shouldEvaluate) {
return false;
}
if (this.subAlarm.onlyImmediateEvaluation()) {
return false;
}
newState = this.determineAlarmStateUsingView();
immediateAlarmTransition = false;
}
final boolean shouldSendStateChange = this.shouldSendStateChange(newState);
final boolean immediateAlarmTransition =
newState == AlarmState.ALARM && this.subAlarm.canEvaluateImmediately();
if (shouldSendStateChange && (shouldEvaluate || immediateAlarmTransition)) {
logger.debug("SubAlarm[deterministic={}] {} transitions from {} to {}",
this.getSubAlarm().isDeterministic(),
@ -209,7 +223,7 @@ public class SubAlarmStats {
}
private boolean immediateAlarmEvaluate() {
if (!this.subAlarm.canEvaluateImmediately()) {
if (!this.subAlarm.canEvaluateAlarmImmediately()) {
return false;
}
// Check the future slots as well
@ -240,6 +254,31 @@ public class SubAlarmStats {
return false;
}
private boolean immediateOkEvaluate() {
if (!this.subAlarm.canEvaluateOkImmediately()) {
return false;
}
// Check the future slots as well
final double[] allValues = stats.getWindowValues();
subAlarm.clearCurrentValues();
for (final double value : allValues) {
if (Double.isNaN(value)) {
subAlarm.clearCurrentValues();
} else {
// Check if value is ALARM
if (subAlarm.getExpression().getOperator()
.evaluate(value, subAlarm.getExpression().getThreshold())) {
subAlarm.clearCurrentValues();
}
else {
subAlarm.addCurrentValue(value);
return true;
}
}
}
return false;
}
private boolean shouldSendStateChange(AlarmState newState) {
return newState != null && (!subAlarm.getState().equals(newState) || subAlarm.isNoState());
}

View File

@ -51,4 +51,7 @@ public interface AlarmDAO {
/** Deletes all alarms for the given AlarmDefinition */
void deleteByDefinitionId(String alarmDefinitionId);
/** Update the state of the given SubAlarm */
void updateSubAlarmState(String subAlarmId, AlarmState subAlarmState);
}

View File

@ -79,7 +79,8 @@ public class AlarmDAOImpl implements AlarmDAO {
try (final Handle h = db.open()) {
final String ALARMS_SQL =
"select a.id, a.alarm_definition_id, a.state, sa.id as sub_alarm_id, sa.expression, sa.sub_expression_id, ad.tenant_id from alarm a "
"select a.id, a.alarm_definition_id, a.state, sa.id as sub_alarm_id, sa.expression, "
+ "sa.state as sub_alarm_state, sa.sub_expression_id, ad.tenant_id from alarm a "
+ "inner join sub_alarm sa on sa.alarm_id = a.id "
+ "inner join alarm_definition ad on a.alarm_definition_id = ad.id "
+ "where ad.deleted_at is null and %s "
@ -113,8 +114,9 @@ public class AlarmDAOImpl implements AlarmDAO {
final SubExpression subExpression =
new SubExpression(getString(row, "sub_expression_id"), AlarmSubExpression.of(getString(
row, "expression")));
final AlarmState subAlarmState = AlarmState.valueOf(getString(row, "sub_alarm_state"));
final SubAlarm subAlarm =
new SubAlarm(getString(row, "sub_alarm_id"), alarmId, subExpression);
new SubAlarm(getString(row, "sub_alarm_id"), alarmId, subExpression, subAlarmState);
subAlarms.add(subAlarm);
prevAlarmId = alarmId;
}
@ -256,9 +258,9 @@ public class AlarmDAOImpl implements AlarmDAO {
for (final SubAlarm subAlarm : alarm.getSubAlarms()) {
h.insert(
"insert into sub_alarm (id, alarm_id, sub_expression_id, expression, created_at, updated_at) values (?, ?, ?, ?, ?, ?)",
"insert into sub_alarm (id, alarm_id, sub_expression_id, expression, state, created_at, updated_at) values (?, ?, ?, ?, ?, ?, ?)",
subAlarm.getId(), subAlarm.getAlarmId(), subAlarm.getAlarmSubExpressionId(), subAlarm
.getExpression().getExpression(), timestamp, timestamp);
.getExpression().getExpression(), subAlarm.getState().toString(), timestamp, timestamp);
}
for (final MetricDefinitionAndTenantId md : alarm.getAlarmedMetrics()) {
createAlarmedMetric(h, md, alarm.getId());
@ -306,6 +308,18 @@ public class AlarmDAOImpl implements AlarmDAO {
}
}
@Override
public void updateSubAlarmState(String id, AlarmState subAlarmState) {
try (Handle h = db.open()) {
final String timestamp = formatDateFromMillis(System.currentTimeMillis());
h.createStatement(
"update sub_alarm set state=:state, updated_at=:updated_at where id=:id")
.bind("state", subAlarmState.toString())
.bind("updated_at", timestamp)
.bind("id", id).execute();
}
}
@Override
public void deleteByDefinitionId(String alarmDefinitionId){
try (Handle h = db.open()) {

View File

@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.codec.digest.DigestUtils;
import org.hibernate.Criteria;
import org.hibernate.Query;
@ -210,6 +211,7 @@ public class AlarmSqlImpl
.setAlarm(alarm)
.setSubExpression(session.get(SubAlarmDefinitionDb.class, subAlarm.getAlarmSubExpressionId()))
.setExpression(subAlarm.getExpression().getExpression())
.setState(subAlarm.getState())
.setUpdatedAt(now)
.setCreatedAt(now)
.setId(subAlarm.getId())
@ -260,6 +262,34 @@ public class AlarmSqlImpl
}
@Override
public void updateSubAlarmState(String subAlarmId, AlarmState subAlarmState) {
Transaction tx = null;
Session session = null;
try {
session = sessionFactory.openSession();
tx = session.beginTransaction();
final DateTime now = DateTime.now();
final SubAlarmDb subAlarm = (SubAlarmDb) session.get(SubAlarmDb.class, subAlarmId);
subAlarm.setState(subAlarmState);
subAlarm.setUpdatedAt(now);
session.update(subAlarm);
tx.commit();
tx = null;
} finally {
this.rollbackIfNotNull(tx);
if (session != null) {
session.close();
}
}
}
@Override
public void deleteByDefinitionId(final String alarmDefinitionId) {
Transaction tx = null;

View File

@ -207,7 +207,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
private boolean allSubAlarmsHaveState(final Alarm alarm) {
for (SubAlarm subAlarm : alarm.getSubAlarms()) {
if (subAlarm.isNoState()) {
if (subAlarm.isNoState() && !subAlarm.onlyImmediateEvaluation()) {
return false;
}
}

View File

@ -1,5 +1,5 @@
/*
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP.
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -19,13 +19,16 @@
package monasca.thresh.infrastructure.thresholding;
import monasca.common.model.metric.Metric;
import monasca.common.util.Injector;
import monasca.thresh.ThresholdingConfiguration;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubAlarmStats;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.domain.service.SubAlarmStatsRepository;
import monasca.thresh.infrastructure.persistence.PersistenceModule;
import monasca.thresh.utils.Logging;
import monasca.thresh.utils.Streams;
import monasca.thresh.utils.Tuples;
@ -72,9 +75,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl";
public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"};
public static final String METRICS_BEHIND = "MetricsBehind";
private static final int MAX_SAVED_METRIC_AGE_SECONDS = 10;
private final ThresholdingConfiguration config;
private DataSourceFactory dbConfig;
private transient AlarmDAO alarmDAO;
final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> metricDefToSubAlarmStatsRepos =
new HashMap<>();
private final Set<SubAlarmStats> subAlarmStatsSet = new HashSet<>();
@ -85,8 +90,14 @@ public class MetricAggregationBolt extends BaseRichBolt {
private boolean upToDate = true;
private Map<MetricDefinitionAndTenantId, Metric> savedMetrics = new HashMap<>();
public MetricAggregationBolt(ThresholdingConfiguration config) {
public MetricAggregationBolt(ThresholdingConfiguration config, DataSourceFactory dbConfig) {
this.config = config;
this.dbConfig = dbConfig;
}
public MetricAggregationBolt(ThresholdingConfiguration config, AlarmDAO alarmDAO) {
this.config = config;
this.alarmDAO = alarmDAO;
}
@Override
@ -118,7 +129,8 @@ public class MetricAggregationBolt extends BaseRichBolt {
final String subAlarmId = tuple.getString(4);
if (EventProcessingBolt.DELETED.equals(eventType)) {
handleAlarmDeleted(metricDefinitionAndTenantId, subAlarmId);
} else if (EventProcessingBolt.RESEND.equals(eventType)) {
} else if (EventProcessingBolt.RESEND.equals(eventType) ||
EventProcessingBolt.UPDATED.equals(eventType)) {
handleAlarmResend(metricDefinitionAndTenantId, subAlarmId);
}
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple
@ -167,6 +179,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
logger.info("Preparing");
this.collector = collector;
if (this.alarmDAO == null) {
Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(this.dbConfig));
this.alarmDAO = Injector.getInstance(AlarmDAO.class);
}
}
/**
@ -184,7 +201,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
for (SubAlarmStats stats : subAlarmStatsRepo.get()) {
final long timestamp_secs = metricTimestampInSeconds(metric);
if (stats.getStats().addValue(metric.value, timestamp_secs)) {
if (stats.addValue(metric.value, timestamp_secs)) {
logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value,
metric.timestamp, metricDefinitionAndTenantId, stats.getStats());
if (stats.evaluateAndSlideWindow(timestamp_secs, config.alarmDelay)) {
@ -229,42 +246,14 @@ public class MetricAggregationBolt extends BaseRichBolt {
logger.info("Did not evaluate SubAlarms because Metrics are not up to date");
upToDate = true;
}
cleanSavedMetrics();
}
/**
* Clean saved metrics since the SubAlarm should show up within seconds of
* the metric being received
*/
private void cleanSavedMetrics() {
if (savedMetrics.isEmpty()) {
return;
}
final List<MetricDefinitionAndTenantId> toRemove = new ArrayList<>();
for (Map.Entry<MetricDefinitionAndTenantId, Metric> entry: savedMetrics.entrySet()) {
if (savedMetricTooOld(entry.getValue())) {
toRemove.add(entry.getKey());
}
}
logger.debug("Removing {} too old saved metrics", toRemove.size());
for (MetricDefinitionAndTenantId mdtid : toRemove) {
savedMetrics.remove(mdtid);
}
}
/**
* Check if a save Metric is too old
* @param Metric to check
* @return true if saved Metric is too old, false otherwise
*/
private boolean savedMetricTooOld(final Metric metric) {
final long now = currentTimeSeconds();
final long age = metricTimestampInSeconds(metric) - now;
return age > MAX_SAVED_METRIC_AGE_SECONDS;
}
private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) {
logger.debug("Alarm state changed for {}", subAlarmStats);
if (subAlarmStats.getSubAlarm().onlyImmediateEvaluation()) {
alarmDAO.updateSubAlarmState(subAlarmStats.getSubAlarm().getId(),
subAlarmStats.getSubAlarm().getState());
}
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), duplicate(subAlarmStats
.getSubAlarm())));
}
@ -332,11 +321,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
logger.info("Received AlarmCreatedEvent for {}", subAlarm);
final SubAlarmStats newStats = addSubAlarm(metricDefinitionAndTenantId, subAlarm);
// See if we have a saved metric for this SubAlarm. Add to the SubAlarm if we do.
// Because the Metric comes directly from the MetricFilterinBolt but the
// Because the Metric comes directly from the MetricFilteringBolt but the
// SubAlarm comes from the AlarmCreationBolt, it is very likely that the
// Metric arrives first
final Metric metric = savedMetrics.get(metricDefinitionAndTenantId);
if (metric != null && !savedMetricTooOld(metric)) {
if (metric != null) {
aggregateValues(metricDefinitionAndTenantId, metric);
logger.trace("Aggregated saved value {} at {} for {}. Updated {}", metric.value,
metric.timestamp, metricDefinitionAndTenantId, newStats.getStats());

View File

@ -434,6 +434,17 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
return updated;
}
@Override
public void updateSubAlarmState(String subAlarmId, AlarmState subAlarmState) {
for (final Alarm alarm : alarms) {
for (final SubAlarm subAlarm : alarm.getSubAlarms()) {
if (subAlarm.getId().equals(subAlarmId)) {
subAlarm.setState(subAlarmState);
}
}
}
}
public boolean deleteAlarm(final Alarm toDelete) {
for (final Alarm alarm : alarms) {
if (alarm.getId().equals(toDelete.getId())) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -32,48 +32,139 @@ import org.testng.annotations.Test;
@Test
public class SubAlarmStatsTest {
private SubExpression expression;
private SubAlarm subAlarm;
private SubAlarmStats subAlarmStats;
private SubExpression avgExpression;
private SubAlarm avgSubAlarm;
private SubAlarmStats avgSubAlarmStats;
private SubExpression lastExpression;
private SubAlarm lastSubAlarm;
private SubAlarmStats lastSubAlarmStats;
private long lastViewStartTime;
@BeforeMethod
protected void beforeMethod() {
expression =
avgExpression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 60) > 3 times 3"));
subAlarm = new SubAlarm("123", "1", expression);
subAlarm.setNoState(true);
subAlarmStats = new SubAlarmStats(subAlarm, expression.getAlarmSubExpression().getPeriod());
avgSubAlarm = new SubAlarm("123", "1", avgExpression);
avgSubAlarm.setNoState(true);
avgSubAlarmStats = new SubAlarmStats(avgSubAlarm, avgExpression.getAlarmSubExpression().getPeriod());
lastExpression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("last(hpcs.compute.cpu{id=5}) > 0"));
lastSubAlarm = new SubAlarm("456", "1", lastExpression, AlarmState.UNDETERMINED);
lastSubAlarm.setNoState(true);
lastViewStartTime = 10000;
lastSubAlarmStats = new SubAlarmStats(lastSubAlarm,
lastViewStartTime + lastExpression.getAlarmSubExpression().getPeriod());
}
public void shouldAcceptLastMetricIfOld() {
assertTrue(lastSubAlarmStats.addValue(99, 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldImmediateTransitionToOk() {
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
public void shouldNotTransitionToAlarmTwice() {
assertTrue(lastSubAlarmStats.addValue(99, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(lastSubAlarmStats.addValue(98, lastViewStartTime + 20));
assertFalse(lastSubAlarmStats.evaluate(lastViewStartTime + 20, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldNotTransitionToOkTwice() {
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 20));
assertFalse(lastSubAlarmStats.evaluate(lastViewStartTime + 20, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
public void shouldNotTransitionOnOldMeasurement() {
assertTrue(lastSubAlarmStats.addValue(99, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 5));
assertFalse(lastSubAlarmStats.evaluate(lastViewStartTime + 5, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldImmediatelyTransition() {
assertTrue(lastSubAlarmStats.addValue(99, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 15));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 15, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(lastSubAlarmStats.addValue(99, lastViewStartTime + 20));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 20, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldNotTransitionFromAlarmWithNoMetrics() {
assertTrue(lastSubAlarmStats.addValue(99, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
for (int period = 1; period < 10; period++) {
long time = lastViewStartTime + period * lastSubAlarm.getExpression().getPeriod();
assertFalse(lastSubAlarmStats.evaluateAndSlideWindow(time, 30));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
}
public void shouldNotTransitionFromOkWithNoMetrics() {
assertTrue(lastSubAlarmStats.addValue(0, lastViewStartTime + 10));
assertTrue(lastSubAlarmStats.evaluate(lastViewStartTime + 10, 0));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
for (int period = 1; period < 10; period++) {
long time = lastViewStartTime + period * lastSubAlarm.getExpression().getPeriod();
assertFalse(lastSubAlarmStats.evaluateAndSlideWindow(time, 30));
assertEquals(lastSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
}
public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() {
sendMetric(5, 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
sendMetric(1, 62, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(122, 1));
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(122, 1));
// This went to OK because at least one period is under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, 123, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(182, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(182, 1));
// Still one under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() {
sendMetric(5, 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
sendMetric(5, 62, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(122, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(122, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
sendMetric(5, 123, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(182, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(182, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
/**
@ -82,53 +173,53 @@ public class SubAlarmStatsTest {
public void shouldEvaluateAndSlideWindow() {
long initialTime = 11;
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Trigger ALARM
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 8 times total from the last added value to trigger UNDETERMINED. This is
// equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more
// slides to move the value outside of the window and 6 more to exceed the observation
// threshold.
for (int i = 0; i < 7; i++) {
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
}
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
sendMetric(5, initialTime - 1, false);
}
private void sendMetric(double value, long timestamp, boolean expected) {
subAlarmStats.getStats().addValue(value, timestamp);
assertEquals(subAlarmStats.evaluateAndSlideWindow(timestamp, timestamp), expected);
assertTrue(avgSubAlarmStats.addValue(value, timestamp));
assertEquals(avgSubAlarmStats.evaluateAndSlideWindow(timestamp, timestamp), expected);
}
/**
@ -138,120 +229,120 @@ public class SubAlarmStatsTest {
long initialTime = 11;
// Need a different expression for this test
expression =
avgExpression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("max(hpcs.compute.cpu{id=5}, 60) > 3 times 3"));
subAlarm = new SubAlarm("123", "1", expression);
assertEquals(subAlarm.getState(), AlarmState.UNDETERMINED);
subAlarm.setNoState(true);
subAlarmStats = new SubAlarmStats(subAlarm, expression.getAlarmSubExpression().getPeriod());
avgSubAlarm = new SubAlarm("123", "1", avgExpression);
assertEquals(avgSubAlarm.getState(), AlarmState.UNDETERMINED);
avgSubAlarm.setNoState(true);
avgSubAlarmStats = new SubAlarmStats(avgSubAlarm, avgExpression.getAlarmSubExpression().getPeriod());
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
// Trigger ALARM
sendMetric(5, initialTime - 1, true);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Ensure it is still ALARM on next evaluation
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 8 times total from the last added value to trigger UNDETERMINED. This is
// equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more
// slides to move the value outside of the window and 6 more to exceed the observation
// threshold.
for (int i = 0; i < 7; i++) {
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
}
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Now test that future buckets are evaluated
// Set the current bucket to ALARM
sendMetric(5, initialTime - 1, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Set the future bucket of current + 2 to ALARM
sendMetric(5, initialTime + 120, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Set the future bucket of current + 1 to ALARM. That will trigger the
// SubAlarm to go to ALARM
sendMetric(5, initialTime + 60, true);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldAlarmIfAllSlotsAlarmed() {
long initialTime = 11;
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
sendMetric(5, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
assertTrue(avgSubAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(avgSubAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void testEmptyWindowObservationThreshold() {
expression =
avgExpression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}) > 3 times 3"));
subAlarm = new SubAlarm("123", "1", expression);
assertEquals(subAlarm.getState(), AlarmState.UNDETERMINED);
SubAlarmStats saStats = new SubAlarmStats(subAlarm, (System.currentTimeMillis() / 1000) + 60);
avgSubAlarm = new SubAlarm("123", "1", avgExpression);
assertEquals(avgSubAlarm.getState(), AlarmState.UNDETERMINED);
SubAlarmStats saStats = new SubAlarmStats(avgSubAlarm, (System.currentTimeMillis() / 1000) + 60);
assertEquals(saStats.emptyWindowObservationThreshold, 6);
}
public void checkUpdateSubAlarm() {
// Can keep data with threshold change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("> 3", "> 6"), 100.0);
verifyUpdateSubAlarm(avgExpression.getAlarmSubExpression().getExpression().replace("> 3", "> 6"), 100.0);
// Can keep data with operator change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("< 3", "< 6"), 100.0);
verifyUpdateSubAlarm(avgExpression.getAlarmSubExpression().getExpression().replace("< 3", "< 6"), 100.0);
// Have to flush data with function change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("avg", "max"), Double.NaN);
verifyUpdateSubAlarm(avgExpression.getAlarmSubExpression().getExpression().replace("avg", "max"), Double.NaN);
// Have to flush data with periods change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("times 3", "times 2"), Double.NaN);
verifyUpdateSubAlarm(avgExpression.getAlarmSubExpression().getExpression().replace("times 3", "times 2"), Double.NaN);
// Have to flush data with period change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace(", 60", ", 120"), Double.NaN);
verifyUpdateSubAlarm(avgExpression.getAlarmSubExpression().getExpression().replace(", 60", ", 120"), Double.NaN);
}
private void verifyUpdateSubAlarm(String newExpressionString, double expectedValue) {
final AlarmSubExpression newExpression = AlarmSubExpression.of(newExpressionString);
assertNotEquals(newExpression, expression.getAlarmSubExpression().getExpression());
int timestamp = expression.getAlarmSubExpression().getPeriod() / 2;
assertNotEquals(newExpression, avgExpression.getAlarmSubExpression().getExpression());
int timestamp = avgExpression.getAlarmSubExpression().getPeriod() / 2;
sendMetric(100.00, timestamp, false);
assertEquals(subAlarmStats.getStats().getValue(timestamp), 100.0);
subAlarmStats.updateSubAlarm(newExpression, expression.getAlarmSubExpression().getPeriod());
assertEquals(subAlarmStats.getStats().getValue(timestamp), expectedValue);
assertTrue(subAlarm.isNoState());
assertEquals(avgSubAlarmStats.getStats().getValue(timestamp), 100.0);
avgSubAlarmStats.updateSubAlarm(newExpression, avgExpression.getAlarmSubExpression().getPeriod());
assertEquals(avgSubAlarmStats.getStats().getValue(timestamp), expectedValue);
assertTrue(avgSubAlarm.isNoState());
}
@ -266,7 +357,7 @@ public class SubAlarmStatsTest {
final SubAlarmStats stats = new SubAlarmStats(subAlarm, t1 + subExpr.getAlarmSubExpression().getPeriod());
for (int i = 0; i < 360; i++) {
t1++;
stats.getStats().addValue(1.0, t1);
stats.addValue(1.0, t1);
if ((t1 % 60) == 2) {
stats.evaluateAndSlideWindow(t1, 1);
if (i <= subExpr.getAlarmSubExpression().getPeriod()) {
@ -296,7 +387,7 @@ public class SubAlarmStatsTest {
int t1 = 0;
for (int i = 0; i < 1080; i++) {
t1++;
stats.getStats().addValue(1.0, t1);
stats.getStats().addValue(1.0, t1, false);
if ((t1 % 60) == 2) {
stats.evaluateAndSlideWindow(t1, 1);
if (i <= subExpr.getAlarmSubExpression().getPeriod()) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
* (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -78,7 +78,7 @@ public class SubAlarmTest {
private void checkExpression(String expressionString, boolean expected) {
final SubAlarm subAlarm = this.getSubAlarm(expressionString);
assertEquals(subAlarm.canEvaluateImmediately(), expected);
assertEquals(subAlarm.canEvaluateAlarmImmediately(), expected);
assertEquals(subAlarm.getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarm.isDeterministic());
}

View File

@ -1,5 +1,5 @@
/*
* (C) Copyright 2014,2016 Hewlett Packard Enterprise Development LP
* (C) Copyright 2014,2016 Hewlett Packard Enterprise Development LP
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -72,7 +72,7 @@ public class AlarmDAOImplTest {
@BeforeClass
protected void setupClass() throws Exception {
// See class comment
db = new DBI("jdbc:mysql://192.168.10.4/mon", "monapi", "password");
db = new DBI("jdbc:mysql://192.168.10.6/mon", "monapi", "password");
handle = db.open();
dao = new AlarmDAOImpl(db);
}

View File

@ -65,14 +65,15 @@ public class AlarmThresholdingBoltTest {
private AlarmDefinition alarmDefinition;
private Alarm alarm;
private List<SubAlarm> subAlarms;
private SubAlarm lastSubAlarm = null;
private AlarmEventForwarder alarmEventForwarder;
private AlarmDAO alarmDAO;
private AlarmDefinitionDAO alarmDefinitionDAO;
private AlarmThresholdingBolt bolt;
private OutputCollector collector;
private final String[] subExpressions = {"avg(cpu{instance_id=123,device=42}, 1) > 5",
"max(load{instance_id=123,device=42}, 1) > 8",
"sum(diskio{instance_id=123,device=42}, 1) > 5000"};
private final String[] subExpressions = {"avg(cpu{instance_id=123,device=42}) > 5",
"max(load{instance_id=123,device=42}) > 8",
"last(diskio{instance_id=123,device=42}) > 5000"};
@BeforeMethod
protected void beforeMethod() {
@ -99,6 +100,14 @@ public class AlarmThresholdingBoltTest {
final Map<String, String> config = new HashMap<>();
final TopologyContext context = mock(TopologyContext.class);
bolt.prepare(config, context, collector);
for (SubAlarm subAlarm : subAlarms) {
if (subAlarm.getExpression().getFunction().equals(AggregateFunction.LAST)) {
lastSubAlarm = subAlarm;
}
}
assertNotNull(lastSubAlarm, "Did not find a SubAlarm with Function of last");
lastSubAlarm.setState(AlarmState.OK);
}
/**
@ -158,6 +167,50 @@ public class AlarmThresholdingBoltTest {
verify(alarmDAO, times(1)).updateState(eq(alarmId), eq(AlarmState.OK), anyLong());
}
/**
* Create a Alarm with all sub expressions. Send a SubAlarm with state set to ALARM for
* the first SubAlarm which is not the one with operator Last. That one is initialized to
* OK. Send OK for the other SubAlarm which is not the one with operator Last. Ensure
* that the Alarm was triggered to ALARM and sent.
*
* Since SubAlarms with the function last() are only sent when they change while all the other
* types are sent when they first achieve a state after startup, this test ensures
* that Alarms with more than one SubAlarm that has at least one function last() work correctly
* on startup
*/
public void triggerAlarmWithLast() {
final String alarmId = alarm.getId();
when(alarmDAO.findById(alarmId)).thenReturn(alarm);
when(alarmDefinitionDAO.findById(alarmDefinition.getId())).thenReturn(alarmDefinition);
SubAlarm firstAlarmSubAlarm = null;
AlarmState sendState = AlarmState.ALARM;
for (SubAlarm subAlarm : subAlarms) {
if (lastSubAlarm != subAlarm) {
if (firstAlarmSubAlarm == null) {
firstAlarmSubAlarm = subAlarm;
}
emitSubAlarmStateChange(alarmId, subAlarm, sendState);
sendState = AlarmState.OK;
}
}
final String alarmJson =
"{\"alarm-transitioned\":{\"tenantId\":\""
+ tenantId
+ "\","
+ "\"alarmId\":\"" + alarmId + "\","
+ "\"alarmDefinitionId\":\"" + alarmDefinition.getId() + "\",\"metrics\":[],"
+ "\"alarmName\":\"Test CPU Alarm\","
+ "\"alarmDescription\":\"Description of Alarm\",\"oldState\":\"OK\",\"newState\":\"ALARM\","
+ "\"actionsEnabled\":true,"
+ "\"stateChangeReason\":\"Thresholds were exceeded for the sub-alarms: "
+ firstAlarmSubAlarm.getExpression().getExpression() + " with the values: []\"," + "\"severity\":\"LOW\","
+ "\"link\":null," + "\"lifecycleState\":null,"
+ "\"subAlarms\":[" + buildSubAlarmJson(alarm.getSubAlarms()) + "],"
+ "\"timestamp\":1395587091003}}";
verify(alarmEventForwarder, times(1)).send(alarmJson);
verify(alarmDAO, times(1)).updateState(eq(alarmId), eq(AlarmState.ALARM), anyLong()); }
public void simpleAlarmUpdate() {
// Now send an AlarmUpdatedEvent
final AlarmState newState = AlarmState.OK;

View File

@ -1,5 +1,5 @@
/*
* (C) Copyright 2014,2016 Hewlett Packard Enterprise Development Company LP.
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
* Copyright 2016 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -18,6 +18,8 @@
package monasca.thresh.infrastructure.thresholding;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@ -31,7 +33,6 @@ import static org.testng.Assert.assertTrue;
import monasca.common.model.alarm.AlarmOperator;
import monasca.common.model.alarm.AlarmState;
import monasca.common.model.alarm.AlarmSubExpression;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricDefinition;
@ -41,6 +42,7 @@ import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubAlarmStats;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.domain.service.SubAlarmStatsRepository;
import monasca.thresh.utils.Streams;
@ -71,14 +73,18 @@ public class MetricAggregationBoltTest {
private SubAlarm subAlarm2;
private SubAlarm subAlarm3;
private SubAlarm subAlarm4;
private SubAlarm subAlarm5;
private SubExpression subExpr1;
private SubExpression subExpr2;
private SubExpression subExpr3;
private SubExpression subExpr4;
private SubExpression subExpr5;
private MetricDefinition metricDef1;
private MetricDefinition metricDef2;
private MetricDefinition metricDef3;
private MetricDefinition metricDef4;
private MetricDefinition metricDef5;
private AlarmDAO alarmDao;
@BeforeClass
protected void beforeClass() {
@ -90,10 +96,12 @@ public class MetricAggregationBoltTest {
subExpr4 = new SubExpression("777", AlarmSubExpression.of(
"count(log.error{id=5},deterministic,60) >= 5")
);
subExpr5 = new SubExpression("888", AlarmSubExpression.of("last(hpcs.compute.mem{id=5}) > 0"));
metricDef1 = subExpr1.getAlarmSubExpression().getMetricDefinition();
metricDef2 = subExpr2.getAlarmSubExpression().getMetricDefinition();
metricDef3 = subExpr3.getAlarmSubExpression().getMetricDefinition();
metricDef4 = subExpr4.getAlarmSubExpression().getMetricDefinition();
metricDef5 = subExpr5.getAlarmSubExpression().getMetricDefinition();
}
@BeforeMethod
@ -103,15 +111,18 @@ public class MetricAggregationBoltTest {
subAlarm2 = new SubAlarm("456", "1", subExpr2);
subAlarm3 = new SubAlarm("789", "2", subExpr3);
subAlarm4 = new SubAlarm("666", "3", subExpr4);
subAlarm5 = new SubAlarm("891", "3", subExpr5);
subAlarms = new ArrayList<>();
subAlarms.add(subAlarm1);
subAlarms.add(subAlarm2);
subAlarms.add(subAlarm3);
subAlarms.add(subAlarm4);
subAlarms.add(subAlarm5);
final ThresholdingConfiguration config = new ThresholdingConfiguration();
config.alarmDelay = 10;
bolt = new MockMetricAggregationBolt(config);
alarmDao = mock(AlarmDAO.class);
bolt = new MockMetricAggregationBolt(config, alarmDao);
context = mock(TopologyContext.class);
collector = mock(OutputCollector.class);
bolt.prepare(null, context, collector);
@ -461,6 +472,176 @@ public class MetricAggregationBoltTest {
verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4));
}
public void shouldTransitionLastImmediatelyForNewAlarm() {
long t1 = 50000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef5, subAlarm5);
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
t1 += 1000;
bolt.execute(createMetricTuple(metricDef5, new Metric(metricDef5, t1, 1.0, null)));
assertEquals(subAlarm5.getState(), AlarmState.ALARM);
verify(collector, times(1)).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm5.getId(), AlarmState.ALARM);
t1 += 1000;
// Have to reset the mock so it can tell the difference when subAlarm5 is emitted again.
reset(collector);
reset(alarmDao);
// Make sure it doesn't transition out of ALARM even with no measurements arriving
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
bolt.execute(createMetricTuple(metricDef5, new Metric(metricDef5, t1, 0.0, null)));
assertEquals(subAlarm5.getState(), AlarmState.OK);
verify(collector, times(1)).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm5.getId(), AlarmState.OK);
// Have to reset the mock so it can tell the difference when subAlarm5 is emitted again.
reset(collector);
reset(alarmDao);
// Make sure it doesn't transition out of ALARM even with no measurements arriving
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.OK);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.OK);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.OK);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
bolt.setCurrentTime(t1 += 60000);
sendTickTuple();
assertEquals(subAlarm5.getState(), AlarmState.OK);
verify(collector, never()).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
}
public void shouldTransitionLastImmediatelyToAlarmForExistingAlarm() {
testImmediateChangeWithInitialState(AlarmState.OK, 1.0, AlarmState.ALARM);
}
public void shouldTransitionLastImmediatelyToOKForExistingAlarm() {
testImmediateChangeWithInitialState(AlarmState.ALARM, 0.0, AlarmState.OK);
}
private void testImmediateChangeWithInitialState(AlarmState initialState, double value,
final AlarmState expectedState) {
long t1 = 50000;
bolt.setCurrentTime(t1);
subAlarm5.setState(initialState);
sendSubAlarmCreated(metricDef5, subAlarm5);
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
t1 += 1000;
bolt.execute(createMetricTuple(metricDef5, new Metric(metricDef5, t1, value, null)));
assertEquals(subAlarm5.getState(), expectedState);
verify(collector, times(1)).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm5.getId(), expectedState);
}
public void testAlarmStateUpdatedWithLast() {
final AlarmState initialState = AlarmState.OK;
final AlarmState expectedState = AlarmState.OK;
long t1 = 50000;
bolt.setCurrentTime(t1);
subAlarm5.setState(initialState);
sendSubAlarmCreated(metricDef5, subAlarm5);
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm5.getId()), (AlarmState) any());
t1 += 1000;
final SubAlarm updated = new SubAlarm(subAlarm5.getId(), subAlarm5.getAlarmId(), new SubExpression("", subAlarm5.getExpression()), AlarmState.OK);
// Simulate an Alarm Update message from the API that would toggle the Alarm to ALARM
sendSubAlarmMsg(EventProcessingBolt.UPDATED, metricDef5, updated);
// Send another OK measurement. SubAlarm needs to be sent again
bolt.execute(createMetricTuple(metricDef5, new Metric(metricDef5, t1, 0, null)));
assertEquals(subAlarm5.getState(), expectedState);
verify(collector, times(1)).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm5.getId(), expectedState);
// Simulate another Alarm Update message from the API that would toggle the Alarm to ALARM
t1 += 1000;
sendSubAlarmMsg("updated", metricDef5, updated);
// Have to reset the mocks so they can tell the difference when subAlarm5 is emitted again.
reset(collector);
reset(alarmDao);
// Send another OK measurement. SubAlarm needs to be sent again
bolt.execute(createMetricTuple(metricDef5, new Metric(metricDef5, t1, 0, null)));
assertEquals(subAlarm5.getState(), expectedState);
verify(collector, times(1)).emit(new Values(subAlarm5.getAlarmId(), subAlarm5));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm5.getId(), expectedState);
}
public void testImmediateChangeWithOldMetric() {
final AlarmState initialState = AlarmState.OK;
final double value = 1.0;
final AlarmState expectedState = AlarmState.ALARM;
final SubAlarm subAlarm = subAlarm5;
final MetricDefinition metricDef = metricDef5;
testOldMetric(subAlarm, metricDef, initialState, value, expectedState);
}
public void testNonImmediateChangeWithOldMetric() {
final AlarmState initialState = AlarmState.OK;
final double value = 1000.0;
final AlarmState expectedState = AlarmState.OK;
final SubAlarm subAlarm = subAlarm3;
final MetricDefinition metricDef = metricDef3;
testOldMetric(subAlarm, metricDef, initialState, value, expectedState);
}
private void testOldMetric(final SubAlarm subAlarm, final MetricDefinition metricDef,
final AlarmState initialState, final double value, final AlarmState expectedState) {
long t1 = 500000;
bolt.setCurrentTime(t1);
subAlarm.setState(initialState);
sendSubAlarmCreated(metricDef, subAlarm);
verify(alarmDao, never()).updateSubAlarmState(eq(subAlarm.getId()), (AlarmState) any());
// Even though this measurement is way outside the window, make sure it gets processed
// anyways
bolt.execute(createMetricTuple(metricDef, new Metric(metricDef, 1000, value, null)));
assertEquals(subAlarm.getState(), expectedState);
if (initialState != expectedState) {
verify(collector, times(1)).emit(new Values(subAlarm.getAlarmId(), subAlarm));
verify(alarmDao, times(1)).updateSubAlarmState(subAlarm.getId(), expectedState);
}
}
public void shouldNeverLeaveOkIfThresholdNotExceededForDeterministic() {
long t1 = 50000;
bolt.setCurrentTime(t1);
@ -620,7 +801,7 @@ public class MetricAggregationBoltTest {
final SubAlarmStats oldStats =
bolt.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId).get(ALARM_ID_1);
assertEquals(oldStats.getSubAlarm().getExpression().getThreshold(), 90.0);
assertTrue(oldStats.getStats().addValue(80.0, System.currentTimeMillis() / 1000));
assertTrue(oldStats.addValue(80.0, System.currentTimeMillis() / 1000));
assertFalse(Double.isNaN(oldStats.getStats().getWindowValues()[0]));
assertNotNull(bolt.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId).get(ALARM_ID_1));
@ -673,8 +854,8 @@ public class MetricAggregationBoltTest {
private long currentTime;
public MockMetricAggregationBolt(ThresholdingConfiguration config) {
super(config);
public MockMetricAggregationBolt(ThresholdingConfiguration config, AlarmDAO alarmDao) {
super(config, alarmDao);
}
@Override