diff --git a/pom.xml b/pom.xml index 242cfcc..b61efab 100644 --- a/pom.xml +++ b/pom.xml @@ -69,12 +69,12 @@ org.slf4j slf4j-api - 1.7.2 + 1.7.6 org.slf4j slf4j-log4j12 - 1.7.2 + 1.7.6 diff --git a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java index e852372..bc88e29 100644 --- a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java +++ b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java @@ -1,65 +1,69 @@ package com.hpcloud.mon; -import java.io.File; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; - +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.core.util.StatusPrinter; import com.hpcloud.util.Injector; import com.hpcloud.util.config.ConfigurationFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; /** * Alarm thresholding engine. - * + * * @author Jonathan Halterman */ public class ThresholdingEngine { - private static final Logger LOG = LoggerFactory.getLogger(ThresholdingEngine.class); + private static final Logger LOG = LoggerFactory.getLogger(ThresholdingEngine.class); - private final ThresholdingConfiguration threshConfig; - private final String topologyName; - private final boolean local; + private final ThresholdingConfiguration threshConfig; + private final String topologyName; + private final boolean local; - public ThresholdingEngine(ThresholdingConfiguration threshConfig, String topologyName, - boolean local) { - this.threshConfig = threshConfig; - this.topologyName = topologyName; - this.local = local; - } - - public static final ThresholdingConfiguration configFor(String configFileName) throws Exception { - return ConfigurationFactory.forClass(ThresholdingConfiguration.class) - .build(new File(configFileName)); - } - - public static void main(String... args) throws Exception { - if (args.length < 2) { - LOG.error("Expected configuration file name and topology name arguments"); - System.exit(1); + public ThresholdingEngine(ThresholdingConfiguration threshConfig, String topologyName, + boolean local) { + this.threshConfig = threshConfig; + this.topologyName = topologyName; + this.local = local; } - ThresholdingEngine engine = new ThresholdingEngine(configFor(args[0]), args[1], - args.length > 2 ? true : false); - engine.configure(); - engine.run(); - } + public static final ThresholdingConfiguration configFor(String configFileName) throws Exception { + return ConfigurationFactory.forClass(ThresholdingConfiguration.class) + .build(new File(configFileName)); + } - protected void configure() { - Injector.registerModules(new TopologyModule(threshConfig)); - } + public static void main(String... args) throws Exception { - protected void run() throws Exception { - Config config = Injector.getInstance(Config.class); - StormTopology topology = Injector.getInstance(StormTopology.class); + // Let's show the logging status. + StatusPrinter.print((LoggerContext) LoggerFactory.getILoggerFactory()); - if (local) - new LocalCluster().submitTopology(topologyName, config, topology); - else - StormSubmitter.submitTopology(topologyName, config, topology); - } + if (args.length < 2) { + LOG.error("Expected configuration file name and topology name arguments"); + System.exit(1); + } + + ThresholdingEngine engine = new ThresholdingEngine(configFor(args[0]), args[1], + args.length > 2 ? true : false); + engine.configure(); + engine.run(); + } + + protected void configure() { + Injector.registerModules(new TopologyModule(threshConfig)); + } + + protected void run() throws Exception { + Config config = Injector.getInstance(Config.class); + StormTopology topology = Injector.getInstance(StormTopology.class); + + if (local) + new LocalCluster().submitTopology(topologyName, config, topology); + else + StormSubmitter.submitTopology(topologyName, config, topology); + } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java index b13f50a..b3bb2c8 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/KafkaSpout.java @@ -63,10 +63,12 @@ public abstract class KafkaSpout extends BaseRichSpout { @Override public void nextTuple() { - + LOG.debug("nextTuple called"); ConsumerIterator it = streams.get(0).iterator(); if (it.hasNext()) { - processMessage(it.next().message(), collector); + byte[] message = it.next().message(); + LOG.debug("Received message: " + message); + processMessage(message, collector); } } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..f3433e7 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,26 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + mon-thresh.log + + + %date %level [%thread] %logger{10} [%file:%line] %msg%n + + + + + + + + + + + \ No newline at end of file