diff --git a/pom.xml b/pom.xml
index b61efab..265136b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
mysql
mysql-connector-java
- 5.1.18
+ 5.1.26
diff --git a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java
index bc88e29..632430d 100644
--- a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java
+++ b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java
@@ -30,6 +30,8 @@ public class ThresholdingEngine {
this.threshConfig = threshConfig;
this.topologyName = topologyName;
this.local = local;
+ LOG.info("local set to {}", local);
+
}
public static final ThresholdingConfiguration configFor(String configFileName) throws Exception {
@@ -47,6 +49,9 @@ public class ThresholdingEngine {
System.exit(1);
}
+ LOG.info("Instantiating ThresholdingEngine with config file: {}, topology: {}",
+ args[0], args[1]);
+
ThresholdingEngine engine = new ThresholdingEngine(configFor(args[0]), args[1],
args.length > 2 ? true : false);
engine.configure();
@@ -61,9 +66,12 @@ public class ThresholdingEngine {
Config config = Injector.getInstance(Config.class);
StormTopology topology = Injector.getInstance(StormTopology.class);
- if (local)
+ if (local) {
+ LOG.info("submitting topology {} to local storm cluster", topologyName);
new LocalCluster().submitTopology(topologyName, config, topology);
- else
+ } else {
+ LOG.info("submitting topology {} to non-local storm cluster", topologyName);
StormSubmitter.submitTopology(topologyName, config, topology);
+ }
}
}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java
index a4ca608..ab7a85c 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java
@@ -1,17 +1,10 @@
package com.hpcloud.mon.infrastructure.thresholding;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
-
import com.hpcloud.configuration.KafkaProducerConfiguration;
import com.hpcloud.mon.ThresholdingConfiguration;
import com.hpcloud.mon.common.event.AlarmDeletedEvent;
@@ -25,125 +18,131 @@ import com.hpcloud.streaming.storm.Logging;
import com.hpcloud.streaming.storm.Streams;
import com.hpcloud.util.Injector;
import com.hpcloud.util.Serialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* Determines whether an alarm threshold has been exceeded.
- *
- *
+ *
+ *
* Receives alarm state changes and events.
- *
+ *
*
* - Input: String alarmId, SubAlarm subAlarm
*
- Input alarm-events: String eventType, String alarmId
*
- *
+ *
* @author Jonathan Halterman
*/
public class AlarmThresholdingBolt extends BaseRichBolt {
- private static final long serialVersionUID = -4126465124017857754L;
+ private static final long serialVersionUID = -4126465124017857754L;
- private transient Logger LOG;
- private final DataSourceFactory dbConfig;
- private final KafkaProducerConfiguration kafkaConfig;
- private final Map alarms = new HashMap();
- private String alertExchange;
- private String alertRoutingKey;
- private transient AlarmDAO alarmDAO;
- private transient AlarmEventForwarder alarmEventForwarder;
- private OutputCollector collector;
+ private transient Logger LOG;
+ private final DataSourceFactory dbConfig;
+ private final KafkaProducerConfiguration kafkaConfig;
+ private final Map alarms = new HashMap();
+ private String alertExchange;
+ private String alertRoutingKey;
+ private transient AlarmDAO alarmDAO;
+ private transient AlarmEventForwarder alarmEventForwarder;
+ private OutputCollector collector;
- public AlarmThresholdingBolt(DataSourceFactory dbConfig, KafkaProducerConfiguration rabbitConfig) {
- this.dbConfig = dbConfig;
- this.kafkaConfig = rabbitConfig;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
- String alarmId = tuple.getString(0);
- Alarm alarm = getOrCreateAlarm(alarmId);
- if (alarm == null)
- return;
-
- SubAlarm subAlarm = (SubAlarm) tuple.getValue(1);
- evaluateThreshold(alarm, subAlarm);
- } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
- String eventType = tuple.getString(0);
- String alarmId = tuple.getString(1);
-
- if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
- handleAlarmDeleted(alarmId);
- }
- } catch (Exception e) {
- LOG.error("Error processing tuple {}", tuple, e);
- } finally {
- collector.ack(tuple);
+ public AlarmThresholdingBolt(DataSourceFactory dbConfig, KafkaProducerConfiguration rabbitConfig) {
+ this.dbConfig = dbConfig;
+ this.kafkaConfig = rabbitConfig;
}
- }
- @Override
- @SuppressWarnings("rawtypes")
- public void prepare(Map config, TopologyContext context, OutputCollector collector) {
- LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
- LOG.info("Preparing");
- this.collector = collector;
- alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE);
- alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY);
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
- Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig));
- Injector.registerIfNotBound(AlarmEventForwarder.class, new KafkaAlarmEventForwarder(kafkaConfig));
+ @Override
+ public void execute(Tuple tuple) {
- alarmDAO = Injector.getInstance(AlarmDAO.class);
- alarmEventForwarder = new KafkaAlarmEventForwarder(kafkaConfig);
- }
-
- void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) {
- LOG.debug("Received state change for {}", subAlarm);
- alarm.updateSubAlarm(subAlarm);
-
- AlarmState initialState = alarm.getState();
- if (alarm.evaluate()) {
- alarmDAO.updateState(alarm.getId(), alarm.getState());
-
- if (AlarmState.ALARM.equals(alarm.getState())) {
- LOG.debug("ALARM triggered for {}", alarm);
- AlarmStateTransitionEvent event = new AlarmStateTransitionEvent(alarm.getTenantId(),
- alarm.getId(), alarm.getName(), initialState, alarm.getState(),
- alarm.getStateChangeReason(), System.currentTimeMillis() / 1000);
+ LOG.debug("tuple: {}", tuple);
try {
- alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event));
- } catch (Exception ignore) {
+ if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
+ String alarmId = tuple.getString(0);
+ Alarm alarm = getOrCreateAlarm(alarmId);
+ if (alarm == null)
+ return;
+
+ SubAlarm subAlarm = (SubAlarm) tuple.getValue(1);
+ evaluateThreshold(alarm, subAlarm);
+ } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
+ String eventType = tuple.getString(0);
+ String alarmId = tuple.getString(1);
+
+ if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
+ handleAlarmDeleted(alarmId);
+ }
+ } catch (Exception e) {
+ LOG.error("Error processing tuple {}", tuple, e);
+ } finally {
+ collector.ack(tuple);
}
- } else
- LOG.debug("State changed for {}", alarm);
- }
- }
-
- void handleAlarmDeleted(String alarmId) {
- LOG.debug("Received AlarmDeletedEvent for alarm id {}", alarmId);
- alarms.remove(alarmId);
- }
-
- String buildStateChangeReason() {
- return null;
- }
-
- private Alarm getOrCreateAlarm(String alarmId) {
- Alarm alarm = alarms.get(alarmId);
- if (alarm == null) {
- alarm = alarmDAO.findById(alarmId);
- if (alarm == null)
- LOG.error("Failed to locate alarm for id {}", alarmId);
- else {
- alarms.put(alarmId, alarm);
- }
}
- return alarm;
- }
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+ LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
+ LOG.info("Preparing");
+ this.collector = collector;
+ alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE);
+ alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY);
+
+ Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig));
+
+ alarmDAO = Injector.getInstance(AlarmDAO.class);
+ alarmEventForwarder = new KafkaAlarmEventForwarder(kafkaConfig);
+ }
+
+ void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) {
+ LOG.debug("Received state change for {}", subAlarm);
+ alarm.updateSubAlarm(subAlarm);
+
+ AlarmState initialState = alarm.getState();
+ if (alarm.evaluate()) {
+ alarmDAO.updateState(alarm.getId(), alarm.getState());
+
+ if (AlarmState.ALARM.equals(alarm.getState())) {
+ LOG.debug("ALARM triggered for {}", alarm);
+ AlarmStateTransitionEvent event = new AlarmStateTransitionEvent(alarm.getTenantId(),
+ alarm.getId(), alarm.getName(), initialState, alarm.getState(),
+ alarm.getStateChangeReason(), System.currentTimeMillis() / 1000);
+ try {
+ alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event));
+ } catch (Exception ignore) {
+ }
+ } else
+ LOG.debug("State changed for {}", alarm);
+ }
+ }
+
+ void handleAlarmDeleted(String alarmId) {
+ LOG.debug("Received AlarmDeletedEvent for alarm id {}", alarmId);
+ alarms.remove(alarmId);
+ }
+
+ String buildStateChangeReason() {
+ return null;
+ }
+
+ private Alarm getOrCreateAlarm(String alarmId) {
+ Alarm alarm = alarms.get(alarmId);
+ if (alarm == null) {
+ alarm = alarmDAO.findById(alarmId);
+ if (alarm == null)
+ LOG.error("Failed to locate alarm for id {}", alarmId);
+ else {
+ alarms.put(alarmId, alarm);
+ }
+ }
+
+ return alarm;
+ }
}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java
index d47b335..30d51ae 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java
@@ -1,16 +1,18 @@
package com.hpcloud.mon.infrastructure.thresholding;
-import java.util.Properties;
-
+import com.hpcloud.configuration.KafkaProducerConfiguration;
+import com.hpcloud.configuration.KafkaProducerProperties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.inject.AbstractModule;
-import com.hpcloud.configuration.KafkaProducerConfiguration;
-import com.hpcloud.configuration.KafkaProducerProperties;
+import java.util.Properties;
-public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEventForwarder {
+public class KafkaAlarmEventForwarder implements AlarmEventForwarder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaAlarmEventForwarder.class);
private final Producer producer;
@@ -25,6 +27,8 @@ public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEve
@Override
public void send(String alertExchange, String alertRoutingKey, String json) {
+ LOG.debug("sending alertExchange: {}, alertRoutingKey: {}, json: {}", alertExchange,
+ alertRoutingKey, json);
final KeyedMessage message = new KeyedMessage(topic, alertRoutingKey, json);
producer.send(message);
}
@@ -34,7 +38,5 @@ public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEve
producer.close();
}
- @Override
- protected void configure() {
- }
+
}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
index b3bb2c8..028c7f9 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java
@@ -1,25 +1,22 @@
package com.hpcloud.mon.infrastructure.thresholding;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.hpcloud.configuration.KafkaConsumerConfiguration;
+import com.hpcloud.configuration.KafkaConsumerProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import com.hpcloud.configuration.KafkaConsumerConfiguration;
-import com.hpcloud.configuration.KafkaConsumerProperties;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
public abstract class KafkaSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -36,7 +33,6 @@ public abstract class KafkaSpout extends BaseRichSpout {
protected KafkaSpout(KafkaConsumerConfiguration kafkaConsumerConfig) {
this.kafkaConsumerConfig = kafkaConsumerConfig;
- LOG.info("Created");
}
@Override
@@ -63,11 +59,11 @@ public abstract class KafkaSpout extends BaseRichSpout {
@Override
public void nextTuple() {
- LOG.debug("nextTuple called");
+ LOG.debug("nextTuple called");
ConsumerIterator it = streams.get(0).iterator();
if (it.hasNext()) {
+ LOG.debug("streams iterator has next");
byte[] message = it.next().message();
- LOG.debug("Received message: " + message);
processMessage(message, collector);
}
}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java
index eb8aaca..66accac 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java
@@ -80,6 +80,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
+ LOG.debug("tuple: {}", tuple);
try {
if (Tuples.isTickTuple(tuple)) {
evaluateAlarmsAndSlideWindows();
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
index 6020898..7c848eb 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java
@@ -60,10 +60,12 @@ public class MetricFilteringBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
+ LOG.debug("tuple: {}", tuple);
try {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinition metricDef = (MetricDefinition) tuple.getValue(0);
+ LOG.debug("metric definition: {}", metricDef);
if (METRIC_DEFS.containsKey(metricDef))
collector.emit(tuple, tuple.getValues());
} else {
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
index c98f48d..6eb7d96 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
@@ -26,6 +26,7 @@ public class MetricSpout extends KafkaSpout {
final MetricEnvelope metricEnvelope;
try {
metricEnvelope = MetricEnvelopes.fromJson(message);
+ LOG.debug("metric envelope: {}", metricEnvelope);
}
catch (RuntimeException re) {
LOG.warn("Error parsing MetricEnvelope", re);
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
deleted file mode 100644
index de1a03c..0000000
--- a/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, console
-
-log4j.logger.com.hpcloud.maas=DEBUG
-log4j.logger.com.hpcloud.messaging.rabbitmq=DEBUG
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout
-log4j.appender.console.layout.conversionPattern=%-5p [%d{ISO8601}] [%.18thread] %c{1}: %m%n%throwable
\ No newline at end of file