Thresholding work

This commit is contained in:
Jonathan Halterman 2013-04-29 17:34:31 -07:00
parent af28772ff5
commit 41bc1608da
11 changed files with 209 additions and 141 deletions

View File

@ -23,18 +23,21 @@ public class ThresholdingConfiguration {
/** Total number of acker threads across the cluster. */
@NotNull public Integer numAckerThreads = 12;
@NotNull public Integer maasMetricSpoutThreads = 6;
@NotNull public Integer maasMetricSpoutTasks = 6;
@NotNull public Integer collectdMetricSpoutThreads = 6;
@NotNull public Integer collectdMetricSpoutTasks = 9;
@NotNull public Integer maasMetricSpoutThreads = 6;
@NotNull public Integer maasMetricSpoutTasks = 6;
@NotNull public Integer eventSpoutThreads = 3;
@NotNull public Integer eventSpoutTasks = 3;
@NotNull public Integer eventBoltThreads = 3;
@NotNull public Integer eventBoltTasks = 3;
@NotNull public Integer filteringBoltThreads = 6;
@NotNull public Integer filteringBoltTasks = 15;
@NotNull public Integer aggregationBoltThreads = 12;
@NotNull public Integer aggregationBoltTasks = 30;

View File

@ -17,6 +17,7 @@ import com.hpcloud.maas.infrastructure.thresholding.EventProcessingBolt;
import com.hpcloud.maas.infrastructure.thresholding.MaasEventDeserializer;
import com.hpcloud.maas.infrastructure.thresholding.MaasMetricDeserializer;
import com.hpcloud.maas.infrastructure.thresholding.MetricAggregationBolt;
import com.hpcloud.maas.infrastructure.thresholding.MetricFilteringBolt;
import com.hpcloud.util.Injector;
/**
@ -104,11 +105,19 @@ public class TopologyModule extends AbstractModule {
.shuffleGrouping("event-spout")
.setNumTasks(config.eventBoltTasks);
// Metrics / Event -> Aggregation
// Metrics / Event -> Filtering
builder.setBolt("filtering-bolt", new MetricFilteringBolt(config.database),
config.filteringBoltThreads)
.shuffleGrouping("collectd-metrics-spout")
.shuffleGrouping("maas-metrics-spout")
.allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID)
.allGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID)
.setNumTasks(config.filteringBoltTasks);
// Filtering / Event -> Aggregation
builder.setBolt("aggregation-bolt", new MetricAggregationBolt(config.database),
config.aggregationBoltThreads)
.fieldsGrouping("collectd-metrics-spout", new Fields("metricDefinition"))
.fieldsGrouping("maas-metrics-spout", new Fields("metricDefinition"))
.fieldsGrouping("filtering-bolt", new Fields("metricDefinition"))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID,
new Fields("metricDefinition"))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID,

View File

@@ -0,0 +1,15 @@
package com.hpcloud.maas.domain.service;
import java.util.List;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
/**
 * Data access object for metric definitions.
 *
 * @author Jonathan Halterman
 */
public interface MetricDefinitionDAO {
  /**
   * Returns the metric definitions referenced by all alarms.
   *
   * @return the metric definitions for every alarm
   */
  List<MetricDefinition> findForAlarms();
}

View File

@@ -0,0 +1,64 @@
package com.hpcloud.maas.infrastructure.persistence;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.service.MetricDefinitionDAO;
/**
 * MetricDefinition DAO implementation.
 *
 * @author Jonathan Halterman
 */
public class MetricDefinitionDAOImpl implements MetricDefinitionDAO {
  // Joins each sub_alarm row to a comma-delimited "name=value" aggregation of its dimensions.
  // NOTE(review): this is an inner join, so sub-alarms with no dimension rows are omitted
  // entirely — confirm whether a left join (null dimensions) is intended.
  private static final String METRIC_DEF_SQL = "select sa.namespace, sa.metric_type, sa.metric_subject, sad.dimensions from sub_alarm as sa, "
      + "(select sub_alarm_id, group_concat(dimension_name, '=', value) as dimensions from sub_alarm_dimension group by sub_alarm_id) as sad "
      + "where sa.id = sad.sub_alarm_id";

  private final DBI db;

  @Inject
  public MetricDefinitionDAOImpl(DBI db) {
    this.db = db;
  }

  /**
   * Finds all metric definitions for all alarms.
   *
   * @return a metric definition for each sub-alarm that has dimensions
   */
  @Override
  public List<MetricDefinition> findForAlarms() {
    Handle h = db.open();

    try {
      List<Map<String, Object>> rows = h.createQuery(METRIC_DEF_SQL).list();
      List<MetricDefinition> metricDefs = new ArrayList<MetricDefinition>(rows.size());
      for (Map<String, Object> row : rows) {
        String namespace = (String) row.get("namespace");
        String type = (String) row.get("metric_type");
        String subject = (String) row.get("metric_subject");
        metricDefs.add(new MetricDefinition(namespace, type, subject,
            parseDimensions((String) row.get("dimensions"))));
      }

      return metricDefs;
    } finally {
      // Handle is opened per call; always return it to the pool.
      h.close();
    }
  }

  /**
   * Parses a comma-delimited set of "name=value" pairs into a dimension map, returning null when
   * {@code dimensionSet} is null.
   */
  private static Map<String, String> parseDimensions(String dimensionSet) {
    if (dimensionSet == null)
      return null;
    Map<String, String> dimensions = new HashMap<String, String>();
    for (String kvStr : dimensionSet.split(",")) {
      // Split on the first '=' only so values containing '=' are preserved intact. The limit of
      // 2 also retains a trailing empty string for empty values ("key="), preventing the
      // ArrayIndexOutOfBoundsException an unbounded split would cause on kv[1].
      String[] kv = kvStr.split("=", 2);
      if (kv.length == 2)
        dimensions.put(kv[0], kv[1]);
    }
    return dimensions;
  }
}

View File

@ -8,6 +8,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.domain.service.MetricDefinitionDAO;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.persistence.DatabaseConfiguration;
@ -27,6 +28,7 @@ public class PersistenceModule extends AbstractModule {
protected void configure() {
bind(AlarmDAO.class).to(AlarmDAOImpl.class).in(Scopes.SINGLETON);
bind(SubAlarmDAO.class).to(SubAlarmDAOImpl.class).in(Scopes.SINGLETON);
bind(MetricDefinitionDAO.class).to(MetricDefinitionDAOImpl.class).in(Scopes.SINGLETON);
}
@Provides

View File

@ -50,9 +50,7 @@ public class AlarmThresholdingBolt extends BaseRichBolt {
private final Map<String, Alarm> alarms = new HashMap<String, Alarm>();
private String alertExchange;
private String alertRoutingKey;
private transient AlarmDAO alarmDAO;
private transient RabbitMQService rabbitService;
private TopologyContext context;
private OutputCollector collector;

View File

@ -140,11 +140,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
* Aggregates values for the {@code metric} that are within the periods defined for the alarm.
*/
void aggregateValues(Metric metric) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metric.definition);
LOG.trace("{} Aggregating values for {}", context.getThisTaskId(), metric);
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metric.definition);
if (subAlarmStatsRepo == null)
return;
LOG.debug("{} Aggregating values for {}", context.getThisTaskId(), metric);
for (SubAlarmStats stats : subAlarmStatsRepo.get())
stats.getStats().addValue(metric.value, metric.timestamp);
}

