Initial WIP commit

This commit is contained in:
Jonathan Halterman 2013-03-19 10:52:04 -07:00
parent 88c3a5e192
commit e870dbf2f7
34 changed files with 1861 additions and 137 deletions

100
pom.xml
View File

@ -16,7 +16,8 @@
<!-- Versioning -->
<versionNumber>1.0.0</versionNumber>
<computedVersion>${versionNumber}-SNAPSHOT</computedVersion>
<ps.common.version>1.0.0.212</ps.common.version>
<ps.common.version>1.0.0.216</ps.common.version>
<maas.commons.version>1.0.0-SNAPSHOT</maas.commons.version>
<dropwizard.version>0.6.1</dropwizard.version>
<skipITs>true</skipITs>
@ -26,8 +27,8 @@
</properties>
<scm>
<connection>scm:git:git@git.paas.hpcloud.net:maas-thresh</connection>
<developerConnection>scm:git:git@git.paas.hpcloud.net:maas-thresh</developerConnection>
<connection>scm:git:git@git.mon.hpcloud.net:maas-thresh</connection>
<developerConnection>scm:git:git@git.mon.hpcloud.net:maas-thresh</developerConnection>
</scm>
<repositories>
@ -44,21 +45,11 @@
<name>nexus snapshots</name>
<url>http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/snapshots</url>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>nexus</id>
<name>Internal Releases</name>
<url>http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/releases</url>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
<snapshotRepository>
<id>nexus</id>
<name>Snapshots</name>
<uniqueVersion>false</uniqueVersion>
<url>http://nexus.paas.hpcloud.net:8081/nexus/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</repositories>
<profiles>
<profile>
@ -76,19 +67,9 @@
<dependencies>
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-db</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-jdbi</artifactId>
<version>${dropwizard.version}</version>
<groupId>com.hpcloud</groupId>
<artifactId>maas-commons</artifactId>
<version>${maas.commons.version}</version>
</dependency>
<dependency>
<groupId>com.hpcloud</groupId>
@ -105,6 +86,13 @@
<artifactId>ps-messaging</artifactId>
<version>${ps.common.version}</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.8.2</version>
<!-- Exclude from assembled jar -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-guice</artifactId>
@ -125,6 +113,11 @@
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.1</version>
</dependency>
<!-- Test dependencies -->
<dependency>
@ -133,12 +126,6 @@
<version>${ps.common.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yammer.dropwizard</groupId>
<artifactId>dropwizard-testing</artifactId>
<version>${dropwizard.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hpcloud</groupId>
<artifactId>ps-messaging</artifactId>
@ -186,41 +173,18 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<finalName>maas-thresh</finalName>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
<finalName>thresholding</finalName>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.hpcloud.maas.ThresholdingEngine</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.hpcloud.maas.ThresholdingService</mainClass>
</transformer>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -0,0 +1,36 @@
package com.hpcloud.maas;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.hibernate.validator.constraints.NotEmpty;
import com.hpcloud.maas.infrastructure.storm.amqp.AMQPSpoutConfiguration;
import com.hpcloud.messaging.amqp.AMQPChannelConfiguration;
import com.hpcloud.messaging.rabbitmq.RabbitMQConfiguration;
import com.yammer.dropwizard.db.DatabaseConfiguration;
public class ThresholdingConfiguration {
@NotNull public Integer locationParallelism = 2;
@NotNull public Integer aggregationParallelism = 10;
@NotNull public Integer thresholdingParallelism = 3;
@Valid @NotNull public AMQPSpoutConfiguration amqpSpout;
@Valid @NotNull public RabbitMQConfiguration internalRabbit = new RabbitMQConfiguration();
/** Threshold for scaling internal connections up */
@NotNull @Min(1) public Integer internalConnectionScalingThreshold;
/** Max number of total internal connections */
@NotNull public Integer maxInternalConnections;
@Valid @NotNull public RabbitMQConfiguration externalRabbit = new RabbitMQConfiguration();
@Valid @NotNull public AMQPChannelConfiguration controlChannel = new AMQPChannelConfiguration();
@NotEmpty public String internalExchange;
@NotEmpty public String externalExchange;
@NotEmpty public String controlExchange;
@NotEmpty public String controlRoutingKey;
@Valid @NotNull public DatabaseConfiguration database = new DatabaseConfiguration();
}

View File

@ -0,0 +1,75 @@
package com.hpcloud.maas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.hpcloud.maas.infrastructure.storm.amqp.AMQPSpout;
import com.hpcloud.maas.infrastructure.storm.amqp.MetricTupleDeserializer;
import com.hpcloud.maas.infrastructure.thresholding.AlarmLookupBolt;
import com.hpcloud.maas.infrastructure.thresholding.AlarmThresholdingBolt;
import com.hpcloud.maas.infrastructure.thresholding.MetricAggregationBolt;
import com.yammer.dropwizard.config.ConfigurationFactory;
import com.yammer.dropwizard.validation.Validator;
/**
* Alarm thresholding engine.
*
* @author Jonathan Halterman
*/
public class ThresholdingEngine {
private static final Logger LOG = LoggerFactory.getLogger(ThresholdingEngine.class);
private final String topologyName;
private final ThresholdingConfiguration config;
public ThresholdingEngine(String topologyName, ThresholdingConfiguration config) {
this.topologyName = topologyName;
this.config = config;
}
public static final ThresholdingConfiguration configFor(String configFileName) throws Exception {
return ConfigurationFactory.forClass(ThresholdingConfiguration.class, new Validator()).build();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
LOG.error("Expected a configuration file name argument");
System.exit(1);
}
new ThresholdingEngine("maas-alarming", configFor(args[0])).run();
}
public void run() throws Exception {
StormSubmitter.submitTopology(topologyName, buildConfig(), buildTopology());
}
private Config buildConfig() {
return new Config();
}
private StormTopology buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("amqp", new AMQPSpout(config.amqpSpout, new MetricTupleDeserializer()), 3);
// AMQP -> Location
builder.setBolt("location", new AlarmLookupBolt(), config.locationParallelism).shuffleGrouping(
"amqp");
// Location -> Aggregation
builder.setBolt("aggregation", new MetricAggregationBolt(), config.aggregationParallelism)
.fieldsGrouping("location", new Fields("alarm"));
// Aggregation -> Thresholding
builder.setBolt("thresholding", new AlarmThresholdingBolt(), config.thresholdingParallelism)
.fieldsGrouping("aggregation", new Fields("compositeAlarmId"));
return builder.createTopology();
}
}

View File

