From 3927da16971dbdeec78f4d203f0de035083906cd Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Mon, 20 Jun 2016 16:33:15 -0600 Subject: [PATCH] Save Measurements that arrive before their SubAlarms Thresh creates an Alarm when a new Measurement matches an AlarmDefinition. The previous Thresh code just discarded the Measurement if it arrived before the newly created SubAlarm, which was likely to occur. This code saves a Measurement that does not match an existing SubAlarm in the expectation that the SubAlarm will arrive very soon. It then adds the Measurement to the SubAlarm. If the measurement would cause the SubAlarm to transition to the ALARM state, that happens. This is more important for deterministic alarms because they will get fewer Measurements and ignoring the first one may prevent an Alarm's state going to ALARM when it should. Change-Id: I08e9e481ad55862ba602eba5a68eb371b1d35bbc --- .../thresholding/MetricAggregationBolt.java | 71 +++++++++++++++++-- .../MetricAggregationBoltTest.java | 42 ++++++++++- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 5bec0e0..060781e 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -73,6 +72,7 @@ public class MetricAggregationBolt extends BaseRichBolt { public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl"; public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"}; public static final String
METRICS_BEHIND = "MetricsBehind"; + private static final int MAX_SAVED_METRIC_AGE_SECONDS = 10; private final ThresholdingConfiguration config; final Map metricDefToSubAlarmStatsRepos = @@ -81,10 +81,9 @@ public class MetricAggregationBolt extends BaseRichBolt { private final Map subAlarmToSubAlarmStats = new HashMap<>(); private transient Logger logger; - /** Namespaces for which metrics are received sporadically */ - private Set sporadicMetricNamespaces = Collections.emptySet(); private OutputCollector collector; private boolean upToDate = true; + private Map savedMetrics = new HashMap<>(); public MetricAggregationBolt(ThresholdingConfiguration config) { this.config = config; @@ -176,12 +175,15 @@ public class MetricAggregationBolt extends BaseRichBolt { void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) { SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId); - if (subAlarmStatsRepo == null || metric == null) { + if (subAlarmStatsRepo == null) { + // This is probably the metric that will cause the creation of a new SubAlarm, save it until + // the SubAlarm comes in + savedMetrics.put(metricDefinitionAndTenantId, metric); return; } for (SubAlarmStats stats : subAlarmStatsRepo.get()) { - long timestamp_secs = metric.timestamp/1000; + final long timestamp_secs = metricTimestampInSeconds(metric); if (stats.getStats().addValue(metric.value, timestamp_secs)) { logger.trace("Aggregated value {} at {} for {}. 
Updated {}", metric.value, metric.timestamp, metricDefinitionAndTenantId, stats.getStats()); @@ -196,6 +198,16 @@ public class MetricAggregationBolt extends BaseRichBolt { } } + /** + * Return the Metric's timestamp in seconds + * + * @param metric Metric to use + * @return Metric's timestamp in seconds + */ + private long metricTimestampInSeconds(Metric metric) { + return metric.timestamp/1000; + } + /** * Evaluates all SubAlarms for all SubAlarmStatsRepositories using an evaluation time of 1 minute * ago, then sliding the window to the current time. @@ -217,6 +229,38 @@ public class MetricAggregationBolt extends BaseRichBolt { logger.info("Did not evaluate SubAlarms because Metrics are not up to date"); upToDate = true; } + cleanSavedMetrics(); + } + + /** + * Clean saved metrics since the SubAlarm should show up within seconds of + * the metric being received + */ + private void cleanSavedMetrics() { + if (savedMetrics.isEmpty()) { + return; + } + final List<MetricDefinitionAndTenantId> toRemove = new ArrayList<>(); + for (Map.Entry<MetricDefinitionAndTenantId, Metric> entry: savedMetrics.entrySet()) { + if (savedMetricTooOld(entry.getValue())) { + toRemove.add(entry.getKey()); + } + } + logger.debug("Removing {} too old saved metrics", toRemove.size()); + for (MetricDefinitionAndTenantId mdtid : toRemove) { + savedMetrics.remove(mdtid); + } + } + + /** + * Check if a saved Metric is too old, i.e. its timestamp is more than MAX_SAVED_METRIC_AGE_SECONDS behind the current time + * @param metric Metric to check + * @return true if saved Metric is too old, false otherwise + */ + private boolean savedMetricTooOld(final Metric metric) { + final long now = currentTimeSeconds(); + final long age = now - metricTimestampInSeconds(metric); + return age > MAX_SAVED_METRIC_AGE_SECONDS; } private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) { @@ -286,7 +330,19 @@ public class MetricAggregationBolt extends BaseRichBolt { */ void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { logger.info("Received AlarmCreatedEvent for {}", subAlarm); -
addSubAlarm(metricDefinitionAndTenantId, subAlarm); + final SubAlarmStats newStats = addSubAlarm(metricDefinitionAndTenantId, subAlarm); + // See if we have a saved metric for this SubAlarm. Add to the SubAlarm if we do. + // Because the Metric comes directly from the MetricFilteringBolt but the + // SubAlarm comes from the AlarmCreationBolt, it is very likely that the + // Metric arrives first + final Metric metric = savedMetrics.get(metricDefinitionAndTenantId); + if (metric != null && !savedMetricTooOld(metric)) { + aggregateValues(metricDefinitionAndTenantId, metric); + logger.trace("Aggregated saved value {} at {} for {}. Updated {}", metric.value, + metric.timestamp, metricDefinitionAndTenantId, newStats.getStats()); + // The metric is not deleted from savedMetrics because it is possible that + // the metric fits into two different SubAlarms. Not likely, but possible. NOTE(review): aggregateValues adds the value to every SubAlarmStats for this key, so confirm an earlier SubAlarm is not updated twice + } } void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, @@ -319,7 +375,7 @@ public class MetricAggregationBolt extends BaseRichBolt { return oldSubAlarmStats; } - private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, + private SubAlarmStats addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) { SubAlarmStats subAlarmStats = subAlarmToSubAlarmStats.get(subAlarm.getId()); if (subAlarmStats == null) { @@ -334,6 +390,7 @@ public class MetricAggregationBolt extends BaseRichBolt { metricDefToSubAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo); } subAlarmStatsRepo.add(subAlarm.getId(), subAlarmStats); + return subAlarmStats; } protected boolean subAlarmRemoved(final String subAlarmId, MetricDefinitionAndTenantId metricDefinitionAndTenantId) { diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java index 894d7e5..e32c583 100644 ---
a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -308,6 +308,34 @@ public class MetricAggregationBoltTest { verify(collector, never()).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); } + public void shouldImmediatelyGoToAlarmUsingSavedMetric() { + long t1 = 170000; + bolt.setCurrentTime(t1); + bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef2, t1 + 1000, 100000, null))); + bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1000, 100000, null))); + + sendSubAlarmCreated(metricDef2, subAlarm2); + sendSubAlarmCreated(metricDef3, subAlarm3); + sendSubAlarmCreated(metricDef4, subAlarm4); + // Since the alarmDef4 expression is a count, we need to send more metrics to drive it to ALARM, + // but it won't transition without the saved one + assertEquals(subAlarm4.getState(), AlarmState.OK); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1001, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1002, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1003, 100000, null))); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1004, 100000, null))); + + // subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subalarm3 + assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); + assertEquals(subAlarm3.getState(), AlarmState.ALARM); + assertEquals(subAlarm4.getState(), AlarmState.ALARM); + + verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); + verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3)); + verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); + } + private void 
sendTickTuple() { final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); @@ -397,12 +425,12 @@ public class MetricAggregationBoltTest { verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } - public void shouldSendOkAfterAlarmIfNoMetrics() { + public void shouldSendAlarmIfMetricBeforeSubAlarm() { long t1 = 50000; - bolt.setCurrentTime(t1); - sendSubAlarmCreated(metricDef4, subAlarm4); bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); t1 += 1000; + bolt.setCurrentTime(t1); + sendSubAlarmCreated(metricDef4, subAlarm4); bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); t1 += 1000; bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); @@ -415,6 +443,14 @@ public class MetricAggregationBoltTest { sendTickTuple(); assertEquals(subAlarm4.getState(), AlarmState.ALARM); verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4)); + } + + public void shouldSendOkAfterAlarmIfNoMetrics() { + long t1 = 50000; + bolt.setCurrentTime(t1); + sendSubAlarmCreated(metricDef4, subAlarm4); + bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null))); + t1 += 1000; // Have to reset the mock so it can tell the difference when subAlarm4 is emitted again. reset(collector);