View File

@ -1,39 +1,31 @@
package com.hpcloud.maas.infrastructure.thresholding;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.hpcloud.maas.common.event.AlarmCreatedEvent;
import com.hpcloud.maas.common.event.AlarmDeletedEvent;
import com.hpcloud.maas.common.model.metric.Metric;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.model.SubAlarmStats;
import com.hpcloud.maas.domain.service.MetricDefinitionDAO;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.maas.domain.service.SubAlarmStatsRepository;
import com.hpcloud.maas.infrastructure.persistence.PersistenceModule;
import com.hpcloud.maas.infrastructure.storm.Streams;
import com.hpcloud.maas.infrastructure.storm.Tuples;
import com.hpcloud.persistence.DatabaseConfiguration;
import com.hpcloud.util.Injector;
/**
* Filters metrics for which there is no associated alarm.
* Filters metrics for which there is no associated alarm and forwards metrics for which there is an
* alarm. Receives metric alarm and metric sub-alarm events to update metric definitions.
*
* <ul>
* <li>Input: MetricDefinition metricDefinition, Metric metric
@ -47,22 +39,15 @@ import com.hpcloud.util.Injector;
* @author Jonathan Halterman
*/
public class MetricFilteringBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MetricFilteringBolt.class);
private static final long serialVersionUID = 5624314196838090726L;
public static final String TICK_TUPLE_SECONDS_KEY = "maas.aggregation.tick.seconds";
private static final Map<MetricDefinition, Object> METRIC_DEFS = new ConcurrentHashMap<MetricDefinition, Object>();
private static final Object SENTINAL = new Object();
private final Map<MetricDefinition, SubAlarmStatsRepository> subAlarmStatsRepos = new HashMap<MetricDefinition, SubAlarmStatsRepository>();
private final Multimap<String, String> alarmSubAlarms = ArrayListMultimap.create();
private DatabaseConfiguration dbConfig;
private transient SubAlarmDAO subAlarmDAO;
private final DatabaseConfiguration dbConfig;
private transient MetricDefinitionDAO metricDefDAO;
private TopologyContext context;
private OutputCollector collector;
private int evaluationTimeOffset;
public MetricFilteringBolt(SubAlarmDAO subAlarmDAO) {
this.subAlarmDAO = subAlarmDAO;
}
public MetricFilteringBolt(DatabaseConfiguration dbConfig) {
this.dbConfig = dbConfig;
@ -70,35 +55,27 @@ public class MetricFilteringBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("alarmId", "subAlarm"));
declarer.declare(new Fields("metricDefinition", "metric"));
}
@Override
public void execute(Tuple tuple) {
try {
if (Tuples.isTickTuple(tuple)) {
evaluateAlarmsAndSlideWindows();
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinition metricDef = (MetricDefinition) tuple.getValue(0);
if (METRIC_DEFS.containsKey(metricDef))
collector.emit(tuple, tuple.getValues());
} else {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
Metric metric = (Metric) tuple.getValueByField("metric");
aggregateValues(metric);
} else {
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(1);
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition);
if (subAlarmStatsRepo == null)
return;
String eventType = tuple.getString(0);
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(1);
String eventType = tuple.getString(0);
if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String alarmId = tuple.getString(2);
if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
handleAlarmDeleted(subAlarmStatsRepo, alarmId);
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
SubAlarm subAlarm = (SubAlarm) tuple.getValue(2);
if (AlarmCreatedEvent.class.getSimpleName().equals(eventType))
handleAlarmCreated(subAlarmStatsRepo, subAlarm);
}
if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
if (AlarmDeletedEvent.class.getSimpleName().equals(eventType))
METRIC_DEFS.remove(metricDefinition);
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
if (AlarmCreatedEvent.class.getSimpleName().equals(eventType))
METRIC_DEFS.put(metricDefinition, SENTINAL);
}
}
} catch (Exception e) {
@ -108,103 +85,26 @@ public class MetricFilteringBolt extends BaseRichBolt {
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")).intValue());
return conf;
}
@Override
@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
LOG.info("{} Preparing {}", context.getThisTaskId(), context.getThisComponentId());
this.context = context;
this.collector = collector;
evaluationTimeOffset = Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60"))
.intValue() * 1000;
if (subAlarmDAO == null) {
if (metricDefDAO == null) {
Injector.registerIfNotBound(SubAlarmDAO.class, new PersistenceModule(dbConfig));
subAlarmDAO = Injector.getInstance(SubAlarmDAO.class);
metricDefDAO = Injector.getInstance(MetricDefinitionDAO.class);
}
}
/**
* Aggregates values for the {@code metric} that are within the periods defined for the alarm.
*/
void aggregateValues(Metric metric) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metric.definition);
if (subAlarmStatsRepo == null)
return;
LOG.debug("{} Aggregating values for {}", context.getThisTaskId(), metric);
for (SubAlarmStats stats : subAlarmStatsRepo.get())
stats.getStats().addValue(metric.value, metric.timestamp);
}
/**
* Evaluates all SubAlarms for all SubAlarmStatsRepositories using an evaluation time of 1 minute
* ago, then sliding the window to the current time.
*/
void evaluateAlarmsAndSlideWindows() {
LOG.debug("{} Evaluating alarms and advancing windows", context.getThisTaskId());
long newWindowTimestamp = System.currentTimeMillis();
long evaluationTimestamp = newWindowTimestamp - evaluationTimeOffset;
for (SubAlarmStatsRepository subAlarmStatsRepo : subAlarmStatsRepos.values())
for (SubAlarmStats subAlarmStats : subAlarmStatsRepo.get()) {
LOG.debug("{} Evaluating {} for timestamp {}", context.getThisTaskId(), subAlarmStats,
evaluationTimestamp);
if (subAlarmStats.evaluateAndSlideWindow(evaluationTimestamp, newWindowTimestamp)) {
LOG.debug("{} Alarm state changed for {}", context.getThisTaskId(), subAlarmStats);
collector.emit(new Values(subAlarmStats.getSubAlarm().getAlarmId(),
subAlarmStats.getSubAlarm()));
// DCL
if (METRIC_DEFS.isEmpty()) {
synchronized (SENTINAL) {
if (METRIC_DEFS.isEmpty()) {
for (MetricDefinition metricDef : metricDefDAO.findForAlarms())
METRIC_DEFS.put(metricDef, SENTINAL);
}
}
}
/**
* Returns an existing or newly created SubAlarmStatsRepository for the {@code metricDefinition}.
* Newly created SubAlarmStatsRepositories are initialized with stats whose view ends one minute
* from now.
*/
SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo(MetricDefinition metricDefinition) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition);
if (subAlarmStatsRepo == null) {
List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinition);
if (subAlarms.isEmpty())
LOG.warn("{} Failed to find sub alarms for {}", context.getThisTaskId(), metricDefinition);
else {
long viewEndTimestamp = System.currentTimeMillis() + evaluationTimeOffset;
subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp);
subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo);
for (SubAlarm subAlarm : subAlarms)
alarmSubAlarms.put(subAlarm.getAlarmId(), subAlarm.getId());
}
}
return subAlarmStatsRepo;
}
/**
* Adds the {@code subAlarm} to the {@code subAlarmStatsRepo} with a view end time of one minute
* from now, and adds the {@code subAlarm} to the {@link #alarmSubAlarms}.
*/
void handleAlarmCreated(SubAlarmStatsRepository subAlarmStatsRepo, SubAlarm subAlarm) {
LOG.debug("{} Received AlarmCreatedEvent for {}", context.getThisTaskId(), subAlarm);
long viewEndTimestamp = System.currentTimeMillis() + evaluationTimeOffset;
subAlarmStatsRepo.add(subAlarm, viewEndTimestamp);
alarmSubAlarms.put(subAlarm.getAlarmId(), subAlarm.getId());
}
/**
* Removes all SubAlarms from the {@code subAlarmStatsRepo} and the {@link #alarmSubAlarms} for
* the {@code alarmId}.
*/
void handleAlarmDeleted(SubAlarmStatsRepository subAlarmStatsRepo, String alarmId) {
LOG.debug("{} Received AlarmDeletedEvent for alarm id {}", context.getThisTaskId(), alarmId);
for (String subAlarmId : alarmSubAlarms.removeAll(alarmId))
subAlarmStatsRepo.remove(subAlarmId);
}
}

