Removed MetricDeserializer as it it not used. Remove all references to it and its test.
Remove the idea of the collectdSpout since that is no longer used, also.
This commit is contained in:
parent
e439e22181
commit
e2667abbf7
|
@ -17,7 +17,6 @@ import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
|
|||
import com.hpcloud.mon.infrastructure.thresholding.MetricFilteringBolt;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricSpout;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.deserializer.EventDeserializer;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.deserializer.MetricDeserializer;
|
||||
import com.hpcloud.util.Injector;
|
||||
|
||||
/**
|
||||
|
@ -28,7 +27,6 @@ import com.hpcloud.util.Injector;
|
|||
public class TopologyModule extends AbstractModule {
|
||||
private final ThresholdingConfiguration config;
|
||||
private Config stormConfig;
|
||||
private IRichSpout collectdMetricSpout;
|
||||
private IRichSpout metricSpout;
|
||||
private IRichSpout eventSpout;
|
||||
|
||||
|
@ -37,10 +35,9 @@ public class TopologyModule extends AbstractModule {
|
|||
}
|
||||
|
||||
public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig,
|
||||
IRichSpout collectdMetricSpout, IRichSpout metricSpout, IRichSpout eventSpout) {
|
||||
IRichSpout metricSpout, IRichSpout eventSpout) {
|
||||
this(threshConfig);
|
||||
this.stormConfig = stormConfig;
|
||||
this.collectdMetricSpout = collectdMetricSpout;
|
||||
this.metricSpout = metricSpout;
|
||||
this.eventSpout = eventSpout;
|
||||
}
|
||||
|
@ -65,8 +62,7 @@ public class TopologyModule extends AbstractModule {
|
|||
@Provides
|
||||
@Named("metrics")
|
||||
IRichSpout metricSpout() {
|
||||
return metricSpout == null ? new MetricSpout(config.metricSpoutConfig,
|
||||
new MetricDeserializer()) : metricSpout;
|
||||
return metricSpout == null ? new MetricSpout(config.metricSpoutConfig) : metricSpout;
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
|
|
@ -10,7 +10,6 @@ import com.hpcloud.configuration.KafkaConsumerProperties;
|
|||
import com.hpcloud.mon.MetricSpoutConfig;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.deserializer.MetricDeserializer;
|
||||
|
||||
import kafka.consumer.Consumer;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
|
@ -29,7 +28,6 @@ public class MetricSpout extends BaseRichSpout {
|
|||
private static final long serialVersionUID = 744004533863562119L;
|
||||
|
||||
private final MetricSpoutConfig metricSpoutConfig;
|
||||
private final MetricDeserializer metricDeserializer;
|
||||
|
||||
private transient ConsumerConnector consumerConnector;
|
||||
|
||||
|
@ -37,9 +35,8 @@ public class MetricSpout extends BaseRichSpout {
|
|||
|
||||
private SpoutOutputCollector collector;
|
||||
|
||||
public MetricSpout(MetricSpoutConfig metricSpoutConfig, MetricDeserializer metricDeserializer) {
|
||||
public MetricSpout(MetricSpoutConfig metricSpoutConfig) {
|
||||
this.metricSpoutConfig = metricSpoutConfig;
|
||||
this.metricDeserializer = metricDeserializer;
|
||||
LOG.info("Created");
|
||||
}
|
||||
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
package com.hpcloud.mon.infrastructure.thresholding.deserializer;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import backtype.storm.tuple.Fields;
|
||||
|
||||
import com.hpcloud.mon.common.model.metric.Metric;
|
||||
import com.hpcloud.mon.common.model.metric.Metrics;
|
||||
import com.hpcloud.streaming.storm.TupleDeserializer;
|
||||
|
||||
/**
|
||||
* Deserializes MaaS metrics.
|
||||
*
|
||||
* <ul>
|
||||
* <li>Output: Metric metric
|
||||
* </ul>
|
||||
*
|
||||
* @author Jonathan Halterman
|
||||
*/
|
||||
public class MetricDeserializer implements TupleDeserializer, Serializable {
|
||||
private static final long serialVersionUID = 4021288586913323048L;
|
||||
private static final Fields FIELDS = new Fields("metricDefinition", "metric");
|
||||
|
||||
@Override
|
||||
public List<List<?>> deserialize(byte[] tuple) {
|
||||
Metric metric = Metrics.fromJson(tuple);
|
||||
return Collections.<List<?>>singletonList(Arrays.asList(metric.definition(), metric));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fields getOutputFields() {
|
||||
return FIELDS;
|
||||
}
|
||||
}
|
|
@ -15,7 +15,6 @@ import org.testng.annotations.Test;
|
|||
|
||||
import backtype.storm.Config;
|
||||
import backtype.storm.testing.FeederSpout;
|
||||
import backtype.storm.topology.IRichSpout;
|
||||
import backtype.storm.tuple.Fields;
|
||||
import backtype.storm.tuple.Values;
|
||||
|
||||
|
@ -31,7 +30,6 @@ import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
|
|||
import com.hpcloud.mon.domain.service.SubAlarmDAO;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.AlarmEventForwarder;
|
||||
import com.hpcloud.mon.infrastructure.thresholding.MetricAggregationBolt;
|
||||
import com.hpcloud.streaming.storm.NoopSpout;
|
||||
import com.hpcloud.streaming.storm.TopologyTestCase;
|
||||
import com.hpcloud.util.Injector;
|
||||
|
||||
|
@ -43,8 +41,7 @@ import com.hpcloud.util.Injector;
|
|||
*/
|
||||
@Test(groups = "integration")
|
||||
public class ThresholdingEngineTest extends TopologyTestCase {
|
||||
private FeederSpout collectdMetricSpout;
|
||||
private IRichSpout maasMetricSpout;
|
||||
private FeederSpout metricSpout;
|
||||
private FeederSpout eventSpout;
|
||||
private AlarmDAO alarmDAO;
|
||||
private SubAlarmDAO subAlarmDAO;
|
||||
|
@ -104,11 +101,10 @@ public class ThresholdingEngineTest extends TopologyTestCase {
|
|||
ThresholdingConfiguration threshConfig = new ThresholdingConfiguration();
|
||||
Config stormConfig = new Config();
|
||||
stormConfig.setMaxTaskParallelism(1);
|
||||
collectdMetricSpout = new FeederSpout(new Fields("metricDefinition", "metric"));
|
||||
maasMetricSpout = new NoopSpout(new Fields("metricDefinition", "metric"));
|
||||
metricSpout = new FeederSpout(new Fields("metricDefinition", "metric"));
|
||||
eventSpout = new FeederSpout(new Fields("event"));
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig, collectdMetricSpout,
|
||||
maasMetricSpout, eventSpout));
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig,
|
||||
metricSpout, eventSpout));
|
||||
|
||||
// Evaluate alarm stats every 1 seconds
|
||||
System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "1");
|
||||
|
@ -129,9 +125,9 @@ public class ThresholdingEngineTest extends TopologyTestCase {
|
|||
System.out.println("Feeding metrics...");
|
||||
|
||||
long time = System.currentTimeMillis();
|
||||
collectdMetricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
|
||||
metricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
|
||||
cpuMetricDef.dimensions, time, (double) (++goodValueCount == 15 ? 1 : 555))));
|
||||
collectdMetricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
|
||||
metricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
|
||||
memMetricDef.dimensions, time, (double) (goodValueCount == 15 ? 1 : 555))));
|
||||
|
||||
if (--feedCount == 0)
|
||||
|
|
|
@ -45,8 +45,7 @@ import com.hpcloud.util.Injector;
|
|||
*/
|
||||
@Test(groups = "integration")
|
||||
public class ThresholdingEngineTest1 extends TopologyTestCase {
|
||||
private FeederSpout collectdMetricSpout;
|
||||
private FeederSpout maasMetricSpout;
|
||||
private FeederSpout metricSpout;
|
||||
private FeederSpout eventSpout;
|
||||
private AlarmDAO alarmDAO;
|
||||
private SubAlarmDAO subAlarmDAO;
|
||||
|
@ -122,11 +121,10 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
|
|||
Config stormConfig = new Config();
|
||||
stormConfig.setMaxTaskParallelism(5);
|
||||
|
||||
collectdMetricSpout = new FeederSpout(new Fields("metricDefinition", "metric"));
|
||||
maasMetricSpout = new FeederSpout(new Fields("metricDefinition", "metric"));
|
||||
metricSpout = new FeederSpout(new Fields("metricDefinition", "metric"));
|
||||
eventSpout = new FeederSpout(new Fields("event"));
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig, collectdMetricSpout,
|
||||
maasMetricSpout, eventSpout));
|
||||
Injector.registerModules(new TopologyModule(threshConfig, stormConfig,
|
||||
metricSpout, eventSpout));
|
||||
|
||||
// Evaluate alarm stats every 1 seconds
|
||||
System.setProperty(MetricAggregationBolt.TICK_TUPLE_SECONDS_KEY, "1");
|
||||
|
@ -150,11 +148,11 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
|
|||
|
||||
while (true) {
|
||||
long time = System.currentTimeMillis();
|
||||
collectdMetricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
|
||||
metricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
|
||||
cpuMetricDef.dimensions, time, count % 10 == 0 ? 555 : 1)));
|
||||
collectdMetricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
|
||||
metricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
|
||||
cpuMetricDef.dimensions, time, count % 10 == 0 ? 555 : 1)));
|
||||
maasMetricSpout.feed(new Values(customMetricDef, new Metric(customMetricDef.name,
|
||||
metricSpout.feed(new Values(customMetricDef, new Metric(customMetricDef.name,
|
||||
cpuMetricDef.dimensions, time, count % 20 == 0 ? 1 : 123)));
|
||||
|
||||
if (count % 5 == 0) {
|
||||
|
|
|
@ -1,11 +1,73 @@
|
|||
package com.hpcloud.mon.infrastructure.thresholding;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.hpcloud.mon.common.event.AlarmCreatedEvent;
|
||||
import com.hpcloud.mon.common.model.alarm.AlarmExpression;
|
||||
import com.hpcloud.mon.common.model.alarm.AlarmState;
|
||||
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
|
||||
import com.hpcloud.mon.common.model.metric.Metric;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
|
||||
import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
|
||||
import com.hpcloud.mon.common.model.metric.Metrics;
|
||||
import com.hpcloud.mon.domain.model.Alarm;
|
||||
import com.hpcloud.mon.domain.model.AlarmStateTransitionEvent;
|
||||
import com.hpcloud.mon.domain.model.SubAlarm;
|
||||
import com.hpcloud.util.Serialization;
|
||||
|
||||
/**
|
||||
* @author Jonathan Halterman
|
||||
*/
|
||||
@Test
|
||||
public class AlarmThresholdingBoltTest {
|
||||
|
||||
public static void main(String[] args) {
|
||||
AlarmStateTransitionEvent event = new AlarmStateTransitionEvent("I am a TenantId",
|
||||
"I am a Alarm Id", "I am a Alarm Name", AlarmState.OK, AlarmState.ALARM,
|
||||
"I am a Alarm Change Reason", System.currentTimeMillis() / 1000);
|
||||
final String s = Serialization.toJson(event);
|
||||
|
||||
final String alarmId = "111111112222222222233333333334";
|
||||
final String tenantId = "AAAAABBBBBBCCCCC";
|
||||
final String expression = "avg(hpcs.compute.cpu{instance_id=123,device=42}, 1) > 5";
|
||||
final Alarm alarm = new Alarm();
|
||||
alarm.setName("Test CPU Alarm");
|
||||
alarm.setTenantId(tenantId);
|
||||
alarm.setId(alarmId);
|
||||
alarm.setExpression(expression);
|
||||
alarm.setState(AlarmState.OK);
|
||||
final AlarmExpression alarmExpression = new AlarmExpression(expression);
|
||||
final List<AlarmSubExpression> subExpressions = alarmExpression.getSubExpressions();
|
||||
final List<SubAlarm> subAlarms = new ArrayList<SubAlarm>(subExpressions.size());
|
||||
int subAlarmId = 4242;
|
||||
for (int i = 0; i < subExpressions.size(); i++) {
|
||||
final SubAlarm subAlarm = new SubAlarm(String.valueOf(subAlarmId++), alarmId, subExpressions.get(i));
|
||||
subAlarms.add(subAlarm);
|
||||
}
|
||||
alarm.setSubAlarms(subAlarms);
|
||||
|
||||
final Map<String, String> dimensions = new HashMap<String, String>();
|
||||
dimensions.put("instance_id", "123");
|
||||
dimensions.put("device", "42");
|
||||
final Metric metric = new Metric("hpcs.compute.cpu", dimensions, System.currentTimeMillis(), 90.0);
|
||||
final MetricEnvelope metricEnvelope = new MetricEnvelope(metric);
|
||||
System.out.println(MetricEnvelopes.toJson(metricEnvelope));
|
||||
|
||||
final AlarmCreatedEvent alarmCreatedEvent = new AlarmCreatedEvent();
|
||||
alarmCreatedEvent.tenantId = tenantId;
|
||||
alarmCreatedEvent.alarmId = alarmId;
|
||||
alarmCreatedEvent.alarmExpression = expression;
|
||||
final Map<String, AlarmSubExpression> subExpressionMap = new HashMap<String, AlarmSubExpression>();
|
||||
for (final SubAlarm subAlarm : subAlarms) {
|
||||
subExpressionMap.put(subAlarm.getId(), subAlarm.getExpression());
|
||||
}
|
||||
alarmCreatedEvent.alarmSubExpressions = subExpressionMap;
|
||||
|
||||
System.out.println(Serialization.toJson(alarmCreatedEvent));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
package com.hpcloud.mon.infrastructure.thresholding.deserializer;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.hpcloud.mon.common.model.metric.Metric;
|
||||
import com.hpcloud.mon.common.model.metric.Metrics;
|
||||
|
||||
/**
|
||||
* @author Jonathan Halterman
|
||||
*/
|
||||
@Test
|
||||
public class MetricDeserializerTest {
|
||||
private MetricDeserializer deserializer = new MetricDeserializer();
|
||||
|
||||
public void shouldDeserialize() {
|
||||
Metric metric = new Metric("bob", null, 123, 5.0);
|
||||
List<List<?>> metrics = deserializer.deserialize(Metrics.toJson(metric).getBytes());
|
||||
assertEquals(metrics, Collections.singletonList(Arrays.asList(metric.definition(), metric)));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue