This commit is contained in:
Jonathan Halterman 2013-03-28 17:54:46 -07:00
parent c94c4adb11
commit ec4525f3ba
22 changed files with 351 additions and 287 deletions

16
pom.xml
View File

@ -15,8 +15,8 @@
<properties>
<versionNumber>1.0.0</versionNumber>
<computedVersion>${versionNumber}-SNAPSHOT</computedVersion>
<ps.common.version>1.0.0.220</ps.common.version>
<maas.commons.version>1.0.0.20</maas.commons.version>
<ps.common.version>1.0.0.222</ps.common.version>
<maas.commons.version>1.0.0.24</maas.commons.version>
<dropwizard.version>0.6.1</dropwizard.version>
<skipITs>true</skipITs>
@ -72,9 +72,9 @@
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.2</version>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.7</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -105,6 +105,12 @@
<artifactId>storm</artifactId>
<version>0.8.2</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>

View File

@ -47,10 +47,8 @@ public class TopologyModule extends AbstractModule {
@Provides
Config stormConfig() {
if (stormConfig !=null)
if (stormConfig != null)
return stormConfig;
// Config config = new Config();
// config.put(key, value)
return new Config();
}
@ -87,14 +85,16 @@ public class TopologyModule extends AbstractModule {
// Metrics / Event -> Aggregation
builder.setBolt("aggregation", new MetricAggregationBolt(), config.aggregationParallelism)
.fieldsGrouping("metrics", new Fields("metricDefinition"))
.fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID,
new Fields("metricDefinition"))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID,
new Fields("metricDefinition"));
// Aggregation / Event -> Thresholding
builder.setBolt("thresholding", new AlarmThresholdingBolt(), config.thresholdingParallelism)
.fieldsGrouping("aggregation", new Fields("compositeAlarmId"))
.fieldsGrouping("event-bolt", EventProcessingBolt.COMPOSITE_ALARM_EVENT_STREAM_ID,
new Fields("compositeAlarmId"));
.fieldsGrouping("aggregation", new Fields("alarmId"))
.fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
new Fields("alarmId"));
return builder.createTopology();
}

View File

