Adding value_meta to certain metrics

The value_meta parameter was recently added to the
monasca-api.  It allows meta-information to be attached
to any metric that is sent from the agent.
This change adds value_meta to the http_status, nagios_wrapper,
and host_alive checks in the agent.  It also consolidates
some of the duplicate exception handling and reformats
some code blocks.

Change-Id: I3d9e9a8e54cd2582bde47c7c128a3788b821c156
This commit is contained in:
Gary Hessler 2015-03-12 11:36:35 -06:00
parent 728ce70e85
commit b9a55a00a1
8 changed files with 278 additions and 190 deletions

View File

@ -562,7 +562,7 @@ The host alive checks return the following metric:
| Metric Name | Dimensions | Semantics |
| ----------- | ---------- | --------- |
| host_alive_status | hostname, service, component, observer_host, target_host | Provides the status of the target host based on an ssh or ping check
| host_alive_status | hostname, service, component, observer_host, target_host, test_type | Provides the status of the target host based on an ssh or ping check
## Process Checks
Process checks can be performed to verify that a set of named processes is running on the local system. The YAML file `process.yaml` contains the list of processes that are checked. The processes can be identified using a pattern match or exact match on the process name. A Python script `process.py` runs each execution cycle to check that required processes are alive. If the process is running, a value of 0 is sent; otherwise, a value of 1 is sent to the Monasca API.

View File

