diff --git a/pom.xml b/pom.xml index f6257a7..4d4703a 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ - 1.0.0.35 + 1.0.0.41 true UTF-8 diff --git a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java index bf4cabb..00cc8d9 100644 --- a/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java +++ b/src/main/java/com/hpcloud/mon/domain/model/SubAlarmStats.java @@ -33,6 +33,7 @@ public class SubAlarmStats { public SubAlarmStats(SubAlarm subAlarm, TimeResolution timeResolution, long viewEndTimestamp) { slotWidth = subAlarm.getExpression().getPeriod(); this.subAlarm = subAlarm; + this.subAlarm.setNoState(true); this.stats = new SlidingWindowStats(subAlarm.getExpression().getFunction().toStatistic(), timeResolution, slotWidth, subAlarm.getExpression().getPeriods(), FUTURE_SLOTS, viewEndTimestamp); @@ -41,7 +42,7 @@ public class SubAlarmStats { // convert to minutes emptyWindowObservationThreshold = periodMinutes * subAlarm.getExpression().getPeriods() * UNDETERMINED_COEFFICIENT; - emptyWindowObservations = emptyWindowObservationThreshold; + emptyWindowObservations = 0; } /** @@ -89,8 +90,11 @@ public class SubAlarmStats { double[] values = stats.getViewValues(); AlarmState initialState = subAlarm.getState(); boolean thresholdExceeded = false; + boolean hasEmptyWindows = false; for (double value : values) { - if (!Double.isNaN(value)) { + if (Double.isNaN(value)) + hasEmptyWindows = true; + else { emptyWindowObservations = 0; // Check if value is OK @@ -99,17 +103,17 @@ public class SubAlarmStats { .evaluate(value, subAlarm.getExpression().getThreshold())) { if (AlarmState.OK.equals(initialState)) return false; - subAlarm.setState(AlarmState.OK); + setSubAlarmState(AlarmState.OK); return true; } else thresholdExceeded = true; } } - if (thresholdExceeded) { + if (thresholdExceeded && !hasEmptyWindows) { if (AlarmState.ALARM.equals(initialState)) return false; - subAlarm.setState(AlarmState.ALARM); + setSubAlarmState(AlarmState.ALARM); return true; } @@ -119,13 +123,18 @@ public class SubAlarmStats { if ((emptyWindowObservations >= emptyWindowObservationThreshold) && (subAlarm.isNoState() || !AlarmState.UNDETERMINED.equals(initialState)) && !subAlarm.isSporadicMetric()) { - subAlarm.setState(AlarmState.UNDETERMINED); + setSubAlarmState(AlarmState.UNDETERMINED); return true; } return false; } +private void setSubAlarmState(AlarmState newState) { + subAlarm.setState(newState); + subAlarm.setNoState(false); +} + /** * This MUST only be used for compatible SubAlarms, i.e. where * this.subAlarm.isCompatible(subAlarm) is true diff --git a/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java b/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java index bb27133..9c4b622 100644 --- a/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java +++ b/src/main/java/com/hpcloud/mon/domain/service/SubAlarmStatsRepository.java @@ -2,7 +2,6 @@ package com.hpcloud.mon.domain.service; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import com.hpcloud.mon.domain.model.SubAlarm; @@ -16,15 +15,6 @@ import com.hpcloud.mon.domain.model.SubAlarmStats; public class SubAlarmStatsRepository { private final Map subAlarmStats = new HashMap(); - /** - * Creates a new SubAlarmStatsRepository initialized with SubAlarmStats for each of the - * {@code subAlarms} with the {@code viewEndTimestamp}. - */ - public SubAlarmStatsRepository(List subAlarms, long viewEndTimestamp) { - for (SubAlarm subAlarm : subAlarms) - add(subAlarm, viewEndTimestamp); - } - /** * Creates a new SubAlarmStats instance for the {@code subAlarm} and {@code viewEndTimestamp} and * adds it to the repository. diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java index 2b62ca6..5b4254f 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java @@ -64,7 +64,6 @@ public class MetricAggregationBolt extends BaseRichBolt { /** Namespaces for which metrics are received sporadically */ private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; - private int evaluationTimeOffset; private boolean upToDate = true; public MetricAggregationBolt(SubAlarmDAO subAlarmDAO) { @@ -141,8 +140,6 @@ public class MetricAggregationBolt extends BaseRichBolt { LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); LOG.info("Preparing"); this.collector = collector; - evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")) - .intValue(); if (subAlarmDAO == null) { Injector.registerIfNotBound(SubAlarmDAO.class, new PersistenceModule(dbConfig)); @@ -178,7 +175,7 @@ public class MetricAggregationBolt extends BaseRichBolt { upToDate = true; return; } - long newWindowTimestamp = System.currentTimeMillis() / 1000; + long newWindowTimestamp = currentTimeSeconds(); for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values()) for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) { LOG.debug("Evaluating {}", subAlarmStats); @@ -186,11 +183,18 @@ public class MetricAggregationBolt extends BaseRichBolt { LOG.debug("Alarm state changed for {}", subAlarmStats); collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats.getSubAlarm())); - subAlarmStats.getSubAlarm().setNoState(false); } } } + /** + * Only used for testing. + * @return + */ + protected long currentTimeSeconds() { + return System.currentTimeMillis() / 1000; + } + /** * Returns an existing or newly created SubAlarmStatsRepository for the {@code metricDefinitionAndTenantId}. * Newly created SubAlarmStatsRepositories are initialized with stats whose view ends one minute @@ -207,10 +211,12 @@ public class MetricAggregationBolt extends BaseRichBolt { for (SubAlarm subAlarm : subAlarms) { // TODO should treat metric def name prefix like a namespace subAlarm.setSporadicMetric(sporadicMetricNamespaces.contains(metricDefinitionAndTenantId.metricDefinition.name)); - subAlarm.setNoState(true); } - long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; - subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp); + subAlarmStatsRepo = new SubAlarmStatsRepository(); + for (SubAlarm subAlarm : subAlarms) { + long viewEndTimestamp = currentTimeSeconds() + subAlarm.getExpression().getPeriod(); + subAlarmStatsRepo.add(subAlarm, viewEndTimestamp); + } subAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo); } } @@ -223,7 +229,6 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { LOG.debug("Received AlarmCreatedEvent for {}", subAlarm); - subAlarm.setNoState(true); addSubAlarm(metricDefinitionAndTenantId, subAlarm); } @@ -232,7 +237,7 @@ public class MetricAggregationBolt extends BaseRichBolt { if (subAlarmStatsRepo == null) return; - long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset; + long viewEndTimestamp = currentTimeSeconds() + subAlarm.getExpression().getPeriod(); subAlarmStatsRepo.add(subAlarm, viewEndTimestamp); } diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index 518e182..e444b10 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -171,15 +171,13 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { } ) .when(alarmEventForwarder).send(anyString(), anyString(), anyString()); - int waitCount = 0; - int feedCount = 5; int goodValueCount = 0; boolean firstUpdate = true; boolean secondUpdate = true; final Alarm initialAlarm = new Alarm(TEST_ALARM_ID, TEST_ALARM_TENANT_ID, TEST_ALARM_NAME, TEST_ALARM_DESCRIPTION, expression, subAlarms, AlarmState.UNDETERMINED, Boolean.TRUE); final int expectedAlarms = expectedStates.length; - for (int i = 1; alarmsSent != expectedAlarms && i < 150; i++) { + for (int i = 1; alarmsSent != expectedAlarms && i < 300; i++) { if (i == 5) { final Map exprs = createSubExpressionMap(); final AlarmCreatedEvent event = new AlarmCreatedEvent(TEST_ALARM_TENANT_ID, TEST_ALARM_ID, TEST_ALARM_NAME, @@ -222,31 +220,20 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression()); } - if (feedCount > 0) { + else { System.out.println("Feeding metrics..."); long time = System.currentTimeMillis() / 1000; ++goodValueCount; for (final SubAlarm subAlarm : subAlarms) { - final MetricDefinitionAndTenantId metricDefinitionAndTenantId = - new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); - metricSpout.feed(new Values(metricDefinitionAndTenantId, + final MetricDefinitionAndTenantId metricDefinitionAndTenantId = + new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); + metricSpout.feed(new Values(metricDefinitionAndTenantId, new Metric(metricDefinitionAndTenantId.metricDefinition, time, (double) (goodValueCount == 15 ? 1 : 555)))); } - - if (--feedCount == 0) - waitCount = 3; - - if (goodValueCount == 15) - goodValueCount = 0; - } else { - System.out.println("Waiting..."); - if (--waitCount == 0) - feedCount = 5; } - try { - Thread.sleep(1000); + Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index f2dc723..7a0ac6a 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -72,7 +72,7 @@ public class ThresholdingEngineTest extends TopologyTestCase { public ThresholdingEngineTest() { // Fixtures final AlarmExpression expression = new AlarmExpression( - "max(hpcs.compute.cpu{id=5}) >= 3 or max(hpcs.compute.mem{id=5}) >= 5 times 2"); + "max(cpu{id=5}) >= 3 or max(mem{id=5}) >= 5"); cpuMetricDef = expression.getSubExpressions().get(0).getMetricDefinition(); memMetricDef = expression.getSubExpressions().get(1).getMetricDefinition(); diff --git a/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java b/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java index 021e97a..09b2bad 100644 --- a/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java +++ b/src/test/java/com/hpcloud/mon/domain/model/SubAlarmStatsTest.java @@ -9,7 +9,6 @@ import org.testng.annotations.Test; import com.hpcloud.mon.common.model.alarm.AlarmState; import com.hpcloud.mon.common.model.alarm.AlarmSubExpression; -import com.hpcloud.util.time.TimeResolution; /** * @author Jonathan Halterman @@ -22,42 +21,39 @@ public class SubAlarmStatsTest { @BeforeMethod protected void beforeMethod() { - expression = AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 1) > 3 times 3"); + expression = AlarmSubExpression.of("avg(hpcs.compute.cpu{id=5}, 60) > 3 times 3"); subAlarm = new SubAlarm("123", "1", expression); - subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, 4); + subAlarm.setNoState(true); + subAlarmStats = new SubAlarmStats(subAlarm, expression.getPeriod()); } public void shouldBeOkIfAnySlotsInViewAreBelowThreshold() { subAlarmStats.getStats().addValue(5, 1); - assertTrue(subAlarmStats.evaluate()); - // This went to alarm because at least one period is over the threshold, - // none are under the threshold and the others are UNDETERMINED - assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); + assertFalse(subAlarmStats.evaluateAndSlideWindow(61)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - subAlarmStats.getStats().addValue(1, 2); - assertTrue(subAlarmStats.evaluate()); - // This went to alarm because at least one period is under the threshold + subAlarmStats.getStats().addValue(1, 62); + assertTrue(subAlarmStats.evaluateAndSlideWindow(121)); + // This went to OK because at least one period is under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); - subAlarmStats.getStats().addValue(5, 3); - assertFalse(subAlarmStats.evaluate()); + subAlarmStats.getStats().addValue(5, 123); + assertFalse(subAlarmStats.evaluateAndSlideWindow(181)); // Still one under the threshold assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); } public void shouldBeAlarmedIfAllSlotsInViewExceedThreshold() { subAlarmStats.getStats().addValue(5, 1); - subAlarmStats.getStats().addValue(5, 2); - subAlarmStats.getStats().addValue(5, 3); + assertFalse(subAlarmStats.evaluateAndSlideWindow(61)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertTrue(subAlarmStats.evaluate()); - assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); - } + subAlarmStats.getStats().addValue(5, 62); + assertFalse(subAlarmStats.evaluateAndSlideWindow(121)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - public void shouldBeAlarmedIfAllSlotsExceedThresholdOrAreUninitialized() { - subAlarmStats.getStats().addValue(5, 1); - - assertTrue(subAlarmStats.evaluate()); + subAlarmStats.getStats().addValue(5, 123); + assertTrue(subAlarmStats.evaluateAndSlideWindow(181)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } @@ -66,32 +62,31 @@ public class SubAlarmStatsTest { */ public void shouldEvaluateAndSlideWindow() { long initialTime = 11; - subAlarmStats = new SubAlarmStats(subAlarm, TimeResolution.ABSOLUTE, initialTime); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Slide in some values that exceed the threshold subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); subAlarmStats.getStats().addValue(5, initialTime - 1); - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); subAlarmStats.getStats().addValue(5, initialTime - 1); // Trigger ALARM - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); // Add value and trigger OK subAlarmStats.getStats().addValue(1, initialTime - 1); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.OK); // Must slide 9 times total from the last added value to trigger UNDETERMINED. This is @@ -99,9 +94,28 @@ public class SubAlarmStatsTest { // slides to move the value outside of the window and 6 more to exceed the observation // threshold. for (int i = 0; i < 7; i++) - assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); - assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 1)); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + subAlarmStats.getStats().addValue(5, initialTime - 1); + } + + public void shouldAlarmIfAllSlotsAlarmed() { + long initialTime = 11; + + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.UNDETERMINED); + + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertFalse(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + + subAlarmStats.getStats().addValue(5, initialTime - 1); + assertTrue(subAlarmStats.evaluateAndSlideWindow(initialTime += 60)); + assertEquals(subAlarmStats.getSubAlarm().getState(), AlarmState.ALARM); } public void testEmptyWindowObservationThreshold() { @@ -110,4 +124,27 @@ public class SubAlarmStatsTest { SubAlarmStats saStats = new SubAlarmStats(subAlarm, (System.currentTimeMillis() / 1000) + 60); assertEquals(saStats.emptyWindowObservationThreshold, 6); } + + public void checkLongPeriod() { + final AlarmSubExpression subExpr = AlarmSubExpression.of("sum(hpcs.compute.mem{id=5}, 120) >= 96"); + + final SubAlarm subAlarm = new SubAlarm("42", "4242", subExpr); + + long t1 = 0; + final SubAlarmStats stats = new SubAlarmStats(subAlarm, t1 + subExpr.getPeriod()); + for (int i = 0; i < 360; i++) { + t1++; + stats.getStats().addValue(1.0, t1); + if ((t1 % 60) == 0) { + stats.evaluateAndSlideWindow(t1); + if (i <= 60) + // First check will show it is OK. You could argue that this is incorrect + // as we have not waited for the whole period so we can't really evaluate it. + // That is true for sum and count + assertEquals(stats.getSubAlarm().getState(), AlarmState.OK); + else + assertEquals(stats.getSubAlarm().getState(), AlarmState.ALARM); + } + } + } } diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java index d721bfb..a1ca21d 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -8,7 +8,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.reset; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -50,7 +49,7 @@ import com.hpcloud.streaming.storm.Streams; @Test public class MetricAggregationBoltTest { private static final String TENANT_ID = "42"; - private MetricAggregationBolt bolt; + private MockMetricAggregationBolt bolt; private TopologyContext context; private OutputCollector collector; private List subAlarms; @@ -79,9 +78,9 @@ public class MetricAggregationBoltTest { @BeforeMethod protected void beforeMethod() { // Fixtures - subAlarm1 = new SubAlarm("123", "1", subExpr1, AlarmState.OK); - subAlarm2 = new SubAlarm("456", "1", subExpr2, AlarmState.OK); - subAlarm3 = new SubAlarm("789", "2", subExpr3, AlarmState.ALARM); + subAlarm1 = new SubAlarm("123", "1", subExpr1, AlarmState.UNDETERMINED); + subAlarm2 = new SubAlarm("456", "1", subExpr2, AlarmState.UNDETERMINED); + subAlarm3 = new SubAlarm("789", "2", subExpr3, AlarmState.UNDETERMINED); subAlarms = new ArrayList<>(); subAlarms.add(subAlarm1); subAlarms.add(subAlarm2); @@ -100,7 +99,7 @@ public class MetricAggregationBoltTest { } }); - bolt = new MetricAggregationBolt(dao); + bolt = new MockMetricAggregationBolt(dao); context = mock(TopologyContext.class); collector = mock(OutputCollector.class); bolt.prepare(null, context, collector); @@ -127,7 +126,6 @@ public class MetricAggregationBoltTest { bolt.execute(createMetricTuple(metricDef2, null)); - // Send metrics for subAlarm1 long t1 = System.currentTimeMillis() / 1000; bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100))); @@ -142,12 +140,13 @@ public class MetricAggregationBoltTest { assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); assertEquals(subAlarm3.getState(), AlarmState.UNDETERMINED); - verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); - verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + verify(collector, times(1)).emit(new Values(subAlarm1.getAlarmId(), subAlarm1)); // Have to reset the mock so it can tell the difference when subAlarm2 and subAlarm3 are emitted again. reset(collector); + // Drive subAlarm1 to ALARM bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99))); + // Drive subAlarm2 to ALARM and subAlarm3 to OK since they use the same MetricDefinition bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, System.currentTimeMillis() / 1000, 94))); bolt.execute(tickTuple); verify(collector, times(1)).ack(tickTuple); @@ -161,21 +160,33 @@ public class MetricAggregationBoltTest { } public void shouldSendUndeterminedIfStateChanges() { - - assertNotEquals(AlarmState.UNDETERMINED, subAlarm2.getState()); + long t1 = System.currentTimeMillis() / 1000; + bolt.setCurrentTime(t1); bolt.execute(createMetricTuple(metricDef2, null)); + t1 += 1; + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0))); + bolt.setCurrentTime(t1 += 60); final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.OK); - assertEquals(AlarmState.UNDETERMINED, subAlarm2.getState()); + bolt.setCurrentTime(t1 += 60); + bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.OK); + verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + + // Have to reset the mock so it can tell the difference when subAlarm2 is emitted again. + reset(collector); + + bolt.setCurrentTime(t1 += 60); + bolt.execute(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } public void shouldSendUndeterminedOnStartup() { - subAlarm2.setNoState(true); - subAlarm2.setState(AlarmState.UNDETERMINED); bolt.execute(createMetricTuple(metricDef2, null)); final MkTupleParam tupleParam = new MkTupleParam(); @@ -191,6 +202,11 @@ public class MetricAggregationBoltTest { bolt.execute(tickTuple); verify(collector, times(2)).ack(tickTuple); + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + + bolt.execute(tickTuple); + verify(collector, times(3)).ack(tickTuple); + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } @@ -311,4 +327,25 @@ public class MetricAggregationBoltTest { tupleParam.setStream(Streams.DEFAULT_STREAM_ID); return Testing.testTuple(Arrays.asList(new MetricDefinitionAndTenantId(metricDef, TENANT_ID), metric), tupleParam); } + + private static class MockMetricAggregationBolt extends MetricAggregationBolt { + private static final long serialVersionUID = 1L; + + private long currentTime; + + public MockMetricAggregationBolt(SubAlarmDAO subAlarmDAO) { + super(subAlarmDAO); + } + + @Override + protected long currentTimeSeconds() { + if (currentTime != 0) + return currentTime; + return super.currentTimeSeconds(); + } + + public void setCurrentTime(long currentTime) { + this.currentTime = currentTime; + } + } }