@ -0,0 +1,22 @@
package com.hpcloud.maas.app;
import javax.inject.Inject;
import com.hpcloud.maas.common.event.AlarmCreatedEvent;
import com.hpcloud.maas.domain.service.AlarmingService;
import com.hpcloud.messaging.Message;
import com.hpcloud.messaging.MessageHandler;
/**
* Handles AlarmCreated events.
*
* @author Jonathan Halterman
*/
public class AlarmCreatedEventHandler implements MessageHandler<AlarmCreatedEvent> {
private @Inject AlarmingService service;
@Override
public void handle(Message<AlarmCreatedEvent> msg) {
// service.startAlarmingFor(msg.body.tenantId, msg.body.primaryDimension);
}
}

View File

@ -0,0 +1,22 @@
package com.hpcloud.maas.app;
import javax.inject.Inject;
import com.hpcloud.maas.common.event.AlarmDeletedEvent;
import com.hpcloud.maas.domain.service.AlarmingService;
import com.hpcloud.messaging.Message;
import com.hpcloud.messaging.MessageHandler;
/**
* Handles AlarmDeleted events.
*
* @author Jonathan Halterman
*/
public class AlarmDeletedEventHandler implements MessageHandler<AlarmDeletedEvent> {
private @Inject AlarmingService service;
@Override
public void handle(Message<AlarmDeletedEvent> msg) {
service.stopAlarmingFor(msg.body.tenantId, msg.body.primaryDimension);
}
}

View File

@ -0,0 +1,22 @@
package com.hpcloud.maas.app;
import javax.inject.Inject;
import com.hpcloud.maas.common.event.EndpointDeletedEvent;
import com.hpcloud.maas.domain.service.AlarmingService;
import com.hpcloud.messaging.Message;
import com.hpcloud.messaging.MessageHandler;
/**
* Handles EndpointDeleted events.
*
* @author Jonathan Halterman
*/
public class EndpointDeletedEventHandler implements MessageHandler<EndpointDeletedEvent> {
private @Inject AlarmingService service;
@Override
public void handle(Message<EndpointDeletedEvent> msg) {
service.stopAlarmingFor(msg.body.tenantId);
}
}

View File