@ -294,9 +294,9 @@ class AgentCheck(util.Dimensions):
"""
return len(self.instances)
def gauge(self, metric, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, timestamp=None):
"""Record the value of a gauge, with optional dimensions, hostname and device name.
def gauge(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None,
device_name=None, timestamp=None, value_meta=None):
"""Record the value of a gauge, with optional dimensions, hostname, value metadata and device name.
:param metric: The name of the metric
:param value: The value of the gauge
@ -305,6 +305,7 @@ class AgentCheck(util.Dimensions):
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param timestamp: (optional) The timestamp for this metric value
:param value_meta: Additional metadata about this value
"""
self.aggregator.gauge(metric,
value,
@ -312,9 +313,11 @@ class AgentCheck(util.Dimensions):
delegated_tenant,
hostname,
device_name,
timestamp)
timestamp,
value_meta)
def increment(self, metric, value=1, dimensions=None, delegated_tenant=None, hostname=None, device_name=None):
def increment(self, metric, value=1, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, value_meta=None):
"""Increment a counter with optional dimensions, hostname and device name.
:param metric: The name of the metric
@ -323,15 +326,18 @@ class AgentCheck(util.Dimensions):
:param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID.
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param value_meta: Additional metadata about this value
"""
self.aggregator.increment(metric,
value,
dimensions,
delegated_tenant,
hostname,
device_name)
device_name,
value_meta)
def decrement(self, metric, value=-1, dimensions=None, delegated_tenant=None, hostname=None, device_name=None):
def decrement(self, metric, value=-1, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, value_meta=None):
"""Decrement a counter with optional dimensions, hostname and device name.
:param metric: The name of the metric
@ -340,15 +346,18 @@ class AgentCheck(util.Dimensions):
:param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID.
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param value_meta: Additional metadata about this value
"""
self.aggregator.decrement(metric,
value,
dimensions,
delegated_tenant,
hostname,
device_name)
device_name,
value_meta)
def rate(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None):
def rate(self, metric, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, value_meta=None):
"""Submit a point for a metric that will be calculated as a rate on flush.
Values will persist across each call to `check` if there is not enough
@ -360,15 +369,18 @@ class AgentCheck(util.Dimensions):
:param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID.
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param value_meta: Additional metadata about this value
"""
self.aggregator.rate(metric,
value,
dimensions,
delegated_tenant,
hostname,
device_name)
device_name,
value_meta)
def histogram(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None):
def histogram(self, metric, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, value_meta=None):
"""Sample a histogram value, with optional dimensions, hostname and device name.
:param metric: The name of the metric
@ -377,15 +389,18 @@ class AgentCheck(util.Dimensions):
:param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID.
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param value_meta: Additional metadata about this value
"""
self.aggregator.histogram(metric,
value,
dimensions,
delegated_tenant,
hostname,
device_name)
device_name,
value_meta)
def set(self, metric, value, dimensions=None, delegated_tenant=None, hostname=None, device_name=None):
def set(self, metric, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, value_meta=None):
"""Sample a set value, with optional dimensions, hostname and device name.
:param metric: The name of the metric
@ -394,13 +409,15 @@ class AgentCheck(util.Dimensions):
:param delegated_tenant: (optional) Submit metrics on behalf of this tenant ID.
:param hostname: (optional) A hostname for this metric. Defaults to the current hostname.
:param device_name: (optional) The device name for this metric
:param value_meta: Additional metadata about this value
"""
self.aggregator.set(metric,
value,
dimensions,
delegated_tenant,
hostname,
device_name)
device_name,
value_meta)
def event(self, event):
"""Save an event.
@ -446,7 +463,7 @@ class AgentCheck(util.Dimensions):
print(" Name: {0}".format(metric.name))
print(" Value: {0}".format(metric.value))
if (metric.delegated_tenant):
print(" Delegtd ID: {0}".format(metric.delegated_tenant))
print(" Delegate ID: {0}".format(metric.delegated_tenant))
print(" Dimensions: ", end='')
line = 0
@ -455,7 +472,19 @@ class AgentCheck(util.Dimensions):
print(" " * 13, end='')
print("{0}={1}".format(name, metric.dimensions[name]))
line += 1
print(" Value Meta: ", end='')
if metric.value_meta:
line = 0
for name in metric.value_meta:
if line != 0:
print(" " * 13, end='')
print("{0}={1}".format(name, metric.value_meta[name]))
line += 1
else:
print('None')
print("-" * 24)
return metrics
def get_events(self):

View File

@ -30,28 +30,33 @@ class HostAlive(services_checks.ServicesCheck):
if timeout is not None:
sock.settimeout(timeout)
except socket.error as msg:
self.log.error("Error creating socket: " + str(msg[0]) + msg[1])
return False
error_message = 'Error creating socket: {0}'.format(str(msg[0]) + msg[1])
self.log.warn(error_message)
return False, error_message
try:
host_ip = socket.gethostbyname(host)
except socket.gaierror:
self.log.error("Unable to resolve host", host)
return False
error_message = 'Unable to resolve host {0}'.format(host)
self.log.warn(error_message)
return False, error_message
try:
sock.connect((host_ip, port))
banner = sock.recv(1024)
sock.close()
except socket.error:
return False
error_message = 'Unable to open socket to host {0}'.format(host)
self.log.warn(error_message)
return False, error_message
if banner.startswith('SSH'):
return True
return True, None
else:
return False
error_message = 'Unexpected response "{0}" from host {1}'.format(banner, host)
self.log.warn(error_message)
return False, error_message
@staticmethod
def _test_ping(host, timeout=None):
def _test_ping(self, host, timeout=None):
"""Attempt to ping the host.
"""
@ -69,13 +74,17 @@ class HostAlive(services_checks.ServicesCheck):
try:
ping = subprocess.check_output(ping_command.split(" "), stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return False
error_message = 'ping command "{0}" failed to execute on operating system'.format(ping_command)
self.log.warn(error_message)
return False, error_message
# Look at the output for a packet loss percentage
if ping.find('100%') > 0:
return False
if (ping.find('100%') > 0) or (ping.find('100.0%') > 0):
error_message = 'ping command "{0}" failed. {1} is not available.'.format(ping_command, host)
self.log.warn(error_message)
return False, error_message
else:
return True
return True, None
def _create_status_event(self, status, msg, instance):
"""Does nothing: status events are not yet supported by Mon API.
@ -97,20 +106,28 @@ class HostAlive(services_checks.ServicesCheck):
success = False
if instance['alive_test'] == 'ssh':
success = self._test_ssh(instance['host_name'],
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
elif instance['alive_test'] == 'ping':
success = self._test_ping(instance['host_name'],
self.init_config.get('ping_timeout'))
test_type = instance['alive_test']
if test_type == 'ssh':
success, error_message = self._test_ssh(instance['host_name'],
self.init_config.get('ssh_port'),
self.init_config.get('ssh_timeout'))
elif test_type == 'ping':
success, error_message = self._test_ping(instance['host_name'],
self.init_config.get('ping_timeout'))
else:
self.log.info("Unrecognized alive_test " + instance['alive_test'])
error_message = 'Unrecognized alive_test: {0}'.format(test_type)
dimensions.update({'test_type': test_type})
if success is True:
self.gauge('host_alive_status', 0, dimensions=dimensions)
self.gauge('host_alive_status',
0,
dimensions=dimensions)
return services_checks.Status.UP, "UP"
else:
self.gauge('host_alive_status', 1, dimensions=dimensions)
self.log.error("Host down: " + instance['host_name'])
self.gauge('host_alive_status',
1,
dimensions=dimensions,
value_meta={'error': error_message})
self.log.error('Host alive check for {0} failed. Error was {1}'.format(instance['host_name'],
error_message))
return services_checks.Status.DOWN, "DOWN"

View File

@ -76,59 +76,47 @@ class HTTPCheck(services_checks.ServicesCheck):
h.add_credentials(username, password)
resp, content = h.request(addr, "GET", headers=headers)
except socket.timeout as e:
except (socket.timeout, HttpLib2Error, socket.error) as e:
length = int((time.time() - start) * 1000)
self.log.info(
"%s is DOWN, error: %s. Connection failed after %s ms" % (addr, str(e), length))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error: %s. Connection failed after %s ms" % (
addr, str(e), length)
except HttpLib2Error as e:
length = int((time.time() - start) * 1000)
self.log.info(
"%s is DOWN, error: %s. Connection failed after %s ms" % (addr, str(e), length))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error: %s. Connection failed after %s ms" % (
addr, str(e), length)
except socket.error as e:
length = int((time.time() - start) * 1000)
self.log.info("%s is DOWN, error: %s. Connection failed after %s ms" % (
addr, repr(e), length))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error: %s. Connection failed after %s ms" % (
addr, str(e), length)
error_string = '{0} is DOWN, error: {1}. Connection failed after {2} ms'.format(addr, str(e), length)
self.log.info(error_string)
self.gauge('http_status',
1,
dimensions=dimensions,
value_meta={'error': error_string})
return services_checks.Status.DOWN, error_string
except httplib.ResponseNotReady as e:
length = int((time.time() - start) * 1000)
self.log.info("%s is DOWN, error: %s. Network is not routable after %s ms" % (
addr, repr(e), length))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error: %s. Network is not routable after %s ms" % (
addr, str(e), length)
error_string = '{0} is DOWN, error: {1}. Network is not routable after {2} ms'.format(addr, repr(e), length)
self.log.info(error_string)
self.gauge('http_status',
1,
dimensions=dimensions,
value_meta={'error': error_string})
return services_checks.Status.DOWN, error_string
except Exception as e:
length = int((time.time() - start) * 1000)
self.log.error(
"Unhandled exception %s. Connection failed after %s ms" % (str(e), length))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error: %s. Connection failed after %s ms" % (
addr, str(e), length)
error_string = '{0} is DOWN, error: {1}. Connection failed after {2} ms'.format(addr, str(e), length)
self.log.error('Unhandled exception {0}. Connection failed after {1} ms'.format(str(e), length))
self.gauge('http_status',
1,
dimensions=dimensions,
value_meta={'error': error_string})
return services_checks.Status.DOWN, error_string
if response_time:
# Stop the timer as early as possible
running_time = time.time() - start
self.gauge('http_response_time', running_time, dimensions=dimensions)
# TODO(dschroeder): Save/send content data when supported by API
if int(resp.status) >= 400:
if use_keystone and int(resp.status) == 401:
if retry:
self.log.error("%s is DOWN, unable to get a valid token to connect with" % (addr))
return services_checks.Status.DOWN, "%s is DOWN, unable to get a valid token to connect with" % (
addr)
error_string = '{0} is DOWN, unable to get a valid token to connect with'.format(addr)
self.log.error(error_string)
return services_checks.Status.DOWN, error_string
else:
# Get a new token and retry
self.log.info("Token expired, getting new token and retrying...")
@ -136,20 +124,28 @@ class HTTPCheck(services_checks.ServicesCheck):
key.refresh_token()
continue
else:
self.log.info("%s is DOWN, error code: %s" % (addr, str(resp.status)))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "%s is DOWN, error code: %s" % (addr, str(resp.status))
error_string = '{0} is DOWN, error code: {1}'.format(addr, str(resp.status))
self.log.info(error_string)
self.gauge('http_status',
1,
dimensions=dimensions,
value_meta={'error': error_string})
return services_checks.Status.DOWN, error_string
if pattern is not None:
if re.search(pattern, content, re.DOTALL):
self.log.debug("Pattern match successful")
else:
self.log.info("Pattern match failed! '%s' not in '%s'" % (pattern, content))
self.gauge('http_status', 1, dimensions=dimensions)
return services_checks.Status.DOWN, "Pattern match failed! '%s' not in '%s'" % (
pattern, content)
error_string = 'Pattern match failed! "{0}" not in "{1}"'.format(pattern, content)
self.log.info(error_string)
self.gauge('http_status',
1,
dimensions=dimensions,
value_meta={'error': error_string})
return services_checks.Status.DOWN, error_string
self.log.debug("%s is UP" % addr)
success_string = '{0} is UP'.format(addr)
self.log.debug(success_string)
self.gauge('http_status', 0, dimensions=dimensions)
done = True
return services_checks.Status.UP, "%s is UP" % addr
return services_checks.Status.UP, success_string

