From 46f2f5f3dfcdcb6a3d388a6bc002a50990cbf4ae Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Mon, 15 Dec 2014 17:00:16 -0700 Subject: [PATCH] Improve performance of Alarm Creation The AlarmCreationBolt now caches AlarmDefinitions and Alarms for quicker evaluation of incoming metrics. Incoming metrics end up in one of these buckets: 1. Fits into an existing Alarm 2. Causes a new Alarm to be created 3. Already exists in an existing Alarm All of these require the analysis of existing Alarms. I tried writing SQL to do this analysis but it just wasn't fast enough so instead I added the caching of the Alarm Definitions and Alarms. The AlarmCreationBolt now needs to process Alarm deletion messages so that stream from the EventProcessingBolt had to be hooked up to the AlarmCreationBolt. The AlarmCreationBolt used to incorrectly handle Alarm Definition Updated events. Improved the queries in AlarmDAOImpl to be more efficient by using fewer queries instead of multiple queries per Alarm. However the AlarmDAOImplTest now requires a real mysql instance since the h2 emulator doesn't understand "group_concat". Mark that test as only run for target integration-test Turn on tests for target integration-test Previous code was not reusing Metrics from existing Alarms every time it should have. Added test for this case Changed info messages to debug to lessen normal logging. 
Added more tests of existing and new functionality Added some timing code for debug Removed unused code Added more debug logging code Added reference to API doc for Alarm Definitions in README Change-Id: Ied9841ecde7608b9eb1eb9c110b73b079ede71bc --- README.md | 2 + thresh/pom.xml | 2 +- .../java/monasca/thresh/TopologyModule.java | 1 + .../monasca/thresh/domain/model/Alarm.java | 13 +- .../thresh/domain/model/AlarmDefinition.java | 12 +- .../persistence/AlarmDAOImpl.java | 216 +++++++++--------- .../thresholding/AlarmCreationBolt.java | 165 +++++++++---- .../thresholding/AlarmThresholdingBolt.java | 61 +---- .../thresholding/EventProcessingBolt.java | 1 + .../thresholding/MetricFilteringBolt.java | 2 + .../persistence/AlarmDAOImplTest.java | 31 ++- .../AlarmDefinitionDAOImplTest.java | 10 +- .../thresholding/AlarmCreationBoltTest.java | 172 +++++++++++++- 13 files changed, 460 insertions(+), 228 deletions(-) diff --git a/README.md b/README.md index 3ef2148..f9b492f 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ Alarms have three possible states: `UNDETERMINED`, `OK` and `ALARM`. Alarms are avg(cpu{service=nova}, 120) > 90 or avg(load{service=nova}, 120) > 15 ``` +For more details on Alarm Definitions versus Alarms refer to the Monasca API documentation at https://github.com/stackforge/monasca-api/blob/master/docs/monasca-api-spec.md. + If the expression evaluates to true, the Alarm state transitions to `ALARM`, if it evaluates to false, the state transitions to `OK` and if there aren't any metrics for the two times the measuring period, the Alarm state transitions to `UNDETERMINED`. Each part of the expression is represented by a Sub Alarm, so for the above example, there are two Sub Alarms. The Threshold Engine is designed as a series of Storm Spouts and Bolts. For an overview of Storm, look at [the tutorial][storm-tutorial]. 
Spouts feed external data into the system as messages while bolts process incoming messages and optionally produce output messages for a downstream bolt. diff --git a/thresh/pom.xml b/thresh/pom.xml index 9189839..127bf71 100644 --- a/thresh/pom.xml +++ b/thresh/pom.xml @@ -18,7 +18,7 @@ 1.0.0-SNAPSHOT 0.9.1-incubating - true + false UTF-8 UTF-8 ${project.artifactId}-${project.version}-${timestamp}-${buildNumber} diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index 004e2cb..5c98fa7 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -121,6 +121,7 @@ public class TopologyModule extends AbstractModule { MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM, new Fields(AlarmCreationBolt.ALARM_CREATION_FIELDS[3])) .allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID) + .allGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID) .allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID) .setNumTasks(1); // This has to be a single bolt right now because there is no // database protection for adding metrics and dimensions diff --git a/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java b/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java index e70b8ea..08168dd 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/Alarm.java @@ -33,8 +33,7 @@ import java.util.UUID; /** * An alarm comprised of sub-alarms. 
- */ -/** + * * @author craigbr * */ @@ -207,6 +206,16 @@ public class Alarm extends AbstractEntity { } } + public boolean updateSubAlarm(final SubExpression subExpression) { + for (final SubAlarm subAlarm : this.subAlarms.values()) { + if (subAlarm.getAlarmSubExpressionId().equals(subExpression.getId())) { + subAlarm.setExpression(subExpression.getAlarmSubExpression()); + return true; + } + } + return false; + } + @Override public String toString() { final StringBuilder alarmedMetricsString = new StringBuilder(); diff --git a/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java b/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java index 54602f7..8133615 100644 --- a/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java +++ b/thresh/src/main/java/monasca/thresh/domain/model/AlarmDefinition.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.UUID; /** - * An alarm comprised of sub-alarms. + * Defines the "policy" for creating alarms */ public class AlarmDefinition extends AbstractEntity { private String tenantId; @@ -216,4 +216,14 @@ public class AlarmDefinition extends AbstractEntity { public void setSubExpressions(List subExpressions) { this.subExpressions = subExpressions; } + + public boolean updateSubExpression(final String id, final AlarmSubExpression alarmSubExpression) { + for (final SubExpression subExpression : this.subExpressions) { + if (subExpression.getId().equals(id)) { + subExpression.setAlarmSubExpression(alarmSubExpression); + return true; + } + } + return false; + } } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java b/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java index 5f67494..6ad36f5 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/persistence/AlarmDAOImpl.java @@ -20,7 +20,6 @@ package 
monasca.thresh.infrastructure.persistence; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; -import monasca.common.persistence.BeanMapper; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; @@ -31,19 +30,15 @@ import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.StatementContext; -import org.skife.jdbi.v2.tweak.ResultSetMapper; +import org.skife.jdbi.v2.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.ResultSet; -import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import javax.inject.Inject; @@ -65,38 +60,98 @@ public class AlarmDAOImpl implements AlarmDAO { @Override public List findForAlarmDefinitionId(String alarmDefinitionId) { - Handle h = db.open(); - try { - List alarms = - h.createQuery("select * from alarm where alarm_definition_id = :id") - .bind("id", alarmDefinitionId).map(new BeanMapper(Alarm.class)).list(); - - for (final Alarm alarm : alarms) { - alarm.setSubAlarms(getSubAlarms(h, alarm.getId())); - - alarm.setAlarmedMetrics(findAlarmedMetrics(h, alarm.getId())); - } - return alarms; - } finally { - h.close(); - } + return findAlarms("a.alarm_definition_id = :alarmDefinitionId ", "alarmDefinitionId", + alarmDefinitionId); } @Override public List listAll() { - Handle h = db.open(); - try { - List alarms = - h.createQuery("select * from alarm").map(new BeanMapper(Alarm.class)).list(); + return findAlarms("1=1"); // This is basically "true" and gets optimized out + } - for (final Alarm alarm : alarms) { - 
alarm.setSubAlarms(getSubAlarms(h, alarm.getId())); + private List findAlarms(final String additionalWhereClause, String ... params) { + try (final Handle h = db.open()) { - alarm.setAlarmedMetrics(findAlarmedMetrics(h, alarm.getId())); + final String ALARMS_SQL = + "select a.id, a.alarm_definition_id, a.state, sa.id as sub_alarm_id, sa.expression, sa.sub_expression_id, ad.tenant_id from alarm a " + + "inner join sub_alarm sa on sa.alarm_id = a.id " + + "inner join alarm_definition ad on a.alarm_definition_id = ad.id " + + "where ad.deleted_at is null and %s " + + "order by a.id"; + final String sql = String.format(ALARMS_SQL, additionalWhereClause); + final Query> query = h.createQuery(sql); + addQueryParameters(query, params); + final List> rows = query.list(); + + final List alarms = new ArrayList<>(rows.size()); + List subAlarms = new ArrayList(); + String prevAlarmId = null; + Alarm alarm = null; + final Map alarmMap = new HashMap<>(); + final Map tenantIdMap = new HashMap<>(); + for (final Map row : rows) { + final String alarmId = getString(row, "id"); + if (!alarmId.equals(prevAlarmId)) { + if (alarm != null) { + alarm.setSubAlarms(subAlarms); + } + alarm = new Alarm(); + alarm.setId(alarmId); + alarm.setAlarmDefinitionId(getString(row, "alarm_definition_id")); + alarm.setState(AlarmState.valueOf(getString(row, "state"))); + subAlarms = new ArrayList(); + alarms.add(alarm); + alarmMap.put(alarmId, alarm); + tenantIdMap.put(alarmId, getString(row, "tenant_id")); + } + final SubExpression subExpression = + new SubExpression(getString(row, "sub_expression_id"), AlarmSubExpression.of(getString( + row, "expression"))); + final SubAlarm subAlarm = + new SubAlarm(getString(row, "sub_alarm_id"), alarmId, subExpression); + subAlarms.add(subAlarm); + prevAlarmId = alarmId; + } + if (alarm != null) { + alarm.setSubAlarms(subAlarms); + } + if (!alarms.isEmpty()) { + getAlarmedMetrics(h, alarmMap, tenantIdMap, additionalWhereClause, params); } return alarms; - } finally 
{ - h.close(); + } + } + + private void addQueryParameters(final Query> query, String... params) { + for (int i = 0; i < params.length;) { + query.bind(params[i], params[i+1]); + i += 2; + } + } + + private void getAlarmedMetrics(Handle h, final Map alarmMap, + final Map tenantIdMap, final String additionalWhereClause, String ... params) { + final String baseSql = "select a.id, md.name, mdg.dimensions from metric_definition as md " + + "inner join metric_definition_dimensions as mdd on md.id = mdd.metric_definition_id " + + "inner join alarm_metric as am on mdd.id = am.metric_definition_dimensions_id " + + "inner join alarm as a on am.alarm_id = a.id " + + "left join (select dimension_set_id, name, value, group_concat(name, '=', value) as dimensions " + + " from metric_dimension group by dimension_set_id) as mdg on mdg.dimension_set_id = mdd.metric_dimension_set_id where %s"; + final String sql = String.format(baseSql, additionalWhereClause); + final Query> query = h.createQuery(sql); + addQueryParameters(query, params); + final List> metricRows = query.list(); + for (final Map row : metricRows) { + final String alarmId = getString(row, "id"); + final Alarm alarm = alarmMap.get(alarmId); + // This shouldn't happen but it is possible an Alarm gets created after the AlarmDefinition is + // marked deleted and any existing alarms are deleted but before the Threshold Engine gets the + // AlarmDefinitionDeleted message + if (alarm == null) { + continue; + } + final MetricDefinition md = createMetricDefinitionFromRow(row); + alarm.addAlarmedMetric(new MetricDefinitionAndTenantId(md, tenantIdMap.get(alarmId))); } } @@ -208,105 +263,54 @@ public class AlarmDAOImpl implements AlarmDAO { @Override public Alarm findById(String id) { - Handle h = db.open(); - - try { - Alarm alarm = - h.createQuery("select * from alarm where id = :id").bind("id", id) - .map(new BeanMapper(Alarm.class)).first(); - if (alarm == null) { - return null; - } - - alarm.setSubAlarms(getSubAlarms(h, 
alarm.getId())); - - alarm.setAlarmedMetrics(findAlarmedMetrics(h, id)); - return alarm; - } finally { - h.close(); + final List alarms = findAlarms("a.id = :alarm_id ", "alarm_id", id); + if (alarms.isEmpty()) { + return null; } - } - - private static class SubAlarmMapper implements ResultSetMapper { - public SubAlarm map(int rowIndex, ResultSet rs, StatementContext ctxt) throws SQLException { - SubExpression subExpression = new SubExpression( - rs.getString("sub_expression_id"), AlarmSubExpression.of(rs.getString("expression"))); - return new SubAlarm(rs.getString("id"), rs.getString("alarm_id"), subExpression); - } - } - - private List getSubAlarms(Handle h, String alarmId) { - return h.createQuery("select * from sub_alarm where alarm_id = :alarmId") - .bind("alarmId", alarmId).map(new SubAlarmMapper()).list(); - } - - private Set findAlarmedMetrics(Handle h, String alarmId) { - final List> result = - h.createQuery( - "select md.name as metric_name, md.tenant_id, md.region, mdi.name, mdi.value, mdd.id, mdd.metric_dimension_set_id " + - "from metric_definition_dimensions as mdd left join metric_definition as md on md.id = mdd.metric_definition_id " + - "left join metric_dimension as mdi on mdi.dimension_set_id = mdd.metric_dimension_set_id where mdd.id in " + - "(select metric_definition_dimensions_id from alarm_metric where alarm_id=:alarm_id) order by mdd.id") - .bind("alarm_id", alarmId).list(); - if ((result == null) || result.isEmpty()) { - return new HashSet<>(0); + else { + return alarms.get(0); } - - final Set alarmedMetrics = new HashSet<>(result.size()); - Sha1HashId previous = null; - MetricDefinitionAndTenantId mdtid = null; - for (Map row : result) { - final Sha1HashId next = new Sha1HashId((byte[]) row.get("id")); - // The order by clause in the SQL guarantees this order - if (!next.equals(previous)) { - if (mdtid != null) { - alarmedMetrics.add(mdtid); - } - final String name = (String) row.get("metric_name"); - final String tenantId = (String) 
row.get("tenant_id"); - mdtid = new MetricDefinitionAndTenantId(new MetricDefinition(name, new HashMap()), tenantId); - previous = next; - } - final String name = (String) row.get("name"); - final String value = (String) row.get("value"); - if ((name != null) && !name.isEmpty()) { - mdtid.metricDefinition.dimensions.put(name, value); - } - } - if (mdtid != null) { - alarmedMetrics.add(mdtid); - } - return alarmedMetrics; } @Override public void updateState(String id, AlarmState state) { - Handle h = db.open(); - try { + try (final Handle h = db.open()) { h.createStatement("update alarm set state = :state, updated_at = NOW() where id = :id") .bind("id", id).bind("state", state.toString()).execute(); - } finally { - h.close(); } } @Override public int updateSubAlarmExpressions(String alarmSubExpressionId, AlarmSubExpression alarmSubExpression) { - Handle h = db.open(); - try { + try (final Handle h = db.open()) { return h .createStatement( "update sub_alarm set expression=:expression where sub_expression_id=:alarmSubExpressionId") .bind("expression", alarmSubExpression.getExpression()) .bind("alarmSubExpressionId", alarmSubExpressionId).execute(); - } finally { - h.close(); } } + private MetricDefinition createMetricDefinitionFromRow(final Map row) { + final Map dimensionMap = new HashMap<>(); + final String dimensions = getString(row, "dimensions"); + if (dimensions != null) { + for (String dimension : dimensions.split(",")) { + final String[] parsed_dimension = dimension.split("="); + dimensionMap.put(parsed_dimension[0], parsed_dimension[1]); + } + } + final MetricDefinition md = new MetricDefinition(getString(row, "name"), dimensionMap); + return md; + } + + private String getString(final Map row, String fieldName) { + return (String) row.get(fieldName); + } + private String trunc(String s, int l) { if (s == null) { diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java 
b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java index 4b5d245..0bf1816 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBolt.java @@ -28,6 +28,8 @@ import backtype.storm.tuple.Values; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.event.AlarmDefinitionDeletedEvent; +import monasca.common.model.event.AlarmDefinitionUpdatedEvent; +import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.metric.MetricDefinition; import monasca.common.streaming.storm.Logging; import monasca.common.util.Injector; @@ -35,6 +37,7 @@ import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; import monasca.thresh.domain.model.SubAlarm; +import monasca.thresh.domain.model.SubExpression; import monasca.thresh.domain.model.TenantIdAndMetricName; import monasca.thresh.domain.service.AlarmDAO; import monasca.thresh.domain.service.AlarmDefinitionDAO; @@ -51,8 +54,6 @@ import java.util.Map; /** * Handles creation of Alarms and Alarmed Metrics. 
- * - * MUST be only one of these bolts in the storm application */ public class AlarmCreationBolt extends BaseRichBolt { private static final long serialVersionUID = 1096706128973976599L; @@ -67,6 +68,8 @@ public class AlarmCreationBolt extends BaseRichBolt { private transient AlarmDAO alarmDAO; private OutputCollector collector; private final Map> waitingAlarms = new HashMap<>(); + private final Map> alarmCache = new HashMap<>(); + private final Map alarmDefinitionCache = new HashMap<>(); private static final List EMPTY_LIST = Collections.emptyList(); public AlarmCreationBolt(DataSourceFactory dbConfig) { @@ -86,7 +89,7 @@ public class AlarmCreationBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { - logger.info("tuple: {}", tuple); + logger.debug("tuple: {}", tuple); try { if (MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM.equals(tuple.getSourceStreamId())) { final MetricDefinitionAndTenantId metricDefinitionAndTenantId = @@ -96,10 +99,9 @@ public class AlarmCreationBolt extends BaseRichBolt { .getSourceStreamId())) { final String eventType = tuple.getString(0); if (EventProcessingBolt.UPDATED.equals(eventType)) { - // We could try to update the subalarms, but it is easier just to delete - // the waiting alarms and wait for them to be recreated. 
The AlarmDefinition - // itself is not cached so we don't have to do anything with it - removeWaitingAlarmsForAlarmDefinition(tuple.getString(2)); + final SubExpression subExpression = (SubExpression) tuple.getValue(1); + final String alarmDefinitionId = tuple.getString(2); + updateSubAlarms(subExpression, alarmDefinitionId); } } else if (EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { final String eventType = tuple.getString(0); @@ -108,14 +110,22 @@ public class AlarmCreationBolt extends BaseRichBolt { if (EventProcessingBolt.DELETED.equals(eventType)) { final AlarmDefinitionDeletedEvent event = (AlarmDefinitionDeletedEvent) tuple.getValue(1); - removeWaitingAlarmsForAlarmDefinition(event.alarmDefinitionId); + deleteAlarmDefinition(event.alarmDefinitionId); } + else if (EventProcessingBolt.UPDATED.equals(eventType)) { + updateAlarmDefinition((AlarmDefinitionUpdatedEvent) tuple.getValue(1)); + } + } + } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { + final String eventType = tuple.getString(0); + if (EventProcessingBolt.DELETED.equals(eventType)) { + removeAlarm((AlarmDeletedEvent) tuple.getValue(2)); } } else { - logger.error("Receieved tuple on unknown stream {}", tuple); + logger.error("Received tuple on unknown stream {}", tuple); } - + } catch (Exception e) { logger.error("Error processing tuple {}", tuple, e); } finally { @@ -123,16 +133,76 @@ public class AlarmCreationBolt extends BaseRichBolt { } } - private void removeWaitingAlarmsForAlarmDefinition(String alarmDefinitionId) { + private void removeAlarm(AlarmDeletedEvent event) { + logger.debug("Deleting alarm {} for Alarm Definition {}", event.alarmId, event.alarmDefinitionId); + final List alarms = alarmCache.get(event.alarmDefinitionId); + if (alarms != null) { + for (final Alarm alarm : alarms) { + if (alarm.getId().equals(event.alarmId)) { + logger.debug("Deleted alarm {} for Alarm Definition {}", event.alarmId, 
event.alarmDefinitionId); + alarms.remove(alarm); + break; + } + } + } + } + + private void updateSubAlarms(final SubExpression subExpression, final String alarmDefinitionId) { + logger.debug("Updating SubAlarms for AlarmDefinition Id {} SubExpression {}", + alarmDefinitionId, subExpression); + int count = 0; + if (alarmDefinitionCache.containsKey(alarmDefinitionId)) { + final List waiting = waitingAlarms.get(alarmDefinitionId); + if (waiting != null && !waiting.isEmpty()) { + for (final Alarm alarm : waiting) { + if (!alarm.updateSubAlarm(subExpression)) { + logger.error("Did not find SubAlarms for AlarmDefinition Id {} SubExpression {} Alarm {}", + alarmDefinitionId, subExpression, alarm); + } + count++; + } + } + } + logger.debug("Updated {} SubAlarms for AlarmDefinition Id {}", count, alarmDefinitionId); + } + + private void updateAlarmDefinition(final AlarmDefinitionUpdatedEvent event) { + final AlarmDefinition alarmDefinition = alarmDefinitionCache.get(event.alarmDefinitionId); + if (alarmDefinition != null) { + logger.debug("Updating AlarmDefinition {}", event.alarmDefinitionId); + alarmDefinition.setName(event.alarmName); + alarmDefinition.setDescription(event.alarmDescription); + alarmDefinition.setActionsEnabled(event.alarmActionsEnabled); + alarmDefinition.setExpression(event.alarmExpression); + alarmDefinition.setSeverity(event.severity); + if (!alarmDefinition.getMatchBy().equals(event.matchBy)) { + logger.error("AlarmDefinition {}: match-by changed, was {} now {}", + event.alarmDefinitionId, alarmDefinition.getMatchBy(), event.matchBy); + } + alarmDefinition.setMatchBy(event.matchBy); // Should never change + for (Map.Entry entry : event.changedSubExpressions.entrySet()) { + if (!alarmDefinition.updateSubExpression(entry.getKey(), entry.getValue())) { + logger.error("AlarmDefinition {}: Did not finding matching SubAlarmExpression id={} SubAlarmExpression{}", + event.alarmDefinitionId, entry.getKey(), entry.getValue()); + } + } + } + } + + private void 
deleteAlarmDefinition(String alarmDefinitionId) { + logger.debug("Deleting AlarmDefinition {}", alarmDefinitionId); final List waiting = waitingAlarms.remove(alarmDefinitionId); if (waiting != null && !waiting.isEmpty()) { - logger.info("{} waiting alarms removed for Alarm Definition Id {}", waiting != null + logger.debug("{} waiting alarms removed for Alarm Definition Id {}", waiting != null && !waiting.isEmpty() ? waiting.size() : "No", alarmDefinitionId); } + alarmCache.remove(alarmDefinitionId); + alarmDefinitionCache.remove(alarmDefinitionId); } protected void handleNewMetricDefinition( final MetricDefinitionAndTenantId metricDefinitionAndTenantId, final String alarmDefinitionId) { + final long start = System.currentTimeMillis(); final AlarmDefinition alarmDefinition = lookUpAlarmDefinition(alarmDefinitionId); if (alarmDefinition == null) { return; @@ -142,7 +212,7 @@ public class AlarmCreationBolt extends BaseRichBolt { return; } - final List existingAlarms = alarmDAO.findForAlarmDefinitionId(alarmDefinitionId); + final List existingAlarms = getExistingAlarms(alarmDefinitionId); if (alreadyCreated(existingAlarms, metricDefinitionAndTenantId)) { logger.warn("MetricDefinition {} is already in existing Alarm", metricDefinitionAndTenantId); return; @@ -154,49 +224,66 @@ public class AlarmCreationBolt extends BaseRichBolt { return; } - final Alarm existingAlarm = + final List matchingAlarms = fitsInExistingAlarm(metricDefinitionAndTenantId, alarmDefinition, existingAlarms); - if (existingAlarm != null) { - logger.info("Metric {} fits into existing alarm {}", metricDefinitionAndTenantId, - existingAlarm); - addToExistingAlarm(existingAlarm, metricDefinitionAndTenantId); - sendNewMetricDefinition(existingAlarm, metricDefinitionAndTenantId); + if (!matchingAlarms.isEmpty()) { + for (final Alarm matchingAlarm : matchingAlarms) { + logger.info("Metric {} fits into existing alarm {}", metricDefinitionAndTenantId, + matchingAlarm.getId()); + 
addToExistingAlarm(matchingAlarm, metricDefinitionAndTenantId); + sendNewMetricDefinition(matchingAlarm, metricDefinitionAndTenantId); + } } else { final List newAlarms = finishesAlarm(alarmDefinition, metricDefinitionAndTenantId, existingAlarms); for (final Alarm newAlarm : newAlarms) { logger.info("Metric {} finishes waiting alarm {}", metricDefinitionAndTenantId, newAlarm); + existingAlarms.add(newAlarm); for (final MetricDefinitionAndTenantId md : newAlarm.getAlarmedMetrics()) { sendNewMetricDefinition(newAlarm, md); } } } + logger.debug("Total processing took {} milliseconds", System.currentTimeMillis() - start); } - private Alarm fitsInExistingAlarm(final MetricDefinitionAndTenantId metricDefinitionAndTenantId, + private List getExistingAlarms(final String alarmDefinitionId) { + List alarms = alarmCache.get(alarmDefinitionId); + if (alarms != null) { + return alarms; + } + final long start = System.currentTimeMillis(); + alarms = alarmDAO.findForAlarmDefinitionId(alarmDefinitionId); + logger.info("Loading {} Alarms took {} milliseconds", alarms.size(), System.currentTimeMillis() - start); + alarmCache.put(alarmDefinitionId, alarms); + return alarms; + } + + private List fitsInExistingAlarm(final MetricDefinitionAndTenantId metricDefinitionAndTenantId, final AlarmDefinition alarmDefinition, final List existingAlarms) { - Alarm existingAlarm = null; + final List result = new LinkedList<>(); if (alarmDefinition.getMatchBy().isEmpty()) { if (!existingAlarms.isEmpty()) { - existingAlarm = existingAlarms.get(0); + result.add(existingAlarms.get(0)); } } else { for (final Alarm alarm : existingAlarms) { if (metricFitsInAlarm(alarm, alarmDefinition, metricDefinitionAndTenantId)) { - existingAlarm = alarm; - break; + result.add(alarm); } } } - return existingAlarm; + return result; } private void addToExistingAlarm(Alarm existingAlarm, MetricDefinitionAndTenantId metricDefinitionAndTenantId) { existingAlarm.addAlarmedMetric(metricDefinitionAndTenantId); + final long 
start = System.currentTimeMillis(); alarmDAO.addAlarmedMetric(existingAlarm.getId(), metricDefinitionAndTenantId); + logger.debug("Add Alarm Metric took {} milliseconds", System.currentTimeMillis() - start); } private void sendNewMetricDefinition(Alarm existingAlarm, @@ -264,19 +351,14 @@ public class AlarmCreationBolt extends BaseRichBolt { if (waitingAlarms.isEmpty()) { final Alarm newAlarm = new Alarm(alarmDefinition, AlarmState.UNDETERMINED); newAlarm.addAlarmedMetric(metricDefinitionAndTenantId); + reuseExistingMetric(newAlarm, alarmDefinition, existingAlarms); if (alarmIsComplete(newAlarm)) { logger.debug("New alarm is complete. Saving"); saveAlarm(newAlarm); result.add(newAlarm); } else { - if (reuseExistingMetric(newAlarm, alarmDefinition, existingAlarms)) { - logger.debug("New alarm is complete reusing existing metric. Saving"); - saveAlarm(newAlarm); - result.add(newAlarm); } - else { - logger.debug("Adding new alarm to the waiting list"); - addToWaitingAlarms(newAlarm, alarmDefinition); - } + logger.debug("Adding new alarm to the waiting list"); + addToWaitingAlarms(newAlarm, alarmDefinition); } } else { for (final Alarm waiting : waitingAlarms) { @@ -291,25 +373,21 @@ public class AlarmCreationBolt extends BaseRichBolt { return result; } - private boolean reuseExistingMetric(Alarm newAlarm, final AlarmDefinition alarmDefinition, + private void reuseExistingMetric(Alarm newAlarm, final AlarmDefinition alarmDefinition, List existingAlarms) { - boolean addedOne = false; for (final Alarm existingAlarm : existingAlarms) { for (final MetricDefinitionAndTenantId mtid : existingAlarm.getAlarmedMetrics()) { if (metricFitsInAlarm(newAlarm, alarmDefinition, mtid)) { newAlarm.addAlarmedMetric(mtid); - addedOne = true; } } } - if (!addedOne) { - return false; - } - return alarmIsComplete(newAlarm); } private void saveAlarm(Alarm newAlarm) { + final long start = System.currentTimeMillis(); alarmDAO.createAlarm(newAlarm); + logger.debug("Add Alarm took {} 
milliseconds", System.currentTimeMillis() - start); } private List findMatchingWaitingAlarms(List waiting, AlarmDefinition alarmDefinition, @@ -418,12 +496,17 @@ public class AlarmCreationBolt extends BaseRichBolt { } private AlarmDefinition lookUpAlarmDefinition(String alarmDefinitionId) { - final AlarmDefinition found = alarmDefDAO.findById(alarmDefinitionId); + AlarmDefinition found = alarmDefinitionCache.get(alarmDefinitionId); + if (found != null) { + return found; + } + found = alarmDefDAO.findById(alarmDefinitionId); if (found == null) { logger.warn("Did not find AlarmDefinition for ID {}", alarmDefinitionId); return null; } + alarmDefinitionCache.put(found.getId(), found); return found; } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java index 9eb0dd4..5a7cd65 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -22,6 +22,7 @@ import monasca.common.model.event.AlarmDefinitionUpdatedEvent; import monasca.common.model.event.AlarmStateTransitionedEvent; import monasca.common.model.event.AlarmUpdatedEvent; import monasca.common.model.alarm.AlarmState; +import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.metric.MetricDefinition; import monasca.common.streaming.storm.Logging; import monasca.common.streaming.storm.Streams; @@ -34,7 +35,6 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; -import monasca.thresh.ThresholdingConfiguration; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; import monasca.thresh.domain.model.MetricDefinitionAndTenantId; @@ -153,7 +153,7 @@ public class AlarmThresholdingBolt extends 
BaseRichBolt { final AlarmDefinition alarmDefinition = alarmDefinitions.get(event.alarmDefinitionId); if (alarmDefinition == null) { // This is OK. No Alarms are using this AlarmDefinition - logger.info("Update of AlarmDefinition {} skipped. Not in use by this bolt", + logger.debug("Update of AlarmDefinition {} skipped. Not in use by this bolt", event.alarmDefinitionId); return; } @@ -163,6 +163,12 @@ public class AlarmThresholdingBolt extends BaseRichBolt { alarmDefinition.setSeverity(event.severity); alarmDefinition.setActionsEnabled(event.alarmActionsEnabled); alarmDefinition.setExpression(event.alarmExpression); + for (Map.Entry entry : event.changedSubExpressions.entrySet()) { + if (!alarmDefinition.updateSubExpression(entry.getKey(), entry.getValue())) { + logger.error("AlarmDefinition {}: Did not finding matching SubAlarmExpression id={} SubAlarmExpression{}", + event.alarmDefinitionId, entry.getKey(), entry.getValue()); + } + } } @Override @@ -251,57 +257,6 @@ public class AlarmThresholdingBolt extends BaseRichBolt { } - void handleAlarmDefinitionUpdated(String alarmDefId, AlarmDefinitionUpdatedEvent event) { - final AlarmDefinition oldAlarmDef = alarmDefinitions.get(alarmDefId); - if (oldAlarmDef == null) { - logger.debug("Updated Alarm Definition {} not loaded, ignoring", alarmDefId); - return; - } - - oldAlarmDef.setName(event.alarmName); - oldAlarmDef.setDescription(event.alarmDescription); - oldAlarmDef.setExpression(event.alarmExpression); - oldAlarmDef.setActionsEnabled(event.alarmActionsEnabled); - - /* Have to figure out how to handle this - // Now handle the SubAlarms - // First remove the deleted SubAlarms so we don't have to consider them later - for (Map.Entry entry : event.oldAlarmSubExpressions - .entrySet()) { - logger.debug("Removing deleted SubAlarm {}", entry.getValue()); - if (!oldAlarmDef.removeSubAlarmById(entry.getKey())) { - logger.error("Did not find removed SubAlarm {}", entry.getValue()); - } - } - - // Reuse what we can from the 
changed SubAlarms - for (Map.Entry entry : event.changedSubExpressions - .entrySet()) { - final SubAlarm oldSubAlarm = oldAlarmDef.getSubAlarm(entry.getKey()); - if (oldSubAlarm == null) { - logger.error("Did not find changed SubAlarm {}", entry.getValue()); - continue; - } - final SubAlarm newSubAlarm = new SubAlarm(entry.getKey(), oldAlarmDef.getId(), entry.getValue()); - newSubAlarm.setState(oldSubAlarm.getState()); - if (!oldSubAlarm.isCompatible(newSubAlarm)) { - newSubAlarm.setNoState(true); - } - logger.debug("Changing SubAlarm from {} to {}", oldSubAlarm, newSubAlarm); - oldAlarmDef.updateSubAlarm(newSubAlarm); - } - - // Add the new SubAlarms - for (Map.Entry entry : event.newAlarmSubExpressions - .entrySet()) { - final SubAlarm newSubAlarm = new SubAlarm(entry.getKey(), oldAlarmDef.getId(), entry.getValue()); - newSubAlarm.setNoState(true); - logger.debug("Adding SubAlarm {}", newSubAlarm); - oldAlarmDef.updateSubAlarm(newSubAlarm); - } - */ - } - String buildStateChangeReason() { return null; } diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java index cf895d9..d1cea0f 100644 --- a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/EventProcessingBolt.java @@ -165,6 +165,7 @@ public class EventProcessingBolt extends BaseRichBolt { } void handle(AlarmDeletedEvent event) { + logger.debug("Alarm {} deleted", event.alarmId); processSubAlarms(DELETED, event.tenantId, event.alarmDefinitionId, event.alarmMetrics, event.subAlarms); diff --git a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java index 1123891..0e13697 100644 --- 
a/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java +++ b/thresh/src/main/java/monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.java @@ -296,6 +296,8 @@ public class MetricFilteringBolt extends BaseRichBolt { synchronized (SENTINAL) { alreadyFound.remove(metricDefinitionAndTenantId, alarmDefinitionId); } + logger.debug("Removed {} for Alarm Definition {}", metricDefinitionAndTenantId, + alarmDefinitionId); } } diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java index 2c9b707..d236949 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDAOImplTest.java @@ -19,8 +19,7 @@ package monasca.thresh.infrastructure.persistence; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - -import com.google.common.io.Resources; +import static org.testng.Assert.assertNull; import monasca.common.model.alarm.AggregateFunction; import monasca.common.model.alarm.AlarmExpression; @@ -40,14 +39,19 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -@Test +/** + * These tests won't work without the real mysql database so use mini-mon. 
+ * Warning, this will truncate the alarms part of your mini-mon database + * @author craigbr + * + */ +@Test(groups = "database") public class AlarmDAOImplTest { private static final String TENANT_ID = "bob"; private static String ALARM_NAME = "90% CPU"; @@ -63,10 +67,9 @@ public class AlarmDAOImplTest { @BeforeClass protected void setupClass() throws Exception { - db = new DBI("jdbc:h2:mem:test;MODE=MySQL"); + // See class comment + db = new DBI("jdbc:mysql://192.168.10.4/mon", "monapi", "password"); handle = db.open(); - handle - .execute(Resources.toString(getClass().getResource("alarm.sql"), Charset.defaultCharset())); dao = new AlarmDAOImpl(db); } @@ -77,6 +80,8 @@ public class AlarmDAOImplTest { @BeforeMethod protected void beforeMethod() { + handle.execute("SET foreign_key_checks = 0;"); + handle.execute("truncate table alarm_definition"); handle.execute("truncate table alarm"); handle.execute("truncate table sub_alarm"); handle.execute("truncate table sub_alarm_definition"); @@ -90,6 +95,7 @@ public class AlarmDAOImplTest { alarmDef = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, new AlarmExpression( expr), "LOW", ALARM_ENABLED, new ArrayList()); + AlarmDefinitionDAOImplTest.insertAlarmDefinition(handle, alarmDef); final Map dimensions = new HashMap(); dimensions.put("first", "first_value"); @@ -107,13 +113,20 @@ public class AlarmDAOImplTest { dao.createAlarm(firstAlarm); final Alarm secondAlarm = new Alarm(alarmDef, AlarmState.OK); + secondAlarm.addAlarmedMetric(newMetric); dao.createAlarm(secondAlarm); final AlarmDefinition secondAlarmDef = new AlarmDefinition(TENANT_ID, "Second", null, new AlarmExpression( "avg(cpu{disk=vda, instance_id=123}) > 10"), "LOW", true, Arrays.asList("dev")); + AlarmDefinitionDAOImplTest.insertAlarmDefinition(handle, secondAlarmDef); final Alarm thirdAlarm = new Alarm(secondAlarmDef, AlarmState.OK); + final Map dims = new HashMap<>(); + dims.put("disk", "vda"); + dims.put("instance_id", "123"); + 
thirdAlarm.addAlarmedMetric(new MetricDefinitionAndTenantId(new MetricDefinition("cpu", dims), + secondAlarmDef.getTenantId())); dao.createAlarm(thirdAlarm); verifyAlarmList(dao.findForAlarmDefinitionId(alarmDef.getId()), firstAlarm, secondAlarm); @@ -124,7 +137,7 @@ public class AlarmDAOImplTest { } private void verifyAlarmList(final List found, Alarm... expected) { - assertEquals(expected.length, found.size()); + assertEquals(found.size(), expected.length); for (final Alarm alarm : expected) { assertTrue(found.contains(alarm)); } @@ -132,6 +145,8 @@ public class AlarmDAOImplTest { public void shouldFindById() { final Alarm newAlarm = new Alarm(alarmDef, AlarmState.OK); + assertNull(dao.findById(newAlarm.getId())); + dao.createAlarm(newAlarm); assertEquals(dao.findById(newAlarm.getId()), newAlarm); diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java index b767d68..e966df1 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/persistence/AlarmDefinitionDAOImplTest.java @@ -106,20 +106,20 @@ public class AlarmDefinitionDAOImplTest { final AlarmDefinition alarmDefinition = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, expression, "LOW", false, Arrays.asList("fred", "barney")); - insert(alarmDefinition); + insertAlarmDefinition(handle, alarmDefinition); verifyListAllMatches(alarmDefinition); final AlarmExpression expression2 = new AlarmExpression("max(cpu{service=swift}) > 90"); final AlarmDefinition alarmDefinition2 = new AlarmDefinition(TENANT_ID, ALARM_NAME, ALARM_DESCR, expression2, "LOW", false, Arrays.asList("fred", "barney", "wilma", "betty")); - insert(alarmDefinition2); + insertAlarmDefinition(handle, alarmDefinition2); verifyListAllMatches(alarmDefinition, alarmDefinition2); } private void 
insertAndCheck(final AlarmDefinition alarmDefinition) { - insert(alarmDefinition); + insertAlarmDefinition(handle, alarmDefinition); assertEquals(dao.findById(alarmDefinition.getId()), alarmDefinition); } @@ -133,7 +133,9 @@ public class AlarmDefinitionDAOImplTest { } } - private void insert(AlarmDefinition alarmDefinition) { + // This method is not a test but without this TestNG tries to run it + @Test(enabled=false) + public static void insertAlarmDefinition(Handle handle, AlarmDefinition alarmDefinition) { try { handle.begin(); handle diff --git a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java index 331ba63..c1289a0 100644 --- a/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java +++ b/thresh/src/test/java/monasca/thresh/infrastructure/thresholding/AlarmCreationBoltTest.java @@ -40,6 +40,7 @@ import monasca.common.model.alarm.AlarmExpression; import monasca.common.model.alarm.AlarmState; import monasca.common.model.alarm.AlarmSubExpression; import monasca.common.model.event.AlarmDefinitionDeletedEvent; +import monasca.common.model.event.AlarmDeletedEvent; import monasca.common.model.metric.MetricDefinition; import monasca.thresh.domain.model.Alarm; import monasca.thresh.domain.model.AlarmDefinition; @@ -55,6 +56,7 @@ import org.mockito.stubbing.Answer; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -257,20 +259,48 @@ public class AlarmCreationBoltTest { // Update the Alarm Definition final SubExpression first = alarmDefinition.getSubExpressions().get(0); - first.getAlarmSubExpression().setThreshold(42.0); + + // We make a copy of the SubExpression because the actual SubExpression from the AlarmDefinition is + // in the Alarm and updating first 
updates the SubAlarm's SubExpression directly + final SubExpression copy = new SubExpression(first.getId(), AlarmSubExpression.of(first.getAlarmSubExpression().getExpression())); + copy.getAlarmSubExpression().setThreshold(42.0); final MkTupleParam tupleParam = new MkTupleParam(); tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS); tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID); final Tuple tuple = Testing.testTuple( - Arrays.asList(EventProcessingBolt.UPDATED, first, alarmDefinition.getId()), tupleParam); + Arrays.asList(EventProcessingBolt.UPDATED, copy, alarmDefinition.getId()), tupleParam); bolt.execute(tuple); - // Ensure they are gone - assertNull(bolt.countWaitingAlarms(alarmDefinition.getId())); + final AlarmSubExpression subExpr2 = + alarmDefinition.getAlarmExpression().getSubExpressions().get(1); + + // Now finish the Alarms + for (final String hostname : hostnames) { + final MetricDefinition metric = + build(subExpr2.getMetricDefinition().name, "hostname", hostname, "service", "2"); + sendNewMetric(new MetricDefinitionAndTenantId(metric, TENANT_ID), alarmDefinition.getId()); + } + + assertEquals(this.createdAlarms.size(), hostnames.size()); + + // Can't use verifyCreatedAlarm because then the AlarmDefinition must be updated which + // might update the SubAlarms directly because of reuse of AlarmSubExpressions + for (final Alarm alarm : this.createdAlarms) { + boolean found = false; + for (SubAlarm subAlarm : alarm.getSubAlarms()) { + if (subAlarm.getAlarmSubExpressionId().equals(first.getId())) { + assertEquals(subAlarm.getExpression().getThreshold(), + copy.getAlarmSubExpression().getThreshold()); + found = true; + break; + } + } + assertTrue(found, "Did not find expected sub alarm"); + } } - + private void sendAlarmDefinitionDeleted(final AlarmDefinition alarmDefinition) { final Map subAlarmMetricDefinitions = new HashMap<>(); for (final AlarmSubExpression subExpr :
alarmDefinition.getAlarmExpression().getSubExpressions()) { @@ -347,6 +377,118 @@ public class AlarmCreationBoltTest { testMultipleExpressions(metricDefinitionsToSend, numDevs); } + public void testReuseMetricFromExistingAlarm() { + final String expression = "max(cpu{service=vivi}) > 90"; + final String[] matchBy = new String[] { "hostname", "amplifier" }; + final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + + final MetricDefinition metric = + build("cpu", "hostname", "eleanore", "amplifier", "2", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1); + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID)); + + final MetricDefinition metric2 = + build("cpu", "hostname", "eleanore", "service", "vivi"); + + sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1, + "A second alarm was created instead of the metric fitting into the first"); + + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + + final MetricDefinition metric3 = + build("cpu", "hostname", "eleanore", "amplifier", "3", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric3, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + } + + public void testUseMetricInExistingAlarm() { + final String expression = "max(cpu{service=vivi}) > 90"; + final String[] matchBy = new String[] { "hostname", 
"amplifier" }; + final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + + final MetricDefinition metric = + build("cpu", "hostname", "eleanore", "amplifier", "2", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 1); + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID)); + + final MetricDefinition metric3 = + build("cpu", "hostname", "eleanore", "amplifier", "3", "service", "vivi"); + + bolt.handleNewMetricDefinition(new MetricDefinitionAndTenantId(metric3, TENANT_ID), + alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID)); + + final MetricDefinition metric2 = + build("cpu", "hostname", "eleanore", "service", "vivi"); + + sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); + + assertEquals(this.createdAlarms.size(), 2, + "A third alarm was created instead of the metric fitting into the first two"); + + verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, + new MetricDefinitionAndTenantId(metric3, TENANT_ID), + new MetricDefinitionAndTenantId(metric2, TENANT_ID)); + } + + public void testDeletedAlarm() { + final AlarmDefinition alarmDefinition = runCreateSimpleAlarm(); + assertEquals(this.createdAlarms.size(), 1); + final Alarm alarmToDelete = this.createdAlarms.get(0); + this.createdAlarms.clear(); + final Map subAlarms = new HashMap<>(); + for (final SubAlarm subAlarm : alarmToDelete.getSubAlarms()) { + subAlarms.put(subAlarm.getId(), 
subAlarm.getExpression()); + } + final List alarmedMetrics = new ArrayList<>(); + for (final MetricDefinitionAndTenantId mdtid : alarmToDelete.getAlarmedMetrics()) { + alarmedMetrics.add(mdtid.metricDefinition); + } + final AlarmDeletedEvent event = new AlarmDeletedEvent(TENANT_ID, alarmToDelete.getId(), + alarmedMetrics, alarmToDelete.getAlarmDefinitionId(), subAlarms); + + final MkTupleParam tupleParam = new MkTupleParam(); + tupleParam.setFields(EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS); + tupleParam.setStream(EventProcessingBolt.ALARM_EVENT_STREAM_ID); + final Tuple tuple = + Testing.testTuple(Arrays.asList(EventProcessingBolt.DELETED, alarmToDelete.getId(), event), + tupleParam); + + bolt.execute(tuple); + + // Make sure the alarm gets created again + createAlarms(alarmDefinition); + } + private void testMultipleExpressions(final List metricDefinitionsToSend, final int numAlarms) { final AlarmDefinition alarmDefinition = @@ -359,10 +501,15 @@ public class AlarmCreationBoltTest { assertEquals(this.createdAlarms.size(), numAlarms); } - private void runCreateSimpleAlarm(final String... matchBy) { + private AlarmDefinition runCreateSimpleAlarm(final String... matchBy) { final String expression = "max(cpu{service=2}) > 90"; final AlarmDefinition alarmDefinition = createAlarmDefinition(expression, matchBy); + createAlarms(alarmDefinition, matchBy); + return alarmDefinition; + } + + private void createAlarms(final AlarmDefinition alarmDefinition, final String... 
matchBy) { final MetricDefinition metric = build("cpu", "hostname", "eleanore", "service", "2", "other", "vivi"); @@ -373,19 +520,18 @@ public class AlarmCreationBoltTest { verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric, TENANT_ID)); - this.createdAlarms.clear(); final MetricDefinition metric2 = build("cpu", "hostname", "vivi", "service", "2", "other", "eleanore"); sendNewMetric(new MetricDefinitionAndTenantId(metric2, TENANT_ID), alarmDefinition.getId()); if (matchBy.length == 0) { - assertEquals(this.createdAlarms.size(), 0, + assertEquals(this.createdAlarms.size(), 1, "A second alarm was created instead of the metric fitting into the first"); } else { - assertEquals(this.createdAlarms.size(), 1, + assertEquals(this.createdAlarms.size(), 2, "The metric was fitted into the first alarm instead of creating a new alarm"); - verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric2, TENANT_ID)); // Now send a metric that must fit into the just created alarm to test that @@ -395,10 +541,10 @@ public class AlarmCreationBoltTest { sendNewMetric(new MetricDefinitionAndTenantId(metric3, TENANT_ID), alarmDefinition.getId()); - assertEquals(this.createdAlarms.size(), 1, + assertEquals(this.createdAlarms.size(), 2, "The metric created a new alarm instead of fitting into the second"); - verifyCreatedAlarm(this.createdAlarms.get(0), alarmDefinition, collector, + verifyCreatedAlarm(this.createdAlarms.get(1), alarmDefinition, collector, new MetricDefinitionAndTenantId(metric2, TENANT_ID), new MetricDefinitionAndTenantId(metric3, TENANT_ID)); } } @@ -467,6 +613,8 @@ public class AlarmCreationBoltTest { } expectedAlarm.setSubAlarms(expectedSubAlarms); + assertEquals(newAlarm.getAlarmedMetrics().size(), mtids.length); + for (final SubAlarm subAlarm : expectedAlarm.getSubAlarms()) { // 
Have to do it this way because order of sub alarms is not deterministic MetricDefinitionAndTenantId mtid = null;