Changes to allow MetricDefinitions in SubAlarms to match against MetricDefinitions that have at least the same set of Dimensions.

This is to support the use cases of aggregation of Metrics across systems and to handle Metrics where an extra Dimension is added but we still want the old MetricDefinition to be matched.

Added new MetricDefinitionAndTenantIdMatcher class to do the matching. Added its associated test.

Added tests for these cases.
This commit is contained in:
Craig Bryant 2014-04-09 16:14:06 -06:00
parent cbca836aa5
commit 76a2138fbf
5 changed files with 273 additions and 17 deletions

View File

@ -0,0 +1,87 @@
package com.hpcloud.mon.domain.model;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
public class MetricDefinitionAndTenantIdMatcher {
final Map<String, Map<String, List<MetricDefinitionAndTenantId>>> byTenantId = new ConcurrentHashMap<>();
public void add(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
Map<String, List<MetricDefinitionAndTenantId>> byMetricName = byTenantId.get(metricDefinitionAndTenantId.tenantId);
if (byMetricName == null) {
byMetricName = new ConcurrentHashMap<>();
byTenantId.put(metricDefinitionAndTenantId.tenantId, byMetricName);
}
List<MetricDefinitionAndTenantId> defsList = byMetricName.get(metricDefinitionAndTenantId.metricDefinition.name);
if (defsList == null) {
defsList = new LinkedList<>();
byMetricName.put(metricDefinitionAndTenantId.metricDefinition.name, defsList);
}
defsList.add(metricDefinitionAndTenantId);
}
public boolean remove(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
Map<String, List<MetricDefinitionAndTenantId>> byMetricName = byTenantId.get(metricDefinitionAndTenantId.tenantId);
if (byMetricName == null) {
return false;
}
List<MetricDefinitionAndTenantId> defsList = byMetricName.get(metricDefinitionAndTenantId.metricDefinition.name);
if (defsList == null) {
return false;
}
final boolean result = defsList.remove(metricDefinitionAndTenantId);
if (result) {
if (defsList.isEmpty()) {
byMetricName.remove(metricDefinitionAndTenantId.metricDefinition.name);
if (byMetricName.isEmpty())
byTenantId.remove(metricDefinitionAndTenantId.tenantId);
}
}
return result;
}
public boolean match(final MetricDefinitionAndTenantId toMatch,
final List<MetricDefinitionAndTenantId> matches) {
Map<String, List<MetricDefinitionAndTenantId>> byMetricName = byTenantId.get(toMatch.tenantId);
if (byMetricName == null)
return false;
List<MetricDefinitionAndTenantId> defsList = byMetricName.get(toMatch.metricDefinition.name);
if (defsList == null)
return false;
matches.clear();
for (final MetricDefinitionAndTenantId existing : defsList) {
if (toMatch.metricDefinition.dimensions.size() >= existing.metricDefinition.dimensions.size()) {
boolean isMatch = true;
for (final Entry<String, String> entry : existing.metricDefinition.dimensions.entrySet()) {
if (!compareStrings(entry.getValue(), toMatch.metricDefinition.dimensions.get(entry.getKey()))) {
isMatch = false;
break;
}
}
if (isMatch)
matches.add(existing);
}
}
return !matches.isEmpty();
}
public boolean isEmpty() {
return byTenantId.isEmpty();
}
public void clear() {
byTenantId.clear();
}
private boolean compareStrings(final String s1,
final String s2) {
if (s1 == s2)
return true;
if (s1 == null)
return false;
return s1.equals(s2);
}
}

View File

