diff --git a/pom.xml b/pom.xml
index e85e2da..9253ecf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,6 @@
storm
storm
0.9.0.1
- provided
com.hpcloud
@@ -116,6 +115,11 @@
mon-streaming
${mon.common.version}
+
+ com.hpcloud
+ mon-kafka
+ ${mon.common.version}
+
io.dropwizard
dropwizard-db
diff --git a/src/main/java/com/hpcloud/mon/EventSpoutConfig.java b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java
new file mode 100644
index 0000000..0c6410b
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/EventSpoutConfig.java
@@ -0,0 +1,8 @@
+package com.hpcloud.mon;
+
+import com.hpcloud.configuration.KafkaConsumerConfiguration;
+
+public class EventSpoutConfig {
+ public KafkaConsumerConfiguration kafkaConsumerConfiguration;
+
+}
diff --git a/src/main/java/com/hpcloud/mon/KafkaConsumerConfiguration.java b/src/main/java/com/hpcloud/mon/KafkaConsumerConfiguration.java
deleted file mode 100644
index 2bce962..0000000
--- a/src/main/java/com/hpcloud/mon/KafkaConsumerConfiguration.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.hpcloud.mon;
-
-public class KafkaConsumerConfiguration {
-
-}
diff --git a/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java
new file mode 100644
index 0000000..d3fed7a
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/MetricSpoutConfig.java
@@ -0,0 +1,8 @@
+package com.hpcloud.mon;
+
+import com.hpcloud.configuration.*;
+
+public class MetricSpoutConfig {
+
+ public KafkaConsumerConfiguration kafkaConsumerConfiguration;
+}
diff --git a/src/main/java/com/hpcloud/mon/ThresholdingConfiguration.java b/src/main/java/com/hpcloud/mon/ThresholdingConfiguration.java
index d92bcd3..0a66503 100644
--- a/src/main/java/com/hpcloud/mon/ThresholdingConfiguration.java
+++ b/src/main/java/com/hpcloud/mon/ThresholdingConfiguration.java
@@ -1,5 +1,6 @@
package com.hpcloud.mon;
+import com.hpcloud.configuration.KafkaConsumerConfiguration;
import io.dropwizard.db.DataSourceFactory;
import java.util.Set;
@@ -23,8 +24,8 @@ public class ThresholdingConfiguration {
/** Total number of acker threads across the cluster. */
@NotNull public Integer numAckerThreads = 12;
- @NotNull public Integer maasMetricSpoutThreads = 6;
- @NotNull public Integer maasMetricSpoutTasks = 6;
+ @NotNull public Integer metricSpoutThreads = 6;
+ @NotNull public Integer metricSpoutTasks = 6;
@NotNull public Integer eventSpoutThreads = 3;
@NotNull public Integer eventSpoutTasks = 3;
@@ -44,10 +45,10 @@ public class ThresholdingConfiguration {
/** Namespaces for which metrics are received sporadically. */
@NotNull public Set sporadicMetricNamespaces;
- /** Configuration for the spout that receives MaaS metrics from the external exchange. */
- @Valid @NotNull public KafkaConsumerConfiguration maasMetricSpout;
+ /** Configuration for the spout that receives metrics from the external exchange. */
+ @Valid @NotNull public MetricSpoutConfig metricSpoutConfig;
/** Configuration for the spout that receives MaaS events from the external exchange. */
- @Valid @NotNull public KafkaConsumerConfiguration eventSpout;
+ @Valid @NotNull public EventSpoutConfig eventSpoutConfig;
/** Configuration for publishing to the alerts exchange on the external server. */
@NotEmpty public String alertsExchange = "alerts";
diff --git a/src/main/java/com/hpcloud/mon/TopologyModule.java b/src/main/java/com/hpcloud/mon/TopologyModule.java
index 514467f..35efbb1 100644
--- a/src/main/java/com/hpcloud/mon/TopologyModule.java
+++ b/src/main/java/com/hpcloud/mon/TopologyModule.java
@@ -17,7 +17,7 @@ import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
import com.hpcloud.mon.infrastructure.thresholding.MetricFilteringBolt;
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
import com.hpcloud.mon.infrastructure.thresholding.deserializer.MaasEventDeserializer;
-import com.hpcloud.mon.infrastructure.thresholding.deserializer.MaasMetricDeserializer;
+import com.hpcloud.mon.infrastructure.thresholding.deserializer.MetricDeserializer;
import com.hpcloud.util.Injector;
/**
@@ -29,7 +29,7 @@ public class TopologyModule extends AbstractModule {
private final ThresholdingConfiguration config;
private Config stormConfig;
private IRichSpout collectdMetricSpout;
- private IRichSpout maasMetricSpout;
+ private IRichSpout metricSpout;
private IRichSpout eventSpout;
public TopologyModule(ThresholdingConfiguration config) {
@@ -37,11 +37,11 @@ public class TopologyModule extends AbstractModule {
}
public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig,
- IRichSpout collectdMetricSpout, IRichSpout maasMetricSpout, IRichSpout eventSpout) {
+ IRichSpout collectdMetricSpout, IRichSpout metricSpout, IRichSpout eventSpout) {
this(threshConfig);
this.stormConfig = stormConfig;
this.collectdMetricSpout = collectdMetricSpout;
- this.maasMetricSpout = maasMetricSpout;
+ this.metricSpout = metricSpout;
this.eventSpout = eventSpout;
}
@@ -63,16 +63,16 @@ public class TopologyModule extends AbstractModule {
}
@Provides
- @Named("maas-metrics")
- IRichSpout maasMetricSpout() {
- return maasMetricSpout == null ? new MetricSpout(config.maasMetricSpout,
- new MaasMetricDeserializer()) : maasMetricSpout;
+ @Named("metrics")
+ IRichSpout metricSpout() {
+ return metricSpout == null ? new MetricSpout(config.metricSpoutConfig,
+ new MetricDeserializer()) : metricSpout;
}
@Provides
@Named("event")
IRichSpout eventSpout() {
- return eventSpout == null ? new EventSpout(config.eventSpout, new MaasEventDeserializer())
+ return eventSpout == null ? new EventSpout(config.eventSpoutConfig, new MaasEventDeserializer())
: eventSpout;
}
@@ -80,11 +80,11 @@ public class TopologyModule extends AbstractModule {
StormTopology topology() {
TopologyBuilder builder = new TopologyBuilder();
- // Receives MaaS Metrics
- builder.setSpout("maas-metrics-spout", Injector.getInstance(IRichSpout.class, "maas-metrics"),
- config.maasMetricSpoutThreads).setNumTasks(config.maasMetricSpoutTasks);
+ // Receives metrics
+ builder.setSpout("metrics-spout", Injector.getInstance(IRichSpout.class, "metrics"),
+ config.metricSpoutThreads).setNumTasks(config.metricSpoutTasks);
- // Receives MaaS events
+ // Receives events
builder.setSpout("event-spout", Injector.getInstance(IRichSpout.class, "event"),
config.eventSpoutThreads).setNumTasks(config.eventSpoutTasks);
@@ -96,7 +96,7 @@ public class TopologyModule extends AbstractModule {
// Metrics / Event -> Filtering
builder.setBolt("filtering-bolt", new MetricFilteringBolt(config.database),
config.filteringBoltThreads)
- .shuffleGrouping("maas-metrics-spout")
+ .shuffleGrouping("metrics-spout")
.allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID)
.allGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID)
.setNumTasks(config.filteringBoltTasks);
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
index 9c2b418..86190c3 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/EventSpout.java
@@ -7,14 +7,15 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
-import com.hpcloud.mon.KafkaConsumerConfiguration;
+import com.hpcloud.configuration.KafkaConsumerConfiguration;
+import com.hpcloud.mon.EventSpoutConfig;
import com.hpcloud.mon.infrastructure.thresholding.deserializer.MaasEventDeserializer;
public class EventSpout extends BaseRichSpout {
private static final long serialVersionUID = 8457340455857276878L;
- public EventSpout(KafkaConsumerConfiguration eventSpout, MaasEventDeserializer maasEventDeserializer) {
+ public EventSpout(EventSpoutConfig eventSpout, MaasEventDeserializer maasEventDeserializer) {
// TODO Auto-generated constructor stub
}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
index 1c909ef..ed670d7 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/MetricSpout.java
@@ -1,32 +1,73 @@
package com.hpcloud.mon.infrastructure.thresholding;
-import java.util.Map;
-
-import com.hpcloud.mon.KafkaConsumerConfiguration;
-import com.hpcloud.mon.infrastructure.thresholding.deserializer.MaasMetricDeserializer;
-
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.hpcloud.configuration.KafkaConsumerProperties;
+import com.hpcloud.mon.MetricSpoutConfig;
+import com.hpcloud.mon.common.model.metric.Metric;
+import com.hpcloud.mon.common.model.metric.MetricDefinition;
+import com.hpcloud.mon.common.model.metric.MetricEnvelope;
+import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
+import com.hpcloud.mon.infrastructure.thresholding.deserializer.MetricDeserializer;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import java.util.*;
public class MetricSpout extends BaseRichSpout {
private static final long serialVersionUID = 744004533863562119L;
- public MetricSpout(KafkaConsumerConfiguration maasMetricSpout, MaasMetricDeserializer maasMetricDeserializer) {
- // TODO Auto-generated constructor stub
+ private final MetricSpoutConfig metricSpoutConfig;
+ private final MetricDeserializer metricDeserializer;
+
+ private final ConsumerConnector consumerConnector;
+
+ private List> streams = null;
+
+ private SpoutOutputCollector collector;
+
+ public MetricSpout(MetricSpoutConfig metricSpoutConfig, MetricDeserializer metricDeserializer) {
+ this.metricSpoutConfig = metricSpoutConfig;
+ this.metricDeserializer = metricDeserializer;
+
+ Properties kafkaProperties = KafkaConsumerProperties.createKafkaProperties(metricSpoutConfig.kafkaConsumerConfiguration);
+ ConsumerConfig consumerConfig = new ConsumerConfig(kafkaProperties);
+ this.consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+ }
+
+ @Override
+ public void activate() {
+ if (streams == null) {
+ Map topicCountMap = new HashMap<>();
+ topicCountMap.put(metricSpoutConfig.kafkaConsumerConfiguration.getTopic(), new Integer(1));
+ Map>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
+ streams = consumerMap.get(metricSpoutConfig.kafkaConsumerConfiguration.getTopic());
+ }
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-
+ this.collector = collector;
+
}
@Override
public void nextTuple() {
-
+
+ ConsumerIterator it = streams.get(0).iterator();
+ if (it.hasNext()) {
+ MetricEnvelope metricEnvelope = MetricEnvelopes.fromJson(it.next().message());
+ collector.emit(new Values(metricEnvelope.metric.definition(), metricEnvelope.metric));
+ }
+
}
@Override
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializer.java b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializer.java
similarity index 91%
rename from src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializer.java
rename to src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializer.java
index 40faee7..07273cc 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializer.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializer.java
@@ -20,7 +20,7 @@ import com.hpcloud.streaming.storm.TupleDeserializer;
*
* @author Jonathan Halterman
*/
-public class MaasMetricDeserializer implements TupleDeserializer, Serializable {
+public class MetricDeserializer implements TupleDeserializer, Serializable {
private static final long serialVersionUID = 4021288586913323048L;
private static final Fields FIELDS = new Fields("metricDefinition", "metric");
diff --git a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializerTest.java b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializerTest.java
similarity index 84%
rename from src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializerTest.java
rename to src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializerTest.java
index 7e28f6b..4143bb3 100644
--- a/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MaasMetricDeserializerTest.java
+++ b/src/test/java/com/hpcloud/mon/infrastructure/thresholding/deserializer/MetricDeserializerTest.java
@@ -15,8 +15,8 @@ import com.hpcloud.mon.common.model.metric.Metrics;
* @author Jonathan Halterman
*/
@Test
-public class MaasMetricDeserializerTest {
- private MaasMetricDeserializer deserializer = new MaasMetricDeserializer();
+public class MetricDeserializerTest {
+ private MetricDeserializer deserializer = new MetricDeserializer();
public void shouldDeserialize() {
Metric metric = new Metric("bob", null, 123, 5.0);