View File

@ -27,6 +27,7 @@ import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.model.Alarm;
import com.hpcloud.maas.domain.model.SubAlarm;
import com.hpcloud.maas.domain.service.AlarmDAO;
import com.hpcloud.maas.domain.service.MetricDefinitionDAO;
import com.hpcloud.maas.domain.service.SubAlarmDAO;
import com.hpcloud.maas.infrastructure.storm.NoopSpout;
import com.hpcloud.maas.infrastructure.storm.TopologyTestCase;
@ -49,6 +50,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
private SubAlarmDAO subAlarmDAO;
private MetricDefinition cpuMetricDef;
private MetricDefinition memMetricDef;
private MetricDefinitionDAO metricDefinitionDAO;
public ThresholdingEngineTest() {
// Fixtures
@ -81,6 +83,10 @@ public class ThresholdingEngineTest extends TopologyTestCase {
}
});
metricDefinitionDAO = mock(MetricDefinitionDAO.class);
List<MetricDefinition> metricDefs = Arrays.asList(cpuMetricDef, memMetricDef);
when(metricDefinitionDAO.findForAlarms()).thenReturn(metricDefs);
final RabbitMQService rabbitMQService = mock(RabbitMQService.class);
// Bindings
@ -89,6 +95,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
protected void configure() {
bind(AlarmDAO.class).toInstance(alarmDAO);
bind(SubAlarmDAO.class).toInstance(subAlarmDAO);
bind(MetricDefinitionDAO.class).toInstance(metricDefinitionDAO);
bind(RabbitMQService.class).toInstance(rabbitMQService);
}
});