@ -1,5 +1,6 @@
package com.hpcloud.mon.infrastructure.thresholding;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -17,6 +18,7 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantId;
import com.hpcloud.mon.domain.model.MetricDefinitionAndTenantIdMatcher;
import com.hpcloud.mon.domain.model.SubAlarm;
import com.hpcloud.mon.domain.service.MetricDefinitionDAO;
import com.hpcloud.mon.domain.service.SubAlarmDAO;
@ -30,19 +32,19 @@ import com.hpcloud.util.Injector;
* Filters metrics for which there is no associated alarm and forwards metrics for which there is an
* alarm. Receives metric alarm and metric sub-alarm events to update metric definitions.
*
* METRIC_DEFS table is shared between any bolts in the same worker process so that all of the
* METRIC_DEFS table and the matcher are shared between any bolts in the same worker process so that all of the
* MetricDefinitionAndTenantIds for existing SubAlarms only have to be read once and because it is not
* possible to predict which bolt gets which Metrics so all Bolts know about all starting
* MetricDefinitionAndTenantIds.
*
* The current topology uses shuffleGrouping for the incoming Metrics and allGrouping for the
* events. So, any Bolt may get any Metric so the METRIC_DEFS table must be kept up to date
* events. So, any Bolt may get any Metric so the METRIC_DEFS table and the matcher must be kept up to date
* for all MetricDefinitionAndTenantIds.
*
* The METRIC_DEFS table contains a List of SubAlarms IDs that reference the same MetricDefinitionAndTenantId
* so if a SubAlarm is deleted, the MetricDefinitionAndTenantId will only be deleted if no more SubAlarms
* reference it. Incrementing and decrementing the count is done under the static lock SENTINAL
* to ensure it is correct across all Bolts sharing the same METRIC_DEFS table. The
* so if a SubAlarm is deleted, the MetricDefinitionAndTenantId will only be deleted from it and the matcher if no
* more SubAlarms reference it. Incrementing and decrementing the count is done under the static lock SENTINAL
* to ensure it is correct across all Bolts sharing the same METRIC_DEFS table and the matcher. The
* amount of adds and deletes will be very small compared to the number of Metrics so it shouldn't
* block the Metric handling.
*
@ -60,6 +62,7 @@ import com.hpcloud.util.Injector;
public class MetricFilteringBolt extends BaseRichBolt {
private static final long serialVersionUID = 1096706128973976599L;
private static final Map<MetricDefinitionAndTenantId, List<String>> METRIC_DEFS = new ConcurrentHashMap<>();
private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher();
private static final Object SENTINAL = new Object();
public static final String[] FIELDS = new String[] { "metricDefinitionAndTenantId", "metric" };
@ -67,6 +70,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
private DataSourceFactory dbConfig;
private transient MetricDefinitionDAO metricDefDAO;
private OutputCollector collector;
private List<MetricDefinitionAndTenantId> matches = new ArrayList<>();
public MetricFilteringBolt(DataSourceFactory dbConfig) {
this.dbConfig = dbConfig;
@ -89,8 +93,12 @@ public class MetricFilteringBolt extends BaseRichBolt {
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(0);
LOG.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId);
if (METRIC_DEFS.containsKey(metricDefinitionAndTenantId))
collector.emit(tuple, tuple.getValues());
// Check for exact matches as well as inexact matches
if (matcher.match(metricDefinitionAndTenantId, matches))
for (final MetricDefinitionAndTenantId match : matches)
// Must send with the MetricDefinitionAndTenantId that it matches, not one in the Metric although
// they may be the same
collector.emit(tuple, new Values(match, tuple.getValue(1)));
} else {
String eventType = tuple.getString(0);
MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(1);
@ -120,6 +128,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
if (subAlarmIds != null) {
if (subAlarmIds.remove(subAlarmId) && subAlarmIds.isEmpty()) {
METRIC_DEFS.remove(metricDefinitionAndTenantId);
matcher.remove(metricDefinitionAndTenantId);
}
}
}
@ -157,6 +166,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
if (subAlarmIds == null) {
subAlarmIds = new LinkedList<>();
METRIC_DEFS.put(metricDefinitionAndTenantId, subAlarmIds);
matcher.add(metricDefinitionAndTenantId);
}
else if (subAlarmIds.contains(subAlarmId))
return; // Make sure it only gets added once. Multiple bolts process the same AlarmCreatedEvent
@ -168,6 +178,7 @@ public class MetricFilteringBolt extends BaseRichBolt {
*/
static void clearMetricDefinitions() {
METRIC_DEFS.clear();
matcher.clear();
}
/**

View File

@ -8,8 +8,10 @@ import static org.mockito.Mockito.doAnswer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -172,6 +174,8 @@ public class ThresholdingEngineTest extends TopologyTestCase {
int waitCount = 0;
int feedCount = 5;
int goodValueCount = 0;
final Map<String, String> extraMemMetricDefDimensions = new HashMap<>(memMetricDef.dimensions);
extraMemMetricDefDimensions.put("Group", "group A");
for (int i = 1; i < 40 && alarmsSent == 0; i++) {
if (feedCount > 0) {
System.out.println("Feeding metrics...");
@ -180,7 +184,7 @@ public class ThresholdingEngineTest extends TopologyTestCase {
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(cpuMetricDef, TEST_ALARM_TENANT_ID), new Metric(cpuMetricDef.name,
cpuMetricDef.dimensions, time, (double) (++goodValueCount == 15 ? 1 : 555))));
metricSpout.feed(new Values(new MetricDefinitionAndTenantId(memMetricDef, TEST_ALARM_TENANT_ID), new Metric(memMetricDef.name,
memMetricDef.dimensions, time, (double) (goodValueCount == 15 ? 1 : 555))));
extraMemMetricDefDimensions, time, (double) (goodValueCount == 15 ? 1 : 555))));
if (--feedCount == 0)
waitCount = 3;

View File

@ -0,0 +1,143 @@
package com.hpcloud.mon.domain.model;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertEqualsNoOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.hpcloud.mon.common.model.metric.MetricDefinition;
@Test
public class MetricDefinitionAndTenantIdMatcherTest {
private static final String HOST = "host";
private static final String LOAD_BALANCER_GROUP = "loadBalancerGroup";
private static final String CPU_METRIC_NAME = "cpu";
private MetricDefinitionAndTenantIdMatcher matcher;
private List<MetricDefinitionAndTenantId> matches = new ArrayList<>();
private final String tenantId = "4242";
private MetricDefinition metricDef;
private Map<String, String> dimensions;
@BeforeMethod
protected void beforeMethod() {
matches.clear();
matcher = new MetricDefinitionAndTenantIdMatcher();
dimensions = new HashMap<>();
dimensions.put(HOST, "CloudAmI");
dimensions.put(LOAD_BALANCER_GROUP, "GroupA");
metricDef = new MetricDefinition(CPU_METRIC_NAME, dimensions);
}
public void shouldNotFind() {
assertTrue(matcher.isEmpty());
final MetricDefinitionAndTenantId toMatch = new MetricDefinitionAndTenantId(metricDef, tenantId);
assertFalse(matcher.match(toMatch, matches));
final MetricDefinitionAndTenantId diffTenantId = new MetricDefinitionAndTenantId(metricDef, "Different");
matcher.add(diffTenantId);
assertFalse(matcher.match(toMatch, matches));
matcher.add(toMatch);
assertTrue(matcher.match(toMatch, matches));
assertEquals(matches, Arrays.asList(toMatch));
matches.clear();
final MetricDefinitionAndTenantId noMatchOnName = new MetricDefinitionAndTenantId(
new MetricDefinition("NotCpu", dimensions), tenantId);
assertFalse(matcher.match(noMatchOnName, matches));
final Map<String, String> hostDimensions = new HashMap<>(dimensions);
hostDimensions.put(HOST, "OtherHost");
final MetricDefinitionAndTenantId noMatchOnDimensions = new MetricDefinitionAndTenantId(
new MetricDefinition(CPU_METRIC_NAME, hostDimensions), tenantId);
assertFalse(matcher.match(noMatchOnDimensions, matches));
matcher.remove(toMatch);
assertFalse(matcher.match(toMatch, matches));
matcher.remove(diffTenantId);
assertTrue(matcher.isEmpty());
}
public void shouldFind() {
assertTrue(matcher.isEmpty());
final MetricDefinitionAndTenantId toMatch = new MetricDefinitionAndTenantId(metricDef, tenantId);
final Map<String, String> hostDimensions = new HashMap<>();
hostDimensions.put(HOST, dimensions.get(HOST));
final MetricDefinitionAndTenantId hostMatch = new MetricDefinitionAndTenantId(
new MetricDefinition(CPU_METRIC_NAME, hostDimensions), tenantId);
matcher.add(hostMatch);
final Map<String, String> groupDimensions = new HashMap<>();
groupDimensions.put(LOAD_BALANCER_GROUP, dimensions.get(LOAD_BALANCER_GROUP));
final MetricDefinitionAndTenantId groupMatch = new MetricDefinitionAndTenantId(
new MetricDefinition(CPU_METRIC_NAME, groupDimensions), tenantId);
matcher.add(groupMatch);
assertTrue(matcher.match(toMatch, matches));
assertEqualsNoOrder(matches.toArray(), new MetricDefinitionAndTenantId[] {
hostMatch, groupMatch});
matches.clear();
matcher.add(toMatch);
assertTrue(matcher.match(toMatch, matches));
assertEqualsNoOrder(matches.toArray(), new MetricDefinitionAndTenantId[] {
hostMatch, groupMatch, toMatch});
matches.clear();
matcher.remove(groupMatch);
assertTrue(matcher.match(toMatch, matches));
assertEqualsNoOrder(matches.toArray(), new MetricDefinitionAndTenantId[] {
hostMatch, toMatch});
matches.clear();
matcher.remove(toMatch);
assertTrue(matcher.match(toMatch, matches));
assertEqualsNoOrder(matches.toArray(), new MetricDefinitionAndTenantId[] {
hostMatch});
matches.clear();
// Remove it again to ensure it won't throw an exception if the MetricDefinitionAndTenantId
// doesn't exist
matcher.remove(toMatch);
final MetricDefinitionAndTenantId loadMetric = new MetricDefinitionAndTenantId(
new MetricDefinition("load", new HashMap<String, String>(dimensions)), tenantId);
matcher.add(loadMetric);
matcher.remove(hostMatch);
assertFalse(matcher.match(toMatch, matches));
// Remove it again to ensure it won't throw an exception if the MetricDefinitionAndTenantId
// doesn't exist
matcher.remove(hostMatch);
matcher.remove(loadMetric);
assertTrue(matcher.isEmpty());
// I don't really expect nulls values for the dimensions, but make sure it doesn't throw an exception
final Map<String, String> nullDimensions = new HashMap<>(dimensions);
nullDimensions.put(HOST, null);
final MetricDefinitionAndTenantId nullMatch = new MetricDefinitionAndTenantId(
new MetricDefinition(CPU_METRIC_NAME, nullDimensions), tenantId);
matcher.add(nullMatch);
assertTrue(matcher.match(nullMatch, matches));
assertEqualsNoOrder(matches.toArray(), new MetricDefinitionAndTenantId[] {
nullMatch});
assertFalse(matcher.match(toMatch, matches));
matcher.remove(nullMatch);
assertTrue(matcher.isEmpty());
}
}

View File

@ -127,10 +127,23 @@ public class MetricFilteringBoltTest {
private void sendMetricsAndVerify(final OutputCollector collector1,
final MetricFilteringBolt bolt1, VerificationMode howMany) {
for (final SubAlarm subAlarm : subAlarms) {
final Tuple tuple = createMetricTuple(subAlarm);
bolt1.execute(tuple);
verify(collector1, times(1)).ack(tuple);
verify(collector1, howMany).emit(tuple, tuple.getValues());
// First do a MetricDefinition that is an exact match
final MetricDefinition metricDefinition = subAlarm.getExpression().getMetricDefinition();
final Tuple exactTuple = createMetricTuple(metricDefinition, new Metric(metricDefinition, System.currentTimeMillis()/1000, 42.0));
bolt1.execute(exactTuple);
verify(collector1, times(1)).ack(exactTuple);
verify(collector1, howMany).emit(exactTuple, exactTuple.getValues());
// Now do a MetricDefinition with an extra dimension that should still match the SubAlarm
final Map<String, String> extraDimensions = new HashMap<>(metricDefinition.dimensions);
extraDimensions.put("group", "group_a");
final MetricDefinition inexactMetricDef = new MetricDefinition(metricDefinition.name, extraDimensions);
Metric inexactMetric = new Metric(inexactMetricDef, System.currentTimeMillis()/1000, 42.0);
final Tuple inexactTuple = createMetricTuple(metricDefinition, inexactMetric);
bolt1.execute(inexactTuple);
verify(collector1, times(1)).ack(inexactTuple);
// We want the MetricDefinitionAndTenantId from the exact tuple, but the inexactMetric
verify(collector1, howMany).emit(inexactTuple, new Values(exactTuple.getValue(0), inexactMetric));
}
}
@ -205,13 +218,11 @@ public class MetricFilteringBoltTest {
return tuple;
}
private Tuple createMetricTuple(final SubAlarm subAlarm) {
private Tuple createMetricTuple(final MetricDefinition metricDefinition,
final Metric metric) {
final MkTupleParam tupleParam = new MkTupleParam();
tupleParam.setFields(MetricFilteringBolt.FIELDS);
tupleParam.setStream(Streams.DEFAULT_STREAM_ID);
MetricDefinition metricDefinition = subAlarm.getExpression().getMetricDefinition();
final Metric metric = new Metric(metricDefinition, System.currentTimeMillis()/1000, 42.0);
final Tuple tuple = Testing.testTuple(Arrays.asList(
tupleParam.setStream(Streams.DEFAULT_STREAM_ID); final Tuple tuple = Testing.testTuple(Arrays.asList(
new MetricDefinitionAndTenantId(metricDefinition, TEST_TENANT_ID), metric), tupleParam);
return tuple;
}