@ -30,7 +30,7 @@ public class Alarm extends AbstractEntity {
this.expression = expression;
this.subAlarms = new HashMap<String, SubAlarm>();
for (SubAlarm subAlarm : subAlarms)
this.subAlarms.put(subAlarm.getExpression().getId(), subAlarm);
this.subAlarms.put(subAlarm.getId(), subAlarm);
this.state = state;
}
@ -40,8 +40,8 @@ public class Alarm extends AbstractEntity {
*/
public boolean evaluate() {
if (!AlarmState.UNDETERMINED.equals(state)) {
for (SubAlarm alarm : subAlarms.values())
if (AlarmState.UNDETERMINED.equals(alarm.getState())) {
for (SubAlarm subAlarm : subAlarms.values())
if (AlarmState.UNDETERMINED.equals(subAlarm.getState())) {
state = AlarmState.UNDETERMINED;
return true;
}
@ -67,14 +67,6 @@ public class Alarm extends AbstractEntity {
return true;
}
public SubAlarm getAlarm(String alarmId) {
return subAlarms.get(alarmId);
}
public Collection<SubAlarm> getAlarms() {
return subAlarms.values();
}
public AlarmExpression getExpression() {
return expression;
}
@ -87,6 +79,14 @@ public class Alarm extends AbstractEntity {
return state;
}
public SubAlarm getSubAlarm(String subAlarmId) {
return subAlarms.get(subAlarmId);
}
public Collection<SubAlarm> getSubAlarms() {
return subAlarms.values();
}
public String getTenantId() {
return tenantId;
}
@ -113,10 +113,10 @@ public class Alarm extends AbstractEntity {
@Override
public String toString() {
return name;
return String.format("Alarm [name=%s]", name);
}
public void updateAlarm(SubAlarm alarm) {
subAlarms.put(alarm.getExpression().getId(), alarm);
public void updateSubAlarm(SubAlarm subAlarm) {
subAlarms.put(subAlarm.getId(), subAlarm);
}
}

View File

@ -1,52 +0,0 @@
package com.hpcloud.maas.domain.model;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Data for a specific metric. Value object.
*
* @author Jonathan Halterman
*/
public class MetricData {
private final Map<String, SubAlarmData> subAlarmsData = new HashMap<String, SubAlarmData>();
public MetricData(List<SubAlarm> subAlarms) {
long initialTimestamp = System.currentTimeMillis();
for (SubAlarm subAlarm : subAlarms)
addAlarm(subAlarm, initialTimestamp);
}
public void addAlarm(SubAlarm subAlarm, long initialTimestamp) {
subAlarmsData.put(subAlarm.getExpression().getId(),
new SubAlarmData(subAlarm, initialTimestamp));
}
/**
* Adds the {@code value} to the statistics for the slot associated with the {@code timestamp},
* else <b>does nothing</b> if the {@code timestamp} is outside of the window.
*/
public void addValue(double value, long timestamp) {
for (SubAlarmData alarmData : subAlarmsData.values())
alarmData.getStats().addValue(value, timestamp);
}
public SubAlarmData alarmDataFor(String alarmId) {
return subAlarmsData.get(alarmId);
}
public Collection<SubAlarmData> getAlarmData() {
return subAlarmsData.values();
}
public void removeAlarm(String alarmId) {
subAlarmsData.remove(alarmId);
}
@Override
public String toString() {
return String.format("MetricData [alarmsData=%s]", subAlarmsData);
}
}

View File

@ -2,31 +2,34 @@ package com.hpcloud.maas.domain.model;
import com.hpcloud.maas.common.model.alarm.AlarmState;
import com.hpcloud.maas.common.model.alarm.AlarmSubExpression;
import com.hpcloud.maas.domain.common.AbstractEntity;
/**
* Sub-alarm. Decorates an AlarmSubExpression.
*
* @author Jonathan Halterman
*/
public class SubAlarm {
private String compositeId;
public class SubAlarm extends AbstractEntity {
private String alarmId;
private AlarmSubExpression expression;
private AlarmState state;
public SubAlarm(String compositeId, AlarmSubExpression expression) {
this.compositeId = compositeId;
public SubAlarm(String alarmId, String id, AlarmSubExpression expression) {
this.alarmId = alarmId;
this.id = id;
this.expression = expression;
this.state = AlarmState.UNDETERMINED;
}
public SubAlarm(String compositeId, AlarmSubExpression expression, AlarmState state) {
this.compositeId = compositeId;
public SubAlarm(String alarmId, String id, AlarmSubExpression expression, AlarmState state) {
this.alarmId = alarmId;
this.id = id;
this.expression = expression;
this.state = state;
}
public String getCompositeId() {
return compositeId;
public String getAlarmId() {
return alarmId;
}
public AlarmSubExpression getExpression() {
@ -40,4 +43,10 @@ public class SubAlarm {
public void setState(AlarmState state) {
this.state = state;
}
@Override
public String toString() {
return String.format("SubAlarm [alarmId=%s, id=%s, expression=%s, state=%s]", alarmId, id,
expression, state);
}
}

View File

@ -6,27 +6,27 @@ import com.hpcloud.maas.util.stats.Statistics;
import com.hpcloud.maas.util.time.Timescale;
/**
* Data for a specific alarm. Value object.
* Aggregates statistics for a specific SubAlarm.
*
* @author Jonathan Halterman
*/
public class SubAlarmData {
public class SubAlarmStats {
/**
* Helps determine how many observations to wait for before changing an alarm's state to
* insufficient data.
*/
private static final int INSUFFICIENT_DATA_COEFFICIENT = 3;
private final SubAlarm alarm;
private final SubAlarm subAlarm;
private final SlidingWindowStats stats;
private final int emptySlotObservationThreshold;
private int emptySlotObservations;
public SubAlarmData(SubAlarm alarm, long initialTimestamp) {
this.alarm = alarm;
this.stats = new SlidingWindowStats(Timescale.SECONDS_SINCE_EPOCH, alarm.getExpression()
.getPeriod(), alarm.getExpression().getPeriods(),
Statistics.statTypeFor(alarm.getExpression().getFunction()), initialTimestamp);
public SubAlarmStats(SubAlarm subAlarm, long initialTimestamp) {
this.subAlarm = subAlarm;
this.stats = new SlidingWindowStats(Timescale.SECONDS_SINCE_EPOCH, subAlarm.getExpression()
.getPeriod(), subAlarm.getExpression().getPeriods(),
Statistics.statTypeFor(subAlarm.getExpression().getFunction()), initialTimestamp);
emptySlotObservationThreshold = (stats.getSlotWidthInMinutes() == 0 ? 1
: stats.getSlotWidthInMinutes()) * INSUFFICIENT_DATA_COEFFICIENT;
}
@ -48,19 +48,19 @@ public class SubAlarmData {
// TODO initialTimestamp should come into play here for selecting the appropriate portion of the
// window. maybe? does that mean the window needs to be larger than it is by default?
AlarmState initialState = alarm.getState();
AlarmState initialState = subAlarm.getState();
if (emptySlotObservations >= emptySlotObservationThreshold) {
if (AlarmState.UNDETERMINED.equals(initialState))
return false;
alarm.setState(AlarmState.UNDETERMINED);
subAlarm.setState(AlarmState.UNDETERMINED);
return true;
}
boolean alarmed = true;
for (double value : stats.getValues())
if (!alarm.getExpression()
if (!subAlarm.getExpression()
.getOperator()
.evaluate(value, alarm.getExpression().getThreshold())) {
.evaluate(value, subAlarm.getExpression().getThreshold())) {
alarmed = false;
break;
}
@ -68,25 +68,25 @@ public class SubAlarmData {
if (alarmed) {
if (AlarmState.ALARM.equals(initialState))
return false;
alarm.setState(AlarmState.ALARM);
subAlarm.setState(AlarmState.ALARM);
return true;
}
if (AlarmState.OK.equals(initialState))
return false;
alarm.setState(AlarmState.OK);
subAlarm.setState(AlarmState.OK);
return true;
}
/**
* Returns the alarm that data is being observed for.
*/
public SubAlarm getAlarm() {
return alarm;
public SubAlarm getSubAlarm() {
return subAlarm;
}
@Override
public String toString() {
return String.format("AlarmData [alarm=%s, stats=%s]", alarm, stats);
return String.format("SubAlarmStats [subAlarm=%s, stats=%s]", subAlarm, stats);
}
}

View File

@ -1,9 +1,5 @@
package com.hpcloud.maas.domain.service;
import java.util.List;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.Alarm;
/**
@ -12,9 +8,6 @@ import com.hpcloud.maas.domain.model.Alarm;
* @author Jonathan Halterman
*/
public interface AlarmDAO {
/** Finds and returns all alarm components for the {@code metricDefinition}. */
List<SubAlarm> find(MetricDefinition metricDefinition);
/** Finds and returns the CompositeAlarm for the {@code compositeAlarmId}. */
Alarm findByCompositeId(String compositeAlarmId);
/** Finds and returns the Alarm for the {@code id}. */
Alarm findById(String id);
}

View File

@ -0,0 +1,16 @@
package com.hpcloud.maas.domain.service;
import java.util.List;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
/**
* SubAlarm DAO.
*
* @author Jonathan Halterman
*/
public interface SubAlarmDAO {
/** Finds and returns all sub alarms for the {@code metricDefinition}. */
List<SubAlarm> find(MetricDefinition metricDefinition);
}

View File

@ -0,0 +1,45 @@
package com.hpcloud.maas.domain.service;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.SubAlarmStats;
/**
* SubAlarmStats repository.
*
* @author Jonathan Halterman
*/
public class SubAlarmStatsRepository {
private final Map<String, SubAlarmStats> subAlarmStats = new HashMap<String, SubAlarmStats>();
public SubAlarmStatsRepository(List<SubAlarm> subAlarms) {
long initialTimestamp = System.currentTimeMillis();
for (SubAlarm subAlarm : subAlarms)
add(subAlarm, initialTimestamp);
}
public void add(SubAlarm subAlarm, long initialTimestamp) {
subAlarmStats.put(subAlarm.getId(), new SubAlarmStats(subAlarm, initialTimestamp));
}
public Collection<SubAlarmStats> get() {
return subAlarmStats.values();
}
public SubAlarmStats get(String subAlarmId) {
return subAlarmStats.get(subAlarmId);
}
public void remove(String subAlarmId) {
subAlarmStats.remove(subAlarmId);
}
@Override
public String toString() {
return String.format("SubAlarmStatsRepository [subAlarmStats=%s]", subAlarmStats);
}
}

View File

@ -1,13 +1,9 @@
package com.hpcloud.maas.infrastructure.persistence;
import java.util.List;
import javax.inject.Inject;
import org.skife.jdbi.v2.DBI;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.service.AlarmDAO;
@ -25,27 +21,7 @@ public class AlarmDAOImpl implements AlarmDAO {
}
@Override
public List<SubAlarm> find(MetricDefinition metricDefinition) {
// Handle h = db.open();
//
// try {
// List<Alarm> alarms = h.createQuery("select * from alarm where tenant_id = :tenantId")
// .bind("tenantId", tenantId)
// .map(new BeanMapper<Alarm>(Alarm.class))
// .list();
//
// for (Alarm alarm : alarms)
// alarm.setMetricDefinition(metricDefinition);
//
// return alarms;
// } finally {
// h.close();
// }
return null;
}
@Override
public Alarm findByCompositeId(String compositeAlarmId) {
public Alarm findById(String id) {
return null;
}
}

View File

@ -0,0 +1,45 @@
package com.hpcloud.maas.infrastructure.persistence;
import java.util.List;
import javax.inject.Inject;
import org.skife.jdbi.v2.DBI;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
/**
* Alarm DAO implementation.
*
* @author Jonathan Halterman
*/
public class SubAlarmDAOImpl implements SubAlarmDAO {
private final DBI db;
@Inject
public SubAlarmDAOImpl(DBI db) {
this.db = db;
}
@Override
public List<SubAlarm> find(MetricDefinition metricDefinition) {
// Handle h = db.open();
//
// try {
// List<Alarm> alarms = h.createQuery("select * from alarm where tenant_id = :tenantId")
// .bind("tenantId", tenantId)
// .map(new BeanMapper<Alarm>(Alarm.class))
// .list();
//
// for (Alarm alarm : alarms)
// alarm.setMetricDefinition(metricDefinition);
//
// return alarms;
// } finally {
// h.close();
// }
return null;
}
}

View File

@ -13,8 +13,8 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.hpcloud.maas.common.event.AlarmDeletedEvent;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.infrastructure.storm.Streams;
import com.hpcloud.util.Injector;
@ -23,11 +23,11 @@ import com.hpcloud.util.Injector;
* Determines whether an alarm threshold has been exceeded.
*
* <p>
* Receives composite alarm state changes and events.
* Receives alarm state changes and events.
*
* <ul>
* <li>Input: String compositeAlarmId, Object alarm
* <li>Input composite-alarm-events: String compositeAlarmId, String eventType
* <li>Input: String alarmId, SubAlarm subAlarm
* <li>Input alarm-events: String eventType, String alarmId
* </ul>
*
* @author Jonathan Halterman
@ -36,7 +36,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AlarmThresholdingBolt.class);
private static final long serialVersionUID = -4126465124017857754L;
private final Map<String, Alarm> compositeAlarms = new HashMap<String, Alarm>();
private final Map<String, Alarm> alarms = new HashMap<String, Alarm>();
private transient AlarmDAO alarmDAO;
private TopologyContext context;
private OutputCollector collector;
@ -48,19 +48,19 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String compositeAlarmId = tuple.getString(0);
Alarm compositeAlarm = getOrCreateCompositeAlarm(compositeAlarmId);
if (compositeAlarm == null)
String alarmId = tuple.getString(0);
Alarm alarm = getOrCreateAlarm(alarmId);
if (alarm == null)
return;
SubAlarm alarm = (SubAlarm) tuple.getValue(1);
evaluateThreshold(compositeAlarm, alarm);
} else if (EventProcessingBolt.COMPOSITE_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String compositeAlarmId = tuple.getString(0);
String eventType = tuple.getString(1);
SubAlarm subAlarm = (SubAlarm) tuple.getValue(1);
evaluateThreshold(alarm, subAlarm);
} else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String eventType = tuple.getString(0);
String alarmId = tuple.getString(1);
if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
handleAlarmDeleted(compositeAlarmId);
handleAlarmDeleted(alarmId);
}
}
@ -72,33 +72,31 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
alarmDAO = Injector.getInstance(AlarmDAO.class);
}
void evaluateThreshold(Alarm compositeAlarm, SubAlarm alarm) {
LOG.debug("{} Received state change for composite alarm id {}, {}", context.getThisTaskId(),
compositeAlarm.getId(), alarm);
compositeAlarm.updateAlarm(alarm);
if (compositeAlarm.evaluate()) {
void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) {
LOG.debug("{} Received state change for {}", context.getThisTaskId(), subAlarm);
alarm.updateSubAlarm(subAlarm);
if (alarm.evaluate()) {
// Emit notification
// Update persistent alarm state
}
}
void handleAlarmDeleted(String compositeAlarmId) {
LOG.debug("{} Received AlarmDeletedEvent for composite alarm id {}", context.getThisTaskId(),
compositeAlarmId);
compositeAlarms.remove(compositeAlarmId);
void handleAlarmDeleted(String alarmId) {
LOG.debug("{} Received AlarmDeletedEvent for alarm id {}", context.getThisTaskId(), alarmId);
alarms.remove(alarmId);
}
private Alarm getOrCreateCompositeAlarm(String compositeAlarmId) {
Alarm compositeAlarm = compositeAlarms.get(compositeAlarmId);
if (compositeAlarm == null) {
compositeAlarm = alarmDAO.findByCompositeId(compositeAlarmId);
if (compositeAlarm == null)
LOG.error("Failed to locate composite alarm for id {}", compositeAlarmId);
private Alarm getOrCreateAlarm(String alarmId) {
Alarm alarm = alarms.get(alarmId);
if (alarm == null) {
alarm = alarmDAO.findById(alarmId);
if (alarm == null)
LOG.error("Failed to locate alarm for id {}", alarmId);
else {
compositeAlarms.put(compositeAlarmId, compositeAlarm);
alarms.put(alarmId, alarm);
}
}
return compositeAlarm;
return alarm;
}
}

View File

@ -17,15 +17,18 @@ import com.hpcloud.maas.common.event.AlarmCreatedEvent;
import com.hpcloud.maas.common.event.AlarmDeletedEvent;
import com.hpcloud.maas.common.model.alarm.AlarmSubExpression;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
/**
* Processes events by emitting tuples related to the event.
*
* <ul>
* <li>Input: Object event
* <li>Output alarm-events: MetricDefinition metricDefinition, String compositeAlarmId, String
* eventType, [AlarmSubExpression alarm]
* <li>Output composite-alarm-events: String compositeAlarmId, String eventType
* <li>Output alarm-events: String eventType, String alarmId
* <li>Output metric-alarm-events: String eventType, MetricDefinition metricDefinition, String
* alarmId
* <li>Output metric-sub-alarm-events: String eventType, MetricDefinition metricDefinition, SubAlarm
* subAlarm
* </ul>
*
* @author Jonathan Halterman
@ -33,18 +36,23 @@ import com.hpcloud.maas.common.model.metric.MetricDefinition;
public class EventProcessingBolt extends BaseRichBolt {
private static final long serialVersionUID = 897171858708109378L;
private static final Logger LOG = LoggerFactory.getLogger(EventProcessingBolt.class);
/** Stream for alarm specific events. */
public static final String ALARM_EVENT_STREAM_ID = "alarm-events";
public static final String COMPOSITE_ALARM_EVENT_STREAM_ID = "composite-alarm-events";
/** Stream for metric and alarm specific events. */
public static final String METRIC_ALARM_EVENT_STREAM_ID = "metric-alarm-events";
/** Stream for metric and sub-alarm specific events. */
public static final String METRIC_SUB_ALARM_EVENT_STREAM_ID = "metric-sub-alarm-events";
private TopologyContext context;
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(ALARM_EVENT_STREAM_ID, new Fields("metricDefinition",
"compositeAlarmId", "eventType", "alarm"));
declarer.declareStream(COMPOSITE_ALARM_EVENT_STREAM_ID, new Fields("compositeAlarmId",
"eventType"));
declarer.declareStream(ALARM_EVENT_STREAM_ID, new Fields("eventType", "alarmId"));
declarer.declareStream(METRIC_ALARM_EVENT_STREAM_ID, new Fields("eventType",
"metricDefinition", "alarmId"));
declarer.declareStream(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Fields("eventType",
"metricDefinition", "subAlarm"));
}
@Override
@ -66,15 +74,17 @@ public class EventProcessingBolt extends BaseRichBolt {
}
void handle(AlarmCreatedEvent event) {
for (AlarmSubExpression alarm : event.expression.getSubExpressions())
collector.emit(ALARM_EVENT_STREAM_ID, new Values(alarm.getMetricDefinition(), event.id,
event.getClass().getSimpleName(), alarm));
String eventType = event.getClass().getSimpleName();
for (Map.Entry<String, AlarmSubExpression> subExpressionEntry : event.alarmSubExpressions.entrySet())
collector.emit(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Values(eventType,
subExpressionEntry.getValue().getMetricDefinition(), new SubAlarm(event.alarmId,
subExpressionEntry.getKey(), subExpressionEntry.getValue())));
}
void handle(AlarmDeletedEvent event) {
String eventType = event.getClass().getSimpleName();
for (MetricDefinition metricDef : event.metricDefinitions)
collector.emit(ALARM_EVENT_STREAM_ID, new Values(metricDef, event.id, event.getClass()
.getSimpleName()));
collector.emit(COMPOSITE_ALARM_EVENT_STREAM_ID, new Values(event.id, event));
collector.emit(METRIC_ALARM_EVENT_STREAM_ID, new Values(eventType, metricDef, event.alarmId));
collector.emit(ALARM_EVENT_STREAM_ID, new Values(eventType, event.alarmId));
}
}

View File

@ -20,13 +20,12 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.hpcloud.maas.common.event.AlarmCreatedEvent;
import com.hpcloud.maas.common.event.AlarmDeletedEvent;
import com.hpcloud.maas.common.model.alarm.AlarmSubExpression;
import com.hpcloud.maas.common.model.metric.Metric;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.MetricData;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.SubAlarmData;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.domain.model.SubAlarmStats;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.maas.domain.service.SubAlarmStatsRepository;
import com.hpcloud.maas.infrastructure.storm.Streams;
import com.hpcloud.maas.infrastructure.storm.Tuples;
import com.hpcloud.util.Injector;
@ -38,9 +37,11 @@ import com.hpcloud.util.Injector;
*
* <ul>
* <li>Input: MetricDefinition metricDefinition, Metric metric
* <li>Input alarm-events: MetricDefinition metricDefinition, String compositeAlarmId, String
* eventType, [AlarmSubExpression alarm]
* <li>Output: String compositeAlarmId, Alarm alarm
* <li>Input metric-alarm-events: String eventType, MetricDefinition metricDefinition, String
* alarmId
* <li>Input metric-sub-alarm-events: String eventType, MetricDefinition metricDefinition, SubAlarm
* subAlarm
* <li>Output: String alarmId, SubAlarm subAlarm
* </ul>
*
* @author Jonathan Halterman
@ -49,15 +50,15 @@ public class MetricAggregationBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MetricAggregationBolt.class);
private static final long serialVersionUID = 5624314196838090726L;
private final Map<MetricDefinition, MetricData> metricsData = new HashMap<MetricDefinition, MetricData>();
private final Multimap<String, String> compositeAlarms = ArrayListMultimap.create();
private transient AlarmDAO alarmDAO;
private final Map<MetricDefinition, SubAlarmStatsRepository> subAlarmStatsRepos = new HashMap<MetricDefinition, SubAlarmStatsRepository>();
private final Multimap<String, String> alarmSubAlarms = ArrayListMultimap.create();
private transient SubAlarmDAO subAlarmDAO;
private TopologyContext context;
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("compositeAlarmId", "alarm"));
declarer.declare(new Fields("alarmId", "subAlarm"));
}
@Override
@ -68,20 +69,22 @@ public class MetricAggregationBolt extends BaseRichBolt {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
Metric metric = (Metric) tuple.getValueByField("metric");
aggregateValues(metric);
} else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(0);
MetricData metricData = getOrCreateMetricData(metricDefinition);
} else {
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(1);
SubAlarmStatsRepository metricData = getOrCreateSubAlarmStatsRepo(metricDefinition);
if (metricData == null)
return;
String compositeAlarmId = tuple.getString(1);
String eventType = tuple.getString(2);
String eventType = tuple.getString(0);
if (AlarmCreatedEvent.class.getSimpleName().equals(eventType)) {
AlarmSubExpression alarmSubExpression = (AlarmSubExpression) tuple.getValueByField("alarm");
handleAlarmCreated(metricData, compositeAlarmId, alarmSubExpression);
} else if (AlarmDeletedEvent.class.getSimpleName().equals(eventType)) {
handleAlarmDeleted(metricData, compositeAlarmId);
if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String alarmId = tuple.getString(2);
if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
handleAlarmDeleted(metricData, alarmId);
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
SubAlarm subAlarm = (SubAlarm) tuple.getValue(2);
if (AlarmCreatedEvent.class.getSimpleName().equals(eventType))
handleAlarmCreated(metricData, subAlarm);
}
}
}
@ -99,7 +102,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.context = context;
this.collector = collector;
alarmDAO = Injector.getInstance(AlarmDAO.class);
subAlarmDAO = Injector.getInstance(SubAlarmDAO.class);
}
/**
@ -107,58 +110,56 @@ public class MetricAggregationBolt extends BaseRichBolt {
*/
void aggregateValues(Metric metric) {
LOG.debug("{} Aggregating values for {}", context.getThisTaskId(), metric);
MetricData metricData = getOrCreateMetricData(metric.definition);
if (metricData == null)
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metric.definition);
if (subAlarmStatsRepo == null)
return;
metricData.addValue(metric.value, metric.timestamp);
for (SubAlarmStats stats : subAlarmStatsRepo.get())
stats.getStats().addValue(metric.value, metric.timestamp);
}
void evaluateAlarmsAndAdvanceWindows() {
LOG.debug("{} Evaluating alarms and advancing windows", context.getThisTaskId());
long initialTimestamp = System.currentTimeMillis();
for (MetricData metricData : metricsData.values())
for (SubAlarmData alarmData : metricData.getAlarmData()) {
for (SubAlarmStatsRepository metricSubAlarm : subAlarmStatsRepos.values())
for (SubAlarmStats alarmData : metricSubAlarm.get()) {
LOG.debug("{} Evaluating {}", context.getThisTaskId(), alarmData.getStats());
if (alarmData.evaluate(initialTimestamp)) {
LOG.debug("Alarm state changed for {}", alarmData.getAlarm());
collector.emit(new Values(alarmData.getAlarm().getCompositeId(), alarmData.getAlarm()));
LOG.debug("Alarm state changed for {}", alarmData.getSubAlarm());
collector.emit(new Values(alarmData.getSubAlarm().getAlarmId(), alarmData.getSubAlarm()));
}
alarmData.getStats().advanceWindowTo(initialTimestamp);
}
}
MetricData getOrCreateMetricData(MetricDefinition metricDefinition) {
MetricData metricData = metricsData.get(metricDefinition);
if (metricData == null) {
List<SubAlarm> alarms = alarmDAO.find(metricDefinition);
if (alarms.isEmpty())
LOG.warn("Failed to find alarm data for {}", metricDefinition);
SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo(MetricDefinition metricDefinition) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition);
if (subAlarmStatsRepo == null) {
List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinition);
if (subAlarms.isEmpty())
LOG.warn("Failed to find sub alarms for {}", metricDefinition);
else {
metricData = new MetricData(alarms);
metricsData.put(metricDefinition, metricData);
for (SubAlarm alarm : alarms)
compositeAlarms.put(alarm.getCompositeId(), alarm.getExpression().getId());
subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms);
subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo);
for (SubAlarm subAlarm : subAlarms)
alarmSubAlarms.put(subAlarm.getAlarmId(), subAlarm.getId());
}
}
return metricData;
return subAlarmStatsRepo;
}
void handleAlarmCreated(MetricData metricData, String compositeAlarmId,
AlarmSubExpression alarmSubExpression) {
LOG.debug("{} Received AlarmCreatedEvent for composite alarm id {}, {}",
context.getThisTaskId(), compositeAlarmId, alarmSubExpression);
void handleAlarmCreated(SubAlarmStatsRepository metricData, SubAlarm subAlarm) {
LOG.debug("{} Received AlarmCreatedEvent for {}", context.getThisTaskId(), subAlarm);
long initialTimestamp = System.currentTimeMillis();
metricData.addAlarm(new SubAlarm(compositeAlarmId, alarmSubExpression), initialTimestamp);
compositeAlarms.put(compositeAlarmId, alarmSubExpression.getId());
metricData.add(subAlarm, initialTimestamp);
alarmSubAlarms.put(subAlarm.getAlarmId(), subAlarm.getId());
}
void handleAlarmDeleted(MetricData metricData, String compositeAlarmId) {
LOG.debug("{} Received AlarmDeletedEvent for composite alarm id {}", context.getThisTaskId(),
compositeAlarmId);
for (String alarmId : compositeAlarms.removeAll(compositeAlarmId))
metricData.removeAlarm(alarmId);
void handleAlarmDeleted(SubAlarmStatsRepository metricData, String alarmId) {
LOG.debug("{} Received AlarmDeletedEvent for alarm id {}", context.getThisTaskId(), alarmId);
for (String subAlarmId : alarmSubAlarms.removeAll(alarmId))
metricData.remove(subAlarmId);
}
}

View File

@ -47,8 +47,8 @@ public class SlidingWindowStats {
}
/**
* Creates a RollingWindow containing {@code numSlots} slots of size {@code slotWidthSeconds}
* starting at the {@code initialPeriod}.
* Creates a SlidingWindow containing {@code numSlots} slots of size {@code slotWidthSeconds}
* starting at the {@code initialTimestamp}.
*
* @param timescale to scale timestamps with
* @param slotWidthSeconds the width of a slot in seconds

View File

@ -0,0 +1,12 @@
<configuration debug="true" scan="true">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-5p [%d{ISO8601}] [%.18thread] %c: %m\n%ex
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -27,6 +27,7 @@ import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.maas.infrastructure.storm.TopologyTestCase;
import com.hpcloud.util.Injector;
@ -38,40 +39,41 @@ public class ThresholdingEngineTest extends TopologyTestCase {
private FeederSpout metricSpout;
private FeederSpout eventSpout;
private AlarmDAO alarmDAO;
private List<MetricDefinition> metricDefs;
private Map<String, Alarm> compositeAlarms;
private Map<MetricDefinition, SubAlarm> alarms;
private SubAlarmDAO subAlarmDAO;
private Map<String, Alarm> alarms;
private Map<MetricDefinition, SubAlarm> subAlarms;
public ThresholdingEngineTest() {
// Fixtures
MetricDefinition metricDef1 = new MetricDefinition("compute", "cpu", null, null);
MetricDefinition metricDef2 = new MetricDefinition("compute", "mem", null, null);
metricDefs = Arrays.asList(metricDef1, metricDef2);
AlarmExpression expression = new AlarmExpression(
"avg(compute:cpu, 2, 3) >= 90 and avg(compute:mem, 2, 3) >= 90");
SubAlarm subAlarm1 = new SubAlarm("1", expression.getSubExpressions().get(0));
SubAlarm subAlarm2 = new SubAlarm("1", expression.getSubExpressions().get(1));
alarms = new HashMap<MetricDefinition, SubAlarm>();
alarms.put(metricDefs.get(0), subAlarm1);
alarms.put(metricDefs.get(1), subAlarm2);
compositeAlarms = new HashMap<String, Alarm>();
compositeAlarms.put("1",
"count(compute:cpu, 5, 2) >= 3 and count(compute:mem, 5, 2) >= 3");
SubAlarm subAlarm1 = new SubAlarm("1", "123", expression.getSubExpressions().get(0));
SubAlarm subAlarm2 = new SubAlarm("1", "456", expression.getSubExpressions().get(1));
subAlarms = new HashMap<MetricDefinition, SubAlarm>();
subAlarms.put(subAlarm1.getExpression().getMetricDefinition(), subAlarm1);
subAlarms.put(subAlarm2.getExpression().getMetricDefinition(), subAlarm2);
alarms = new HashMap<String, Alarm>();
alarms.put("1",
new Alarm("1", "bob", "test-alarm", expression, Arrays.asList(subAlarm1, subAlarm2),
AlarmState.OK));
// Mocks
alarmDAO = mock(AlarmDAO.class);
when(alarmDAO.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
when(alarmDAO.findById(anyString())).thenAnswer(new Answer<Alarm>() {
@Override
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
return Arrays.asList(alarms.get((MetricDefinition) invocation.getArguments()[0]));
public Alarm answer(InvocationOnMock invocation) throws Throwable {
return alarms.get((String) invocation.getArguments()[0]);
}
});
when(alarmDAO.findByCompositeId(anyString())).thenAnswer(new Answer<Alarm>() {
subAlarmDAO = mock(SubAlarmDAO.class);
when(subAlarmDAO.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
@Override
public Alarm answer(InvocationOnMock invocation) throws Throwable {
return compositeAlarms.get((String) invocation.getArguments()[0]);
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
MetricDefinition metricDef = (MetricDefinition) invocation.getArguments()[0];
return Arrays.asList(subAlarms.get(metricDef));
}
});
@ -79,6 +81,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
Injector.registerModules(new AbstractModule() {
protected void configure() {
bind(AlarmDAO.class).toInstance(alarmDAO);
bind(SubAlarmDAO.class).toInstance(subAlarmDAO);
}
});

View File

@ -0,0 +1,5 @@
package com.hpcloud.maas.domain.model;
public class AlarmTest {
}

View File

@ -1,13 +0,0 @@
package com.hpcloud.maas.domain.model;
import org.testng.annotations.Test;
/**
* @author Jonathan Halterman
*/
@Test
public class MetricAlarmDataTest {
public void shouldEvaluateEmptySlots() {
}
}

View File

@ -28,10 +28,11 @@ import com.hpcloud.maas.common.model.alarm.AlarmState;
import com.hpcloud.maas.common.model.alarm.AlarmSubExpression;
import com.hpcloud.maas.common.model.metric.Metric;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.MetricData;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.SubAlarmData;
import com.hpcloud.maas.domain.model.SubAlarmStats;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.maas.domain.service.SubAlarmStatsRepository;
import com.hpcloud.util.Injector;
/**
@ -55,15 +56,15 @@ public class MetricAggregationBoltTest {
@BeforeMethod
protected void beforeMethod() {
// Fixtures
SubAlarm subAlarm1 = new SubAlarm("1", AlarmSubExpression.of("avg(compute:cpu, 2, 3) >= 90"),
AlarmState.OK);
SubAlarm subAlarm2 = new SubAlarm("1", AlarmSubExpression.of("avg(compute:mem, 2, 3) >= 90"),
AlarmState.OK);
SubAlarm subAlarm1 = new SubAlarm("1", "123",
AlarmSubExpression.of("avg(compute:cpu, 2, 3) >= 90"), AlarmState.OK);
SubAlarm subAlarm2 = new SubAlarm("1", "456",
AlarmSubExpression.of("avg(compute:mem, 2, 3) >= 90"), AlarmState.OK);
subAlarms = new HashMap<MetricDefinition, SubAlarm>();
subAlarms.put(metricDefs.get(0), subAlarm1);
subAlarms.put(metricDefs.get(1), subAlarm2);
final AlarmDAO dao = mock(AlarmDAO.class);
final SubAlarmDAO dao = mock(SubAlarmDAO.class);
when(dao.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
@Override
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
@ -74,7 +75,7 @@ public class MetricAggregationBoltTest {
Injector.reset();
Injector.registerModules(new AbstractModule() {
protected void configure() {
bind(AlarmDAO.class).toInstance(dao);
bind(SubAlarmDAO.class).toInstance(dao);
}
});
@ -92,10 +93,10 @@ public class MetricAggregationBoltTest {
bolt.aggregateValues(new Metric(metricDefs.get(1), 50, t1));
bolt.aggregateValues(new Metric(metricDefs.get(1), 40, t1));
SubAlarmData alarmData = bolt.getOrCreateMetricData(metricDefs.get(0)).alarmDataFor("123");
SubAlarmStats alarmData = bolt.getOrCreateSubAlarmStatsRepo(metricDefs.get(0)).get("123");
assertEquals(alarmData.getStats().getValue(t1), 90.0);
alarmData = bolt.getOrCreateMetricData(metricDefs.get(1)).alarmDataFor("456");
alarmData = bolt.getOrCreateSubAlarmStatsRepo(metricDefs.get(1)).get("456");
assertEquals(alarmData.getStats().getValue(t1), 45.0);
}
@ -125,8 +126,8 @@ public class MetricAggregationBoltTest {
}
public void shouldGetOrCreateSameMetricData() {
MetricData data = bolt.getOrCreateMetricData(metricDefs.get(0));
SubAlarmStatsRepository data = bolt.getOrCreateSubAlarmStatsRepo(metricDefs.get(0));
assertNotNull(data);
assertEquals(bolt.getOrCreateMetricData(metricDefs.get(0)), data);
assertEquals(bolt.getOrCreateSubAlarmStatsRepo(metricDefs.get(0)), data);
}
}

View File

@ -1,7 +0,0 @@
log4j.rootLogger=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.conversionPattern=%-5p [%d{ISO8601}] [%.18t] %c: %m%n
log4j.logger.backtype.storm=OFF
log4j.logger.com.hpcloud=TRACE, console

View File

@ -0,0 +1,16 @@
<configuration debug="true" scan="true">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-5p [%d{ISO8601}] [%.18thread] %c: %m\n%ex
</pattern>
</encoder>
</appender>
<root level="ERROR">
<appender-ref ref="STDOUT" />
</root>
<logger name="com.hpcloud">
<level value="TRACE" />
</logger>
</configuration>