View File

@@ -0,0 +1,68 @@
package com.hpcloud.maas.infrastructure.persistence;
import static org.testng.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.hpcloud.maas.common.model.metric.MetricDefinition;
import com.hpcloud.maas.domain.service.MetricDefinitionDAO;
/**
 * Note: MySQL dependent test.
 *
 * @author Jonathan Halterman
 */
@Test(groups = "database", enabled = false)
public class MetricDefinitionDAOImplTest {
  private DBI db;
  private Handle handle;
  private MetricDefinitionDAO dao;

  @BeforeClass
  protected void setupClass() throws Exception {
    db = new DBI("jdbc:mysql://localhost/maas", "root", "password");
    handle = db.open();
    dao = new MetricDefinitionDAOImpl(db);
  }

  @AfterClass
  protected void afterClass() {
    handle.close();
  }

  @BeforeMethod
  protected void beforeMethod() {
    // Remove child rows first so the fixture is idempotent across runs. Deleting only the alarm
    // row (as before) leaves stale sub_alarm/sub_alarm_dimension rows that would violate primary
    // key constraints on the inserts below, unless cascading deletes are configured — which we
    // don't rely on here.
    handle.execute("delete from sub_alarm_dimension where sub_alarm_id in ('111', '222')");
    handle.execute("delete from sub_alarm where id in ('111', '222')");
    handle.execute("delete from alarm where id = 123");
    handle.execute("insert into alarm (id, tenant_id, name, expression, state, created_at, updated_at) "
        + "values ('123', 'bob', '90% CPU', 'avg(compute:cpu:{flavor_id=777, image_id=888}) > 10', 'UNDETERMINED', NOW(), NOW())");
    handle.execute("insert into sub_alarm (id, alarm_id, function, namespace, metric_type, metric_subject, operator, threshold, period, periods, state, created_at, updated_at) "
        + "values ('111', '123', 'AVG', 'compute', 'cpu', '1', 'GT', 10, 60, 1, 'OK', NOW(), NOW())");
    handle.execute("insert into sub_alarm (id, alarm_id, function, namespace, metric_type, metric_subject, operator, threshold, period, periods, state, created_at, updated_at) "
        + "values ('222', '123', 'AVG', 'compute', 'mem', null, 'GT', 10, 60, 1, 'OK', NOW(), NOW())");
    handle.execute("insert into sub_alarm_dimension values ('111', 'flavor_id', '777')");
    handle.execute("insert into sub_alarm_dimension values ('111', 'image_id', '888')");
    handle.execute("insert into sub_alarm_dimension values ('222', 'instance_id', '123')");
    handle.execute("insert into sub_alarm_dimension values ('222', 'az', '2')");
  }

  public void shouldFindForAlarms() {
    List<MetricDefinition> expected = Arrays.asList(new MetricDefinition("compute", "cpu", "1",
        ImmutableMap.<String, String>builder()
            .put("flavor_id", "777")
            .put("image_id", "888")
            .build()), new MetricDefinition("compute", "mem", null,
        ImmutableMap.<String, String>builder().put("instance_id", "123").put("az", "2").build()));

    List<MetricDefinition> found = dao.findForAlarms();
    // Check containment in both directions since row ordering is unspecified; the one-way
    // containsAll used previously would pass even if unexpected extra definitions were returned.
    assertTrue(found.containsAll(expected));
    assertTrue(expected.containsAll(found), "Unexpected metric definitions found: " + found);
  }
}

View File

@ -8,6 +8,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
@SuppressWarnings("serial")
public class NoopSpout extends BaseRichSpout {
private final Fields outputFields;
@ -16,6 +17,7 @@ public class NoopSpout extends BaseRichSpout {
}
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}