JAH-1891 Threshold Engine will not update State properly if user has changed state via the API

Use the new oldState and unchangedExpressions fields in the AlarmCreatedEvent to force the resend of unchanged sub alarms so the alarm will be re-evaluated.
This commit is contained in:
Craig Bryant 2014-06-01 08:32:04 -06:00
parent 4d6311ab68
commit 97d024daa1
8 changed files with 222 additions and 69 deletions

View File

@ -102,7 +102,6 @@ public class SubAlarmStats {
*/
boolean evaluate() {
double[] values = stats.getViewValues();
AlarmState initialState = subAlarm.getState();
boolean thresholdExceeded = false;
boolean hasEmptyWindows = false;
for (double value : values) {
@ -115,7 +114,7 @@ public class SubAlarmStats {
if (!subAlarm.getExpression()
.getOperator()
.evaluate(value, subAlarm.getExpression().getThreshold())) {
if (AlarmState.OK.equals(initialState))
if (!shouldSendStateChange(AlarmState.OK))
return false;
setSubAlarmState(AlarmState.OK);
return true;
@ -125,7 +124,7 @@ public class SubAlarmStats {
}
if (thresholdExceeded && !hasEmptyWindows) {
if (AlarmState.ALARM.equals(initialState))
if (!shouldSendStateChange(AlarmState.ALARM))
return false;
setSubAlarmState(AlarmState.ALARM);
return true;
@ -135,7 +134,7 @@ public class SubAlarmStats {
emptyWindowObservations++;
if ((emptyWindowObservations >= emptyWindowObservationThreshold) &&
(subAlarm.isNoState() || !AlarmState.UNDETERMINED.equals(initialState)) &&
shouldSendStateChange(AlarmState.UNDETERMINED) &&
!subAlarm.isSporadicMetric()) {
setSubAlarmState(AlarmState.UNDETERMINED);
return true;
@ -144,10 +143,14 @@ public class SubAlarmStats {
return false;
}
private void setSubAlarmState(AlarmState newState) {
private boolean shouldSendStateChange(AlarmState newState) {
return !subAlarm.getState().equals(newState) || subAlarm.isNoState();
}
private void setSubAlarmState(AlarmState newState) {
subAlarm.setState(newState);
subAlarm.setNoState(false);
}
}
/**
* This MUST only be used for compatible SubAlarms, i.e. where

View File

@ -69,6 +69,7 @@ public class EventProcessingBolt extends BaseRichBolt {
public static final String CREATED = "created";
public static final String DELETED = "deleted";
public static final String UPDATED = "updated";
public static final String RESEND = "resend";
private transient Logger LOG;
private OutputCollector collector;
@ -120,6 +121,10 @@ public class EventProcessingBolt extends BaseRichBolt {
sendSubAlarm(UPDATED, alarmId, subAlarmId, tenantId, alarmSubExpression);
}
private void sendResendSubAlarm(String alarmId, String subAlarmId, String tenantId, AlarmSubExpression alarmSubExpression) {
sendSubAlarm(RESEND, alarmId, subAlarmId, tenantId, alarmSubExpression);
}
private void sendSubAlarm(String eventType, String alarmId, String subAlarmId, String tenantId,
AlarmSubExpression alarmSubExpression) {
MetricDefinition metricDef = alarmSubExpression.getMetricDefinition();
@ -141,6 +146,13 @@ public class EventProcessingBolt extends BaseRichBolt {
}
void handle(AlarmUpdatedEvent event) {
if ((!event.oldAlarmState.equals(event.alarmState) ||
!event.oldAlarmSubExpressions.isEmpty()) && event.changedSubExpressions.isEmpty() &&
event.newAlarmSubExpressions.isEmpty()) {
for (Map.Entry<String, AlarmSubExpression> entry : event.unchangedSubExpressions.entrySet()) {
sendResendSubAlarm(event.alarmId, entry.getKey(), event.tenantId, entry.getValue());
}
}
for (Map.Entry<String, AlarmSubExpression> entry : event.oldAlarmSubExpressions.entrySet()) {
sendDeletedSubAlarm(entry.getKey(), event.tenantId, entry.getValue().getMetricDefinition());
}

View File

@ -121,6 +121,8 @@ public class MetricAggregationBolt extends BaseRichBolt {
handleAlarmCreated(metricDefinitionAndTenantId, subAlarm);
else if (EventProcessingBolt.UPDATED.equals(eventType))
handleAlarmUpdated(metricDefinitionAndTenantId, subAlarm);
else if (EventProcessingBolt.RESEND.equals(eventType))
handleAlarmResend(metricDefinitionAndTenantId, subAlarm);
}
}
}
@ -246,6 +248,34 @@ public class MetricAggregationBolt extends BaseRichBolt {
addSubAlarm(metricDefinitionAndTenantId, subAlarm);
}
void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm resendSubAlarm) {
final RepoAndStats repoAndStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, resendSubAlarm);
if (repoAndStats == null)
return;
final SubAlarmStats oldSubAlarmStats = repoAndStats.subAlarmStats;
final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm();
resendSubAlarm.setState(oldSubAlarm.getState());
resendSubAlarm.setNoState(true); // Have it send its state again so the Alarm can be evaluated
LOG.debug("Forcing SubAlarm {} to send state at next evaluation", oldSubAlarm);
oldSubAlarmStats.updateSubAlarm(resendSubAlarm);
}
private RepoAndStats findExistingSubAlarmStats(MetricDefinitionAndTenantId metricDefinitionAndTenantId,
SubAlarm oldSubAlarm) {
final SubAlarmStatsRepository oldSubAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId);
if (oldSubAlarmStatsRepo == null) {
LOG.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId);
return null;
}
final SubAlarmStats oldSubAlarmStats = oldSubAlarmStatsRepo.get(oldSubAlarm.getId());
if (oldSubAlarmStats == null) {
LOG.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", oldSubAlarm);
return null;
}
return new RepoAndStats(oldSubAlarmStatsRepo, oldSubAlarmStats);
}
private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
if (subAlarmStatsRepo == null)
@ -262,33 +292,26 @@ public class MetricAggregationBolt extends BaseRichBolt {
*/
void handleAlarmUpdated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
LOG.debug("Received AlarmUpdatedEvent for {}", subAlarm);
// Clear the old SubAlarm, but save the SubAlarm state
final SubAlarmStatsRepository oldSubAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId);
if (oldSubAlarmStatsRepo == null) {
LOG.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId);
}
else {
final SubAlarmStats oldSubAlarmStats = oldSubAlarmStatsRepo.get(subAlarm.getId());
if (oldSubAlarmStats == null)
LOG.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", subAlarm);
else {
final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm();
subAlarm.setState(oldSubAlarm.getState());
subAlarm.setNoState(true); // Doesn't hurt to send too many state changes, just too few
if (oldSubAlarm.isCompatible(subAlarm)) {
LOG.debug("Changing SubAlarm {} to SubAlarm {} and keeping measurements", oldSubAlarm, subAlarm);
oldSubAlarmStats.updateSubAlarm(subAlarm);
return;
}
// Have to completely change the SubAlarmStats
LOG.debug("Changing SubAlarm {} to SubAlarm {} and flushing measurements", oldSubAlarm, subAlarm);
oldSubAlarmStatsRepo.remove(subAlarm.getId());
final RepoAndStats repoAndStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, subAlarm);
if (repoAndStats != null) {
// Clear the old SubAlarm, but save the SubAlarm state
final SubAlarmStats oldSubAlarmStats = repoAndStats.subAlarmStats;
final SubAlarm oldSubAlarm = oldSubAlarmStats.getSubAlarm();
subAlarm.setState(oldSubAlarm.getState());
subAlarm.setNoState(true); // Doesn't hurt to send too many state changes, just too few
if (oldSubAlarm.isCompatible(subAlarm)) {
LOG.debug("Changing SubAlarm {} to SubAlarm {} and keeping measurements", oldSubAlarm, subAlarm);
oldSubAlarmStats.updateSubAlarm(subAlarm);
return;
}
// Have to completely change the SubAlarmStats
LOG.debug("Changing SubAlarm {} to SubAlarm {} and flushing measurements", oldSubAlarm, subAlarm);
repoAndStats.subAlarmStatsRepository.remove(subAlarm.getId());
}
addSubAlarm(metricDefinitionAndTenantId, subAlarm);
}
/**
/**
* Removes the sub-alarm for the {@code subAlarmId} from the subAlarmStatsRepo for the
* {@code metricDefinitionAndTenantId}.
*/
@ -301,4 +324,15 @@ public class MetricAggregationBolt extends BaseRichBolt {
subAlarmStatsRepos.remove(metricDefinitionAndTenantId);
}
}
private static class RepoAndStats {
public final SubAlarmStatsRepository subAlarmStatsRepository;
public final SubAlarmStats subAlarmStats;
public RepoAndStats(SubAlarmStatsRepository subAlarmStatsRepository,
SubAlarmStats subAlarmStats) {
this.subAlarmStatsRepository = subAlarmStatsRepository;
this.subAlarmStats = subAlarmStats;
}
}
}

View File

@ -87,7 +87,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
private AlarmExpression expression = new AlarmExpression(
"max(hpcs.compute.cpu{id=5}) >= 3 or max(hpcs.compute.mem{id=5}) >= 557");
private AlarmState currentState = AlarmState.OK;
private AlarmState currentState = AlarmState.UNDETERMINED;
private volatile int alarmsSent = 0;
public ThresholdingEngineAlarmTest() {
@ -171,7 +171,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
return result;
}
final AlarmState[] expectedStates = { AlarmState.ALARM, AlarmState.OK, AlarmState.ALARM };
final AlarmState[] expectedStates = { AlarmState.ALARM, AlarmState.OK, AlarmState.ALARM, AlarmState.OK };
public void shouldThreshold() throws Exception {
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
@ -192,9 +192,11 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
int goodValueCount = 0;
boolean firstUpdate = true;
boolean secondUpdate = true;
boolean thirdUpdate = true;
final Alarm initialAlarm = new Alarm(TEST_ALARM_ID, TEST_ALARM_TENANT_ID, TEST_ALARM_NAME,
TEST_ALARM_DESCRIPTION, expression, subAlarms, AlarmState.UNDETERMINED, Boolean.TRUE);
final int expectedAlarms = expectedStates.length;
AlarmExpression savedAlarmExpression = null;
for (int i = 1; alarmsSent != expectedAlarms && i < 300; i++) {
if (i == 5) {
final Map<String, AlarmSubExpression> exprs = createSubExpressionMap();
@ -215,15 +217,18 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
updatedSubAlarms.add(new SubAlarm(subAlarm.getId(), initialAlarm.getId(), subAlarm.getExpression()));
}
final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, expression, updatedSubAlarms);
event.alarmState = currentState;
initialAlarm.setState(currentState);
final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression,
updatedSubAlarms);
subAlarms = updatedSubAlarms;
initialAlarm.setSubAlarms(updatedSubAlarms);
eventSpout.feed(new Values(event));
System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression());
}
else if (alarmsSent == 2 && secondUpdate) {
secondUpdate = false;
savedAlarmExpression = expression;
expression = new AlarmExpression("max(hpcs.compute.load{id=5}) > 551 and (" + expression.getExpression().replace("556", "554") + ")");
final List<SubAlarm> updatedSubAlarms = new ArrayList<>();
updatedSubAlarms.add(new SubAlarm(UUID.randomUUID().toString(), initialAlarm.getId(), expression.getSubExpressions().get(0)));
@ -231,9 +236,30 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
updatedSubAlarms.add(new SubAlarm(subAlarms.get(index).getId(), initialAlarm.getId(), expression.getSubExpressions().get(index+1)));
}
final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, expression, updatedSubAlarms);
event.alarmState = currentState;
initialAlarm.setState(currentState);
final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression,
updatedSubAlarms);
subAlarms = updatedSubAlarms;
initialAlarm.setSubAlarms(updatedSubAlarms);
eventSpout.feed(new Values(event));
System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression());
}
else if (alarmsSent == 3 && thirdUpdate) {
thirdUpdate = false;
expression = savedAlarmExpression;
final List<SubAlarm> updatedSubAlarms = new ArrayList<>();
int index = 1;
for (AlarmSubExpression subExpression : expression.getSubExpressions()) {
updatedSubAlarms.add(new SubAlarm(subAlarms.get(index).getId(), initialAlarm.getId(), subExpression));
index++;
}
initialAlarm.setState(currentState);
final AlarmUpdatedEvent event = EventProcessingBoltTest.createAlarmUpdatedEvent(initialAlarm, initialAlarm.getState(), expression,
updatedSubAlarms);
subAlarms = updatedSubAlarms;
initialAlarm.setSubAlarms(updatedSubAlarms);
eventSpout.feed(new Values(event));
System.out.printf("Send AlarmUpdatedEvent for expression %s%n", expression.getExpression());
@ -265,7 +291,7 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase {
}
}
assertEquals(alarmsSent, expectedAlarms);
assertEquals(currentState, AlarmState.ALARM);
assertEquals(currentState, expectedStates[expectedStates.length - 1]);
}
private Map<String, AlarmSubExpression> createSubExpressionMap() {

View File

@ -152,7 +152,7 @@ public class AlarmThresholdingBoltTest {
final AlarmState newState = AlarmState.OK;
boolean newEnabled = false;
final AlarmUpdatedEvent event = new AlarmUpdatedEvent(tenantId, alarmId, newName, newDescription, alarm.getAlarmExpression().getExpression(),
newState, newEnabled, empty, empty, empty);
alarm.getState(), newState, newEnabled, empty, empty, empty, empty);
final Tuple updateTuple = createAlarmUpdateTuple(event);
bolt.execute(updateTuple);
verify(collector, times(1)).ack(updateTuple);
@ -168,6 +168,7 @@ public class AlarmThresholdingBoltTest {
final Map<String, AlarmSubExpression> newSubExpressions = new HashMap<>();
final Map<String, AlarmSubExpression> oldSubExpressions = new HashMap<>();
final Map<String, AlarmSubExpression> changedSubExpressions = new HashMap<>();
final Map<String, AlarmSubExpression> unchangedSubExpressions = new HashMap<>();
final String newExpression = subExpressions[1] + " or " +
subExpressions[2].replace("max", "avg") + " or " +
"sum(diskio{instance_id=123,device=4242}, 1) > 5000";
@ -180,13 +181,14 @@ public class AlarmThresholdingBoltTest {
final SubAlarm changedSubAlarm = new SubAlarm(subAlarms.get(2).getId(), alarmId, newAlarmExpression.getSubExpressions().get(1));
changedSubExpressions.put(changedSubAlarm.getId(), changedSubAlarm.getExpression());
final SubAlarm unChangedSubAlarm = new SubAlarm(subAlarms.get(1).getId(), alarmId, subAlarms.get(1).getExpression());
unchangedSubExpressions.put(unChangedSubAlarm.getId(), unChangedSubAlarm.getExpression());
emitSubAlarmStateChange(alarmId, changedSubAlarm, AlarmState.OK);
emitSubAlarmStateChange(alarmId, unChangedSubAlarm, AlarmState.OK);
unChangedSubAlarm.setState(AlarmState.OK);
final AlarmUpdatedEvent event = new AlarmUpdatedEvent(tenantId, alarmId, alarm.getName(), alarm.getDescription(), newExpression,
alarm.getState(), alarm.isActionsEnabled(), oldSubExpressions, changedSubExpressions, newSubExpressions);
alarm.getState(), alarm.getState(), alarm.isActionsEnabled(), oldSubExpressions, changedSubExpressions, unchangedSubExpressions, newSubExpressions);
final Tuple updateTuple = createAlarmUpdateTuple(event);
bolt.execute(updateTuple);
verify(collector, times(1)).ack(updateTuple);

View File

@ -23,8 +23,11 @@ import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.testng.annotations.BeforeMethod;
@ -37,6 +40,9 @@ import backtype.storm.testing.MkTupleParam;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Sets;
import com.hpcloud.mon.common.event.AlarmCreatedEvent;
import com.hpcloud.mon.common.event.AlarmDeletedEvent;
import com.hpcloud.mon.common.event.AlarmUpdatedEvent;
@ -144,43 +150,73 @@ public class EventProcessingBoltTest {
}
public static AlarmUpdatedEvent createAlarmUpdatedEvent(final Alarm alarm,
final AlarmExpression updatedAlarmExpression, List<SubAlarm> updatedSubAlarms) {
final Alarm updatedAlarm = new Alarm();
updatedAlarm.setId(alarm.getId());
updatedAlarm.setTenantId(alarm.getTenantId());
updatedAlarm.setName(alarm.getName());
updatedAlarm.setExpression(updatedAlarmExpression.getExpression());
updatedAlarm.setDescription(alarm.getDescription());
updatedAlarm.setState(alarm.getState());
final AlarmState newState,
final AlarmExpression updatedAlarmExpression,
List<SubAlarm> updatedSubAlarms) {
final Map<String, AlarmSubExpression> oldAlarmSubExpressions = new HashMap<>();
for (final SubAlarm subAlarm : alarm.getSubAlarms())
oldAlarmSubExpressions.put(subAlarm.getId(), subAlarm.getExpression());
BiMap<String, AlarmSubExpression> oldExpressions = HashBiMap.create(oldAlarmSubExpressions);
Set<AlarmSubExpression> oldSet = oldExpressions.inverse().keySet();
Set<AlarmSubExpression> newSet = new HashSet<>();
for (final SubAlarm subAlarm : updatedSubAlarms)
newSet.add(subAlarm.getExpression());
final List<SubAlarm> toDelete = new ArrayList<>(alarm.getSubAlarms());
final Map<String, AlarmSubExpression> newAlarmSubExpressions = new HashMap<>();
final Map<String, AlarmSubExpression> updatedSubExpressions = new HashMap<>();
for (final SubAlarm newSubAlarm : updatedSubAlarms) {
final SubAlarm oldSubAlarm = alarm.getSubAlarm(newSubAlarm.getId());
if (oldSubAlarm == null) {
newAlarmSubExpressions.put(newSubAlarm.getId(), newSubAlarm.getExpression());
}
else {
toDelete.remove(oldSubAlarm);
if (!newSubAlarm.getExpression().equals(oldSubAlarm.getExpression())) {
updatedSubExpressions.put(newSubAlarm.getId(), newSubAlarm.getExpression());
}
// Identify old or changed expressions
Set<AlarmSubExpression> oldOrChangedExpressions = new HashSet<>(Sets.difference(oldSet, newSet));
// Identify new or changed expressions
Set<AlarmSubExpression> newOrChangedExpressions = new HashSet<>(Sets.difference(newSet, oldSet));
// Find changed expressions
Map<String, AlarmSubExpression> changedExpressions = new HashMap<>();
for (Iterator<AlarmSubExpression> oldIt = oldOrChangedExpressions.iterator(); oldIt.hasNext();) {
AlarmSubExpression oldExpr = oldIt.next();
for (Iterator<AlarmSubExpression> newIt = newOrChangedExpressions.iterator(); newIt.hasNext();) {
AlarmSubExpression newExpr = newIt.next();
if (sameKeyFields(oldExpr, newExpr)) {
oldIt.remove();
newIt.remove();
changedExpressions.put(oldExpressions.inverse().get(oldExpr), newExpr);
break;
}
}
}
final Map<String, AlarmSubExpression> deletedSubExpressions = new HashMap<>(toDelete.size());
for (final SubAlarm oldSubAlarm : toDelete) {
deletedSubExpressions.put(oldSubAlarm.getId(), oldSubAlarm.getExpression());
}
final AlarmUpdatedEvent event = new AlarmUpdatedEvent(updatedAlarm.getTenantId(), updatedAlarm.getId(),
updatedAlarm.getName(), updatedAlarm.getDescription(), updatedAlarm.getAlarmExpression().getExpression(), alarm.getState(), true, deletedSubExpressions,
updatedSubExpressions, newAlarmSubExpressions);
return event;
BiMap<String, AlarmSubExpression> unchangedExpressions = HashBiMap.create(oldExpressions);
unchangedExpressions.values().removeAll(oldOrChangedExpressions);
unchangedExpressions.keySet().removeAll(changedExpressions.keySet());
// Remove old sub expressions
oldExpressions.values().retainAll(oldOrChangedExpressions);
// Create IDs for new expressions
Map<String, AlarmSubExpression> newExpressions = new HashMap<>();
for (AlarmSubExpression expression : newOrChangedExpressions)
for (final SubAlarm subAlarm : updatedSubAlarms)
if (subAlarm.getExpression().equals(expression))
newExpressions.put(subAlarm.getId(), expression);
final AlarmUpdatedEvent event = new AlarmUpdatedEvent(alarm.getTenantId(), alarm.getId(),
alarm.getName(), alarm.getDescription(), updatedAlarmExpression.getExpression(), newState, alarm.getState(),
true, oldExpressions,
changedExpressions, unchangedExpressions, newExpressions);
return event;
}
/**
* Returns whether all of the fields of {@code a} and {@code b} are the same except the operator
* and threshold.
*/
private static boolean sameKeyFields(AlarmSubExpression a, AlarmSubExpression b) {
return a.getMetricDefinition().equals(b.getMetricDefinition())
&& a.getFunction().equals(b.getFunction()) && a.getPeriod() == b.getPeriod()
&& a.getPeriods() == b.getPeriods();
}
public void testAlarmUpdatedEvent() {
final String updatedExpression = "avg(hpcs.compute.cpu{instance_id=123,device=42}, 1) > 5 " +
"and max(hpcs.compute.Mem{instance_id=123,device=42}) > 90 " +
"and max(hpcs.compute.mem{instance_id=123,device=42}) > 90 " +
"and max(hpcs.compute.newLoad{instance_id=123,device=42}) > 5";
final AlarmExpression updatedAlarmExpression = new AlarmExpression(updatedExpression);
@ -190,7 +226,8 @@ public class EventProcessingBoltTest {
updatedSubAlarms.add(new SubAlarm(subAlarms.get(1).getId(), alarm.getId(), updatedAlarmExpression.getSubExpressions().get(1)));
updatedSubAlarms.add(new SubAlarm(UUID.randomUUID().toString(), alarm.getId(), updatedAlarmExpression.getSubExpressions().get(2)));
final AlarmUpdatedEvent event = createAlarmUpdatedEvent(alarm, updatedAlarmExpression, updatedSubAlarms);
final AlarmUpdatedEvent event = createAlarmUpdatedEvent(alarm, alarm.getState(), updatedAlarmExpression,
updatedSubAlarms);
final Tuple tuple = createTuple(event);
bolt.execute(tuple);

View File

@ -172,6 +172,43 @@ public class MetricAggregationBoltTest {
verify(collector, times(1)).emit(new Values(subAlarm3.getAlarmId(), subAlarm3));
}
public void shouldSendAlarmAgain() {
long t1 = 10;
bolt.setCurrentTime(t1);
bolt.execute(createMetricTuple(metricDef2, null));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100)));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95)));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88)));
t1 += 60;
bolt.setCurrentTime(t1);
final Tuple tickTuple = createTickTuple();
bolt.execute(tickTuple);
verify(collector, times(1)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
verify(collector, times(1)).ack(tickTuple);
final MkTupleParam tupleParam = new MkTupleParam();
tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS);
tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID);
final Tuple resendTuple = Testing.testTuple(Arrays.asList(EventProcessingBolt.RESEND,
new MetricDefinitionAndTenantId(metricDef2, TENANT_ID), subAlarm2), tupleParam);
bolt.execute(resendTuple);
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1, 100)));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 95)));
bolt.execute(createMetricTuple(metricDef2, new Metric(metricDef2, t1++, 88)));
t1 += 60;
bolt.setCurrentTime(t1);
bolt.execute(tickTuple);
verify(collector, times(2)).ack(tickTuple);
assertEquals(subAlarm2.getState(), AlarmState.ALARM);
verify(collector, times(2)).emit(new Values(subAlarm2.getAlarmId(), subAlarm2));
}
public void shouldSendUndeterminedIfStateChanges() {
long t1 = System.currentTimeMillis() / 1000;
bolt.setCurrentTime(t1);

View File

@ -26,6 +26,7 @@ import org.testng.annotations.Test;
import com.hpcloud.mon.common.event.AlarmCreatedEvent;
import com.hpcloud.mon.common.event.AlarmDeletedEvent;
import com.hpcloud.mon.common.event.AlarmUpdatedEvent;
import com.hpcloud.mon.common.model.alarm.AlarmState;
import com.hpcloud.util.Serialization;
@Test
@ -46,7 +47,8 @@ public class EventDeserializerTest {
}
public void shouldDeserializeAlarmUpdatedEvent() {
roundTrip(new AlarmUpdatedEvent(TENANT_ID, ALARM_ID, ALARM_NAME, ALARM_DESCRIPTION, ALARM_EXPRESSION, null, false, null, null, null));
roundTrip(new AlarmUpdatedEvent(TENANT_ID, ALARM_ID, ALARM_NAME, ALARM_DESCRIPTION, ALARM_EXPRESSION,
AlarmState.OK, AlarmState.OK, false, null, null, null, null));
}
private void roundTrip(Object event) {