# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
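"""Elasticsearch check for the Monasca collector agent.

Collects per-node indexing, search, merge, thread-pool, transport, and
JVM statistics plus cluster health from an Elasticsearch HTTP endpoint.

A minimal, illustrative instance configuration, inferred from the keys
this check reads below (`url` is required; `username` and `password`
together enable basic auth; the values shown are placeholders):

    init_config: null
    instances:
      - url: http://localhost:9200
        username: elastic
        password: changeme
"""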
import json
import socket
import subprocess
import sys

from six.moves import urllib

from monasca_agent.collector.checks import AgentCheck
from monasca_agent.collector.checks.utils import add_basic_auth
from monasca_agent.common.util import headers


class NodeNotFound(Exception):
    pass


class ElasticSearch(AgentCheck):

    METRICS = {  # Metrics that are common to all Elasticsearch versions
        "elasticsearch.docs.count": ("gauge", "indices.docs.count"),
        "elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
        "elasticsearch.store.size": ("gauge", "indices.store.size_in_bytes"),
        "elasticsearch.indexing.index.total": ("gauge", "indices.indexing.index_total"),
        "elasticsearch.indexing.index.time": (
            "gauge", "indices.indexing.index_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.indexing.index.current": ("gauge", "indices.indexing.index_current"),
        "elasticsearch.indexing.delete.total": ("gauge", "indices.indexing.delete_total"),
        "elasticsearch.indexing.delete.time": (
            "gauge", "indices.indexing.delete_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.indexing.delete.current": ("gauge", "indices.indexing.delete_current"),
        "elasticsearch.get.total": ("gauge", "indices.get.total"),
        "elasticsearch.get.time": (
            "gauge", "indices.get.time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.get.current": ("gauge", "indices.get.current"),
        "elasticsearch.get.exists.total": ("gauge", "indices.get.exists_total"),
        "elasticsearch.get.exists.time": (
            "gauge", "indices.get.exists_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.get.missing.total": ("gauge", "indices.get.missing_total"),
        "elasticsearch.get.missing.time": (
            "gauge", "indices.get.missing_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.search.query.total": ("gauge", "indices.search.query_total"),
        "elasticsearch.search.query.time": (
            "gauge", "indices.search.query_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.search.query.current": ("gauge", "indices.search.query_current"),
        "elasticsearch.search.fetch.total": ("gauge", "indices.search.fetch_total"),
        "elasticsearch.search.fetch.time": (
            "gauge", "indices.search.fetch_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.search.fetch.current": ("gauge", "indices.search.fetch_current"),
        "elasticsearch.merges.current": ("gauge", "indices.merges.current"),
        "elasticsearch.merges.current.docs": ("gauge", "indices.merges.current_docs"),
        "elasticsearch.merges.current.size": ("gauge", "indices.merges.current_size_in_bytes"),
        "elasticsearch.merges.total": ("gauge", "indices.merges.total"),
        "elasticsearch.merges.total.time": (
            "gauge", "indices.merges.total_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.merges.total.docs": ("gauge", "indices.merges.total_docs"),
        "elasticsearch.merges.total.size": ("gauge", "indices.merges.total_size_in_bytes"),
        "elasticsearch.refresh.total": ("gauge", "indices.refresh.total"),
        "elasticsearch.refresh.total.time": (
            "gauge", "indices.refresh.total_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.flush.total": ("gauge", "indices.flush.total"),
        "elasticsearch.flush.total.time": (
            "gauge", "indices.flush.total_time_in_millis", lambda v: float(v) / 1000),
        "elasticsearch.process.open_fd": ("gauge", "process.open_file_descriptors"),
        "elasticsearch.transport.rx_count": ("gauge", "transport.rx_count"),
        "elasticsearch.transport.tx_count": ("gauge", "transport.tx_count"),
        "elasticsearch.transport.rx_size": ("gauge", "transport.rx_size_in_bytes"),
        "elasticsearch.transport.tx_size": ("gauge", "transport.tx_size_in_bytes"),
        "elasticsearch.transport.server_open": ("gauge", "transport.server_open"),
        "elasticsearch.thread_pool.bulk.active": ("gauge", "thread_pool.bulk.active"),
        "elasticsearch.thread_pool.bulk.threads": ("gauge", "thread_pool.bulk.threads"),
        "elasticsearch.thread_pool.bulk.queue": ("gauge", "thread_pool.bulk.queue"),
        "elasticsearch.thread_pool.bulk.rejected": ("gauge", "thread_pool.bulk.rejected"),
        "elasticsearch.thread_pool.flush.active": ("gauge", "thread_pool.flush.active"),
        "elasticsearch.thread_pool.flush.threads": ("gauge", "thread_pool.flush.threads"),
        "elasticsearch.thread_pool.flush.queue": ("gauge", "thread_pool.flush.queue"),
        "elasticsearch.thread_pool.flush.rejected": ("gauge", "thread_pool.flush.rejected"),
        "elasticsearch.thread_pool.generic.active": ("gauge", "thread_pool.generic.active"),
        "elasticsearch.thread_pool.generic.threads": ("gauge", "thread_pool.generic.threads"),
        "elasticsearch.thread_pool.generic.queue": ("gauge", "thread_pool.generic.queue"),
        "elasticsearch.thread_pool.generic.rejected": ("gauge", "thread_pool.generic.rejected"),
        "elasticsearch.thread_pool.get.active": ("gauge", "thread_pool.get.active"),
        "elasticsearch.thread_pool.get.threads": ("gauge", "thread_pool.get.threads"),
        "elasticsearch.thread_pool.get.queue": ("gauge", "thread_pool.get.queue"),
        "elasticsearch.thread_pool.get.rejected": ("gauge", "thread_pool.get.rejected"),
        "elasticsearch.thread_pool.index.active": ("gauge", "thread_pool.index.active"),
        "elasticsearch.thread_pool.index.threads": ("gauge", "thread_pool.index.threads"),
        "elasticsearch.thread_pool.index.queue": ("gauge", "thread_pool.index.queue"),
        "elasticsearch.thread_pool.index.rejected": ("gauge", "thread_pool.index.rejected"),
        "elasticsearch.thread_pool.management.active": ("gauge", "thread_pool.management.active"),
        "elasticsearch.thread_pool.management.threads": (
            "gauge", "thread_pool.management.threads"),
        "elasticsearch.thread_pool.management.queue": ("gauge", "thread_pool.management.queue"),
        "elasticsearch.thread_pool.management.rejected": (
            "gauge", "thread_pool.management.rejected"),
        "elasticsearch.thread_pool.merge.active": ("gauge", "thread_pool.merge.active"),
        "elasticsearch.thread_pool.merge.threads": ("gauge", "thread_pool.merge.threads"),
        "elasticsearch.thread_pool.merge.queue": ("gauge", "thread_pool.merge.queue"),
        "elasticsearch.thread_pool.merge.rejected": ("gauge", "thread_pool.merge.rejected"),
        "elasticsearch.thread_pool.percolate.active": ("gauge", "thread_pool.percolate.active"),
        "elasticsearch.thread_pool.percolate.threads": ("gauge", "thread_pool.percolate.threads"),
        "elasticsearch.thread_pool.percolate.queue": ("gauge", "thread_pool.percolate.queue"),
        "elasticsearch.thread_pool.percolate.rejected": (
            "gauge", "thread_pool.percolate.rejected"),
        "elasticsearch.thread_pool.refresh.active": ("gauge", "thread_pool.refresh.active"),
        "elasticsearch.thread_pool.refresh.threads": ("gauge", "thread_pool.refresh.threads"),
        "elasticsearch.thread_pool.refresh.queue": ("gauge", "thread_pool.refresh.queue"),
        "elasticsearch.thread_pool.refresh.rejected": ("gauge", "thread_pool.refresh.rejected"),
        "elasticsearch.thread_pool.search.active": ("gauge", "thread_pool.search.active"),
        "elasticsearch.thread_pool.search.threads": ("gauge", "thread_pool.search.threads"),
        "elasticsearch.thread_pool.search.queue": ("gauge", "thread_pool.search.queue"),
        "elasticsearch.thread_pool.search.rejected": ("gauge", "thread_pool.search.rejected"),
        "elasticsearch.thread_pool.snapshot.active": ("gauge", "thread_pool.snapshot.active"),
        "elasticsearch.thread_pool.snapshot.threads": ("gauge", "thread_pool.snapshot.threads"),
        "elasticsearch.thread_pool.snapshot.queue": ("gauge", "thread_pool.snapshot.queue"),
        "elasticsearch.thread_pool.snapshot.rejected": ("gauge", "thread_pool.snapshot.rejected"),
        "elasticsearch.http.current_open": ("gauge", "http.current_open"),
        "elasticsearch.http.total_opened": ("gauge", "http.total_opened"),
        "jvm.gc.concurrent_mark_sweep.count": (
            "gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_count"),
        "jvm.gc.concurrent_mark_sweep.collection_time": (
            "gauge", "jvm.gc.collectors.ConcurrentMarkSweep.collection_time_in_millis",
            lambda v: float(v) / 1000),
        "jvm.gc.par_new.count": ("gauge", "jvm.gc.collectors.ParNew.collection_count"),
        "jvm.gc.par_new.collection_time": (
            "gauge", "jvm.gc.collectors.ParNew.collection_time_in_millis",
            lambda v: float(v) / 1000),
        "jvm.mem.heap_committed": ("gauge", "jvm.mem.heap_committed_in_bytes"),
        "jvm.mem.heap_used": ("gauge", "jvm.mem.heap_used_in_bytes"),
        "jvm.mem.non_heap_committed": ("gauge", "jvm.mem.non_heap_committed_in_bytes"),
        "jvm.mem.non_heap_used": ("gauge", "jvm.mem.non_heap_used_in_bytes"),
        "jvm.threads.count": ("gauge", "jvm.threads.count"),
        "jvm.threads.peak_count": ("gauge", "jvm.threads.peak_count"),
        "elasticsearch.number_of_nodes": ("gauge", "number_of_nodes"),
        "elasticsearch.number_of_data_nodes": ("gauge", "number_of_data_nodes"),
        "elasticsearch.active_primary_shards": ("gauge", "active_primary_shards"),
        "elasticsearch.active_shards": ("gauge", "active_shards"),
        "elasticsearch.relocating_shards": ("gauge", "relocating_shards"),
        "elasticsearch.initializing_shards": ("gauge", "initializing_shards"),
        "elasticsearch.unassigned_shards": ("gauge", "unassigned_shards"),
        "elasticsearch.cluster_status": (
            "gauge", "status",
            lambda v: {"red": 0, "yellow": 1, "green": 2}.get(v, -1)),
    }
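
    # Each METRICS entry maps a published metric name to a tuple of
    # (metric type, dotted path into the source JSON[, value transform]).
    # For example, "elasticsearch.get.time" above reads
    # indices.get.time_in_millis from a node's stats document and applies
    # the lambda to convert milliseconds to seconds.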

    def __init__(self, name, init_config, agent_config):
        AgentCheck.__init__(self, name, init_config, agent_config)

        # Host status needs to persist across all checks
        self.cluster_status = {}

    def check(self, instance):
        config_url = instance.get('url')
        if config_url is None:
            raise Exception("A URL must be specified")

        # Load basic authentication configuration, if available.
        username, password = instance.get('username'), instance.get('password')
        if username and password:
            auth = (username, password)
        else:
            auth = None

        # Support URLs that have a path in them from the config, for
        # backwards-compatibility.
        parsed = urllib.parse.urlparse(config_url)
        if parsed[2] != "":
            config_url = "%s://%s" % (parsed[0], parsed[1])

        # Tag by URL so we can differentiate the metrics from multiple instances.
        dimensions = self._set_dimensions({'url': config_url}, instance)

        # Check the ES version for this instance and define parameters
        # (URLs and metrics) accordingly.
        version = self._get_es_version(config_url, auth)
        self._define_params(version)

        # Load the stats data.
        url = urllib.parse.urljoin(config_url, self.STATS_URL)
        stats_data = self._get_data(url, auth)
        self._process_stats_data(config_url, stats_data, auth, dimensions=dimensions)

        # Load the health data.
        url = urllib.parse.urljoin(config_url, self.HEALTH_URL)
        health_data = self._get_data(url, auth)
        self._process_health_data(config_url, health_data, dimensions=dimensions)

    def _get_es_version(self, config_url, auth=None):
        """Get the running version of Elasticsearch."""
        try:
            data = self._get_data(config_url, auth)
            # Build a concrete list of ints: a lazy `map` object cannot be
            # compared against the version lists in _define_params under
            # Python 3.
            version = [int(part) for part in data['version']['number'].split('.')]
        except Exception as e:
            self.log.warn("Error while trying to get Elasticsearch version from %s: %s" %
                          (config_url, str(e)))
            version = [0, 0, 0]

        self.log.debug("Elasticsearch version is %s" % version)
        return version
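
    # Illustrative note: a reported version string such as "0.90.13" parses
    # to [0, 90, 13], which compares >= [0, 90, 10] lexicographically in
    # _define_params below and selects the newer "/_nodes" endpoints, while
    # "0.90.9" parses to [0, 90, 9] and falls back to "/_cluster/nodes".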

    def _define_params(self, version):
        """Define the set of URLs and METRICS to use depending on the running ES version."""
        if version >= [0, 90, 10]:
            # ES versions 0.90.10 and above.
            # The metrics architecture changed starting with version 0.90.10.
            self.HEALTH_URL = "/_cluster/health?pretty=true"
            self.STATS_URL = "/_nodes/stats?all=true"
            self.NODES_URL = "/_nodes?network=true"

            additional_metrics = {
                "elasticsearch.search.fetch.open_contexts": (
                    "gauge", "indices.search.open_contexts"),
                "elasticsearch.cache.filter.evictions": (
                    "gauge", "indices.filter_cache.evictions"),
                "elasticsearch.cache.filter.size": (
                    "gauge", "indices.filter_cache.memory_size_in_bytes"),
                "elasticsearch.id_cache.size": ("gauge", "indices.id_cache.memory_size_in_bytes"),
                "elasticsearch.fielddata.size": (
                    "gauge", "indices.fielddata.memory_size_in_bytes"),
                "elasticsearch.fielddata.evictions": ("gauge", "indices.fielddata.evictions")
            }
        else:
            # ES versions 0.90.9 and below
            self.HEALTH_URL = "/_cluster/health?pretty=true"
            self.STATS_URL = "/_cluster/nodes/stats?all=true"
            self.NODES_URL = "/_cluster/nodes?network=true"

            additional_metrics = {
                "elasticsearch.cache.field.evictions": ("gauge", "indices.cache.field_evictions"),
                "elasticsearch.cache.field.size": ("gauge", "indices.cache.field_size_in_bytes"),
                "elasticsearch.cache.filter.count": ("gauge", "indices.cache.filter_count"),
                "elasticsearch.cache.filter.evictions": (
                    "gauge", "indices.cache.filter_evictions"),
                "elasticsearch.cache.filter.size": ("gauge", "indices.cache.filter_size_in_bytes"),
                "elasticsearch.thread_pool.cache.active": ("gauge", "thread_pool.cache.active"),
                "elasticsearch.thread_pool.cache.threads": ("gauge", "thread_pool.cache.threads"),
                "elasticsearch.thread_pool.cache.queue": ("gauge", "thread_pool.cache.queue"),
                "elasticsearch.thread_pool.cache.rejected": (
                    "gauge", "thread_pool.cache.rejected"),
                "jvm.gc.collection_count": ("gauge", "jvm.gc.collection_count"),
                "jvm.gc.collection_time": (
                    "gauge", "jvm.gc.collection_time_in_millis", lambda v: float(v) / 1000),
                "jvm.gc.copy.count": ("gauge", "jvm.gc.collectors.Copy.collection_count"),
                "jvm.gc.copy.collection_time": (
                    "gauge", "jvm.gc.collectors.Copy.collection_time_in_millis",
                    lambda v: float(v) / 1000)
            }

        self.METRICS.update(additional_metrics)

    def _get_data(self, url, auth=None):
        """Hit a given URL and return the parsed JSON.

        `auth` is a tuple of (username, password) or None
        """
        req = urllib.request.Request(url, None, headers(self.agent_config))
        if auth:
            add_basic_auth(req, *auth)
        request = urllib.request.urlopen(req)
        response = request.read()
        return json.loads(response)

    def _process_stats_data(self, config_url, data, auth, dimensions=None):
        for node in data['nodes']:
            node_data = data['nodes'][node]

            def process_metric(metric, xtype, path, xform=None):
                # closure over node_data
                self._process_metric(node_data, metric, path, xform,
                                     dimensions=dimensions)

            # On newer versions of ES it's "host", not "hostname".
            node_hostname = node_data.get('hostname', node_data.get('host', None))
            if node_hostname is not None:
                # For ES >= 0.19. Compare as text: json.loads already yields
                # text, and calling .decode() on str objects breaks under
                # Python 3.
                hostnames = (
                    self.hostname,
                    socket.gethostname(),
                    socket.getfqdn(),
                    socket.gethostbyname(socket.gethostname())
                )
                if node_hostname in hostnames:
                    for metric in self.METRICS:
                        # metric description
                        desc = self.METRICS[metric]
                        process_metric(metric, *desc)
            else:
                # ES < 0.19
                # Fetch the interface address from `ifconfig` or `ip addr`
                # and check it against the primary IP reported by ES.
                try:
                    nodes_url = urllib.parse.urljoin(config_url, self.NODES_URL)
                    primary_addr = self._get_primary_addr(nodes_url, node, auth)
                except NodeNotFound:
                    # Skip any nodes that aren't found
                    continue
                if self._host_matches_node(primary_addr):
                    for metric in self.METRICS:
                        # metric description
                        desc = self.METRICS[metric]
                        process_metric(metric, *desc)

    def _get_primary_addr(self, url, node_name, auth):
        """Return the primary interface address for `node_name` as seen by ES.

        Used for ES < 0.19.
        """
        req = urllib.request.Request(url, None, headers(self.agent_config))
        # Load basic authentication configuration, if available.
        if auth:
            add_basic_auth(req, *auth)
        request = urllib.request.urlopen(req)
        response = request.read()
        data = json.loads(response)

        if node_name in data['nodes']:
            node = data['nodes'][node_name]
            if ('network' in node and
                    'primary_interface' in node['network'] and
                    'address' in node['network']['primary_interface']):
                return node['network']['primary_interface']['address']

        raise NodeNotFound()

    @staticmethod
    def _host_matches_node(primary_addrs):
        """For ES < 0.19, check if the current host matches the IP given
        in the cluster nodes check `/_cluster/nodes`. Uses `ip addr` on
        Linux and `ifconfig` on Mac.
        """
        if sys.platform == 'darwin':
            ifaces = subprocess.Popen(['ifconfig'], stdout=subprocess.PIPE)
        else:
            ifaces = subprocess.Popen(['ip', 'addr'], stdout=subprocess.PIPE)
        grepper = subprocess.Popen(['grep', 'inet'], stdin=ifaces.stdout,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        ifaces.stdout.close()
        out, err = grepper.communicate()

        # Capture the list of interface IPs. communicate() returns bytes
        # under Python 3, so decode before splitting.
        ips = []
        for iface in out.decode('utf-8').split("\n"):
            iface = iface.strip()
            if iface:
                ips.append(iface.split(' ')[1].split('/')[0])

        # Check the interface addresses against the primary address
        return primary_addrs in ips
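
    # Illustrative example of the parsing above: after the `grep inet`
    # filter, a Linux `ip addr` line such as
    #     inet 192.168.1.10/24 brd 192.168.1.255 scope global eth0
    # yields "192.168.1.10" via split(' ')[1].split('/')[0].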

    def _process_metric(self, data, metric, path, xform=None, dimensions=None):
        """data: dictionary containing all the stats
        metric: name of the metric to publish
        path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
        xform: a lambda to apply to the numerical value
        """
        value = data

        # Traverse the nested dictionaries
        for key in path.split('.'):
            if value is not None:
                value = value.get(key, None)
            else:
                break

        if value is not None:
            if xform:
                value = xform(value)
            if self.METRICS[metric][0] == "gauge":
                self.gauge(metric, value, dimensions=dimensions)
            else:
                self.rate(metric, value, dimensions=dimensions)
        else:
            self._metric_not_found(metric, path)
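
    # Illustrative example of the traversal above: for the path
    # "indices.docs.count" the loop walks data["indices"]["docs"]["count"];
    # if any key along the way is missing, value stays None and the miss
    # is logged via _metric_not_found.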

    def _process_health_data(self, config_url, data, dimensions=None):
        if self.cluster_status.get(config_url, None) is None:
            self.cluster_status[config_url] = data['status']

        if data['status'] != self.cluster_status.get(config_url):
            self.cluster_status[config_url] = data['status']

        def process_metric(metric, xtype, path, xform=None):
            # closure over data
            self._process_metric(data, metric, path, xform, dimensions=dimensions)

        for metric in self.METRICS:
            # metric description
            desc = self.METRICS[metric]
            process_metric(metric, *desc)

    def _metric_not_found(self, metric, path):
        self.log.debug("Metric not found: %s -> %s", path, metric)