Threshold Engine had no awareness of Tenant ID for incoming metrics and so did not consider it when matching Metrics to SubAlarms.

Changed MetricSpout to send a new class MetricDefinitionAndTenantId so the bolts could match Metrics to the correct SubAlarm using MetricDefinition and Tenant ID. Had to change places that sent MetricDefinitions for Events to also send a MetricDefinitionAndTenantId.

Had to change MetricDefinitionDAOImpl to add in the TenantId from the Alarm.

Had to change SubAlarmDAO to take into account the TenantId when looking for matching MetricDefinitions.
This commit is contained in:
Craig Bryant 2014-04-08 14:41:03 -06:00
parent 0f9e2f588b
commit 415bdd47d6
17 changed files with 265 additions and 143 deletions

View File

@ -110,20 +110,20 @@ public class TopologyModule extends AbstractModule {
builder.setBolt("aggregation-bolt",
new MetricAggregationBolt(config.database, config.sporadicMetricNamespaces),
config.aggregationBoltThreads)
.fieldsGrouping("filtering-bolt", new Fields("metricDefinition"))
.fieldsGrouping("filtering-bolt", new Fields(MetricFilteringBolt.FIELDS[0]))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID,
new Fields("metricDefinition"))
new Fields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS[1]))
.fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID,
new Fields("metricDefinition"))
new Fields(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_FIELDS[1]))
.setNumTasks(config.aggregationBoltTasks);
// Aggregation / Event -> Thresholding
builder.setBolt("thresholding-bolt",
new AlarmThresholdingBolt(config.database),
config.thresholdingBoltThreads)
.fieldsGrouping("aggregation-bolt", new Fields("alarmId"))
.fieldsGrouping("aggregation-bolt", new Fields(MetricAggregationBolt.FIELDS[0]))
.fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID,
new Fields("alarmId"))
new Fields(EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS[1]))
.setNumTasks(config.thresholdingBoltTasks);
return builder.createTopology();

View File

@ -0,0 +1,59 @@
package com.hpcloud.mon.domain.model;
import java.io.Serializable;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
public class MetricDefinitionAndTenantId implements Serializable {
private static final long serialVersionUID = -4224596705186481749L;
public MetricDefinition metricDefinition;
public String tenantId;
public MetricDefinitionAndTenantId(MetricDefinition metricDefinition,
String tenantId) {
this.metricDefinition = metricDefinition;
this.tenantId = tenantId;
}
@Override
public int hashCode() {
int result = 0;
if (this.metricDefinition != null)
result += this.metricDefinition.hashCode();
if (this.tenantId != null)
result = result * 31 + this.tenantId.hashCode();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (getClass() != obj.getClass())
return false;
final MetricDefinitionAndTenantId other = (MetricDefinitionAndTenantId) obj;
if (!compareObjects(this.tenantId, other.tenantId))
return false;
if (!compareObjects(this.metricDefinition, other.metricDefinition))
return false;
return true;
}
private boolean compareObjects(final Object o1,
final Object o2) {
if (o1 == null) {
if (o2 != null)
return false;
} else if (!o1.equals(o2))
return false;
return true;
}
@Override
public String toString() {
return String.format("MetricDefinitionAndTenantId tenantId=%s metricDefinition=%s", this.tenantId, this.metricDefinition);
}
}

View File

@ -2,7 +2,7 @@ package com.hpcloud.mon.domain.service;
import java.util.List;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
/**
@ -12,5 +12,5 @@ import com.hpcloud.mon.domain.model.SubAlarm;
*/
public interface SubAlarmDAO {
/** Finds and returns all sub alarms for the {@code metricDefinition}. */
List<SubAlarm> find(MetricDefinition metricDefinition);
List<SubAlarm> find(MetricDefinitionAndTenantId metricDefinition);
}

View File

