Evaluate SubAlarms immediately if possible

Some SubAlarm expressions can be evaluated immediately. If the
expression is MAX(m) > 10, a single measurement of m > 10 will cause
the SubAlarm to transition to the ALARM state regardless of any other
measurement of m that is received. However if the operater is < or <=,
MAX can't be immediately evaluated since a following measurement
could be larger than the one storeed.  COUNT also can be evaluated
immediately if the operator is > or >= since it never decreases. MIN
is the opposite and can be immediately evaluated if the operator is
< or <=. AVG and SUM can't be evaluated until the end of the
evaluation window since the average or sum could go up or down
depending on the measurements received and whether or not they are
negative.

Also see if the sliding window for a SubAlarm can be slid when a
metric is received for the SubAlarm. This could allow the SubAlarm
to be evaluated faster than waiting for the tick tuple since that
is only received every 60 seconds.

Add unit tests for immediate SubAlarm evaluation.

Add unit tests for previously untested parts of SubAlarmStats

Change-Id: I989a82328fa4ccc04b49d203f70a1adc9fa4d3bb
This commit is contained in:
Craig Bryant 2015-10-22 16:48:06 -06:00
parent c3568930f6
commit 96f9b442f3
6 changed files with 398 additions and 67 deletions

View File

@ -193,4 +193,34 @@ public class SubAlarm extends AbstractEntity implements Serializable {
// Operator and Threshold can vary
return true;
}
public boolean canEvaluateImmediately() {
switch (this.getExpression().getFunction()) {
// MIN never gets larger so if the operator is < or <=,
// then they can be immediately evaluated
case MIN:
switch(this.getExpression().getOperator()) {
case LT:
case LTE:
return true;
default:
return false;
}
// These two never get smaller so if the operator is > or >=,
// then they can be immediately evaluated
case MAX:
case COUNT:
switch(this.getExpression().getOperator()) {
case GT:
case GTE:
return true;
default:
return false;
}
// SUM can increase on a positive measurement or decrease on a negative
// AVG can't be computed until all the metrics have come in
default:
return false;
}
}
}

View File