@ -5,80 +5,55 @@ import java.util.Map;
import javax.annotation.Nullable;
import com.hpcloud.maas.common.model.AbstractEntity;
import com.hpcloud.maas.common.model.AggregateFunction;
import com.hpcloud.maas.common.model.AlarmOperator;
import com.hpcloud.maas.common.model.AlarmState;
public class Alarm extends AbstractEntity {
private String compositeId;
private String id;
private String name;
private String namespace;
private String metricType;
private String metricSubject;
private Map<String, String> dimensions;
private String operator;
private long threshold;
private int periodSeconds;
private int periods;
private AggregateFunction function;
private AlarmOperator operator;
private double threshold;
private AlarmState state;
public Alarm() {
}
public Alarm(String id, String name, String namespace, String metricType,
@Nullable String metricSubject, Map<String, String> dimensions, String operator,
long threshold) {
public Alarm(String compositeId, String id, String name, String namespace, String metricType,
@Nullable String metricSubject, Map<String, String> dimensions, int periodSeconds,
int periods, AggregateFunction function, AlarmOperator operator, long threshold,
AlarmState state) {
this.compositeId = compositeId;
this.id = id;
this.name = name;
this.namespace = namespace;
this.metricType = metricType;
this.metricSubject = metricSubject;
this.dimensions = dimensions;
this.periodSeconds = periodSeconds;
this.periods = periods;
this.function = function;
this.operator = operator;
this.threshold = threshold;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
Alarm other = (Alarm) obj;
if (dimensions == null) {
if (other.dimensions != null)
return false;
} else if (!dimensions.equals(other.dimensions))
return false;
if (metricSubject == null) {
if (other.metricSubject != null)
return false;
} else if (!metricSubject.equals(other.metricSubject))
return false;
if (metricType == null) {
if (other.metricType != null)
return false;
} else if (!metricType.equals(other.metricType))
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
if (namespace == null) {
if (other.namespace != null)
return false;
} else if (!namespace.equals(other.namespace))
return false;
if (operator == null) {
if (other.operator != null)
return false;
} else if (!operator.equals(other.operator))
return false;
if (threshold != other.threshold)
return false;
return true;
this.state = state;
}
public Map<String, String> getDimensions() {
return dimensions;
}
public AggregateFunction getFunction() {
return function;
}
public String getMetricSubject() {
return metricSubject;
}
@ -95,34 +70,36 @@ public class Alarm extends AbstractEntity {
return namespace;
}
public String getOperator() {
public AlarmOperator getOperator() {
return operator;
}
public long getThreshold() {
public int getPeriods() {
return periods;
}
public int getPeriodSeconds() {
return periodSeconds;
}
public AlarmState getState() {
return state;
}
public double getThreshold() {
return threshold;
}
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((dimensions == null) ? 0 : dimensions.hashCode());
result = prime * result + ((metricSubject == null) ? 0 : metricSubject.hashCode());
result = prime * result + ((metricType == null) ? 0 : metricType.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((namespace == null) ? 0 : namespace.hashCode());
result = prime * result + ((operator == null) ? 0 : operator.hashCode());
result = prime * result + (int) (threshold ^ (threshold >>> 32));
return result;
public boolean isThresholdExceededBy(double value) {
return operator.evaluate(this.threshold, value);
}
public void setDimensions(Map<String, String> dimensions) {
this.dimensions = dimensions;
}
public void setId(String id) {
this.id = id;
public void setFunction(AggregateFunction function) {
this.function = function;
}
public void setMetricSubject(String metricSubject) {
@ -141,11 +118,43 @@ public class Alarm extends AbstractEntity {
this.namespace = namespace;
}
public void setOperator(String operator) {
public void setOperator(AlarmOperator operator) {
this.operator = operator;
}
public void setPeriods(int periods) {
this.periods = periods;
}
public void setPeriodSeconds(int periodSeconds) {
this.periodSeconds = periodSeconds;
}
public void setState(AlarmState state) {
this.state = state;
}
public void setThreshold(double threshold) {
this.threshold = threshold;
}
public void setThreshold(long threshold) {
this.threshold = threshold;
}
public String getCompositeId() {
return compositeId;
}
public void setCompositeId(String compositeId) {
this.compositeId = compositeId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@ -0,0 +1,165 @@
package com.hpcloud.maas.domain.model;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A time based sliding window containing statistics for a fixed number of slots of a fixed length.
* The sliding window is advanced by calling {@link #advanceWindowTo(long)}. Attempts to call
* {@link #addValue(int, long)} for timestamps that are outside of the window, such as for old
* values or future values, will be ignored.
*
* @author Jonathan Halterman
*/
@NotThreadSafe
public class SlidingWindowStats {
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowStats.class);
private final int slotWidthInSeconds;
private final int windowLengthInSeconds;
private final Class<? extends Statistic> statType;
private long headTimestamp;
private int headIndex;
private final Slot[] slots;
private static class Slot {
private final long timestamp;
private final Statistic stat;
private Slot(long timestamp, Statistic stat) {
this.timestamp = timestamp;
this.stat = stat;
}
@Override
public String toString() {
return timestamp + "=" + stat;
}
}
/**
* Creates a RollingWindow containing {@code numSlots} slots of size {@code slotWidthSeconds}
* starting at the {@code initialPeriod}.
*
* @param slotWidthSeconds the width of a slot in seconds
* @param numSlots the number of slots in the window
* @param statType to calculate values for
* @param initialTimestamp to start window at
*/
public SlidingWindowStats(int slotWidthSeconds, int numSlots,
Class<? extends Statistic> statType, long initialTimestamp) {
this.slotWidthInSeconds = slotWidthSeconds;
this.windowLengthInSeconds = slotWidthSeconds * numSlots;
this.statType = statType;
headTimestamp = initialTimestamp;
slots = new Slot[numSlots];
}
/** Returns a new slot for the {@code timestamp} and {@code statType}. */
private static Slot createSlot(long timestamp, Class<? extends Statistic> statType) {
try {
return new Slot(timestamp, statType.newInstance());
} catch (Exception e) {
LOG.error("Failed to initialize slot", e);
return null;
}
}
/**
* Adds the {@code value} to the statistics for the slot associated with the {@code timestamp},
* else <b>does nothing</b> if the {@code timestamp} is outside of the window.
*/
public void addValue(double value, long timestamp) {
int index = slotIndexFor(timestamp);
if (index != -1) {
if (slots[index] == null)
slots[index] = createSlot(timestamp, statType);
slots[index].stat.addValue(value);
}
}
/**
* Advances the sliding window to the slot for the {@code timestamp}, erasing values for any slots
* along the way.
*/
public void advanceWindowTo(long timestamp) {
if (timestamp <= headTimestamp)
return;
int slotsToAdvance = (int) (timestamp - headTimestamp) / slotWidthInSeconds;
for (int i = slotsToAdvance; i > 0; i--)
slots[headIndex = indexAfter(headIndex)] = null;
headTimestamp = timestamp;
}
/**
* Returns the timestamps represented by the current position of the sliding window decreasing from
* newest to oldest.
*/
public long[] getTimestamps() {
long[] timestamps = new long[slots.length];
long timestamp = headTimestamp;
for (int i = 0; i < slots.length; i++, timestamp -= slotWidthInSeconds)
timestamps[i] = timestamp;
return timestamps;
}
/**
* Returns the values of the sliding window decreasing in time from newest to oldest.
*/
public double[] getValues() {
double[] values = new double[slots.length];
for (int i = 0, index = headIndex; i < slots.length; i++, index = indexBefore(index))
if (slots[index] != null)
values[i] = slots[index].stat.value();
return values;
}
/**
* Returns a logical view of the sliding window with decreasing timestamps from left to right.
*/
@Override
public String toString() {
StringBuilder b = new StringBuilder();
long timestamp = headTimestamp;
for (int i = 0, index = headIndex; i < slots.length; i++, index = indexBefore(index), timestamp -= slotWidthInSeconds) {
if (i != 0)
b.append(", ");
b.append(timestamp).append('=').append(slots[index] == null ? "na" : slots[index]);
}
return b.toString();
}
/**
* Returns index of the slot associated with the {@code timestamp}, else -1 if the
* {@code timestamp} is outside of the window. Slots decrease in time from right to left,
* wrapping.
*/
int slotIndexFor(long timestamp) {
if (timestamp == headTimestamp)
return headIndex;
if (timestamp < headTimestamp) {
int timeDiff = (int) (headTimestamp - timestamp);
if (timeDiff < windowLengthInSeconds) {
int slotDiff = timeDiff / slotWidthInSeconds;
int offset = headIndex - slotDiff;
if (offset < 0)
offset += slots.length;
return offset;
}
}
return -1;
}
/** Returns the index for the slot in time after the {@index}. */
private int indexAfter(int index) {
return ++index == slots.length ? 0 : index;
}
/** Returns the index for the slot in time before the {@index}. */
private int indexBefore(int index) {
return --index == -1 ? slots.length - 1 : index;
}
}

View File

@ -0,0 +1,14 @@
package com.hpcloud.maas.domain.model;
/**
* Statistic.
*
* @author Jonathan Halterman
*/
public interface Statistic {
/** Returns the value of the statistic. */
double value();
/** Adds the {@code value} to the statistic. */
void addValue(double value);
}

View File

@ -0,0 +1,119 @@
package com.hpcloud.maas.domain.model;
import com.hpcloud.maas.common.model.AggregateFunction;
/**
* Statistic implementations.
*
* @author Jonathan Halterman
*/
public final class Statistics {
public static abstract class AbstractStatistic implements Statistic {
@Override
public String toString() {
return Double.valueOf(value()).toString();
}
}
public static class Average extends Sum {
protected double count;
@Override
public void addValue(double value) {
super.addValue(value);
this.count++;
}
@Override
public double value() {
return sum / count;
}
}
public static class Count extends AbstractStatistic {
protected double count;
@Override
public void addValue(double value) {
this.count++;
}
@Override
public double value() {
return count;
}
}
public static class Max extends AbstractStatistic {
protected double max;
boolean initialized;
@Override
public void addValue(double value) {
if (!initialized) {
max = value;
initialized = true;
}
if (value > max)
max = value;
}
@Override
public double value() {
return max;
}
}
public static class Min extends AbstractStatistic {
protected double min;
boolean initialized;
@Override
public void addValue(double value) {
if (!initialized) {
min = value;
initialized = true;
}
if (value < min)
min = value;
}
@Override
public double value() {
return min;
}
}
public static class Sum extends AbstractStatistic {
protected double sum;
@Override
public void addValue(double value) {
this.sum += value;
}
@Override
public double value() {
return sum;
}
}
private Statistics() {
}
public static Class<? extends Statistic> statTypeFor(AggregateFunction aggregateFunction) {
if (AggregateFunction.AVERAGE.equals(aggregateFunction))
return Average.class;
if (AggregateFunction.COUNT.equals(aggregateFunction))
return Count.class;
if (AggregateFunction.SUM.equals(aggregateFunction))
return Sum.class;
if (AggregateFunction.MIN.equals(aggregateFunction))
return Min.class;
if (AggregateFunction.MAX.equals(aggregateFunction))
return Max.class;
return null;
}
}

View File

@ -0,0 +1,11 @@
package com.hpcloud.maas.domain.service;
/**
* Alarm Store.
*
* @author Jonathan Halterman
*/
public interface AlarmStore {
}

View File

@ -0,0 +1,25 @@
package com.hpcloud.maas.domain.service;
import com.yammer.dropwizard.lifecycle.Managed;
/**
* Services alarming related requests.
*
* @author Jonathan Halterman
*/
public interface AlarmingService extends Managed {
/**
* Starts alarming for the criteria.
*/
void startAlarmingFor(String tenantId, String primaryDimension);
/**
* Stops all alarming for the {@code tenantId}.
*/
void stopAlarmingFor(String tenantId);
/**
* Stops alarming for the criteria.
*/
void stopAlarmingFor(String tenantId, String primaryDimension);
}

View File

@ -0,0 +1,11 @@
package com.hpcloud.maas.domain.service.impl;
import com.hpcloud.maas.domain.service.AlarmStore;
/**
* Stores alarms.
*
* @author Jonathan Halterman
*/
public class AlarmStoreImpl implements AlarmStore {
}

View File

@ -0,0 +1,54 @@
package com.hpcloud.maas.infrastructure.alarming;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hpcloud.maas.ThresholdingConfiguration;
import com.hpcloud.maas.domain.service.AlarmingService;
import com.hpcloud.messaging.rabbitmq.RabbitMQConfiguration;
/**
* Alarming service implementation.
*
* @author Jonathan Halterman
*/
public class AlarmingServiceImpl implements AlarmingService {
private static final Logger LOG = LoggerFactory.getLogger(AlarmingServiceImpl.class);
private final ThresholdingConfiguration config;
@Inject
public AlarmingServiceImpl(ThresholdingConfiguration config,
@Named("internal") RabbitMQConfiguration internalConfig,
@Named("external") RabbitMQConfiguration externalConfig) {
this.config = config;
}
@Override
public void start() throws Exception {
LOG.info("Starting alarming service");
}
@Override
public void stop() throws Exception {
LOG.info("Stopping alarming service");
}
@Override
public void startAlarmingFor(String tenantId, String primaryDimension) {
}
@Override
public void stopAlarmingFor(String tenantId) {
}
@Override
public void stopAlarmingFor(String tenantId, String primaryDimension) {
}
}

View File

@ -0,0 +1,15 @@
package com.hpcloud.maas.infrastructure.storm;
import java.util.List;
/**
* Deserializes tuples.
*
* @author Jonathan Halterman
*/
public interface TupleDeserializer {
/**
* Returns a list of deserialized tuples for the {@code tuple}.
*/
List<?> deserialize(byte[] tuple);
}

View File

@ -0,0 +1,19 @@
package com.hpcloud.maas.infrastructure.storm;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
/**
* Utilities for working with Tuples.
*
* @author Jonathan Halterman
*/
public final class Tuples {
private Tuples() {
}
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
}

View File

@ -0,0 +1,267 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.hpcloud.maas.infrastructure.storm.TupleDeserializer;
import com.hpcloud.messaging.rabbitmq.RabbitMQConnection;
import com.hpcloud.util.Exceptions;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
/**
* Spout to feed messages into Storm from an AMQP queue. Each message routed to the queue will be
* emitted as a Storm tuple. The message will be acked or rejected once the topology has
* respectively fully processed or failed the corresponding tuple.
*
* <p>
* <strong>N.B.</strong> if you need to guarantee all messages are reliably processed, you should
* have AMQPSpout consume from a queue that is <em>not</em> set as 'exclusive' or 'auto-delete':
* otherwise if the spout task crashes or is restarted, the queue will be deleted and any messages
* in it lost, as will any messages published while the task remains down. See
* {@link com.hpcloud.maas.infrastructure.storm.amqp.SharedQueueWithBinding} to declare a shared
* queue that allows for guaranteed processing. (For prototyping, an
* {@link com.hpcloud.maas.infrastructure.storm.amqp.ExclusiveQueueWithBinding} may be simpler to
* manage.)
* </p>
*
* <p>
* <strong>N.B.</strong> this does not currently handle malformed messages (which cannot be
* deserialised by the provided {@link Scheme}) very well: the spout worker will crash if it fails
* to serialise a message.
* </p>
*
* <p>
* This consumes messages from AMQP asynchronously, so it may receive messages before Storm requests
* them as tuples; therefore it buffers messages in an internal queue. To avoid this buffer growing
* large and consuming too much RAM, set the config prefetchCount.
* </p>
*
* <p>
* This spout can be distributed among multiple workers, depending on the queue declaration: see
* {@link QueueDeclarator#isParallelConsumable}.
* </p>
*
* @see QueueDeclarator
* @see com.hpcloud.maas.infrastructure.storm.amqp.SharedQueueWithBinding
* @see com.hpcloud.maas.infrastructure.storm.amqp.ExclusiveQueueWithBinding
*
* @author Sam Stokes (sam@rapportive.com)
* @author Jonathan Halterman (jonathan@jodah.org)
*/
public class AMQPSpout implements IRichSpout {
private static final long serialVersionUID = 11258942292629264L;
private static final Logger LOG = LoggerFactory.getLogger(AMQPSpout.class);
private final AMQPSpoutConfiguration config;
private final TupleDeserializer deserializer;
private final long waitForNextMessageMillis;
private final QueueDeclarator queueDeclarator;
private transient boolean spoutActive = true;
private transient RabbitMQConnection connection;
private transient Channel channel;
private transient QueueingConsumer consumer;
private transient String consumerTag;
private SpoutOutputCollector collector;
public AMQPSpout(AMQPSpoutConfiguration config, TupleDeserializer deserializer) {
this.config = config;
this.deserializer = deserializer;
this.waitForNextMessageMillis = config.waitForNextMessage.toMillis();
this.queueDeclarator = config.queueName == null ? new ExclusiveQueueWithBinding(
config.exchange, config.routingKey) : new SharedQueueWithBinding(config.queueName,
config.exchange, config.routingKey);
}
/**
* Acks the message with the AMQP broker using the message's delivery tack.
*/
@Override
public void ack(Object msgId) {
if (msgId instanceof Long) {
final long deliveryTag = (Long) msgId;
if (channel != null) {
try {
channel.basicAck(deliveryTag, false /* not multiple */);
} catch (IOException e) {
LOG.warn("Failed to ack delivery-tag {}", deliveryTag, e);
} catch (ShutdownSignalException e) {
LOG.warn("AMQP connection failed. Failed to ack delivery-tag {}", deliveryTag, e);
}
}
} else {
LOG.warn("Don't know how to ack({}: {})", msgId.getClass().getName(), msgId);
}
}
/**
* Resumes a paused spout
*/
public void activate() {
LOG.info("Unpausing spout");
spoutActive = true;
}
/**
* Cancels the queue subscription, and disconnects from the AMQP broker.
*/
@Override
public void close() {
LOG.info("Closing AMQP spout");
if (consumerTag != null) {
try {
channel.basicCancel(consumerTag);
} catch (IOException e) {
LOG.warn("Error cancelling AMQP consumer", e);
}
}
if (connection != null)
connection.close();
}
/**
* Pauses the spout
*/
public void deactivate() {
LOG.info("Pausing AMQP spout");
spoutActive = false;
}
/**
* Declares the output fields of this spout according to the provided
* {@link backtype.storm.spout.Scheme}.
*
* Additionally declares an error stream (see {@link #ERROR_STREAM_NAME} for handling malformed or
* empty messages to avoid infinite retry loops
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("output"));
declarer.declareStream(config.errorStream, new Fields("deliveryTag", "bytes"));
}
/**
* Tells the AMQP broker to reject the message, requeueing the message if configured to do so.
* <p>
* <strong>Note:</strong> There's a potential for infinite re-delivery in the event of
* non-transient failures (e.g. malformed messages).
*
*/
@Override
public void fail(Object msgId) {
if (msgId instanceof Long) {
final long deliveryTag = (Long) msgId;
if (channel != null) {
try {
channel.basicReject(deliveryTag, config.requeueOnFail);
} catch (IOException e) {
LOG.warn("Failed to reject delivery-tag " + deliveryTag, e);
}
}
} else {
LOG.warn(String.format("Cannot reject unknown delivery tag (%s: %s)", msgId.getClass()
.getName(), msgId));
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
/**
* Emits the next message from the queue as a tuple.
*
* Serialization schemes returning null will immediately ack and then emit unanchored on the
* {@link #ERROR_STREAM_NAME} stream for further handling by the consumer.
*
* <p>
* If no message is ready to emit, this will wait a short time ({@link #WAIT_FOR_NEXT_MESSAGE})
* for one to arrive on the queue, to avoid a tight loop in the spout worker.
* </p>
*/
@Override
public void nextTuple() {
if (spoutActive && consumer != null) {
QueueingConsumer.Delivery delivery = null;
try {
delivery = consumer.nextDelivery(waitForNextMessageMillis);
if (delivery == null)
return;
final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
final byte[] message = delivery.getBody();
try {
for (Object tuple : deserializer.deserialize(message))
collector.emit(Collections.singletonList(tuple), deliveryTag);
} catch (Exception e) {
handleMalformedDelivery(deliveryTag, message);
}
} catch (ShutdownSignalException e) {
LOG.warn("AMQP connection dropped, will attempt to reconnect...");
if (!e.isInitiatedByApplication())
connection.reopen();
} catch (InterruptedException e) {
// interrupted while waiting for message, big deal
}
}
}
/**
* Connects to the AMQP broker, declares the queue and subscribes to incoming messages.
*/
@Override
public void open(@SuppressWarnings("rawtypes") Map config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
createConnection();
}
private void createConnection() {
try {
connection.open();
channel = connection.channelFor("spout");
channel.basicQos(config.prefetchCount);
final Queue.DeclareOk queue = queueDeclarator.declare(channel);
final String queueName = queue.getQueue();
LOG.info("Consuming from queue {}", queueName);
consumer = new QueueingConsumer(channel);
consumerTag = channel.basicConsume(queueName, false /* no auto-ack */, consumer);
} catch (Exception e) {
Exceptions.uncheck(e, "Failed to open AMQP connection");
}
}
/**
* Acks the bad message to avoid retry loops. Also emits the bad message unreliably on the
* {@link #ERROR_STREAM_NAME} stream for consumer handling.
*
* @param deliveryTag AMQP delivery tag
* @param message bytes of the bad message
*/
private void handleMalformedDelivery(long deliveryTag, byte[] message) {
LOG.debug("Malformed deserialized message, null or zero-length. " + deliveryTag);
ack(deliveryTag);
collector.emit(config.errorStream, new Values(deliveryTag, message));
}
}

View File

@ -0,0 +1,71 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import org.hibernate.validator.constraints.NotEmpty;
import backtype.storm.spout.Scheme;
import com.hpcloud.messaging.rabbitmq.RabbitMQConfiguration;
import com.hpcloud.util.Duration;
/**
* Encapsulates AMQP related configuration and constraints.
*
* @author Jonathan Halterman
*/
public class AMQPSpoutConfiguration {
@Valid @NotEmpty public RabbitMQConfiguration rabbit;
/** Exchange to consume messages from. */
@NotEmpty public String exchange;
/** Queue name to consume messages from. If null a random queue will be created. */
@Nullable public String queueName;
/** Routing key to bind to queue. */
@NotEmpty public String routingKey;
/**
* Indicates whether rejected messages should be re-queued. Note: This can lead to infinite loops
* if message failure is not transient.
*
* Default: false;
*/
@NotEmpty public Boolean requeueOnFail = false;
/**
* Time in milliseconds to wait to read the next message from the queue after all messages have
* been read.
*
* Default: 1 millis
*/
@NotEmpty public Duration waitForNextMessage = Duration.millis(1);
/**
* Defaults to 100.
*
* <p>
* This caps the number of messages outstanding (i.e. unacked) at a time that will be sent to each
* spout worker. Increasing this will improve throughput if the network roundtrip time to the AMQP
* broker is significant compared to the time for the topology to process each message; this will
* also increase the RAM requirements as the internal message buffer grows.
* </p>
*
* <p>
* AMQP allows a prefetch-count of zero, indicating unlimited delivery, but that is not allowed
* here to avoid unbounded buffer growth.
* </p>
*
* Default: 100.
*/
@Min(1) public int prefetchCount = 100;
/**
* Name of the stream where malformed deserialized messages are sent for special handling.
* Generally used when a {@link Scheme} implementation returns null or a zero-length tuple.
*
* Default: error-stream.
*/
@NotEmpty public String errorStream = "error-stream";
}

View File

@ -0,0 +1,76 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import java.io.IOException;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
/**
* Declares an exclusive, server-named queue and binds it to an existing exchange. This is probably
* the easiest way to start prototyping with an
* {@link com.hpcloud.maas.infrastructure.storm.amqp.rapportive.storm.spout.AMQPSpout}: if your app
* already publishes to an exchange, you can just point this at the exchange and start consuming
* messages.
*
* <p>
* However <strong>N.B.</strong> this queue setup <em>is not reliable</em>, in that if the spout
* task crashes or restarts, messages published while the spout is down will be lost (because the
* spout creates the queue when it starts up, and the server deletes the queue when the spout
* closes).
* </p>
*
* <p>
* It also cannot scale out to multiple parallel spout tasks. The semantics of an exclusive queue
* mean that each spout task would get its own queue bound to the exchange. That means each task
* would receive a copy of every message, so messages would get processed multiple times.
* </p>
*
* <p>
* If you need guaranteed processing or a horizontally scalable spout, consider
* {@link SharedQueueWithBinding}.
* </p>
*/
public class ExclusiveQueueWithBinding implements QueueDeclarator {
private static final long serialVersionUID = 7923072289071634425L;
private final String exchange;
private final String routingKey;
/**
* Create a declaration of an exclusive server-named queue bound to the specified exchange.
*
* @param exchange exchange to bind the queue to.
* @param routingKey routing key for the exchange binding. Use "#" to receive all messages
* published to the exchange.
*/
public ExclusiveQueueWithBinding(String exchange, String routingKey) {
this.exchange = exchange;
this.routingKey = routingKey;
}
/**
* Verifies the exchange exists, creates an exclusive, server-named queue and binds it to the
* exchange.
*
* @return the server's response to the successful queue declaration (you can use this to discover
* the name of the queue).
*
* @throws IOException if the exchange does not exist, or if the AMQP connection drops.
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
channel.exchangeDeclarePassive(exchange);
final Queue.DeclareOk queue = channel.queueDeclare();
channel.queueBind(queue.getQueue(), exchange, routingKey);
return queue;
}
/**
* Returns <tt>false</tt> as this queue is <em>not</em> safe for parallel consumers.
*/
@Override
public boolean isParallelConsumable() {
return false;
}
}

View File

@ -0,0 +1,18 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import java.util.List;
import com.hpcloud.maas.common.metric.InternalMetrics;
import com.hpcloud.maas.infrastructure.storm.TupleDeserializer;
/**
* Deserializes metrics. Outputs a single "output" field.
*
* @author Jonathan Halterman
*/
public class MetricTupleDeserializer implements TupleDeserializer {
@Override
public List<?> deserialize(byte[] tuple) {
return InternalMetrics.metricsFor(tuple);
}
}

View File

@ -0,0 +1,47 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import java.io.IOException;
import java.io.Serializable;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
/**
* Declaration of a queue to consume, and any exchange bindings the queue needs.
*
* <p>
* Depending on the queue parameters (exclusive, auto_delete, server-named) and exchange bindings,
* it may or may not be safe to start several consumers in parallel using a given queue declaration.
* For example, an exclusive named queue bound to an exchange is not safe because only one of the
* consumers will succeed in declaring the queue; an exclusive <em>server-named</em> queue does not
* have that problem, but is still probably not safe, because most exchange types will send a copy
* of every message to every queue bound to them, so you will end up consuming each message several
* times.
* </p>
*
* <p>
* For that reason, to implement this interface you must implement {@link #isParallelConsumable} to
* indicate whether or not this queue is safe for parallel consumers.
* </p>
*/
public interface QueueDeclarator extends Serializable {
/**
* Declare the queue, and any exchanges and bindings that it needs. Called once to determine the
* queue to consume from.
*
* @param channel An open AMQP channel which can be used to send the declarations.
* @return the server's response to the successful queue declaration (used to determine the queue
* name to subscribe to).
*
* @throws IOException if a declaration fails or the AMQP connection drops.
*/
Queue.DeclareOk declare(Channel channel) throws IOException;
/**
* Indicate whether this queue is safe for parallel consumers.
*
* @return <tt>true</tt> if safe for parallel consumers, otherwise <tt>false</tt>.
*/
boolean isParallelConsumable();
}

View File

@ -0,0 +1,128 @@
package com.hpcloud.maas.infrastructure.storm.amqp;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP.Queue;
import com.rabbitmq.client.Channel;
/**
* Declares a named, durable queue and binds it to an existing exchange. This is a good choice for
* production use as the queue will survive spout restarts, so you won't miss messages if your spout
* crashes.
*
* <p>
* <strong>N.B.</strong> this could be risky under some circumstances. e.g. if while prototyping you
* set a development topology consuming from a production AMQP server, then kill your topology and
* go home for the night; messages will continue to be queued up, which could threaten the stability
* of the AMQP server if the exchange is high-volume. For prototyping consider
* {@link ExclusiveQueueWithBinding}.
* </p>
*
* <p>
* This queue is safe for multiple parallel spout tasks: as they all consume the same named queue,
* the AMQP broker will round-robin messages between them, so each message will get processed only
* once (barring redelivery due to outages).
* </p>
*/
public class SharedQueueWithBinding implements QueueDeclarator {
private static final long serialVersionUID = 2364833412534518859L;
private final String queueName;
private final String exchange;
private final String routingKey;
private HAPolicy haPolicy;
public static class HAPolicy implements Serializable {
private static final long serialVersionUID = -5276009714329009060L;
private Map<String, Object> queueProperties;
private HAPolicy(Map<String, Object> queueParams) {
this.queueProperties = queueParams;
}
public static HAPolicy all() {
HashMap<String, Object> args = new HashMap<String, Object>();
args.put("x-ha-policy", "all");
return new HAPolicy(args);
}
public static HAPolicy nodes(String... nodeNames) {
if (nodeNames.length < 1)
throw new IllegalArgumentException("List of nodenames should contain at least one name");
HashMap<String, Object> args = new HashMap<String, Object>();
args.put("x-ha-policy", "nodes");
args.put("x-ha-x-ha-policy-params", Arrays.asList(nodeNames));
return new HAPolicy(args);
}
public Map<String, Object> asQueueProperies() {
return queueProperties;
}
}
/**
* Create a declaration of a named, durable, non-exclusive queue bound to the specified exchange.
*
* @param queueName name of the queue to be declared.
* @param exchange exchange to bind the queue to.
* @param routingKey routing key for the exchange binding. Use "#" to receive all messages
* published to the exchange.
*/
public SharedQueueWithBinding(String queueName, String exchange, String routingKey) {
this(queueName, exchange, routingKey, null);
}
/**
* Create a declaration of a named, durable, non-exclusive queue bound to the specified exchange.
*
* @param queueName name of the queue to be declared.
* @param exchange exchange to bind the queue to.
* @param routingKey routing key for the exchange binding. Use "#" to receive all messages
* published to the exchange.
* @param policy high-availability policy to use
*/
public SharedQueueWithBinding(String queueName, String exchange, String routingKey,
HAPolicy policy) {
this.queueName = queueName;
this.exchange = exchange;
this.routingKey = routingKey;
this.haPolicy = policy;
}
/**
* Verifies the exchange exists, creates the named queue if it does not exist, and binds it to the
* exchange.
*
* @return the server's response to the successful queue declaration.
*
* @throws IOException if the exchange does not exist, the queue could not be declared, or if the
* AMQP connection drops.
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
channel.exchangeDeclarePassive(exchange);
Queue.DeclareOk queue = channel.queueDeclare(queueName,
/* durable */true,
/* non-exclusive */false,
/* non-auto-delete */false,
haPolicy == null ? null /* no arguments */: haPolicy.asQueueProperies());
channel.queueBind(queue.getQueue(), exchange, routingKey);
return queue;
}
/**
* Returns <tt>true</tt> as this queue is safe for parallel consumers.
*/
@Override
public boolean isParallelConsumable() {
return true;
}
}

View File

@ -0,0 +1,36 @@
package com.hpcloud.maas.infrastructure.thresholding;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.maas.domain.model.Alarm;
/**
* Locates alarms for incoming metrics.
*
* <ul>
* <li>Input: "object" : Metric
* <li>Output: "metric" : Metric, "alarm" : Alarm
* </ul>
*
* @author Jonathan Halterman
*/
public class AlarmLookupBolt extends BaseBasicBolt {
private static final long serialVersionUID = 397873545987747100L;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String metric = input.getString(0);
Alarm alarm = Alarms.alarmFor(metric);
collector.emit(new Values(metric, alarm));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("metric", "alarm"));
}
}

View File

@ -0,0 +1,26 @@
package com.hpcloud.maas.infrastructure.thresholding;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import com.hpcloud.maas.domain.model.Alarm;
/**
* Determines whether an alarm threshold has been exceeded.
*
* @author Jonathan Halterman
*/
public class AlarmThresholdingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String compositeAlarmId = tuple.getString(0);
Alarm alarm = (Alarm) tuple.getValue(1);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

View File

@ -0,0 +1,33 @@
package com.hpcloud.maas.infrastructure.thresholding;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.hpcloud.maas.domain.model.Alarm;
public final class Alarms {
private Alarms() {
}
public static String alarmIdFor(String metric) {
// self.key_dict[memory_key] = self.cass.get_metric_key_or_create(
// message['project_id'], message['namespace'],
// message['metric_name'], message['dimensions'],
// message['unit']
// )
//
return null;
}
public static Alarm alarmFor(String metric) {
return null;
}
LoadingCache<String, Alarm> graphs = CacheBuilder.newBuilder().build(
new CacheLoader<String, Alarm>() {
public Alarm load(String key) {
// return createExpensiveGraph(key);
return null;
}
});
}

View File

@ -0,0 +1,127 @@
package com.hpcloud.maas.infrastructure.thresholding;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.maas.common.model.AlarmState;
import com.hpcloud.maas.common.model.Metric;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.model.SlidingWindowStats;
import com.hpcloud.maas.domain.model.Statistic;
import com.hpcloud.maas.domain.model.Statistics;
import com.hpcloud.maas.infrastructure.storm.Tuples;
/**
* Aggregates metrics for individual alarms. Receives metric/alarm tuples and tick tuples, and
* outputs alarm information whenever an alarm's state changes.
*
* <ul>
* <li>Input - "metric" : Metric, "alarm" : Alarm
* <li>Output - "compositeAlarmId" : String, "alarmId" : String, "alarmState" : String
* </ul>
*
* @author Jonathan Halterman
*/
public class MetricAggregationBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MetricAggregationBolt.class);
private static final long serialVersionUID = 5624314196838090726L;
private OutputCollector collector;
private List<Alarm> alarms;
private Map<Alarm, SlidingWindowStats> alarmStats = new HashMap<Alarm, SlidingWindowStats>();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("compositeAlarmId", "alarmId", "alarmState"));
}
@Override
public void execute(Tuple tuple) {
if (!Tuples.isTickTuple(tuple))
aggregateMetric(tuple);
else
evaluateAlarms();
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
return conf;
}
@Override
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* Aggregates metrics for values that are within the periods defined for the alarm.
*/
private void aggregateMetric(Tuple tuple) {
Metric metric = (Metric) tuple.getValue(0);
Alarm alarm = (Alarm) tuple.getValue(1);
SlidingWindowStats stats = alarmStats.get(alarm);
if (stats == null) {
Class<? extends Statistic> statType = Statistics.statTypeFor(alarm.getFunction());
if (statType == null)
LOG.warn("Unknown statistic type {}", alarm.getFunction());
else {
stats = new SlidingWindowStats(alarm.getPeriodSeconds(), alarm.getPeriods(), statType,
metric.timestamp);
alarmStats.put(alarm, stats);
}
}
stats.addValue((int) metric.value, metric.timestamp);
}
private void evaluateAlarm(Alarm alarm) {
SlidingWindowStats stats = alarmStats.get(alarm);
AlarmState initialState = alarm.getState();
AlarmState newState = null;
if (stats == null)
newState = AlarmState.UNDETERMINED;
else {
// Evaluate and update state of alarm
String finalState = null;
// We may want to track each alarm's state and only emit when the state changes. that means
// we'd
// evaluate the alarm each time a metric hits the alarm.
// we must wait till evaluationPeriod periods have passed before even bothering to evaluate
// the
// alarm.
// after that we can evaluate every 1 second or whatever
// actually, this isn't good enough since we have to detect insufficient data things on a 60
// sec
// timer
}
if (stats == null || !newState.equals(initialState))
collector.emit(new Values(alarm.getCompositeId(), alarm.getId(), alarm.getState().toString()));
}
private void evaluateAlarms() {
for (Alarm alarm : alarms)
evaluateAlarm(alarm);
}
}

View File

@ -0,0 +1,27 @@
package com.hpcloud;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class PrintSampleStream {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestWordsSpout(), 5);
builder.setBolt("print", new PrinterBolt(), 4).fieldsGrouping("spout", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
}
}

View File

@ -0,0 +1,24 @@
package com.hpcloud;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class PrinterBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}

View File

@ -0,0 +1,50 @@
package com.hpcloud;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
}
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[] {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"};
String sentence = sentences[_rand.nextInt(sentences.length)];
_collector.emit(new Values(sentence));
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

View File

@ -0,0 +1,42 @@
package com.hpcloud;
import java.util.Map;
import java.util.Random;
import org.apache.log4j.Logger;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class TestWordsSpout extends BaseRichSpout {
public static Logger LOG = Logger.getLogger(TestWordsSpout.class);
SpoutOutputCollector _collector;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
static final String AB = "ABCDEFG";
static Random rnd = new Random();
String randomString(int len) {
StringBuilder sb = new StringBuilder(len);
for (int i = 0; i < len; i++)
sb.append(AB.charAt(rnd.nextInt(AB.length())));
return sb.toString();
}
public void nextTuple() {
Utils.sleep(100);
_collector.emit(new Values(randomString(1)));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

View File

@ -0,0 +1,21 @@
package com.hpcloud.infrastructure.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
public class Topologies {
public static void runLocally(StormTopology topology, String topologyName, Config conf,
int runtimeInSeconds) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, conf, topology);
try {
Thread.sleep((long) runtimeInSeconds * 1000);
} catch (Exception ignore) {
}
cluster.killTopology(topologyName);
cluster.shutdown();
}
}

View File

@ -0,0 +1,29 @@
package com.hpcloud.maas;
public class TestThresholdingEngine extends ThresholdingEngine {
public TestThresholdingEngine(String topologyName, ThresholdingConfiguration config) {
super(topologyName, config);
}
// @Override
// protected Config buildConfig() {
// Config conf = new Config();
// conf.setDebug(true);
// return conf;
// }
// @Override
// public void run(MaasRouterConfiguration config, Environment environment) throws Exception {
//
// }
//
// @Override
// protected StormTopology buildTopology() {
//
// }
//
// @Override
// protected void submitTopology() throws Exception {
// Topologies.runLocally(buildTopology(), "storm-test-top", buildConfig(), runtimeInSeconds)
// }
}

View File

@ -0,0 +1,116 @@
package com.hpcloud.maas.domain.model;
import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;
/**
* @author Jonathan Halterman
*/
@Test
public class SlidingWindowStatsTest {
public void shouldGetSlotForTime() {
}
public void shouldNotGetSlotForOutOfWindowTime() {
}
public void shouldNotAddSlotOutsideOfWindow() {
}
public void shouldAdvanceWindowToTimestamp() {
}
public void shouldAdvanceWindow() {
SlidingWindowStats window = new SlidingWindowStats(1, 3, Statistics.Average.class, 1);
window.advanceWindowTo(2);
window.advanceWindowTo(3);
window.advanceWindowTo(5);
assertEquals(window.getTimestamps(), new long[] { 5, 4, 3 });
window.advanceWindowTo(9);
assertEquals(window.getTimestamps(), new long[] { 9, 8, 7 });
window.advanceWindowTo(14);
assertEquals(window.getTimestamps(), new long[] { 14, 13, 12 });
// Attempt to advance backwards - Noop
window.advanceWindowTo(5);
assertEquals(window.getTimestamps(), new long[] { 14, 13, 12 });
}
public void testSlotIndexFor() {
SlidingWindowStats window = new SlidingWindowStats(1, 5, Statistics.Average.class, 10);
// Window looks like 10 6 7 8 9
assertEquals(window.slotIndexFor(12), -1);
assertEquals(window.slotIndexFor(10), 0);
assertEquals(window.slotIndexFor(9), 4);
assertEquals(window.slotIndexFor(6), 1);
assertEquals(window.slotIndexFor(2), -1);
window.advanceWindowTo(12);
// Window looks like 10 11 12 8 9
assertEquals(window.slotIndexFor(12), 2);
assertEquals(window.slotIndexFor(10), 0);
assertEquals(window.slotIndexFor(8), 3);
window.advanceWindowTo(15);
// Window looks like 15 11 12 13 14
assertEquals(window.slotIndexFor(17), -1);
assertEquals(window.slotIndexFor(15), 0);
assertEquals(window.slotIndexFor(14), 4);
assertEquals(window.slotIndexFor(11), 1);
assertEquals(window.slotIndexFor(10), -1);
window.advanceWindowTo(18);
// Window looks like 15 16 17 18 14
assertEquals(window.slotIndexFor(20), -1);
assertEquals(window.slotIndexFor(18), 3);
assertEquals(window.slotIndexFor(11), -1);
assertEquals(window.slotIndexFor(14), 4);
assertEquals(window.slotIndexFor(16), 1);
}
public void shouldGetValues() {
SlidingWindowStats window = new SlidingWindowStats(1, 3, Statistics.Sum.class, 10);
window.addValue(1, 10);
window.addValue(1, 10);
window.addValue(2, 9);
window.addValue(2, 9);
window.addValue(3, 8);
window.addValue(3, 8);
assertEquals(window.getValues(), new double[] { 2, 4, 6 });
// Outside of the window - Noop
window.addValue(3, 12);
window.addValue(3, 7);
assertEquals(window.getValues(), new double[] { 2, 4, 6 });
window.advanceWindowTo(11);
assertEquals(window.getValues(), new double[] { 0, 2, 4 });
window.advanceWindowTo(12);
assertEquals(window.getValues(), new double[] { 0, 0, 2 });
window.advanceWindowTo(20);
assertEquals(window.getValues(), new double[] { 0, 0, 0 });
}
public void shouldGetTimestamps() {
SlidingWindowStats window = new SlidingWindowStats(1, 5, Statistics.Sum.class, 10);
assertEquals(window.getTimestamps(), new long[] { 10, 9, 8, 7, 6 });
window.advanceWindowTo(14);
assertEquals(window.getTimestamps(), new long[] { 14, 13, 12, 11, 10 });
}
}

View File

@ -16,6 +16,9 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.io.Resources;
import com.hpcloud.maas.common.model.AggregateFunction;
import com.hpcloud.maas.common.model.AlarmOperator;
import com.hpcloud.maas.common.model.AlarmState;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.service.AlarmDAO;
@ -33,8 +36,7 @@ public class AlarmDAOImplTest {
protected void setupClass() throws Exception {
db = new DBI("jdbc:h2:mem:test;MODE=MySQL");
handle = db.open();
handle.execute(Resources.toString(getClass().getResource("alarm.sql"),
Charset.defaultCharset()));
handle.execute(Resources.toString(getClass().getResource("alarm.sql"), Charset.defaultCharset()));
repo = new AlarmDAOImpl(db);
// Fixtures
@ -63,7 +65,8 @@ public class AlarmDAOImplTest {
public void shouldFind() {
List<Alarm> alarms = repo.find();
Alarm alarm = new Alarm("123", "90% CPU", "compute", "CPU", "3", dimensions, "GTE", 90l);
Alarm alarm = new Alarm("111", "123", "90% CPU", "compute", "CPU", "3", dimensions, 60, 3,
AggregateFunction.AVERAGE, AlarmOperator.GT, 90l, AlarmState.UNDETERMINED);
assertEquals(alarms, Arrays.asList(alarm));
}
}