View File

@ -82,6 +82,7 @@ class WrapNagios(ServicesCheck):
metric_name = instance.get('metric_name', instance['name'])
detail = None
try:
proc = subprocess.Popen(instance['check_command'].split(" "),
env={"PATH": extra_path},
@ -90,16 +91,22 @@ class WrapNagios(ServicesCheck):
output = proc.communicate()
# The check detail is all the text before the pipe
detail = output[0].split('|')[0]
# TODO(dschroeder): Save/send 'detail' when supported by the API
except OSError:
# Return an UNKNOWN code (3) if I have landed here
self.gauge(metric_name, 3, dimensions=dimensions)
self.log.info(instance['check_command'].split(" ")[0] + " is missing or unreadable")
error_string = instance['check_command'].split(" ")[0] + " is missing or unreadable"
self.gauge(metric_name,
3,
dimensions=dimensions,
value_meta={'error': error_string})
self.log.info(error_string)
return
status_code = proc.poll()
last_run_data[instance['name']] = time.time()
self.gauge(metric_name, status_code, dimensions=dimensions)
self.gauge(metric_name,
status_code,
dimensions=dimensions,
value_meta={'detail': detail})
# Return DOWN on critical, UP otherwise
if status_code == "2":
return Status.DOWN, "DOWN: {0}".format(detail)

View File

@ -45,9 +45,9 @@ class MetricsAggregator(object):
}
def decrement(self, name, value=-1, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None):
hostname=None, device_name=None, value_meta=None):
self.submit_metric(name, value, 'c', dimensions, delegated_tenant,
hostname, device_name)
hostname, device_name, value_meta)
def event(
self,
@ -92,7 +92,10 @@ class MetricsAggregator(object):
# when required
metrics = []
for context, metric in self.metrics.items():
metrics.extend(metric.flush(timestamp))
try:
metrics.extend(metric.flush(timestamp))
except Exception as e:
log.exception('Error flushing {0} metrics.'.format(metric.name))
# Log a warning regarding metrics with old timestamps being submitted
if self.num_discarded_old_points > 0:
@ -118,8 +121,8 @@ class MetricsAggregator(object):
return events
@staticmethod
def formatter(metric, value, timestamp, dimensions, hostname,
delegated_tenant=None, device_name=None, metric_type=None):
def formatter(metric, value, timestamp, dimensions, hostname, delegated_tenant=None,
device_name=None, metric_type=None, value_meta=None):
""" Formats metrics, put them into a Measurement class
(metric, timestamp, value, {"dimensions": {"name1": "value1", "name2": "value2"}, ...})
dimensions should be a dictionary
@ -133,22 +136,23 @@ class MetricsAggregator(object):
int(timestamp),
value,
dimensions,
delegated_tenant)
delegated_tenant=delegated_tenant,
value_meta=value_meta)
def gauge(self, name, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None, timestamp=None):
hostname=None, device_name=None, timestamp=None, value_meta=None):
self.submit_metric(name, value, 'g', dimensions, delegated_tenant,
hostname, device_name, timestamp)
hostname, device_name, value_meta, timestamp)
def histogram(self, name, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None):
hostname=None, device_name=None, value_meta=None):
self.submit_metric(name, value, 'h', dimensions, delegated_tenant,
hostname, device_name)
hostname, device_name, value_meta)
def increment(self, name, value=1, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None):
hostname=None, device_name=None, value_meta=None):
self.submit_metric(name, value, 'c', dimensions, delegated_tenant,
hostname, device_name)
hostname, device_name, value_meta)
def packets_per_second(self, interval):
if interval == 0:
@ -156,21 +160,26 @@ class MetricsAggregator(object):
return round(float(self.count) / interval, 2)
def rate(self, name, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None):
hostname=None, device_name=None, value_meta=None):
self.submit_metric(name, value, 'r', dimensions, delegated_tenant,
hostname, device_name)
hostname, device_name, value_meta)
def set(self, name, value, dimensions=None, delegated_tenant=None,
hostname=None, device_name=None):
hostname=None, device_name=None, value_meta=None):
self.submit_metric(name, value, 's', dimensions, delegated_tenant,
hostname, device_name)
hostname, device_name, value_meta)
def submit_metric(self, name, value, mtype, dimensions=None,
delegated_tenant=None, hostname=None, device_name=None,
timestamp=None, sample_rate=1):
value_meta=None, timestamp=None, sample_rate=1):
context = (name, tuple(dimensions.items()),
delegated_tenant, hostname, device_name)
if value_meta:
meta = tuple(value_meta.items())
else:
meta = None
context = (name, tuple(dimensions.items()), meta, delegated_tenant,
hostname, device_name)
if context not in self.metrics:
metric_class = self.metric_type_to_class[mtype]
@ -179,7 +188,8 @@ class MetricsAggregator(object):
dimensions,
hostname or self.hostname,
device_name,
delegated_tenant)
delegated_tenant,
value_meta)
cur_time = time()
if timestamp is not None:
if cur_time - int(timestamp) > self.recent_point_threshold:

View File

@ -11,12 +11,16 @@ log = logging.getLogger(__name__)
class Measurement(object):
def __init__(self, name, timestamp, value, dimensions, delegated_tenant=None):
def __init__(self, name, timestamp, value, dimensions, delegated_tenant=None, value_meta=None):
self.name = name
self.timestamp = timestamp
self.value = value
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
class MetricTypes(object):
@ -46,7 +50,8 @@ class Gauge(Metric):
""" A metric that tracks a value at particular points in time. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None):
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.value = None
@ -55,6 +60,10 @@ class Gauge(Metric):
self.hostname = hostname
self.device_name = device_name
self.timestamp = time()
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.value = value
@ -62,20 +71,19 @@ class Gauge(Metric):
def flush(self, timestamp):
if self.value is not None:
res = [self.formatter(
metric=self.name,
timestamp=self.timestamp or timestamp,
value=self.value,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.GAUGE
)]
value = self.value
self.value = None
return res
return []
return [self.formatter(metric=self.name,
timestamp=self.timestamp or timestamp,
value=value,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]
else:
return []
class Counter(Metric):
@ -83,7 +91,8 @@ class Counter(Metric):
""" A metric that tracks a counter value. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None):
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.value = 0
@ -91,24 +100,29 @@ class Counter(Metric):
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.value += value * int(1 / sample_rate)
def flush(self, timestamp):
try:
return [self.formatter(
metric=self.name,
value=self.value,
timestamp=timestamp,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.RATE
)]
finally:
self.value = 0
if self.value is not None:
value = self.value
self.value = None
return [self.formatter(metric=self.name,
value=value,
timestamp=timestamp,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.RATE,
value_meta=self.value_meta)]
else:
return []
class Histogram(Metric):
@ -116,7 +130,8 @@ class Histogram(Metric):
""" A metric to track the distribution of a set of values. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None):
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.count = 0
@ -126,14 +141,19 @@ class Histogram(Metric):
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.count += int(1 / sample_rate)
self.samples.append(value)
def flush(self, ts):
def flush(self, timestamp):
metrics = []
if not self.count:
return []
return metrics
self.samples.sort()
length = len(self.samples)
@ -149,31 +169,28 @@ class Histogram(Metric):
('count', self.count, MetricTypes.RATE)
]
metrics = [self.formatter(
hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric='%s.%s' % (self.name, suffix),
value=value,
timestamp=ts,
metric_type=metric_type
) for suffix, value, metric_type in metric_aggrs
]
metrics.append([self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric='%s.%s' % (self.name, suffix),
value=value,
timestamp=timestamp,
metric_type=metric_type,
value_meta=self.value_meta
) for suffix, value, metric_type in metric_aggrs])
for p in self.percentiles:
val = self.samples[int(round(p * length - 1))]
name = '%s.%spercentile' % (self.name, int(p * 100))
metrics.append(self.formatter(
hostname=self.hostname,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=name,
value=val,
timestamp=ts,
metric_type=MetricTypes.GAUGE
))
metrics.append(self.formatter(hostname=self.hostname,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=name,
value=val,
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta))
# Reset our state.
self.samples = []
self.count = 0
@ -186,7 +203,8 @@ class Set(Metric):
""" A metric to track the number of unique elements in a set. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None):
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.dimensions = dimensions.copy()
@ -194,34 +212,41 @@ class Set(Metric):
self.hostname = hostname
self.device_name = device_name
self.values = set()
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.values.add(value)
def flush(self, timestamp):
metrics = []
if not self.values:
return []
try:
return [self.formatter(
hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=len(self.values),
timestamp=timestamp,
metric_type=MetricTypes.GAUGE
)]
finally:
else:
values = self.values.copy()
self.values = set()
return [self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=len(values),
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]
class Rate(Metric):
""" Track the rate of metrics over each flush interval """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None):
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.dimensions = dimensions.copy()
@ -229,6 +254,10 @@ class Rate(Metric):
self.hostname = hostname
self.device_name = device_name
self.samples = []
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
if not timestamp:
@ -251,18 +280,16 @@ class Rate(Metric):
def flush(self, timestamp):
if len(self.samples) < 2:
return []
try:
val = self._rate(self.samples[-2], self.samples[-1])
self.samples = self.samples[-1:]
except Exception:
log.warn('Error flushing {0} sample.'.format(self.name))
return []
val = self._rate(self.samples[-2], self.samples[-1])
self.samples = self.samples[-1:]
return [self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=val,
timestamp=timestamp,
metric_type=MetricTypes.GAUGE)]
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=val,
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]

View File

@ -99,6 +99,8 @@ class MonAPI(object):
for measurement in measurements:
m_dict = measurement.__dict__
m_dict['timestamp'] *= 1000
if m_dict['value_meta'] is None:
del m_dict['value_meta']
delegated_tenant = m_dict.pop('delegated_tenant')
if delegated_tenant not in tenant_group:
tenant_group[delegated_tenant] = []