@ -120,13 +120,31 @@ public class SubAlarmStats {
* @param alarmDelay How long to give metrics a chance to arrive
*/
boolean evaluate(final long now, long alarmDelay) {
if (!stats.shouldEvaluate(now, alarmDelay)) {
return false;
final AlarmState newState;
if (immediateAlarmEvaluate()) {
newState = AlarmState.ALARM;
}
double[] values = stats.getViewValues();
else {
if (!stats.shouldEvaluate(now, alarmDelay)) {
return false;
}
newState = determineAlarmStateUsingView();
}
if (shouldSendStateChange(newState) &&
(stats.shouldEvaluate(now, alarmDelay) ||
(newState == AlarmState.ALARM && this.subAlarm.canEvaluateImmediately()))) {
setSubAlarmState(newState);
return true;
}
return false;
}
private AlarmState determineAlarmStateUsingView() {
boolean thresholdExceeded = false;
boolean hasEmptyWindows = false;
subAlarm.clearCurrentValues();
double[] values = stats.getViewValues();
for (double value : values) {
if (Double.isNaN(value)) {
hasEmptyWindows = true;
@ -137,38 +155,61 @@ public class SubAlarmStats {
// Check if value is OK
if (!subAlarm.getExpression().getOperator()
.evaluate(value, subAlarm.getExpression().getThreshold())) {
if (!shouldSendStateChange(AlarmState.OK)) {
return false;
}
setSubAlarmState(AlarmState.OK);
return true;
return AlarmState.OK;
} else
thresholdExceeded = true;
}
}
if (thresholdExceeded && !hasEmptyWindows) {
if (!shouldSendStateChange(AlarmState.ALARM)) {
return false;
}
setSubAlarmState(AlarmState.ALARM);
return true;
return AlarmState.ALARM;
}
// Window is empty at this point
emptyWindowObservations++;
if ((emptyWindowObservations >= emptyWindowObservationThreshold)
&& shouldSendStateChange(AlarmState.UNDETERMINED) && !subAlarm.isSporadicMetric()) {
setSubAlarmState(AlarmState.UNDETERMINED);
return true;
return AlarmState.UNDETERMINED;
}
// Hasn't transitioned to UNDETERMINED yet, so use the current state
return null;
}
private boolean immediateAlarmEvaluate() {
if (!this.subAlarm.canEvaluateImmediately()) {
return false;
}
// Check the future slots as well
final double[] allValues = stats.getWindowValues();
subAlarm.clearCurrentValues();
int alarmRun = 0;
for (final double value : allValues) {
if (Double.isNaN(value)) {
alarmRun = 0;
subAlarm.clearCurrentValues();
} else {
// Check if value is OK
if (!subAlarm.getExpression().getOperator()
.evaluate(value, subAlarm.getExpression().getThreshold())) {
alarmRun = 0;
subAlarm.clearCurrentValues();
}
else {
subAlarm.addCurrentValue(value);
alarmRun++;
if (alarmRun == subAlarm.getExpression().getPeriods()) {
return true;
}
}
}
}
return false;
}
private boolean shouldSendStateChange(AlarmState newState) {
return !subAlarm.getState().equals(newState) || subAlarm.isNoState();
return newState != null && (!subAlarm.getState().equals(newState) || subAlarm.isNoState());
}
private void setSubAlarmState(AlarmState newState) {
@ -191,3 +232,4 @@ public class SubAlarmStats {
}
}
}

View File

@ -183,6 +183,9 @@ public class MetricAggregationBolt extends BaseRichBolt {
if (stats.getStats().addValue(metric.value, timestamp_secs)) {
logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value,
metric.timestamp, metricDefinitionAndTenantId, stats.getStats());
if (stats.evaluateAndSlideWindow(timestamp_secs, config.alarmDelay)) {
sendSubAlarmStateChange(stats);
}
} else {
logger.warn("Metric is too old, age {} seconds: timestamp {} for {}, {}",
currentTimeSeconds() - timestamp_secs, timestamp_secs, metricDefinitionAndTenantId,
@ -202,9 +205,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
if (upToDate) {
logger.debug("Evaluating {}", subAlarmStats);
if (subAlarmStats.evaluateAndSlideWindow(newWindowTimestamp, config.alarmDelay)) {
logger.debug("Alarm state changed for {}", subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats
.getSubAlarm()));
sendSubAlarmStateChange(subAlarmStats);
}
} else {
subAlarmStats.slideWindow(newWindowTimestamp, config.alarmDelay);
@ -216,6 +217,12 @@ public class MetricAggregationBolt extends BaseRichBolt {
}
}
private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) {
logger.debug("Alarm state changed for {}", subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats
.getSubAlarm()));
}
/**
* Only used for testing.
*

View File

@ -18,6 +18,7 @@
package monasca.thresh.domain.model;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import monasca.common.model.alarm.AlarmState;
@ -45,31 +46,31 @@ public class SubAlarmStatsTest {
}
public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() {
subAlarmStats.getStats().addValue(5, 1);
sendMetric(5, 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(1, 62);
sendMetric(1, 62, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(122, 1));
// This went to OK because at least one period is under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
subAlarmStats.getStats().addValue(5, 123);
sendMetric(5, 123, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(182, 1));
// Still one under the threshold
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
}
public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() {
subAlarmStats.getStats().addValue(5, 1);
sendMetric(5, 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(62, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, 62);
sendMetric(5, 62, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(122, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, 123);
sendMetric(5, 123, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(182, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
@ -81,32 +82,102 @@ public class SubAlarmStatsTest {
long initialTime = 11;
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1);
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Trigger ALARM
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 8 times total from the last added value to trigger UNDETERMINED. This is
// equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more
// slides to move the value outside of the window and 6 more to exceed the observation
// threshold.
for (int i = 0; i < 7; i++) {
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
}
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 10));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
sendMetric(5, initialTime - 1, false);
}
private void sendMetric(double value, long timestamp, boolean expected) {
subAlarmStats.getStats().addValue(value, timestamp);
assertEquals(subAlarmStats.evaluateAndSlideWindow(timestamp, timestamp), expected);
}
/**
* Simulates the way a window will fill up in practice.
*/
public void shouldImmediatelyEvaluate() {
long initialTime = 11;
// Need a different expression for this test
expression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("max(hpcs.compute.cpu{id=5}, 60) > 3 times 3"));
subAlarm = new SubAlarm("123", "1", expression);
subAlarm.setNoState(true);
subAlarmStats = new SubAlarmStats(subAlarm, expression.getAlarmSubExpression().getPeriod());
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Add value and trigger OK
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Slide in some values that exceed the threshold
subAlarmStats.getStats().addValue(5, initialTime - 1);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
// Trigger ALARM
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
sendMetric(5, initialTime - 1, true);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Ensure it is still ALARM on next evaluation
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
// Add value and trigger OK
subAlarmStats.getStats().addValue(1, initialTime - 1);
sendMetric(1, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK);
// Must slide 9 times total from the last added value to trigger UNDETERMINED. This is
// Must slide 8 times total from the last added value to trigger UNDETERMINED. This is
// equivalent to the behavior in CloudWatch for an alarm with 3 evaluation periods. 2 more
// slides to move the value outside of the window and 6 more to exceed the observation
// threshold.
@ -115,7 +186,18 @@ public class SubAlarmStatsTest {
}
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
subAlarmStats.getStats().addValue(5, initialTime - 1);
// Now test that future buckets are evaluated
// Set the current bucket to ALARM
sendMetric(5, initialTime - 1, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Set the future bucket of current + 2 to ALARM
sendMetric(5, initialTime + 120, false);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED);
// Set the future bucket of current + 1 to ALARM. That will trigger the
// SubAlarm to go to ALARM
sendMetric(5, initialTime + 60, true);
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
public void shouldAlarmIfAllSlotsAlarmed() {
@ -125,13 +207,13 @@ public class SubAlarmStatsTest {
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
sendMetric(5, initialTime - 1, false);
assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
subAlarmStats.getStats().addValue(5, initialTime - 1);
sendMetric(5, initialTime - 1, false);
assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60, 1));
assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM);
}
@ -145,6 +227,31 @@ public class SubAlarmStatsTest {
assertEquals(saStats.emptyWindowObservationThreshold, 6);
}
public void checkUpdateSubAlarm() {
// Can keep data with threshold change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("> 3", "> 6"), 100.0);
// Can keep data with operator change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("< 3", "< 6"), 100.0);
// Have to flush data with function change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("avg", "max"), Double.NaN);
// Have to flush data with periods change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace("times 3", "times 2"), Double.NaN);
// Have to flush data with period change
verifyUpdateSubAlarm(expression.getAlarmSubExpression().getExpression().replace(", 60", ", 120"), Double.NaN);
}
private void verifyUpdateSubAlarm(String newExpressionString, double expectedValue) {
final AlarmSubExpression newExpression = AlarmSubExpression.of(newExpressionString);
assertNotEquals(newExpression, expression.getAlarmSubExpression().getExpression());
int timestamp = expression.getAlarmSubExpression().getPeriod() / 2;
sendMetric(100.00, timestamp, false);
assertEquals(subAlarmStats.getStats().getValue(timestamp), 100.0);
subAlarmStats.updateSubAlarm(newExpression, expression.getAlarmSubExpression().getPeriod());
assertEquals(subAlarmStats.getStats().getValue(timestamp), expectedValue);
assertTrue(subAlarm.isNoState());
}
public void checkLongPeriod() {
final SubExpression subExpr = new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of("sum(hpcs.compute.mem{id=5}, 120) >= 96"));

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.thresh.domain.model;
import static org.testng.Assert.assertEquals;
import monasca.common.model.alarm.AlarmSubExpression;
import org.testng.annotations.Test;
import java.util.UUID;
@Test
public class SubAlarmTest {
public void checkCanEvaluateImmediately() {
checkExpression("avg(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false);
checkExpression("avg(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false);
checkExpression("avg(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false);
checkExpression("avg(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false);
checkExpression("sum(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false);
checkExpression("sum(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false);
checkExpression("sum(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false);
checkExpression("sum(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false);
checkExpression("count(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false);
checkExpression("count(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false);
checkExpression("count(hpcs.compute.cpu{id=5}, 60) > 3 times 3", true);
checkExpression("count(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", true);
checkExpression("max(hpcs.compute.cpu{id=5}, 60) > 3 times 3", true);
checkExpression("max(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", true);
checkExpression("max(hpcs.compute.cpu{id=5}, 60) < 3 times 3", false);
checkExpression("max(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", false);
checkExpression("min(hpcs.compute.cpu{id=5}, 60) > 3 times 3", false);
checkExpression("min(hpcs.compute.cpu{id=5}, 60) >= 3 times 3", false);
checkExpression("min(hpcs.compute.cpu{id=5}, 60) < 3 times 3", true);
checkExpression("min(hpcs.compute.cpu{id=5}, 60) <= 3 times 3", true);
}
private void checkExpression(String expressionString, boolean expected) {
final SubExpression expression =
new SubExpression(UUID.randomUUID().toString(),
AlarmSubExpression.of(expressionString));
final SubAlarm subAlarm = new SubAlarm(UUID.randomUUID().toString(), "1", expression);
assertEquals(subAlarm.canEvaluateImmediately(), expected);
}
}

View File

@ -82,7 +82,7 @@ public class MetricAggregationBoltTest {
System.clearProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY);
subExpr1 = new SubExpression("444", AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 60) >= 90 times 3"));
subExpr2 = new SubExpression("555", AlarmSubExpression.of("avg(hpcs.compute.mem{id=5}, 60) >= 90"));
subExpr3 = new SubExpression("666", AlarmSubExpression.of("avg(hpcs.compute.mem{id=5}, 60) >= 96"));
subExpr3 = new SubExpression("666", AlarmSubExpression.of("max(hpcs.compute.mem{id=5}, 60) >= 96"));
metricDef1 = subExpr1.getAlarmSubExpression().getMetricDefinition();
metricDef2 = subExpr2.getAlarmSubExpression().getMetricDefinition();
metricDef3 = subExpr3.getAlarmSubExpression().getMetricDefinition();
@ -100,7 +100,7 @@ public class MetricAggregationBoltTest {
subAlarms.add(subAlarm3);
final ThresholdingConfiguration config = new ThresholdingConfiguration();
config.alarmDelay = 1;
config.alarmDelay = 10;
bolt = new MockMetricAggregationBolt(config);
context = mock(TopologyContext.class);
collector = mock(OutputCollector.class);
@ -150,11 +150,9 @@ public class MetricAggregationBoltTest {
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 60000, 95, null)));
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 120000, 88, null)));
t1 += 20000;
t1 += 25000;
bolt.setCurrentTime(t1);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
sendTickTuple();
assertEquals(subAlarm1.getState(), AlarmState.OK);
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
@ -172,8 +170,7 @@ public class MetricAggregationBoltTest {
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 94, null)));
t1 += 50000;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
sendTickTuple();
assertEquals(subAlarm1.getState(), AlarmState.ALARM);
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
@ -183,9 +180,101 @@ public class MetricAggregationBoltTest {
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
}
public void shouldImmediatelyEvaluateSubAlarm() {
// Ensure subAlarm2 and subAlarm3 map to the same Metric Definition
assertEquals(metricDef3, metricDef2);
long t1 = 170000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef2, subAlarm2);
sendSubAlarmCreated(metricDef3, subAlarm3);
// Send metric for subAlarm2 and subAlarm3
bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null)));
// subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subalarm3
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
assertEquals(subAlarm3.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
// Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted
// again.
reset(collector);
t1 = 195000;
bolt.setCurrentTime(t1);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
assertEquals(subAlarm3.getState(), AlarmState.ALARM);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, never()).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
// Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted
// again.
reset(collector);
// Now drive SubAlarms back to OK
t1 = 235000;
bolt.setCurrentTime(t1);
bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 20, null)));
t1 = 315000;
bolt.setCurrentTime(t1);
bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 20, null)));
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.OK);
assertEquals(subAlarm3.getState(), AlarmState.OK);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
// Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted
// again.
reset(collector);
// Now send a metric that is after the window end time but within alarm delay
t1 = 365000;
bolt.setCurrentTime(t1);
bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null)));
// subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subalarm3
assertEquals(subAlarm2.getState(), AlarmState.OK);
assertEquals(subAlarm3.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
// Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted
// again.
reset(collector);
t1 = 375000;
bolt.setCurrentTime(t1);
sendTickTuple();
// Ensure that subAlarm3 is still ALARM. subAlarm2 is still OK but because the metric
// that triggered ALARM is in the future bucket
assertEquals(subAlarm2.getState(), AlarmState.OK);
assertEquals(subAlarm3.getState(), AlarmState.ALARM);
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, never()).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
}
private void sendTickTuple() {
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
}
public void shouldSendAlarmAgain() {
long t1 = 10000;
long t1 = 12000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef2, subAlarm2);
@ -195,11 +284,9 @@ public class MetricAggregationBoltTest {
t1 += 60000;
bolt.setCurrentTime(t1);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
sendTickTuple();
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
verify(collector, times(1)).ack(tickTuple);
sendSubAlarmResend(metricDef2, subAlarm2);
@ -209,9 +296,7 @@ public class MetricAggregationBoltTest {
t1 += 60000;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(2)).ack(tickTuple);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
verify(collector, times(2)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
}
@ -253,12 +338,11 @@ public class MetricAggregationBoltTest {
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0, null)));
bolt.setCurrentTime(t1 += 60000);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.OK);
bolt.setCurrentTime(t1 += 60000);
bolt.execute(tickTuple);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.OK);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
@ -266,7 +350,7 @@ public class MetricAggregationBoltTest {
reset(collector);
bolt.setCurrentTime(t1 += 60000);
bolt.execute(tickTuple);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
}
@ -283,23 +367,19 @@ public class MetricAggregationBoltTest {
bolt.execute(lagTuple);
verify(collector, times(1)).ack(lagTuple);
final Tuple tickTuple = createTickTuple();
t1 += 60000;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(1)).ack(tickTuple);
sendTickTuple();
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
t1 += 60000;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(2)).ack(tickTuple);
sendTickTuple();
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
t1 += 60000;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(3)).ack(tickTuple);
sendTickTuple();
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
@ -408,7 +488,7 @@ public class MetricAggregationBoltTest {
bolt.getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
sendSubAlarmCreated(metricDef1, subAlarm1);
assertNotNull(bolt.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId).get(ALARM_ID_1));
// We don't have an AlarmDefinition so no id, but the MetricAggregationBolt doesn't use this