Change to set lastMinLagMessageSent to the time the first lagging metric is received. The problem is that storm can take a long time to send the first metric after prepare() is called and we want to give the Bolt time to clear the metrics before sending a lagging message.

This commit is contained in:
Craig Bryant 2014-05-01 14:13:58 -06:00
parent e216fdc574
commit 5835e250cc
2 changed files with 17 additions and 6 deletions

View File

@ -154,6 +154,9 @@ public class MetricFilteringBolt extends BaseRichBolt {
MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag);
lagging = false;
}
else if (lastMinLagMessageSent == 0) {
lastMinLagMessageSent = now;
}
else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) {
LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag);
collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
@ -200,8 +203,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
}
}
}
// Not really when it was sent, but want to wait at least LAG_MESSAGE_PERIOD before sending a message
lastMinLagMessageSent = getCurrentSeconds();
lastMinLagMessageSent = 0;
}
/**

View File

@ -92,10 +92,13 @@ public class MetricFilteringBoltTest {
final long prepareTime = bolt.getCurrentSeconds();
final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0));
bolt.execute(lateMetricTuple);
verify(collector, times(1)).ack(lateMetricTuple);
bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
bolt.execute(lateMetricTuple2);
verify(collector, times(1)).ack(lateMetricTuple2);
verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
@ -114,13 +117,19 @@ public class MetricFilteringBoltTest {
long prepareTime = bolt.getCurrentSeconds();
final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition();
// Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds
for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT; i++) {
boolean first = true;
// Need to send MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1 metrics because the lag message is not
// output on the first one.
for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1; i++) {
final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0));
bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT);
bolt.execute(lateMetricTuple);
verify(collector, times(1)).ack(lateMetricTuple);
verify(collector, times(i + 1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
if (!first) {
verify(collector, times(i)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM,
new Values(MetricAggregationBolt.METRICS_BEHIND));
}
first = false;
prepareTime = bolt.getCurrentSeconds();
}
// One more