diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java index 9184386..18b1f5e 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.java @@ -179,12 +179,13 @@ public class MetricAggregationBolt extends BaseRichBolt { } for (SubAlarmStats stats : subAlarmStatsRepo.get()) { - if (stats.getStats().addValue(metric.value, metric.timestamp)) { + long timestamp_secs = metric.timestamp/1000; + if (stats.getStats().addValue(metric.value, timestamp_secs)) { logger.trace("Aggregated value {} at {} for {}. Updated {}", metric.value, metric.timestamp, metricDefinitionAndTenantId, stats.getStats()); } else { logger.warn("Metric is too old, age {} seconds: timestamp {} for {}, {}", - currentTimeSeconds() - metric.timestamp, metric.timestamp, metricDefinitionAndTenantId, + currentTimeSeconds() - timestamp_secs, timestamp_secs, metricDefinitionAndTenantId, stats.getStats()); } } diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java index 832d5ae..28d0b8b 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineAlarmTest.java @@ -358,7 +358,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { } System.out.printf("Feeding metrics...%d Stage %s\n", i, stage); - long time = System.currentTimeMillis() / 1000; + long time = System.currentTimeMillis(); for (final MetricDefinitionAndTenantId metricDefinitionAndTenantId : mtids) { metricSpout.feed(new Values(new TenantIdAndMetricName(metricDefinitionAndTenantId), time, new Metric(metricDefinitionAndTenantId.metricDefinition, time, diff --git a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java index 8853f1b..491c834 100644 --- a/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java +++ b/thresh/src/test/java/monasca/thresh/ThresholdingEngineTest.java @@ -256,7 +256,7 @@ public class ThresholdingEngineTest extends TopologyTestCase { if (feedCount > 0) { System.out.println("Feeding metrics..."); - long time = System.currentTimeMillis() / 1000; + long time = System.currentTimeMillis(); final MetricDefinitionAndTenantId cpuMtid = new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID); metricSpout.feed(new Values(new TenantIdAndMetricName(cpuMtid), time, new Metric(cpuMetricDef.name, cpuMetricDef.dimensions, diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java index 29d7bed..012889c 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricAggregationBoltTest.java @@ -112,7 +112,7 @@ public class MetricAggregationBoltTest { sendSubAlarmCreated(metricDef1, subAlarm1); sendSubAlarmCreated(metricDef2, subAlarm2); - long t1 = System.currentTimeMillis() / 1000; + long t1 = System.currentTimeMillis(); bolt.aggregateValues(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID), new Metric( metricDef1.name, metricDef1.dimensions, t1, 100, null)); @@ -127,30 +127,30 @@ public class MetricAggregationBoltTest { SubAlarmStats alarmData = orCreateSubAlarmStatsRepo .get(subAlarm1.getId()); - assertEquals(alarmData.getStats().getValue(t1), 90.0); + assertEquals(alarmData.getStats().getValue(t1/1000), 90.0); alarmData = bolt.getOrCreateSubAlarmStatsRepo(new MetricDefinitionAndTenantId(metricDef2, TENANT_ID)) .get(subAlarm2.getId()); - assertEquals(alarmData.getStats().getValue(t1), 45.0); + assertEquals(alarmData.getStats().getValue(t1/1000), 45.0); } public void shouldEvaluateAlarms() { // Ensure subAlarm2 and subAlarm3 map to the same Metric Definition assertEquals(metricDef3, metricDef2); - long t1 = 170; + long t1 = 170000; bolt.setCurrentTime(t1); sendSubAlarmCreated(metricDef1, subAlarm1); sendSubAlarmCreated(metricDef2, subAlarm2); sendSubAlarmCreated(metricDef3, subAlarm3); // Send metrics for subAlarm1 - bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100, null))); - bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 60, 95, null))); - bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 120, 88, null))); + bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 100000, null))); + bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 60000, 95, null))); + bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1 - 120000, 88, null))); - t1 += 20; + t1 += 20000; bolt.setCurrentTime(t1); final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); @@ -168,9 +168,9 @@ public class MetricAggregationBoltTest { // Drive subAlarm1 to ALARM bolt.execute(createMetricTuple(metricDef1, new Metric(metricDef1, t1, 99, null))); // Drive subAlarm2 to ALARM and subAlarm3 to OK since they use the same MetricDefinition - t1 += 10; + t1 += 10000; bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 94, null))); - t1 += 50; + t1 += 50000; bolt.setCurrentTime(t1); bolt.execute(tickTuple); verify(collector, times(1)).ack(tickTuple); @@ -185,15 +185,15 @@ public class MetricAggregationBoltTest { public void shouldSendAlarmAgain() { - long t1 = 10; + long t1 = 10000; bolt.setCurrentTime(t1); sendSubAlarmCreated(metricDef2, subAlarm2); bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100, null))); - bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95, null))); - bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88, null))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1 += 1000, 95, null))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1 += 1000, 88, null))); - t1 += 60; + t1 += 60000; bolt.setCurrentTime(t1); final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); @@ -204,10 +204,10 @@ public class MetricAggregationBoltTest { sendSubAlarmResend(metricDef2, subAlarm2); bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100, null))); - bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95, null))); - bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88, null))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1 += 1000, 95, null))); + bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1 += 1000, 88, null))); - t1 += 60; + t1 += 60000; bolt.setCurrentTime(t1); bolt.execute(tickTuple); verify(collector, times(2)).ack(tickTuple); @@ -245,19 +245,19 @@ public class MetricAggregationBoltTest { } public void shouldSendUndeterminedIfStateChanges() { - long t1 = 50; + long t1 = 50000; bolt.setCurrentTime(t1); sendSubAlarmCreated(metricDef2, subAlarm2); bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0, null))); - t1 += 1; + t1 += 1000; bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 1.0, null))); - bolt.setCurrentTime(t1 += 60); + bolt.setCurrentTime(t1 += 60000); final Tuple tickTuple = createTickTuple(); bolt.execute(tickTuple); assertEquals(subAlarm2.getState(), AlarmState.OK); - bolt.setCurrentTime(t1 += 60); + bolt.setCurrentTime(t1 += 60000); bolt.execute(tickTuple); assertEquals(subAlarm2.getState(), AlarmState.OK); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); @@ -265,14 +265,14 @@ public class MetricAggregationBoltTest { // Have to reset the mock so it can tell the difference when subAlarm2 is emitted again. reset(collector); - bolt.setCurrentTime(t1 += 60); + bolt.setCurrentTime(t1 += 60000); bolt.execute(tickTuple); assertEquals(subAlarm2.getState(), AlarmState.UNDETERMINED); verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); } public void shouldSendUndeterminedOnStartup() { - long t1 = 14; + long t1 = 14000; bolt.setCurrentTime(t1); sendSubAlarmCreated(metricDef2, subAlarm2); @@ -284,19 +284,19 @@ public class MetricAggregationBoltTest { verify(collector, times(1)).ack(lagTuple); final Tuple tickTuple = createTickTuple(); - t1 += 60; + t1 += 60000; bolt.setCurrentTime(t1); bolt.execute(tickTuple); verify(collector, times(1)).ack(tickTuple); verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); - t1 += 60; + t1 += 60000; bolt.setCurrentTime(t1); bolt.execute(tickTuple); verify(collector, times(2)).ack(tickTuple); verify(collector, never()).emit(new Values(subAlarm2.getAlarmId(), subAlarm2)); - t1 += 60; + t1 += 60000; bolt.setCurrentTime(t1); bolt.execute(tickTuple); verify(collector, times(3)).ack(tickTuple); @@ -447,8 +447,8 @@ public class MetricAggregationBoltTest { return super.currentTimeSeconds(); } - public void setCurrentTime(long currentTime) { - this.currentTime = currentTime; + public void setCurrentTime(long millis) { + this.currentTime = millis/1000; } } } diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java index b8dd745..f35c67e 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBoltTest.java @@ -63,7 +63,7 @@ import java.util.UUID; @Test public class MetricFilteringBoltTest { private final static String TEST_TENANT_ID = "42"; - private long metricTimestamp = System.currentTimeMillis() / 1000; // Make sure the metric + private long metricTimestamp = System.currentTimeMillis(); // Make sure the metric private AlarmDefinition alarmDef1; private AlarmDefinition dupMetricAlarmDef;