diff --git a/pom.xml b/pom.xml index 43f2273..cd3d073 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ - 1.0.0.43 + 1.0.0.48 0.9.1-incubating true UTF-8 @@ -63,8 +63,8 @@ - + org.slf4j slf4j-api @@ -75,6 +75,17 @@ slf4j-log4j12 1.7.6 + + + org.scala-lang + scala-compiler + 2.9.2 + + + org.scala-lang + scala-library + 2.9.2 + diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java index 28270c8..a4e497c 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java @@ -84,9 +84,9 @@ public class MetricFilteringBolt extends BaseRichBolt { public static final int LAG_MESSAGE_PERIOD_DEFAULT = 30; public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; - private static final int MIN_LAG_VALUE = PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE); + private static final int MIN_LAG_VALUE = 1000 * PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE); private static final int MAX_LAG_MESSAGES = PropertyFinder.getIntProperty(MAX_LAG_MESSAGES_KEY, MAX_LAG_MESSAGES_DEFAULT, 0, Integer.MAX_VALUE); - private static final int LAG_MESSAGE_PERIOD = PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600); + private static final int LAG_MESSAGE_PERIOD = 1000 * PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600); private static final Map> METRIC_DEFS = new ConcurrentHashMap<>(); private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher(); private static final Object SENTINAL = new Object(); @@ -121,8 +121,9 @@ public class MetricFilteringBolt extends BaseRichBolt { try { if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0); - final Metric metric = (Metric)tuple.getValue(1); - checkLag(metric); + final Long timestamp = (Long)tuple.getValue(1); + final Metric metric = (Metric)tuple.getValue(2); + checkLag(timestamp); LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId); // Check for exact matches as well as inexact matches @@ -153,31 +154,34 @@ public class MetricFilteringBolt extends BaseRichBolt { } } - private void checkLag(Metric metric) { - final long now = getCurrentSeconds(); - final long lag = now - metric.timestamp; + private void checkLag(Long apiTimeStamp) { + if (!lagging) + return; + if ((apiTimeStamp == null) || (apiTimeStamp.longValue() == 0)) + return; // Remove this code at some point, just to handle old metrics without a NPE + final long now = getCurrentTime(); + final long lag = now - apiTimeStamp.longValue(); if (lag < minLag) minLag = lag; - if (lagging) - if (minLag <= MIN_LAG_VALUE) { - lagging = false; - LOG.info("Metrics no longer lagging, minLag = {}", minLag); - } - else if (minLagMessageSent >= MAX_LAG_MESSAGES) { - LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}", + if (minLag <= MIN_LAG_VALUE) { + lagging = false; + LOG.info("Metrics no longer lagging, minLag = {}", minLag); + } + else if (minLagMessageSent >= MAX_LAG_MESSAGES) { + LOG.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}", MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD, minLag); - lagging = false; - } - else if (lastMinLagMessageSent == 0) { - lastMinLagMessageSent = now; - } - else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { - LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); - collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, + lagging = false; + } + else if (lastMinLagMessageSent == 0) { + lastMinLagMessageSent = now; + } + else if ((now - lastMinLagMessageSent) >= LAG_MESSAGE_PERIOD) { + LOG.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, minLag); + collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); - lastMinLagMessageSent = now; - minLagMessageSent++; - } + lastMinLagMessageSent = now; + minLagMessageSent++; + } } private void removeSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { @@ -214,6 +218,11 @@ public class MetricFilteringBolt extends BaseRichBolt { // Iterate again to ensure we only emit each metricDef once for (MetricDefinitionAndTenantId metricDefinitionAndTenantId : METRIC_DEFS.keySet()) collector.emit(new Values(metricDefinitionAndTenantId, null)); + LOG.info("Found {} Metric Definitions", METRIC_DEFS.size()); + // Just output these here so they are only output once per JVM + LOG.info("MIN_LAG_VALUE set to {} seconds", MIN_LAG_VALUE/1000); + LOG.info("MAX_LAG_MESSAGES set to {}", MAX_LAG_MESSAGES); + LOG.info("LAG_MESSAGE_PERIOD set to {} seconds", LAG_MESSAGE_PERIOD/1000); } } } @@ -223,8 +232,8 @@ public class MetricFilteringBolt extends BaseRichBolt { /** * Allow override of current time for testing. */ - protected long getCurrentSeconds() { - return System.currentTimeMillis() / 1000; + protected long getCurrentTime() { + return System.currentTimeMillis(); } private void addMetricDef(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) { diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java index 08c6d31..03eda73 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java @@ -34,7 +34,7 @@ public class MetricSpout extends KafkaSpout { private static final long serialVersionUID = 744004533863562119L; - public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" }; + public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "apiTimeStamp", "metric" }; public static final String DEFAULT_TENANT_ID = "TENANT_ID_NOT_SET"; public MetricSpout(MetricSpoutConfig metricSpoutConfig) { @@ -58,7 +58,8 @@ public class MetricSpout extends KafkaSpout { LOG.error("No tenantId so using default tenantId {} for Metric {}", DEFAULT_TENANT_ID, metricEnvelope.metric); tenantId = DEFAULT_TENANT_ID; } - collector.emit(new Values(new MetricDefinitionAndTenantId(metricEnvelope.metric.definition(), tenantId), metricEnvelope.metric)); + collector.emit(new Values(new MetricDefinitionAndTenantId(metricEnvelope.metric.definition(), tenantId), + metricEnvelope.creationTime, metricEnvelope.metric)); } @Override diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index ddbb051..435936c 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -241,13 +241,13 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { else { System.out.println("Feeding metrics..."); - long time = System.currentTimeMillis() / 1000; + long time = System.currentTimeMillis(); ++goodValueCount; for (final SubAlarm subAlarm : subAlarms) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_ALARM_TENANT_ID); - metricSpout.feed(new Values(metricDefinitionAndTenantId, - new Metric(metricDefinitionAndTenantId.metricDefinition, time, (double) (goodValueCount == 15 ? 1 : 555)))); + metricSpout.feed(new Values(metricDefinitionAndTenantId, time, + new Metric(metricDefinitionAndTenantId.metricDefinition, time / 1000, (double) (goodValueCount == 15 ? 1 : 555)))); } } try { diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index 25b7c0d..5999c03 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -192,11 +192,11 @@ public class ThresholdingEngineTest extends TopologyTestCase { if (feedCount > 0) { System.out.println("Feeding metrics..."); - long time = System.currentTimeMillis() / 1000; - metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), new Metric(cpuMetricDef.name, - cpuMetricDef.dimensions, time, (double) (++goodValueCount == 15 ? 1 : 555)))); - metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), new Metric(memMetricDef.name, - extraMemMetricDefDimensions, time, (double) (goodValueCount == 15 ? 1 : 555)))); + long time = System.currentTimeMillis(); + metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), time, + new Metric(cpuMetricDef.name, cpuMetricDef.dimensions, time / 1000, (double) (++goodValueCount == 15 ? 1 : 555)))); + metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), time, + new Metric(memMetricDef.name, extraMemMetricDefDimensions, time / 1000, (double) (goodValueCount == 15 ? 1 : 555)))); if (--feedCount == 0) waitCount = 3; diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java index 80cf7a1..801d9ce 100644 --- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -107,20 +107,23 @@ public class MetricFilteringBoltTest { final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); - final long prepareTime = bolt.getCurrentSeconds(); + final long prepareTime = bolt.getCurrentTime(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); - final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); + final long oldestTimestamp = prepareTime - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT*1000; + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, oldestTimestamp, new Metric(metricDefinition, oldestTimestamp/1000, 42.0)); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); - bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); - final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); + bolt.setCurrentTime(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT*1000); + final Tuple lateMetricTuple2 = createMetricTuple(metricDefinition, prepareTime, new Metric(metricDefinition, prepareTime/1000, 42.0)); bolt.execute(lateMetricTuple2); verify(collector, times(1)).ack(lateMetricTuple2); verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); - bolt.setCurrentSeconds(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); - final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT, 42.0)); + bolt.setCurrentTime(prepareTime + 2 * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000); + long caughtUpTimestamp = bolt.getCurrentTime() - MetricFilteringBolt.MIN_LAG_VALUE_DEFAULT; + final Tuple metricTuple = createMetricTuple(metricDefinition, caughtUpTimestamp, new Metric(metricDefinition, caughtUpTimestamp/1000, 42.0)); bolt.execute(metricTuple); + // Metrics are caught up so there should not be another METRICS_BEHIND message verify(collector, times(1)).ack(metricTuple); verify(collector, times(1)).emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND)); @@ -131,15 +134,15 @@ public class MetricFilteringBoltTest { final MockMetricFilteringBolt bolt = createBolt(new ArrayList(0), collector, true); - long prepareTime = bolt.getCurrentSeconds(); + long prepareTime = bolt.getCurrentTime(); final MetricDefinition metricDefinition = subAlarms.get(0).getExpression().getMetricDefinition(); // Fake sending metrics for MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT * MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT seconds boolean first = true; // Need to send MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1 metrics because the lag message is not // output on the first one. for (int i = 0; i < MetricFilteringBolt.MAX_LAG_MESSAGES_DEFAULT + 1; i++) { - final Tuple lateMetricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, prepareTime, 42.0)); - bolt.setCurrentSeconds(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT); + final Tuple lateMetricTuple = createMetricTuple(metricDefinition, prepareTime, new Metric(metricDefinition, prepareTime/1000, 42.0)); + bolt.setCurrentTime(prepareTime + MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000); bolt.execute(lateMetricTuple); verify(collector, times(1)).ack(lateMetricTuple); if (!first) { @@ -147,10 +150,11 @@ public class MetricFilteringBoltTest { new Values(MetricAggregationBolt.METRICS_BEHIND)); } first = false; - prepareTime = bolt.getCurrentSeconds(); + prepareTime = bolt.getCurrentTime(); } // One more - final Tuple metricTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, bolt.getCurrentSeconds() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT, 42.0)); + long timestamp = bolt.getCurrentTime() - MetricFilteringBolt.LAG_MESSAGE_PERIOD_DEFAULT * 1000; + final Tuple metricTuple = createMetricTuple(metricDefinition, timestamp, new Metric(metricDefinition, timestamp/1000, 42.0)); bolt.execute(metricTuple); verify(collector, times(1)).ack(metricTuple); // Won't be any more of these @@ -160,19 +164,19 @@ public class MetricFilteringBoltTest { private static class MockMetricFilteringBolt extends MetricFilteringBolt { private static final long serialVersionUID = 1L; - private long currentSeconds = System.currentTimeMillis() / 1000; + private long currentTimeMillis = System.currentTimeMillis(); public MockMetricFilteringBolt(MetricDefinitionDAO metricDefDAO) { super(metricDefDAO); } @Override - protected long getCurrentSeconds() { - return currentSeconds; + protected long getCurrentTime() { + return currentTimeMillis; } - public void setCurrentSeconds(final long currentSeconds) { - this.currentSeconds = currentSeconds; + public void setCurrentTime(final long currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; } } @@ -220,17 +224,17 @@ public class MetricFilteringBoltTest { for (final SubAlarm subAlarm : subAlarms) { // First do a MetricDefinition that is an exact match final MetricDefinition metricDefinition = subAlarm.getExpression().getMetricDefinition(); - final Tuple exactTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, metricTimestamp++, 42.0)); + final Tuple exactTuple = createMetricTuple(metricDefinition, metricTimestamp++ * 1000, new Metric(metricDefinition, metricTimestamp, 42.0)); bolt1.execute(exactTuple); verify(collector1, times(1)).ack(exactTuple); - verify(collector1, howMany).emit(exactTuple.getValues()); + verify(collector1, howMany).emit(new Values(exactTuple.getValue(0), exactTuple.getValue(2))); // Now do a MetricDefinition with an extra dimension that should still match the SubAlarm final Map extraDimensions = new HashMap<>(metricDefinition.dimensions); extraDimensions.put("group", "group_a"); final MetricDefinition inexactMetricDef = new MetricDefinition(metricDefinition.name, extraDimensions); - Metric inexactMetric = new Metric(inexactMetricDef, metricTimestamp++, 42.0); - final Tuple inexactTuple = createMetricTuple(metricDefinition, inexactMetric); + Metric inexactMetric = new Metric(inexactMetricDef, metricTimestamp, 42.0); + final Tuple inexactTuple = createMetricTuple(metricDefinition, metricTimestamp++ * 1000, inexactMetric); bolt1.execute(inexactTuple); verify(collector1, times(1)).ack(inexactTuple); // We want the MetricDefinitionAndTenantId from the exact tuple, but the inexactMetric @@ -310,11 +314,14 @@ public class MetricFilteringBoltTest { } private Tuple createMetricTuple(final MetricDefinition metricDefinition, + final long timestamp, final Metric metric) { final MkTupleParam tupleParam = new MkTupleParam(); - tupleParam.setFields(MetricFilteringBolt.FIELDS); - tupleParam.setStream(Streams.DEFAULT_STREAM_ID); final Tuple tuple = Testing.testTuple(Arrays.asList( - new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), metric), tupleParam); + tupleParam.setFields(MetricSpout.FIELDS); + tupleParam.setStream(Streams.DEFAULT_STREAM_ID); + final Tuple tuple = Testing.testTuple(Arrays.asList( + new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), + timestamp, metric), tupleParam); return tuple; } }