Update pep8 checks

* set the maximum line length to 100
* cleaned up the code for pep8

Depends-On: https://review.openstack.org/560844
Change-Id: Id548b78b673cce869ef8291bdb6b4f1367b383ed
Signed-off-by: Jui Chandwaskar <jchandwaskar@op5.com>
Author: Jui Chandwaskar, 2018-04-11 09:25:30 +02:00 (committed by Witold Bedyk)
Parent: bd42ecb1c3
Commit: f837eee559
81 changed files with 1575 additions and 697 deletions


@ -1,3 +1,3 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
from check import AgentCheck
from check import AgentCheck # noqa


@ -6,7 +6,6 @@ import Queue
import threading
import eventlet
from eventlet.green import time
import multiprocessing
import monasca_agent.collector.checks
@ -44,7 +43,8 @@ class ServicesCheck(monasca_agent.collector.checks.AgentCheck):
"""
def __init__(self, name, init_config, agent_config, instances):
monasca_agent.collector.checks.AgentCheck.__init__(self, name, init_config, agent_config, instances)
monasca_agent.collector.checks.AgentCheck.__init__(
self, name, init_config, agent_config, instances)
# A dictionary to keep track of service statuses
self.statuses = {}


@ -176,8 +176,8 @@ class KubernetesConnector(object):
class DynamicCheckHelper(object):
"""Supplements existing check class with reusable functionality to transform third-party metrics into Monasca ones
in a configurable way
"""Supplements existing check class with reusable functionality to transform
third-party metrics into Monasca ones in a configurable way
"""
COUNTERS_KEY = 'counters'
@ -211,15 +211,18 @@ class DynamicCheckHelper(object):
* Replace \\x?? values with _
* Replace illegal characters
- according to ANTLR grammar: ( '}' | '{' | '&' | '|' | '>' | '<' | '=' | ',' | ')' | '(' | ' ' | '"' )
- according to ANTLR grammar:
( '}' | '{' | '&' | '|' | '>' | '<' | '=' | ',' | ')' | '(' | ' ' | '"' )
- according to Python API validation: "<>={}(),\"\\\\|;&"
* Truncate to 255 chars
:param value: input value
:return: valid dimension value
"""
return re.sub(r'[|\\;,&=\']', '-', re.sub(r'[(}>]', ']', re.sub(r'[({<]', '[', value.replace(r'\x2d', '-').
replace(r'\x7e', '~'))))[:255]
return re.sub(r'[|\\;,&=\']', '-',
re.sub(r'[(}>]', ']',
re.sub(r'[({<]', '[', value.replace(r'\x2d', '-').
replace(r'\x7e', '~'))))[:255]
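A quick worked example (not part of the change, sample value invented) of what the nested re.sub chain above produces:
import re

def normalize_dim_value(value):
    # Mirror of the chain above: '\x2d'/'\x7e' escapes are restored, '(', '{', '<'
    # become '[', '}' and '>' become ']', remaining illegal characters become '-',
    # and the result is truncated to 255 characters.
    return re.sub(r'[|\\;,&=\']', '-',
                  re.sub(r'[(}>]', ']',
                         re.sub(r'[({<]', '[', value.replace(r'\x2d', '-').
                                replace(r'\x7e', '~'))))[:255]

print(normalize_dim_value('kube-system|pod=web{1}'))   # kube-system-pod-web[1]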
class DimMapping(object):
"""Describes how to transform dictionary like metadata attached to a metric into Monasca dimensions
@ -239,14 +242,16 @@ class DynamicCheckHelper(object):
def map_value(self, source_value):
"""Transform source value into target dimension value
:param source_value: label value to transform
:return: transformed dimension value or None if the regular expression did not match. An empty
result (caused by the regex having no match-groups) indicates that the label is used for filtering
:return: transformed dimension value or None if the regular
expression did not match. An empty result (caused by the regex
having no match-groups) indicates that the label is used for filtering
but not mapped to a dimension.
"""
if self.cregex:
match_groups = self.cregex.match(source_value)
if match_groups:
return DynamicCheckHelper._normalize_dim_value(self.separator.join(match_groups.groups()))
return DynamicCheckHelper._normalize_dim_value(
self.separator.join(match_groups.groups()))
else:
return None
else:
@ -312,15 +317,21 @@ class DynamicCheckHelper(object):
for grp, gspec in groups.items():
self._grp_metric_map[iname][grp] = gspec
self._grp_metric_cache[iname][grp] = {}
self._grp_dimension_map[iname][grp] = DynamicCheckHelper._build_dimension_map(gspec)
# add the global mappings as pseudo group, so that it is considered when searching for metrics
self._grp_dimension_map[iname][grp] =\
DynamicCheckHelper._build_dimension_map(gspec)
# add the global mappings as pseudo group, so that it is considered when
# searching for metrics
self._groups[iname].append(DynamicCheckHelper.DEFAULT_GROUP)
self._grp_metric_map[iname][DynamicCheckHelper.DEFAULT_GROUP] = self._metric_map[iname]
self._grp_metric_cache[iname][DynamicCheckHelper.DEFAULT_GROUP] = self._metric_cache[iname]
self._grp_dimension_map[iname][DynamicCheckHelper.DEFAULT_GROUP] = self._dimension_map[iname]
self._grp_metric_map[iname][DynamicCheckHelper.DEFAULT_GROUP] =\
self._metric_map[iname]
self._grp_metric_cache[iname][DynamicCheckHelper.DEFAULT_GROUP] =\
self._metric_cache[iname]
self._grp_dimension_map[iname][DynamicCheckHelper.DEFAULT_GROUP] =\
self._dimension_map[iname]
else:
raise exceptions.CheckException('instance %s is not supported: no element "mapping" found!', iname)
raise exceptions.CheckException(
'instance %s is not supported: no element "mapping" found!', iname)
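For orientation, a hypothetical instance configuration of the general shape this constructor walks over; apart from 'mapping', 'counters', 'dimensions' and 'groups', the key names are assumptions made for illustration only:
# Hypothetical instance entry (illustrative, not taken from this change):
instance = {
    'name': 'prometheus-scrape',
    'mapping': {
        'counters': ['requests_total'],          # cf. COUNTERS_KEY above
        'dimensions': {'pod_name': 'pod'},       # label -> dimension mapping
        'groups': {
            'engine': {'counters': ['engine_.*'], 'dimensions': {}},
        },
    },
}
# Each group gets its own metric map, metric cache and dimension map, and the global
# mappings are added under DEFAULT_GROUP so they are always searched as well.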
def _get_group(self, instance, metric):
"""Search the group for a metric. Can be used only when metric names unambiguous across groups.
@ -361,10 +372,22 @@ class DynamicCheckHelper(object):
return DynamicCheckHelper._lookup_metric(metric, metric_cache, metric_map)
def is_enabled_metric(self, instance, metric, group=None):
return self._fetch_metric_spec(instance, metric, group).metric_type != DynamicCheckHelper.SKIP
return self._fetch_metric_spec(
instance, metric, group).metric_type != DynamicCheckHelper.SKIP
def push_metric_dict(self, instance, metric_dict, labels=None, group=None, timestamp=None, fixed_dimensions=None,
default_dimensions=None, max_depth=0, curr_depth=0, prefix='', index=-1):
def push_metric_dict(
self,
instance,
metric_dict,
labels=None,
group=None,
timestamp=None,
fixed_dimensions=None,
default_dimensions=None,
max_depth=0,
curr_depth=0,
prefix='',
index=-1):
"""This will extract metrics and dimensions from a dictionary.
The following mappings are applied:
@ -389,8 +412,9 @@ class DynamicCheckHelper(object):
server_requests=12
Mapping of textual values to dimensions to distinguish array elements. Make sure that tests attributes
are sufficient to distinguish the array elements. If not use the build-in 'index' dimension.
Mapping of textual values to dimensions to distinguish array elements. Make sure that the
textual attributes are sufficient to distinguish the array elements. If not, use the
built-in 'index' dimension.
Input:
@ -430,8 +454,10 @@ class DynamicCheckHelper(object):
server_requests{server_role=slave, node_name=server2} = 500.0
Distinguish array elements where no textual attribute are available or no mapping has been configured for them.
In that case an 'index' dimension will be attached to the metric which has to be mapped properly.
Distinguish array elements where no textual attributes are available or no mapping has been
configured for them.
In that case an 'index' dimension will be attached to the metric, which has to be mapped
properly.
Input:
@ -470,7 +496,8 @@ class DynamicCheckHelper(object):
:param group: group to use for mapping labels and prefixing
:param timestamp: timestamp to report for the measurement
:param fixed_dimensions: dimensions which are always added with fixed values
:param default_dimensions: dimensions to be added, can be overwritten by actual data in metric_dict
:param default_dimensions: dimensions to be added, can be overwritten by actual data in
metric_dict
:param max_depth: max. depth to recurse
:param curr_depth: depth of recursion
:param prefix: prefix to prepend to any metric
@ -478,7 +505,8 @@ class DynamicCheckHelper(object):
"""
# when traversing through an array, each element must be distinguished with dimensions
# therefore additional dimensions need to be calculated from the siblings of the actual number valued fields
# therefore additional dimensions need to be calculated from the siblings
# of the actual number valued fields
if default_dimensions is None:
default_dimensions = {}
if fixed_dimensions is None:
@ -486,7 +514,8 @@ class DynamicCheckHelper(object):
if labels is None:
labels = {}
if index != -1:
ext_labels = self.extract_dist_labels(instance['name'], group, metric_dict, labels.copy(), index)
ext_labels = self.extract_dist_labels(
instance['name'], group, metric_dict, labels.copy(), index)
if not ext_labels:
log.debug(
"skipping array due to lack of mapped dimensions for group %s "
@ -500,49 +529,92 @@ class DynamicCheckHelper(object):
for element, child in metric_dict.items():
# if child is a dictionary, then recurse
if isinstance(child, dict) and curr_depth < max_depth:
self.push_metric_dict(instance, child, ext_labels, group, timestamp, fixed_dimensions,
default_dimensions, max_depth, curr_depth + 1, prefix + element + '_')
# if child is a number, assume that it is a metric (it will be filtered out by the rate/gauge names)
self.push_metric_dict(
instance,
child,
ext_labels,
group,
timestamp,
fixed_dimensions,
default_dimensions,
max_depth,
curr_depth + 1,
prefix + element + '_')
# if child is a number, assume that it is a metric (it will be filtered
# out by the rate/gauge names)
elif isinstance(child, Number):
self.push_metric(instance, prefix + element, float(child), ext_labels, group, timestamp,
fixed_dimensions,
default_dimensions)
# if it is a list, then each array needs to be added. Additional dimensions must be found in order to
# distinguish the measurements.
self.push_metric(
instance,
prefix + element,
float(child),
ext_labels,
group,
timestamp,
fixed_dimensions,
default_dimensions)
# if it is a list, then each array needs to be added. Additional dimensions must be
# found in order to distinguish the measurements.
elif isinstance(child, list):
for i, child_element in enumerate(child):
if isinstance(child_element, dict):
if curr_depth < max_depth:
self.push_metric_dict(instance, child_element, ext_labels, group, timestamp,
fixed_dimensions, default_dimensions, max_depth, curr_depth + 1,
prefix + element + '_', index=i)
self.push_metric_dict(
instance,
child_element,
ext_labels,
group,
timestamp,
fixed_dimensions,
default_dimensions,
max_depth,
curr_depth + 1,
prefix + element + '_',
index=i)
elif isinstance(child_element, Number):
if len(self._get_mappings(instance['name'], group, 'index')) > 0:
idx_labels = ext_labels.copy()
idx_labels['index'] = str(i)
self.push_metric(instance, prefix + element, float(child_element), idx_labels, group,
timestamp, fixed_dimensions, default_dimensions)
self.push_metric(
instance,
prefix + element,
float(child_element),
idx_labels,
group,
timestamp,
fixed_dimensions,
default_dimensions)
else:
log.debug("skipping array due to lack of mapped 'index' dimensions for group %s",
group if group else '<root>')
log.debug(
"skipping array due to lack of mapped 'index' dimensions"
"for group %s",
group if group else '<root>')
else:
log.debug('nested arrays are not supported for configurable extraction of element %s', element)
log.debug(
'nested arrays are not supported for configurable extraction of '
'element %s', element)
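Ignoring groups, dimensions and list handling, a toy sketch (not part of the change) of the name flattening that push_metric_dict performs on nested dictionaries in the recursion above:
from numbers import Number

def flatten_numeric(metric_dict, prefix='', max_depth=1, curr_depth=0):
    # Recurse into nested dicts, building names as prefix + element + '_',
    # and treat any numeric leaf as a metric value, as push_metric_dict does.
    out = {}
    for element, child in metric_dict.items():
        if isinstance(child, dict) and curr_depth < max_depth:
            out.update(flatten_numeric(child, prefix + element + '_',
                                       max_depth, curr_depth + 1))
        elif isinstance(child, Number):
            out[prefix + element] = float(child)
    return out

print(flatten_numeric({'server': {'requests': 12, 'errors': 3}}))
# {'server_requests': 12.0, 'server_errors': 3.0}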
def extract_dist_labels(self, instance_name, group, metric_dict, labels, index):
"""Extract additional distinguishing labels from metric dictionary. All top-level attributes which are
strings and for which a dimension mapping is available will be transformed into dimensions.
"""Extract additional distinguishing labels from metric dictionary. All top-level
attributes which are strings and for which a dimension mapping is available will be
transformed into dimensions.
:param instance_name: instance to be used
:param group: metric group or None for root/unspecified group
:param metric_dict: input dictionary containing the metric at the top-level
:param labels: labels dictionary to extend with the additional found metrics
:param index: index value to be used as fallback if no labels can be derived from string-valued attributes
or the derived labels are not mapped in the config.
:param index: index value to be used as fallback if no labels can be derived from
string-valued attributes or the derived labels are not mapped in the config.
:return: Extended labels, already including the 'labels' passed into this method
"""
ext_labels = None
# collect additional dimensions first from non-metrics
for element, child in metric_dict.items():
if isinstance(child, str) and len(self._get_mappings(instance_name, group, element)) > 0:
if isinstance(
child,
str) and len(
self._get_mappings(
instance_name,
group,
element)) > 0:
if not ext_labels:
ext_labels = labels.copy()
ext_labels[element] = child
@ -554,9 +626,18 @@ class DynamicCheckHelper(object):
return ext_labels
def push_metric(self, instance, metric, value, labels=None, group=None, timestamp=None, fixed_dimensions=None,
default_dimensions=None):
"""Pushes a meter using the configured mapping information to determine metric_type and map the name and dimensions
def push_metric(
self,
instance,
metric,
value,
labels=None,
group=None,
timestamp=None,
fixed_dimensions=None,
default_dimensions=None):
"""Pushes a meter using the configured mapping information to determine metric_type
and map the name and dimensions
:param instance: instance containing the check configuration
:param value: metric value (float)
@ -602,7 +683,12 @@ class DynamicCheckHelper(object):
if fixed_dimensions:
dims.update(fixed_dimensions)
log.debug('push %s %s = %s {%s}', metric_entry.metric_type, metric_entry.metric_name, value, dims)
log.debug(
'push %s %s = %s {%s}',
metric_entry.metric_type,
metric_entry.metric_name,
value,
dims)
if metric_entry.metric_type == DynamicCheckHelper.RATE:
self._check.rate(metric_name, float(value), dimensions=dims)
@ -640,7 +726,8 @@ class DynamicCheckHelper(object):
:param group:
:param instance_name:
:param labels:
:return: mapped dimensions or None if the dimensions filter did not match and the metric needs to be filtered
:return: mapped dimensions or None if the dimensions filter did not match
and the metric needs to be filtered
"""
dims = default_dimensions.copy()
# map all specified dimension all keys
@ -663,8 +750,8 @@ class DynamicCheckHelper(object):
# else the dimension will not map
except (IndexError, AttributeError): # probably the regex was faulty
log.exception(
'dimension %s value could not be mapped from %s: regex for mapped dimension %s '
'does not match %s',
'dimension %s value could not be mapped from %s: regex for mapped '
'dimension %s does not match %s',
target_dim, labelvalue, labelname, map_spec.regex)
return None
@ -687,17 +774,19 @@ class DynamicCheckHelper(object):
:param metric: incoming metric name
:param metric_type: GAUGE, RATE, COUNTER
:param metric_map: dictionary with mapping configuration for a metric group or the entire instance
:param metric_map: dictionary with mapping configuration for a metric group or
the entire instance
:return: new MetricSpec entry or None if metric is not listed as metric_type
"""
re_list = metric_map.get(DynamicCheckHelper.CONFIG_SECTIONS[metric_type], [])
for rx in re_list:
match_groups = re.match(rx, metric)
if match_groups:
metric_entry = DynamicCheckHelper.MetricSpec(metric_type=metric_type,
metric_name=DynamicCheckHelper._normalize_metricname(
metric,
match_groups))
metric_entry = DynamicCheckHelper.MetricSpec(
metric_type=metric_type,
metric_name=DynamicCheckHelper._normalize_metricname(
metric,
match_groups))
return metric_entry
return None
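A minimal sketch of the matching step in _create_metric_spec above, assuming a configuration section that lists regular expressions per metric type ('counters' is the only section name visible in this diff):
import re

metric_map = {'counters': ['requests_total', 'bytes_(sent|received)']}
metric = 'bytes_sent'
matched = next((rx for rx in metric_map.get('counters', []) if re.match(rx, metric)), None)
print(matched)   # 'bytes_(sent|received)' -> a MetricSpec of that type would be built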
@ -713,14 +802,15 @@ class DynamicCheckHelper(object):
i = DynamicCheckHelper.GAUGE
metric_entry = metric_cache.get(metric, None)
while not metric_entry and i < len(DynamicCheckHelper.METRIC_TYPES):
metric_entry = DynamicCheckHelper._create_metric_spec(metric, DynamicCheckHelper.METRIC_TYPES[i],
metric_map)
metric_entry = DynamicCheckHelper._create_metric_spec(
metric, DynamicCheckHelper.METRIC_TYPES[i], metric_map)
i += 1
if not metric_entry:
# fall-through
metric_entry = DynamicCheckHelper.MetricSpec(metric_type=DynamicCheckHelper.SKIP,
metric_name=DynamicCheckHelper._normalize_metricname(metric))
metric_entry = DynamicCheckHelper.MetricSpec(
metric_type=DynamicCheckHelper.SKIP,
metric_name=DynamicCheckHelper._normalize_metricname(metric))
metric_cache[metric] = metric_entry
@ -732,7 +822,14 @@ class DynamicCheckHelper(object):
if match_groups and match_groups.lastindex > 0:
metric = '_'.join(match_groups.groups())
metric = re.sub('(?!^)([A-Z]+)', r'_\1', metric.replace('.', '_')).replace('__', '_').lower()
metric = re.sub(
'(?!^)([A-Z]+)',
r'_\1',
metric.replace(
'.',
'_')).replace(
'__',
'_').lower()
metric = re.sub(r"[,+*\-/()\[\]{}]", "_", metric)
# Eliminate multiple _
metric = re.sub(r"__+", "_", metric)
@ -785,16 +882,19 @@ def _parse_manifest_for_owner(resource_metadata):
resource_owner_name = owner_reference['name']
return resource_owner_type, resource_owner_name
except Exception:
log.info("Could not get owner from ownerReferences for resource {}".format(resource_name))
log.info("Could not get owner from ownerReferences"
"for resource {}".format(resource_name))
# Try to get owner from annotations
else:
try:
resource_created_by = json.loads(resource_metadata['annotations']['kubernetes.io/created-by'])
resource_created_by = json.loads(
resource_metadata['annotations']['kubernetes.io/created-by'])
resource_owner_type = resource_created_by['reference']['kind']
resource_owner_name = resource_created_by['reference']['name']
return resource_owner_type, resource_owner_name
except Exception:
log.info("Could not get resource owner from annotations for resource {}".format(resource_name))
log.info("Could not get resource owner from annotations"
"for resource {}".format(resource_name))
return None
@ -807,7 +907,8 @@ def _get_pod_owner_pair(kubernetes_connector, pod_owner_type, pod_owner_name, po
"Setting ReplicaSet as dimension")
deployment_name = None
else:
replicaset_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name)
replicaset_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(
pod_namespace, pod_owner_name)
deployment_name = _attempt_to_get_owner_name(kubernetes_connector, replicaset_endpoint)
if not deployment_name:
return 'replica_set', pod_owner_name
@ -821,7 +922,8 @@ def _get_pod_owner_pair(kubernetes_connector, pod_owner_type, pod_owner_name, po
"Setting job as dimension")
cronjob_name = None
else:
job_endpoint = "/apis/batch/v1/namespaces/{}/jobs/{}".format(pod_namespace, pod_owner_name)
job_endpoint = "/apis/batch/v1/namespaces/{}/jobs/{}".format(
pod_namespace, pod_owner_name)
cronjob_name = _attempt_to_get_owner_name(kubernetes_connector, job_endpoint)
if not cronjob_name:
return 'job', pod_owner_name
@ -836,5 +938,9 @@ def get_pod_owner(kubernetes_connector, pod_metadata):
pod_namespace = pod_metadata['namespace']
owner_pair = _parse_manifest_for_owner(pod_metadata)
if owner_pair:
return _get_pod_owner_pair(kubernetes_connector, owner_pair[0], owner_pair[1], pod_namespace)
return _get_pod_owner_pair(
kubernetes_connector,
owner_pair[0],
owner_pair[1],
pod_namespace)
return None


@ -15,7 +15,11 @@ class SSLHTTPAdapter(HTTPAdapter):
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize, block=block, ssl_version=context)
self.poolmanager = PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
ssl_version=context)
class A10Session(object):
@ -53,7 +57,8 @@ class A10MemoryCheck(AgentCheck):
a10_device = instance.get("a10_device")
username = instance.get('a10_username')
password = instance.get('a10_password')
dimensions = self._set_dimensions({'service': 'networking', 'a10_device': a10_device}, instance)
dimensions = self._set_dimensions(
{'service': 'networking', 'a10_device': a10_device}, instance)
try:
authed_session = A10Session(a10_device, username, password, SSLHTTPAdapter)
@ -79,8 +84,13 @@ class A10MemoryCheck(AgentCheck):
try:
url = "https://" + a10_device + "/axapi/v3/system/memory/oper"
try:
request = requests.get(url=url, headers={"Content-type": "application/json",
"Authorization": "A10 %s" % self.auth_sig}, verify=False)
request = requests.get(
url=url,
headers={
"Content-type": "application/json",
"Authorization": "A10 %s" %
self.auth_sig},
verify=False)
except urllib3.exceptions.SSLError as e:
self.log.warning("Caught SSL exception {}".format(e))
@ -90,7 +100,8 @@ class A10MemoryCheck(AgentCheck):
convert_kb_to_mb = 1024
memory_data['a10.memory_total_mb'] = (data['memory']['oper']['Total']) / convert_kb_to_mb
memory_data['a10.memory_total_mb'] = (
data['memory']['oper']['Total']) / convert_kb_to_mb
memory_data['a10.memory_used_mb'] = (data['memory']['oper']['Used']) / convert_kb_to_mb
memory_data['a10.memory_free_mb'] = (data['memory']['oper']['Free']) / convert_kb_to_mb
memory_data['a10.memory_used'] = int(mem_used[:2])


@ -110,6 +110,8 @@ class Apache(checks.AgentCheck):
self.log.warn("Assuming url was not correct. Trying to add ?auto suffix to the url")
self.check(instance)
else:
return services_checks.Status.DOWN, "%s is DOWN, error: No metrics available.".format(service_check_name)
return (services_checks.Status.DOWN,
        "{0} is DOWN, error: No metrics available.".format(service_check_name))
else:
log.debug("Collected {0} metrics for {1} Apache Web Server".format(apache_host, metric_count))
log.debug("Collected {0} metrics for {1} Apache Web Server".format(apache_host,
metric_count))
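A side note on this kind of wrapping (illustration only, names simplified): when a tuple return is split to meet the 100-character limit, the continuation has to stay inside parentheses or use a backslash, otherwise only the first element is returned and the second line becomes dead code:
def status_down(service_check_name):
    # The parentheses keep both elements in the returned tuple even though
    # the statement spans two lines.
    return ("DOWN",
            "{0} is DOWN, error: No metrics available.".format(service_check_name))

print(status_down('apache'))   # ('DOWN', 'apache is DOWN, error: No metrics available.')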


@ -56,17 +56,19 @@ class Cacti(AgentCheck):
# The rrdtool module is required for the check to work
try:
import rrdtool
import rrdtool # noqa
except ImportError:
raise Exception(
"Cannot import rrdtool module. This module is required for the cacti plugin to work correctly")
"Cannot import rrdtool module. This module is required for "
"the cacti plugin to work correctly")
# Try importing MySQL
try:
import pymysql
except ImportError:
raise Exception(
"Cannot import PyMySQL module. This module is required for the cacti plugin to work correctly")
"Cannot import PyMySQL module. This module is required for "
"the cacti plugin to work correctly")
connection = pymysql.connect(config.host, config.user, config.password, config.db)


@ -120,7 +120,8 @@ class CadvisorHost(AgentCheck):
try:
machine_info = requests.get(self.cadvisor_machine_url).json()
except Exception as ex:
self.log.error("Error communicating with cAdvisor to collect machine data - {}".format(ex))
self.log.error(
"Error communicating with cAdvisor to collect machine data - {}".format(ex))
else:
self._parse_machine_info(machine_info)
self._parse_send_metrics(host_metrics, dimensions)
@ -164,7 +165,8 @@ class CadvisorHost(AgentCheck):
file_dimensions['device'] = filesystem['device']
usage_fs = -1
capacity_fs = 0
for cadvisor_key, (metric_name, metric_types, metric_units) in filesystem_metrics.items():
for cadvisor_key, (metric_name, metric_types,
metric_units) in filesystem_metrics.items():
if cadvisor_key in filesystem:
self._send_metrics("fs." + metric_name,
filesystem[cadvisor_key],
@ -175,14 +177,20 @@ class CadvisorHost(AgentCheck):
elif cadvisor_key == "capacity":
capacity_fs = int(filesystem[cadvisor_key])
if usage_fs < 0:
self.log.warn("no value for usage size of {}, file system usage (percent) couldn't be calculated".format(filesystem['device']))
self.log.warn(
"no value for usage size of {}, file system usage (percent) "
"couldn't be calculated".format(
filesystem['device']))
elif capacity_fs > 0:
self._send_metrics("fs.usage_perc",
(float(usage_fs) / capacity_fs) * 100,
file_dimensions,
["gauge"], ["percent"])
else:
self.log.warn("no value for capacity of {}, file system usage (percent) couldn't be calculated".format(filesystem['device']))
self.log.warn(
"no value for capacity of {}, file system usage (percent) "
"couldn't be calculated".format(
filesystem['device']))
def _parse_network(self, network_data, dimensions):
network_interfaces = network_data['interfaces']
@ -221,10 +229,20 @@ class CadvisorHost(AgentCheck):
if cadvisor_key in cpu_usage:
# Convert nanoseconds to seconds
cpu_usage_sec = cpu_usage[cadvisor_key] / 1000000000.0
self._send_metrics("cpu." + metric_name, cpu_usage_sec, dimensions, metric_types, metric_units)
self._send_metrics(
"cpu." + metric_name,
cpu_usage_sec,
dimensions,
metric_types,
metric_units)
# Provide metrics for number of cores if given
if self.num_cores > 0:
self._send_metrics("cpu.num_cores", self.num_cores, dimensions, ["gauge"], ["number_of_cores"])
self._send_metrics(
"cpu.num_cores",
self.num_cores,
dimensions,
["gauge"],
["number_of_cores"])
def _parse_send_metrics(self, metrics, dimensions):
for host, cadvisor_metrics in metrics.items():


@ -5,7 +5,6 @@
from copy import deepcopy
import json
import logging
import math
import os
import stat
import subprocess
@ -264,7 +263,7 @@ class Congestion(AgentCheck):
for net in instance.addresses:
for ip in instance.addresses[net]:
if (ip['OS-EXT-IPS:type'] == 'fixed' and
ip['version'] == 4):
ip['version'] == 4):
vm_list[inst_name] = ip['addr']
return vm_list
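An illustration of the address filtering above, with a hypothetical Nova-style payload shaped after the keys used in the loop:
addresses = {'private': [
    {'OS-EXT-IPS:type': 'fixed', 'version': 4, 'addr': '10.0.0.5'},
    {'OS-EXT-IPS:type': 'floating', 'version': 4, 'addr': '172.16.0.9'},
]}
vm_list = {}
for net in addresses:
    for ip in addresses[net]:
        if ip['OS-EXT-IPS:type'] == 'fixed' and ip['version'] == 4:
            vm_list['instance-1'] = ip['addr']
print(vm_list)   # {'instance-1': '10.0.0.5'}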


@ -64,7 +64,8 @@ class Cpu(checks.AgentCheck):
def _add_cpu_freq(self, data):
try:
lscpu_command = subprocess.Popen('lscpu', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
lscpu_command = subprocess.Popen(
'lscpu', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
lscpu_output = lscpu_command.communicate()[0].decode(
encoding='UTF-8')
cpu_freq_output = re.search("(CPU MHz:.*?(\d+\.\d+)\n)", lscpu_output)
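A small check (sample output invented) of the regular expression above that pulls the clock frequency out of lscpu output; group 2 carries the numeric value:
import re

lscpu_output = "Architecture:          x86_64\nCPU MHz:               2400.000\n"
cpu_freq_output = re.search(r"(CPU MHz:.*?(\d+\.\d+)\n)", lscpu_output)
print(float(cpu_freq_output.group(2)))   # 2400.0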


@ -33,7 +33,8 @@ class Crash(checks.AgentCheck):
dt = None
if dir_name is None:
return None
# Check for CentOS 7.1 and RHEL 7.1. <IP-address>-YYYY.MM.dd-HH:mm:ss (e.g. 127.0.0.1-2015.10.02-16:07:51)
# Check for CentOS 7.1 and RHEL 7.1. <IP-address>-YYYY.MM.dd-HH:mm:ss
# (e.g. 127.0.0.1-2015.10.02-16:07:51)
elif re.match(r".*-\d{4}[.]\d{2}[.]\d{2}-\d{2}:\d{2}:\d{2}$", dir_name):
dt = self._create_datetime_for_rhel71(dir_name)
else:


@ -81,21 +81,33 @@ class Disk(checks.AgentCheck):
dimensions=dimensions)
disk_count += 1
log.debug('Collected {0} disk usage metrics for partition {1}'.format(disk_count, partition.mountpoint))
log.debug('Collected {0} disk usage metrics for partition {1}'.format(
disk_count,
partition.mountpoint))
disk_count = 0
if send_io_stats:
try:
stats = disk_stats[device_name]
self.rate("io.read_req_sec", round(float(stats.read_count), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.write_req_sec", round(float(stats.write_count), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.read_kbytes_sec", round(float(stats.read_bytes / 1024), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.write_kbytes_sec", round(float(stats.write_bytes / 1024), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.read_time_sec", round(float(stats.read_time / 1000), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.write_time_sec", round(float(stats.write_time / 1000), 2), device_name=device_name, dimensions=dimensions)
self.rate("io.read_req_sec", round(float(stats.read_count), 2),
device_name=device_name, dimensions=dimensions)
self.rate("io.write_req_sec", round(float(stats.write_count), 2),
device_name=device_name, dimensions=dimensions)
self.rate("io.read_kbytes_sec",
round(float(stats.read_bytes / 1024), 2),
device_name=device_name, dimensions=dimensions)
self.rate("io.write_kbytes_sec",
round(float(stats.write_bytes / 1024), 2),
device_name=device_name, dimensions=dimensions)
self.rate("io.read_time_sec", round(float(stats.read_time / 1000), 2),
device_name=device_name, dimensions=dimensions)
self.rate("io.write_time_sec", round(float(stats.write_time / 1000), 2),
device_name=device_name, dimensions=dimensions)
log.debug('Collected 6 disk I/O metrics for partition {0}'.format(partition.mountpoint))
log.debug('Collected 6 disk I/O metrics for '
'partition {0}'.format(partition.mountpoint))
except KeyError:
log.debug('No Disk I/O metrics available for {0}...Skipping'.format(device_name))
log.debug('No Disk I/O metrics available for'
' {0}...Skipping'.format(device_name))
if send_rollup_stats:
self.gauge("disk.total_space_mb",


@ -45,11 +45,15 @@ class Docker(checks.AgentCheck):
try:
docker_client = docker.Client(base_url=docker_url, version=self.docker_version,
timeout=self.connection_timeout)
running_containers = {container['Id']: container for container in self._get_containers(docker_client)}
running_containers = {
container['Id']: container for container in self._get_containers(docker_client)}
except Exception as e:
self.log.error("Could not get containers from Docker API skipping Docker check - {}".format(e))
self.log.error(
"Could not get containers from Docker API skipping Docker check - {}".format(e))
return
add_kubernetes_dimensions = instance.get('add_kubernetes_dimensions', DEFAULT_ADD_KUBERNETES_DIMENSIONS)
add_kubernetes_dimensions = instance.get(
'add_kubernetes_dimensions',
DEFAULT_ADD_KUBERNETES_DIMENSIONS)
dimensions = self._set_dimensions(None, instance)
self.gauge("container.running_count", len(running_containers), dimensions=dimensions)
self._set_container_pids(running_containers)
@ -67,14 +71,17 @@ class Docker(checks.AgentCheck):
try:
container_dimensions = dimensions.copy()
container_id = container['Id']
container_dimensions['name'] = self._get_container_name(container['Names'], container_id)
container_dimensions['name'] = self._get_container_name(
container['Names'], container_id)
container_dimensions['image'] = container['Image']
container_labels = container['Labels']
if add_kubernetes_dimensions:
if 'io.kubernetes.pod.name' in container_labels:
container_dimensions['kubernetes_pod_name'] = container_labels['io.kubernetes.pod.name']
container_dimensions['kubernetes_pod_name'] = \
container_labels['io.kubernetes.pod.name']
if 'io.kubernetes.pod.namespace' in container_labels:
container_dimensions['kubernetes_namespace'] = container_labels['io.kubernetes.pod.namespace']
container_dimensions['kubernetes_namespace'] = \
container_labels['io.kubernetes.pod.namespace']
self._report_cgroup_cpuacct(container_id, container_dimensions)
self._report_cgroup_memory(container_id, container_dimensions, system_memory)
self._report_cgroup_blkio(container_id, container_dimensions)
@ -104,8 +111,14 @@ class Docker(checks.AgentCheck):
def _report_cgroup_cpuacct(self, container_id, container_dimensions):
stat_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.stat')
stats = self._parse_cgroup_pairs(stat_file)
self._report_rate_gauge_metric('container.cpu.user_time', stats['user'], container_dimensions)
self._report_rate_gauge_metric('container.cpu.system_time', stats['system'], container_dimensions)
self._report_rate_gauge_metric(
'container.cpu.user_time',
stats['user'],
container_dimensions)
self._report_rate_gauge_metric(
'container.cpu.system_time',
stats['system'],
container_dimensions)
def _report_cgroup_memory(self, container_id, container_dimensions, system_memory_limit):
stat_file = self._get_cgroup_file('memory', container_id, 'memory.stat')
@ -133,8 +146,14 @@ class Docker(checks.AgentCheck):
stat_file = self._get_cgroup_file('blkio', container_id,
'blkio.throttle.io_service_bytes')
stats = self._parse_cgroup_blkio_metrics(stat_file)
self._report_rate_gauge_metric('container.io.read_bytes', stats['io_read'], container_dimensions)
self._report_rate_gauge_metric('container.io.write_bytes', stats['io_write'], container_dimensions)
self._report_rate_gauge_metric(
'container.io.read_bytes',
stats['io_read'],
container_dimensions)
self._report_rate_gauge_metric(
'container.io.write_bytes',
stats['io_write'],
container_dimensions)
def _report_cgroup_cpu_pct(self, container_id, container_dimensions):
usage_file = self._get_cgroup_file('cpuacct', container_id, 'cpuacct.usage')
@ -154,14 +173,17 @@ class Docker(checks.AgentCheck):
self.gauge('container.cpu.utilization_perc', cpu_pct, dimensions=container_dimensions)
def _report_net_metrics(self, container, container_dimensions):
"""Find container network metrics by looking at /proc/$PID/net/dev of the container process."""
"""Find container network metrics by looking at /proc/$PID/net/dev
of the container process.
"""
proc_net_file = os.path.join(container['_proc_root'], 'net/dev')
try:
with open(proc_net_file, 'r') as f:
lines = f.readlines()
"""Two first lines are headers:
Inter-| Receive | Transmit
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
Inter-| Receive bytes packets errs drop | Transmit bytes packets errs drop
face | fifo frame compressed multicast | fifo colls carrier compressed
"""
for line in lines[2:]:
cols = line.split(':', 1)
@ -170,13 +192,17 @@ class Docker(checks.AgentCheck):
container_network_dimensions = container_dimensions.copy()
container_network_dimensions['interface'] = interface_name
network_values = cols[1].split()
self._report_rate_gauge_metric("container.net.in_bytes", long(network_values[0]),
container_network_dimensions)
self._report_rate_gauge_metric("container.net.out_bytes", long(network_values[8]),
container_network_dimensions)
self._report_rate_gauge_metric(
"container.net.in_bytes", long(
network_values[0]), container_network_dimensions)
self._report_rate_gauge_metric(
"container.net.out_bytes", long(
network_values[8]), container_network_dimensions)
break
except Exception as e:
self.log.error("Failed to report network metrics from file {0}. Exception: {1}".format(proc_net_file, e))
self.log.error(
"Failed to report network metrics from file {0}. Exception: {1}".format(
proc_net_file, e))
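For reference, a worked example (sample line invented) of the per-interface parsing done in the loop above; column 0 after the colon is received bytes and column 8 is transmitted bytes (the check itself converts with long() since it targets Python 2):
line = "  eth0: 1500 10 0 0 0 0 0 0 2048 12 0 0 0 0 0 0"
cols = line.split(':', 1)
interface_name = cols[0].strip()
network_values = cols[1].split()
print(interface_name, int(network_values[0]), int(network_values[8]))   # eth0 1500 2048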
# Docker API
def _get_containers(self, docker_client):
@ -190,8 +216,10 @@ class Docker(checks.AgentCheck):
stat_file_path_docker = os.path.join(mountpoint, "docker")
stat_file_path_coreos = os.path.join(mountpoint, "system.slice")
stat_file_path_kubernetes = os.path.join(mountpoint, container_id)
stat_file_path_kubernetes_docker = os.path.join(mountpoint, "system", "docker", container_id)
stat_file_path_docker_daemon = os.path.join(mountpoint, "docker-daemon", "docker", container_id)
stat_file_path_kubernetes_docker = os.path.join(
mountpoint, "system", "docker", container_id)
stat_file_path_docker_daemon = os.path.join(
mountpoint, "docker-daemon", "docker", container_id)
if os.path.exists(stat_file_path_lxc):
return '%(mountpoint)s/lxc/%(id)s/%(file)s'
@ -353,7 +381,9 @@ class Docker(checks.AgentCheck):
if matches:
container_id = matches[-1]
if container_id not in containers:
self.log.debug("Container %s not in container_dict, it's likely excluded", container_id)
self.log.debug(
"Container %s not in container_dict, it's likely excluded",
container_id)
continue
containers[container_id]['_pid'] = pid_dir
containers[container_id]['_proc_root'] = os.path.join(proc_path, pid_dir)


@ -22,35 +22,55 @@ class ElasticSearch(AgentCheck):
"elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
"elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"),
"elasticsearch.indexing.index.total": ("gauge", "indices.indexing.index_total"),
"elasticsearch.indexing.index.time": ("gauge", "indices.indexing.index_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.indexing.index.time": ("gauge",
"indices.indexing.index_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.indexing.index.current": ("gauge", "indices.indexing.index_current"),
"elasticsearch.indexing.delete.total": ("gauge", "indices.indexing.delete_total"),
"elasticsearch.indexing.delete.time": ("gauge", "indices.indexing.delete_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.indexing.delete.time": ("gauge",
"indices.indexing.delete_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.indexing.delete.current": ("gauge", "indices.indexing.delete_current"),
"elasticsearch.get.total": ("gauge", "indices.get.total"),
"elasticsearch.get.time": ("gauge", "indices.get.time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.get.time": ("gauge",
"indices.get.time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.get.current": ("gauge", "indices.get.current"),
"elasticsearch.get.exists.total": ("gauge", "indices.get.exists_total"),
"elasticsearch.get.exists.time": ("gauge", "indices.get.exists_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.get.exists.time": ("gauge",
"indices.get.exists_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.get.missing.total": ("gauge", "indices.get.missing_total"),
"elasticsearch.get.missing.time": ("gauge", "indices.get.missing_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.get.missing.time": ("gauge",
"indices.get.missing_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.search.query.total": ("gauge", "indices.search.query_total"),
"elasticsearch.search.query.time": ("gauge", "indices.search.query_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.search.query.time": ("gauge",
"indices.search.query_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.search.query.current": ("gauge", "indices.search.query_current"),
"elasticsearch.search.fetch.total": ("gauge", "indices.search.fetch_total"),
"elasticsearch.search.fetch.time": ("gauge", "indices.search.fetch_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.search.fetch.time": ("gauge",
"indices.search.fetch_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.search.fetch.current": ("gauge", "indices.search.fetch_current"),
"elasticsearch.merges.current": ("gauge", "indices.merges.current"),
"elasticsearch.merges.current.docs": ("gauge", "indices.merges.current_docs"),
"elasticsearch.merges.current.size": ("gauge", "indices.merges.current_size_in_bytes"),
"elasticsearch.merges.total": ("gauge", "indices.merges.total"),
"elasticsearch.merges.total.time": ("gauge", "indices.merges.total_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.merges.total.time": ("gauge",
"indices.merges.total_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.merges.total.docs": ("gauge", "indices.merges.total_docs"),
"elasticsearch.merges.total.size": ("gauge", "indices.merges.total_size_in_bytes"),
"elasticsearch.refresh.total": ("gauge", "indices.refresh.total"),
"elasticsearch.refresh.total.time": ("gauge", "indices.refresh.total_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.refresh.total.time": ("gauge",
"indices.refresh.total_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.flush.total": ("gauge", "indices.flush.total"),
"elasticsearch.flush.total.time": ("gauge", "indices.flush.total_time_in_millis", lambda v: float(v) / 1000),
"elasticsearch.flush.total.time": ("gauge",
"indices.flush.total_time_in_millis",
lambda v: float(v) / 1000),
"elasticsearch.process.open_fd": ("gauge", "process.open_file_descriptors"),
"elasticsearch.transport.rx_count": ("gauge", "transport.rx_count"),
"elasticsearch.transport.tx_count": ("gauge", "transport.tx_count"),
@ -78,9 +98,11 @@ class ElasticSearch(AgentCheck):
"elasticsearch.thread_pool.index.queue": ("gauge", "thread_pool.index.queue"),
"elasticsearch.thread_pool.index.rejected": ("gauge", "thread_pool.index.rejected"),
"elasticsearch.thread_pool.management.active": ("gauge", "thread_pool.management.active"),
"elasticsearch.thread_pool.management.threads": ("gauge", "thread_pool.management.threads"),
"elasticsearch.thread_pool.management.threads": ("gauge",
"thread_pool.management.threads"),
"elasticsearch.thread_pool.management.queue": ("gauge", "thread_pool.management.queue"),
"elasticsearch.thread_pool.management.rejected": ("gauge", "thread_pool.management.rejected"),
"elasticsearch.thread_pool.management.rejected": ("gauge",
"thread_pool.management.rejected"),
"elasticsearch.thread_pool.merge.active": ("gauge", "thread_pool.merge.active"),
"elasticsearch.thread_pool.merge.threads": ("gauge", "thread_pool.merge.threads"),
"elasticsearch.thread_pool.merge.queue": ("gauge", "thread_pool.merge.queue"),
@ -88,7 +110,8 @@ class ElasticSearch(AgentCheck):
"elasticsearch.thread_pool.percolate.active": ("gauge", "thread_pool.percolate.active"),
"elasticsearch.thread_pool.percolate.threads": ("gauge", "thread_pool.percolate.threads"),
"elasticsearch.thread_pool.percolate.queue": ("gauge", "thread_pool.percolate.queue"),
"elasticsearch.thread_pool.percolate.rejected": ("gauge", "thread_pool.percolate.rejected"),
"elasticsearch.thread_pool.percolate.rejected": ("gauge",
"thread_pool.percolate.rejected"),
"elasticsearch.thread_pool.refresh.active": ("gauge", "thread_pool.refresh.active"),
"elasticsearch.thread_pool.refresh.threads": ("gauge", "thread_pool.refresh.threads"),
"elasticsearch.thread_pool.refresh.queue": ("gauge", "thread_pool.refresh.queue"),
@ -103,10 +126,16 @@ class ElasticSearch(AgentCheck):
"elasticsearch.thread_pool.snapshot.rejected": ("gauge", "thread_pool.snapshot.rejected"),
"elasticsearch.http.current_open": ("gauge", "http.current_open"),
"elasticsearch.http.total_opened": ("gauge", "http.total_opened"),
"jvm.gc.concurrent_mark_sweep.count": ("gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_count"),
"jvm.gc.concurrent_mark_sweep.collection_time": ("gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_time_in_millis", lambda v: float(v) / 1000),
"jvm.gc.concurrent_mark_sweep.count": ("gauge", "jvm.gc.collectors.ConcurrentMarkSweep."
"collection_count"),
"jvm.gc.concurrent_mark_sweep.collection_time": ("gauge", "jvm.gc.collectors."
"ConcurrentMarkSweep."
"collection_time_in_millis",
lambda v: float(v) / 1000),
"jvm.gc.par_new.count": ("gauge", "jvm.gc.collectors.ParNew.collection_count"),
"jvm.gc.par_new.collection_time": ("gauge", "jvm.gc.collectors.ParNew.collection_time_in_millis", lambda v: float(v) / 1000),
"jvm.gc.par_new.collection_time": ("gauge", "jvm.gc.collectors.ParNew."
"collection_time_in_millis",
lambda v: float(v) / 1000),
"jvm.mem.heap_committed": ("gauge", "jvm.mem.heap_committed_in_bytes"),
"jvm.mem.heap_used": ("gauge", "jvm.mem.heap_used_in_bytes"),
"jvm.mem.non_heap_committed": ("gauge", "jvm.mem.non_heap_committed_in_bytes"),
@ -120,7 +149,10 @@ class ElasticSearch(AgentCheck):
"elasticsearch.relocating_shards": ("gauge", "relocating_shards"),
"elasticsearch.initializing_shards": ("gauge", "initializing_shards"),
"elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"),
"elasticsearch.cluster_status": ("gauge", "status", lambda v: {"red": 0, "yellow": 1, "green": 2}.get(v, -1)),
"elasticsearch.cluster_status": ("gauge", "status", lambda v:
{"red": 0,
"yellow": 1,
"green": 2}.get(v, -1)),
}
def __init__(self, name, init_config, agent_config):
@ -193,11 +225,15 @@ class ElasticSearch(AgentCheck):
self.NODES_URL = "/_nodes?network=true"
additional_metrics = {
"elasticsearch.search.fetch.open_contexts": ("gauge", "indices.search.open_contexts"),
"elasticsearch.cache.filter.evictions": ("gauge", "indices.filter_cache.evictions"),
"elasticsearch.cache.filter.size": ("gauge", "indices.filter_cache.memory_size_in_bytes"),
"elasticsearch.search.fetch.open_contexts": ("gauge",
"indices.search.open_contexts"),
"elasticsearch.cache.filter.evictions": ("gauge",
"indices.filter_cache.evictions"),
"elasticsearch.cache.filter.size": ("gauge",
"indices.filter_cache.memory_size_in_bytes"),
"elasticsearch.id_cache.size": ("gauge", "indices.id_cache.memory_size_in_bytes"),
"elasticsearch.fielddata.size": ("gauge", "indices.fielddata.memory_size_in_bytes"),
"elasticsearch.fielddata.size": ("gauge",
"indices.fielddata.memory_size_in_bytes"),
"elasticsearch.fielddata.evictions": ("gauge", "indices.fielddata.evictions")
}
@ -218,9 +254,14 @@ class ElasticSearch(AgentCheck):
"elasticsearch.thread_pool.cache.queue": ("gauge", "thread_pool.cache.queue"),
"elasticsearch.thread_pool.cache.rejected": ("gauge", "thread_pool.cache.rejected"),
"jvm.gc.collection_count": ("gauge", "jvm.gc.collection_count"),
"jvm.gc.collection_time": ("gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v) / 1000),
"jvm.gc.collection_time": ("gauge",
"jvm.gc.collection_time_in_millis",
lambda v: float(v) / 1000),
"jvm.gc.copy.count": ("gauge", "jvm.gc.collectors.Copy.collection_count"),
"jvm.gc.copy.collection_time": ("gauge", "jvm.gc.collectors.Copy.collection_time_in_millis", lambda v: float(v) / 1000)
"jvm.gc.copy.collection_time": ("gauge",
"jvm.gc.collectors."
"Copy.collection_time_in_millis",
lambda v: float(v) / 1000)
}
self.METRICS.update(additional_metrics)
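A plausible reading (assumption; the consuming code is not part of this diff) of how one entry in the METRICS table above is applied: the tuple holds a metric type, a dotted path into the stats JSON, and an optional conversion such as milliseconds to seconds. The elasticsearch.cluster_status entry uses the same slot to map 'red'/'yellow'/'green' to numbers.
entry = ("gauge", "indices.get.time_in_millis", lambda v: float(v) / 1000)
raw_value = 2500                                  # value found at the dotted path
metric_type, path = entry[0], entry[1]
value = entry[2](raw_value) if len(entry) > 2 else raw_value
print(metric_type, path, value)                   # gauge indices.get.time_in_millis 2.5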


@ -95,7 +95,9 @@ class HAProxy(AgentCheck):
# Split the first line into an index of fields
# The line looks like:
# "# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,"
# "# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,
# wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle
# ,lbtot,tracked,type,rate,rate_lim,rate_max,"
fields = [f.strip() for f in data[0][2:].split(',') if f]
hosts_statuses = defaultdict(int)
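A short illustration of the field-index construction above, using a truncated version of the header line quoted in the comment:
data = ["# pxname,svname,qcur,qmax,scur,smax,slim,stot,"]
fields = [f.strip() for f in data[0][2:].split(',') if f]
print(fields)   # ['pxname', 'svname', 'qcur', 'qmax', 'scur', 'smax', 'slim', 'stot']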


@ -93,7 +93,9 @@ class HTTPCheck(services_checks.ServicesCheck):
try:
self.log.debug("Connecting to %s" % addr)
if disable_ssl_validation:
self.log.info("Skipping SSL certificate validation for %s based on configuration" % addr)
self.log.info(
"Skipping SSL certificate validation for %s based on configuration" %
addr)
h = Http(timeout=timeout, disable_ssl_certificate_validation=disable_ssl_validation)
if username is not None and password is not None:
h.add_credentials(username, password)
@ -186,7 +188,8 @@ class HTTPCheck(services_checks.ServicesCheck):
if re.search(pattern, result_string, re.DOTALL):
self.log.debug("Pattern match successful")
else:
error_string = 'Pattern match failed! "{0}" not in "{1}"'.format(pattern, result_string)
error_string = 'Pattern match failed! "{0}" not in "{1}"'.format(
pattern, result_string)
self.log.info(error_string)
# maximum length of value_meta including {'error':''} is 2048
# Cutting it down to 1024 here so we don't clutter the


@ -61,7 +61,9 @@ class HTTPMetrics(http_check.HTTPCheck):
# everything requires a number
if metric_type in ['gauge', 'counter', 'rate']:
if not self._valid_number(value, metric_name):
self.log.warning("Invalid value '{0}' for metric '{1}'".format(value, metric_name))
self.log.warning(
"Invalid value '{0}' for metric '{1}'".format(
value, metric_name))
continue
if metric_type in self.metric_method:
@ -69,7 +71,9 @@ class HTTPMetrics(http_check.HTTPCheck):
value,
dimensions=dimensions)
else:
self.log.warning("Unrecognized type '{0}' for metric '{1}'".format(metric_type, metric_name))
self.log.warning(
"Unrecognized type '{0}' for metric '{1}'".format(
metric_type, metric_name))
success_string = '{0} is UP'.format(addr)
self.log.debug(success_string)


@ -116,7 +116,7 @@ class JsonPlugin(checks.AgentCheck):
errors.append('%s: %s' % (path, err))
msg = ''
if errors:
msg = ', '.join(errors)
msg = ', '.join(errors)
if msg:
if len(msg) > 1024: # keep well below length limit
msg = msg[:-1021] + '...'


@ -88,14 +88,18 @@ class KafkaCheck(checks.AgentCheck):
kafka_consumer.stop()
continue
# Remember the topic partitions encountered so that we can look up their broker offsets later
# Remember the topic partitions encountered so that we can look up their
# broker offsets later
topic_partitions[topic].update(set(partitions))
consumer_offsets[(consumer_group, topic)] = {}
for partition in partitions:
try:
consumer_offsets[(consumer_group, topic)][partition] = kafka_consumer.offsets[partition]
consumer_offsets[(consumer_group,
topic)][partition] = kafka_consumer.offsets[partition]
except KeyError:
self.log.error('Error fetching consumer offset for {0} partition {1}'.format(topic, partition))
self.log.error(
'Error fetching consumer offset for {0} partition {1}'.format(
topic, partition))
kafka_consumer.stop()
@ -106,7 +110,8 @@ class KafkaCheck(checks.AgentCheck):
offset_responses = []
for p in partitions:
try:
response = kafka_conn.send_offset_request([common.OffsetRequest(topic, p, -1, 1)])
response = kafka_conn.send_offset_request(
[common.OffsetRequest(topic, p, -1, 1)])
offset_responses.append(response[0])
except common.KafkaError as e:
self.log.error("Error fetching broker offset: {0}".format(e))


@ -94,7 +94,8 @@ class Kubernetes(checks.AgentCheck):
raise Exception('Kubernetes check only supports one configured instance.')
self.connection_timeout = int(init_config.get('connection_timeout', DEFAULT_TIMEOUT))
self.host = None
self.report_container_metrics = init_config.get('report_container_metrics', REPORT_CONTAINER_METRICS)
self.report_container_metrics = init_config.get(
'report_container_metrics', REPORT_CONTAINER_METRICS)
self.report_container_mem_percent = init_config.get('report_container_mem_percent', True)
self.kubernetes_connector = None
@ -120,10 +121,14 @@ class Kubernetes(checks.AgentCheck):
pod_dimensions_map = {}
memory_limit_map = {}
dimensions = self._set_dimensions(None, instance)
# Remove hostname from dimensions as the majority of the metrics are not tied to the hostname.
# Remove hostname from dimensions as the majority of the metrics are not
# tied to the hostname.
del dimensions['hostname']
kubelet_health_status = self._get_api_health("{}/healthz".format(kubelet))
self.gauge("kubelet.health_status", 0 if kubelet_health_status else 1, dimensions=dimensions)
self.gauge(
"kubelet.health_status",
0 if kubelet_health_status else 1,
dimensions=dimensions)
try:
pods = self._get_result("{}/pods".format(kubelet))
except Exception as e:
@ -157,7 +162,9 @@ class Kubernetes(checks.AgentCheck):
try:
result = self._get_result(health_url, as_json=False)
except Exception as e:
self.log.error("Error connecting to the health endpoint {} with exception {}".format(health_url, e))
self.log.error(
"Error connecting to the health endpoint {} with exception {}".format(
health_url, e))
return False
else:
api_health = False
@ -167,8 +174,14 @@ class Kubernetes(checks.AgentCheck):
break
return api_health
def _process_pods(self, pods, kubernetes_labels, dimensions, container_dimension_map, pod_dimensions_map,
memory_limit_map):
def _process_pods(
self,
pods,
kubernetes_labels,
dimensions,
container_dimension_map,
pod_dimensions_map,
memory_limit_map):
for pod in pods:
pod_status = pod['status']
pod_spec = pod['spec']
@ -178,8 +191,11 @@ class Kubernetes(checks.AgentCheck):
# Pod does not have any containers assigned to it no-op going to next pod
continue
pod_dimensions = dimensions.copy()
pod_dimensions.update(utils.get_pod_dimensions(self.kubernetes_connector, pod['metadata'],
kubernetes_labels))
pod_dimensions.update(
utils.get_pod_dimensions(
self.kubernetes_connector,
pod['metadata'],
kubernetes_labels))
pod_key = pod_dimensions['pod_name'] + pod_dimensions['namespace']
pod_dimensions_map[pod_key] = pod_dimensions
pod_retry_count = 0
@ -197,20 +213,38 @@ class Kubernetes(checks.AgentCheck):
container_dimension_map[container_id] = container_dimensions
if self.report_container_metrics:
container_ready = 0 if container_status['ready'] else 1
self.gauge("container.ready_status", container_ready, container_dimensions, hostname="SUPPRESS")
self.gauge("container.restart_count", container_restart_count, container_dimensions,
hostname="SUPPRESS")
self.gauge(
"container.ready_status",
container_ready,
container_dimensions,
hostname="SUPPRESS")
self.gauge(
"container.restart_count",
container_restart_count,
container_dimensions,
hostname="SUPPRESS")
# getting an aggregated value for pod restart count
pod_retry_count += container_restart_count
# Report limit/request metrics
if self.report_container_metrics or self.report_container_mem_percent:
self._report_container_limits(pod_containers, container_dimension_map, name2id, memory_limit_map)
self._report_container_limits(
pod_containers, container_dimension_map, name2id, memory_limit_map)
self.gauge("pod.restart_count", pod_retry_count, pod_dimensions, hostname="SUPPRESS")
self.gauge("pod.phase", POD_PHASE.get(pod_status['phase']), pod_dimensions, hostname="SUPPRESS")
self.gauge(
"pod.phase",
POD_PHASE.get(
pod_status['phase']),
pod_dimensions,
hostname="SUPPRESS")
def _report_container_limits(self, pod_containers, container_dimension_map, name2id, memory_limit_map):
def _report_container_limits(
self,
pod_containers,
container_dimension_map,
name2id,
memory_limit_map):
for container in pod_containers:
container_name = container['name']
container_dimensions = container_dimension_map[name2id[container_name]]
@ -226,15 +260,22 @@ class Kubernetes(checks.AgentCheck):
cpu_limit = container_limits['cpu']
cpu_value = self._convert_cpu_to_cores(cpu_limit)
if self.report_container_metrics:
self.gauge("container.cpu.limit", cpu_value, container_dimensions, hostname="SUPPRESS")
self.gauge(
"container.cpu.limit",
cpu_value,
container_dimensions,
hostname="SUPPRESS")
else:
self.log.debug("Container {} does not have cpu limit set", container_name)
if 'memory' in container_limits:
memory_limit = container_limits['memory']
memory_in_bytes = utils.convert_memory_string_to_bytes(memory_limit)
if self.report_container_metrics:
self.gauge("container.memory.limit_bytes", memory_in_bytes, container_dimensions,
hostname="SUPPRESS")
self.gauge(
"container.memory.limit_bytes",
memory_in_bytes,
container_dimensions,
hostname="SUPPRESS")
if self.report_container_mem_percent:
container_key = container_name + " " + container_dimensions["namespace"]
if container_key not in memory_limit_map:
@ -249,15 +290,22 @@ class Kubernetes(checks.AgentCheck):
cpu_request = container_requests['cpu']
cpu_value = self._convert_cpu_to_cores(cpu_request)
if self.report_container_metrics:
self.gauge("container.request.cpu", cpu_value, container_dimensions, hostname="SUPPRESS")
self.gauge(
"container.request.cpu",
cpu_value,
container_dimensions,
hostname="SUPPRESS")
else:
self.log.debug("Container {} does not have cpu request set", container_name)
if 'memory' in container_requests:
memory_request = container_requests['memory']
memory_in_bytes = utils.convert_memory_string_to_bytes(memory_request)
if self.report_container_metrics:
self.gauge("container.request.memory_bytes", memory_in_bytes, container_dimensions,
hostname="SUPPRESS")
self.gauge(
"container.request.memory_bytes",
memory_in_bytes,
container_dimensions,
hostname="SUPPRESS")
else:
self.log.debug("Container {} does not have memory request set", container_name)
@ -297,15 +345,20 @@ class Kubernetes(checks.AgentCheck):
METRIC_TYPES_UNITS[metric_name][1])
self._add_pod_metric(metric_name, metric_value, pod_key, pod_map)
if self.report_container_mem_percent and cadvisor_key == "working_set":
if "container_name" in container_dimensions and "namespace" in container_dimensions:
container_key = container_dimensions["container_name"] + " " + container_dimensions["namespace"]
if "container_name" in container_dimensions \
and "namespace" in container_dimensions:
container_key = container_dimensions["container_name"] + \
" " + container_dimensions["namespace"]
if container_key not in memory_limit_map:
continue
memory_limit = memory_limit_map[container_key]
memory_usage = metric_value
memory_usage_percent = (memory_usage / memory_limit) * 100
self.gauge("container.mem.usage_percent", memory_usage_percent, container_dimensions,
hostname="SUPPRESS")
self.gauge(
"container.mem.usage_percent",
memory_usage_percent,
container_dimensions,
hostname="SUPPRESS")
def _parse_filesystem(self, filesystem_data, container_dimensions):
if not self.report_container_metrics:
@ -379,16 +432,21 @@ class Kubernetes(checks.AgentCheck):
pod_metrics)
def _add_pod_metric(self, metric_name, metric_value, pod_key, pod_metrics):
if pod_key:
if pod_key not in pod_metrics:
pod_metrics[pod_key] = {}
if metric_name not in pod_metrics[pod_key]:
pod_metrics[pod_key][metric_name] = metric_value
else:
pod_metrics[pod_key][metric_name] += metric_value
if pod_key:
if pod_key not in pod_metrics:
pod_metrics[pod_key] = {}
if metric_name not in pod_metrics[pod_key]:
pod_metrics[pod_key][metric_name] = metric_value
else:
pod_metrics[pod_key][metric_name] += metric_value
def _get_container_dimensions(self, container, instance_dimensions, container_spec, container_dimension_map,
pod_dimension_map):
def _get_container_dimensions(
self,
container,
instance_dimensions,
container_spec,
container_dimension_map,
pod_dimension_map):
container_id = ""
# meant to key through pod metrics/dimension dictionaries
@ -410,19 +468,26 @@ class Kubernetes(checks.AgentCheck):
pod_key = None
if 'labels' in container_spec:
container_labels = container_spec['labels']
if 'io.kubernetes.pod.namespace' in container_labels and 'io.kubernetes.pod.name' in container_labels:
if 'io.kubernetes.pod.namespace' in container_labels \
and 'io.kubernetes.pod.name' in container_labels:
pod_key = container_labels['io.kubernetes.pod.name'] + \
container_labels['io.kubernetes.pod.namespace']
# In case new pods showed up since we got our pod list from the kubelet
if pod_key in pod_dimension_map:
container_dimensions.update(pod_dimension_map[pod_key])
container_dimensions['container_name'] = container_labels['io.kubernetes.container.name']
container_dimensions['container_name'] = \
container_labels['io.kubernetes.container.name']
else:
pod_key = None
return pod_key, container_dimensions
def _process_containers(self, cadvisor_url, dimensions, container_dimension_map, pod_dimension_map,
memory_limit_map):
def _process_containers(
self,
cadvisor_url,
dimensions,
container_dimension_map,
pod_dimension_map,
memory_limit_map):
try:
cadvisor_spec_url = cadvisor_url + CADVISOR_SPEC_URL
cadvisor_metric_url = cadvisor_url + CADVISOR_METRIC_URL

@ -57,7 +57,8 @@ class KubernetesAPI(checks.AgentCheck):
def check(self, instance):
kubernetes_labels = instance.get('kubernetes_labels', ["app"])
dimensions = self._set_dimensions(None, instance)
# Remove hostname from dimensions as the majority of the metrics are not tied to the hostname.
# Remove hostname from dimensions as the majority of the metrics are not
# tied to the hostname.
del dimensions['hostname']
kubernetes_api_health = self._get_api_health()
self.gauge("kubernetes.api.health_status", 0 if kubernetes_api_health else 1, dimensions,
@ -100,8 +101,11 @@ class KubernetesAPI(checks.AgentCheck):
if condition['status']:
component_status = True
break
self.gauge("kubernetes.component_status", 0 if component_status else 1, component_dimensions,
hostname="SUPPRESS")
self.gauge(
"kubernetes.component_status",
0 if component_status else 1,
component_dimensions,
hostname="SUPPRESS")
def _set_kubernetes_dimensions(self, dimensions, type, metadata, kubernetes_labels):
dimensions['type'] = metadata['name']
@ -136,7 +140,12 @@ class KubernetesAPI(checks.AgentCheck):
self.gauge("kubernetes." + condition_map['metric_name'], 0, node_dimensions)
else:
value_meta = {"reason": condition['message'][:1024]}
self.gauge("kubernetes." + condition_map['metric_name'], 1, node_dimensions, value_meta=value_meta)
self.gauge(
"kubernetes." +
condition_map['metric_name'],
1,
node_dimensions,
value_meta=value_meta)
def _report_nodes_metrics(self, dimensions):
try:
@ -166,23 +175,35 @@ class KubernetesAPI(checks.AgentCheck):
for deployment in deployments['items']:
try:
deployment_dimensions = dimensions.copy()
self._set_kubernetes_dimensions(deployment_dimensions, "deployment", deployment['metadata'],
kubernetes_labels)
self._set_kubernetes_dimensions(
deployment_dimensions,
"deployment",
deployment['metadata'],
kubernetes_labels)
deployment_status = deployment['status']
deployment_replicas = deployment_status['replicas']
deployment_updated_replicas = deployment_status['updatedReplicas']
deployment_available_replicas = deployment_status['availableReplicas']
deployment_unavailable_replicas = deployment_available_replicas - deployment_replicas
deployment_unavailable_replicas = \
deployment_available_replicas - deployment_replicas
self.gauge("kubernetes.deployment.replicas", deployment_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.deployment.available_replicas", deployment_available_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.deployment.unavailable_replicas", deployment_unavailable_replicas,
deployment_dimensions, hostname="SUPPRESS")
self.gauge(
"kubernetes.deployment.available_replicas",
deployment_available_replicas,
deployment_dimensions,
hostname="SUPPRESS")
self.gauge(
"kubernetes.deployment.unavailable_replicas",
deployment_unavailable_replicas,
deployment_dimensions,
hostname="SUPPRESS")
self.gauge("kubernetes.deployment.updated_replicas", deployment_updated_replicas,
deployment_dimensions, hostname="SUPPRESS")
except Exception as e:
self.log.info("Error {} parsing deployment {}. Skipping".format(e, deployment), exc_info=e)
self.log.info(
"Error {} parsing deployment {}. Skipping".format(
e, deployment), exc_info=e)
def _report_replication_controller_metrics(self, dimensions, kubernetes_labels):
# Get namespaces first
@ -205,12 +226,18 @@ class KubernetesAPI(checks.AgentCheck):
continue
for rc in replication_controllers['items']:
rc_dimensions = dimensions.copy()
self._set_kubernetes_dimensions(rc_dimensions, "replication_controller", rc['metadata'],
kubernetes_labels)
self._set_kubernetes_dimensions(
rc_dimensions,
"replication_controller",
rc['metadata'],
kubernetes_labels)
rc_status = rc['status']
if 'replicas' not in rc_status or not rc_status['replicas']:
continue
self.gauge("kubernetes.replication.controller.replicas", rc_status['replicas'],
rc_dimensions, hostname="SUPPRESS")
self.gauge("kubernetes.replication.controller.ready_replicas", rc_status['readyReplicas'],
rc_dimensions, hostname="SUPPRESS")
self.gauge(
"kubernetes.replication.controller.ready_replicas",
rc_status['readyReplicas'],
rc_dimensions,
hostname="SUPPRESS")

@ -58,14 +58,15 @@ DOM_ALIVE_NAMES = {libvirt.VIR_DOMAIN_BLOCKED: 'blocked',
libvirt.VIR_DOMAIN_SHUTDOWN: 'shuttingdown',
libvirt.VIR_DOMAIN_SHUTOFF: 'shutoff'} # shut off/nova suspend
DOM_SHUTOFF_STATES = {libvirt.VIR_DOMAIN_SHUTOFF_UNKNOWN: 'VM has been shutoff (reason unknown)',
libvirt.VIR_DOMAIN_SHUTOFF_SHUTDOWN: 'VM has been shut down',
libvirt.VIR_DOMAIN_SHUTOFF_DESTROYED: 'VM has been destroyed (forced off)',
libvirt.VIR_DOMAIN_SHUTOFF_CRASHED: 'VM has crashed',
libvirt.VIR_DOMAIN_SHUTOFF_MIGRATED: 'VM has been migrated',
libvirt.VIR_DOMAIN_SHUTOFF_SAVED: 'VM has been suspended',
libvirt.VIR_DOMAIN_SHUTOFF_FAILED: 'VM has failed to start',
libvirt.VIR_DOMAIN_SHUTOFF_FROM_SNAPSHOT: 'VM has been restored from powered off snapshot'}
DOM_SHUTOFF_STATES = {
libvirt.VIR_DOMAIN_SHUTOFF_UNKNOWN: 'VM has been shutoff (reason unknown)',
libvirt.VIR_DOMAIN_SHUTOFF_SHUTDOWN: 'VM has been shut down',
libvirt.VIR_DOMAIN_SHUTOFF_DESTROYED: 'VM has been destroyed (forced off)',
libvirt.VIR_DOMAIN_SHUTOFF_CRASHED: 'VM has crashed',
libvirt.VIR_DOMAIN_SHUTOFF_MIGRATED: 'VM has been migrated',
libvirt.VIR_DOMAIN_SHUTOFF_SAVED: 'VM has been suspended',
libvirt.VIR_DOMAIN_SHUTOFF_FAILED: 'VM has failed to start',
libvirt.VIR_DOMAIN_SHUTOFF_FROM_SNAPSHOT: 'VM has been restored from powered off snapshot'}
class LibvirtCheck(AgentCheck):
@ -297,8 +298,8 @@ class LibvirtCheck(AgentCheck):
if nsuuid is not None and ping_allowed:
if 'network' not in id_cache[inst_name]:
id_cache[inst_name]['network'] = []
id_cache[inst_name]['network'].append({'namespace': "qrouter-{0}".format(nsuuid),
'ip': ip['addr']})
id_cache[inst_name]['network'].append(
{'namespace': "qrouter-{0}".format(nsuuid), 'ip': ip['addr']})
elif ping_allowed is False:
self.log.debug("ICMP disallowed for {0} on {1}".format(inst_name,
ip['addr']))
@ -344,9 +345,9 @@ class LibvirtCheck(AgentCheck):
# Remove inactive VMs from the metric cache
write_metric_cache = deepcopy(metric_cache)
for instance in metric_cache:
if (('cpu.time' not in metric_cache[instance] or
self._test_vm_probation(time.strftime('%Y-%m-%dT%H:%M:%SZ',
time.gmtime(metric_cache[instance]['cpu.time']['timestamp'] + run_time))) < 0)):
if (('cpu.time' not in metric_cache[instance] or self._test_vm_probation(time.strftime(
'%Y-%m-%dT%H:%M:%SZ',
time.gmtime(metric_cache[instance]['cpu.time']['timestamp'] + run_time))) < 0)):
self.log.info("Expiring old/empty {0} from cache".format(instance))
del(write_metric_cache[instance])
try:
@ -357,7 +358,15 @@ class LibvirtCheck(AgentCheck):
except IOError as e:
self.log.error("Cannot write to {0}: {1}".format(self.metric_cache_file, e))
def _inspect_network(self, insp, inst, inst_name, instance_cache, metric_cache, dims_customer, dims_operations):
def _inspect_network(
self,
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations):
"""Inspect network metrics for an instance"""
for vnic in insp.inspect_vnics(inst):
sample_time = time.time()
@ -376,17 +385,20 @@ class LibvirtCheck(AgentCheck):
value = int(vnic[1].__getattribute__(metric))
if vnic[0].name in metric_cache[inst_name][metric_name]:
last_update_time = metric_cache[inst_name][metric_name][vnic[0].name]['timestamp']
last_update_time = \
metric_cache[inst_name][metric_name][vnic[0].name]['timestamp']
time_diff = sample_time - float(last_update_time)
rate_value = self._calculate_rate(value,
metric_cache[inst_name][metric_name][vnic[0].name]['value'],
time_diff)
rate_value = self._calculate_rate(
value,
metric_cache[inst_name][metric_name][vnic[0].name]['value'],
time_diff)
if rate_value < 0:
# Bad value, save current reading and skip
self.log.warn("Ignoring negative network sample for: "
"{0} new value: {1} old value: {2}"
.format(inst_name, value,
metric_cache[inst_name][metric_name][vnic[0].name]['value']))
self.log.warn(
"Ignoring negative network sample for: "
"{0} new value: {1} old value: {2}"
.format(inst_name, value,
metric_cache[inst_name][metric_name][vnic[0].name]['value']))
metric_cache[inst_name][metric_name][vnic[0].name] = {
'timestamp': sample_time,
'value': value}
@ -429,7 +441,15 @@ class LibvirtCheck(AgentCheck):
'timestamp': sample_time,
'value': value}
def _inspect_cpu(self, insp, inst, inst_name, instance_cache, metric_cache, dims_customer, dims_operations):
def _inspect_cpu(
self,
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations):
"""Inspect cpu metrics for an instance"""
sample_time = float("{:9f}".format(time.time()))
@ -491,7 +511,15 @@ class LibvirtCheck(AgentCheck):
metric_cache[inst_name]['cpu.time'] = {'timestamp': sample_time,
'value': cpu_info.time}
def _inspect_disks(self, insp, inst, inst_name, instance_cache, metric_cache, dims_customer, dims_operations):
def _inspect_disks(
self,
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations):
"""Inspect disk metrics for an instance"""
metric_aggregate = {}
@ -628,7 +656,8 @@ class LibvirtCheck(AgentCheck):
"""
inst_state = inst.state()
dom_status = inst_state[0] - 1
health_status = 0 if dom_status == 0 else 1 # anything other than 'running' is considered unhealthy
# Anything other than 'running' is considered unhealthy
health_status = 0 if dom_status == 0 else 1
metatag = None
if inst_state[0] in DOM_STATES:
@ -767,7 +796,8 @@ class LibvirtCheck(AgentCheck):
# Add dimensions that would be helpful for operations
dims_operations = dims_customer.copy()
dims_operations['tenant_id'] = instance_cache.get(inst_name)['tenant_id']
dims_operations = self._update_dims_with_metadata(instance_cache, inst_name, dims_operations)
dims_operations = self._update_dims_with_metadata(
instance_cache, inst_name, dims_operations)
if self.init_config.get('customer_metadata'):
for metadata in self.init_config.get('customer_metadata'):
metadata_value = (instance_cache.get(inst_name).
@ -787,7 +817,9 @@ class LibvirtCheck(AgentCheck):
# Nova can potentially get into a state where it can't see an
# instance, but libvirt can. This would cause TypeErrors as
# incomplete data is cached for this instance. Log and skip.
self.log.error("{0} is not known to nova after instance cache update -- skipping this ghost VM.".format(inst_name))
self.log.error(
"{0} is not known to nova after instance cache update -- "
"skipping this ghost VM.".format(inst_name))
continue
# Accumulate aggregate data
@ -796,10 +828,12 @@ class LibvirtCheck(AgentCheck):
agg_values[gauge] += instance_cache.get(inst_name)[gauge]
# Skip instances created within the probation period
vm_probation_remaining = self._test_vm_probation(instance_cache.get(inst_name)['created'])
vm_probation_remaining = self._test_vm_probation(
instance_cache.get(inst_name)['created'])
if (vm_probation_remaining >= 0):
self.log.info("Libvirt: {0} in probation for another {1} seconds".format(instance_cache.get(inst_name)['hostname'].encode('utf8'),
vm_probation_remaining))
self.log.info("Libvirt: {0} in probation for another {1} seconds".format(
instance_cache.get(inst_name)['hostname'].encode('utf8'),
vm_probation_remaining))
continue
vm_dom_state = self._inspect_state(insp, inst, inst_name,
@ -821,28 +855,72 @@ class LibvirtCheck(AgentCheck):
metric_cache[inst_name] = {}
if self.init_config.get('vm_cpu_check_enable'):
self._inspect_cpu(insp, inst, inst_name, instance_cache, metric_cache, dims_customer, dims_operations)
self._inspect_cpu(
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations)
if not self._collect_intervals['disk']['skip']:
if self.init_config.get('vm_disks_check_enable'):
self._inspect_disks(insp, inst, inst_name, instance_cache, metric_cache, dims_customer,
dims_operations)
self._inspect_disks(
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations)
if self.init_config.get('vm_extended_disks_check_enable'):
self._inspect_disk_info(insp, inst, inst_name, instance_cache, metric_cache, dims_customer,
dims_operations)
self._inspect_disk_info(
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations)
if not self._collect_intervals['vnic']['skip']:
if self.init_config.get('vm_network_check_enable'):
self._inspect_network(insp, inst, inst_name, instance_cache, metric_cache, dims_customer, dims_operations)
self._inspect_network(
insp,
inst,
inst_name,
instance_cache,
metric_cache,
dims_customer,
dims_operations)
# Memory utilization
# (req. balloon driver; Linux kernel param CONFIG_VIRTIO_BALLOON)
try:
mem_stats = inst.memoryStats()
mem_metrics = {'mem.free_gb': float(mem_stats['unused']) / 1024 / 1024,
'mem.swap_used_gb': float(mem_stats['swap_out']) / 1024 / 1024,
'mem.total_gb': float(mem_stats['available']) / 1024 / 1024,
'mem.used_gb': float(mem_stats['available'] - mem_stats['unused']) / 1024 / 1024,
'mem.free_perc': float(mem_stats['unused']) / float(mem_stats['available']) * 100}
mem_metrics = {
'mem.free_gb': float(
mem_stats['unused']) /
1024 /
1024,
'mem.swap_used_gb': float(
mem_stats['swap_out']) /
1024 /
1024,
'mem.total_gb': float(
mem_stats['available']) /
1024 /
1024,
'mem.used_gb': float(
mem_stats['available'] -
mem_stats['unused']) /
1024 /
1024,
'mem.free_perc': float(
mem_stats['unused']) /
float(
mem_stats['available']) *
100}
for name in mem_metrics:
self.gauge(name, mem_metrics[name], dimensions=dims_customer,
delegated_tenant=instance_cache.get(inst_name)['tenant_id'],
@ -850,12 +928,17 @@ class LibvirtCheck(AgentCheck):
self.gauge("vm.{0}".format(name), mem_metrics[name],
dimensions=dims_operations)
memory_info = insp.inspect_memory_resident(inst)
self.gauge('vm.mem.resident_gb', float(memory_info.resident) / 1024, dimensions=dims_operations)
self.gauge(
'vm.mem.resident_gb',
float(
memory_info.resident) / 1024,
dimensions=dims_operations)
except KeyError:
self.log.debug("Balloon driver not active/available on guest {0} ({1})".format(inst_name,
instance_cache.get(inst_name)['hostname'].encode('utf8')))
self.log.debug("Balloon driver not active/available on guest {0} ({1})".format(
inst_name, instance_cache.get(inst_name)['hostname'].encode('utf8')))
# Test instance's remote responsiveness (ping check) if possible
if (self.init_config.get('vm_ping_check_enable')) and self.init_config.get('ping_check') and 'network' in instance_cache.get(inst_name):
if (self.init_config.get('vm_ping_check_enable')) and self.init_config.get(
'ping_check') and 'network' in instance_cache.get(inst_name):
for net in instance_cache.get(inst_name)['network']:
ping_args = [dims_customer, dims_operations, inst_name, instance_cache, net]
ping_results.append(self.pool.apply_async(self._run_ping, ping_args))

@ -118,11 +118,14 @@ class Lighttpd(AgentCheck):
if self.assumed_url.get(instance['lighttpd_status_url'],
None) is None and url[-len(url_suffix):] != url_suffix:
self.assumed_url[instance['lighttpd_status_url']] = '%s%s' % (url, url_suffix)
self.log.warn("Assuming url was not correct. Trying to add %s suffix to the url" % url_suffix)
self.log.warn(
"Assuming url was not correct. Trying to add %s suffix to the url" %
url_suffix)
self.check(instance)
else:
raise Exception(
"No metrics were fetched for this instance. Make sure that %s is the proper url." %
"No metrics were fetched for this instance. "
"Make sure that %s is the proper url." %
instance['lighttpd_status_url'])
def _get_server_version(self, headers):

@ -168,7 +168,8 @@ class Memcache(AgentCheck):
import memcache
except ImportError:
raise Exception(
"Cannot import memcache module. Check the instructions to install this module at https://app.datadoghq.com/account/settings#integrations/mcache")
"Cannot import memcache module. Check the instructions to install"
"this module at https://app.datadoghq.com/account/settings#integrations/mcache")
# Hacky monkeypatch to fix a memory leak in the memcache library.
# See https://github.com/DataDog/dd-agent/issues/278 for details.

@ -61,7 +61,7 @@ class Memory(checks.AgentCheck):
count += 1
if (hasattr(mem_info, 'buffers') and mem_info.buffers and
hasattr(mem_info, 'cached') and mem_info.cached):
hasattr(mem_info, 'cached') and mem_info.cached):
mem_used_real = mem_info.used
if psutil.version_info < (4, 4, 0):

@ -109,8 +109,9 @@ class WrapMK(AgentCheck):
if 'name' in instance:
metric_name = instance['name']
elif instance['check_type'] == 'service':
metric_name = re.sub(' ', '_',
"nagios.{0}_status".format(measurement['display_name'].lower()))
metric_name = re.sub(
' ', '_', "nagios.{0}_status".format(
measurement['display_name'].lower()))
elif instance['check_type'] == 'host':
metric_name = 'nagios.host_status'

@ -132,7 +132,8 @@ class MongoDb(AgentCheck):
self.log.error(
'mongo.yaml exists but pymongo module can not be imported. Skipping check.')
raise Exception(
'Python PyMongo Module can not be imported. Please check the installation instruction on the Datadog Website')
'Python PyMongo Module can not be imported. Please check the installation '
'instruction on the Datadog Website')
try:
from pymongo import uri_parser
@ -194,7 +195,8 @@ class MongoDb(AgentCheck):
data['replicationLag'] = lag.total_seconds()
else:
data['replicationLag'] = (
lag.microseconds + (lag.seconds + lag.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
lag.microseconds + (
lag.seconds + lag.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
if current is not None:
data['health'] = current['health']

@ -66,8 +66,9 @@ class MySql(checks.AgentCheck):
return {"PyMySQL": version}
def check(self, instance):
host, port, user, password, mysql_sock, ssl_ca, ssl_key, ssl_cert, defaults_file, options = self._get_config(
instance)
host, port, user, password, mysql_sock, ssl_ca, ssl_key, ssl_cert, defaults_file, \
options = self._get_config(
instance)
self.ssl_options = {}
if ssl_ca is not None:
self.ssl_options['ca'] = ssl_ca
@ -102,7 +103,8 @@ class MySql(checks.AgentCheck):
defaults_file = instance.get('defaults_file', '')
options = instance.get('options', {})
return host, port, user, password, mysql_sock, ssl_ca, ssl_key, ssl_cert, defaults_file, options
return host, port, user, password, mysql_sock, ssl_ca, ssl_key, ssl_cert, \
defaults_file, options
def _connect(self, host, port, mysql_sock, user, password, defaults_file):
try:
@ -212,7 +214,9 @@ class MySql(checks.AgentCheck):
greater_502 = True
except Exception as exception:
self.log.warn("Cannot compute mysql version, assuming older than 5.0.2: %s" % str(exception))
self.log.warn(
"Cannot compute mysql version, assuming older than 5.0.2: %s" %
str(exception))
self.greater_502[host] = greater_502

@ -37,19 +37,59 @@ class Network(checks.AgentCheck):
if self._is_nic_monitored(nic_name, excluded_ifaces, exclude_iface_re):
nic = nics[nic_name]
if instance.get('use_bits'):
self.rate('net.in_bits_sec', nic.bytes_recv * 8, device_name=nic_name, dimensions=dimensions)
self.rate('net.out_bits_sec', nic.bytes_sent * 8, device_name=nic_name, dimensions=dimensions)
self.rate(
'net.in_bits_sec',
nic.bytes_recv * 8,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.out_bits_sec',
nic.bytes_sent * 8,
device_name=nic_name,
dimensions=dimensions)
else:
self.rate('net.in_bytes_sec', nic.bytes_recv, device_name=nic_name, dimensions=dimensions)
self.rate('net.out_bytes_sec', nic.bytes_sent, device_name=nic_name, dimensions=dimensions)
self.rate(
'net.in_bytes_sec',
nic.bytes_recv,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.out_bytes_sec',
nic.bytes_sent,
device_name=nic_name,
dimensions=dimensions)
if instance.get('net_bytes_only'):
continue
self.rate('net.in_packets_sec', nic.packets_recv, device_name=nic_name, dimensions=dimensions)
self.rate('net.out_packets_sec', nic.packets_sent, device_name=nic_name, dimensions=dimensions)
self.rate('net.in_errors_sec', nic.errin, device_name=nic_name, dimensions=dimensions)
self.rate('net.out_errors_sec', nic.errout, device_name=nic_name, dimensions=dimensions)
self.rate('net.in_packets_dropped_sec', nic.dropin, device_name=nic_name, dimensions=dimensions)
self.rate('net.out_packets_dropped_sec', nic.dropout, device_name=nic_name, dimensions=dimensions)
self.rate(
'net.in_packets_sec',
nic.packets_recv,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.out_packets_sec',
nic.packets_sent,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.in_errors_sec',
nic.errin,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.out_errors_sec',
nic.errout,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.in_packets_dropped_sec',
nic.dropin,
device_name=nic_name,
dimensions=dimensions)
self.rate(
'net.out_packets_dropped_sec',
nic.dropout,
device_name=nic_name,
dimensions=dimensions)
log.debug('Collected 8 network metrics for device {0}'.format(nic_name))

@ -82,7 +82,8 @@ class OvsCheck(AgentCheck):
for ifx in interface_data:
if not re.match(self.include_iface_re, ifx):
self.log.debug("include_iface_re {0} does not match with "
"ovs-vsctl interface {1} ".format(self.include_iface_re.pattern, ifx))
"ovs-vsctl interface {1} ".format(self.include_iface_re.pattern,
ifx))
continue
if ifx not in ctr_cache:
@ -90,7 +91,8 @@ class OvsCheck(AgentCheck):
for metric_name, idx in self._get_metrics_map(measure).items():
interface_stats_key = self._get_interface_stats_key(idx, metric_name, measure, ifx)
statistics_dict = interface_data[ifx]['statistics']
value = statistics_dict[interface_stats_key] if interface_stats_key in statistics_dict else 0
value = statistics_dict[interface_stats_key] \
if interface_stats_key in statistics_dict else 0
if metric_name in ctr_cache[ifx]:
cache_time = ctr_cache[ifx][metric_name]['timestamp']
time_diff = sample_time - float(cache_time)
@ -207,7 +209,7 @@ class OvsCheck(AgentCheck):
metric_name_rate = "vswitch.{0}_sec".format(metric_name)
metric_name_abs = "vswitch.{0}".format(metric_name)
if not self.use_health_metrics and interface_stats_key in HEALTH_METRICS:
continue
continue
if self.use_rate_metrics:
self.gauge(metric_name_rate, value[interface_stats_key],
dimensions=customer_dimensions,
@ -218,7 +220,8 @@ class OvsCheck(AgentCheck):
dimensions=ops_dimensions)
if self.use_absolute_metrics:
statistics_dict = interface_data[ifx]['statistics']
abs_value = statistics_dict[interface_stats_key] if interface_stats_key in statistics_dict else 0
abs_value = statistics_dict[interface_stats_key] \
if interface_stats_key in statistics_dict else 0
if self.use_bits and 'bytes' in interface_stats_key:
abs_value = abs_value * 8
# POST to customer

@ -174,7 +174,8 @@ SELECT relname,
# Build dimensions
# descriptors are: (pg_name, dd_tag_name): value
# Special-case the "db" tag, which overrides the one that is passed as instance_dimensions
# Special-case the "db" tag, which overrides the one that is passed as
# instance_dimensions
# The reason is that pg_stat_database returns all databases regardless of the
# connection.
if not scope['relation'] and 'db' in dimensions:
@ -205,7 +206,8 @@ SELECT relname,
import psycopg2 as pg
except ImportError:
raise ImportError(
"psycopg2 library cannot be imported. Please check the installation instruction on the Datadog Website.")
"psycopg2 library cannot be imported. Please check the"
"installation instruction on the Datadog Website.")
if host == 'localhost' and password == '': # nosec
# Use ident method

@ -92,7 +92,8 @@ class ProcessCheck(checks.AgentCheck):
total_thr = self._safely_increment_var(total_thr, p.num_threads())
try:
total_open_file_descriptors = self._safely_increment_var(total_open_file_descriptors, float(p.num_fds()))
total_open_file_descriptors = self._safely_increment_var(
total_open_file_descriptors, float(p.num_fds()))
except psutil.AccessDenied:
got_denied = True
@ -105,10 +106,14 @@ class ProcessCheck(checks.AgentCheck):
if io_permission:
try:
io_counters = p.io_counters()
total_read_count = self._safely_increment_var(total_read_count, io_counters.read_count)
total_write_count = self._safely_increment_var(total_write_count, io_counters.write_count)
total_read_kbytes = self._safely_increment_var(total_read_kbytes, float(io_counters.read_bytes / 1024))
total_write_kbytes = self._safely_increment_var(total_write_kbytes, float(io_counters.write_bytes / 1024))
total_read_count = self._safely_increment_var(
total_read_count, io_counters.read_count)
total_write_count = self._safely_increment_var(
total_write_count, io_counters.write_count)
total_read_kbytes = self._safely_increment_var(
total_read_kbytes, float(io_counters.read_bytes / 1024))
total_write_kbytes = self._safely_increment_var(
total_write_kbytes, float(io_counters.write_bytes / 1024))
except psutil.AccessDenied:
self.log.debug('monasca-agent user does not have ' +
'access to I/O counters for process' +
@ -130,8 +135,14 @@ class ProcessCheck(checks.AgentCheck):
"when trying to get the number of file descriptors")
return dict(zip(ProcessCheck.PROCESS_GAUGE,
(total_thr, total_cpu, total_rss, total_open_file_descriptors, total_read_count,
total_write_count, total_read_kbytes, total_write_kbytes)))
(total_thr,
total_cpu,
total_rss,
total_open_file_descriptors,
total_read_count,
total_write_count,
total_read_kbytes,
total_write_kbytes)))
def prepare_run(self):
"""Collect the list of processes once before each run"""
@ -164,13 +175,15 @@ class ProcessCheck(checks.AgentCheck):
if name is None:
raise KeyError('The "name" of process groups is mandatory')
if (search_string is None and username is None) or (search_string is not None and username is not None):
if (search_string is None and username is None) or (
search_string is not None and username is not None):
raise KeyError('"You must provide either "search_string" or "user"')
if username is None:
dimensions = self._set_dimensions({'process_name': name}, instance)
else:
dimensions = self._set_dimensions({'process_user': username, 'process_name': name}, instance)
dimensions = self._set_dimensions(
{'process_user': username, 'process_name': name}, instance)
pids = self.find_pids(search_string, username, exact_match=exact_match)

@ -1,7 +1,6 @@
# (C) Copyright 2017-2018 Hewlett Packard Enterprise Development LP
import math
import requests
import six
import time
import yaml
@ -28,11 +27,13 @@ class Prometheus(checks.AgentCheck):
'prometheus.io/port': Scrape the pod on the indicated port instead of the default of '9102'.
Additional settings for prometheus endpoints
'monasca.io/usek8slabels': Attach kubernetes labels of the pod that is being scraped. Default to 'true'
'monasca.io/usek8slabels': Attach kubernetes labels of the pod that is being scraped.
Default to 'true'
'monasca.io/whitelist': Yaml list of metric names to whitelist against on detected endpoint
'monasca.io/report_pod_label_owner': If the metrics that are scraped contain pod as a label key we will attempt to get the
pod owner and attach that to the metric as another dimension. Very useful for other scraping from other solutions
that monitor k8s (Ex. kube-state-metrics). Default to 'false'
'monasca.io/report_pod_label_owner': If the metrics that are scraped contain pod as a label
key we will attempt to get the pod owner and attach that to the metric as another
dimension. Very useful for other scraping from other solutions
that monitor k8s (Ex. kube-state-metrics). Default to 'false'
"""
def __init__(self, name, init_config, agent_config, instances=None):
@ -44,7 +45,9 @@ class Prometheus(checks.AgentCheck):
self.detect_method = init_config.get("detect_method", "pod").lower()
self.kubelet_url = None
if instances is not None and len(instances) > 1:
raise Exception('Prometheus Client only supports one configured instance if auto detection is set')
raise Exception(
'Prometheus Client only supports one configured instance if auto'
'detection is set')
if self.detect_method not in ['pod', 'service']:
raise Exception('Invalid detect method {}. Must be either pod or service')
self.k8s_pod_cache = None
@ -89,9 +92,12 @@ class Prometheus(checks.AgentCheck):
else:
prometheus_endpoints = self._get_metric_endpoints_by_service(dimensions)
for prometheus_endpoint in prometheus_endpoints:
self.report_endpoint_metrics(prometheus_endpoint.scrape_endpoint, prometheus_endpoint.dimensions,
prometheus_endpoint.whitelist, prometheus_endpoint.metric_types,
prometheus_endpoint.report_pod_label_owner)
self.report_endpoint_metrics(
prometheus_endpoint.scrape_endpoint,
prometheus_endpoint.dimensions,
prometheus_endpoint.whitelist,
prometheus_endpoint.metric_types,
prometheus_endpoint.report_pod_label_owner)
def _get_metric_endpoints_by_pod(self, dimensions):
prometheus_endpoints = []
@ -109,7 +115,8 @@ class Prometheus(checks.AgentCheck):
pod_metadata = pod['metadata']
pod_spec = pod['spec']
pod_status = pod['status']
if "annotations" not in pod_metadata or not ('containers' in pod_spec and 'podIP' in pod_status):
if "annotations" not in pod_metadata or not (
'containers' in pod_spec and 'podIP' in pod_status):
# No annotations, containers, or endpoints skipping pod
continue
@ -137,7 +144,8 @@ class Prometheus(checks.AgentCheck):
use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \
self._get_monasca_settings(pod_name, pod_annotations)
except Exception as e:
error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \
error_message = "Error parsing monasca annotations on endpoints {} " \
"with error - {}. " \
"Skipping scraping metrics".format(endpoints, e)
self.log.error(error_message)
continue
@ -151,8 +159,12 @@ class Prometheus(checks.AgentCheck):
self.kubernetes_labels))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(pod_ip, endpoint)
prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, pod_dimensions, whitelist, metric_types,
report_pod_label_owner)
prometheus_endpoint = PrometheusEndpoint(
scrape_endpoint,
pod_dimensions,
whitelist,
metric_types,
report_pod_label_owner)
prometheus_endpoints.append(prometheus_endpoint)
self.log.info("Detected pod endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
@ -169,7 +181,8 @@ class Prometheus(checks.AgentCheck):
try:
services = self.kubernetes_connector.get_request("/api/v1/services")
except Exception as e:
exception_message = "Could not get services from Kubernetes API with error - {}".format(e)
exception_message = \
"Could not get services from Kubernetes API with error - {}".format(e)
self.log.exception(exception_message)
raise Exception(exception_message)
@ -201,7 +214,8 @@ class Prometheus(checks.AgentCheck):
use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \
self._get_monasca_settings(service_name, service_annotations)
except Exception as e:
error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \
error_message = "Error parsing monasca annotations on endpoints {} " \
"with error - {}. " \
"Skipping scraping metrics".format(endpoints, e)
self.log.error(error_message)
continue
@ -214,8 +228,12 @@ class Prometheus(checks.AgentCheck):
self._get_service_dimensions(service_metadata))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(cluster_ip, endpoint)
prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, service_dimensions, whitelist, metric_types,
report_pod_label_owner)
prometheus_endpoint = PrometheusEndpoint(
scrape_endpoint,
service_dimensions,
whitelist,
metric_types,
report_pod_label_owner)
prometheus_endpoints.append(prometheus_endpoint)
self.log.info("Detected service endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
@ -232,10 +250,13 @@ class Prometheus(checks.AgentCheck):
metric_types = yaml.safe_load(annotations["monasca.io/metric_types"])
for typ in metric_types:
if metric_types[typ] not in ['rate', 'counter']:
self.log.warn("Ignoring unknown metric type '{}' configured for '{}' on endpoint '{}'".format(
typ, metric_types[typ], resource_name))
self.log.warn(
"Ignoring unknown metric type '{}' configured for '{}'"
" on endpoint '{}'".format(
typ, metric_types[typ], resource_name))
del metric_types[typ]
report_pod_label_owner_annotation = annotations.get("monasca.io/report_pod_label_owner", "false").lower()
report_pod_label_owner_annotation = annotations.get(
"monasca.io/report_pod_label_owner", "false").lower()
report_pod_label_owner = True if report_pod_label_owner_annotation == "true" else False
return use_k8s_labels, whitelist, metric_types, report_pod_label_owner
@ -293,7 +314,10 @@ class Prometheus(checks.AgentCheck):
metric_labels = metric[1]
metric_value = float(metric[2])
if math.isnan(metric_value):
self.log.debug('filtering out NaN value provided for metric %s{%s}', metric_name, metric_labels)
self.log.debug(
'filtering out NaN value provided for metric %s{%s}',
metric_name,
metric_labels)
continue
if endpoint_whitelist is not None and metric_name not in endpoint_whitelist:
continue
@ -319,14 +343,19 @@ class Prometheus(checks.AgentCheck):
metric_dimensions[pod_owner] = pod_owner_name
metric_dimensions["owner_type"] = pod_owner
else:
pod_owner_pair = self.get_pod_owner(pod_name, metric_dimensions['namespace'])
pod_owner_pair = self.get_pod_owner(
pod_name, metric_dimensions['namespace'])
if pod_owner_pair:
pod_owner = pod_owner_pair[0]
pod_owner_name = pod_owner_pair[1]
metric_dimensions[pod_owner] = pod_owner_name
metric_dimensions["owner_type"] = pod_owner
self.k8s_pod_cache[pod_name] = pod_owner, pod_owner_name
metric_func(metric_name, metric_value, dimensions=metric_dimensions, hostname="SUPPRESS")
metric_func(
metric_name,
metric_value,
dimensions=metric_dimensions,
hostname="SUPPRESS")
def report_endpoint_metrics(self, metric_endpoint, endpoint_dimensions, endpoint_whitelist=None,
endpoint_metric_types=None, report_pod_label_owner=False):
@ -340,16 +369,23 @@ class Prometheus(checks.AgentCheck):
if "text/plain" in result_content_type:
try:
metric_families = text_string_to_metric_families(result.text)
self._send_metrics(metric_families, endpoint_dimensions, endpoint_whitelist, endpoint_metric_types,
report_pod_label_owner)
self._send_metrics(
metric_families,
endpoint_dimensions,
endpoint_whitelist,
endpoint_metric_types,
report_pod_label_owner)
except Exception as e:
self.log.error("Error parsing data from {} with error {}".format(metric_endpoint, e))
self.log.error(
"Error parsing data from {} with error {}".format(
metric_endpoint, e))
else:
self.log.error("Unsupported content type - {}".format(result_content_type))
def get_pod_owner(self, pod_name, namespace):
try:
pod = self.kubernetes_connector.get_request("/api/v1/namespaces/{}/pods/{}".format(namespace, pod_name))
pod = self.kubernetes_connector.get_request(
"/api/v1/namespaces/{}/pods/{}".format(namespace, pod_name))
pod_metadata = pod['metadata']
pod_owner, pod_owner_name = utils.get_pod_owner(self.kubernetes_connector, pod_metadata)
if not pod_owner:
@ -357,7 +393,8 @@ class Prometheus(checks.AgentCheck):
return None
return pod_owner, pod_owner_name
except Exception as e:
self.log.info("Could not get pod {} from Kubernetes API with error - {}".format(pod_name, e))
self.log.info(
"Could not get pod {} from Kubernetes API with error - {}".format(pod_name, e))
return None
def initialize_pod_cache(self):
@ -372,9 +409,12 @@ class Prometheus(checks.AgentCheck):
pod_metadata = pod['metadata']
pod_name = pod_metadata['name']
try:
pod_owner, pod_owner_name = utils.get_pod_owner(self.kubernetes_connector, pod_metadata)
pod_owner, pod_owner_name = utils.get_pod_owner(
self.kubernetes_connector, pod_metadata)
except Exception as e:
self.log.info("Error attempting to get pod {} owner with error {}".format(pod_name, e))
self.log.info(
"Error attempting to get pod {} owner with error {}".format(
pod_name, e))
continue
if not pod_owner:
self.log.info("Could not get pod owner for pod {}".format(pod_name))
@ -384,7 +424,13 @@ class Prometheus(checks.AgentCheck):
# Class to hold prometheus endpoint metadata
class PrometheusEndpoint(object):
def __init__(self, scrape_endpoint, dimensions, whitelist, metric_types, report_pod_label_owner):
def __init__(
self,
scrape_endpoint,
dimensions,
whitelist,
metric_types,
report_pod_label_owner):
self.scrape_endpoint = scrape_endpoint
self.dimensions = dimensions
self.whitelist = whitelist

@ -2,7 +2,6 @@
import json
import re
import time
import urllib2
import urlparse
@ -48,11 +47,13 @@ QUEUE_ATTRIBUTES = [
('message_stats/redeliver', 'messages.redeliver_count', float),
('message_stats/redeliver_details/rate', 'messages.redeliver_rate', float)]
EXCHANGE_ATTRIBUTES = [('message_stats/publish_out', 'messages.published_count', float),
('message_stats/publish_out_details/rate', 'messages.published_rate', float),
EXCHANGE_ATTRIBUTES = [
('message_stats/publish_out', 'messages.published_count', float),
('message_stats/publish_out_details/rate', 'messages.published_rate', float),
('message_stats/publish_in', 'messages.received_count', float),
('message_stats/publish_in_details/rate', 'messages.received_rate', float)]
('message_stats/publish_in', 'messages.received_count', float),
('message_stats/publish_in_details/rate', 'messages.received_rate', float)
]
NODE_ATTRIBUTES = [
('fd_used', 'fd_used', float),
@ -192,7 +193,8 @@ class RabbitMQ(checks.AgentCheck):
base_url: the url of the rabbitmq management api (e.g. http://localhost:15672/api)
object_type: either QUEUE_TYPE, EXCHANGE_TYPE or NODE_TYPE
max_detailed: the limit of objects to collect for this type
filters: explicit or regexes filters of specified queues or nodes (specified in the yaml file)
filters: explicit or regexes filters of specified queues or nodes
(specified in the yaml file)
"""
data = self._get_data(urlparse.urljoin(base_url, object_type))
# Make a copy of this list as we will remove items from it at each iteration
@ -202,9 +204,30 @@ class RabbitMQ(checks.AgentCheck):
"""data is a list of nodes or queues:
data = [
{'status': 'running', 'node': 'rabbit@host', 'name': 'queue1', 'consumers': 0, 'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0, 'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0, 'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0, 'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0, 'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0}, 'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '', 'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
{'status': 'running', 'node': 'rabbit@host, 'name': 'queue10', 'consumers': 0, 'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0, 'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0, 'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0, 'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0, 'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0}, 'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '', 'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
{'status': 'running', 'node': 'rabbit@host', 'name': 'queue11', 'consumers': 0, 'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0, 'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0, 'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0, 'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0, 'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0}, 'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '', 'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
{ 'status': 'running', 'node': 'rabbit@host', 'name': 'queue1', 'consumers': 0,
'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0,
'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0,
'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0,
'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0,
'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0},
'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '',
'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
{'status': 'running', 'node': 'rabbit@host, 'name': 'queue10', 'consumers': 0,
'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0,
'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0,
'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0,
'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0,
'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0},
'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '',
'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
{'status': 'running', 'node': 'rabbit@host', 'name': 'queue11', 'consumers': 0,
'vhost': '/', 'backing_queue_status': {'q1': 0, 'q3': 0, 'q2': 0, 'q4': 0,
'avg_ack_egress_rate': 0.0, 'ram_msg_count': 0, 'ram_ack_count': 0, 'len': 0,
'persistent_count': 0, 'target_ram_count': 'infinity', 'next_seq_id': 0,
'delta': ['delta', 'undefined', 0, 'undefined'], 'pending_acks': 0,
'avg_ack_ingress_rate': 0.0, 'avg_egress_rate': 0.0, 'avg_ingress_rate': 0.0},
'durable': True, 'idle_since': '2013-10-03 13:38:18', 'exclusive_consumer_tag': '',
'arguments': {}, 'memory': 10956, 'policy': '', 'auto_delete': False},
...
]
"""
@ -233,7 +256,9 @@ class RabbitMQ(checks.AgentCheck):
if len(data) > max_detailed:
self.log.warning(
"Too many %s to fetch. Increase max_detailed_ in the config or results will be truncated." % object_type)
"Too many %s to fetch. Increase max_detailed_ in the config"
"or results will be truncated." %
object_type)
for data_line in data[:max_detailed]:
# We truncate the list of nodes/queues if it's above the limit
@ -275,8 +300,15 @@ class RabbitMQ(checks.AgentCheck):
if value is None:
value = 0.0
try:
self.log.debug("Collected data for %s: metric name: %s: value: %f dimensions: %s" % (object_type, metric_name, operation(value), str(dimensions)))
self.gauge('rabbitmq.%s.%s' % (METRIC_SUFFIX[object_type], metric_name), operation(value), dimensions=dimensions)
self.log.debug(
"Collected data for %s: metric name: %s: value: %f dimensions: %s" %
(object_type, metric_name, operation(value), str(dimensions)))
self.gauge(
'rabbitmq.%s.%s' %
(METRIC_SUFFIX[object_type],
metric_name),
operation(value),
dimensions=dimensions)
except ValueError:
self.log.exception("Caught ValueError for %s %s = %s with dimensions: %s" % (
METRIC_SUFFIX[object_type], attribute, value, dimensions))

@ -125,7 +125,8 @@ class Redis(AgentCheck):
except TypeError:
raise Exception(
"You need a redis library that supports authenticated connections. Try sudo easy_install redis.")
"You need a redis library that supports authenticated connections."
"Try sudo easy_install redis.")
return self.connections[key]
@ -152,7 +153,8 @@ class Redis(AgentCheck):
import redis
raise Exception(
"""Unable to run the info command. This is probably an issue with your version of the python-redis library.
"""Unable to run the info command. This is probably an issue with your version
of the python-redis library.
Minimum required version: 2.4.11
Your current version: %s
Please upgrade to a newer version by running sudo easy_install redis""" %
@ -189,11 +191,13 @@ class Redis(AgentCheck):
def check(self, instance):
try:
import redis
import redis # noqa
except ImportError:
raise Exception(
'Python Redis Module can not be imported. Please check the installation instruction on the Datadog Website')
'Python Redis Module can not be imported. Please check the installation'
'instruction on the Datadog Website')
if ("host" not in instance or "port" not in instance) and "unix_socket_path" not in instance:
if ("host" not in instance or "port" not in instance) \
and "unix_socket_path" not in instance:
raise Exception("You must specify a host/port couple or a unix_socket_path")
self._check_db(instance)

@ -78,13 +78,18 @@ class TCPCheck(ServicesCheck):
length = int((time.time() - start) * 1000)
if "timed out" in str(e):
# The connection timed out because it took more time than the system tcp stack allows
# The connection timed out because it took more time than the system tcp
# stack allows
self.log.warning(
"The connection timed out because it took more time than the system tcp stack allows. You might want to change this setting to allow longer timeouts")
"The connection timed out because it took more time than"
"the system tcp stack allows. You might want to change this"
"setting to allow longer timeouts")
self.log.info("System tcp timeout. Assuming that the checked system is down")
return Status.DOWN, """Socket error: %s.
The connection timed out after %s ms because it took more time than the system tcp stack allows.
You might want to change this setting to allow longer timeouts""" % (str(e), length)
The connection timed out after %s ms
because it took more time than the system tcp stack allows.
You might want to change this setting to allow longer timeouts
""" % (str(e), length)
else:
self.log.info("%s:%s is DOWN (%s). Connection failed after %s ms" %
(addr, port, str(e), length))

@ -5,13 +5,9 @@ Generic VCenter check. This check allows you to specify particular metrics that
you want from vCenter in your configuration.
"""
import json
import logging as log
from monasca_agent.collector.checks import AgentCheck
from monasca_agent.common.config import Config
from oslo_vmware import api
from oslo_vmware import vim_util
import requests
import traceback
CLUSTER_COMPUTE_PROPERTIES = ["name", "host", "datastore"]
@ -158,23 +154,23 @@ class VCenterCheck(AgentCheck):
self._build_resource_dict(mor)
def _get_sample(self, samples, counter_name, is_summation=False):
res = 0
num_samples = 0
res = 0
num_samples = 0
for cn in samples:
if cn.startswith(counter_name):
vals = samples[cn]
if vals:
for val in vals:
i_val = int(val)
if i_val != -1:
res += i_val
num_samples += 1
for cn in samples:
if cn.startswith(counter_name):
vals = samples[cn]
if vals:
for val in vals:
i_val = int(val)
if i_val != -1:
res += i_val
num_samples += 1
if not is_summation and num_samples:
res /= num_samples
if not is_summation and num_samples:
res /= num_samples
return res
return res
def _get_shared_datastores(self, datastore_stats, managed_cluster):
"""Method to find the shared datastores associated with the cluster
@ -608,8 +604,8 @@ class VcenterOperations(object):
name = self.counters[perf_metric_series_csv.id.counterId]
instance = perf_metric_series_csv.id.instance
if (instance is not None and
len(instance) > 0 and
instance is not "*"):
len(instance) > 0 and
instance is not "*"):
name += "." + instance
perf_result[name] = perf_metric_series_csv.value

@ -12,7 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
VCenter plugin that returns only vm status. Takes no instances, reads from a single configured VCenter
VCenter plugin that returns only vm status. Takes no instances,
reads from a single configured VCenter
"""
from monasca_agent.collector.checks import AgentCheck
@ -36,8 +37,15 @@ class VcenterSlim(AgentCheck):
session = self.get_api_session()
result = session.invoke_api(vim_util, "get_objects", session.vim, "VirtualMachine", self.max_objects,
["runtime.connectionState", "config.annotation", "config.instanceUuid"])
result = session.invoke_api(
vim_util,
"get_objects",
session.vim,
"VirtualMachine",
self.max_objects,
["runtime.connectionState",
"config.annotation",
"config.instanceUuid"])
for vm in result[0]:
vm_status = 1
# vm_name = vm.obj.value

@ -19,8 +19,10 @@ PROJECTION_METRICS_QUERY = "SELECT projection_name, wos_used_bytes, ros_count, "
"COALESCE(tuple_mover_mergeouts, 0) tuple_mover_mergeouts " \
"FROM projection_storage " \
"LEFT JOIN (SELECT projection_id, " \
"SUM(case when operation_name = 'Moveout' then 1 else 0 end) tuple_mover_moveouts, " \
"SUM(case when operation_name = 'Mergeout' then 1 else 0 end) tuple_mover_mergeouts " \
"SUM(case when operation_name = " \
"'Moveout' then 1 else 0 end) tuple_mover_moveouts, " \
"SUM(case when operation_name = " \
"'Mergeout' then 1 else 0 end) tuple_mover_mergeouts " \
"FROM tuple_mover_operations " \
"WHERE node_name = '{0}' and is_executing = 't' " \
"GROUP BY projection_id) tm " \
@ -34,7 +36,8 @@ RESOURCE_METRICS_QUERY = "SELECT COALESCE(request_queue_depth, 0) request_queue_
"FROM resource_usage " \
"WHERE node_name = '{0}';"
RESOURCE_POOL_METRICS_QUERY = "SELECT pool_name, memory_size_actual_kb, memory_inuse_kb, running_query_count, " \
RESOURCE_POOL_METRICS_QUERY = "SELECT pool_name, memory_size_actual_kb, " \
"memory_inuse_kb, running_query_count, " \
"COALESCE(rejection_count, 0) rejection_count " \
"FROM resource_pool_status " \
"LEFT JOIN (" \
@ -95,14 +98,20 @@ class Vertica(checks.AgentCheck):
self._report_resource_pool_metrics(results[4], dimensions)
def _query_database(self, user, password, timeout, query):
stdout, stderr, return_code = timeout_command(["/opt/vertica/bin/vsql", "-U", user, "-w", password, "-A", "-R",
"|", "-t", "-F", ",", "-x"], timeout, command_input=query)
stdout, stderr, return_code = timeout_command(["/opt/vertica/bin/vsql",
"-U", user, "-w",
password, "-A", "-R",
"|", "-t", "-F", ",", "-x"],
timeout,
command_input=query)
if return_code == 0:
# remove trailing newline
stdout = stdout.rstrip()
return stdout, 0
else:
self.log.error("Error querying vertica with return code of {0} and error {1}".format(return_code, stderr))
self.log.error(
"Error querying vertica with return code of {0} and error {1}".format(
return_code, stderr))
return stderr, 1
def _build_query(self, node_name):
@ -115,13 +124,18 @@ class Vertica(checks.AgentCheck):
return query
def _results_to_dict(self, results):
return [dict(entry.split(',') for entry in dictionary.split('|')) for dictionary in results.split('||')]
return [dict(entry.split(',') for entry in dictionary.split('|'))
for dictionary in results.split('||')]
def _report_node_status(self, results, dimensions):
result = self._results_to_dict(results)
node_status = result[0]['node_state']
status_metric = 0 if node_status == 'UP' else 1
self.gauge('vertica.node_status', status_metric, dimensions=dimensions, value_meta=result[0])
self.gauge(
'vertica.node_status',
status_metric,
dimensions=dimensions,
value_meta=result[0])
def _report_projection_metrics(self, results, dimensions):
results = self._results_to_dict(results)
@ -135,11 +149,12 @@ class Vertica(checks.AgentCheck):
result['wos_used_bytes'] = '0'
self.gauge(projection_metric_name + 'wos_used_bytes', int(result['wos_used_bytes']),
dimensions=projection_dimensions)
self.gauge(projection_metric_name + 'ros_count', int(result['ros_count']), dimensions=projection_dimensions)
self.rate(projection_metric_name + 'tuple_mover_moveouts', int(result['tuple_mover_moveouts']),
dimensions=projection_dimensions)
self.rate(projection_metric_name + 'tuple_mover_mergeouts', int(result['tuple_mover_mergeouts']),
dimensions=projection_dimensions)
self.gauge(projection_metric_name + 'ros_count',
int(result['ros_count']), dimensions=projection_dimensions)
self.rate(projection_metric_name + 'tuple_mover_moveouts',
int(result['tuple_mover_moveouts']), dimensions=projection_dimensions)
self.rate(projection_metric_name + 'tuple_mover_mergeouts',
int(result['tuple_mover_mergeouts']), dimensions=projection_dimensions)
def _report_resource_metrics(self, results, dimensions):
results = self._results_to_dict(results)
@ -147,13 +162,21 @@ class Vertica(checks.AgentCheck):
resource_metrics = results[0]
for metric_name, metric_value in resource_metrics.items():
if metric_name in ['resource_rejections', 'disk_space_rejections']:
self.rate(resource_metric_name + metric_name, int(metric_value), dimensions=dimensions)
self.rate(
resource_metric_name +
metric_name,
int(metric_value),
dimensions=dimensions)
else:
if metric_name == 'wos_used_bytes' and not metric_value:
# when nothing has been written, wos_used_bytes is empty.
# Needs to convert it to zero.
metric_value = '0'
self.gauge(resource_metric_name + metric_name, int(metric_value), dimensions=dimensions)
self.gauge(
resource_metric_name +
metric_name,
int(metric_value),
dimensions=dimensions)
def _report_resource_pool_metrics(self, results, dimensions):
results = self._results_to_dict(results)
@ -161,12 +184,12 @@ class Vertica(checks.AgentCheck):
for result in results:
resource_pool_dimensions = dimensions.copy()
resource_pool_dimensions['resource_pool'] = result['pool_name']
self.gauge(resource_pool_metric_name + 'memory_size_actual_kb', int(result['memory_size_actual_kb']),
dimensions=resource_pool_dimensions)
self.gauge(resource_pool_metric_name + 'memory_inuse_kb', int(result['memory_inuse_kb']),
dimensions=resource_pool_dimensions)
self.gauge(resource_pool_metric_name + 'running_query_count', int(result['running_query_count']),
dimensions=resource_pool_dimensions)
self.gauge(resource_pool_metric_name + 'memory_size_actual_kb',
int(result['memory_size_actual_kb']), dimensions=resource_pool_dimensions)
self.gauge(resource_pool_metric_name + 'memory_inuse_kb',
int(result['memory_inuse_kb']), dimensions=resource_pool_dimensions)
self.gauge(resource_pool_metric_name + 'running_query_count',
int(result['running_query_count']), dimensions=resource_pool_dimensions)
self.rate(resource_pool_metric_name + 'rejection_count', int(result['rejection_count']),
dimensions=resource_pool_dimensions)

@ -38,7 +38,8 @@ class Zookeeper(AgentCheck):
host = instance.get('host', 'localhost')
port = int(instance.get('port', 2181))
timeout = float(instance.get('timeout', 3.0))
dimensions = self._set_dimensions({'component': 'zookeeper', 'service': 'zookeeper'}, instance)
dimensions = self._set_dimensions(
{'component': 'zookeeper', 'service': 'zookeeper'}, instance)
sock = socket.socket()
sock.settimeout(timeout)

@ -92,7 +92,8 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
# Load the checks_d checks
checksd = util.load_check_directory()
self.collector = checks.collector.Collector(config, monasca_agent.common.emitter.http_emitter, checksd)
self.collector = checks.collector.Collector(
config, monasca_agent.common.emitter.http_emitter, checksd)
check_frequency = int(config['check_freq'])
@ -146,9 +147,10 @@ class CollectorDaemon(monasca_agent.common.daemon.Daemon):
if collection_time < check_frequency:
time.sleep(check_frequency - collection_time)
else:
log.info("Collection took {0} which is as long or longer then the configured collection frequency "
"of {1}. Starting collection again without waiting in result.".format(collection_time,
check_frequency))
log.info(
"Collection took {0} which is as long or longer then the configured "
"collection frequency of {1}. Starting collection again without waiting "
"in result.".format(collection_time, check_frequency))
self._stop(exitTimeout)
# Explicitly kill the process, because it might be running
@ -173,8 +175,9 @@ def main():
if collector_restart_interval in range(1, 49):
pass
else:
log.error("Collector_restart_interval = {0} is out of legal range"
" [1, 48]. Reset collector_restart_interval to 24".format(collector_restart_interval))
log.error(
"Collector_restart_interval = {0} is out of legal range"
" [1, 48]. Reset collector_restart_interval to 24".format(collector_restart_interval))
collector_restart_interval = 24
COMMANDS = [
@ -281,7 +284,8 @@ def main():
print("You have to specify one of the following commands:")
for command, desc in jmxfetch.JMX_LIST_COMMANDS.items():
print(" - %s [OPTIONAL: LIST OF CHECKS]: %s" % (command, desc))
print("Example: sudo /etc/init.d/monasca-agent jmx list_matching_attributes tomcat jmx solr")
print("Example: sudo /etc/init.d/monasca-agent jmx list_matching_attributes "
"tomcat jmx solr")
print("\n")
else:
@ -297,7 +301,9 @@ def main():
checks_list,
reporter="console")
if not should_run:
print("Couldn't find any valid JMX configuration in your conf.d directory: %s" % confd_path)
print(
"Couldn't find any valid JMX configuration in your conf.d directory: %s" %
confd_path)
print("Have you enabled any JMX checks ?")
return 0

@ -35,12 +35,18 @@ JMX_CHECKS = [
]
JMX_COLLECT_COMMAND = 'collect'
JMX_LIST_COMMANDS = {
'list_everything': 'List every attributes available that has a type supported by JMXFetch',
'list_collected_attributes': 'List attributes that will actually be collected by your current instances configuration',
'list_matching_attributes': 'List attributes that match at least one of your instances configuration',
'list_not_matching_attributes': "List attributes that don't match any of your instances configuration",
'list_limited_attributes': "List attributes that do match one of your instances configuration but that are not being collected because it would exceed the number of metrics that can be collected",
JMX_COLLECT_COMMAND: "Start the collection of metrics based on your current configuration and display them in the console"}
"list_everything": "List every attributes available that has a type supported by JMXFetch",
"list_collected_attributes": "List attributes that will actually be collected by your current "
"instances configuration",
"list_matching_attributes": "List attributes that match at least one of your instances "
"configuration",
"list_not_matching_attributes": "List attributes that don't match any of your instances "
"configuration",
"list_limited_attributes": "List attributes that do match one of your instances "
"configuration but that are not being collected because it would "
"exceed the number of metrics that can be collected",
JMX_COLLECT_COMMAND: "Start the collection of metrics based on your current "
"configuration and display them in the console"}
PYTHON_JMX_STATUS_FILE = 'jmx_status_python.yaml'
@ -168,7 +174,8 @@ class JMXFetch(object):
instances = check_config.get('instances', [])
if not isinstance(instances, list) or len(instances) == 0:
raise InvalidJMXConfiguration(
'You need to have at least one instance defined in the YAML file for this check')
'You need to have at least one instance defined in the YAML '
'file for this check')
for inst in instances:
if not isinstance(inst, dict):
@ -184,7 +191,8 @@ class JMXFetch(object):
if conf is None:
log.warning(
"%s doesn't have a 'conf' section. Only basic JVM metrics will be collected. %s" %
"%s doesn't have a 'conf' section. Only basic JVM metrics will "
"be collected. %s" %
(inst, LINK_TO_DOC))
else:
if not isinstance(conf, list) or len(conf) == 0:

View File

@ -14,7 +14,6 @@
# under the License.
"""Implementation of Inspector abstraction for Hyper-V"""
from oslo_config import cfg
from oslo_utils import units
from monasca_agent.collector.virt.hyperv import utilsv2

View File

@ -106,7 +106,10 @@ class MetricsAggregator(object):
cur_time = time()
if timestamp is not None:
if cur_time - int(timestamp) > self.recent_point_threshold:
log.debug("Discarding {0} - ts = {1}, current ts = {2} ".format(name, timestamp, cur_time))
log.debug(
"Discarding {0} - ts = {1}, current ts = {2} ".format(name,
timestamp,
cur_time))
self.num_discarded_old_points += 1
return
else:

View File

@ -37,7 +37,8 @@ class Config(object):
elif os.path.exists(os.getcwd() + '/agent.yaml'):
self._configFile = os.getcwd() + '/agent.yaml'
else:
error_msg = 'No config file found at {0} nor in the working directory.'.format(DEFAULT_CONFIG_FILE)
error_msg = 'No config file found at {0} nor in the working directory.'.format(
DEFAULT_CONFIG_FILE)
log.error(error_msg)
raise IOError(error_msg)
@ -146,7 +147,8 @@ class Config(object):
break
if not valid_instances:
raise Exception(
'You need to have at least one instance defined in the YAML file for this check')
'You need to have at least one instance defined '
'in the YAML file for this check')
else:
return check_config
finally:

View File

@ -214,16 +214,19 @@ class Daemon(object):
# Check for the existence of a process with the pid
try:
# os.kill(pid, 0) will raise an OSError exception if the process
# does not exist, or if access to the process is denied (access denied will be an EPERM error).
# does not exist, or if access to the process is denied
# (access denied will be an EPERM error).
# If we get an OSError that isn't an EPERM error, the process
# does not exist.
# (from http://stackoverflow.com/questions/568271/check-if-pid-is-not-in-use-in-python,
# (from http://stackoverflow.com/questions/568271/
# check-if-pid-is-not-in-use-in-python,
# Giampaolo's answer)
os.kill(pid, 0)
except OSError as e:
if e.errno != errno.EPERM:
message = '%s pidfile contains pid %s, but no running process could be found' % (
self.__class__.__name__, pid)
message = \
'%s pidfile contains pid %s, but no running process could be found' % (
self.__class__.__name__, pid)
exit_code = 1
else:
message = '%s is running with pid %s' % (self.__class__.__name__, pid)
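
For reference, the pid-probing idiom described in the comment above can be sketched as a
standalone helper; this is an illustration, not code from the patch:

import errno
import os

def pid_exists(pid):
    # Signal 0 performs error checking only; no signal is actually delivered.
    try:
        os.kill(pid, 0)
    except OSError as e:
        # EPERM: the process exists but is owned by another user.
        return e.errno == errno.EPERM
    return True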

View File

@ -47,7 +47,8 @@ def get_session(**kwargs):
- using **Keystone v3** be careful with the scope of authentication.
For more details about scopes refer to identity_tokens_ and v3_identity_
.. _v3_api: https://developer.openstack.org/api-ref/identity/v3/index.html?expanded=token-authentication-with-scoped-authorization-detail
.. _v3_api: https://developer.openstack.org/api-ref/identity/v3/index.html
   ?expanded=token-authentication-with-scoped-authorization-detail
.. _identity_tokens: https://docs.openstack.org/admin-guide/identity-tokens.html
In overall:

View File

@ -107,7 +107,8 @@ class Rate(Metric):
# redefine flush method to calculate rate from metrics
def flush(self):
# need at least two timestamps to determine rate
# is the second one is missing then the first is kept as start value for the subsequent interval
# if the second one is missing then the first is kept as start value for
# the subsequent interval
if self.start_timestamp is None or self.timestamp is None:
return []
@ -116,8 +117,13 @@ class Rate(Metric):
try:
rate = delta_v / float(delta_t)
except ZeroDivisionError:
log.warning('Conflicting values reported for metric %s with dimensions %s at time %d: (%f, %f)', self.metric['name'],
self.metric['dimensions'], self.timestamp, self.start_value, self.value)
log.warning(
'Conflicting values reported for metric %s with dimensions %s at time %d: (%f, %f)',
self.metric['name'],
self.metric['dimensions'],
self.timestamp,
self.start_value,
self.value)
# skip this measurement, but keep value for next cycle
self.start_value = self.value
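
To make the rate arithmetic above concrete, a small worked example with hypothetical numbers:

# Hypothetical samples: counter reported as 100 at t=1000 and 160 at t=1030.
start_value, start_timestamp = 100.0, 1000
value, timestamp = 160.0, 1030
delta_v = value - start_value          # 60.0
delta_t = timestamp - start_timestamp  # 30
rate = delta_v / float(delta_t)        # 2.0 per second
# If both samples carried the same timestamp, delta_t would be 0 and the
# ZeroDivisionError branch above would skip the measurement instead.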

View File

@ -27,7 +27,8 @@ log = logging.getLogger(__name__)
VALID_HOSTNAME_RFC_1123_PATTERN = re.compile(
r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")
r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*"
"([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$")
MAX_HOSTNAME_LEN = 255
LOGGING_MAX_BYTES = 5 * 1024 * 1024
@ -285,11 +286,16 @@ def get_uuid():
def timeout_command(command, timeout, command_input=None):
# call shell-command with timeout (in seconds) and stdinput for the command (optional)
# returns None if timeout or the command output.
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
command_timer = threading.Timer(timeout, process.kill)
try:
command_timer.start()
stdout, stderr = process.communicate(input=command_input.encode() if command_input else None)
stdout, stderr = process.communicate(
input=command_input.encode() if command_input else None)
return_code = process.returncode
return stdout, stderr, return_code
finally:
@ -421,9 +427,11 @@ def get_hostname():
if hostname is None:
log.critical(
'Unable to reliably determine host name. You can define one in agent.yaml or in your hosts file')
'Unable to reliably determine host name. You can define one in agent.yaml '
'or in your hosts file')
raise Exception(
'Unable to reliably determine host name. You can define one in agent.yaml or in your hosts file')
'Unable to reliably determine host name. You can define one in agent.yaml '
'or in your hosts file')
else:
return hostname
@ -470,7 +478,8 @@ def load_check_directory():
confd_path = paths.get_confd_path()
except PathNotFound as e:
log.error(
"No conf.d folder found at '%s' or in the directory where the Agent is currently deployed.\n" %
"No conf.d folder found at '%s' or in the directory where the Agent is "
"currently deployed.\n" %
e.args[0])
sys.exit(3)
@ -581,7 +590,8 @@ def load_check_directory():
def initialize_logging(logger_name):
try:
log_format = '%%(asctime)s | %%(levelname)s | %s | %%(name)s(%%(filename)s:%%(lineno)s) | %%(message)s' % logger_name
log_format = ('%%(asctime)s | %%(levelname)s | %s | %%(name)s(%%(filename)s:'
              '%%(lineno)s) | %%(message)s' % logger_name)
log_date_format = "%Y-%m-%d %H:%M:%S %Z"
config = configuration.Config()
logging_config = config.get_config(sections='Logging')
@ -614,7 +624,8 @@ def initialize_logging(logger_name):
# set up syslog
if logging_config['log_to_syslog']:
try:
syslog_format = '%s[%%(process)d]: %%(levelname)s (%%(filename)s:%%(lineno)s): %%(message)s' % logger_name
syslog_format = ('%s[%%(process)d]: %%(levelname)s (%%(filename)s:%%(lineno)s): '
                 '%%(message)s' % logger_name)
from logging.handlers import SysLogHandler
if logging_config['syslog_host'] is not None and logging_config[
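
A note on the wrapping pattern used for the two format strings above: adjacent string literals
are only joined when they belong to a single expression, which is why the assignments need
enclosing parentheses. A minimal illustration, unrelated to any particular file:

# Correct: the parentheses make both literals part of one expression.
message = ('a long message split across two lines '
           'still ends up as a single string')
# Without the parentheses, the second literal would be a separate statement
# that is silently discarded, and only the first piece would be assigned.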

View File

@ -171,7 +171,9 @@ class MonascaAPI(object):
log.info("Current measurements in queue: {0} of {1}".format(
self._current_number_measurements, self._max_measurement_buffer_size))
log.info("A message will be logged for every {0} messages queued.".format(MonascaAPI.LOG_INTERVAL))
log.info(
"A message will be logged for every {0} messages queued.".format(
MonascaAPI.LOG_INTERVAL))
self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
else:
self._log_interval_remaining -= 1

View File

@ -34,10 +34,14 @@ class MonascaStatsd(object):
config = cfg.Config()
statsd_config = config.get_config(['Main', 'Statsd'])
# Create the aggregator (which is the point of communication between the server and reporting threads.
aggregator = agg.MetricsAggregator(util.get_hostname(),
recent_point_threshold=statsd_config['recent_point_threshold'],
tenant_id=statsd_config.get('global_delegated_tenant', None))
# Create the aggregator (which is the point of communication between the
# server and reporting threads).
aggregator = agg.MetricsAggregator(
util.get_hostname(),
recent_point_threshold=statsd_config['recent_point_threshold'],
tenant_id=statsd_config.get(
'global_delegated_tenant',
None))
# Start the reporting thread.
interval = int(statsd_config['monasca_statsd_interval'])
@ -54,9 +58,13 @@ class MonascaStatsd(object):
else:
server_host = 'localhost'
self.server = udp.Server(aggregator, server_host, statsd_config['monasca_statsd_port'],
forward_to_host=statsd_config.get('monasca_statsd_forward_host'),
forward_to_port=int(statsd_config.get('monasca_statsd_forward_port')))
self.server = udp.Server(
aggregator,
server_host,
statsd_config['monasca_statsd_port'],
forward_to_host=statsd_config.get('monasca_statsd_forward_host'),
forward_to_port=int(
statsd_config.get('monasca_statsd_forward_port')))
def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping run loop.")
@ -88,9 +96,13 @@ class MonascaStatsd(object):
def main():
"""The main entry point for the unix version of monasca_statsd. """
parser = argparse.ArgumentParser(description='Monasca statsd - statsd server supporting metric dimensions')
parser.add_argument('--config', '--config-file', '-c',
help="Location for an alternate config rather than using the default config location.")
parser = argparse.ArgumentParser(
description='Monasca statsd - statsd server supporting metric dimensions')
parser.add_argument(
'--config',
'--config-file',
'-c',
help="Location for an alternate config rather than using the default config location.")
args = parser.parse_args()
monasca_statsd = MonascaStatsd(args.config)

View File

@ -68,7 +68,8 @@ class Reporter(threading.Thread):
except Exception:
log.exception("Error running emitter.")
should_log = self.flush_count <= FLUSH_LOGGING_INITIAL or self.log_count <= FLUSH_LOGGING_COUNT
should_log = self.flush_count <= FLUSH_LOGGING_INITIAL \
or self.log_count <= FLUSH_LOGGING_COUNT
log_func = log.info
if not should_log:
log_func = log.debug

View File

@ -42,7 +42,8 @@ class Server(object):
forward_to_port = 8125
log.info(
"External statsd forwarding enabled. All packets received will be forwarded to %s:%s" %
"External statsd forwarding enabled. All packets received will"
"be forwarded to %s:%s" %
(forward_to_host, forward_to_port))
try:
self.forward_udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

View File

@ -15,8 +15,8 @@ class Plugins(collections.defaultdict):
"""A container for the plugin configurations used by the monasca-agent.
This is essentially a defaultdict(dict) but put into a class primarily to make the interface clear, also
to add a couple of helper methods.
This is essentially a defaultdict(dict) but put into a class primarily to make the
interface clear and to add a couple of helper methods.
Each plugin config is stored with the key being its config name (excluding .yaml).
The value a dict which will convert to yaml.
"""
@ -33,13 +33,14 @@ class Plugins(collections.defaultdict):
"""Do a deep merge with precedence going to other (as is the case with update).
"""
# Implemented as a function so it can be used for arbitrary dictionaries not just self, this is needed
# for the recursive nature of the merge.
# Implemented as a function so it can be used for arbitrary dictionaries not just self,
# this is needed for the recursive nature of the merge.
deep_merge(self, other)
def deep_merge(adict, other):
"""A recursive merge of two dictionaries including combining of any lists within the data structure.
"""A recursive merge of two dictionaries including combining of any lists within the data
structure.
"""
for key, value in other.items():
@ -53,8 +54,10 @@ def deep_merge(adict, other):
def merge_by_name(first, second):
"""Merge a list of dictionaries replacing any dictionaries with the same 'name' value rather than merging.
The precedence goes to first.
"""Merge a list of dictionaries replacing any dictionaries with the same 'name' value rather
than merging.
The precedence goes to first.
"""
first_names = [i['name'] for i in first if 'name' in i]
for item in second:
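
A minimal sketch of the merge behaviour the docstrings above describe, using hypothetical
plugin data:

# deep_merge: nested dicts are merged recursively, precedence going to `other`.
adict = {'init_config': {'timeout': 3}, 'instances': [{'name': 'a'}]}
other = {'init_config': {'timeout': 5, 'retries': 2}}
# After deep_merge(adict, other) the expectation is:
#   {'init_config': {'timeout': 5, 'retries': 2}, 'instances': [{'name': 'a'}]}

# merge_by_name: entries sharing a 'name' are replaced, precedence going to `first`.
first = [{'name': 'api', 'url': 'http://localhost:8070'}]
second = [{'name': 'api', 'url': 'http://localhost:8080'}, {'name': 'db'}]
# The merged list keeps the 'api' entry from `first` and adds the unmatched 'db' entry.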

View File

@ -1,10 +1,10 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
from args_plugin import ArgsPlugin
from plugin import Plugin
from service_plugin import ServicePlugin
from utils import find_process_cmdline
from utils import find_process_name
from utils import find_process_service
from utils import watch_process
from utils import watch_process_by_username
from args_plugin import ArgsPlugin # noqa
from plugin import Plugin # noqa
from service_plugin import ServicePlugin # noqa
from utils import find_process_cmdline # noqa
from utils import find_process_name # noqa
from utils import find_process_service # noqa
from utils import watch_process # noqa
from utils import watch_process_by_username # noqa
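
The # noqa markers added to these re-exports tell flake8-style checkers to skip the
unused-import warning (F401) while keeping the names importable from the package. A generic
illustration with a hypothetical module:

# mypackage/__init__.py -- re-export a public name without tripping the linter.
from mypackage.utils import helper_function  # noqa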

View File

@ -9,16 +9,20 @@ log = logging.getLogger(__name__)
class ArgsPlugin(Plugin):
"""Base plugin for detection plugins that take arguments for configuration rather than do detection."""
"""Base plugin for detection plugins that take arguments for configuration rather than do
detection.
"""
def _build_instance(self, arg_list):
"""If a value for each arg in the arg_list was specified build it into an instance dictionary. Also check for dimensions and add if they were specified.
"""If a value for each arg in the arg_list was specified build it into an instance
dictionary. Also check for dimensions and add if they were specified.
:param arg_list: Arguments to include
:return: instance dictionary
"""
instance = {}
if 'dimensions' in self.args:
instance['dimensions'] = dict(item.strip().split(":") for item in self.args['dimensions'].split(","))
instance['dimensions'] = dict(item.strip().split(":")
for item in self.args['dimensions'].split(","))
for arg in arg_list:
if arg in self.args:
instance[arg] = self.args[arg]
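
The dimensions argument parsed above is a comma separated list of name:value pairs; a quick
illustration with hypothetical input:

raw = "service:monitoring,component:example"
dimensions = dict(item.strip().split(":") for item in raw.split(","))
# -> {'service': 'monitoring', 'component': 'example'}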

View File

@ -95,7 +95,8 @@ class Apache(monasca_setup.detection.Plugin):
[self._apache_process_name], 'apache'))
log.info("\tWatching the apache webserver process.")
error_msg = '\n\t*** The Apache plugin is not configured ***\n\tPlease correct and re-run monasca-setup.'
error_msg = ('\n\t*** The Apache plugin is not configured ***\n\tPlease correct '
             'and re-run monasca-setup.')
# Attempt login, requires either an empty root password from localhost
# or relying on a configured /root/.apache.cnf
if self.dependencies_installed():
@ -167,7 +168,9 @@ class Apache(monasca_setup.detection.Plugin):
log.error(exception_msg)
raise Exception(exception_msg)
else:
log.error('\tThe dependencies for Apache Web Server are not installed or unavailable.' + error_msg)
log.error(
'\tThe dependencies for Apache Web Server are not installed or unavailable.' +
error_msg)
return config

View File

@ -13,9 +13,6 @@
# under the License.
import logging
import os
import yaml
import monasca_setup.agent_config
import monasca_setup.detection

View File

@ -44,7 +44,7 @@ class Congestion(monasca_setup.detection.Plugin):
try:
cmd = proc.as_dict(['cmdline'])['cmdline']
if (len(cmd) > 2 and 'python' in cmd[0] and
'nova-compute' in cmd[1]):
'nova-compute' in cmd[1]):
conf_indexes = [cmd.index(y)
for y in cmd if 'nova.conf' in y]
if not conf_indexes:
@ -82,16 +82,16 @@ class Congestion(monasca_setup.detection.Plugin):
's_factor': s_factor,
'collect_period': collect_period}
for option in cfg_needed:
init_config[option] = nova_cfg.get(
cfg_section, cfg_needed[option])
init_config[option] = nova_cfg.get(
cfg_section, cfg_needed[option])
init_config['region_name'] = nova_cfg.get(
region_name_sec, 'region_name')
# Create an identity URI (again, slightly different for Devstack)
if nova_cfg.has_option(cfg_section, 'auth_url'):
init_config['auth_url'] = nova_cfg.get(cfg_section, 'auth_url')
init_config['auth_url'] = nova_cfg.get(cfg_section, 'auth_url')
else:
init_config['auth_url'] = nova_cfg.get(
cfg_section, 'identity_uri')
init_config['auth_url'] = nova_cfg.get(
cfg_section, 'identity_uri')
config = monasca_setup.agent_config.Plugins()
config['congestion'] = {
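
The detection above pulls keystone settings out of nova.conf; a rough standalone sketch of
that lookup (the file path and section name are assumptions, only the auth_url/identity_uri
fallback mirrors the code):

from six.moves import configparser

nova_cfg = configparser.RawConfigParser()
nova_cfg.read('/etc/nova/nova.conf')           # hypothetical path
cfg_section = 'keystone_authtoken'             # hypothetical section name
if nova_cfg.has_option(cfg_section, 'auth_url'):
    auth_url = nova_cfg.get(cfg_section, 'auth_url')
else:
    auth_url = nova_cfg.get(cfg_section, 'identity_uri')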

View File

@ -26,9 +26,17 @@ class HAProxy(monasca_setup.detection.Plugin):
config = monasca_setup.agent_config.Plugins()
log.info("\tEnabling HAProxy process watching")
config.merge(monasca_setup.detection.watch_process(['haproxy'], 'haproxy', exact_match=False))
config.merge(
monasca_setup.detection.watch_process(
['haproxy'],
'haproxy',
exact_match=False))
if monasca_setup.detection.find_process_cmdline('keepalived') is not None:
config.merge(monasca_setup.detection.watch_process(['keepalived'], 'haproxy', exact_match=False))
config.merge(
monasca_setup.detection.watch_process(
['keepalived'],
'haproxy',
exact_match=False))
proxy_cfgfile = '/etc/haproxy/haproxy.cfg'
if os.path.exists(proxy_cfgfile):
@ -57,11 +65,17 @@ class HAProxy(monasca_setup.detection.Plugin):
password = auth[1].strip()
if url is None:
log.warn('Unable to parse haproxy config for stats url, skipping HAProxy check plugin configuration')
log.warn(
'Unable to parse haproxy config for stats url, skipping HAProxy check plugin '
'configuration')
else:
log.info('Enabling the HAProxy check plugin')
instance_config = {'name': url, 'url': url, 'status_check': False, 'collect_service_stats_only': True,
'collect_status_metrics': False}
instance_config = {
'name': url,
'url': url,
'status_check': False,
'collect_service_stats_only': True,
'collect_status_metrics': False}
if user is not None:
instance_config['username'] = user
if password is not None:

View File

@ -8,7 +8,8 @@ import monasca_setup.detection
class HttpCheck(monasca_setup.detection.ArgsPlugin):
"""Setup an http_check according to the passed in args.
Despite being a detection plugin this plugin does no detection and will be a noop without arguments.
Despite being a detection plugin this plugin does no detection and will be a noop without
arguments.
Expects space separated arguments, the required argument is url. Optional parameters include:
disable_ssl_validation and match_pattern.
"""

View File

@ -83,7 +83,8 @@ class InfluxDB(detection.Plugin):
def _monitor_process(self):
dimensions = {}
if self.args and self.args.get(self.INFLUXDB_NODE_ARG_NAME):
dimensions.update({self.INFLUXDB_NODE_ARG_NAME: self.args.get(self.INFLUXDB_NODE_ARG_NAME)})
dimensions.update(
{self.INFLUXDB_NODE_ARG_NAME: self.args.get(self.INFLUXDB_NODE_ARG_NAME)})
return detection.watch_process([self.PROC_NAME],
service='influxdb',

View File

@ -34,15 +34,19 @@ _CONSUMER_GROUP_COMMAND_LINE_VALUES_LEN = 7
class Kafka(Plugin):
"""Detect Kafka daemons and sets up configuration to monitor them.
This plugin configures the kafka_consumer plugin and does not configure any jmx based checks against kafka.
Note this plugin will pull the same information from kafka on each node in the cluster it runs on.
This plugin configures the kafka_consumer plugin and does not configure any jmx based
checks against kafka.
Note this plugin will pull the same information from kafka on each node in the cluster it
runs on.
To skip detection consumer groups and topics can be specified with plugin args, for example:
`monasca-setup -d kafka -a "group1=topic1 group2=topic2/topic3"`
All partitions are assumed for each topic and '/' is used to deliminate more than one topic per consumer group.
To skip detection, consumer groups and topics can be specified with plugin args,
for example:
`monasca-setup -d kafka -a "group1=topic1 group2=topic2/topic3"`
All partitions are assumed for each topic and '/' is used to delimit more than one
topic per consumer group.
For more information see:
- https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
For more information see:
- https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
"""
def __init__(self, template_dir, overwrite=True, args=None, port=9092):
@ -182,7 +186,10 @@ class Kafka(Plugin):
if listen_ip:
log.info("\tKafka found listening on {:s}:{:d}".format(listen_ip, self.port))
else:
log.info("\tKafka not found listening on a specific IP (port {:d}), using 'localhost'".format(self.port))
log.info(
"\tKafka not found listening on a specific IP (port {:d}),"
"using 'localhost'".format(
self.port))
listen_ip = 'localhost'
return "{:s}:{:d}".format(listen_ip, self.port)
@ -207,8 +214,8 @@ class Kafka(Plugin):
def _ls_zookeeper(self, path):
"""Do a ls on the given zookeeper path.
I am using the local command line kafka rather than kazoo because it doesn't make sense to
have kazoo as a dependency only for detection.
I am using the local command line kafka rather than kazoo because it doesn't make
sense to have kazoo as a dependency only for detection.
"""
zk_shell = [self._zookeeper_consumer_bin, self.zk_url, 'ls', path]
try:
@ -218,7 +225,8 @@ class Kafka(Plugin):
path)
raise
# The last line is like '[item1, item2, item3]', '[]' or an error message not starting with [
# The last line is like '[item1, item2, item3]', '[]' or an error message
# not starting with [
last_line = output.splitlines()[-1]
if len(last_line) == 2 or last_line[0] != '[':
return []

View File

@ -144,7 +144,7 @@ class Kibana(detection.Plugin):
def dependencies_installed(self):
try:
import yaml
import yaml # noqa
except Exception:
return False
return True

View File

@ -12,7 +12,6 @@ import sys
from oslo_config import cfg
from oslo_utils import importutils
from monasca_agent.common.psutil_wrapper import psutil
from monasca_setup import agent_config
from monasca_setup.detection import plugin
from monasca_setup.detection import utils

View File

@ -10,11 +10,9 @@
polls all services and all hosts.
"""
import logging
import os
import re
import monasca_setup.agent_config
import monasca_setup.detection
import os
log = logging.getLogger(__name__)
@ -55,8 +53,8 @@ class MKLivestatus(monasca_setup.detection.Plugin):
if self.dependencies_installed and socket_path is not None:
if os.path.exists(socket_path):
# Is it readable by the monasca-agent user?
test_readable = os.system('sudo -u {0} ls -1 {1} >/dev/null 2>&1'.format(agent_user,
socket_path))
test_readable = os.system(
'sudo -u {0} ls -1 {1} >/dev/null 2>&1'.format(agent_user, socket_path))
if test_readable != 0:
log.info("Not configuring MK_Livestatus:")
log.info("\t{0} exists but is not readable by {1}.".format(socket_path,
@ -81,7 +79,7 @@ class MKLivestatus(monasca_setup.detection.Plugin):
def dependencies_installed(self):
try:
import socket
import socket # noqa
except ImportError:
return False
else:

View File

@ -5,7 +5,8 @@
"""Classes for monitoring the monitoring server stack.
Covering mon-persister, mon-api and mon-thresh.
Kafka, mysql, vertica and influxdb are covered by other detection plugins. Mon-notification uses statsd.
Kafka, mysql, vertica and influxdb are covered by other detection plugins. Mon-notification
uses statsd.
"""
import logging
@ -17,7 +18,6 @@ from six.moves import configparser
import monasca_setup.agent_config
import monasca_setup.detection
from monasca_setup.detection import find_process_cmdline
from monasca_setup.detection import find_process_name
from monasca_setup.detection.utils import get_agent_username
from monasca_setup.detection.utils import watch_process
from monasca_setup.detection.utils import watch_process_by_username
@ -293,8 +293,19 @@ class MonThresh(monasca_setup.detection.Plugin):
config = monasca_setup.agent_config.Plugins()
for process in ['storm.daemon.nimbus', 'storm.daemon.supervisor', 'storm.daemon.worker']:
if find_process_cmdline(process) is not None:
config.merge(watch_process([process], 'monitoring', 'apache-storm', exact_match=False, detailed=False))
config.merge(watch_process_by_username('storm', 'monasca-thresh', 'monitoring', 'apache-storm'))
config.merge(
watch_process(
[process],
'monitoring',
'apache-storm',
exact_match=False,
detailed=False))
config.merge(
watch_process_by_username(
'storm',
'monasca-thresh',
'monitoring',
'apache-storm'))
return config
def dependencies_installed(self):
@ -305,11 +316,12 @@ def dropwizard_health_check(service, component, url):
"""Setup a dropwizard heathcheck to be watched by the http_check plugin."""
config = monasca_setup.agent_config.Plugins()
config['http_check'] = {'init_config': None,
'instances': [{'name': "{0}-{1} healthcheck".format(service, component),
'url': url,
'timeout': 5,
'include_content': False,
'dimensions': {'service': service, 'component': component}}]}
'instances': [
{'name': "{0}-{1} healthcheck".format(service, component),
'url': url,
'timeout': 5,
'include_content': False,
'dimensions': {'service': service, 'component': component}}]}
return config
@ -320,7 +332,8 @@ def dropwizard_metrics(service, component, url, whitelist):
'instances': [{'name': "{0}-{1} metrics".format(service, component),
'url': url,
'timeout': 5,
'dimensions': {'service': service, 'component': component},
'dimensions': {'service': service,
'component': component},
'whitelist': whitelist}]}
return config
@ -462,56 +475,55 @@ class _MonPersisterJavaHelper(_DropwizardJavaHelper):
for idx in range(alarm_num_threads):
new_thread = {
"name": "alarm-state-transitions-added-to-batch-counter[{0}]".format(idx),
"path": "counters/monasca.persister.pipeline.event.AlarmStateTransitionHandler[alarm-state-transition-{0}].alarm-state-transitions-added-to-batch-counter/count".format(idx),
"type": "rate"
}
"path": "counters/monasca.persister.pipeline.event."
"AlarmStateTransitionHandler[alarm-state-transition-{0}]."
"alarm-state-transitions-added-to-batch-counter/count".format(idx),
"type": "rate"}
whitelist.append(new_thread)
for idx in range(metric_num_threads):
new_thread = {
"name": "metrics-added-to-batch-counter[{0}]".format(idx),
"path": "counters/monasca.persister.pipeline.event.MetricHandler[metric-{0}].metrics-added-to-batch-counter/count".format(idx),
"type": "rate"
}
"path": "counters/monasca.persister.pipeline.event.MetricHandler[metric-{0}]."
"metrics-added-to-batch-counter/count".format(idx),
"type": "rate"}
whitelist.append(new_thread)
def _add_vertica_metrics(self, whitelist):
whitelist.extend([
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.definition-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.definition-cache-hit-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.definition-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.definition-cache-miss-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.definition-dimension-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.definition-dimension-cache-hit-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.definition-dimension-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.definition-dimension-cache-miss-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.dimension-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.dimension-cache-hit-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.dimension-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.dimension-cache-miss-meter/count",
"type": "rate"
},
{
"name": "monasca.persister.repository.vertica.VerticaMetricRepo.measurement-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo.measurement-meter/count",
"type": "rate"
}
])
whitelist.extend(
[{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-cache-hit-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-cache-miss-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-dimension-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-dimension-cache-hit-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-dimension-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"definition-dimension-cache-miss-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"dimension-cache-hit-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"dimension-cache-hit-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo."
"dimension-cache-miss-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"dimension-cache-miss-meter/count",
"type": "rate"},
{"name": "monasca.persister.repository.vertica.VerticaMetricRepo.measurement-meter",
"path": "meters/monasca.persister.repository.vertica.VerticaMetricRepo."
"measurement-meter/count",
"type": "rate"}])
def _monitor_endpoints(self, config, metrics):
admin_connector = self._cfg['server']['adminConnectors'][0]

View File

@ -149,7 +149,7 @@ class MySQL(monasca_setup.detection.Plugin):
def dependencies_installed(self):
try:
import pymysql
import pymysql # noqa
except ImportError:
return False
return True

View File

@ -38,15 +38,18 @@ class Ntp(monasca_setup.detection.Plugin):
else:
ntp_server = 'pool.ntp.org'
if re.match('^127', ntp_server):
log.warn("NTP Server points to localhost no value in collecting NTP metrics. Skipping configuration.")
log.warn(
"NTP Server points to localhost no value in collecting NTP metrics."
"Skipping configuration.")
return None
config['ntp'] = {'init_config': None, 'instances': [{'name': ntp_server, 'host': ntp_server}]}
config['ntp'] = {'init_config': None, 'instances': [
{'name': ntp_server, 'host': ntp_server}]}
return config
def dependencies_installed(self):
try:
import ntplib
import ntplib # noqa
except ImportError:
return False
else:

View File

@ -66,7 +66,8 @@ class Postfix(plugin.Plugin):
"""
# A bit silly to parse the yaml only for it to be converted back but this
# plugin is the exception not the rule
with open(os.path.join(self.template_dir, 'conf.d/postfix.yaml.example'), 'r') as postfix_template:
with open(os.path.join(self.template_dir, 'conf.d/postfix.yaml.example'),
'r') as postfix_template:
default_net_config = yaml.safe_load(postfix_template.read())
config = agent_config.Plugins()
config['postfix'] = default_net_config

View File

@ -13,9 +13,6 @@
# under the License.
import logging
import os
import yaml
import monasca_setup.agent_config
import monasca_setup.detection

View File

@ -17,16 +17,20 @@ class ProcessCheck(monasca_setup.detection.Plugin):
"""Setup a process check according to the passed in JSON string or YAML config file path.
A process can be monitored by process_names or by process_username, or by both if
the process_config list contains both dictionary entries. Pass in the dictionary containing process_names
when watching process by name. Pass in the dictionary containing process_username and dimensions with
component when watching process by username. Watching by process_username is useful for groups of processes
that are owned by a specific user. For process monitoring by process_username the component dimension
is required since it is used to initialize the instance name in process.yaml.
the process_config list contains both dictionary entries. Pass in the dictionary containing
process_names when watching process by name. Pass in the dictionary containing
process_username and dimensions with component when watching process by username. Watching by
process_username is useful for groups of processes that are owned by a specific user.
For process monitoring by process_username the component dimension is required since it is
used to initialize the instance name in process.yaml.
service and component dimensions are recommended to distinguish multiple components per service. The component
dimensions will be defaulted to the process name when it is not input when monitoring by process_names.
exact_match is optional and defaults to false, meaning the process name search string can be found within the process name.
exact_match can be set to true if the process_names search string should match the process name.
service and component dimensions are recommended to distinguish multiple components per
service. The component dimensions will be defaulted to the process name when it is not
input when monitoring by process_names.
exact_match is optional and defaults to false, meaning the process name search string can
be found within the process name.
exact_match can be set to true if the process_names search string should match the process
name.
Pass in a YAML config file path:
monasca-setup -d ProcessCheck -a "conf_file_path=/home/stack/myprocess.yaml"
@ -34,27 +38,35 @@ class ProcessCheck(monasca_setup.detection.Plugin):
or
Pass in a JSON string command line argument:
Using monasca-setup, you can pass in a json string with arguments --detection_args_json, or the shortcut -json.
Using monasca-setup, you can pass in a json string with arguments --detection_args_json,
or the shortcut -json.
Monitor by process_names:
monasca-setup -d ProcessCheck -json \
'{"process_config":[{"process_names":["monasca-notification","monasca-api"],"dimensions":{"service":"monitoring"}}]}'
'{"process_config":[{"process_names":["monasca-notification","monasca-api"],
"dimensions":{"service":"monitoring"}}]}'
Specify one or more dictionary entries to the process_config list:
monasca-setup -d ProcessCheck -json \
'{"process_config":[
{"process_names":["monasca-notification","monasca-api"],"dimensions":{"service":"monitoring"}},
{"process_names":["elasticsearch"],"dimensions":{"service":"logging"},"exact_match":"true"},
{"process_names":["monasca-thresh"],"dimensions":{"service":"monitoring","component":"thresh"}}]}'
{"process_names":["monasca-notification","monasca-api"],
"dimensions":{"service":"monitoring"}},
{"process_names":["elasticsearch"],
"dimensions":{"service":"logging"},"exact_match":"true"},
{"process_names":["monasca-thresh"],
"dimensions":{"service":"monitoring","component":"thresh"}}]}'
Monitor by process_username:
monasca-setup -d ProcessCheck -json \
'{"process_config":[{"process_username":"dbadmin","dimensions":{"service":"monitoring","component":"vertica"}}]}'
'{"process_config":[{"process_username":"dbadmin",
"dimensions":{"service":"monitoring","component":"vertica"}}]}'
Can specify monitoring by both process_username and process_names:
monasca-setup -d ProcessCheck -json \
'{"process_config":[{"process_names":["monasca-api"],"dimensions":{"service":"monitoring"}},
{"process_username":"mon-api","dimensions":{"service":"monitoring","component":"monasca-api"}}]}'
'{"process_config":[{"process_names":["monasca-api"],
"dimensions":{"service":"monitoring"}},
{"process_username":"mon-api",
"dimensions":{"service":"monitoring","component":"monasca-api"}}]}'
"""
def __init__(self, template_dir, overwrite=False, args=None, **kwargs):
@ -83,7 +95,8 @@ class ProcessCheck(monasca_setup.detection.Plugin):
else:
log.error("\tInvalid format yaml file, missing key: process_config")
except yaml.YAMLError as e:
exception_msg = ("Could not read config file. Invalid yaml format detected. {0}.".format(e))
exception_msg = (
"Could not read config file. Invalid yaml format detected. {0}.".format(e))
raise Exception(exception_msg)
def _detect(self):
@ -106,12 +119,15 @@ class ProcessCheck(monasca_setup.detection.Plugin):
# monitoring by process_names
if not_found_process_names:
log.info("\tDid not discover process_name(s): {0}.".format(",".join(not_found_process_names)))
log.info(
"\tDid not discover process_name(s): {0}.".format(
",".join(not_found_process_names)))
if found_process_names:
process_item['found_process_names'] = found_process_names
if 'exact_match' in process_item:
if isinstance(process_item['exact_match'], six.string_types):
process_item['exact_match'] = (process_item['exact_match'].lower() == 'true')
process_item['exact_match'] = (
process_item['exact_match'].lower() == 'true')
else:
process_item['exact_match'] = False
self.valid_process_names.append(process_item)
@ -126,7 +142,12 @@ class ProcessCheck(monasca_setup.detection.Plugin):
if self.valid_process_names or self.valid_usernames:
self.available = True
def _monitor_by_process_name(self, process_name, exact_match=False, detailed=True, dimensions=None):
def _monitor_by_process_name(
self,
process_name,
exact_match=False,
detailed=True,
dimensions=None):
config = monasca_setup.agent_config.Plugins()
instance = {'name': process_name,
'detailed': detailed,
@ -172,10 +193,15 @@ class ProcessCheck(monasca_setup.detection.Plugin):
# Watch by process_username
for process in self.valid_usernames:
log.info("\tMonitoring by process_username: {0} "
"for dimensions: {1}.".format(process['process_username'], json.dumps(process['dimensions'])))
config.merge(self._monitor_by_process_username(process_username=process['process_username'],
dimensions=process['dimensions']))
log.info(
"\tMonitoring by process_username: {0} "
"for dimensions: {1}.".format(
process['process_username'], json.dumps(
process['dimensions'])))
config.merge(
self._monitor_by_process_username(
process_username=process['process_username'],
dimensions=process['dimensions']))
return config
def dependencies_installed(self):

View File

@ -131,7 +131,9 @@ class RabbitMQ(monasca_setup.detection.Plugin):
# First watch the process
config.merge(monasca_setup.detection.watch_process(['epmd'], 'rabbitmq', detailed=False))
log.info("\tWatching the rabbitmq-server process.")
config.merge(monasca_setup.detection.watch_process_by_username('rabbitmq', 'rabbitmq', 'rabbitmq'))
config.merge(
monasca_setup.detection.watch_process_by_username(
'rabbitmq', 'rabbitmq', 'rabbitmq'))
log.info("\tWatching all processes owned by the rabbitmq user.")
if not self._watch_api:

View File

@ -30,7 +30,8 @@ class System(Plugin):
config = agent_config.Plugins()
for metric in System.system_metrics:
try:
with open(os.path.join(self.template_dir, 'conf.d/' + metric + '.yaml'), 'r') as metric_template:
with open(os.path.join(self.template_dir, 'conf.d/' + metric + '.yaml'),
'r') as metric_template:
default_config = yaml.safe_load(metric_template.read())
config[metric] = default_config
if self.args:

View File

@ -21,11 +21,13 @@ class Vertica(monasca_setup.detection.Plugin):
"""Detect Vertica process running and DB connection status
This plugin has the following options (each optional) that you can pass in via command line:
This plugin has the following options (each optional) that you can pass in via command line:
- user (optional - user to connect with) - Defaults to monitor user
- password (optional - password to use when connecting) - Defaults to password
- service (optional - dimensions service to be set for the metrics coming out of the plugin)
- timeout (optional - timeout for vertica connection in seconds) - Defaults to 3 second
- service (optional - dimensions service to be set for the metrics coming out
of the plugin)
- timeout (optional - timeout for vertica connection in seconds) - Defaults to
3 seconds
"""
def _detect(self):
@ -66,7 +68,9 @@ class Vertica(monasca_setup.detection.Plugin):
self.node_name = stdout
return True
else:
log.error("Error querying vertica with return code of {0} and the error {1}".format(return_code, stderr))
log.error(
"Error querying vertica with return code of {0} and the error {1}".format(
return_code, stderr))
return False
def build_config(self):
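
Based on the options listed in the Vertica docstring above, a hypothetical invocation
(credentials invented) could look like:
monasca-setup -d Vertica -a "user=monitor password=secret timeout=3"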

View File

@ -17,8 +17,6 @@
import logging
import os
import yaml
import monasca_setup.agent_config
import monasca_setup.detection
@ -50,7 +48,8 @@ class Zookeeper(monasca_setup.detection.Plugin):
if not process_found:
log.error('Zookeeper process has not been found: {0}'.format(err_str))
elif not has_config_file:
log.error('Zookeeper plugin cannot find configuration file: {0}. {1}'.format(self._cfg_file, err_str))
log.error('Zookeeper plugin cannot find configuration '
'file: {0}. {1}'.format(self._cfg_file, err_str))
def build_config(self):
"""Build the config as a Plugins object and return.
@ -60,8 +59,11 @@ class Zookeeper(monasca_setup.detection.Plugin):
host, port = self._read_config_file(self._cfg_file)
# First watch the process
log.info("\tWatching the zookeeper process.")
config.merge(monasca_setup.detection.watch_process(['org.apache.zookeeper.server'], 'zookeeper',
exact_match=False))
config.merge(
monasca_setup.detection.watch_process(
['org.apache.zookeeper.server'],
'zookeeper',
exact_match=False))
log.info("\tEnabling the zookeeper plugin")
config['zk'] = {

View File

@ -19,30 +19,34 @@ log = logging.getLogger(__name__)
class ServicePlugin(Plugin):
"""Base class implemented by the monasca-agent plugin detection classes for OpenStack Services.
Detection plugins inheriting from this class can easily setup up processes to be watched and
a http endpoint to be checked.
Detection plugins inheriting from this class can easily set up processes to be watched and
an http endpoint to be checked.
This class covers Process, HTTP endpoints, Directory, and File monitoring. It is primarily used for
monitoring OpenStack components.
Note: There are existing default detection plugins for http_check.py, directory.py, and file_size.py that
only require configuration.
This class covers Process, HTTP endpoints, Directory, and File monitoring. It is primarily
used for monitoring OpenStack components.
Note: There are existing default detection plugins for http_check.py, directory.py, and
file_size.py that only require configuration.
A process can be monitored by process_names or by process_username. Pass in the process_names list argument
when watching process by name. Pass in the process_username argument and component_name arguments when
watching process by username. Watching by username is useful for groups of processes that are owned by a specific user.
For process monitoring by process_username the component_name is required since it is used to initialize the
instance name in process.yaml. component_name is optional for monitoring by process_name and all other checks.
A process can be monitored by process_names or by process_username. Pass in the process_names
list argument when watching process by name. Pass in the process_username argument and
component_name arguments when watching process by username. Watching by username is useful for
groups of processes that are owned by a specific user.
For process monitoring by process_username the component_name is required since it is used to
initialize the instance name in process.yaml. component_name is optional for monitoring by
process_name and all other checks.
An http endpoint connection can be checked by passing in the service_api_url and optional search_pattern parameters.
The http check can be skipped by specifying the argument 'disable_http_check'
An http endpoint connection can be checked by passing in the service_api_url and optional
search_pattern parameters.
The http check can be skipped by specifying the argument 'disable_http_check'
Directory size can be checked by passing in a directory_names list.
Directory size can be checked by passing in a directory_names list.
File size can be checked by passing in a file_dirs_names list where each directory name item includes a list of files.
example: 'file_dirs_names': [('/var/log/monasca/api', ['monasca-api'])]
File size can be checked by passing in a file_dirs_names list where each directory name item
includes a list of files.
example: 'file_dirs_names': [('/var/log/monasca/api', ['monasca-api'])]
Note: service_name and component_name are optional (except component_name is required with process_username) arguments
used for metric dimensions by all checks.
Note: service_name and component_name are optional (except component_name is required with
process_username) arguments used for metric dimensions by all checks.
"""
def __init__(self, kwargs):
@ -141,17 +145,29 @@ class ServicePlugin(Plugin):
log.info("\tMonitoring the size of all the files in the "
"directory {0}.".format(file_dir))
else:
log.info("\tMonitoring the size of files {0} in the "
"directory {1}.".format(", ".join(str(name) for name in file_names), file_dir))
config.merge(watch_file_size(directory_name=file_dir, file_names=file_names,
file_recursive=file_recursive, service=self.service_name,
component=self.component_name))
log.info(
"\tMonitoring the size of files {0} in the "
"directory {1}.".format(
", ".join(
str(name) for name in file_names),
file_dir))
config.merge(
watch_file_size(
directory_name=file_dir,
file_names=file_names,
file_recursive=file_recursive,
service=self.service_name,
component=self.component_name))
if self.directory_names:
for dir_name in self.directory_names:
log.info("\tMonitoring the size of directory {0}.".format(
dir_name))
config.merge(watch_directory(directory_name=dir_name, service=self.service_name, component=self.component_name))
config.merge(
watch_directory(
directory_name=dir_name,
service=self.service_name,
component=self.component_name))
# Skip the http_check if disable_http_check is set
if self.args is not None and self.args.get('disable_http_check', False):
@ -166,7 +182,8 @@ class ServicePlugin(Plugin):
if len(listening) > 0:
# If not listening on localhost or ips then use another local ip
if host == 'localhost' and len(set(['127.0.0.1', '0.0.0.0', '::', '::1']) & set(listening)) == 0:
if host == 'localhost' and len(
set(['127.0.0.1', '0.0.0.0', '::', '::1']) & set(listening)) == 0:
new_url = list(parsed)
new_url[1] = listening[0] + ':' + port
api_url = urlparse.urlunparse(new_url)
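
As a rough illustration of the knobs the ServicePlugin docstring above describes, a
hypothetical plugin for an imaginary service might be driven by a dictionary like the
following (all names, ports and paths invented; a real plugin may need additional
bookkeeping arguments):

example_kwargs = {
    'service_name': 'example',
    'component_name': 'example-api',
    'process_names': ['example-api'],
    'service_api_url': 'http://localhost:8774',
    'search_pattern': '.*v2.*',
    'directory_names': ['/var/lib/example'],
    'file_dirs_names': [('/var/log/example', ['example-api'])],
}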

View File

@ -30,9 +30,10 @@ PREFIX_DIR = os.path.dirname(os.path.dirname(os.path.realpath(sys.argv[0])))
def main(argv=None):
parser = argparse.ArgumentParser(description='Configure and setup the agent. In a full run it will detect running' +
' daemons then configure and start the agent.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser = argparse.ArgumentParser(
description='Configure and setup the agent. In a full run it will detect running' +
' daemons then configure and start the agent.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
args = parse_arguments(parser)
if args.verbose:
@ -55,7 +56,8 @@ def main(argv=None):
# Verify required options
if args.username is None or args.password is None or args.keystone_url is None:
log.error('Username, password and keystone_url are required when running full configuration.')
log.error('Username, password and keystone_url are required when running full '
'configuration.')
parser.print_help()
sys.exit(1)
base_configuration(args)
@ -76,18 +78,22 @@ def main(argv=None):
if args.remove: # Remove entries for each plugin from the various plugin config files
changes = remove_config(args, plugin_names)
else:
# Run detection for all the plugins, halting on any failures if plugins were specified in the arguments
# Run detection for all the plugins, halting on any failures if plugins
# were specified in the arguments
detected_config = plugin_detection(plugins, args.template_dir, args.detection_args,
args.detection_args_json,
skip_failed=(args.detection_plugins is None))
if detected_config is None:
return 1 # Indicates detection problem, skip remaining steps and give non-zero exit code
# Indicates detection problem, skip remaining steps and give non-zero exit code
return 1
changes = modify_config(args, detected_config)
# Don't restart if only doing detection plugins and no changes found
if args.detection_plugins is not None and not changes:
log.info('No changes found for plugins {0}, skipping restart of Monasca Agent'.format(plugin_names))
log.info(
'No changes found for plugins {0}, skipping restart of '
'Monasca Agent'.format(plugin_names))
return 0
elif args.dry_run:
log.info('Running in dry mode, skipping changes and restart of Monasca Agent')
@ -151,9 +157,11 @@ def modify_config(args, detected_config):
if args.dry_run:
continue
else:
agent_config.save_plugin_config(args.config_dir, detection_plugin_name, args.user, new_config)
agent_config.save_plugin_config(
args.config_dir, detection_plugin_name, args.user, new_config)
else:
config = agent_config.read_plugin_config_from_disk(args.config_dir, detection_plugin_name)
config = agent_config.read_plugin_config_from_disk(
args.config_dir, detection_plugin_name)
# merge old and new config, new has precedence
if config is not None:
# For HttpCheck, if the new input url has the same host and
@ -196,7 +204,8 @@ def modify_config(args, detected_config):
log.info("Changes would be made to the config file for the {0}"
" check plugin".format(detection_plugin_name))
else:
agent_config.save_plugin_config(args.config_dir, detection_plugin_name, args.user, new_config)
agent_config.save_plugin_config(
args.config_dir, detection_plugin_name, args.user, new_config)
return modified_config
@ -209,82 +218,167 @@ def validate_positive(value):
def parse_arguments(parser):
parser.add_argument(
'-u', '--username', help="Username used for keystone authentication. Required for basic configuration.")
'-u',
'--username',
help="Username used for keystone authentication. Required for basic configuration.")
parser.add_argument(
'-p', '--password', help="Password used for keystone authentication. Required for basic configuration.")
'-p',
'--password',
help="Password used for keystone authentication. Required for basic configuration.")
parser.add_argument('--user_domain_id', help="User domain id for keystone authentication", default='')
parser.add_argument('--user_domain_name', help="User domain name for keystone authentication", default='')
parser.add_argument(
'--user_domain_id',
help="User domain id for keystone authentication",
default='')
parser.add_argument(
'--user_domain_name',
help="User domain name for keystone authentication",
default='')
parser.add_argument('--keystone_url', help="Keystone url. Required for basic configuration.")
parser.add_argument('--project_name', help="Project name for keystone authentication", default='')
parser.add_argument('--project_domain_id', help="Project domain id for keystone authentication", default='')
parser.add_argument('--project_domain_name', help="Project domain name for keystone authentication", default='')
parser.add_argument('--project_id', help="Keystone project id for keystone authentication", default='')
parser.add_argument(
'--project_name',
help="Project name for keystone authentication",
default='')
parser.add_argument(
'--project_domain_id',
help="Project domain id for keystone authentication",
default='')
parser.add_argument(
'--project_domain_name',
help="Project domain name for keystone authentication",
default='')
parser.add_argument(
'--project_id',
help="Keystone project id for keystone authentication",
default='')
parser.add_argument('--monasca_url', help="Monasca API url, if not defined the url is pulled from keystone",
type=six.text_type,
default='')
parser.add_argument('--service_type', help="Monasca API url service type in keystone catalog", default='')
parser.add_argument('--endpoint_type', help="Monasca API url endpoint type in keystone catalog", default='')
parser.add_argument('--region_name', help="Monasca API url region name in keystone catalog", default='')
parser.add_argument(
'--monasca_url',
help="Monasca API url, if not defined the url is pulled from keystone",
type=six.text_type,
default='')
parser.add_argument(
'--service_type',
help="Monasca API url service type in keystone catalog",
default='')
parser.add_argument(
'--endpoint_type',
help="Monasca API url endpoint type in keystone catalog",
default='')
parser.add_argument(
'--region_name',
help="Monasca API url region name in keystone catalog",
default='')
parser.add_argument('--system_only', help="Setup the service but only configure the base config and system " +
"metrics (cpu, disk, load, memory, network).",
action="store_true", default=False)
parser.add_argument('-d', '--detection_plugins', nargs='*',
help="Skip base config and service setup and only configure this space separated list. " +
"This assumes the base config has already run.")
parser.add_argument(
'--system_only',
help="Setup the service but only configure the base config and system " +
"metrics (cpu, disk, load, memory, network).",
action="store_true",
default=False)
parser.add_argument(
'-d',
'--detection_plugins',
nargs='*',
help="Skip base config and service setup and only configure this space separated list. " +
"This assumes the base config has already run.")
parser.add_argument('--skip_detection_plugins', nargs='*',
help="Skip detection for all plugins in this space separated list.")
detection_args_group = parser.add_mutually_exclusive_group()
detection_args_group.add_argument('-a', '--detection_args', help="A string of arguments that will be passed to detection " +
"plugins. Only certain detection plugins use arguments.")
detection_args_group.add_argument('-json', '--detection_args_json',
help="A JSON string that will be passed to detection plugins that parse JSON.")
detection_args_group.add_argument(
'-a',
'--detection_args',
help="A string of arguments that will be passed to detection " +
"plugins. Only certain detection plugins use arguments.")
detection_args_group.add_argument(
'-json',
'--detection_args_json',
help="A JSON string that will be passed to detection plugins that parse JSON.")
parser.add_argument('--check_frequency', help="How often to run metric collection in seconds",
type=validate_positive, default=30)
parser.add_argument('--num_collector_threads', help="Number of Threads to use in Collector " +
"for running checks", type=validate_positive, default=1)
parser.add_argument('--pool_full_max_retries', help="Maximum number of collection cycles where all of the threads " +
"in the pool are still running plugins before the " +
"collector will exit and be restart",
type=validate_positive, default=4)
parser.add_argument('--plugin_collect_time_warn', help="Number of seconds a plugin collection time exceeds " +
"that causes a warning to be logged for that plugin",
type=validate_positive, default=6)
parser.add_argument('--dimensions', help="Additional dimensions to set for all metrics. A comma separated list " +
"of name/value pairs, 'name:value,name2:value2'")
parser.add_argument('--ca_file', help="Sets the path to the ca certs file if using certificates. " +
"Required only if insecure is set to False", default='')
parser.add_argument('--insecure', help="Set whether certificates are used for Keystone authentication",
default=False)
parser.add_argument('--config_dir', help="Configuration directory", default='/etc/monasca/agent')
parser.add_argument('--log_dir', help="monasca-agent log directory", default='/var/log/monasca/agent')
parser.add_argument('--log_level', help="monasca-agent logging level (ERROR, WARNING, INFO, DEBUG)", required=False,
default='WARN')
parser.add_argument(
'--num_collector_threads',
help="Number of Threads to use in Collector " +
"for running checks",
type=validate_positive,
default=1)
parser.add_argument(
'--pool_full_max_retries',
help="Maximum number of collection cycles where all of the threads " +
"in the pool are still running plugins before the " +
"collector will exit and be restart",
type=validate_positive,
default=4)
parser.add_argument(
'--plugin_collect_time_warn',
help="Number of seconds a plugin collection time exceeds " +
"that causes a warning to be logged for that plugin",
type=validate_positive,
default=6)
parser.add_argument(
'--dimensions',
help="Additional dimensions to set for all metrics. A comma separated list " +
"of name/value pairs, 'name:value,name2:value2'")
parser.add_argument(
'--ca_file',
help="Sets the path to the ca certs file if using certificates. " +
"Required only if insecure is set to False",
default='')
parser.add_argument(
'--insecure',
help="Set whether certificates are used for Keystone authentication",
default=False)
parser.add_argument(
'--config_dir',
help="Configuration directory",
default='/etc/monasca/agent')
parser.add_argument(
'--log_dir',
help="monasca-agent log directory",
default='/var/log/monasca/agent')
parser.add_argument(
'--log_level',
help="monasca-agent logging level (ERROR, WARNING, INFO, DEBUG)",
required=False,
default='WARN')
parser.add_argument('--template_dir', help="Alternative template directory",
default=os.path.join(PREFIX_DIR, 'share/monasca/agent'))
parser.add_argument('--overwrite',
help="Overwrite existing plugin configuration. " +
"The default is to merge. agent.yaml is always overwritten.",
action="store_true")
parser.add_argument('-r', '--remove', help="Rather than add the detected configuration remove it.",
action="store_true", default=False)
parser.add_argument('--skip_enable', help="By default the service is enabled, " +
"which requires the script run as root. Set this to skip that step.",
action="store_true")
parser.add_argument(
'-r',
'--remove',
help="Rather than add the detected configuration remove it.",
action="store_true",
default=False)
parser.add_argument(
'--skip_enable',
help="By default the service is enabled, " +
"which requires the script run as root. Set this to skip that step.",
action="store_true")
parser.add_argument('--install_plugins_only', help="Only update plugin "
"configuration, do not configure services, users, etc."
" or restart services",
action="store_true")
parser.add_argument('--user', help="User name to run monasca-agent as", default='mon-agent')
parser.add_argument('-s', '--service', help="Service this node is associated with, added as a dimension.")
parser.add_argument('--amplifier', help="Integer for the number of additional measurements to create. " +
"Additional measurements contain the 'amplifier' dimension. " +
"Useful for load testing; not for production use.", default=0)
parser.add_argument(
'-s',
'--service',
help="Service this node is associated with, added as a dimension.")
parser.add_argument(
'--amplifier',
help="Integer for the number of additional measurements to create. " +
"Additional measurements contain the 'amplifier' dimension. " +
"Useful for load testing; not for production use.",
default=0)
parser.add_argument('-v', '--verbose', help="Verbose Output", action="store_true")
parser.add_argument('--dry_run', help="Make no changes just report on changes", action="store_true")
parser.add_argument(
'--dry_run',
help="Make no changes just report on changes",
action="store_true")
parser.add_argument('--max_buffer_size',
help="Maximum number of batches of measurements to"
" buffer while unable to communicate with monasca-api",
@ -312,13 +406,21 @@ def parse_arguments(parser):
return parser.parse_args()
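For reference, the --dimensions option above expects a comma separated list of name/value pairs. A minimal sketch of how such a string could be turned into a dict; the helper name is illustrative and not part of monasca-setup:

def parse_dimensions(raw):
    # Illustrative only: split 'name:value,name2:value2' into a dict,
    # skipping empty items and tolerating missing values.
    dimensions = {}
    for pair in raw.split(','):
        if not pair.strip():
            continue
        name, _, value = pair.partition(':')
        dimensions[name.strip()] = value.strip()
    return dimensions

print(parse_dimensions('service:monitoring,hostname:node-1'))
# -> {'service': 'monitoring', 'hostname': 'node-1'} (key order may vary)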
def plugin_detection(plugins, template_dir, detection_args, detection_args_json, skip_failed=True, remove=False):
"""Runs the detection step for each plugin in the list and returns the complete detected agent config.
def plugin_detection(
plugins,
template_dir,
detection_args,
detection_args_json,
skip_failed=True,
remove=False):
"""Runs the detection step for each plugin in the list and returns the complete detected
agent config.
:param plugins: A list of detection plugin classes
:param template_dir: Location of plugin configuration templates
:param detection_args: Arguments passed to each detection plugin
:param skip_failed: When False any detection failure causes the run to halt and return None
:return: An agent_config instance representing the total configuration from all detection plugins run.
:return: An agent_config instance representing the total configuration from all detection
plugins run.
"""
plugin_config = agent_config.Plugins()
if detection_args_json:
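plugin_detection (signature reflowed above) runs each detection plugin and folds its output into a single agent config. A standalone sketch of that detect-and-merge pattern, using plain dicts and made-up method names instead of the real agent_config.Plugins API:

def run_detection(plugin_classes, skip_failed=True):
    # Hypothetical stand-in for plugin_detection: the real code builds an
    # agent_config.Plugins() object and drives the detection plugin API.
    merged = {}
    for plugin_class in plugin_classes:
        try:
            plugin = plugin_class()           # made-up: detect on construction
            sample = plugin.build_config()    # made-up config builder
        except Exception:
            if skip_failed:
                continue                      # mirrors skip_failed=True
            return None                       # mirrors the halt-and-return-None path
        merged.update(sample)                 # later plugins win on key clashes
    return merged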
@ -364,7 +466,8 @@ def remove_config(args, plugin_names):
deletes = False
plugin_name = os.path.splitext(os.path.basename(file_path))[0]
config = agent_config.read_plugin_config_from_disk(args.config_dir, plugin_name)
new_instances = []  # To avoid odd issues from iterating over a list you delete from, build a new one instead
# To avoid odd issues from iterating over a list you delete from, build a new one instead
new_instances = []
if args.detection_args is None:
for inst in config['instances']:
if 'built_by' in inst and inst['built_by'] in plugin_names:
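The comment moved onto its own line above points at a classic pitfall: deleting from a list while iterating over it skips elements. A tiny self-contained illustration of the build-a-new-list approach remove_config takes:

instances = [{'built_by': 'mysql'}, {'built_by': 'mysql'}, {'built_by': 'apache'}]
# Filtering into a new list leaves the original iteration untouched,
# unlike calling instances.remove(...) inside the loop.
kept = [inst for inst in instances if inst.get('built_by') != 'mysql']
print(kept)  # [{'built_by': 'apache'}]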

View File

@ -1,3 +1,3 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
from service import Service
from service import Service # noqa

View File

@ -63,10 +63,14 @@ class Systemd(LinuxInit):
# Write the systemd script
init_path = '/etc/systemd/system/{0}.service'.format(self.name)
with open(os.path.join(self.template_dir, 'monasca-agent.service.template'), 'r') as template:
with open(os.path.join(self.template_dir, 'monasca-agent.service.template'),
'r') as template:
with open(init_path, 'w') as service_script:
service_script.write(template.read().format(prefix=self.prefix_dir, monasca_user=self.username,
config_dir=self.config_dir))
service_script.write(
template.read().format(
prefix=self.prefix_dir,
monasca_user=self.username,
config_dir=self.config_dir))
os.chown(init_path, 0, 0)
os.chmod(init_path, 0o644)
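The rewrapped block above only renders the systemd unit template with str.format. A minimal sketch of the same pattern with an inline template string so it runs standalone; the unit contents and paths here are illustrative, not the real monasca-agent.service.template:

template_text = (
    "[Service]\n"
    "User={monasca_user}\n"
    "ExecStart={prefix}/bin/monasca-collector\n"
    "WorkingDirectory={config_dir}\n"
)
# Same keyword substitution the Systemd and SysV writers perform on their templates.
print(template_text.format(prefix='/usr',
                           monasca_user='mon-agent',
                           config_dir='/etc/monasca/agent'))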
@ -110,11 +114,25 @@ class Systemd(LinuxInit):
class SysV(LinuxInit):
def __init__(self, prefix_dir, config_dir, log_dir, template_dir, username, name='monasca-agent'):
def __init__(
self,
prefix_dir,
config_dir,
log_dir,
template_dir,
username,
name='monasca-agent'):
"""Setup this service with the given init template.
"""
service.Service.__init__(self, prefix_dir, config_dir, log_dir, template_dir, name, username)
service.Service.__init__(
self,
prefix_dir,
config_dir,
log_dir,
template_dir,
name,
username)
self.init_script = '/etc/init.d/%s' % self.name
self.init_template = os.path.join(template_dir, 'monasca-agent.init.template')
@ -127,8 +145,11 @@ class SysV(LinuxInit):
# Write the init script and enable.
with open(self.init_template, 'r') as template:
with open(self.init_script, 'w') as conf:
conf.write(template.read().format(prefix=self.prefix_dir, monasca_user=self.username,
config_dir=self.config_dir))
conf.write(
template.read().format(
prefix=self.prefix_dir,
monasca_user=self.username,
config_dir=self.config_dir))
os.chown(self.init_script, 0, 0)
os.chmod(self.init_script, 0o755)
@ -148,7 +169,8 @@ class SysV(LinuxInit):
log.info('Starting {0} service via SysV init script'.format(self.name))
if restart:
subprocess.check_call([self.init_script, 'restart']) # Throws CalledProcessError on error
# Throws CalledProcessError on error
subprocess.check_call([self.init_script, 'restart'])
else:
subprocess.check_call([self.init_script, 'start']) # Throws CalledProcessError on error
return True
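As the relocated comment notes, subprocess.check_call raises CalledProcessError on a non-zero exit, which is how init-script failures propagate to the caller. A short generic example of that behaviour (the command is arbitrary):

import subprocess

try:
    # /bin/false always exits non-zero, standing in for a failing init script.
    subprocess.check_call(['/bin/false'])
except subprocess.CalledProcessError as exc:
    print('service script failed with exit code {0}'.format(exc.returncode))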

View File

@ -14,14 +14,20 @@ log = logging.getLogger(__name__)
def discover_plugins(custom_path):
"""Find and import all detection plugins. It will look in detection/plugins dir of the code as well as custom_path
"""Find and import all detection plugins. It will look in detection/plugins dir of the code
as well as custom_path
:param custom_path: An additional path to search for detection plugins
:return: A list of imported detection plugin classes.
"""
# This was adapted from what monasca_agent.common.util.load_check_directory
plugin_paths = glob.glob(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'detection/plugins', '*.py'))
plugin_paths = glob.glob(
os.path.join(
os.path.dirname(
os.path.realpath(__file__)),
'detection/plugins',
'*.py'))
plugin_paths.extend(glob.glob(os.path.join(custom_path, '*.py')))
plugins = []
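The reflowed glob.glob call above just assembles '<package dir>/detection/plugins/*.py' and then appends anything matching '<custom_path>/*.py'. A standalone sketch of that lookup against a hypothetical directory:

import glob
import os

# Hypothetical plugin directory; discover_plugins uses the package's own
# detection/plugins directory plus the user-supplied custom_path.
plugin_dir = '/opt/monasca/detection/plugins'
paths = [p for p in glob.glob(os.path.join(plugin_dir, '*.py'))
         if os.path.basename(p) != '__init__.py']
print(paths)  # empty list if the directory does not exist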
@ -30,7 +36,10 @@ def discover_plugins(custom_path):
if os.path.basename(plugin_path) == '__init__.py':
continue
try:
plugin = imp.load_source(os.path.splitext(os.path.basename(plugin_path))[0], plugin_path)
plugin = imp.load_source(
os.path.splitext(
os.path.basename(plugin_path))[0],
plugin_path)
except Exception:
log.exception('Unable to import detection plugin {0}'.format(plugin_path))
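imp.load_source, reflowed above, imports a module object straight from a file path, using the file's base name as the module name. A small wrapper showing that call in isolation; the example path in the comment is made up:

import imp
import os

def load_plugin(plugin_path):
    # Mirrors the call above: module name is the file name without extension.
    name = os.path.splitext(os.path.basename(plugin_path))[0]
    return imp.load_source(name, plugin_path)

# e.g. load_plugin('/opt/monasca/detection/plugins/example.py')  # hypothetical path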
@ -60,8 +69,10 @@ def select_plugins(plugin_names, plugin_list, skip=False):
if len(plugins) != len(plugin_names) and not skip:
pnames = [p.__name__ for p in plugin_list]
log.warn("Not all plugins found, discovered plugins {0}\nAvailable plugins{1}".format(plugins,
pnames))
log.warn(
"Not all plugins found, discovered plugins {0}\nAvailable plugins{1}".format(
plugins,
pnames))
return plugins
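select_plugins, ending above, picks the requested plugin classes out of the discovered list and warns when some names were not found. A small sketch of that matching idea with stand-in classes:

class Apache(object):
    pass


class Kafka(object):
    pass


# Stand-ins for detection plugin classes; matching here is by class name.
discovered = [Apache, Kafka]
requested = ['Apache', 'MySQL']
selected = [p for p in discovered if p.__name__ in requested]
missing = [n for n in requested if n not in [p.__name__ for p in selected]]
print(selected)  # only the Apache class
print(missing)   # ['MySQL'] -- this is what triggers the "Not all plugins found" warning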

View File

@ -82,13 +82,11 @@ deps = bindep
commands = bindep test
[flake8]
max-line-length = 120
max-line-length = 100
max-complexity = 30
# TODO: ignored checks should be enabled in the future
# E501 Line length > 80 characters
# F401 module imported but unused
# H405 multi line docstring summary not separated with an empty line
ignore = E501,F401,H405
ignore = H405
show-source = True
exclude=.venv,.git,.tox,dist,*egg,build,tests,tests_to_fix
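The tox.ini hunk above lowers the limit from 120 to 100 characters and stops ignoring E501 and F401, keeping only H405 suppressed. For local experiments the same options can be fed to flake8's documented legacy Python API; this is only a sketch, the file list is illustrative, and nothing here is added by the change itself:

from flake8.api import legacy as flake8

# Mirror the [flake8] section programmatically; an unknown code in 'ignore'
# (such as hacking's H405 when the plugin is not installed) is simply unused.
style_guide = flake8.get_style_guide(max_line_length=100, ignore=['H405'])
report = style_guide.check_files(['monasca_setup/main.py'])
print(report.total_errors)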