Save Measurements that arrive before their SubAlarms

Thresh creates an Alarm when a new Measurement matches an
AlarmDefinition. The previous Thresh code just discarded the
Measurement if it arrived before the newly created SubAlarm,
which was likely to occur. This code saves a Measurement that
does not match an existing SubAlarm in the expectation that the
SubAlarm will arrive very soon. It then adds the Measurement
to the SubAlarm. If the measurement would cause the SubAlarm to
transition to the ALARM state, that happens.

This is more important for deterministic alarms because they will get
fewer Measurements, and ignoring the first one may prevent an Alarm's
state from going to ALARM when it should.

Change-Id: I08e9e481ad55862ba602eba5a68eb371b1d35bbc
This commit is contained in:
Craig Bryant 2016-06-20 16:33:15 -06:00
parent 0d80a987db
commit 3927da1697
2 changed files with 103 additions and 10 deletions

View File

@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -73,6 +72,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl";
public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = new String[] {"directive"};
public static final String METRICS_BEHIND = "MetricsBehind";
/** Limit, in seconds, that savedMetricTooOld() compares a saved Metric's age against */
private static final int MAX_SAVED_METRIC_AGE_SECONDS = 10;
private final ThresholdingConfiguration config;
final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> metricDefToSubAlarmStatsRepos =
@ -81,10 +81,9 @@ public class MetricAggregationBolt extends BaseRichBolt {
/** SubAlarmStats looked up by SubAlarm id */
private final Map<String, SubAlarmStats> subAlarmToSubAlarmStats = new HashMap<>();
private transient Logger logger;
/** Namespaces for which metrics are received sporadically */
private Set<String> sporadicMetricNamespaces = Collections.emptySet();
private OutputCollector collector;
private boolean upToDate = true;
/**
 * Metrics that arrived while no SubAlarmStatsRepository existed for their definition. They are
 * looked up when a SubAlarm is created and removed by cleanSavedMetrics() once too old.
 */
private Map<MetricDefinitionAndTenantId, Metric> savedMetrics = new HashMap<>();
public MetricAggregationBolt(ThresholdingConfiguration config) {
this.config = config;
@ -176,12 +175,15 @@ public class MetricAggregationBolt extends BaseRichBolt {
void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) {
SubAlarmStatsRepository subAlarmStatsRepo =
getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
if (subAlarmStatsRepo == null || metric == null) {
if (subAlarmStatsRepo == null) {
// This is probably the metric that will cause the creation of a new SubAlarm, save it until
// the SubAlarm comes in
savedMetrics.put(metricDefinitionAndTenantId, metric);
return;
}
for (SubAlarmStats stats : subAlarmStatsRepo.get()) {
long timestamp_secs = metric.timestamp/1000;
final long timestamp_secs = metricTimestampInSeconds(metric);
if (stats.getStats().addValue(metric.value, timestamp_secs)) {
logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value,
metric.timestamp, metricDefinitionAndTenantId, stats.getStats());
@ -196,6 +198,16 @@ public class MetricAggregationBolt extends BaseRichBolt {
}
}
/**
 * Converts a Metric's timestamp to seconds.
 *
 * @param metric Metric whose timestamp is read
 * @return the Metric's timestamp divided by 1000, i.e. expressed in seconds
 */
private long metricTimestampInSeconds(Metric metric) {
  final long timestamp = metric.timestamp;
  return timestamp / 1000;
}
/**
* Evaluates all SubAlarms for all SubAlarmStatsRepositories using an evaluation time of 1 minute
* ago, then sliding the window to the current time.
@ -217,6 +229,38 @@ public class MetricAggregationBolt extends BaseRichBolt {
logger.info("Did not evaluate SubAlarms because Metrics are not up to date");
upToDate = true;
}
cleanSavedMetrics();
}
/**
 * Drops every saved metric that savedMetricTooOld() reports as too old. The SubAlarm for a
 * saved metric is expected to show up within seconds of the metric being received, so older
 * entries are no longer useful.
 */
private void cleanSavedMetrics() {
  if (!savedMetrics.isEmpty()) {
    // Collect the keys first so the map is not modified while it is being iterated
    final List<MetricDefinitionAndTenantId> expiredKeys = new ArrayList<>();
    for (Map.Entry<MetricDefinitionAndTenantId, Metric> saved : savedMetrics.entrySet()) {
      final Metric candidate = saved.getValue();
      if (savedMetricTooOld(candidate)) {
        expiredKeys.add(saved.getKey());
      }
    }
    logger.debug("Removing {} too old saved metrics", expiredKeys.size());
    savedMetrics.keySet().removeAll(expiredKeys);
  }
}
/**
 * Check if a saved Metric is too old to keep.
 *
 * @param metric saved Metric to check
 * @return true if the Metric's age in seconds exceeds MAX_SAVED_METRIC_AGE_SECONDS,
 *         false otherwise
 */
private boolean savedMetricTooOld(final Metric metric) {
  final long now = currentTimeSeconds();
  // Age is how far the Metric's timestamp lies in the past, so subtract it from the current
  // time. The reverse subtraction yields a negative value for every past Metric, which would
  // never exceed the limit.
  final long age = now - metricTimestampInSeconds(metric);
  return age > MAX_SAVED_METRIC_AGE_SECONDS;
}
private void sendSubAlarmStateChange(SubAlarmStats subAlarmStats) {
@ -286,7 +330,19 @@ public class MetricAggregationBolt extends BaseRichBolt {
*/
void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
logger.info("Received AlarmCreatedEvent for {}", subAlarm);
// NOTE(review): this listing is a diff rendered without +/- markers; the bare addSubAlarm call
// below looks like the pre-change line that the assignment after it replaces -- confirm
addSubAlarm(metricDefinitionAndTenantId, subAlarm);
final SubAlarmStats newStats = addSubAlarm(metricDefinitionAndTenantId, subAlarm);
// See if we have a saved metric for this SubAlarm. Add to the SubAlarm if we do.
// Because the Metric comes directly from the MetricFilteringBolt but the
// SubAlarm comes from the AlarmCreationBolt, it is very likely that the
// Metric arrives first
final Metric metric = savedMetrics.get(metricDefinitionAndTenantId);
// A saved metric that savedMetricTooOld() rejects is ignored here
if (metric != null && !savedMetricTooOld(metric)) {
// NOTE(review): aggregateValues loops over all SubAlarmStats in the repository for this
// definition, so when a second SubAlarm is created the retained metric may be added to the
// first SubAlarm's stats again -- confirm this is acceptable
aggregateValues(metricDefinitionAndTenantId, metric);
logger.trace("Aggregated saved value {} at {} for {}. Updated {}", metric.value,
metric.timestamp, metricDefinitionAndTenantId, newStats.getStats());
// The metric is not deleted from savedMetrics because it is possible that
// the metric fits into two different SubAlarms. Not likely, but possible
}
}
void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId,
@ -319,7 +375,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
return oldSubAlarmStats;
}
private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId,
private SubAlarmStats addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId,
SubAlarm subAlarm) {
SubAlarmStats subAlarmStats = subAlarmToSubAlarmStats.get(subAlarm.getId());
if (subAlarmStats == null) {
@ -334,6 +390,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
metricDefToSubAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo);
}
subAlarmStatsRepo.add(subAlarm.getId(), subAlarmStats);
return subAlarmStats;
}
protected boolean subAlarmRemoved(final String subAlarmId, MetricDefinitionAndTenantId metricDefinitionAndTenantId) {

View File

@ -308,6 +308,34 @@ public class MetricAggregationBoltTest {
verify(collector, never()).emit(new Values(subAlarm4.getAlarmId(), subAlarm4));
}
/**
 * Sends metrics before the corresponding SubAlarm creations are sent in this test, then checks
 * the resulting SubAlarm states and the emits seen by the collector.
 */
public void shouldImmediatelyGoToAlarmUsingSavedMetric() {
long t1 = 170000;
bolt.setCurrentTime(t1);
// NOTE(review): createMetricTuple is given metricDef1 here while the Metric itself and the
// SubAlarm created below use metricDef2 -- possibly a typo for metricDef2; confirm
bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef2, t1 + 1000, 100000, null)));
bolt.execute(createMetricTuple(metricDef3, new Metric(metricDef3, t1 + 1000, 100000, null)));
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1000, 100000, null)));
// The SubAlarms are sent only after the metrics above have been executed
sendSubAlarmCreated(metricDef2, subAlarm2);
sendSubAlarmCreated(metricDef3, subAlarm3);
sendSubAlarmCreated(metricDef4, subAlarm4);
// Since the alarmDef4 expression is a count, we need to send more metrics to drive it to ALARM,
// but it won't transition without the saved one
assertEquals(subAlarm4.getState(), AlarmState.OK);
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1001, 100000, null)));
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1002, 100000, null)));
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1003, 100000, null)));
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1 + 1004, 100000, null)));
// subAlarm2 is AVG so it can't be evaluated immediately like the MAX for subalarm3
assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED);
assertEquals(subAlarm3.getState(), AlarmState.ALARM);
assertEquals(subAlarm4.getState(), AlarmState.ALARM);
// subAlarm2 must never have been emitted; subAlarm3 and subAlarm4 exactly once each
verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4));
}
private void sendTickTuple() {
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
@ -397,12 +425,12 @@ public class MetricAggregationBoltTest {
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
}
public void shouldSendOkAfterAlarmIfNoMetrics() {
public void shouldSendAlarmIfMetricBeforeSubAlarm() {
long t1 = 50000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef4, subAlarm4);
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null)));
t1 += 1000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef4, subAlarm4);
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null)));
t1 += 1000;
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null)));
@ -415,6 +443,14 @@ public class MetricAggregationBoltTest {
sendTickTuple();
assertEquals(subAlarm4.getState(), AlarmState.ALARM);
verify(collector, times(1)).emit(new Values(subAlarm4.getAlarmId(), subAlarm4));
}
public void shouldSendOkAfterAlarmIfNoMetrics() {
long t1 = 50000;
bolt.setCurrentTime(t1);
sendSubAlarmCreated(metricDef4, subAlarm4);
bolt.execute(createMetricTuple(metricDef4, new Metric(metricDef4, t1, 1.0, null)));
t1 += 1000;
// Have to reset the mock so it can tell the difference when subAlarm4 is emitted again.
reset(collector);