From 989edb006e483b14ebee9cf21a6e39d2a991cb35 Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Thu, 20 Mar 2014 07:42:05 -0600 Subject: [PATCH] Added logging. Changed version of mysql-connector. Removed unnecessary injection. Removed log4j.properties file. --- pom.xml | 2 +- .../com/hpcloud/mon/ThresholdingEngine.java | 12 +- .../thresholding/AlarmThresholdingBolt.java | 213 +++++++++--------- .../KafkaAlarmEventForwarder.java | 20 +- .../thresholding/KafkaSpout.java | 26 +-- .../thresholding/MetricAggregationBolt.java | 1 + .../thresholding/MetricFilteringBolt.java | 2 + .../thresholding/MetricSpout.java | 1 + src/test/resources/log4j.properties | 8 - 9 files changed, 143 insertions(+), 142 deletions(-) delete mode 100644 src/test/resources/log4j.properties diff --git a/pom.xml b/pom.xml index b61efab..265136b 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ mysql mysql-connector-java - 5.1.18 + 5.1.26 diff --git a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java index bc88e29..632430d 100644 --- a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java +++ b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java @@ -30,6 +30,8 @@ public class ThresholdingEngine { this.threshConfig = threshConfig; this.topologyName = topologyName; this.local = local; + LOG.info("local set to {}", local); + } public static final ThresholdingConfiguration configFor(String configFileName) throws Exception { @@ -47,6 +49,9 @@ public class ThresholdingEngine { System.exit(1); } + LOG.info("Instantiating ThresholdingEngine with config file: {}, topology: {}", + args[0], args[1]); + ThresholdingEngine engine = new ThresholdingEngine(configFor(args[0]), args[1], args.length > 2 ? true : false); engine.configure(); @@ -61,9 +66,12 @@ public class ThresholdingEngine { Config config = Injector.getInstance(Config.class); StormTopology topology = Injector.getInstance(StormTopology.class); - if (local) + if (local) { + LOG.info("submitting topology {} to local storm cluster", topologyName); new LocalCluster().submitTopology(topologyName, config, topology); - else + } else { + LOG.info("submitting topology {} to non-local storm cluster", topologyName); StormSubmitter.submitTopology(topologyName, config, topology); + } } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java index a4ca608..ab7a85c 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -1,17 +1,10 @@ package com.hpcloud.mon.infrastructure.thresholding; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; - import com.hpcloud.configuration.KafkaProducerConfiguration; import com.hpcloud.mon.ThresholdingConfiguration; import com.hpcloud.mon.common.event.AlarmDeletedEvent; @@ -25,125 +18,131 @@ import com.hpcloud.streaming.storm.Logging; import com.hpcloud.streaming.storm.Streams; import com.hpcloud.util.Injector; import com.hpcloud.util.Serialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; /** * Determines whether an alarm threshold has been exceeded. - * - *

+ *

+ *

* Receives alarm state changes and events. - * + *

*

- * + * * @author Jonathan Halterman */ public class AlarmThresholdingBolt extends BaseRichBolt { - private static final long serialVersionUID = -4126465124017857754L; + private static final long serialVersionUID = -4126465124017857754L; - private transient Logger LOG; - private final DataSourceFactory dbConfig; - private final KafkaProducerConfiguration kafkaConfig; - private final Map alarms = new HashMap(); - private String alertExchange; - private String alertRoutingKey; - private transient AlarmDAO alarmDAO; - private transient AlarmEventForwarder alarmEventForwarder; - private OutputCollector collector; + private transient Logger LOG; + private final DataSourceFactory dbConfig; + private final KafkaProducerConfiguration kafkaConfig; + private final Map alarms = new HashMap(); + private String alertExchange; + private String alertRoutingKey; + private transient AlarmDAO alarmDAO; + private transient AlarmEventForwarder alarmEventForwarder; + private OutputCollector collector; - public AlarmThresholdingBolt(DataSourceFactory dbConfig, KafkaProducerConfiguration rabbitConfig) { - this.dbConfig = dbConfig; - this.kafkaConfig = rabbitConfig; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - @Override - public void execute(Tuple tuple) { - try { - if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { - String alarmId = tuple.getString(0); - Alarm alarm = getOrCreateAlarm(alarmId); - if (alarm == null) - return; - - SubAlarm subAlarm = (SubAlarm) tuple.getValue(1); - evaluateThreshold(alarm, subAlarm); - } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { - String eventType = tuple.getString(0); - String alarmId = tuple.getString(1); - - if (AlarmDeletedEvent.class.getSimpleName().equals(eventType)) - handleAlarmDeleted(alarmId); - } - } catch (Exception e) { - LOG.error("Error processing tuple {}", tuple, e); - } finally { - collector.ack(tuple); + public AlarmThresholdingBolt(DataSourceFactory dbConfig, KafkaProducerConfiguration rabbitConfig) { + this.dbConfig = dbConfig; + this.kafkaConfig = rabbitConfig; } - } - @Override - @SuppressWarnings("rawtypes") - public void prepare(Map config, TopologyContext context, OutputCollector collector) { - LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); - LOG.info("Preparing"); - this.collector = collector; - alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE); - alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } - Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig)); - Injector.registerIfNotBound(AlarmEventForwarder.class, new KafkaAlarmEventForwarder(kafkaConfig)); + @Override + public void execute(Tuple tuple) { - alarmDAO = Injector.getInstance(AlarmDAO.class); - alarmEventForwarder = new KafkaAlarmEventForwarder(kafkaConfig); - } - - void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) { - LOG.debug("Received state change for {}", subAlarm); - alarm.updateSubAlarm(subAlarm); - - AlarmState initialState = alarm.getState(); - if (alarm.evaluate()) { - alarmDAO.updateState(alarm.getId(), alarm.getState()); - - if (AlarmState.ALARM.equals(alarm.getState())) { - LOG.debug("ALARM triggered for {}", alarm); - AlarmStateTransitionEvent event = new AlarmStateTransitionEvent(alarm.getTenantId(), - alarm.getId(), alarm.getName(), initialState, alarm.getState(), - alarm.getStateChangeReason(), System.currentTimeMillis() / 1000); + LOG.debug("tuple: {}", tuple); try { - alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event)); - } catch (Exception ignore) { + if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { + String alarmId = tuple.getString(0); + Alarm alarm = getOrCreateAlarm(alarmId); + if (alarm == null) + return; + + SubAlarm subAlarm = (SubAlarm) tuple.getValue(1); + evaluateThreshold(alarm, subAlarm); + } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) { + String eventType = tuple.getString(0); + String alarmId = tuple.getString(1); + + if (AlarmDeletedEvent.class.getSimpleName().equals(eventType)) + handleAlarmDeleted(alarmId); + } + } catch (Exception e) { + LOG.error("Error processing tuple {}", tuple, e); + } finally { + collector.ack(tuple); } - } else - LOG.debug("State changed for {}", alarm); - } - } - - void handleAlarmDeleted(String alarmId) { - LOG.debug("Received AlarmDeletedEvent for alarm id {}", alarmId); - alarms.remove(alarmId); - } - - String buildStateChangeReason() { - return null; - } - - private Alarm getOrCreateAlarm(String alarmId) { - Alarm alarm = alarms.get(alarmId); - if (alarm == null) { - alarm = alarmDAO.findById(alarmId); - if (alarm == null) - LOG.error("Failed to locate alarm for id {}", alarmId); - else { - alarms.put(alarmId, alarm); - } } - return alarm; - } + @Override + @SuppressWarnings("rawtypes") + public void prepare(Map config, TopologyContext context, OutputCollector collector) { + LOG = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); + LOG.info("Preparing"); + this.collector = collector; + alertExchange = (String) config.get(ThresholdingConfiguration.ALERTS_EXCHANGE); + alertRoutingKey = (String) config.get(ThresholdingConfiguration.ALERTS_ROUTING_KEY); + + Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(dbConfig)); + + alarmDAO = Injector.getInstance(AlarmDAO.class); + alarmEventForwarder = new KafkaAlarmEventForwarder(kafkaConfig); + } + + void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) { + LOG.debug("Received state change for {}", subAlarm); + alarm.updateSubAlarm(subAlarm); + + AlarmState initialState = alarm.getState(); + if (alarm.evaluate()) { + alarmDAO.updateState(alarm.getId(), alarm.getState()); + + if (AlarmState.ALARM.equals(alarm.getState())) { + LOG.debug("ALARM triggered for {}", alarm); + AlarmStateTransitionEvent event = new AlarmStateTransitionEvent(alarm.getTenantId(), + alarm.getId(), alarm.getName(), initialState, alarm.getState(), + alarm.getStateChangeReason(), System.currentTimeMillis() / 1000); + try { + alarmEventForwarder.send(alertExchange, alertRoutingKey, Serialization.toJson(event)); + } catch (Exception ignore) { + } + } else + LOG.debug("State changed for {}", alarm); + } + } + + void handleAlarmDeleted(String alarmId) { + LOG.debug("Received AlarmDeletedEvent for alarm id {}", alarmId); + alarms.remove(alarmId); + } + + String buildStateChangeReason() { + return null; + } + + private Alarm getOrCreateAlarm(String alarmId) { + Alarm alarm = alarms.get(alarmId); + if (alarm == null) { + alarm = alarmDAO.findById(alarmId); + if (alarm == null) + LOG.error("Failed to locate alarm for id {}", alarmId); + else { + alarms.put(alarmId, alarm); + } + } + + return alarm; + } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java index d47b335..30d51ae 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaAlarmEventForwarder.java @@ -1,16 +1,18 @@ package com.hpcloud.mon.infrastructure.thresholding; -import java.util.Properties; - +import com.hpcloud.configuration.KafkaProducerConfiguration; +import com.hpcloud.configuration.KafkaProducerProperties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.inject.AbstractModule; -import com.hpcloud.configuration.KafkaProducerConfiguration; -import com.hpcloud.configuration.KafkaProducerProperties; +import java.util.Properties; -public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEventForwarder { +public class KafkaAlarmEventForwarder implements AlarmEventForwarder { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaAlarmEventForwarder.class); private final Producer producer; @@ -25,6 +27,8 @@ public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEve @Override public void send(String alertExchange, String alertRoutingKey, String json) { + LOG.debug("sending alertExchange: {}, alertRoutingKey: {}, json: {}", alertExchange, + alertRoutingKey, json); final KeyedMessage message = new KeyedMessage(topic, alertRoutingKey, json); producer.send(message); } @@ -34,7 +38,5 @@ public class KafkaAlarmEventForwarder extends AbstractModule implements AlarmEve producer.close(); } - @Override - protected void configure() { - } + } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java index b3bb2c8..028c7f9 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java @@ -1,25 +1,22 @@ package com.hpcloud.mon.infrastructure.thresholding; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.base.BaseRichSpout; +import com.hpcloud.configuration.KafkaConsumerConfiguration; +import com.hpcloud.configuration.KafkaConsumerProperties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichSpout; - -import com.hpcloud.configuration.KafkaConsumerConfiguration; -import com.hpcloud.configuration.KafkaConsumerProperties; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; public abstract class KafkaSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); @@ -36,7 +33,6 @@ public abstract class KafkaSpout extends BaseRichSpout { protected KafkaSpout(KafkaConsumerConfiguration kafkaConsumerConfig) { this.kafkaConsumerConfig = kafkaConsumerConfig; - LOG.info("Created"); } @Override @@ -63,11 +59,11 @@ public abstract class KafkaSpout extends BaseRichSpout { @Override public void nextTuple() { - LOG.debug("nextTuple called"); + LOG.debug("nextTuple called"); ConsumerIterator it = streams.get(0).iterator(); if (it.hasNext()) { + LOG.debug("streams iterator has next"); byte[] message = it.next().message(); - LOG.debug("Received message: " + message); processMessage(message, collector); } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java index eb8aaca..66accac 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricAggregationBolt.java @@ -80,6 +80,7 @@ public class MetricAggregationBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { + LOG.debug("tuple: {}", tuple); try { if (Tuples.isTickTuple(tuple)) { evaluateAlarmsAndSlideWindows(); diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java index 6020898..7c848eb 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricFilteringBolt.java @@ -60,10 +60,12 @@ public class MetricFilteringBolt extends BaseRichBolt { @Override public void execute(Tuple tuple) { + LOG.debug("tuple: {}", tuple); try { if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) { MetricDefinition metricDef = (MetricDefinition) tuple.getValue(0); + LOG.debug("metric definition: {}", metricDef); if (METRIC_DEFS.containsKey(metricDef)) collector.emit(tuple, tuple.getValues()); } else { diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java index c98f48d..6eb7d96 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java @@ -26,6 +26,7 @@ public class MetricSpout extends KafkaSpout { final MetricEnvelope metricEnvelope; try { metricEnvelope = MetricEnvelopes.fromJson(message); + LOG.debug("metric envelope: {}", metricEnvelope); } catch (RuntimeException re) { LOG.warn("Error parsing MetricEnvelope", re); diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties deleted file mode 100644 index de1a03c..0000000 --- a/src/test/resources/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -log4j.rootLogger=INFO, console - -log4j.logger.com.hpcloud.maas=DEBUG -log4j.logger.com.hpcloud.messaging.rabbitmq=DEBUG - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout -log4j.appender.console.layout.conversionPattern=%-5p [%d{ISO8601}] [%.18thread] %c{1}: %m%n%throwable \ No newline at end of file