Refactor agent

Remove windows related code, as we do not support running on windows.
Remove the idea of check status, as it added code and complexity that we did not gain much from.
Remove some functions at the AgentCheck level that either added another layer of complexity without providing any functionality, or that we did not use, like events.

Change-Id: I4b6bc4f9d38e6b4f4fe5c632f885b84aaff7fd08

This commit is contained in:
parent aede87ac06
commit ff5e9c7d7c
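The net effect on check authors: AgentCheck.run() stops building per-instance InstanceStatus objects. A minimal sketch of the resulting control flow, assembled from the run() hunk further down (surrounding class plumbing elided), is:

    # Sketch of the simplified AgentCheck.run() after this commit; assumes
    # self.instances, self.check() and self.log as shown in the diff below.
    def run(self):
        """Run all instances."""
        self.prepare_run()
        for i, instance in enumerate(self.instances):
            try:
                self.check(instance)
            except Exception:
                # Failures are logged instead of being wrapped in
                # check_status.InstanceStatus objects.
                self.log.exception("Check '%s' instance #%s failed" % (self.name, i))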
@@ -1,4 +1,4 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
"""Base class for Checks.

If you are writing your own checks you should subclass the AgentCheck class.
@@ -9,263 +9,15 @@ from __future__ import print_function

import logging
import os
import pprint
import re
import time
import traceback

import yaml

import monasca_agent.common.aggregator as aggregator
import monasca_agent.common.check_status as check_status
import monasca_agent.common.exceptions as exceptions
import monasca_agent.common.metrics as metrics_pkg
import monasca_agent.common.util as util


# todo convert all checks to the new interface then remove this and Laconic filter which isn't used elsewhere
# =============================================================================
# DEPRECATED
# ------------------------------
# If you are writing your own check, you should inherit from AgentCheck
# and not this class. This class will be removed in a future version
# of the agent and is currently only used for Windows.
# =============================================================================
class Check(util.Dimensions):

    """(Abstract) class for all checks with the ability to:

    * store 1 (and only 1) sample for gauges per metric/dimensions combination
    * compute rates for counters
    * only log error messages once (instead of each time they occur)
    """

    def __init__(self, logger, agent_config=None):
        # where to store samples, indexed by metric_name
        # metric_name: {("sorted", "dimensions"): [(ts, value), (ts, value)],
        # tuple(dimensions) are stored as a key since lists are not hashable
        # None: [(ts, value), (ts, value)]}
        # untagged values are indexed by None
        super(Check, self).__init__(agent_config)
        self._sample_store = {}
        self._counters = {}  # metric_name: bool
        self.logger = logger
        try:
            self.logger.addFilter(util.LaconicFilter())
        except Exception:
            self.logger.exception("Trying to install laconic log filter and failed")

    @staticmethod
    def normalize(metric, prefix=None):
        """Turn a metric into a well-formed metric name

        prefix.b.c
        """
        name = re.sub(r"[,\+\*\-/()\[\]{}]", "_", metric)
        # Eliminate multiple _
        name = re.sub(r"__+", "_", name)
        # Don't start/end with _
        name = re.sub(r"^_", "", name)
        name = re.sub(r"_$", "", name)
        # Drop ._ and _.
        name = re.sub(r"\._", ".", name)
        name = re.sub(r"_\.", ".", name)

        if prefix is not None:
            return prefix + "." + name
        else:
            return name

    @staticmethod
    def normalize_device_name(device_name):
        return device_name.strip().lower().replace(' ', '_')

    def counter(self, metric):
        """Treats the metric as a counter, i.e. computes its per second derivative

        ACHTUNG: Resets previous values associated with this metric.
        """
        self._counters[metric] = True
        self._sample_store[metric] = {}

    def is_counter(self, metric):
        """Is this metric a counter?
        """
        return metric in self._counters

    def gauge(self, metric):
        """Treats the metric as a gauge, i.e. keep the data as is

        ACHTUNG: Resets previous values associated with this metric.
        """
        self._sample_store[metric] = {}

    def is_metric(self, metric):
        return metric in self._sample_store

    def is_gauge(self, metric):
        return self.is_metric(metric) and not self.is_counter(metric)

    def get_metric_names(self):
        """Get all metric names.
        """
        return self._sample_store.keys()

    def save_gauge(self, metric, value, timestamp=None,
                   dimensions=None, hostname=None, device_name=None):
        """Save a gauge value.
        """
        if not self.is_gauge(metric):
            self.gauge(metric)
        self.save_sample(metric, value, timestamp, dimensions, hostname, device_name)

    def save_sample(self, metric, value, timestamp=None,
                    dimensions=None, hostname=None, device_name=None):
        """Save a simple sample, evict old values if needed.
        """
        if timestamp is None:
            timestamp = time.time()
        if metric not in self._sample_store:
            raise exceptions.CheckException("Saving a sample for an undefined metric: %s" % metric)
        try:
            value = util.cast_metric_val(value)
        except ValueError as ve:
            raise exceptions.NaN(ve)

        # Data eviction rules
        key = (tuple(sorted(dimensions.items())), device_name)
        if self.is_gauge(metric):
            self._sample_store[metric][key] = ((timestamp, value, hostname, device_name), )
        elif self.is_counter(metric):
            if self._sample_store[metric].get(key) is None:
                self._sample_store[metric][key] = [(timestamp, value, hostname, device_name)]
            else:
                self._sample_store[metric][key] = self._sample_store[metric][key][-1:] + \
                    [(timestamp, value, hostname, device_name)]
        else:
            raise exceptions.CheckException("%s must be either gauge or counter, skipping sample at %s" %
                                            (metric, time.ctime(timestamp)))

        if self.is_gauge(metric):
            # store[metric][dimensions] = (ts, val) - only 1 value allowed
            assert len(self._sample_store[metric][key]) == 1, self._sample_store[metric]
        elif self.is_counter(metric):
            assert len(self._sample_store[metric][key]) in (1, 2), self._sample_store[metric]

    @classmethod
    def _rate(cls, sample1, sample2):
        """Simple rate.
        """
        try:
            rate_interval = sample2[0] - sample1[0]
            if rate_interval == 0:
                raise exceptions.Infinity()

            delta = sample2[1] - sample1[1]
            if delta < 0:
                raise exceptions.UnknownValue()

            return (sample2[0], delta / rate_interval, sample2[2], sample2[3])
        except exceptions.Infinity:
            raise
        except exceptions.UnknownValue:
            raise
        except Exception as e:
            raise exceptions.NaN(e)

    def get_sample_with_timestamp(self, metric, dimensions=None, device_name=None, expire=True):
        """Get (timestamp-epoch-style, value).
        """

        # Get the proper dimensions
        key = (tuple(sorted(dimensions.items())), device_name)

        # Never seen this metric
        if metric not in self._sample_store:
            raise exceptions.UnknownValue()

        # Not enough value to compute rate
        elif self.is_counter(metric) and len(self._sample_store[metric][key]) < 2:
            raise exceptions.UnknownValue()

        elif self.is_counter(metric) and len(self._sample_store[metric][key]) >= 2:
            res = self._rate(
                self._sample_store[metric][key][-2], self._sample_store[metric][key][-1])
            if expire:
                del self._sample_store[metric][key][:-1]
            return res

        elif self.is_gauge(metric) and len(self._sample_store[metric][key]) >= 1:
            return self._sample_store[metric][key][-1]

        else:
            raise exceptions.UnknownValue()

    def get_sample(self, metric, dimensions=None, device_name=None, expire=True):
        """Return the last value for that metric.
        """
        x = self.get_sample_with_timestamp(metric, dimensions, device_name, expire)
        assert isinstance(x, tuple) and len(x) == 4, x
        return x[1]

    def get_samples_with_timestamps(self, expire=True):
        """Return all values {metric: (ts, value)} for non-tagged metrics.
        """
        values = {}
        for m in self._sample_store:
            try:
                values[m] = self.get_sample_with_timestamp(m, expire=expire)
            except Exception:
                pass
        return values

    def get_samples(self, expire=True):
        """Return all values {metric: value} for non-tagged metrics.
        """
        values = {}
        for m in self._sample_store:
            try:
                # Discard the timestamp
                values[m] = self.get_sample_with_timestamp(m, expire=expire)[1]
            except Exception:
                pass
        return values

    def get_metrics(self, expire=True, prettyprint=False):
        """Get all metrics, including the ones that are tagged.

        This is the preferred method to retrieve metrics

        @return the list of samples
        @rtype [(metric_name, timestamp, value,
                 {"dimensions": {"name1": "key1", "name2": "key2"}}), ...]
        """
        metrics = []
        for m in self._sample_store:
            try:
                for key in self._sample_store[m]:
                    dimensions_list, device_name = key
                    dimensions = dict(dimensions_list)
                    try:
                        ts, val, hostname, device_name = self.get_sample_with_timestamp(
                            m, dimensions, device_name, expire)
                    except exceptions.UnknownValue:
                        continue
                    attributes = {}
                    if dimensions_list:
                        attributes['dimensions'] = self._set_dimensions(dimensions)
                    if hostname:
                        attributes['hostname'] = hostname
                    if device_name:
                        attributes['device'] = device_name
                    metrics.append((m, int(ts), val, attributes))
            except Exception:
                pass
        if prettyprint:
            print("Metrics: {0}".format(metrics))
        return metrics


class AgentCheck(util.Dimensions):

    def __init__(self, name, init_config, agent_config, instances=None):
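Since the deprecated Check class above is removed, new checks subclass AgentCheck instead. A minimal sketch of what that looks like, assuming a gauge(metric, value, dimensions=...) submission helper on AgentCheck (analogous to the self.rate(...) call visible in the MySql hunk below); the check name and instance option are made up for illustration:

    # Hypothetical example check; 'my_check.up' and the 'hostname' instance
    # option are illustrative, not part of this commit.
    import monasca_agent.collector.checks as checks


    class MyCheck(checks.AgentCheck):

        def check(self, instance):
            host = instance.get('hostname', 'localhost')
            # Submit a single gauge measurement; assumes the AgentCheck
            # gauge() helper with a dimensions keyword.
            self.gauge('my_check.up', 1, dimensions={'hostname': host})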
@@ -289,7 +41,6 @@ class AgentCheck(util.Dimensions):
        self.events = []
        self.instances = instances or []
        self.warnings = []
        self.library_versions = None

    def instance_count(self):
@@ -429,37 +180,6 @@ class AgentCheck(util.Dimensions):
                            device_name,
                            value_meta)

    def event(self, event):
        """Save an event.

        :param event: The event payload as a dictionary. Has the following
        structure:

            {
                "timestamp": int, the epoch timestamp for the event,
                "event_type": string, the event time name,
                "api_key": string, the api key of the account to associate the event with,
                "msg_title": string, the title of the event,
                "msg_text": string, the text body of the event,
                "alert_type": (optional) string, one of ('error', 'warning', 'success', 'info').
                    Defaults to 'info'.
                "source_type_name": (optional) string, the source type name,
                "host": (optional) string, the name of the host,
                "dimensions": (optional) a dictionary of dimensions to associate with this event
            }
        """
        if event.get('api_key') is None:
            event['api_key'] = self.agent_config['api_key']
        self.events.append(event)

    def has_events(self):
        """Check whether the check has saved any events

        @return whether or not the check has saved any events
        @rtype boolean
        """
        return len(self.events) > 0

    def get_metrics(self, prettyprint=False):
        """Get all metrics, including the ones that are tagged.

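The event() hunk above documents the payload shape this commit removes; for reference, a dictionary matching that docstring (all values are illustrative):

    # Illustrative payload for the removed event() API; every value here is
    # made up to match the docstring, not taken from the codebase.
    example_event = {
        "timestamp": 1451606400,
        "event_type": "service_restart",
        "api_key": "abc123",
        "msg_title": "Service restarted",
        "msg_text": "The nginx service was restarted by the watchdog.",
        "alert_type": "info",                # optional, defaults to 'info'
        "dimensions": {"service": "nginx"},  # optional
    }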
@@ -497,28 +217,6 @@ class AgentCheck(util.Dimensions):

        return metrics

    def get_events(self):
        """Return a list of the events saved by the check, if any

        @return the list of events saved by this check
        @rtype list of event dictionaries
        """
        events = self.events
        self.events = []
        return events

    def has_warnings(self):
        """Check whether the instance run created any warnings.
        """
        return len(self.warnings) > 0

    def warning(self, warning_message):
        """Add a warning message that will be printed in the info page

        :param warning_message: String. Warning message to be displayed
        """
        self.warnings.append(warning_message)

    def get_library_info(self):
        if self.library_versions is not None:
            return self.library_versions
@@ -534,13 +232,6 @@ class AgentCheck(util.Dimensions):
        """
        raise NotImplementedError

    def get_warnings(self):
        """Return the list of warnings messages to be displayed in the info page.
        """
        warnings = self.warnings
        self.warnings = []
        return warnings

    def prepare_run(self):
        """Do any setup required before running all instances"""
        return
@@ -550,25 +241,11 @@
        """
        self.prepare_run()

        instance_statuses = []
        for i, instance in enumerate(self.instances):
            try:
                self.check(instance)
                if self.has_warnings():
                    instance_status = check_status.InstanceStatus(i,
                                                                  check_status.STATUS_WARNING,
                                                                  warnings=self.get_warnings())
                else:
                    instance_status = check_status.InstanceStatus(i,
                                                                  check_status.STATUS_OK)
            except Exception as e:
            except Exception:
                self.log.exception("Check '%s' instance #%s failed" % (self.name, i))
                instance_status = check_status.InstanceStatus(i,
                                                              check_status.STATUS_ERROR,
                                                              error=e,
                                                              tb=traceback.format_exc())
            instance_statuses.append(instance_status)
        return instance_statuses

    def check(self, instance):
        """Overriden by the check class. This will be called to run the check.
@@ -6,7 +6,6 @@ import socket
import threading
import time

import monasca_agent.common.check_status as check_status
import monasca_agent.common.metrics as metrics
import monasca_agent.common.util as util

@@ -49,25 +48,14 @@ class Collector(util.Dimensions):
    def _emit(self, payload):
        """Send the payload via the emitter.
        """
        statuses = []
        # Don't try to send to an emitter if we're stopping/
        if self.continue_running:
            name = self.emitter.__name__
            emitter_status = check_status.EmitterStatus(name)
            try:
                self.emitter(payload, log, self.agent_config['forwarder_url'])
            except Exception as e:
            except Exception:
                log.exception("Error running emitter: %s" % self.emitter.__name__)
                emitter_status = check_status.EmitterStatus(name, e)
            statuses.append(emitter_status)
        return statuses

    def _set_status(self, check_statuses, emitter_statuses, collect_duration):
        try:
            check_status.CollectorStatus(check_statuses, emitter_statuses).persist()
        except Exception:
            log.exception("Error persisting collector status")

    def _set_status(self, collect_duration):
        if self.run_count <= FLUSH_LOGGING_INITIAL or self.run_count % FLUSH_LOGGING_PERIOD == 0:
            log.info("Finished run #%s. Collection time: %.2fs." %
                     (self.run_count, round(collect_duration, 2)))
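Pulling the surviving lines of the hunk above together, the refactored emitter path reduces to a log-and-continue shape. A minimal sketch, assuming self.continue_running, self.emitter and the module-level log object shown in the diff:

    # Sketch of Collector._emit after the refactor: emitter errors are
    # logged rather than collected into EmitterStatus objects.
    def _emit(self, payload):
        """Send the payload via the emitter."""
        if self.continue_running:
            try:
                self.emitter(payload, log, self.agent_config['forwarder_url'])
            except Exception:
                log.exception("Error running emitter: %s" % self.emitter.__name__)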
@@ -103,7 +91,7 @@ class Collector(util.Dimensions):
        log.debug("Starting collection run #%s" % self.run_count)

        # checks_d checks
        num_metrics, emitter_statuses, checks_statuses = self.run_checks_d()
        num_metrics = self.run_checks_d()

        collect_duration = timer.step()

@@ -116,44 +104,37 @@ class Collector(util.Dimensions):
                                                    value,
                                                    self._set_dimensions(dimensions),
                                                    None))
        emitter_statuses.append(self._emit(collect_stats))
        self._emit(collect_stats)

        # Persist the status of the collection run.
        self._set_status(checks_statuses, emitter_statuses, collect_duration)
        self._set_status(collect_duration)

    def run_checks_d(self):
        """Run defined checks_d checks.

        returns a list of Measurements and a list of check statuses.
        returns a list of Measurements.
        """
        sub_timer = util.Timer()
        measurements = 0
        check_statuses = []
        emitter_statuses = []
        for check in self.initialized_checks_d:
            if not self.continue_running:
                return
            log.debug("Running check %s" % check.name)
            instance_statuses = []
            metric_count = 0
            try:
                # Run the check.
                instance_statuses = check.run()
                check.run()

                current_check_metrics = check.get_metrics()

                # Emit the metrics after each check
                emitter_statuses.append(self._emit(current_check_metrics))
                self._emit(current_check_metrics)

                # Save the status of the check.
                metric_count = len(current_check_metrics)
                measurements += metric_count
                measurements += len(current_check_metrics)

            except Exception:
                log.exception("Error running check %s" % check.name)

            status_check = check_status.CheckStatus(check.name, instance_statuses, metric_count,
                                                    library_versions=check.get_library_info())
            check_statuses.append(status_check)
            sub_collect_duration = sub_timer.step()
            sub_collect_duration_mills = sub_collect_duration * 1000
            log.debug("Finished run check %s. Collection time: %.2fms." % (
@@ -162,15 +143,7 @@ class Collector(util.Dimensions):
                log.warn("Collection time for check %s is high: %.2fs." % (
                    check.name, round(sub_collect_duration, 2)))

        for check_name, info in self.init_failed_checks_d.iteritems():
            if not self.continue_running:
                return
            status_check = check_status.CheckStatus(check_name, None, None,
                                                    init_failed_error=info['error'],
                                                    init_failed_traceback=info['traceback'])
            check_statuses.append(status_check)

        return measurements, emitter_statuses, check_statuses
        return measurements

    def stop(self):
        """Tell the collector to stop at the next logical point.
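Taken together, the run_checks_d() hunks reduce its contract to a single measurement count. A minimal sketch of the surviving per-check loop, assuming the attributes shown in the diff above:

    # Sketch of the simplified run_checks_d(); keeps no per-check status,
    # only the running count of emitted measurements.
    def run_checks_d(self):
        """Run defined checks_d checks; returns the number of measurements."""
        measurements = 0
        for check in self.initialized_checks_d:
            if not self.continue_running:
                return
            try:
                check.run()
                current_check_metrics = check.get_metrics()
                self._emit(current_check_metrics)  # emit after each check
                measurements += len(current_check_metrics)
            except Exception:
                log.exception("Error running check %s" % check.name)
        return measurements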
@@ -1,248 +0,0 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP

import monasca_agent.collector.checks.check

try:
    import wmi
    w = wmi.WMI()
except Exception:
    wmi, w = None, None


# Device WMI drive types
class DriveType(object):
    UNKNOWN, NOROOT, REMOVEABLE, LOCAL, NETWORK, CD, RAM = (0, 1, 2, 3, 4, 5, 6)

IGNORED = ('_total',)
B2MB = float(1048576)
KB2MB = B2KB = float(1024)


class Processes(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.gauge('system.proc.queue_length')
        self.gauge('system.proc.count')

    def check(self, agentConfig):
        try:
            os = w.Win32_PerfFormattedData_PerfOS_System()[0]
        except AttributeError:
            self.logger.info('Missing Win32_PerfFormattedData_PerfOS_System WMI class.' +
                             ' No process metrics will be returned.')
            return

        try:
            w.Win32_PerfFormattedData_PerfOS_Processor(name="_Total")[0]
        except AttributeError:
            self.logger.info('Missing Win32_PerfFormattedData_PerfOS_Processor WMI class.' +
                             ' No process metrics will be returned.')
            return
        if os.ProcessorQueueLength is not None:
            self.save_sample('system.proc.queue_length', os.ProcessorQueueLength)
        if os.Processes is not None:
            self.save_sample('system.proc.count', os.Processes)

        return self.get_metrics()


class Memory(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.logger = logger
        self.gauge('system.mem.free')
        self.gauge('system.mem.used')
        self.gauge('system.mem.total')
        self.gauge('system.mem.cached')
        self.gauge('system.mem.committed')
        self.gauge('system.mem.paged')
        self.gauge('system.mem.nonpaged')

    def check(self, agentConfig):
        try:
            os = w.Win32_OperatingSystem()[0]
        except AttributeError:
            self.logger.info('Missing Win32_OperatingSystem. No memory metrics will be returned.')
            return

        if os.TotalVisibleMemorySize is not None and os.FreePhysicalMemory is not None:
            total = int(os.TotalVisibleMemorySize) / KB2MB
            free = int(os.FreePhysicalMemory) / KB2MB
            self.save_sample('system.mem.total', total)
            self.save_sample('system.mem.free', free)
            self.save_sample('system.mem.used', total - free)

        mem = w.Win32_PerfFormattedData_PerfOS_Memory()[0]
        if mem.CacheBytes is not None:
            self.save_sample('system.mem.cached', int(mem.CacheBytes) / B2MB)
        if mem.CommittedBytes is not None:
            self.save_sample('system.mem.committed', int(mem.CommittedBytes) / B2MB)
        if mem.PoolPagedBytes is not None:
            self.save_sample('system.mem.paged', int(mem.PoolPagedBytes) / B2MB)
        if mem.PoolNonpagedBytes is not None:
            self.save_sample('system.mem.nonpaged', int(mem.PoolNonpagedBytes) / B2MB)

        return self.get_metrics()


class Cpu(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.logger = logger
        self.gauge('system.cpu.user')
        self.gauge('system.cpu.idle')
        self.gauge('system.cpu.interrupt')
        self.gauge('system.cpu.system')

    def check(self, agentConfig):
        try:
            cpu = w.Win32_PerfFormattedData_PerfOS_Processor()
        except AttributeError:
            self.logger.info('Missing Win32_PerfFormattedData_PerfOS_Processor WMI class.' +
                             ' No CPU metrics will be returned.')
            return

        cpu_user = self._average_metric(cpu, 'PercentUserTime')
        if cpu_user:
            self.save_sample('system.cpu.user', cpu_user)

        cpu_idle = self._average_metric(cpu, 'PercentIdleTime')
        if cpu_idle:
            self.save_sample('system.cpu.idle', cpu_idle)

        cpu_interrupt = self._average_metric(cpu, 'PercentInterruptTime')
        if cpu_interrupt is not None:
            self.save_sample('system.cpu.interrupt', cpu_interrupt)

        cpu_privileged = self._average_metric(cpu, 'PercentPrivilegedTime')
        if cpu_privileged is not None:
            self.save_sample('system.cpu.system', cpu_privileged)

        return self.get_metrics()

    @staticmethod
    def _average_metric(wmi_class, wmi_prop):
        """Sum all of the values of a metric from a WMI class object.

        Excludes the value for "_Total"
        """
        val = 0
        counter = 0
        for wmi_object in wmi_class:
            if wmi_object.Name == '_Total':
                # Skip the _Total value
                continue

            if getattr(wmi_object, wmi_prop) is not None:
                counter += 1
                val += float(getattr(wmi_object, wmi_prop))

        if counter > 0:
            return val / counter

        return val


class Network(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.logger = logger
        self.gauge('system.net.bytes_rcvd')
        self.gauge('system.net.bytes_sent')

    def check(self, agentConfig):
        try:
            net = w.Win32_PerfFormattedData_Tcpip_NetworkInterface()
        except AttributeError:
            self.logger.info('Missing Win32_PerfFormattedData_Tcpip_NetworkInterface WMI class.' +
                             ' No network metrics will be returned')
            return

        for iface in net:
            name = self.normalize_device_name(iface.name)
            if iface.BytesReceivedPerSec is not None:
                self.save_sample('system.net.bytes_rcvd', iface.BytesReceivedPerSec,
                                 device_name=name)
            if iface.BytesSentPerSec is not None:
                self.save_sample('system.net.bytes_sent', iface.BytesSentPerSec,
                                 device_name=name)
        return self.get_metrics()


class Disk(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.logger = logger
        self.gauge('system.disk.free')
        self.gauge('system.disk.total')
        self.gauge('system.disk.in_use')
        self.gauge('system.disk.used')

    def check(self, agentConfig):
        try:
            disk = w.Win32_LogicalDisk()
        except AttributeError:
            self.logger.info('Missing Win32_LogicalDisk WMI class.' +
                             ' No disk metrics will be returned.')
            return

        for device in disk:
            name = self.normalize_device_name(device.name)
            if device.DriveType in (DriveType.CD, DriveType.UNKNOWN) or name in IGNORED:
                continue
            if device.FreeSpace is not None and device.Size is not None:
                free = float(device.FreeSpace) / B2KB
                total = float(device.Size) / B2KB
                used = total - free
                self.save_sample('system.disk.free', free, device_name=name)
                self.save_sample('system.disk.total', total, device_name=name)
                self.save_sample('system.disk.used', used, device_name=name)
                self.save_sample('system.disk.in_use', (used / total),
                                 device_name=name)
        return self.get_metrics()


class IO(monasca_agent.collector.checks.check.Check):

    def __init__(self, logger):
        monasca_agent.collector.checks.check.Check.__init__(self, logger)
        self.logger = logger
        self.gauge('system.io.wkb_s')
        self.gauge('system.io.w_s')
        self.gauge('system.io.rkb_s')
        self.gauge('system.io.r_s')
        self.gauge('system.io.avg_q_sz')

    def check(self, agentConfig):
        try:
            disk = w.Win32_PerfFormattedData_PerfDisk_LogicalDisk()
        except AttributeError:
            self.logger.info(
                'Missing Win32_PerfFormattedData_PerfDisk_LogicalDiskUnable WMI class.' +
                ' No I/O metrics will be returned.')
            return

        for device in disk:
            name = self.normalize_device_name(device.name)
            if name in IGNORED:
                continue
            if device.DiskWriteBytesPerSec is not None:
                self.save_sample('system.io.wkb_s', int(device.DiskWriteBytesPerSec) / B2KB,
                                 device_name=name)
            if device.DiskWritesPerSec is not None:
                self.save_sample('system.io.w_s', int(device.DiskWritesPerSec),
                                 device_name=name)
            if device.DiskReadBytesPerSec is not None:
                self.save_sample('system.io.rkb_s', int(device.DiskReadBytesPerSec) / B2KB,
                                 device_name=name)
            if device.DiskReadsPerSec is not None:
                self.save_sample('system.io.r_s', int(device.DiskReadsPerSec),
                                 device_name=name)
            if device.CurrentDiskQueueLength is not None:
                self.save_sample('system.io.avg_q_sz', device.CurrentDiskQueueLength,
                                 device_name=name)
        return self.get_metrics()
@@ -107,7 +107,7 @@ class Apache(checks.AgentCheck):
        if metric_count == 0:
            if self.url[-5:] != '?auto':
                self.url = '%s?auto' % self.url
                self.warning("Assuming url was not correct. Trying to add ?auto suffix to the url")
                self.log.warn("Assuming url was not correct. Trying to add ?auto suffix to the url")
                self.check(instance)
            else:
                return services_checks.Status.DOWN, "%s is DOWN, error: No metrics available.".format(service_check_name)
@@ -125,15 +125,14 @@ class Docker(AgentCheck):
        dimensions = self._set_dimensions(None, instance)
        containers = self._get_containers(instance)
        if not containers:
            self.warning("No containers are running.")
            self.log.warn("No containers are running.")

        max_containers = instance.get('max_containers', DEFAULT_MAX_CONTAINERS)

        if not instance.get("exclude") or not instance.get("include"):
            if len(containers) > max_containers:
                self.warning(
                    "Too many containers to collect. Please refine the containers to collect by editing the configuration file. Truncating to %s containers" %
                    max_containers)
                self.log.warn("Too many containers to collect. Please refine the containers to collect by editing the "
                              "configuration file. Truncating to %s containers" % max_containers)
                containers = containers[:max_containers]

        collected_containers = 0

@@ -148,8 +147,8 @@ class Docker(AgentCheck):

            collected_containers += 1
            if collected_containers > max_containers:
                self.warning(
                    "Too many containers are matching the current configuration. Some containers will not be collected. Please refine your configuration")
                self.log.warn("Too many containers are matching the current configuration. Some containers will not "
                              "be collected. Please refine your configuration")
                break

        for key, (dd_key, metric_type) in DOCKER_METRICS.items():
@@ -174,8 +174,8 @@ class ElasticSearch(AgentCheck):
            data = self._get_data(config_url, auth)
            version = map(int, data['version']['number'].split('.'))
        except Exception as e:
            self.warning("Error while trying to get Elasticsearch version from %s %s" %
                         (config_url, str(e)))
            self.log.warn("Error while trying to get Elasticsearch version from %s %s" %
                          (config_url, str(e)))
            version = [0, 0, 0]

        self.log.debug("Elasticsearch version is %s" % version)
@@ -54,11 +54,11 @@ class Gearman(AgentCheck):
        port = instance.get('port', None)

        if host is None:
            self.warning("Host not set, assuming 127.0.0.1")
            self.log.warn("Host not set, assuming 127.0.0.1")
            host = "127.0.0.1"

        if port is None:
            self.warning("Port is not set, assuming 4730")
            self.log.warn("Port is not set, assuming 4730")
            port = 4730

        dimensions = self._set_dimensions(None, instance)
@@ -82,7 +82,7 @@ class GUnicornCheck(AgentCheck):
            try:
                cpu_time_by_pid[proc.pid] = sum(proc.get_cpu_times())
            except psutil.NoSuchProcess:
                self.warning('Process %s disappeared while scanning' % proc.name)
                self.log.warn('Process %s disappeared while scanning' % proc.name)
                continue

        # Let them do a little bit more work.
@@ -92,8 +92,7 @@ class HTTPCheck(services_checks.ServicesCheck):
        try:
            self.log.debug("Connecting to %s" % addr)
            if disable_ssl_validation:
                self.warning(
                    "Skipping SSL certificate validation for %s based on configuration" % addr)
                self.log.warn("Skipping SSL certificate validation for %s based on configuration" % addr)
            h = Http(timeout=timeout, disable_ssl_certificate_validation=disable_ssl_validation)
            if username is not None and password is not None:
                h.add_credentials(username, password)
@@ -89,9 +89,8 @@ class IIS(AgentCheck):

        for metric, mtype, wmi_val in self.METRICS:
            if not hasattr(iis_site, wmi_val):
                self.warning(
                    'Unable to fetch metric %s. Missing %s in Win32_PerfFormattedData_W3SVC_WebService' %
                    (metric, wmi_val))
                self.log.warn('Unable to fetch metric %s. Missing %s in '
                              'Win32_PerfFormattedData_W3SVC_WebService' % (metric, wmi_val))
                continue

            # Submit the metric value with the correct type
@@ -118,8 +118,7 @@ class Lighttpd(AgentCheck):
        if self.assumed_url.get(instance['lighttpd_status_url'],
                                None) is None and url[-len(url_suffix):] != url_suffix:
            self.assumed_url[instance['lighttpd_status_url']] = '%s%s' % (url, url_suffix)
            self.warning(
                "Assuming url was not correct. Trying to add %s suffix to the url" % url_suffix)
            self.log.warn("Assuming url was not correct. Trying to add %s suffix to the url" % url_suffix)
            self.check(instance)
        else:
            raise Exception(
@@ -196,8 +196,7 @@ class MySql(checks.AgentCheck):
                greater_502 = True

        except Exception as exception:
            self.warning("Cannot compute mysql version, assuming older than 5.0.2: %s" %
                         str(exception))
            self.log.warn("Cannot compute mysql version, assuming older than 5.0.2: %s" % str(exception))

        self.greater_502[host] = greater_502

@@ -271,7 +270,7 @@ class MySql(checks.AgentCheck):
            cursor.close()
            del cursor
        except Exception:
            self.warning("Error while running %s\n%s" % (query, traceback.format_exc()))
            self.log.warn("Error while running %s\n%s" % (query, traceback.format_exc()))
            self.log.exception("Error while running %s" % query)

    def _collect_system_metrics(self, host, db, dimensions):

@@ -303,8 +302,8 @@ class MySql(checks.AgentCheck):
            self.rate("mysql.performance.kernel_time", int(
                (float(kcpu) / float(clk_tck)) * 100), dimensions=dimensions)
        except Exception:
            self.warning("Error while reading mysql (pid: %s) procfs data\n%s" %
                         (pid, traceback.format_exc()))
            self.log.warn("Error while reading mysql (pid: %s) procfs data\n%s" %
                          (pid, traceback.format_exc()))

    def _get_server_pid(self, db):
        pid = None

@@ -318,7 +317,7 @@ class MySql(checks.AgentCheck):
            cursor.close()
            del cursor
        except Exception:
            self.warning("Error while fetching pid_file variable of MySQL.")
            self.log.warn("Error while fetching pid_file variable of MySQL.")

        if pid_file is not None:
            self.log.debug("pid file: %s" % str(pid_file))
@@ -106,7 +106,7 @@ class SQLServer(AgentCheck):
                self._fetch_all_instances(metric, cursor, custom_dimensions)
            except Exception:
                self.log.exception('Unable to fetch metric: %s' % mname)
                self.warning('Unable to fetch metric: %s' % mname)
                self.log.warn('Unable to fetch metric: %s' % mname)
        else:
            try:
                cursor.execute("""

@@ -118,7 +118,7 @@ class SQLServer(AgentCheck):
                (value,) = cursor.fetchone()
            except Exception:
                self.log.exception('Unable to fetch metric: %s' % mname)
                self.warning('Unable to fetch metric: %s' % mname)
                self.log.warn('Unable to fetch metric: %s' % mname)
                continue

            # Save the metric
@@ -104,7 +104,7 @@ class Varnish(AgentCheck):

        if m1 is None and m2 is None:
            self.log.warn("Cannot determine the version of varnishstat, assuming 3 or greater")
            self.warning("Cannot determine the version of varnishstat, assuming 3 or greater")
            self.log.warn("Cannot determine the version of varnishstat, assuming 3 or greater")
        else:
            if m1 is not None:
                version = int(m1.group(1))
@@ -1,233 +0,0 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
"""Monitor the Windows Event Log.

"""
import calendar
from datetime import datetime
try:
    import wmi
except Exception:
    wmi = None

from monasca_agent.collector.checks import AgentCheck

SOURCE_TYPE_NAME = 'event viewer'
EVENT_TYPE = 'win32_log_event'


class Win32EventLog(AgentCheck):

    def __init__(self, name, init_config, agent_config):
        AgentCheck.__init__(self, name, init_config, agent_config)
        self.last_ts = {}
        self.wmi_conns = {}

    def _get_wmi_conn(self, host, user, password):
        key = "%s:%s:%s" % (host, user, password)
        if key not in self.wmi_conns:
            self.wmi_conns[key] = wmi.WMI(host, user=user, password=password)
        return self.wmi_conns[key]

    def check(self, instance):
        if wmi is None:
            raise Exception("Missing 'wmi' module")

        host = instance.get('host')
        user = instance.get('username')
        password = instance.get('password')
        dimensions = self._set_dimensions(None, instance)
        notify = instance.get('notify', [])
        w = self._get_wmi_conn(host, user, password)

        # Store the last timestamp by instance
        instance_key = self._instance_key(instance)
        if instance_key not in self.last_ts:
            self.last_ts[instance_key] = datetime.utcnow()
            return

        # Find all events in the last check that match our search by running a
        # straight WQL query against the event log
        last_ts = self.last_ts[instance_key]
        q = EventLogQuery(ltype=instance.get('type'),
                          user=instance.get('user'),
                          source_name=instance.get('source_name'),
                          log_file=instance.get('log_file'),
                          message_filters=instance.get('message_filters', []),
                          start_ts=last_ts)
        wql = q.to_wql()
        self.log.debug("Querying for Event Log events: %s" % wql)
        events = w.query(wql)

        # Save any events returned to the payload as Datadog events
        for ev in events:
            log_ev = LogEvent(ev, self.agent_config.get('api_key', ''),
                              self.hostname, dimensions, notify)

            # Since WQL only compares on the date and NOT the time, we have to
            # do a secondary check to make sure events are after the last
            # timestamp
            if log_ev.is_after(last_ts):
                self.event(log_ev.to_event_dict())
            else:
                self.log.debug('Skipping event after %s. ts=%s' % (last_ts, log_ev.timestamp))

        # Update the last time checked
        self.last_ts[instance_key] = datetime.utcnow()

    @staticmethod
    def _instance_key(instance):
        """Generate a unique key per instance for use with keeping track of

        state for each instance.
        """
        return '%s' % (instance)


class EventLogQuery(object):

    def __init__(self, ltype=None, user=None, source_name=None, log_file=None,
                 start_ts=None, message_filters=None):
        self.filters = [
            ('Type', self._convert_event_types(ltype)),
            ('User', user),
            ('SourceName', source_name),
            ('LogFile', log_file)
        ]
        self.message_filters = message_filters or []
        self.start_ts = start_ts

    def to_wql(self):
        """Return this query as a WQL string.

        """
        wql = """
        SELECT Message, SourceName, TimeGenerated, Type, User, InsertionStrings
        FROM Win32_NTLogEvent
        WHERE TimeGenerated >= "%s"
        """ % (self._dt_to_wmi(self.start_ts))
        for name, vals in self.filters:
            wql = self._add_filter(name, vals, wql)
        for msg_filter in self.message_filters:
            wql = self._add_message_filter(msg_filter, wql)
        return wql

    @staticmethod
    def _add_filter(name, vals, q):
        if not vals:
            return q
        # A query like (X = Y) does not work, unless there are multiple
        # statements inside the parentheses, such as (X = Y OR Z = Q)
        if len(vals) == 1:
            vals = vals[0]
        if not isinstance(vals, list):
            q += '\nAND %s = "%s"' % (name, vals)
        else:
            q += "\nAND (%s)" % (' OR '.join(
                ['%s = "%s"' % (name, l) for l in vals]
            ))
        return q

    @staticmethod
    def _add_message_filter(msg_filter, q):
        """Filter on the message text using a LIKE query. If the filter starts

        with '-' then we'll assume that it's a NOT LIKE filter.
        """
        if msg_filter.startswith('-'):
            msg_filter = msg_filter[1:]
            q += '\nAND NOT Message LIKE "%s"' % msg_filter
        else:
            q += '\nAND Message LIKE "%s"' % msg_filter
        return q

    @staticmethod
    def _dt_to_wmi(dt):
        """A wrapper around wmi.from_time to get a WMI-formatted time from a time struct.

        """
        return wmi.from_time(year=dt.year, month=dt.month, day=dt.day,
                             hours=dt.hour, minutes=dt.minute, seconds=dt.second, microseconds=0,
                             timezone=0)

    @staticmethod
    def _convert_event_types(types):
        """Detect if we are running on <= Server 2003. If so, we should convert

        the EventType values to integers
        """
        return types


class LogEvent(object):

    def __init__(self, ev, api_key, hostname, dimensions, notify_list):
        self.event = ev
        self.api_key = api_key
        self.hostname = hostname
        self.dimensions = dimensions
        self.notify_list = notify_list
        self.timestamp = self._wmi_to_ts(self.event.TimeGenerated)

    def to_event_dict(self):
        return {
            'timestamp': self.timestamp,
            'event_type': EVENT_TYPE,
            'api_key': self.api_key,
            'msg_title': self._msg_title(self.event),
            'msg_text': self._msg_text(self.event).strip(),
            'aggregation_key': self._aggregation_key(self.event),
            'alert_type': self._alert_type(self.event),
            'source_type_name': SOURCE_TYPE_NAME,
            'host': self.hostname,
            'dimensions': self.dimensions
        }

    def is_after(self, ts):
        """Compare this event's timestamp to a give timestamp.

        """
        if self.timestamp >= int(calendar.timegm(ts.timetuple())):
            return True
        return False

    @staticmethod
    def _wmi_to_ts(wmi_ts):
        """Convert a wmi formatted timestamp into an epoch using wmi.to_time().

        """
        year, month, day, hour, minute, second, microsecond, tz = \
            wmi.to_time(wmi_ts)
        dt = datetime(year=year, month=month, day=day, hour=hour, minute=minute,
                      second=second, microsecond=microsecond)
        return int(calendar.timegm(dt.timetuple()))

    @staticmethod
    def _msg_title(event):
        return '%s/%s' % (event.Logfile, event.SourceName)

    def _msg_text(self, event):
        msg_text = ""
        if event.Message:
            msg_text = "%s\n" % event.Message
        elif event.InsertionStrings:
            msg_text = "\n".join([i_str for i_str in event.InsertionStrings
                                  if i_str.strip()])

        if self.notify_list:
            msg_text += "\n%s" % ' '.join([" @" + n for n in self.notify_list])

        return msg_text

    @staticmethod
    def _alert_type(event):
        event_type = event.Type
        # Convert to a Datadog alert type
        if event_type == 'Warning':
            return 'warning'
        elif event_type == 'Error':
            return 'error'
        return 'info'

    @staticmethod
    def _aggregation_key(event):
        return event.SourceName
@@ -66,10 +66,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
            self._handle_sigterm(signum, frame)
        self._do_restart()

    def info(self, verbose=None):
        logging.getLogger().setLevel(logging.ERROR)
        return monasca_agent.common.check_status.CollectorStatus.print_latest_status(verbose=verbose)

    def run(self, config):
        """Main loop of the collector.

@@ -83,9 +79,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
        # Handle Keyboard Interrupt
        signal.signal(signal.SIGINT, self._handle_sigterm)

        # Save the agent start-up stats.
        monasca_agent.common.check_status.CollectorStatus().persist()

        # Load the checks_d checks
        checksd = util.load_check_directory()

@@ -143,12 +136,6 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
                     "of {1}. Starting collection again without waiting in result.".format(collection_time,
                                                                                           check_frequency))

        # Now clean-up.
        try:
            monasca_agent.common.check_status.CollectorStatus.remove_latest_status()
        except Exception:
            pass

        # Explicitly kill the process, because it might be running
        # as a daemon.
        log.info("Exiting. Bye bye.")
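With check_status gone, the daemon no longer persists a status pickle at start-up, prints it from info(), or removes it on exit. A hedged sketch of what remains of the daemon lifecycle, keeping only lines shown in the hunks above (the main collection loop is elided):

    # Sketch of CollectorDaemon.run() after this commit; assumes signal,
    # util and log as imported in collector daemon module.
    def run(self, config):
        """Main loop of the collector (status persistence removed)."""
        # Handle Keyboard Interrupt
        signal.signal(signal.SIGINT, self._handle_sigterm)
        # Load the checks_d checks; the start-up CollectorStatus().persist()
        # call is gone, as is the remove_latest_status() clean-up on exit.
        checksd = util.load_check_directory()
        # ... collection loop elided ...
        log.info("Exiting. Bye bye.")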
@ -1,713 +0,0 @@
|
|||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||
"""
|
||||
This module contains classes which are used to occasionally persist the status
|
||||
of checks.
|
||||
"""
|
||||
|
||||
# stdlib
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import platform
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
# project
|
||||
import collections
|
||||
import config
|
||||
import util
|
||||
import yaml
|
||||
|
||||
# 3rd party
|
||||
import ntplib
|
||||
|
||||
STATUS_OK = 'OK'
|
||||
STATUS_ERROR = 'ERROR'
|
||||
STATUS_WARNING = 'WARNING'
|
||||
|
||||
NTP_OFFSET_THRESHOLD = 600
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Stylizer(object):
|
||||
|
||||
STYLES = {
|
||||
'bold': 1,
|
||||
'grey': 30,
|
||||
'red': 31,
|
||||
'green': 32,
|
||||
'yellow': 33,
|
||||
'blue': 34,
|
||||
'magenta': 35,
|
||||
'cyan': 36,
|
||||
'white': 37,
|
||||
}
|
||||
|
||||
HEADER = '\033[1m'
|
||||
UNDERLINE = '\033[2m'
|
||||
|
||||
OKBLUE = '\033[94m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
RESET = '\033[0m'
|
||||
|
||||
ENABLED = False
|
||||
|
||||
@classmethod
|
||||
def stylize(cls, text, *styles):
|
||||
"""stylize the text. """
|
||||
if not cls.ENABLED:
|
||||
return text
|
||||
# don't bother about escaping, not that complicated.
|
||||
fmt = '\033[%dm%s'
|
||||
|
||||
for style in styles or []:
|
||||
text = fmt % (cls.STYLES[style], text)
|
||||
|
||||
return text + fmt % (0, '') # reset
|
||||
|
||||
|
||||
# a small convienence method
|
||||
def style(*args):
|
||||
return Stylizer.stylize(*args)
|
||||
|
||||
|
||||
def logger_info():
|
||||
loggers = []
|
||||
root_logger = logging.getLogger()
|
||||
if len(root_logger.handlers) > 0:
|
||||
for handler in root_logger.handlers:
|
||||
if isinstance(handler, logging.StreamHandler):
|
||||
loggers.append(handler.stream.name)
|
||||
if isinstance(handler, logging.handlers.SysLogHandler):
|
||||
if isinstance(handler.address, basestring):
|
||||
loggers.append('syslog:%s' % handler.address)
|
||||
else:
|
||||
loggers.append('syslog:(%s, %s)' % handler.address)
|
||||
else:
|
||||
loggers.append("No loggers configured")
|
||||
return ', '.join(loggers)
|
||||
|
||||
|
||||
def get_ntp_info():
|
||||
ntp_offset = ntplib.NTPClient().request('pool.ntp.org', version=3).offset
|
||||
if abs(ntp_offset) > NTP_OFFSET_THRESHOLD:
|
||||
ntp_styles = ['red', 'bold']
|
||||
else:
|
||||
ntp_styles = []
|
||||
return ntp_offset, ntp_styles
|
||||
|
||||
|
||||
class AgentStatus(object):
|
||||
"""A small class used to load and save status messages to the filesystem.
|
||||
"""
|
||||
|
||||
NAME = None
|
||||
agent_config = config.Config()
|
||||
|
||||
def __init__(self):
|
||||
self.created_at = datetime.datetime.now()
|
||||
self.created_by_pid = os.getpid()
|
||||
|
||||
def has_error(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def persist(self):
|
||||
try:
|
||||
path = self._get_pickle_path()
|
||||
log.debug("Persisting status to %s" % path)
|
||||
f = open(path, 'w')
|
||||
try:
|
||||
pickle.dump(self, f)
|
||||
finally:
|
||||
f.close()
|
||||
except Exception:
|
||||
log.exception("Error persisting status")
|
||||
|
||||
def created_seconds_ago(self):
|
||||
td = datetime.datetime.now() - self.created_at
|
||||
return td.seconds
|
||||
|
||||
def render(self):
|
||||
indent = " "
|
||||
lines = self._header_lines(indent) + [
|
||||
indent + l for l in self.body_lines()
|
||||
] + ["", ""]
|
||||
return "\n".join(lines)
|
||||
|
||||
@classmethod
|
||||
def _title_lines(cls):
|
||||
name_line = "%s (v %s)" % (cls.NAME, AgentStatus.agent_config.get_version())
|
||||
lines = [
|
||||
"=" * len(name_line),
|
||||
"%s" % name_line,
|
||||
"=" * len(name_line),
|
||||
"",
|
||||
]
|
||||
return lines
|
||||
|
||||
def _header_lines(self, indent):
|
||||
# Don't indent the header
|
||||
lines = self._title_lines()
|
||||
if self.created_seconds_ago() > 120:
|
||||
styles = ['red', 'bold']
|
||||
else:
|
||||
styles = []
|
||||
# We color it in red if the status is too old
|
||||
fields = [
|
||||
(
|
||||
style("Status date", *styles),
|
||||
style("%s (%ss ago)" %
|
||||
(self.created_at.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
self.created_seconds_ago()), *styles)
|
||||
)
|
||||
]
|
||||
|
||||
fields += [
|
||||
("Pid", self.created_by_pid),
|
||||
("Platform", platform.platform()),
|
||||
("Python Version", platform.python_version()),
|
||||
("Logs", logger_info()),
|
||||
]
|
||||
|
||||
for key, value in fields:
|
||||
l = indent + "%s: %s" % (key, value)
|
||||
lines.append(l)
|
||||
return lines + [""]
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'pid': self.created_by_pid,
|
||||
'status_date': "%s (%ss ago)" % (self.created_at.strftime('%Y-%m-%d %H:%M:%S'),
|
||||
self.created_seconds_ago())}
|
||||
|
||||
@classmethod
|
||||
def _not_running_message(cls):
|
||||
lines = cls._title_lines() + [
|
||||
style(" %s is not running." % cls.NAME, 'red'),
|
||||
style(""" You can get more details in the logs: %s""" % logger_info(), 'red'),
|
||||
"",
|
||||
""
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
@classmethod
|
||||
def remove_latest_status(cls):
|
||||
log.debug("Removing latest status")
|
||||
try:
|
||||
os.remove(cls._get_pickle_path())
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def load_latest_status(cls):
|
||||
try:
|
||||
f = open(cls._get_pickle_path())
|
||||
try:
|
||||
return pickle.load(f)
|
||||
finally:
|
||||
f.close()
|
||||
except IOError:
|
||||
log.info("Couldn't load latest status")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def print_latest_status(cls, verbose=False):
|
||||
cls.verbose = verbose
|
||||
Stylizer.ENABLED = False
|
||||
try:
|
||||
if sys.stdout.isatty():
|
||||
Stylizer.ENABLED = True
|
||||
except Exception:
|
||||
# Don't worry if we can't enable the
|
||||
# stylizer.
|
||||
pass
|
||||
|
||||
message = cls._not_running_message()
|
||||
exit_code = -1
|
||||
|
||||
module_status = cls.load_latest_status()
|
||||
if module_status:
|
||||
message = module_status.render()
|
||||
exit_code = 0
|
||||
if module_status.has_error():
|
||||
exit_code = 1
|
||||
|
||||
sys.stdout.write(message)
|
||||
return exit_code
|
||||
|
||||
@classmethod
|
||||
def _get_pickle_path(cls):
|
||||
return os.path.join(tempfile.gettempdir(), cls.__name__ + '.pickle')
|
||||
|
||||
|
||||
class InstanceStatus(object):
|
||||
|
||||
def __init__(self, instance_id, status, error=None, tb=None, warnings=None, metric_count=None):
|
||||
self.instance_id = instance_id
|
||||
self.status = status
|
||||
self.error = repr(error)
|
||||
self.traceback = tb
|
||||
self.warnings = warnings
|
||||
self.metric_count = metric_count
|
||||
|
||||
def has_error(self):
|
||||
return self.status == STATUS_ERROR
|
||||
|
||||
def has_warnings(self):
|
||||
return self.status == STATUS_WARNING
|
||||
|
||||
|
||||
class CheckStatus(object):
|
||||
|
||||
def __init__(self, check_name, instance_statuses, metric_count,
|
||||
event_count=None, init_failed_error=None,
|
||||
init_failed_traceback=None, library_versions=None):
|
||||
self.name = check_name
|
||||
self.instance_statuses = instance_statuses
|
||||
self.metric_count = metric_count
|
||||
self.event_count = event_count
|
||||
self.init_failed_error = init_failed_error
|
||||
self.init_failed_traceback = init_failed_traceback
|
||||
self.library_versions = library_versions
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
if self.init_failed_error:
|
||||
return STATUS_ERROR
|
||||
for instance_status in self.instance_statuses:
|
||||
if instance_status.status == STATUS_ERROR:
|
||||
return STATUS_ERROR
|
||||
return STATUS_OK
|
||||
|
||||
def has_error(self):
|
||||
return self.status == STATUS_ERROR
|
||||
|
||||
|
||||
class EmitterStatus(object):
|
||||
|
||||
def __init__(self, name, error=None):
|
||||
self.name = name
|
||||
self.error = None
|
||||
if error:
|
||||
self.error = repr(error)
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
if self.error:
|
||||
return STATUS_ERROR
|
||||
else:
|
||||
return STATUS_OK
|
||||
|
||||
def has_error(self):
|
||||
return self.status != STATUS_OK
|
||||
|
||||
|
||||
class CollectorStatus(AgentStatus):
|
||||
|
||||
NAME = 'Collector'
|
||||
|
||||
    def __init__(self, check_statuses=None, emitter_statuses=None):
        AgentStatus.__init__(self)
        self.check_statuses = check_statuses or []
        self.emitter_statuses = emitter_statuses or []

    @property
    def status(self):
        for check_status in self.check_statuses:
            if check_status.status == STATUS_ERROR:
                return STATUS_ERROR
        return STATUS_OK

    def has_error(self):
        return self.status != STATUS_OK

    def body_lines(self):
        lines = [
            'Clocks',
            '======',
            ''
        ]
        try:
            ntp_offset, ntp_styles = get_ntp_info()
            lines.append(' ' + style('NTP offset', *ntp_styles) + ': ' +
                         style('%s s' % round(ntp_offset, 4), *ntp_styles))
        except Exception as e:
            lines.append(' NTP offset: Unknown (%s)' % str(e))
        lines.append(' System UTC time: ' + datetime.datetime.utcnow().__str__())
        lines.append('')

        # Paths to checks_d/conf.d
        lines += [
            'Paths',
            '=====',
            ''
        ]

        paths = util.Paths()
        try:
            confd_path = paths.get_confd_path()
        except util.PathNotFound:
            confd_path = 'Not found'

        try:
            checksd_path = paths.get_checksd_path()
        except util.PathNotFound:
            checksd_path = 'Not found'

        lines.append(' conf.d: ' + confd_path)
        lines.append(' checks_d: ' + checksd_path)
        lines.append('')

        # Hostnames
        lines += [
            'Hostnames',
            '=========',
            ''
        ]

        # Checks.d Status
        lines += [
            'Checks',
            '======',
            ''
        ]
        check_statuses = self.check_statuses + get_jmx_status()
        if not check_statuses:
            lines.append(" No checks have run yet.")
        else:
            for cs in check_statuses:
                check_lines = [
                    ' ' + cs.name,
                    ' ' + '-' * len(cs.name)
                ]
                if cs.init_failed_error:
                    check_lines.append(" - initialize check class [%s]: %s" %
                                       (style(STATUS_ERROR, 'red'),
                                        repr(cs.init_failed_error)))
                    if self.verbose and cs.init_failed_traceback:
                        check_lines.extend(' ' + line for line in
                                           cs.init_failed_traceback.split('\n'))
                else:
                    for s in cs.instance_statuses:
                        c = 'green'
                        if s.has_warnings():
                            c = 'yellow'
                        if s.has_error():
                            c = 'red'
                        line = " - instance #%s [%s]" % (s.instance_id, style(s.status, c))
                        if s.has_error():
                            line += u": %s" % s.error
                        if s.metric_count is not None:
                            line += " collected %s metrics" % s.metric_count

                        check_lines.append(line)

                        if s.has_warnings():
                            for warning in s.warnings:
                                warn = warning.split('\n')
                                if not len(warn):
                                    continue
                                check_lines.append(u" %s: %s" %
                                                   (style("Warning", 'yellow'), warn[0]))
                                check_lines.extend(u" %s" % l for l in warn[1:])
                        if self.verbose and s.traceback is not None:
                            check_lines.extend(' ' + line for line in s.traceback.split('\n'))

                    check_lines += [
                        " - Collected %s metrics & %s events" % (
                            cs.metric_count, cs.event_count),
                    ]

                    if cs.library_versions is not None:
                        check_lines += [
                            " - Dependencies:"]
                        for library, version in cs.library_versions.iteritems():
                            check_lines += [" - %s: %s" % (library, version)]

                    check_lines += [""]

                lines += check_lines

        # Emitter status
        lines += [
            "",
            "Emitters",
            "========",
            ""
        ]
        if not self.emitter_statuses:
            lines.append(" No emitters have run yet.")
        else:
            for es in self.emitter_statuses:
                c = 'green'
                if es.has_error():
                    c = 'red'
                line = " - %s [%s]" % (es.name, style(es.status, c))
                if es.status != STATUS_OK:
                    line += ": %s" % es.error
                lines.append(line)

        return lines

    def to_dict(self):
        status_info = AgentStatus.to_dict(self)

        # Hostnames
        status_info['hostnames'] = {}

        # Checks.d Status
        status_info['checks'] = {}
        check_statuses = self.check_statuses + get_jmx_status()
        for cs in check_statuses:
            status_info['checks'][cs.name] = {'instances': {}}
            if cs.init_failed_error:
                status_info['checks'][cs.name]['init_failed'] = True
                status_info['checks'][cs.name]['traceback'] = cs.init_failed_traceback
            else:
                status_info['checks'][cs.name] = {'instances': {}}
                status_info['checks'][cs.name]['init_failed'] = False
                for s in cs.instance_statuses:
                    status_info['checks'][cs.name]['instances'][s.instance_id] = {
                        'status': s.status,
                        'has_error': s.has_error(),
                        'has_warnings': s.has_warnings(),
                    }
                    if s.has_error():
                        status_info['checks'][cs.name]['instances'][
                            s.instance_id]['error'] = s.error
                    if s.has_warnings():
                        status_info['checks'][cs.name]['instances'][
                            s.instance_id]['warnings'] = s.warnings
            status_info['checks'][cs.name]['metric_count'] = cs.metric_count
            status_info['checks'][cs.name]['event_count'] = cs.event_count

        # Emitter status
        status_info['emitter'] = []
        for es in self.emitter_statuses:
            check_status = {'name': es.name,
                            'status': es.status,
                            'has_error': es.has_error()}
            if es.has_error():
                check_status['error'] = es.error
            status_info['emitter'].append(check_status)

        paths = util.Paths()
        try:
            status_info['confd_path'] = paths.get_confd_path()
        except config.PathNotFound:
            status_info['confd_path'] = 'Not found'

        try:
            status_info['checksd_path'] = paths.get_checksd_path()
        except config.PathNotFound:
            status_info['checksd_path'] = 'Not found'

        return status_info

class MonascaStatsdStatus(AgentStatus):

    NAME = 'Monasca_Statsd'

    def __init__(self, flush_count=0, packet_count=0,
                 packets_per_second=0, metric_count=0, event_count=0):
        AgentStatus.__init__(self)
        self.flush_count = flush_count
        self.packet_count = packet_count
        self.packets_per_second = packets_per_second
        self.metric_count = metric_count
        self.event_count = event_count

    def has_error(self):
        return self.flush_count == 0 and self.packet_count == 0 and self.metric_count == 0

    def body_lines(self):
        lines = [
            "Flush count: %s" % self.flush_count,
            "Packet Count: %s" % self.packet_count,
            "Packets per second: %s" % self.packets_per_second,
            "Metric count: %s" % self.metric_count,
            "Event count: %s" % self.event_count,
        ]
        return lines

    def to_dict(self):
        status_info = AgentStatus.to_dict(self)
        status_info.update({
            'flush_count': self.flush_count,
            'packet_count': self.packet_count,
            'packets_per_second': self.packets_per_second,
            'metric_count': self.metric_count,
            'event_count': self.event_count,
        })
        return status_info

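Note the inverted sense of has_error here: a status object whose flush, packet, and metric counters are all zero is treated as an error, so "no data yet" reads as a failure. A quick illustration, separate from the module itself:

# Illustrative only: exercising MonascaStatsdStatus as defined above.
s = MonascaStatsdStatus()                              # all counters default to 0
assert s.has_error()                                   # zero activity counts as an error
assert not MonascaStatsdStatus(flush_count=1).has_error()
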
class ForwarderStatus(AgentStatus):

    NAME = 'Forwarder'

    def __init__(self, queue_length=0, queue_size=0, flush_count=0, transactions_received=0,
                 transactions_flushed=0):
        AgentStatus.__init__(self)
        self.queue_length = queue_length
        self.queue_size = queue_size
        self.flush_count = flush_count
        self.transactions_received = transactions_received
        self.transactions_flushed = transactions_flushed

    def body_lines(self):
        lines = [
            "Queue Size: %s bytes" % self.queue_size,
            "Queue Length: %s" % self.queue_length,
            "Flush Count: %s" % self.flush_count,
            "Transactions received: %s" % self.transactions_received,
            "Transactions flushed: %s" % self.transactions_flushed
        ]
        if self.transactions_flushed == 0:
            lines.append("[%s]: Unable to flush transactions\n %s" %
                         (style(STATUS_ERROR, 'red'),
                          "Please verify monasca-api is running as configured"))
        elif self.transactions_flushed != self.transactions_received:
            lines.append("[%s]: Transactions out of sync\n %s" %
                         (style(STATUS_WARNING, 'yellow'),
                          "Likely contact interruption with monasca-api"))
        else:
            lines.append("[%s]: Transactions up to date" %
                         style(STATUS_OK, 'green'))

        return lines

    def has_error(self):
        return self.flush_count == 0

    def to_dict(self):
        status_info = AgentStatus.to_dict(self)
        status_info.update({
            'flush_count': self.flush_count,
            'queue_length': self.queue_length,
            'queue_size': self.queue_size,
        })
        return status_info

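A minimal usage sketch of the class above; the constructor arguments are hypothetical values, and the rendering helpers (style, STATUS_*) come from the surrounding module:

# Illustrative only, not part of the agent.
fs = ForwarderStatus(queue_length=3, queue_size=2048, flush_count=10,
                     transactions_received=100, transactions_flushed=100)
print('\n'.join(fs.body_lines()))  # ends with the "Transactions up to date" line
assert not fs.has_error()          # an error is reported only when nothing ever flushed
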
def get_jmx_instance_status(instance_name, status, message, metric_count):
    if status == STATUS_ERROR:
        instance_status = InstanceStatus(
            instance_name, STATUS_ERROR, error=message, metric_count=metric_count)

    elif status == STATUS_WARNING:
        instance_status = InstanceStatus(
            instance_name, STATUS_WARNING, warnings=[message], metric_count=metric_count)

    elif status == STATUS_OK:
        instance_status = InstanceStatus(instance_name, STATUS_OK, metric_count=metric_count)

    # Note: a status value outside the three constants above leaves
    # instance_status unbound and this return raises UnboundLocalError.
    return instance_status

def get_jmx_status():
    """Try to read the two jmxfetch status files, which are YAML files
    located in the temp directory.

    There are 2 files:
        - One generated by the Agent itself, for jmx checks that can't be
          initialized because something is missing.
          Its format is as follows:

            ###
            invalid_checks:
                  jmx: !!python/object/apply:jmxfetch.InvalidJMXConfiguration [You need to have at
                          least one instance defined in the YAML file for this check]
            timestamp: 1391040927.136523
            ###

        - One generated by jmxfetch that returns information about the
          collection of metrics. Its format is as follows:

            ###
            timestamp: 1391037347435
            checks:
              failed_checks:
                jmx:
                - {message: Unable to create instance. Please check your yaml file, status: ERROR}
              initialized_checks:
                tomcat:
                - {message: null, status: OK, metric_count: 7, instance_name: jmx-remihakim.fr-3000}
            ###
    """
    check_statuses = []
    java_status_path = os.path.join(tempfile.gettempdir(), "jmx_status.yaml")
    python_status_path = os.path.join(tempfile.gettempdir(), "jmx_status_python.yaml")
    if not os.path.exists(java_status_path) and not os.path.exists(python_status_path):
        log.debug("There is no jmx_status file at: %s or at: %s" %
                  (java_status_path, python_status_path))
        return []

    check_data = collections.defaultdict(lambda: collections.defaultdict(list))
    try:
        if os.path.exists(java_status_path):
            java_jmx_stats = yaml.load(file(java_status_path))

            # JMX timestamp is saved in milliseconds
            status_age = time.time() - java_jmx_stats.get('timestamp') / 1000
            jmx_checks = java_jmx_stats.get('checks', {})

            if status_age > 60:
                check_statuses.append(
                    CheckStatus(
                        "jmx", [
                            InstanceStatus(
                                0, STATUS_ERROR, error="JMXfetch didn't return any metrics during the last minute")], 0, 0))
            else:
                for check_name, instances in jmx_checks.get('failed_checks', {}).iteritems():
                    for info in instances:
                        message = info.get('message', None)
                        metric_count = info.get('metric_count', 0)
                        status = info.get('status')
                        instance_name = info.get('instance_name', None)
                        check_data[check_name]['statuses'].append(
                            get_jmx_instance_status(
                                instance_name,
                                status,
                                message,
                                metric_count))
                        check_data[check_name]['metric_count'].append(metric_count)

                for check_name, instances in jmx_checks.get('initialized_checks', {}).iteritems():
                    for info in instances:
                        message = info.get('message', None)
                        metric_count = info.get('metric_count', 0)
                        status = info.get('status')
                        instance_name = info.get('instance_name', None)
                        check_data[check_name]['statuses'].append(
                            get_jmx_instance_status(
                                instance_name,
                                status,
                                message,
                                metric_count))
                        check_data[check_name]['metric_count'].append(metric_count)

            for check_name, data in check_data.iteritems():
                check_status = CheckStatus(
                    check_name, data['statuses'], sum(data['metric_count']), 0)
                check_statuses.append(check_status)

        if os.path.exists(python_status_path):
            python_jmx_stats = yaml.load(file(python_status_path))
            jmx_checks = python_jmx_stats.get('invalid_checks', {})
            for check_name, excep in jmx_checks.iteritems():
                check_statuses.append(CheckStatus(check_name, [], 0, 0, init_failed_error=excep))

        return check_statuses

    except Exception:
        log.exception("Couldn't load latest jmx status")
        return []

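One aside on the formats documented above: the jmxfetch-generated file contains only plain scalars, so it can be read with a safe YAML loader, whereas the agent-generated file embeds Python exception objects via !!python/object/apply: tags and needs the full loader. A standalone sketch for the jmxfetch side, with the file name and 60-second staleness threshold taken from the function above (everything else is illustrative):

# Illustrative sketch, separate from the module above.
import os
import tempfile
import time

import yaml


def read_java_jmx_status():
    path = os.path.join(tempfile.gettempdir(), 'jmx_status.yaml')
    with open(path) as f:
        stats = yaml.safe_load(f)  # plain scalars only, so safe_load suffices
    age = time.time() - stats['timestamp'] / 1000  # timestamp is in milliseconds
    return stats.get('checks', {}), age > 60       # True means the data is stale
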
@ -232,18 +232,11 @@ class Paths(object):
 
     def get_confd_path(self):
         bad_path = ''
-        if self.osname == 'windows':
-            try:
-                return self._windows_confd_path()
-            except PathNotFound as e:
-                if len(e.args) > 0:
-                    bad_path = e.args[0]
-        else:
-            try:
-                return self._unix_confd_path()
-            except PathNotFound as e:
-                if len(e.args) > 0:
-                    bad_path = e.args[0]
+        try:
+            return self._unix_confd_path()
+        except PathNotFound as e:
+            if len(e.args) > 0:
+                bad_path = e.args[0]
 
         cur_path = os.path.dirname(os.path.realpath(__file__))
         cur_path = os.path.join(cur_path, 'conf.d')
@ -262,18 +255,8 @@ class Paths(object):
             return path2
         raise PathNotFound(path)
 
-    def _windows_confd_path(self):
-        common_data = self._windows_commondata_path()
-        path = os.path.join(common_data, 'Datadog', 'conf.d')
-        if os.path.exists(path):
-            return path
-        raise PathNotFound(path)
-
     def get_checksd_path(self):
-        if self.osname == 'windows':
-            return self._windows_checksd_path()
-        else:
-            return self._unix_checksd_path()
+        return self._unix_checksd_path()
 
     def _unix_checksd_path(self):
         # Unix only will look up based on the current directory
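With the Windows branches gone, path resolution reduces to a single Unix lookup with a module-relative fallback inside get_confd_path. A hypothetical caller, using only the names defined in this class:

# Illustrative only.
paths = Paths()
try:
    confd = paths.get_confd_path()  # user conf.d if found, else the module-relative fallback
except PathNotFound:
    confd = None                    # raised only when no candidate directory exists
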
@ -285,54 +268,6 @@ class Paths(object):
             return checksd_path
         raise PathNotFound(checksd_path)
 
-    def _windows_checksd_path(self):
-        if hasattr(sys, 'frozen'):
-            # we're frozen - from py2exe
-            prog_path = os.path.dirname(sys.executable)
-            checksd_path = os.path.join(prog_path, '..', 'checks_d')
-        else:
-            cur_path = os.path.dirname(__file__)
-            checksd_path = os.path.join(cur_path, '../collector/checks_d')
-
-        if os.path.exists(checksd_path):
-            return checksd_path
-        raise PathNotFound(checksd_path)
-
-    def _windows_commondata_path():
-        """Return the common appdata path, using ctypes
-        From http://stackoverflow.com/questions/626796/\
-        how-do-i-find-the-windows-common-application-data-folder-using-python
-        """
-        import ctypes
-        from ctypes import windll
-        from ctypes import wintypes
-
-        # Note: _SHGetFolderPath is configured but never invoked here,
-        # so the buffer below is returned unfilled.
-        _SHGetFolderPath = windll.shell32.SHGetFolderPathW
-        _SHGetFolderPath.argtypes = [wintypes.HWND,
-                                     ctypes.c_int,
-                                     wintypes.HANDLE,
-                                     wintypes.DWORD, wintypes.LPCWSTR]
-
-        path_buf = wintypes.create_unicode_buffer(wintypes.MAX_PATH)
-        return path_buf.value
-
-    def set_win32_cert_path(self):
-        """In order to use tornado.httpclient with the packaged .exe on Windows we
-        need to override the default certificate location which is based on the path
-        to tornado and will give something like "C:\path\to\program.exe\tornado/cert-file".
-        """
-        if hasattr(sys, 'frozen'):
-            # we're frozen - from py2exe
-            prog_path = os.path.dirname(sys.executable)
-            crt_path = os.path.join(prog_path, 'ca-certificates.crt')
-        else:
-            cur_path = os.path.dirname(__file__)
-            crt_path = os.path.join(cur_path, 'packaging', 'monasca-agent', 'win32',
-                                    'install_files', 'ca-certificates.crt')
-        import tornado.simple_httpclient
-        log.info("Windows certificate path: %s" % crt_path)
-        tornado.simple_httpclient._DEFAULT_CA_CERTS = crt_path
 
 def plural(count):
     if count == 1:
@ -29,7 +29,6 @@ import tornado.options
 import tornado.web
 
 # agent import
-import monasca_agent.common.check_status as check_status
 import monasca_agent.common.config as cfg
 import monasca_agent.common.metrics as metrics
 import monasca_agent.common.util as util
@ -222,18 +221,12 @@ def main():
     # If we don't have any arguments, run the server.
     if not args:
         app = init_forwarder(skip_ssl_validation, use_simple_http_client=use_simple_http_client)
-        try:
-            app.run()
-        finally:
-            check_status.ForwarderStatus.remove_latest_status()
+        app.run()
 
     else:
         usage = "%s [help|info]. Run with no commands to start the server" % (sys.argv[0])
         command = args[0]
-        if command == 'info':
-            logging.getLogger().setLevel(logging.ERROR)
-            return check_status.ForwarderStatus.print_latest_status()
-        elif command == 'help':
+        if command == 'help':
             print(usage)
         else:
             print("Unknown command: %s" % command)
@ -1,5 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||
|
||||
"""
|
||||
A Python Statsd implementation with dimensions added
|
||||
|
@ -23,7 +23,6 @@ import sys
 
 # project
 import monasca_agent.common.aggregator as agg
-import monasca_agent.common.check_status as check_status
 
 log = logging.getLogger('statsd')
 
@ -91,13 +90,8 @@ def main():
     parser = argparse.ArgumentParser(description='Monasca statsd - statsd server supporting metric dimensions')
     parser.add_argument('--config', '-c',
                         help="Location for an alternate config rather than using the default config location.")
-    parser.add_argument('--info', action='store_true', help="Output info about the running Monasca Statsd")
     args = parser.parse_args()
 
-    if args.info:
-        logging.getLogger().setLevel(logging.ERROR)
-        return check_status.MonascaStatsdStatus.print_latest_status()
-
     monasca_statsd = MonascaStatsd(args.config)
     monasca_statsd.run()
 
@ -1,10 +1,9 @@
|
|||
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
|
||||
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import monasca_agent.common.check_status as check_status
|
||||
import monasca_agent.common.emitter as emitter
|
||||
import monasca_agent.common.util as util
|
||||
|
||||
|
@ -47,16 +46,12 @@ class Reporter(threading.Thread):
 
         log.info("Reporting to %s every %ss" % (self.api_host, self.interval))
 
-        # Persist a start-up message.
-        check_status.MonascaStatsdStatus().persist()
-
         while not self.finished.isSet():  # Use camel case isSet for 2.4 support.
             self.finished.wait(self.interval)
             self.flush()
 
-        # Clean up the status messages.
         log.debug("Stopped reporter")
-        check_status.MonascaStatsdStatus.remove_latest_status()
 
     def flush(self):
         try:
@ -93,15 +88,5 @@ class Reporter(threading.Thread):
                 log.info(
                     "First flushes done, %s flushes will be logged every %s flushes." %
                     (FLUSH_LOGGING_COUNT, FLUSH_LOGGING_PERIOD))
-
-            # Persist a status message.
-            packet_count = self.aggregator.total_count
-            packets_per_second = self.aggregator.packets_per_second(self.interval)
-            check_status.MonascaStatsdStatus(flush_count=self.flush_count,
-                                             packet_count=packet_count,
-                                             packets_per_second=packets_per_second,
-                                             metric_count=count,
-                                             event_count=event_count).persist()
-
         except Exception:
             log.exception("Error flushing metrics")
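The Reporter changes above leave its scheduling idiom intact: one threading.Event both paces the periodic flush and serves as the stop signal, since wait() returns early once the event is set. A self-contained sketch of that pattern, with illustrative names rather than the agent's actual class:

# Minimal sketch of the periodic-flush pattern used by Reporter above.
import threading


class PeriodicFlusher(threading.Thread):
    def __init__(self, interval):
        threading.Thread.__init__(self)
        self.interval = interval
        self.finished = threading.Event()

    def run(self):
        while not self.finished.isSet():       # camelCase alias, as in the code above
            self.finished.wait(self.interval)  # returns early if stop() is called
            self.flush()                       # one final flush runs after stop()

    def flush(self):
        pass  # emit buffered metrics here

    def stop(self):
        self.finished.set()
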
@ -1,204 +0,0 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# set up logging before importing any other components
from collector import modules
from config import initialize_logging
from monasca_agent.pup import pup
from monasca_agent.statsd import daemon

initialize_logging('collector')

import logging
import multiprocessing
import sys
import time
import win32event
import win32service
import win32serviceutil

from collector.checks.collector import Collector
from collector.jmxfetch import JMXFetch
from ddagent import Application
from emitter import http_emitter
from optparse import Values
from win32.common import handle_exe_click

from monasca_agent.common.config import get_config
from monasca_agent.common.config import load_check_directory
from monasca_agent.common.config import set_win32_cert_path
from monasca_agent.statsd.daemon import MonascaStatsd

log = logging.getLogger(__name__)
RESTART_INTERVAL = 24 * 60 * 60  # Defaults to 1 day

class AgentSvc(win32serviceutil.ServiceFramework):
    _svc_name_ = "MonascaAgent"
    _svc_display_name_ = "Monasca Agent"
    _svc_description_ = "Sends metrics to Monasca"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        config = get_config(parse_args=False)

        # Setup the correct options so the agent will use the forwarder
        opts, args = Values({
            'clean': False,
            'disabled_dd': False
        }), []
        agentConfig = get_config(parse_args=False, options=opts)
        self.restart_interval = \
            int(agentConfig.get('autorestart_interval', RESTART_INTERVAL))
        log.info("Autorestarting the collector every %s seconds" % self.restart_interval)

        # Keep a list of running processes so we can start/end as needed.
        # Processes will be started in order and stopped in reverse order.
        self.procs = {
            'monasca-forwarder': MonascaForwarder(config),
            'monasca-collector': MonascaCollector(agentConfig),
            'monasca-statsd': MonascaStatsd(config),
        }

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)

        # Stop all services.
        self.running = False
        for proc in self.procs.values():
            proc.terminate()

    def SvcDoRun(self):
        import servicemanager
        servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE,
                              servicemanager.PYS_SERVICE_STARTED,
                              (self._svc_name_, ''))
        self.start_ts = time.time()

        # Start all services.
        for proc in self.procs.values():
            proc.start()

        # Loop to keep the service running since all DD services are
        # running in separate processes
        self.running = True
        while self.running:
            if self.running:
                # Restart any processes that might have died.
                for name, proc in self.procs.iteritems():
                    if not proc.is_alive() and proc.is_enabled:
                        log.info("%s has died. Restarting..." % proc.name)
                        # Make a new proc instance because multiprocessing
                        # won't let you call .start() twice on the same instance.
                        new_proc = proc.__class__(proc.config)
                        new_proc.start()
                        self.procs[name] = new_proc
                # Auto-restart the collector if we've been running for a while.
                if time.time() - self.start_ts > self.restart_interval:
                    log.info('Auto-restarting collector after %s seconds' % self.restart_interval)
                    collector = self.procs['collector']
                    new_collector = collector.__class__(collector.config,
                                                        start_event=False)
                    collector.terminate()
                    del self.procs['collector']
                    new_collector.start()

                    # Replace old process and reset timer.
                    self.procs['collector'] = new_collector
                    self.start_ts = time.time()

            time.sleep(1)

class MonascaCollector(multiprocessing.Process):

    def __init__(self, agentConfig, start_event=True):
        multiprocessing.Process.__init__(self, name='monasca-collector')
        self.config = agentConfig
        self.start_event = start_event
        # FIXME: `running` flag should be handled by the service
        self.running = True
        self.is_enabled = True

    def run(self):
        log.debug("Windows Service - Starting monasca-collector")
        emitters = self.get_emitters()
        self.collector = Collector(self.config, emitters)

        # Load the checks_d checks
        checksd = load_check_directory(self.config)

        # Main agent loop will run until interrupted
        while self.running:
            self.collector.run(checksd=checksd, start_event=self.start_event)
            time.sleep(self.config['check_freq'])

    def stop(self):
        log.debug("Windows Service - Stopping monasca-collector")
        self.collector.stop()
        if JMXFetch.is_running():
            JMXFetch.stop()
        self.running = False

    def get_emitters(self):
        emitters = [http_emitter]
        custom = [s.strip() for s in
                  self.config.get('custom_emitters', '').split(',')]
        for emitter_spec in custom:
            if not emitter_spec:
                continue
            emitters.append(modules.load(emitter_spec, 'emitter'))

        return emitters

class MonascaForwarder(multiprocessing.Process):

    def __init__(self, agentConfig):
        multiprocessing.Process.__init__(self, name='monasca-forwarder')
        self.config = agentConfig
        self.is_enabled = True

    def run(self):
        log.debug("Windows Service - Starting monasca-forwarder")
        set_win32_cert_path()
        port = self.config.get('listen_port', 17123)
        if port is None:
            port = 17123
        else:
            port = int(port)
        app_config = get_config(parse_args=False)
        self.forwarder = Application(port, app_config)
        self.forwarder.run()

    def stop(self):
        log.debug("Windows Service - Stopping monasca-forwarder")
        self.forwarder.stop()

class MonascaStatsdProcess(multiprocessing.Process):

    def __init__(self, agentConfig):
        multiprocessing.Process.__init__(self, name='monasca-statsd')
        self.config = agentConfig
        self.is_enabled = True

    def run(self):
        log.debug("Windows Service - Starting monasca-statsd server")
        self.reporter, self.server, _ = daemon.init_monasca_statsd()
        self.reporter.start()
        self.server.start()

    def stop(self):
        log.debug("Windows Service - Stopping monasca-statsd server")
        self.server.stop()
        self.reporter.stop()
        self.reporter.join()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    if len(sys.argv) == 1:
        handle_exe_click(AgentSvc._svc_name_)
    else:
        win32serviceutil.HandleCommandLine(AgentSvc)
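The restart logic in SvcDoRun above hinges on one multiprocessing rule: a Process instance can only be started once, so a dead worker has to be replaced by a freshly constructed object. A platform-neutral sketch of that supervisor pattern, with a hypothetical worker rather than the agent's processes:

# Illustrative sketch of the restart pattern used above.
import multiprocessing
import time


def worker():
    time.sleep(1)  # stand-in for a service loop that may die


if __name__ == '__main__':
    procs = {'worker': multiprocessing.Process(target=worker)}
    procs['worker'].start()
    for _ in range(3):
        time.sleep(2)
        for name, proc in list(procs.items()):
            if not proc.is_alive():
                # A new instance is required; .start() cannot be called twice.
                new_proc = multiprocessing.Process(target=worker)
                new_proc.start()
                procs[name] = new_proc
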
@ -1,22 +0,0 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP

import ctypes


def handle_exe_click(name):
    """When the executables are clicked directly in the UI, we must let the
    user know that they have to install the program as a service instead of
    running it directly.
    """
    message = """To use %(name)s, you must install it as a service.

To install %(name)s as a service, you must run the following in the console:

%(name)s.exe install

For all available options, including how to install the service for a particular user, run the following in a console:

%(name)s.exe help
""" % ({'name': name})
    MessageBox = ctypes.windll.user32.MessageBoxA
    MessageBox(None, message, 'Install as a Service', 0)
@ -1,536 +0,0 @@
# Copyright © 2009-2010 CEA
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# Pierre Raybaut
# Licensed under the terms of the CECILL License
# Modified for Datadog

try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
import os
import os.path as osp
import sys
import threading as thread
import webbrowser
import yaml

import win32service
import win32serviceutil

# GUI Imports
from guidata.qt.QtCore import QPoint
from guidata.qt.QtCore import QSize
from guidata.qt.QtCore import Qt
from guidata.qt.QtCore import QTimer
from guidata.qt.QtCore import SIGNAL

from guidata.qt.QtGui import QFont
from guidata.qt.QtGui import QGroupBox
from guidata.qt.QtGui import QHBoxLayout
from guidata.qt.QtGui import QInputDialog
from guidata.qt.QtGui import QLabel
from guidata.qt.QtGui import QListWidget
from guidata.qt.QtGui import QMenu
from guidata.qt.QtGui import QMessageBox
from guidata.qt.QtGui import QPushButton
from guidata.qt.QtGui import QSplitter
from guidata.qt.QtGui import QSystemTrayIcon
from guidata.qt.QtGui import QVBoxLayout
from guidata.qt.QtGui import QWidget

from guidata.configtools import get_family
from guidata.configtools import get_icon
from guidata.configtools import MONOSPACE
from guidata.qthelpers import get_std_icon
from spyderlib.widgets.sourcecode.codeeditor import CodeEditor


# Datadog
from common.util import get_os
from config import _windows_commondata_path
from config import get_confd_path
from config import get_config
from config import get_config_path

EXCLUDED_WINDOWS_CHECKS = [
    'cacti', 'directory', 'gearmand',
    'hdfs', 'kafka_consumer', 'mcache', 'network',
    'redis', 'postfix', 'process', 'gunicorn', 'zk',
]

MAIN_WINDOW_TITLE = "Datadog Agent Manager"

DATADOG_SERVICE = "DatadogAgent"

STATUS_PAGE_URL = "http://localhost:17125/status"
AGENT_LOG_FILE = osp.join(_windows_commondata_path(), 'Datadog', 'logs', 'ddagent.log')

HUMAN_SERVICE_STATUS = {win32service.SERVICE_RUNNING: 'Service is running',
                        win32service.SERVICE_START_PENDING: 'Service is starting',
                        win32service.SERVICE_STOP_PENDING: 'Service is stopping',
                        win32service.SERVICE_STOPPED: 'Service is stopped',
                        "Unknown": "Cannot get service status"}

REFRESH_PERIOD = 5000

START_AGENT = "Start Agent"
STOP_AGENT = "Stop Agent"
RESTART_AGENT = "Restart Agent"
STATUS_PAGE = "Status page"
EXIT_MANAGER = "Exit Agent Manager"
OPEN_LOG = "Open log file"

SYSTEM_TRAY_MENU = [
    (START_AGENT, lambda: service_manager("start")),
    (STOP_AGENT, lambda: service_manager("stop")),
    (RESTART_AGENT, lambda: service_manager("restart")),
    (STATUS_PAGE, lambda: webbrowser.open(STATUS_PAGE_URL)),
    (EXIT_MANAGER, lambda: sys.exit(0)),
]

def get_checks():
    checks = {}
    conf_d_directory = get_confd_path(get_os())

    for filename in sorted(os.listdir(conf_d_directory)):
        module_name, ext = osp.splitext(filename)
        if filename.split('.')[0] in EXCLUDED_WINDOWS_CHECKS:
            continue
        if ext not in ('.yaml', '.example', '.disabled'):
            continue

        agent_check = AgentCheck(filename, ext, conf_d_directory)
        if (agent_check.enabled or agent_check.module_name not in checks or
                (not agent_check.is_example and not checks[agent_check.module_name].enabled)):
            checks[agent_check.module_name] = agent_check

    checks_list = checks.values()
    checks_list.sort(key=lambda c: c.module_name)

    return checks_list

class EditorFile(object):

    def __init__(self, file_path, description):
        self.file_path = file_path
        self.description = description

    def get_description(self):
        return self.description

    def save(self, content):
        try:
            f = open(self.file_path, 'w')
            f.write(content)
            self.content = content
            info_popup("File saved.")
        except Exception as e:
            warning_popup("Unable to save file: \n %s" % str(e))
            raise


class LogFile(EditorFile):

    def __init__(self):
        EditorFile.__init__(self, AGENT_LOG_FILE, "Agent log file")


class DatadogConf(EditorFile):

    @property
    def api_key(self):
        config = get_config(parse_args=False, cfg_path=self.file_path)
        api_key = config.get('api_key', None)
        if not api_key or api_key == 'APIKEYHERE':
            return None
        return api_key

    def check_api_key(self, editor):
        if self.api_key is None:
            api_key, ok = QInputDialog.getText(
                None, "Add your API KEY", "You must first set your api key in this file. You can find it here: https://app.datadoghq.com/account/settings#api")
            if ok and api_key:
                new_content = []
                for line in self.content.splitlines():
                    if "api_key:" in line:
                        new_content.append("api_key: %s" % str(api_key))
                    else:
                        new_content.append("%s" % line)
                new_content = "\n".join(new_content)
                self.save(new_content)
                editor.set_text(new_content)

                if not is_service_stopped():
                    service_manager("restart")
                else:
                    service_manager("start")
            else:
                self.check_api_key(editor)

class AgentCheck(EditorFile):

    def __init__(self, filename, ext, conf_d_directory):
        file_path = osp.join(conf_d_directory, filename)
        self.module_name = filename.split('.')[0]

        EditorFile.__init__(self, file_path, description=self.module_name.replace("_", " ").title())

        self.enabled = ext == '.yaml'
        self.is_example = ext == '.example'
        self.enabled_name = osp.join(conf_d_directory, "%s.yaml" % self.module_name)
        self.disabled_name = "%s.disabled" % self.enabled_name

    def enable(self):
        self.enabled = True
        os.rename(self.file_path, self.enabled_name)
        self.file_path = self.enabled_name

    def disable(self):
        self.enabled = False
        os.rename(self.file_path, self.disabled_name)
        self.file_path = self.disabled_name

    def save(self, content):
        check_yaml_syntax(content)
        EditorFile.save(self, content)

class PropertiesWidget(QWidget):

    def __init__(self, parent):
        QWidget.__init__(self, parent)
        font = QFont(get_family(MONOSPACE), 10, QFont.Normal)

        info_icon = QLabel()
        icon = get_std_icon('MessageBoxInformation').pixmap(24, 24)
        info_icon.setPixmap(icon)
        info_icon.setFixedWidth(32)
        info_icon.setAlignment(Qt.AlignTop)

        self.service_status_label = QLabel()
        self.service_status_label.setWordWrap(True)
        self.service_status_label.setAlignment(Qt.AlignTop)
        self.service_status_label.setFont(font)

        self.desc_label = QLabel()
        self.desc_label.setWordWrap(True)
        self.desc_label.setAlignment(Qt.AlignTop)
        self.desc_label.setFont(font)

        group_desc = QGroupBox("Description", self)
        layout = QHBoxLayout()
        layout.addWidget(info_icon)
        layout.addWidget(self.desc_label)
        layout.addStretch()
        layout.addWidget(self.service_status_label)

        group_desc.setLayout(layout)

        self.editor = CodeEditor(self)
        self.editor.setup_editor(linenumbers=True, font=font)
        self.editor.setReadOnly(False)
        group_code = QGroupBox("Source code", self)
        layout = QVBoxLayout()
        layout.addWidget(self.editor)
        group_code.setLayout(layout)

        self.enable_button = QPushButton(get_icon("apply.png"),
                                         "Enable", self)

        self.save_button = QPushButton(get_icon("filesave.png"),
                                       "Save", self)

        self.edit_datadog_conf_button = QPushButton(get_icon("edit.png"),
                                                    "Edit agent settings", self)

        self.disable_button = QPushButton(get_icon("delete.png"),
                                          "Disable", self)

        self.view_log_button = QPushButton(get_icon("txt.png"),
                                           "View log", self)

        self.menu_button = QPushButton(get_icon("settings.png"),
                                       "Manager", self)

        hlayout = QHBoxLayout()
        hlayout.addWidget(self.save_button)
        hlayout.addStretch()
        hlayout.addWidget(self.enable_button)
        hlayout.addStretch()
        hlayout.addWidget(self.disable_button)
        hlayout.addStretch()
        hlayout.addWidget(self.edit_datadog_conf_button)
        hlayout.addStretch()
        hlayout.addWidget(self.view_log_button)
        hlayout.addStretch()
        hlayout.addWidget(self.menu_button)

        vlayout = QVBoxLayout()
        vlayout.addWidget(group_desc)
        vlayout.addWidget(group_code)
        vlayout.addLayout(hlayout)
        self.setLayout(vlayout)

        self.current_file = None

    def set_item(self, check):
        self.current_file = check
        self.desc_label.setText(check.get_description())
        self.editor.set_text_from_file(check.file_path)
        check.content = self.editor.toPlainText().__str__()
        if check.enabled:
            self.disable_button.setEnabled(True)
            self.enable_button.setEnabled(False)
        else:
            self.disable_button.setEnabled(False)
            self.enable_button.setEnabled(True)

    def set_datadog_conf(self, datadog_conf):
        self.current_file = datadog_conf
        self.desc_label.setText(datadog_conf.get_description())
        self.editor.set_text_from_file(datadog_conf.file_path)
        datadog_conf.content = self.editor.toPlainText().__str__()
        self.disable_button.setEnabled(False)
        self.enable_button.setEnabled(False)

        datadog_conf.check_api_key(self.editor)

    def set_log_file(self, log_file):
        self.current_file = log_file
        self.desc_label.setText(log_file.get_description())
        self.editor.set_text_from_file(log_file.file_path)
        log_file.content = self.editor.toPlainText().__str__()
        self.disable_button.setEnabled(False)
        self.enable_button.setEnabled(False)
        self.editor.go_to_line(len(log_file.content.splitlines()))

class MainWindow(QSplitter):

    def __init__(self, parent=None):

        QSplitter.__init__(self, parent)
        self.setWindowTitle(MAIN_WINDOW_TITLE)
        self.setWindowIcon(get_icon("agent.svg"))

        self.sysTray = SystemTray(self)

        self.connect(self.sysTray, SIGNAL(
            "activated(QSystemTrayIcon::ActivationReason)"), self.__icon_activated)

        checks = get_checks()
        datadog_conf = DatadogConf(
            get_config_path(), description="Agent settings file: datadog.conf")
        self.log_file = LogFile()

        listwidget = QListWidget(self)
        listwidget.addItems(
            [osp.basename(check.module_name).replace("_", " ").title() for check in checks])

        self.properties = PropertiesWidget(self)

        self.addWidget(listwidget)
        self.addWidget(self.properties)

        self.connect(self.properties.enable_button, SIGNAL("clicked()"),
                     lambda: enable_check(self.properties))

        self.connect(self.properties.disable_button, SIGNAL("clicked()"),
                     lambda: disable_check(self.properties))

        self.connect(self.properties.save_button, SIGNAL("clicked()"),
                     lambda: save_file(self.properties))

        self.connect(listwidget, SIGNAL('currentRowChanged(int)'),
                     lambda row: self.properties.set_item(checks[row]))

        self.connect(self.properties.edit_datadog_conf_button, SIGNAL('clicked()'),
                     lambda: self.properties.set_datadog_conf(datadog_conf))

        self.connect(self.properties.view_log_button, SIGNAL('clicked()'),
                     lambda: self.properties.set_log_file(self.log_file))

        self.manager_menu = Menu(self)
        self.connect(
            self.properties.menu_button,
            SIGNAL("clicked()"),
            lambda: self.manager_menu.popup(
                self.properties.menu_button.mapToGlobal(QPoint(0, 0))))

        listwidget.setCurrentRow(0)

        self.setSizes([150, 1])
        self.setStretchFactor(1, 1)
        self.resize(QSize(950, 600))
        self.properties.set_datadog_conf(datadog_conf)

        self.do_refresh()

    def do_refresh(self):
        try:
            if self.isVisible():
                service_status = get_service_status()
                self.properties.service_status_label.setText(HUMAN_SERVICE_STATUS[service_status])

                if not is_service_stopped(
                        service_status) and self.properties.current_file == self.log_file:
                    self.properties.set_log_file(self.log_file)
        finally:
            QTimer.singleShot(REFRESH_PERIOD, self.do_refresh)

    def closeEvent(self, event):
        self.hide()
        self.sysTray.show()
        event.ignore()

    def __icon_activated(self, reason):
        if reason == QSystemTrayIcon.DoubleClick:
            self.show()

class Menu(QMenu):

    def __init__(self, parent=None):
        QMenu.__init__(self, parent)
        self.options = {}

        for name, action in SYSTEM_TRAY_MENU:
            menu_action = self.addAction(name)
            self.connect(menu_action, SIGNAL('triggered()'), action)
            self.options[name] = menu_action

        self.connect(self, SIGNAL("aboutToShow()"), lambda: self.update_options())

    def update_options(self):
        status = get_service_status()
        if is_service_running(status):
            self.options[START_AGENT].setEnabled(False)
            self.options[RESTART_AGENT].setEnabled(True)
            self.options[STOP_AGENT].setEnabled(True)
        elif is_service_stopped(status):
            self.options[START_AGENT].setEnabled(True)
            self.options[RESTART_AGENT].setEnabled(False)
            self.options[STOP_AGENT].setEnabled(False)
        elif is_service_pending(status):
            self.options[START_AGENT].setEnabled(False)
            self.options[RESTART_AGENT].setEnabled(False)
            self.options[STOP_AGENT].setEnabled(False)


class SystemTray(QSystemTrayIcon):

    def __init__(self, parent=None):
        QSystemTrayIcon.__init__(self, parent)
        self.setIcon(get_icon("agent.png"))
        self.setVisible(True)
        self.options = {}

        menu = Menu(self.parent())
        self.setContextMenu(menu)

def disable_check(properties):
    check = properties.current_file
    new_content = properties.editor.toPlainText().__str__()

    if check.content != new_content:
        warning_popup("You must first save the file.")
        return

    properties.enable_button.setEnabled(True)
    properties.disable_button.setEnabled(False)
    check.disable()


def enable_check(properties):
    check = properties.current_file

    new_content = properties.editor.toPlainText().__str__()
    if check.content != new_content:
        warning_popup("You must first save the file")
        return

    properties.enable_button.setEnabled(False)
    properties.disable_button.setEnabled(True)
    check.enable()


def save_file(properties):
    current_file = properties.current_file
    new_content = properties.editor.toPlainText().__str__()
    current_file.save(new_content)


def check_yaml_syntax(content):
    try:
        yaml.load(content, Loader=Loader)
    except Exception as e:
        warning_popup("Unable to parse yaml: \n %s" % str(e))
        raise


def _service_manager(action):
    try:
        if action == 'stop':
            win32serviceutil.StopService(DATADOG_SERVICE)
        elif action == 'start':
            win32serviceutil.StartService(DATADOG_SERVICE)
        elif action == 'restart':
            win32serviceutil.RestartService(DATADOG_SERVICE)
    except Exception as e:
        warning_popup("Couldn't %s service: \n %s" % (action, str(e)))


def service_manager(action, async=True):
    if not async:
        _service_manager(action)
    else:
        thread.start_new_thread(_service_manager, (action,))


def get_service_status():
    try:
        return win32serviceutil.QueryServiceStatus(DATADOG_SERVICE)[1]
    except Exception:
        return "Unknown"


def is_service_running(status=None):
    if status is None:
        status = get_service_status()
    return status == win32service.SERVICE_RUNNING


def is_service_pending(status=None):
    if status is None:
        status = get_service_status()
    return status in [win32service.SERVICE_STOP_PENDING, win32service.SERVICE_START_PENDING]


def is_service_stopped(status=None):
    if status is None:
        status = get_service_status()
    return status == win32service.SERVICE_STOPPED


def warning_popup(message, parent=None):
    QMessageBox.warning(parent, 'Message', message, QMessageBox.Ok)


def info_popup(message, parent=None):
    QMessageBox.information(parent, 'Message', message, QMessageBox.Ok)


if __name__ == '__main__':
    from guidata.qt.QtGui import QApplication
    app = QApplication([])
    win = MainWindow()
    win.show()
    app.exec_()
@ -1,21 +0,0 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP

import traceback


def shell():
    from config import get_version

    print("""
Datadog Agent v%s - Python Shell

""" % (get_version()))
    while True:
        cmd = raw_input('>>> ')
        try:
            exec(cmd)
        except Exception as e:
            print(traceback.format_exc(e))

if __name__ == "__main__":
    shell()
@ -5,23 +5,13 @@ import signal
 
 from monasca_agent.collector.checks import AgentCheck
 from monasca_agent.common.util import Paths
-from monasca_agent.common.util import get_os
 
 
 def kill_subprocess(process_obj):
     try:
         process_obj.terminate()
     except AttributeError:
         # py < 2.6 doesn't support process.terminate()
-        if get_os() == 'windows':
-            import ctypes
-            PROCESS_TERMINATE = 1
-            handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False,
-                                                        process_obj.pid)
-            ctypes.windll.kernel32.TerminateProcess(handle, -1)
-            ctypes.windll.kernel32.CloseHandle(handle)
-        else:
-            os.kill(process_obj.pid, signal.SIGKILL)
+        os.kill(process_obj.pid, signal.SIGKILL)
 
 
 def get_check(name, config_str):
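A hypothetical use of the simplified helper above; the sleep command stands in for any child process:

# Illustrative only.
import subprocess

proc = subprocess.Popen(['sleep', '60'])
kill_subprocess(proc)  # terminate() where available, SIGKILL fallback on very old Pythons
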