diff --git a/pom.xml b/pom.xml index 4d4703a..0aeb33a 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 1.0.0.41 - + 0.9.1-incubating true UTF-8 UTF-8 @@ -82,7 +82,7 @@ org.apache.storm storm-core - 0.9.1-incubating + ${storm.version} com.hpcloud @@ -219,6 +219,13 @@ + + + junit:junit + org.apache.storm:storm-core + org.hamcrest:* + + @@ -263,6 +270,34 @@ 6 + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + copy + package + + copy + + + + + org.apache.storm + storm-core + ${storm.version} + jar + false + + + ${project.build.directory} + false + true + + + + jdeb org.vafer @@ -282,11 +317,22 @@ /opt/mon/mon-thresh.jar + + file + ${project.build.directory}/storm-core-${storm.version}.jar + + /opt/mon/storm-core-${storm.version}.jar + file ${project.basedir}/src/deb/init/mon-thresh.conf /etc/init/mon-thresh.conf + + file + ${project.basedir}/src/main/resources/logback.xml + /etc/mon/logback.xml + file diff --git a/src/deb/init/mon-thresh.conf b/src/deb/init/mon-thresh.conf index 1673559..50564bd 100644 --- a/src/deb/init/mon-thresh.conf +++ b/src/deb/init/mon-thresh.conf @@ -15,7 +15,7 @@ script . /etc/default/mon-thresh fi - exec /usr/bin/java -Xmx8g -DLOGDIR=${LOGDIR:-/tmp} -cp /opt/mon/mon-thresh.jar com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local + exec /usr/bin/java -Xmx8g -Dlogback.configurationFile=/etc/mon/logback.xml -DLOGDIR=${LOGDIR:-/tmp} -cp "/opt/mon/*" com.hpcloud.mon.ThresholdingEngine /etc/mon/mon-thresh-config.yml mon-thresh local end script diff --git a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java index 632430d..4dbf2b4 100644 --- a/src/main/java/com/hpcloud/mon/ThresholdingEngine.java +++ b/src/main/java/com/hpcloud/mon/ThresholdingEngine.java @@ -65,6 +65,7 @@ public class ThresholdingEngine { protected void run() throws Exception { Config config = Injector.getInstance(Config.class); StormTopology topology = Injector.getInstance(StormTopology.class); + config.registerSerialization(com.hpcloud.mon.domain.model.SubAlarm.class); if (local) { LOG.info("submitting topology {} to local storm cluster", topologyName); diff --git a/src/main/java/com/hpcloud/mon/TopologyModule.java b/src/main/java/com/hpcloud/mon/TopologyModule.java index 0c393c2..2fa2bed 100644 --- a/src/main/java/com/hpcloud/mon/TopologyModule.java +++ b/src/main/java/com/hpcloud/mon/TopologyModule.java @@ -10,11 +10,9 @@ import backtype.storm.tuple.Fields; import com.google.inject.AbstractModule; import com.google.inject.Provides; -import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.AlarmThresholdingBolt; import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBolt; import com.hpcloud.mon.infrastructure.thresholding.EventSpout; -import com.hpcloud.mon.infrastructure.thresholding.KafkaAlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricFilteringBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; @@ -31,19 +29,17 @@ public class TopologyModule extends AbstractModule { private Config stormConfig; private IRichSpout metricSpout; private IRichSpout eventSpout; - private AlarmEventForwarder alarmEventForwarder; public TopologyModule(ThresholdingConfiguration config) { this.config = config; } public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig, - IRichSpout metricSpout, IRichSpout eventSpout, AlarmEventForwarder alarmEventForwarder) { + IRichSpout metricSpout, IRichSpout eventSpout) { this(threshConfig); this.stormConfig = stormConfig; this.metricSpout = metricSpout; this.eventSpout = eventSpout; - this.alarmEventForwarder = alarmEventForwarder; } @Override @@ -63,11 +59,6 @@ public class TopologyModule extends AbstractModule { return stormConfig; } - @Provides - AlarmEventForwarder alarmEventForwarder() { - return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config.kafkaProducerConfig) : alarmEventForwarder; - } - @Provides @Named("metrics") IRichSpout metricSpout() { @@ -120,7 +111,7 @@ public class TopologyModule extends AbstractModule { // Aggregation / Event -> Thresholding builder.setBolt("thresholding-bolt", - new AlarmThresholdingBolt(config.database), + new AlarmThresholdingBolt(config.database, config.kafkaProducerConfig), config.thresholdingBoltThreads) .fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0])) .fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID, diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java index f3af002..409e9ee 100644 --- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/AlarmThresholdingBolt.java @@ -12,6 +12,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; +import com.hpcloud.configuration.KafkaProducerConfiguration; import com.hpcloud.mon.ThresholdingConfiguration; import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent; import com.hpcloud.mon.common.event.AlarmUpdatedEvent; @@ -44,6 +45,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { private transient Logger LOG; private DataSourceFactory dbConfig; + private KafkaProducerConfiguration producerConfiguration; final Map alarms = new HashMap(); private String alertExchange; private String alertRoutingKey; @@ -51,8 +53,10 @@ public class AlarmThresholdingBolt extends BaseRichBolt { private transient AlarmEventForwarder alarmEventForwarder; private OutputCollector collector; - public AlarmThresholdingBolt(DataSourceFactory dbConfig) { + public AlarmThresholdingBolt(DataSourceFactory dbConfig, + KafkaProducerConfiguration producerConfig) { this.dbConfig = dbConfig; + this.producerConfiguration = producerConfig; } public AlarmThresholdingBolt(final AlarmDAO alarmDAO, @@ -108,6 +112,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt { alarmDAO = Injector.getInstance(AlarmDAO.class); } if (alarmEventForwarder == null) { + Injector.registerIfNotBound(AlarmEventForwarder.class, new ProducerModule(this.producerConfiguration)); alarmEventForwarder = Injector.getInstance(AlarmEventForwarder.class); } } diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java new file mode 100644 index 0000000..5ca5565 --- /dev/null +++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/ProducerModule.java @@ -0,0 +1,27 @@ +package com.hpcloud.mon.infrastructure.thresholding; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.hpcloud.configuration.KafkaProducerConfiguration; + +public class ProducerModule extends AbstractModule { + private KafkaProducerConfiguration config; + private AlarmEventForwarder alarmEventForwarder; + + @Override + protected void configure() { + } + + public ProducerModule(KafkaProducerConfiguration config) { + this.config = config; + } + + public ProducerModule(AlarmEventForwarder alarmEventForwarder) { + this.alarmEventForwarder = alarmEventForwarder; + } + + @Provides + AlarmEventForwarder alarmEventForwarder() { + return alarmEventForwarder == null ? new KafkaAlarmEventForwarder(config) : alarmEventForwarder; + } +} diff --git a/src/main/resources/storm.yaml b/src/main/resources/storm.yaml deleted file mode 100644 index caec556..0000000 --- a/src/main/resources/storm.yaml +++ /dev/null @@ -1,3 +0,0 @@ -topology.fall.back.on.java.serialization: true -topology.kryo.register: - - com.hpcloud.mon.domain.model.SubAlarm diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java index e444b10..951c2e7 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineAlarmTest.java @@ -46,6 +46,7 @@ import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.EventProcessingBoltTest; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; import com.hpcloud.util.Serialization; @@ -128,7 +129,8 @@ public class ThresholdingEngineAlarmTest extends TopologyTestCase { eventSpout = new FeederSpout(new Fields("event")); alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); // Evaluate alarm stats every 1 seconds System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "5"); diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java index 7a0ac6a..9aef72e 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest.java @@ -40,6 +40,7 @@ import com.hpcloud.mon.domain.service.SubAlarmDAO; import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition; import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; import com.hpcloud.util.Serialization; @@ -134,7 +135,8 @@ public class ThresholdingEngineTest extends TopologyTestCase { eventSpout = new FeederSpout(new Fields("event")); alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); } private List subAlarmsFor(AlarmExpression expression) { diff --git a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java index 4995f5d..236dcf8 100644 --- a/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java +++ b/src/test/java/com/hpcloud/mon/ThresholdingEngineTest1.java @@ -37,6 +37,7 @@ import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition; import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder; import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt; import com.hpcloud.mon.infrastructure.thresholding.MetricSpout; +import com.hpcloud.mon.infrastructure.thresholding.ProducerModule; import com.hpcloud.streaming.storm.TopologyTestCase; import com.hpcloud.util.Injector; @@ -135,7 +136,8 @@ public class ThresholdingEngineTest1 extends TopologyTestCase { final AlarmEventForwarder alarmEventForwarder = mock(AlarmEventForwarder.class); Injector.registerModules(new TopologyModule(threshConfig, stormConfig, - metricSpout, eventSpout, alarmEventForwarder)); + metricSpout, eventSpout)); + Injector.registerModules(new ProducerModule(alarmEventForwarder)); // Evaluate alarm stats every 1 seconds System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "1");