From afc22b56a185e26bc59d26dc350c9c164f09bf04 Mon Sep 17 00:00:00 2001 From: Dexter Fryar Date: Mon, 27 Jul 2015 18:58:29 -0500 Subject: [PATCH] Added a whitelist for restricting the StatsD metrics A whitelist and metric map for the metrics that are sent by Storm / Threshold Engine to the Monasca StatsD agent/daemon. Also relates to: https://github.com/hpcloud-mon/ansible-monasca-thresh/pull/14 ======= /etc/monasca/thresh-config.yml ``` statsdConfig: host: localhost port: 8125 debugmetrics: false dimensions: !!map service : monitoring component : storm whitelist: !!seq - aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream - aggregation-bolt.execute-count.filtering-bolt_default - aggregation-bolt.execute-count.system_tick - filtering-bolt.execute-count.event-bolt_metric-alarm-events - filtering-bolt.execute-count.metrics-spout_default - thresholding-bolt.execute-count.aggregation-bolt_default - thresholding-bolt.execute-count.event-bolt_alarm-definition-events - system.memory_heap.committedBytes - system.memory_nonHeap.committedBytes - system.newWorkerEvent - system.startTimeSecs - system.GC_ConcurrentMarkSweep.timeMs metricmap: !!map aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream : monasca_threshold.aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream aggregation-bolt.execute-count.filtering-bolt_default : monasca_threshold.aggregation-bolt.execute-count.filtering-bolt_default aggregation-bolt.execute-count.system_tick : monasca_threshold.aggregation-bolt.execute-count.system_tick filtering-bolt.execute-count.event-bolt_metric-alarm-events : monasca_threshold.filtering-bolt.execute-count.event-bolt_metric-alarm-events filtering-bolt.execute-count.metrics-spout_default : monasca_threshold.filtering-bolt.execute-count.metrics-spout_default thresholding-bolt.execute-count.aggregation-bolt_default : monasca_threshold.thresholding-bolt.execute-count.aggregation-bolt_default thresholding-bolt.execute-count.event-bolt_alarm-definition-events : monasca_threshold.thresholding-bolt.execute-count.event-bolt_alarm-definition-events system.memory_heap.committedBytes : monasca_threshold.system.memory_heap.committedBytes system.memory_nonHeap.committedBytes : monasca_threshold.system.memory_nonHeap.committedBytes system.newWorkerEvent : monasca_threshold.system.newWorkerEvent system.startTimeSecs : monasca_threshold.system.startTimeSecs system.GC_ConcurrentMarkSweep.timeMs : monasca_threshold.system.GC_ConcurrentMarkSweep.timeMs ``` host: IP or host where the Monasca Agent running a StatsD is running that will consume the metrics produced by Storm / Threshold Engine port: UDP port number where the Monasca Agent running a StatsD daemon that will consume the metrics produced by Storm / Threshold Engine dimensions: A map of key/value pairs that will be passed along as dimensions for each metric whitelist: A list of metrics in the native name that Storm presents metricmap: A mapping from the native Storm metric name to a user defined name. The user defined name is what will appear in the Monasca data store. If there is no mapping present and it is listed in the whitelist then it will be published with the native name. The 12 metrics whitelisted/mapped above correspond to the monasca health dashboard which is defined in grafana. https://github.com/hpcloud-mon/grafana/blob/master/src/app/dashboards/monasca.json Change-Id: I7bcefd03d02714ac42efd9b2d9cadb77907fa17e --- .../src/main/config/thresh-sample-config.yml | 385 +++++++++++++++++- .../java/monasca/thresh/TopologyModule.java | 12 +- .../monasca/thresh/utils/StatsdConfig.java | 47 ++- .../thresh/utils/StatsdMetricConsumer.java | 137 +++++-- 4 files changed, 525 insertions(+), 56 deletions(-) diff --git a/thresh/src/main/config/thresh-sample-config.yml b/thresh/src/main/config/thresh-sample-config.yml index af89a03..c607355 100644 --- a/thresh/src/main/config/thresh-sample-config.yml +++ b/thresh/src/main/config/thresh-sample-config.yml @@ -1,14 +1,6 @@ metricSpoutThreads: 2 metricSpoutTasks: 2 -statsdConfig: - host: localhost - port: 8125 - prefix: monasca.storm. - dimensions: !!map - service : monitoring - component : storm - metricSpoutConfig: maxWaitTime: 500 @@ -80,9 +72,9 @@ kafkaProducerConfig: requestRequiredAcks: 1 requestTimeoutMs: 10000 producerType: sync - keySerializerClass: + keySerializerClass: compressionCodec: none - compressedTopics: + compressedTopics: messageSendMaxRetries: 3 retryBackoffMs: 100 topicMetadataRefreshIntervalMs: 600000 @@ -117,3 +109,376 @@ database: maxSize: 41 + + +statsdConfig: + host: localhost + port: 8125 + debugmetrics: false + dimensions: !!map + service : monitoring + component : storm + whitelist: !!seq + - aggregation-bolt.execute-count.filtering-bolt_alarm-creation-stream + - aggregation-bolt.execute-count.filtering-bolt_default + - aggregation-bolt.execute-count.system_tick + - filtering-bolt.execute-count.event-bolt_metric-alarm-events + - filtering-bolt.execute-count.metrics-spout_default + - thresholding-bolt.execute-count.aggregation-bolt_default + - thresholding-bolt.execute-count.event-bolt_alarm-definition-events + - system.memory_heap.committedBytes + - system.memory_nonHeap.committedBytes + - system.newWorkerEvent + - system.startTimeSecs + - system.GC_ConcurrentMarkSweep.timeMs + metricmap: !!map + acker.emit-count.metrics : + monasca_threshold.acker.emit-count.metrics + acker.receive.capacity : + monasca_threshold.acker.receive.capacity + acker.receive.population : + monasca_threshold.acker.receive.population + acker.receive.read_pos : + monasca_threshold.acker.receive.read_pos + acker.receive.write_pos : + monasca_threshold.acker.receive.write_pos + acker.sendqueue.capacity : + monasca_threshold.acker.sendqueue.capacity + acker.sendqueue.population : + monasca_threshold.acker.sendqueue.population + acker.sendqueue.read_pos : + monasca_threshold.acker.sendqueue.read_pos + acker.sendqueue.write_pos : + monasca_threshold.acker.sendqueue.write_pos + acker.transfer-count.metrics : + monasca_threshold.acker.transfer-count.metrics + aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.ack-count.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.ack-count.event-bolt_metric-sub-alarm-events + aggregation-bolt.ack-count.filtering-bolt_default : + monasca_threshold.aggregation-bolt.ack-count.filtering-bolt_default + aggregation-bolt.ack-count.system_tick : + monasca_threshold.aggregation-bolt.ack-count.system_tick + aggregation-bolt.emit-count.default : + monasca_threshold.aggregation-bolt.emit-count.default + aggregation-bolt.emit-count.metrics : + monasca_threshold.aggregation-bolt.emit-count.metrics + aggregation-bolt.emit-count.system : + monasca_threshold.aggregation-bolt.emit-count.system + aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.execute-count.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.execute-count.event-bolt_metric-sub-alarm-events + aggregation-bolt.execute-count.filtering-bolt_default : + monasca_threshold.aggregation-bolt.execute-count.filtering-bolt_default + aggregation-bolt.execute-count.system_tick : + monasca_threshold.aggregation-bolt.execute-count.system_tick + aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.execute-latency.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.execute-latency.event-bolt_metric-sub-alarm-events + aggregation-bolt.execute-latency.filtering-bolt_default : + monasca_threshold.aggregation-bolt.execute-latency.filtering-bolt_default + aggregation-bolt.execute-latency.system_tick : + monasca_threshold.aggregation-bolt.execute-latency.system_tick + aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream : + monasca_threshold.aggregation-bolt.process-latency.alarm-creation-bolt_alarm-creation-stream + aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.aggregation-bolt.process-latency.event-bolt_metric-sub-alarm-events + aggregation-bolt.process-latency.filtering-bolt_default : + monasca_threshold.aggregation-bolt.process-latency.filtering-bolt_default + aggregation-bolt.process-latency.system_tick : + monasca_threshold.aggregation-bolt.process-latency.system_tick + aggregation-bolt.receive.capacity : + monasca_threshold.aggregation-bolt.receive.capacity + aggregation-bolt.receive.population : + monasca_threshold.aggregation-bolt.receive.population + aggregation-bolt.receive.read_pos : + monasca_threshold.aggregation-bolt.receive.read_pos + aggregation-bolt.receive.write_pos : + monasca_threshold.aggregation-bolt.receive.write_pos + aggregation-bolt.sendqueue.capacity : + monasca_threshold.aggregation-bolt.sendqueue.capacity + aggregation-bolt.sendqueue.population : + monasca_threshold.aggregation-bolt.sendqueue.population + aggregation-bolt.sendqueue.read_pos : + monasca_threshold.aggregation-bolt.sendqueue.read_pos + aggregation-bolt.sendqueue.write_pos : + monasca_threshold.aggregation-bolt.sendqueue.write_pos + aggregation-bolt.transfer-count.default : + monasca_threshold.aggregation-bolt.transfer-count.default + aggregation-bolt.transfer-count.metrics : + monasca_threshold.aggregation-bolt.transfer-count.metrics + aggregation-bolt.transfer-count.system : + monasca_threshold.aggregation-bolt.transfer-count.system + alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.ack-count.event-bolt_alarm-definition-events + alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.ack-count.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.emit-count.alarm-creation-stream : + monasca_threshold.alarm-creation-bolt.emit-count.alarm-creation-stream + alarm-creation-bolt.emit-count.metrics : + monasca_threshold.alarm-creation-bolt.emit-count.metrics + alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.execute-count.event-bolt_alarm-definition-events + alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.execute-count.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.execute-latency.event-bolt_alarm-definition-events + alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.execute-latency.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.alarm-creation-bolt.process-latency.event-bolt_alarm-definition-events + alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream : + monasca_threshold.alarm-creation-bolt.process-latency.filtering-bolt_newMetricForAlarmDefinitionStream + alarm-creation-bolt.receive.capacity : + monasca_threshold.alarm-creation-bolt.receive.capacity + alarm-creation-bolt.receive.population : + monasca_threshold.alarm-creation-bolt.receive.population + alarm-creation-bolt.receive.read_pos : + monasca_threshold.alarm-creation-bolt.receive.read_pos + alarm-creation-bolt.receive.write_pos : + monasca_threshold.alarm-creation-bolt.receive.write_pos + alarm-creation-bolt.sendqueue.capacity : + monasca_threshold.alarm-creation-bolt.sendqueue.capacity + alarm-creation-bolt.sendqueue.population : + monasca_threshold.alarm-creation-bolt.sendqueue.population + alarm-creation-bolt.sendqueue.read_pos : + monasca_threshold.alarm-creation-bolt.sendqueue.read_pos + alarm-creation-bolt.sendqueue.write_pos : + monasca_threshold.alarm-creation-bolt.sendqueue.write_pos + alarm-creation-bolt.transfer-count.alarm-creation-stream : + monasca_threshold.alarm-creation-bolt.transfer-count.alarm-creation-stream + alarm-creation-bolt.transfer-count.metrics : + monasca_threshold.alarm-creation-bolt.transfer-count.metrics + event-bolt.emit-count.alarm-definition-events : + monasca_threshold.event-bolt.emit-count.alarm-definition-events + event-bolt.emit-count.metrics : + monasca_threshold.event-bolt.emit-count.metrics + event-bolt.execute-count.event-spout_default : + monasca_threshold.event-bolt.execute-count.event-spout_default + event-bolt.execute-latency.event-spout_default : + monasca_threshold.event-bolt.execute-latency.event-spout_default + event-bolt.receive.capacity : + monasca_threshold.event-bolt.receive.capacity + event-bolt.receive.population : + monasca_threshold.event-bolt.receive.population + event-bolt.receive.read_pos : + monasca_threshold.event-bolt.receive.read_pos + event-bolt.receive.write_pos : + monasca_threshold.event-bolt.receive.write_pos + event-bolt.sendqueue.capacity : + monasca_threshold.event-bolt.sendqueue.capacity + event-bolt.sendqueue.population : + monasca_threshold.event-bolt.sendqueue.population + event-bolt.sendqueue.read_pos : + monasca_threshold.event-bolt.sendqueue.read_pos + event-bolt.sendqueue.write_pos : + monasca_threshold.event-bolt.sendqueue.write_pos + event-bolt.transfer-count.alarm-definition-events : + monasca_threshold.event-bolt.transfer-count.alarm-definition-events + event-bolt.transfer-count.metrics : + monasca_threshold.event-bolt.transfer-count.metrics + event-spout.emit-count.default : + monasca_threshold.event-spout.emit-count.default + event-spout.emit-count.metrics : + monasca_threshold.event-spout.emit-count.metrics + event-spout.receive.capacity : + monasca_threshold.event-spout.receive.capacity + event-spout.receive.population : + monasca_threshold.event-spout.receive.population + event-spout.receive.read_pos : + monasca_threshold.event-spout.receive.read_pos + event-spout.receive.write_pos : + monasca_threshold.event-spout.receive.write_pos + event-spout.sendqueue.capacity : + monasca_threshold.event-spout.sendqueue.capacity + event-spout.sendqueue.population : + monasca_threshold.event-spout.sendqueue.population + event-spout.sendqueue.read_pos : + monasca_threshold.event-spout.sendqueue.read_pos + event-spout.sendqueue.write_pos : + monasca_threshold.event-spout.sendqueue.write_pos + event-spout.transfer-count.default : + monasca_threshold.event-spout.transfer-count.default + event-spout.transfer-count.metrics : + monasca_threshold.event-spout.transfer-count.metrics + filtering-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.ack-count.event-bolt_alarm-definition-events + filtering-bolt.ack-count.metrics-spout_default : + monasca_threshold.filtering-bolt.ack-count.metrics-spout_default + filtering-bolt.emit-count.default : + monasca_threshold.filtering-bolt.emit-count.default + filtering-bolt.emit-count.metrics : + monasca_threshold.filtering-bolt.emit-count.metrics + filtering-bolt.emit-count.newMetricForAlarmDefinitionStream : + monasca_threshold.filtering-bolt.emit-count.newMetricForAlarmDefinitionStream + filtering-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.execute-count.event-bolt_alarm-definition-events + filtering-bolt.execute-count.metrics-spout_default : + monasca_threshold.filtering-bolt.execute-count.metrics-spout_default + filtering-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.execute-latency.event-bolt_alarm-definition-events + filtering-bolt.execute-latency.metrics-spout_default : + monasca_threshold.filtering-bolt.execute-latency.metrics-spout_default + filtering-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.filtering-bolt.process-latency.event-bolt_alarm-definition-events + filtering-bolt.process-latency.metrics-spout_default : + monasca_threshold.filtering-bolt.process-latency.metrics-spout_default + filtering-bolt.receive.capacity : + monasca_threshold.filtering-bolt.receive.capacity + filtering-bolt.receive.population : + monasca_threshold.filtering-bolt.receive.population + filtering-bolt.receive.read_pos : + monasca_threshold.filtering-bolt.receive.read_pos + filtering-bolt.receive.write_pos : + monasca_threshold.filtering-bolt.receive.write_pos + filtering-bolt.sendqueue.capacity : + monasca_threshold.filtering-bolt.sendqueue.capacity + filtering-bolt.sendqueue.population : + monasca_threshold.filtering-bolt.sendqueue.population + filtering-bolt.sendqueue.read_pos : + monasca_threshold.filtering-bolt.sendqueue.read_pos + filtering-bolt.sendqueue.write_pos : + monasca_threshold.filtering-bolt.sendqueue.write_pos + filtering-bolt.transfer-count.default : + monasca_threshold.filtering-bolt.transfer-count.default + filtering-bolt.transfer-count.metrics : + monasca_threshold.filtering-bolt.transfer-count.metrics + filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream : + monasca_threshold.filtering-bolt.transfer-count.newMetricForAlarmDefinitionStream + metrics-spout.emit-count.default : + monasca_threshold.metrics-spout.emit-count.default + metrics-spout.emit-count.metrics : + monasca_threshold.metrics-spout.emit-count.metrics + metrics-spout.receive.capacity : + monasca_threshold.metrics-spout.receive.capacity + metrics-spout.receive.population : + monasca_threshold.metrics-spout.receive.population + metrics-spout.receive.read_pos : + monasca_threshold.metrics-spout.receive.read_pos + metrics-spout.receive.write_pos : + monasca_threshold.metrics-spout.receive.write_pos + metrics-spout.sendqueue.capacity : + monasca_threshold.metrics-spout.sendqueue.capacity + metrics-spout.sendqueue.population : + monasca_threshold.metrics-spout.sendqueue.population + metrics-spout.sendqueue.read_pos : + monasca_threshold.metrics-spout.sendqueue.read_pos + metrics-spout.sendqueue.write_pos : + monasca_threshold.metrics-spout.sendqueue.write_pos + metrics-spout.transfer-count.default : + monasca_threshold.metrics-spout.transfer-count.default + metrics-spout.transfer-count.metrics : + monasca_threshold.metrics-spout.transfer-count.metrics + system.emit-count.metrics : + monasca_threshold.system.emit-count.metrics + system.GC_ConcurrentMarkSweep.count : + monasca_threshold.system.GC_ConcurrentMarkSweep.count + system.GC_ConcurrentMarkSweep.timeMs : + monasca_threshold.system.GC_ConcurrentMarkSweep.timeMs + system.GC_ParNew.count : + monasca_threshold.system.GC_ParNew.count + system.GC_ParNew.timeMs : + monasca_threshold.system.GC_ParNew.timeMs + system.memory_heap.committedBytes : + monasca_threshold.system.memory_heap.committedBytes + system.memory_heap.initBytes : + monasca_threshold.system.memory_heap.initBytes + system.memory_heap.maxBytes : + monasca_threshold.system.memory_heap.maxBytes + system.memory_heap.unusedBytes : + monasca_threshold.system.memory_heap.unusedBytes + system.memory_heap.usedBytes : + monasca_threshold.system.memory_heap.usedBytes + system.memory_heap.virtualFreeBytes : + monasca_threshold.system.memory_heap.virtualFreeBytes + system.memory_nonHeap.committedBytes : + monasca_threshold.system.memory_nonHeap.committedBytes + system.memory_nonHeap.initBytes : + monasca_threshold.system.memory_nonHeap.initBytes + system.memory_nonHeap.maxBytes : + monasca_threshold.system.memory_nonHeap.maxBytes + system.memory_nonHeap.unusedBytes : + monasca_threshold.system.memory_nonHeap.unusedBytes + system.memory_nonHeap.usedBytes : + monasca_threshold.system.memory_nonHeap.usedBytes + system.memory_nonHeap.virtualFreeBytes : + monasca_threshold.system.memory_nonHeap.virtualFreeBytes + system.newWorkerEvent : + monasca_threshold.system.newWorkerEvent + system.receive.capacity : + monasca_threshold.system.receive.capacity + system.receive.population : + monasca_threshold.system.receive.population + system.receive.read_pos : + monasca_threshold.system.receive.read_pos + system.receive.write_pos : + monasca_threshold.system.receive.write_pos + system.sendqueue.capacity : + monasca_threshold.system.sendqueue.capacity + system.sendqueue.population : + monasca_threshold.system.sendqueue.population + system.sendqueue.read_pos : + monasca_threshold.system.sendqueue.read_pos + system.sendqueue.write_pos : + monasca_threshold.system.sendqueue.write_pos + system.startTimeSecs : + monasca_threshold.system.startTimeSecs + system.transfer.capacity : + monasca_threshold.system.transfer.capacity + system.transfer-count.metrics : + monasca_threshold.system.transfer-count.metrics + system.transfer.population : + monasca_threshold.system.transfer.population + system.transfer.read_pos : + monasca_threshold.system.transfer.read_pos + system.transfer.write_pos : + monasca_threshold.system.transfer.write_pos + system.uptimeSecs : + monasca_threshold.system.uptimeSecs + thresholding-bolt.ack-count.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.ack-count.aggregation-bolt_default + thresholding-bolt.ack-count.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.ack-count.event-bolt_alarm-definition-events + thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.ack-count.event-bolt_metric-sub-alarm-events + thresholding-bolt.emit-count.metrics : + monasca_threshold.thresholding-bolt.emit-count.metrics + thresholding-bolt.execute-count.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.execute-count.aggregation-bolt_default + thresholding-bolt.execute-count.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.execute-count.event-bolt_alarm-definition-events + thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.execute-count.event-bolt_metric-sub-alarm-events + thresholding-bolt.execute-latency.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.execute-latency.aggregation-bolt_default + thresholding-bolt.execute-latency.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.execute-latency.event-bolt_alarm-definition-events + thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.execute-latency.event-bolt_metric-sub-alarm-events + thresholding-bolt.process-latency.aggregation-bolt_default : + monasca_threshold.thresholding-bolt.process-latency.aggregation-bolt_default + thresholding-bolt.process-latency.event-bolt_alarm-definition-events : + monasca_threshold.thresholding-bolt.process-latency.event-bolt_alarm-definition-events + thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events : + monasca_threshold.thresholding-bolt.process-latency.event-bolt_metric-sub-alarm-events + thresholding-bolt.receive.capacity : + monasca_threshold.thresholding-bolt.receive.capacity + thresholding-bolt.receive.population : + monasca_threshold.thresholding-bolt.receive.population + thresholding-bolt.receive.read_pos : + monasca_threshold.thresholding-bolt.receive.read_pos + thresholding-bolt.receive.write_pos : + monasca_threshold.thresholding-bolt.receive.write_pos + thresholding-bolt.sendqueue.capacity : + monasca_threshold.thresholding-bolt.sendqueue.capacity + thresholding-bolt.sendqueue.population : + monasca_threshold.thresholding-bolt.sendqueue.population + thresholding-bolt.sendqueue.read_pos : + monasca_threshold.thresholding-bolt.sendqueue.read_pos + thresholding-bolt.sendqueue.write_pos : + monasca_threshold.thresholding-bolt.sendqueue.write_pos + thresholding-bolt.transfer-count.metrics : + monasca_threshold.thresholding-bolt.transfer-count.metrics diff --git a/thresh/src/main/java/monasca/thresh/TopologyModule.java b/thresh/src/main/java/monasca/thresh/TopologyModule.java index f7eba1f..716ecc5 100644 --- a/thresh/src/main/java/monasca/thresh/TopologyModule.java +++ b/thresh/src/main/java/monasca/thresh/TopologyModule.java @@ -85,12 +85,18 @@ public class TopologyModule extends AbstractModule { if (config.statsdConfig.getPort() != null) statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, config.statsdConfig.getPort()); - if (config.statsdConfig.getPrefix() != null) - statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, - config.statsdConfig.getPrefix()); + if (config.statsdConfig.getWhitelist() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_WHITELIST, + config.statsdConfig.getWhitelist()); + if (config.statsdConfig.getMetricmap() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_METRICMAP, + config.statsdConfig.getMetricmap()); if (config.statsdConfig.getDimensions() != null) statsdConfig.put(StatsdMetricConsumer.STATSD_DIMENSIONS, config.statsdConfig.getDimensions()); + if (config.statsdConfig.getDebugmetrics() != null) + statsdConfig.put(StatsdMetricConsumer.STATSD_DEBUGMETRICS, + config.statsdConfig.getDebugmetrics()); stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2); diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java index a672daa..a0e6e03 100644 --- a/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdConfig.java @@ -19,6 +19,7 @@ package monasca.thresh.utils; import java.io.Serializable; import java.util.Map; +import java.util.List; /* * Intended to deserialize the statsdConfig element in the @@ -29,11 +30,11 @@ public class StatsdConfig implements Serializable { private static final long serialVersionUID = 3634080153227179376L; private String host; - private Integer port; - private String prefix; - + private List whitelist; + private Boolean debugmetrics; + private Map metricmap; private Map dimensions; public Map getDimensions() { @@ -44,6 +45,30 @@ public class StatsdConfig implements Serializable { this.dimensions = dimensions; } + public List getWhitelist() { + return whitelist; + } + + public void setWhitelist(List whitelist) { + this.whitelist = whitelist; + } + + public Boolean getDebugmetrics() { + return debugmetrics; + } + + public void setDebugmetrics(Boolean debugmetrics) { + this.debugmetrics = debugmetrics; + } + + public Map getMetricmap() { + return metricmap; + } + + public void setMetricmap(Map metricmap) { + this.metricmap = metricmap; + } + public String getHost() { return host; } @@ -52,6 +77,14 @@ public class StatsdConfig implements Serializable { this.host = host; } + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + public Integer getPort() { return port; } @@ -60,11 +93,5 @@ public class StatsdConfig implements Serializable { this.port = port; } - public String getPrefix() { - return prefix; - } - - public void setPrefix(String prefix) { - this.prefix = prefix; - } } + diff --git a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java index 392d4cf..beec0c9 100644 --- a/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java +++ b/thresh/src/main/java/monasca/thresh/utils/StatsdMetricConsumer.java @@ -19,8 +19,12 @@ package monasca.thresh.utils; import java.io.IOException; import java.io.StringWriter; +import java.lang.Boolean; +import java.lang.String; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -47,14 +51,20 @@ public class StatsdMetricConsumer implements IMetricsConsumer { public static final String STATSD_HOST = "metrics.statsd.host"; public static final String STATSD_PORT = "metrics.statsd.port"; - public static final String STATSD_PREFIX = "metrics.statsd.prefix"; + public static final String STATSD_METRICMAP = "metrics.statsd.metricmap"; + public static final String STATSD_WHITELIST = "metrics.statsd.whitelist"; public static final String STATSD_DIMENSIONS = "metrics.statsd.dimensions"; + public static final String STATSD_DEBUGMETRICS = "metrics.statsd.debugmetrics"; + + private String topologyName; + private String statsdHost = "localhost"; + private int statsdPort = 8125; + private String monascaStatsdDimPrefix = "|#"; + private List whiteList = new ArrayList(); + private Map metricMap = new HashMap(); + private Boolean debugMetrics = false; + - String topologyName; - String statsdHost = "localhost"; - int statsdPort = 8125; - String statsdPrefix = "monasca.storm."; - String monascaStatsdDimPrefix = "|#"; String defaultDimensions = new StringBuilder().append(monascaStatsdDimPrefix) .append("{\"service\":\"monitoring\",\"component\":\"storm\"}") .toString(); @@ -92,9 +102,13 @@ public class StatsdMetricConsumer implements IMetricsConsumer { @Override public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { + logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context)); + + /* Sets up locals from the config STATSD_WHITELIST, STATSD_HOST ... */ parseConfig(stormConf); + /* Sets up local vars from config vars if present */ if (registrationArgument instanceof Map) { parseConfig((Map) registrationArgument); } @@ -102,8 +116,9 @@ public class StatsdMetricConsumer implements IMetricsConsumer { initClient(); logger.info( - "statsdPrefix ({}), topologyName ({}), clean(topologyName) ({})", - new Object[] { statsdPrefix, topologyName, clean(topologyName) }); + "topologyName ({}), " + + "clean(topologyName) ({})", new Object[] { topologyName, + clean(topologyName) }); } private void initClient() { @@ -144,13 +159,6 @@ public class StatsdMetricConsumer implements IMetricsConsumer { statsdPort = ((Number) conf.get(STATSD_PORT)).intValue(); } - if (conf.containsKey(STATSD_PREFIX)) { - statsdPrefix = (String) conf.get(STATSD_PREFIX); - if (!statsdPrefix.endsWith(".")) { - statsdPrefix += "."; - } - } - if (conf.containsKey(STATSD_DIMENSIONS)) { statsdDimensions = mapToJsonStr((Map) conf .get(STATSD_DIMENSIONS)); @@ -164,6 +172,18 @@ public class StatsdMetricConsumer implements IMetricsConsumer { statsdDimensions = monascaStatsdDimPrefix + statsdDimensions; } } + + if (conf.containsKey(STATSD_WHITELIST)) { + whiteList = (List) conf.get(STATSD_WHITELIST); + } + + if (conf.containsKey(STATSD_METRICMAP)) { + metricMap = (Map) conf.get(STATSD_METRICMAP); + } + + if (conf.containsKey(STATSD_DEBUGMETRICS)) { + debugMetrics = (Boolean) conf.get(STATSD_DEBUGMETRICS); + } } private String mapToJsonStr(Map inputMap) { @@ -216,19 +236,17 @@ public class StatsdMetricConsumer implements IMetricsConsumer { public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { for (Metric metric : dataPointsToMetrics(taskInfo, dataPoints)) { - report(metric.name, metric.value, metric.dimensions); + reportUOM(metric.name, metric.value); } } public static class Metric { String name; Double value; - String dimensions; - public Metric(String name, Double value, String dimensions) { + public Metric(String name, Double value) { this.name = name; this.value = value; - this.dimensions = dimensions; } @Override @@ -248,15 +266,12 @@ public class StatsdMetricConsumer implements IMetricsConsumer { return false; if (value != other.value) return false; - if (!dimensions.equals(other.dimensions)) - return false; return true; } @Override public String toString() { - return "Metric [name=" + name + ", value=" + value + ", dimensions=" - + dimensions + "]"; + return "Metric [name=" + name + ", value=" + value + "]"; } } @@ -278,11 +293,11 @@ public class StatsdMetricConsumer implements IMetricsConsumer { new Object[] { p.name, p.value }); if (p.value instanceof Number) { - res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue(), - statsdDimensions)); + res.add(new Metric(sb.toString(), ((Number) p.value).doubleValue())); } // There is a map of data points and it's not empty - else if (p.value instanceof Map && !(((Map) (p.value)).isEmpty())) { + else if (p.value instanceof Map && + !(((Map) (p.value)).isEmpty())) { int hdrAndNameLength = sb.length(); @SuppressWarnings("rawtypes") Map map = (Map) p.value; @@ -293,7 +308,7 @@ public class StatsdMetricConsumer implements IMetricsConsumer { sb.append(".").append(clean(subName.toString())); res.add(new Metric(sb.toString(), - ((Number) subValue).doubleValue(), statsdDimensions)); + ((Number) subValue).doubleValue())); } } } @@ -305,13 +320,10 @@ public class StatsdMetricConsumer implements IMetricsConsumer { * Since the Java client doesn't support the Monasca metric type we need to * build it with a raw UDP request */ - public void report(String s, Double number, String dimensions) { + public void report(String s) { if (udpclient != null) { - StringBuilder statsdMessage = new StringBuilder().append(statsdPrefix) - .append(s).append(":").append(String.valueOf(number)).append("|c") - .append(statsdDimensions); - logger.debug("reporting: {}={}{}", s, number, dimensions); - udpclient.send(statsdMessage.toString()); + logger.debug("reporting: {}", s); + udpclient.send(s); } else { /* Try to setup the UDP client since it was null */ @@ -319,6 +331,65 @@ public class StatsdMetricConsumer implements IMetricsConsumer { } } + private void reportUOM(String s, Double number) { + String metricName = null; + StringBuilder results = new StringBuilder(); + Boolean published = false; + + if (whiteList.contains(s)) { + + if (!metricMap.isEmpty() && metricMap.containsKey(s)) { + metricName = metricMap.get(s); + } + /* Send the unmapped uom as the same name storm calls it */ + else { + metricName = s; + } + + /* Make sure we don't send metric names that may be null or empty */ + if (metricName != null && !metricName.isEmpty()) { + published = true; + } + } + + /* + * To enable debug message, you also need to add an entry like this: + * + * + * + * + * + * + * Storm/Thresh logger config file: + * /opt/storm/apache-storm-0.9.5/logback/cluster.xml + * + */ + + if (debugMetrics) { + String mappedName = new String(); + + if (!metricMap.isEmpty() && metricMap.containsKey(s)) { + mappedName = metricMap.get(s); + } + else { + mappedName = s; + } + + logger.info(", RawMetricName, {}, MappedMetricName, {}, " + + "val, {}, {}", new Object[] + { s, mappedName, number, + published == true ? "PUBLISHED" : "UNPUBLISHED"}); + } + + if (published) { + results = results.append(metricName).append(":") + .append(String.valueOf(number)).append("|c") + .append(statsdDimensions); + + report(results.toString()); + } + } + @Override public void cleanup() { udpclient.stop();