@ -1,22 +1,22 @@
package com.hpcloud.mon.domain.service;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
public class SubAlarmMetricDefinition {
private final String subAlarmId;
private final MetricDefinition metricDefinition;
private final MetricDefinitionAndTenantId metricDefinitionAndTenantId;
public SubAlarmMetricDefinition(String subAlarmId,
MetricDefinition metricDefinition) {
MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
this.subAlarmId = subAlarmId;
this.metricDefinition = metricDefinition;
this.metricDefinitionAndTenantId = metricDefinitionAndTenantId;
}
public String getSubAlarmId() {
return subAlarmId;
}
public MetricDefinition getMetricDefinition() {
return metricDefinition;
public MetricDefinitionAndTenantId getMetricDefinitionAndTenantId() {
return metricDefinitionAndTenantId;
}
}

View File

@ -11,6 +11,7 @@ import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition;
@ -20,7 +21,7 @@ import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition;
* @author Jonathan Halterman
*/
public class MetricDefinitionDAOImpl implements MetricDefinitionDAO {
private static final String METRIC_DEF_SQL = "select sa.id, sa.metric_name, sad.dimensions from alarm as a, sub_alarm as sa "
private static final String METRIC_DEF_SQL = "select sa.id, a.tenant_id, sa.metric_name, sad.dimensions from alarm as a, sub_alarm as sa "
+ "left join (select sub_alarm_id, group_concat(dimension_name, '=', value) as dimensions from sub_alarm_dimension group by sub_alarm_id) as sad on sa.id = sad.sub_alarm_id "
+ "where a.id = sa.alarm_id and a.deleted_at is null";
@ -41,6 +42,7 @@ public class MetricDefinitionDAOImpl implements MetricDefinitionDAO {
List<SubAlarmMetricDefinition> metricDefs = new ArrayList<>(rows.size());
for (Map<String, Object> row : rows) {
String subAlarmId = (String) row.get("id");
String tenantId = (String) row.get("tenant_id");
String metric_name = (String) row.get("metric_name");
String dimensionSet = (String) row.get("dimensions");
Map<String, String> dimensions = null;
@ -57,7 +59,7 @@ public class MetricDefinitionDAOImpl implements MetricDefinitionDAO {
}
metricDefs.add(new SubAlarmMetricDefinition(subAlarmId,
new MetricDefinition(metric_name, dimensions)));
new MetricDefinitionAndTenantId(new MetricDefinition(metric_name, dimensions), tenantId)));
}
return metricDefs;

View File

@ -14,6 +14,7 @@ import com.hpcloud.mon.common.model.alarm.AggregateFunction;
import com.hpcloud.mon.common.model.alarm.AlarmOperator;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
import com.hpcloud.persistence.SqlStatements;
@ -29,12 +30,12 @@ public class SubAlarmDAOImpl implements SubAlarmDAO {
* table, grouping by the dimension id and counting them to ensure that the number of matched
* dimensions equals the number of actual dimensions in the table for the subscription.
*/
private static final String FIND_BY_METRIC_DEF_SQL = "select sa.* from sub_alarm sa, sub_alarm_dimension d "
private static final String FIND_BY_METRIC_DEF_SQL = "select sa.* from sub_alarm sa, alarm a, sub_alarm_dimension d "
+ "join (%s) v on d.dimension_name = v.dimension_name and d.value = v.value "
+ "where sa.id = d.sub_alarm_id and sa.metric_name = :metric_name "
+ "where sa.id = d.sub_alarm_id and sa.metric_name = :metric_name and a.tenant_id = :tenant_id and a.id = sa.alarm_id and a.deleted_at is null "
+ "group by d.sub_alarm_id having count(d.sub_alarm_id) = %s";
private static final String FIND_BY_METRIC_DEF_NO_DIMS_SQL = "select * from sub_alarm sa where sa.metric_name = :metric_name "
+ "and (select count(*) from sub_alarm_dimension where sub_alarm_id = sa.id) = 0";
+ "and a.tenant_id = :tenant_id and a.id = sa.alarm_id and a.deleted_at is null and (select count(*) from sub_alarm_dimension where sub_alarm_id = sa.id) = 0";
private final DBI db;
@ -44,11 +45,12 @@ public class SubAlarmDAOImpl implements SubAlarmDAO {
}
@Override
public List<SubAlarm> find(MetricDefinition metricDefinition) {
public List<SubAlarm> find(MetricDefinitionAndTenantId metricDefinitionTenantId) {
Handle h = db.open();
try {
String sql = null;
final MetricDefinition metricDefinition = metricDefinitionTenantId.metricDefinition;
final String sql;
if (metricDefinition.dimensions == null || metricDefinition.dimensions.isEmpty())
sql = FIND_BY_METRIC_DEF_NO_DIMS_SQL;
else {
@ -59,7 +61,7 @@ public class SubAlarmDAOImpl implements SubAlarmDAO {
}
Query<Map<String, Object>> query = h.createQuery(sql).bind("metric_name",
metricDefinition.name);
metricDefinition.name).bind("tenant_id", metricDefinitionTenantId.tenantId);
List<Map<String, Object>> rows = query.list();
List<SubAlarm> subAlarms = new ArrayList<SubAlarm>(rows.size());

View File

@ -18,6 +18,7 @@ import com.hpcloud.mon.common.event.AlarmDeletedEvent;
import com.hpcloud.mon.common.event.AlarmUpdatedEvent;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.streaming.storm.Logging;
@ -47,9 +48,9 @@ public class EventProcessingBolt extends BaseRichBolt {
public static final String[] ALARM_EVENT_STREAM_FIELDS = new String[] {"eventType", "alarmId"};
public static final String[] METRIC_ALARM_EVENT_STREAM_FIELDS = new String[] {"eventType",
"metricDefinition", "subAlarmId"};
"metricDefinitionAndTenantId", "subAlarmId"};
public static final String[] METRIC_SUB_ALARM_EVENT_STREAM_FIELDS = new String[] {"eventType",
"metricDefinition", "subAlarm"};
"metricDefinitionAndTenantId", "subAlarm"};
public static final String CREATED = "created";
public static final String DELETED = "deleted";
@ -93,34 +94,35 @@ public class EventProcessingBolt extends BaseRichBolt {
void handle(AlarmCreatedEvent event) {
for (Map.Entry<String, AlarmSubExpression> subExpressionEntry : event.alarmSubExpressions.entrySet()) {
sendAddSubAlarm(event.alarmId, subExpressionEntry.getKey(), subExpressionEntry.getValue());
sendAddSubAlarm(event.alarmId, subExpressionEntry.getKey(), event.tenantId, subExpressionEntry.getValue());
}
}
private void sendAddSubAlarm(String alarmId, String subAlarmId, AlarmSubExpression alarmSubExpression) {
private void sendAddSubAlarm(String alarmId, String subAlarmId, String tenantId, AlarmSubExpression alarmSubExpression) {
MetricDefinition metricDef = alarmSubExpression.getMetricDefinition();
collector.emit(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Values(CREATED, metricDef,
collector.emit(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Values(CREATED, new MetricDefinitionAndTenantId(metricDef, tenantId),
new SubAlarm(subAlarmId, alarmId, alarmSubExpression)));
}
void handle(AlarmDeletedEvent event) {
for (Map.Entry<String, MetricDefinition> entry : event.subAlarmMetricDefinitions.entrySet()) {
sendDeletedSubAlarm(entry.getKey(), entry.getValue());
sendDeletedSubAlarm(entry.getKey(), event.tenantId, entry.getValue());
}
collector.emit(ALARM_EVENT_STREAM_ID, new Values(DELETED, event.alarmId));
}
private void sendDeletedSubAlarm(String subAlarmId, MetricDefinition metricDef) {
collector.emit(METRIC_ALARM_EVENT_STREAM_ID, new Values(DELETED, metricDef, subAlarmId));
private void sendDeletedSubAlarm(String subAlarmId, String tenantId, MetricDefinition metricDef) {
collector.emit(METRIC_ALARM_EVENT_STREAM_ID, new Values(DELETED,
new MetricDefinitionAndTenantId(metricDef, tenantId), subAlarmId));
}
void handle(AlarmUpdatedEvent event) {
for (Map.Entry<String, AlarmSubExpression> entry : event.oldAlarmSubExpressions.entrySet()) {
sendDeletedSubAlarm(entry.getKey(), entry.getValue().getMetricDefinition());
sendDeletedSubAlarm(entry.getKey(), event.tenantId, entry.getValue().getMetricDefinition());
}
for (Map.Entry<String, AlarmSubExpression> entry : event.newAlarmSubExpressions.entrySet()) {
sendAddSubAlarm(event.alarmId, entry.getKey(), entry.getValue());
sendAddSubAlarm(event.alarmId, entry.getKey(), event.tenantId, entry.getValue());
}
collector.emit(ALARM_EVENT_STREAM_ID, new Values(UPDATED, event.alarmId));
}

View File

@ -19,7 +19,7 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.model.SubAlarmStats;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
@ -52,8 +52,9 @@ import com.hpcloud.util.Injector;
public class MetricAggregationBolt extends BaseRichBolt {
private static final long serialVersionUID = 5624314196838090726L;
public static final String TICK_TUPLE_SECONDS_KEY = "maas.aggregation.tick.seconds";
public static final String[] FIELDS = new String[] { "alarmId", "subAlarm" };
final Map<MetricDefinition, SubAlarmStatsRepository> subAlarmStatsRepos = new HashMap<MetricDefinition, SubAlarmStatsRepository>();
final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> subAlarmStatsRepos = new HashMap<>();
private transient Logger LOG;
private DataSourceFactory dbConfig;
private transient SubAlarmDAO subAlarmDAO;
@ -73,7 +74,7 @@ public class MetricAggregationBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("alarmId", "subAlarm"));
declarer.declare(new Fields(FIELDS));
}
@Override
@ -84,21 +85,21 @@ public class MetricAggregationBolt extends BaseRichBolt {
evaluateAlarmsAndSlideWindows();
} else {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(0);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
Metric metric = (Metric) tuple.getValueByField("metric");
aggregateValues(metricDefinition, metric);
aggregateValues(metricDefinitionAndTenantId, metric);
} else {
String eventType = tuple.getString(0);
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(1);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1);
if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
String subAlarmId = tuple.getString(2);
if (EventProcessingBolt.DELETED.equals(eventType))
handleAlarmDeleted(metricDefinition, subAlarmId);
handleAlarmDeleted(metricDefinitionAndTenantId, subAlarmId);
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
SubAlarm subAlarm = (SubAlarm) tuple.getValue(2);
if (EventProcessingBolt.CREATED.equals(eventType))
handleAlarmCreated(metricDefinition, subAlarm);
handleAlarmCreated(metricDefinitionAndTenantId, subAlarm);
}
}
}
@ -135,17 +136,17 @@ public class MetricAggregationBolt extends BaseRichBolt {
/**
* Aggregates values for the {@code metric} that are within the periods defined for the alarm.
*/
void aggregateValues(MetricDefinition metricDefinition, Metric metric) {
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition);
void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) {
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
if (subAlarmStatsRepo == null || metric == null)
return;
for (SubAlarmStats stats : subAlarmStatsRepo.get()) {
if (stats.getStats().addValue(metric.value, metric.timestamp))
LOG.trace("Aggregated value {} at {} for {}. Updated {}", metric.value, metric.timestamp,
metricDefinition, stats.getStats());
metricDefinitionAndTenantId, stats.getStats());
else
LOG.warn("Invalid metric timestamp {} for {}, {}", metric.timestamp, metricDefinition,
LOG.warn("Invalid metric timestamp {} for {}, {}", metric.timestamp, metricDefinitionAndTenantId,
stats.getStats());
}
}
@ -168,24 +169,24 @@ public class MetricAggregationBolt extends BaseRichBolt {
}
/**
* Returns an existing or newly created SubAlarmStatsRepository for the {@code metricDefinition}.
* Returns an existing or newly created SubAlarmStatsRepository for the {@code metricDefinitionAndTenantId}.
* Newly created SubAlarmStatsRepositories are initialized with stats whose view ends one minute
* from now.
*/
SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo(MetricDefinition metricDefinition) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition);
SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId);
if (subAlarmStatsRepo == null) {
List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinition);
List<SubAlarm> subAlarms = subAlarmDAO.find(metricDefinitionAndTenantId);
if (subAlarms.isEmpty())
LOG.warn("Failed to find sub alarms for {}", metricDefinition);
LOG.warn("Failed to find sub alarms for {}", metricDefinitionAndTenantId);
else {
LOG.debug("Creating SubAlarmStats for {}", metricDefinition);
LOG.debug("Creating SubAlarmStats for {}", metricDefinitionAndTenantId);
for (SubAlarm subAlarm : subAlarms)
// TODO should treat metric def name previx like a namespace
subAlarm.setSporadicMetric(sporadicMetricNamespaces.contains(metricDefinition.name));
// TODO should treat metric def name prefix like a namespace
subAlarm.setSporadicMetric(sporadicMetricNamespaces.contains(metricDefinitionAndTenantId.metricDefinition.name));
long viewEndTimestamp = (System.currentTimeMillis() / 1000) + evaluationTimeOffset;
subAlarmStatsRepo = new SubAlarmStatsRepository(subAlarms, viewEndTimestamp);
subAlarmStatsRepos.put(metricDefinition, subAlarmStatsRepo);
subAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepo);
}
}
@ -193,11 +194,11 @@ public class MetricAggregationBolt extends BaseRichBolt {
}
/**
* Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinition}.
* Adds the {@code subAlarm} subAlarmStatsRepo for the {@code metricDefinitionAndTenantId}.
*/
void handleAlarmCreated(MetricDefinition metricDefinition, SubAlarm subAlarm) {
void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
LOG.debug("Received AlarmCreatedEvent for {}", subAlarm);
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinition);
SubAlarmStatsRepository subAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
if (subAlarmStatsRepo == null)
return;
@ -207,15 +208,15 @@ public class MetricAggregationBolt extends BaseRichBolt {
/**
* Removes the sub-alarm for the {@code subAlarmId} from the subAlarmStatsRepo for the
* {@code metricDefinition}.
* {@code metricDefinitionAndTenantId}.
*/
void handleAlarmDeleted(MetricDefinition metricDefinition, String subAlarmId) {
void handleAlarmDeleted(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) {
LOG.debug("Received AlarmDeletedEvent for subAlarm id {}", subAlarmId);
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinition);
SubAlarmStatsRepository subAlarmStatsRepo = subAlarmStatsRepos.get(metricDefinitionAndTenantId);
if (subAlarmStatsRepo != null) {
subAlarmStatsRepo.remove(subAlarmId);
if (subAlarmStatsRepo.isEmpty())
subAlarmStatsRepos.remove(metricDefinition);
subAlarmStatsRepos.remove(metricDefinitionAndTenantId);
}
}
}

View File

@ -16,7 +16,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
@ -31,16 +31,16 @@ import com.hpcloud.util.Injector;
* alarm. Receives metric alarm and metric sub-alarm events to update metric definitions.
*
* METRIC_DEFS table is shared between any bolts in the same worker process so that all of the
* Metric Definitions for existing SubAlarms only have to be read once and because it is not
* MetricDefinitionAndTenantIds for existing SubAlarms only have to be read once and because it is not
* possible to predict which bolt gets which Metrics so all Bolts know about all starting
* MetricDefinitions.
* MetricDefinitionAndTenantIds.
*
* The current topology uses shuffleGrouping for the incoming Metrics and allGrouping for the
* events. So, any Bolt may get any Metric so the METRIC_DEFS table must be kept up to date
* for all MetricDefinitions.
* for all MetricDefinitionAndTenantIds.
*
* The METRIC_DEFS table contains a List of SubAlarms IDs that reference the same MetricDefinition
* so if a SubAlarm is deleted, the MetricDefinition will only be deleted if no more SubAlarms
* The METRIC_DEFS table contains a List of SubAlarms IDs that reference the same MetricDefinitionAndTenantId
* so if a SubAlarm is deleted, the MetricDefinitionAndTenantId will only be deleted if no more SubAlarms
* reference it. Incrementing and decrementing the count is done under the static lock SENTINAL
* to ensure it is correct across all Bolts sharing the same METRIC_DEFS table. The
* amount of adds and deletes will be very small compared to the number of Metrics so it shouldn't
@ -48,20 +48,20 @@ import com.hpcloud.util.Injector;
*
* <ul>
* <li>Input: MetricDefinition metricDefinition, Metric metric
* <li>Input metric-alarm-events: String eventType, MetricDefinition metricDefinition, String
* <li>Input metric-alarm-events: String eventType, MetricDefinitionAndTenantId metricDefinitionAndTenantId, String
* alarmId
* <li>Input metric-sub-alarm-events: String eventType, MetricDefinition metricDefinition, SubAlarm
* <li>Input metric-sub-alarm-events: String eventType, MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm
* subAlarm
* <li>Output: MetricDefinition metricDefinition, Metric metric
* <li>Output: MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric
* </ul>
*
* @author Jonathan Halterman
*/
public class MetricFilteringBolt extends BaseRichBolt {
private static final long serialVersionUID = 1096706128973976599L;
private static final Map<MetricDefinition, List<String>> METRIC_DEFS = new ConcurrentHashMap<>();
private static final Map<MetricDefinitionAndTenantId, List<String>> METRIC_DEFS = new ConcurrentHashMap<>();
private static final Object SENTINAL = new Object();
public static final String[] FIELDS = new String[] { "metricDefinition", "metric" };
public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" };
private transient Logger LOG;
private DataSourceFactory dbConfig;
@ -83,27 +83,27 @@ public class MetricFilteringBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
LOG.debug("tuple: {}", tuple);
LOG.debug("tuple: {}", tuple);
try {
if (Streams.DEFAULT_STREAM_ID.equals(tuple.getSourceStreamId())) {
MetricDefinition metricDef = (MetricDefinition) tuple.getValue(0);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
LOG.debug("metric definition: {}", metricDef);
if (METRIC_DEFS.containsKey(metricDef))
LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId);
if (METRIC_DEFS.containsKey(metricDefinitionAndTenantId))
collector.emit(tuple, tuple.getValues());
} else {
String eventType = tuple.getString(0);
MetricDefinition metricDefinition = (MetricDefinition) tuple.getValue(1);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1);
LOG.debug("Received {} for {}", eventType, metricDefinition);
LOG.debug("Received {} for {}", eventType, metricDefinitionAndTenantId);
if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
if (EventProcessingBolt.DELETED.equals(eventType))
removeSubAlarm(metricDefinition, tuple.getString(2));
removeSubAlarm(metricDefinitionAndTenantId, tuple.getString(2));
} else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
if (EventProcessingBolt.CREATED.equals(eventType))
synchronized(SENTINAL) {
final SubAlarm subAlarm = (SubAlarm) tuple.getValue(2);
addMetricDef(metricDefinition, subAlarm.getId());
addMetricDef(metricDefinitionAndTenantId, subAlarm.getId());
}
}
}
@ -114,12 +114,12 @@ public class MetricFilteringBolt extends BaseRichBolt {
}
}
private void removeSubAlarm(MetricDefinition metricDefinition, String subAlarmId) {
private void removeSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) {
synchronized(SENTINAL) {
final List<String> count = METRIC_DEFS.get(metricDefinition);
if (count != null) {
if (count.remove(subAlarmId) && count.isEmpty()) {
METRIC_DEFS.remove(metricDefinition);
final List<String> subAlarmIds = METRIC_DEFS.get(metricDefinitionAndTenantId);
if (subAlarmIds != null) {
if (subAlarmIds.remove(subAlarmId) && subAlarmIds.isEmpty()) {
METRIC_DEFS.remove(metricDefinitionAndTenantId);
}
}
}
@ -142,21 +142,21 @@ public class MetricFilteringBolt extends BaseRichBolt {
synchronized (SENTINAL) {
if (METRIC_DEFS.isEmpty()) {
for (SubAlarmMetricDefinition subAlarmMetricDef : metricDefDAO.findForAlarms()) {
addMetricDef(subAlarmMetricDef.getMetricDefinition(), subAlarmMetricDef.getSubAlarmId());
addMetricDef(subAlarmMetricDef.getMetricDefinitionAndTenantId(), subAlarmMetricDef.getSubAlarmId());
}
// Iterate again to ensure we only emit each metricDef once
for (MetricDefinition metricDef : METRIC_DEFS.keySet())
collector.emit(new Values(metricDef, null));
for (MetricDefinitionAndTenantId metricDefinitionAndTenantId : METRIC_DEFS.keySet())
collector.emit(new Values(metricDefinitionAndTenantId, null));
}
}
}
}
private void addMetricDef(MetricDefinition metricDef, String subAlarmId) {
List<String> subAlarmIds = METRIC_DEFS.get(metricDef);
private void addMetricDef(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String subAlarmId) {
List<String> subAlarmIds = METRIC_DEFS.get(metricDefinitionAndTenantId);
if (subAlarmIds == null) {
subAlarmIds = new LinkedList<>();
METRIC_DEFS.put(metricDef, subAlarmIds);
METRIC_DEFS.put(metricDefinitionAndTenantId, subAlarmIds);
}
else if (subAlarmIds.contains(subAlarmId))
return; // Make sure it only gets added once. Multiple bolts process the same AlarmCreatedEvent

View File

@ -8,6 +8,7 @@ import backtype.storm.tuple.Values;
import com.hpcloud.mon.MetricSpoutConfig;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.hpcloud.mon.common.model.metric.MetricEnvelopes;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,7 +18,8 @@ public class MetricSpout extends KafkaSpout {
private static final long serialVersionUID = 744004533863562119L;
public static final String[] FIELDS = new String[] { "metricDefinition", "metric" };
public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" };
public static final String DEFAULT_TENANT_ID = "TENANT_ID_NOT_SET";
public MetricSpout(MetricSpoutConfig metricSpoutConfig) {
super(metricSpoutConfig.kafkaConsumerConfiguration);
@ -35,7 +37,12 @@ public class MetricSpout extends KafkaSpout {
LOG.warn("Error parsing MetricEnvelope", re);
return;
}
collector.emit(new Values(metricEnvelope.metric.definition(), metricEnvelope.metric));
String tenantId = (String)metricEnvelope.meta.get("tenantId");
if (tenantId == null) {
LOG.error("No tenantId so using default tenantId {} for Metric {}", DEFAULT_TENANT_ID, metricEnvelope.metric);
tenantId = DEFAULT_TENANT_ID;
}
collector.emit(new Values(new MetricDefinitionAndTenantId(metricEnvelope.metric.definition(), tenantId), metricEnvelope.metric));
}
@Override

View File

@ -30,6 +30,7 @@ import com.hpcloud.mon.common.model.alarm.AlarmState;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.Alarm;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.AlarmDAO;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
@ -88,10 +89,11 @@ public class ThresholdingEngineTest extends TopologyTestCase {
subAlarmDAO = mock(SubAlarmDAO.class);
final SubAlarm cpuMetricDefSubAlarm = new SubAlarm("123", TEST_ALARM_ID, expression.getSubExpressions().get(0));
final SubAlarm memMetricDefSubAlarm = new SubAlarm("456", TEST_ALARM_ID, expression.getSubExpressions().get(1));
when(subAlarmDAO.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
when(subAlarmDAO.find(any(MetricDefinitionAndTenantId.class))).thenAnswer(new Answer<List<SubAlarm>>() {
@Override
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
MetricDefinition metricDef = (MetricDefinition) invocation.getArguments()[0];
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) invocation.getArguments()[0];
MetricDefinition metricDef = metricDefinitionAndTenantId.metricDefinition;
if (metricDef.equals(cpuMetricDef)) {
return Arrays.asList(cpuMetricDefSubAlarm);
} else if (metricDef.equals(memMetricDef)) {
@ -103,8 +105,10 @@ public class ThresholdingEngineTest extends TopologyTestCase {
metricDefinitionDAO = mock(MetricDefinitionDAO.class);
final List<SubAlarmMetricDefinition> metricDefs = Arrays.asList(
new SubAlarmMetricDefinition(cpuMetricDefSubAlarm.getId(), cpuMetricDef),
new SubAlarmMetricDefinition(memMetricDefSubAlarm.getId(), memMetricDef));
new SubAlarmMetricDefinition(cpuMetricDefSubAlarm.getId(),
new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID)),
new SubAlarmMetricDefinition(memMetricDefSubAlarm.getId(),
new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID)));
when(metricDefinitionDAO.findForAlarms()).thenReturn(metricDefs);
// Bindings
@ -173,9 +177,9 @@ public class ThresholdingEngineTest extends TopologyTestCase {
System.out.println("Feeding metrics...");
long time = System.currentTimeMillis() / 1000;
metricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), new Metric(cpuMetricDef.name,
cpuMetricDef.dimensions, time, (double) (++goodValueCount == 15 ? 1 : 555))));
metricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), new Metric(memMetricDef.name,
memMetricDef.dimensions, time, (double) (goodValueCount == 15 ? 1 : 555))));
if (--feedCount == 0)

View File

@ -28,6 +28,7 @@ import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.Alarm;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.AlarmDAO;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
@ -47,6 +48,8 @@ import com.hpcloud.util.Injector;
*/
@Test(groups = "integration")
public class ThresholdingEngineTest1 extends TopologyTestCase {
private static final String JOE_TENANT_ID = "joe";
private static final String BOB_TENANT_ID = "bob";
private FeederSpout metricSpout;
private FeederSpout eventSpout;
private AlarmDAO alarmDAO;
@ -77,20 +80,21 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
@Override
public Alarm answer(InvocationOnMock invocation) throws Throwable {
if (invocation.getArguments()[0].equals("1"))
return new Alarm("1", "bob", "test-alarm", "Descr of test-alarm", expression, Arrays.asList(createCpuSubAlarm(),
return new Alarm("1", BOB_TENANT_ID, "test-alarm", "Descr of test-alarm", expression, Arrays.asList(createCpuSubAlarm(),
createMemSubAlarm()), AlarmState.OK);
else if (invocation.getArguments()[0].equals("2"))
return new Alarm("2", "joe", "joes-alarm", "Descr of joes-alarm", customExpression,
return new Alarm("2", JOE_TENANT_ID, "joes-alarm", "Descr of joes-alarm", customExpression,
Arrays.asList(createCustomSubAlarm()), AlarmState.OK);
return null;
}
});
subAlarmDAO = mock(SubAlarmDAO.class);
when(subAlarmDAO.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
when(subAlarmDAO.find(any(MetricDefinitionAndTenantId.class))).thenAnswer(new Answer<List<SubAlarm>>() {
@Override
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
MetricDefinition metricDef = (MetricDefinition) invocation.getArguments()[0];
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) invocation.getArguments()[0];
MetricDefinition metricDef = metricDefinitionAndTenantId.metricDefinition;
if (metricDef.equals(cpuMetricDef))
return Arrays.asList(createCpuSubAlarm());
else if (metricDef.equals(memMetricDef))
@ -103,9 +107,12 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
metricDefinitionDAO = mock(MetricDefinitionDAO.class);
final List<SubAlarmMetricDefinition> metricDefs = Arrays.asList(
new SubAlarmMetricDefinition(createCpuSubAlarm().getId(), cpuMetricDef),
new SubAlarmMetricDefinition(createMemSubAlarm().getId(), memMetricDef),
new SubAlarmMetricDefinition(createCustomSubAlarm().getId(), customMetricDef));
new SubAlarmMetricDefinition(createCpuSubAlarm().getId(),
new MetricDefinitionAndTenantId(cpuMetricDef, BOB_TENANT_ID)),
new SubAlarmMetricDefinition(createMemSubAlarm().getId(),
new MetricDefinitionAndTenantId(memMetricDef, BOB_TENANT_ID)),
new SubAlarmMetricDefinition(createCustomSubAlarm().getId(),
new MetricDefinitionAndTenantId(customMetricDef, JOE_TENANT_ID)));
when(metricDefinitionDAO.findForAlarms()).thenReturn(metricDefs);
// Bindings
@ -152,20 +159,20 @@ public class ThresholdingEngineTest1 extends TopologyTestCase {
while (true) {
long time = System.currentTimeMillis();
metricSpout.feed(new Values(cpuMetricDef, new Metric(cpuMetricDef.name,
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, BOB_TENANT_ID), new Metric(cpuMetricDef.name,
cpuMetricDef.dimensions, time, count % 10 == 0 ? 555 : 1)));
metricSpout.feed(new Values(memMetricDef, new Metric(memMetricDef.name,
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, BOB_TENANT_ID), new Metric(memMetricDef.name,
cpuMetricDef.dimensions, time, count % 10 == 0 ? 555 : 1)));
metricSpout.feed(new Values(customMetricDef, new Metric(customMetricDef.name,
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(customMetricDef, JOE_TENANT_ID), new Metric(customMetricDef.name,
cpuMetricDef.dimensions, time, count % 20 == 0 ? 1 : 123)));
if (count % 5 == 0) {
Object event = null;
if (++eventCounter % 2 == 0)
event = new AlarmDeletedEvent("joe", "2",
event = new AlarmDeletedEvent(JOE_TENANT_ID, "2",
ImmutableMap.<String, MetricDefinition>builder().put("444", customMetricDef).build());
else
event = new AlarmCreatedEvent("joe", "2", "foo", customSubExpression.getExpression(),
event = new AlarmCreatedEvent(JOE_TENANT_ID, "2", "foo", customSubExpression.getExpression(),
ImmutableMap.<String, AlarmSubExpression>builder()
.put("444", customSubExpression)
.build());

View File

@ -19,6 +19,7 @@ import com.google.common.io.Resources;
import com.hpcloud.mon.common.model.alarm.AlarmState;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
@ -27,6 +28,7 @@ import com.hpcloud.mon.domain.service.SubAlarmDAO;
*/
@Test
public class SubAlarmDAOImplTest {
private static final String TENANT_ID = "42";
private DBI db;
private Handle handle;
private SubAlarmDAO dao;
@ -46,9 +48,20 @@ public class SubAlarmDAOImplTest {
@BeforeMethod
protected void beforeMethod() {
handle.execute("truncate table alarm");
handle.execute("truncate table sub_alarm");
handle.execute("truncate table sub_alarm_dimension");
// These don't have the real Alarm expression because it doesn't matter for this test
handle.execute("insert into alarm (id, tenant_id, name, description, expression, state, enabled, created_at, updated_at) "
+ "values ('123', '" + TENANT_ID + "', 'Test Alarm', 'Test Alarm Description', 'Not real expr', 'OK', '1', NOW(), NOW())");
handle.execute("insert into alarm (id, tenant_id, name, description, expression, state, enabled, created_at, updated_at) "
+ "values ('234', '" + TENANT_ID + "', 'Test Alarm2', 'Test Alarm2 Description', 'Not real expr', 'OK', '1', NOW(), NOW())");
handle.execute("insert into alarm (id, tenant_id, name, description, expression, state, enabled, created_at, updated_at) "
+ "values ('345', '" + TENANT_ID + "', 'Test Alarm3', 'Test Alarm3 Description', 'Not real expr', 'OK', '1', NOW(), NOW())");
handle.execute("insert into alarm (id, tenant_id, name, description, expression, state, enabled, created_at, updated_at) "
+ "values ('456', '" + TENANT_ID + "', 'Test Alarm4', 'Test Alarm4 Description', 'Not real expr', 'OK', '1', NOW(), NOW())");
handle.execute("insert into sub_alarm (id, alarm_id, function, metric_name, operator, threshold, period, periods, created_at, updated_at) "
+ "values ('111', '123', 'AVG', 'hpcs.compute', 'GT', 10, 60, 1, NOW(), NOW())");
handle.execute("insert into sub_alarm_dimension values ('111', 'instance_id', '555')");
@ -80,31 +93,40 @@ public class SubAlarmDAOImplTest {
List<SubAlarm> expected = Arrays.asList(new SubAlarm("111", "123",
AlarmSubExpression.of("avg(hpcs.compute{instance_id=555,az=1,metric_name=cpu}) > 10"),
AlarmState.UNDETERMINED));
List<SubAlarm> subAlarms = dao.find(expected.get(0).getExpression().getMetricDefinition());
List<SubAlarm> subAlarms = dao.find(new MetricDefinitionAndTenantId(expected.get(0).getExpression().getMetricDefinition(), TENANT_ID));
assertEquals(subAlarms, expected);
expected = Arrays.asList(new SubAlarm("222", "234",
AlarmSubExpression.of("avg(hpcs.compute{instance_id=666,az=1,metric_name=cpu}) > 10"),
AlarmState.UNDETERMINED));
subAlarms = dao.find(expected.get(0).getExpression().getMetricDefinition());
subAlarms = dao.find(new MetricDefinitionAndTenantId(expected.get(0).getExpression().getMetricDefinition(), TENANT_ID));
assertEquals(subAlarms, expected);
}
public void shouldNotFind() {
final String badTenantId = TENANT_ID + "42";
List<SubAlarm> subAlarms = dao.find(new MetricDefinitionAndTenantId(AlarmSubExpression.of("avg(hpcs.compute{instance_id=555,az=1,metric_name=cpu}) > 10").getMetricDefinition(), badTenantId));
assertEquals(subAlarms.size(), 0);
subAlarms = dao.find(new MetricDefinitionAndTenantId(AlarmSubExpression.of("avg(hpcs.compute{instance_id=666,az=1,metric_name=cpu}) > 10").getMetricDefinition(), badTenantId));
assertEquals(subAlarms.size(), 0);
}
public void shouldFindWithSubject() {
List<SubAlarm> expected = Arrays.asList(new SubAlarm(
"333",
"345",
AlarmSubExpression.of("avg(hpcs.compute{instance_id=777,az=1,metric_name=disk,device=vda}) > 10"),
AlarmState.UNDETERMINED));
List<SubAlarm> subAlarms = dao.find(expected.get(0).getExpression().getMetricDefinition());
List<SubAlarm> subAlarms = dao.find(new MetricDefinitionAndTenantId(expected.get(0).getExpression().getMetricDefinition(), TENANT_ID));
assertEquals(subAlarms, expected);
}
public void shouldFailFindForNullDimensions() {
List<SubAlarm> expected = Arrays.asList(new SubAlarm("444", "456",
AlarmSubExpression.of("avg(hpcs.compute{metric_name=cpu}) > 10"), AlarmState.UNDETERMINED));
List<SubAlarm> subAlarms = dao.find(new MetricDefinition("hpcs.compute",
new ImmutableMap.Builder<String, String>().put("metric_name", "cpu").build()));
List<SubAlarm> subAlarms = dao.find(new MetricDefinitionAndTenantId(new MetricDefinition("hpcs.compute",
new ImmutableMap.Builder<String, String>().put("metric_name", "cpu").build()), TENANT_ID));
assertNotEquals(subAlarms, expected);
}
}

View File

@ -34,12 +34,14 @@ import com.hpcloud.mon.common.model.alarm.AlarmState;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.Alarm;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.streaming.storm.Streams;
@Test
public class EventProcessingBoltTest {
private static final String TENANT_ID = "AAAAABBBBBBCCCCC";
private EventProcessingBolt bolt;
private OutputCollector collector;
private AlarmExpression alarmExpression;
@ -56,7 +58,6 @@ public class EventProcessingBoltTest {
bolt.prepare(config, context, collector);
final String alarmId = "111111112222222222233333333334";
final String tenantId = "AAAAABBBBBBCCCCC";
final String name = "Test CPU Alarm";
final String description = "Description of " + name;
final String expression = "avg(hpcs.compute.cpu{instance_id=123,device=42}, 1) > 5 " +
@ -64,7 +65,7 @@ public class EventProcessingBoltTest {
"and max(hpcs.compute.load{instance_id=123,device=42}) > 5";
alarmExpression = new AlarmExpression(expression);
subAlarms = createSubAlarms(alarmId, alarmExpression);
alarm = new Alarm(alarmId, tenantId, name, description, alarmExpression, subAlarms, AlarmState.UNDETERMINED);
alarm = new Alarm(alarmId, TENANT_ID, name, description, alarmExpression, subAlarms, AlarmState.UNDETERMINED);
}
private List<SubAlarm> createSubAlarms(final String alarmId,
@ -125,7 +126,9 @@ public class EventProcessingBoltTest {
private void verifyDeletedSubAlarm(final SubAlarm subAlarm) {
verify(collector, times(1)).emit(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID,
new Values(EventProcessingBolt.DELETED, subAlarm.getExpression().getMetricDefinition(), subAlarm.getId()));
new Values(EventProcessingBolt.DELETED,
new MetricDefinitionAndTenantId(
subAlarm.getExpression().getMetricDefinition(), TENANT_ID), subAlarm.getId()));
}
public static AlarmUpdatedEvent createAlarmUpdatedEvent(final Alarm alarm,
@ -209,7 +212,9 @@ public class EventProcessingBoltTest {
private void verifyAddedSubAlarm(final SubAlarm subAlarm) {
verify(collector, times(1)).emit(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID,
new Values(EventProcessingBolt.CREATED, subAlarm.getExpression().getMetricDefinition(), subAlarm));
new Values(EventProcessingBolt.CREATED,
new MetricDefinitionAndTenantId(
subAlarm.getExpression().getMetricDefinition(), TENANT_ID), subAlarm));
}
private static Map<String, AlarmSubExpression> createAlarmSubExpressionMap(

View File

@ -32,6 +32,7 @@ import com.hpcloud.mon.common.model.alarm.AlarmState;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.model.SubAlarmStats;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
@ -43,6 +44,7 @@ import com.hpcloud.streaming.storm.Streams;
*/
@Test
public class MetricAggregationBoltTest {
private static final String TENANT_ID = "42";
private MetricAggregationBolt bolt;
private TopologyContext context;
private OutputCollector collector;
@ -81,13 +83,13 @@ public class MetricAggregationBoltTest {
subAlarms.add(subAlarm3);
final SubAlarmDAO dao = mock(SubAlarmDAO.class);
when(dao.find(any(MetricDefinition.class))).thenAnswer(new Answer<List<SubAlarm>>() {
when(dao.find(any(MetricDefinitionAndTenantId.class))).thenAnswer(new Answer<List<SubAlarm>>() {
@Override
public List<SubAlarm> answer(InvocationOnMock invocation) throws Throwable {
final MetricDefinition metricDef = (MetricDefinition) invocation.getArguments()[0];
final MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) invocation.getArguments()[0];
final List<SubAlarm> result = new ArrayList<>();
for (final SubAlarm subAlarm : subAlarms)
if (subAlarm.getExpression().getMetricDefinition().equals(metricDef))
if (subAlarm.getExpression().getMetricDefinition().equals(metricDefinitionAndTenantId.metricDefinition))
result.add(subAlarm);
return result;
}
@ -102,15 +104,15 @@ public class MetricAggregationBoltTest {
public void shouldAggregateValues() {
long t1 = System.currentTimeMillis() / 1000;
bolt.aggregateValues(metricDef1, new Metric(metricDef1.name, metricDef1.dimensions, t1, 100));
bolt.aggregateValues(metricDef1, new Metric(metricDef1.name, metricDef1.dimensions, t1, 80));
bolt.aggregateValues(metricDef2, new Metric(metricDef2.name, metricDef2.dimensions, t1, 50));
bolt.aggregateValues(metricDef2, new Metric(metricDef2.name, metricDef2.dimensions, t1, 40));
bolt.aggregateValues(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID), new Metric(metricDef1.name, metricDef1.dimensions, t1, 100));
bolt.aggregateValues(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID), new Metric(metricDef1.name, metricDef1.dimensions, t1, 80));
bolt.aggregateValues(new MetricDefinitionAndTenantId(metricDef2, TENANT_ID), new Metric(metricDef2.name, metricDef2.dimensions, t1, 50));
bolt.aggregateValues(new MetricDefinitionAndTenantId(metricDef2, TENANT_ID), new Metric(metricDef2.name, metricDef2.dimensions, t1, 40));
SubAlarmStats alarmData = bolt.getOrCreateSubAlarmStatsRepo(metricDef1).get("123");
SubAlarmStats alarmData = bolt.getOrCreateSubAlarmStatsRepo(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID)).get("123");
assertEquals(alarmData.getStats().getValue(t1), 90.0);
alarmData = bolt.getOrCreateSubAlarmStatsRepo(metricDef2).get("456");
alarmData = bolt.getOrCreateSubAlarmStatsRepo(new MetricDefinitionAndTenantId(metricDef2, TENANT_ID)).get("456");
assertEquals(alarmData.getStats().getValue(t1), 45.0);
}
@ -166,32 +168,34 @@ private Tuple createTickTuple() {
tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS);
tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID);
assertNull(bolt.subAlarmStatsRepos.get(metricDef1));
MetricDefinitionAndTenantId metricDefinitionAndTenantId = new MetricDefinitionAndTenantId(metricDef1, TENANT_ID);
assertNull(bolt.subAlarmStatsRepos.get(metricDefinitionAndTenantId));
bolt.execute(Testing.testTuple(Arrays.asList(EventProcessingBolt.CREATED,
metricDef1, new SubAlarm("123", "1", subExpr1)), tupleParam));
metricDefinitionAndTenantId, new SubAlarm("123", "1", subExpr1)), tupleParam));
assertNotNull(bolt.subAlarmStatsRepos.get(metricDef1).get("123"));
assertNotNull(bolt.subAlarmStatsRepos.get(metricDefinitionAndTenantId).get("123"));
}
public void validateMetricDefDeleted() {
MkTupleParam tupleParam = new MkTupleParam();
tupleParam.setFields(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_FIELDS);
tupleParam.setStream(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID);
bolt.getOrCreateSubAlarmStatsRepo(metricDef1);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = new MetricDefinitionAndTenantId(metricDef1, TENANT_ID);
bolt.getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
assertNotNull(bolt.subAlarmStatsRepos.get(metricDef1).get("123"));
assertNotNull(bolt.subAlarmStatsRepos.get(metricDefinitionAndTenantId).get("123"));
bolt.execute(Testing.testTuple(
Arrays.asList(EventProcessingBolt.DELETED, metricDef1, "123"), tupleParam));
Arrays.asList(EventProcessingBolt.DELETED, metricDefinitionAndTenantId, "123"), tupleParam));
assertNull(bolt.subAlarmStatsRepos.get(metricDef1));
assertNull(bolt.subAlarmStatsRepos.get(metricDefinitionAndTenantId));
}
public void shouldGetOrCreateSameMetricData() {
SubAlarmStatsRepository data = bolt.getOrCreateSubAlarmStatsRepo(metricDef1);
SubAlarmStatsRepository data = bolt.getOrCreateSubAlarmStatsRepo(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID));
assertNotNull(data);
assertEquals(bolt.getOrCreateSubAlarmStatsRepo(metricDef1), data);
assertEquals(bolt.getOrCreateSubAlarmStatsRepo(new MetricDefinitionAndTenantId(metricDef1, TENANT_ID)), data);
}
private Tuple createMetricTuple(final MetricDefinition metricDef,
@ -199,6 +203,6 @@ private Tuple createTickTuple() {
final MkTupleParam tupleParam = new MkTupleParam();
tupleParam.setFields(MetricFilteringBolt.FIELDS);
tupleParam.setStream(Streams.DEFAULT_STREAM_ID);
return Testing.testTuple(Arrays.asList(metricDef, metric), tupleParam);
return Testing.testTuple(Arrays.asList(new MetricDefinitionAndTenantId(metricDef, TENANT_ID), metric), tupleParam);
}
}

View File

@ -5,7 +5,6 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import java.util.ArrayList;
@ -30,6 +29,7 @@ import com.hpcloud.mon.common.model.alarm.AlarmExpression;
import com.hpcloud.mon.common.model.alarm.AlarmSubExpression;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
import com.hpcloud.mon.domain.service.SubAlarmMetricDefinition;
@ -39,6 +39,7 @@ import com.hpcloud.streaming.storm.Streams;
public class MetricFilteringBoltTest {
private List<SubAlarm> subAlarms;
private List<SubAlarm> duplicateMetricSubAlarms;
private final static String TEST_TENANT_ID = "42";
@BeforeMethod
protected void beforeMethod() {
@ -78,7 +79,7 @@ public class MetricFilteringBoltTest {
if (willEmit) {
// Validate the prepare emits the initial Metric Definitions
for (final SubAlarmMetricDefinition metricDefinition : initialMetricDefinitions) {
verify(collector, times(1)).emit(new Values(metricDefinition.getMetricDefinition(), null));
verify(collector, times(1)).emit(new Values(metricDefinition.getMetricDefinitionAndTenantId(), null));
}
}
return bolt;
@ -138,7 +139,7 @@ public class MetricFilteringBoltTest {
final List<SubAlarmMetricDefinition> initialMetricDefinitions = new ArrayList<>(subAlarms.size());
for (final SubAlarm subAlarm : subAlarms) {
initialMetricDefinitions.add(new SubAlarmMetricDefinition(subAlarm.getId(),
subAlarm.getExpression().getMetricDefinition()));
new MetricDefinitionAndTenantId(subAlarm.getExpression().getMetricDefinition(), TEST_TENANT_ID)));
}
final OutputCollector collector1 = mock(OutputCollector.class);
@ -186,7 +187,9 @@ public class MetricFilteringBoltTest {
tupleParam.setFields(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_FIELDS);
tupleParam.setStream(EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID);
final Tuple tuple = Testing.testTuple(Arrays.asList(EventProcessingBolt.CREATED,
subAlarm.getExpression().getMetricDefinition(), subAlarm), tupleParam);
new MetricDefinitionAndTenantId(
subAlarm.getExpression().getMetricDefinition(), TEST_TENANT_ID),
subAlarm), tupleParam);
return tuple;
}
@ -195,7 +198,9 @@ public class MetricFilteringBoltTest {
tupleParam.setFields(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_FIELDS);
tupleParam.setStream(EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID);
final Tuple tuple = Testing.testTuple(Arrays.asList(EventProcessingBolt.DELETED,
subAlarm.getExpression().getMetricDefinition(), subAlarm.getId()), tupleParam);
new MetricDefinitionAndTenantId(
subAlarm.getExpression().getMetricDefinition(), TEST_TENANT_ID),
subAlarm.getId()), tupleParam);
return tuple;
}
@ -206,7 +211,8 @@ public class MetricFilteringBoltTest {
tupleParam.setStream(Streams.DEFAULT_STREAM_ID);
MetricDefinition metricDefinition = subAlarm.getExpression().getMetricDefinition();
final Metric metric = new Metric(metricDefinition, System.currentTimeMillis()/1000, 42.0);
final Tuple tuple = Testing.testTuple(Arrays.asList(metricDefinition, metric), tupleParam);
final Tuple tuple = Testing.testTuple(Arrays.asList(
new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), metric), tupleParam);
return tuple;
}
}

View File

@ -5,6 +5,7 @@ CREATE TABLE `alarm` (
`description` varchar(250) NOT NULL,
`expression` mediumtext NOT NULL,
`state` varchar(20) NOT NULL check state in ('UNDETERMINED','OK','ALARM'),
`enabled` tinyint(1) NOT NULL DEFAULT '1',
`created_at` datetime NOT NULL,
`updated_at` datetime NOT NULL,
`deleted_at` datetime,