Update k8s utils and prometheus plugin

For k8s we are updating the pod owner
functionality.

Kubernetes 1.6 and newer supports having owner
reference instead of parsing the name. This patch
takes advantage of that.

For prometheus we are adding the ability to look
up pod owner based on a pod dimension. This is
key when using kube-state-metrics so we don't
have dead alarms

Also includes a minor fix in k8s plugin for memory

Patch also includes changes to our agent tests as
we now allow () in our new dimensions keys and values
and our tests were not updated for this change yet

Change-Id: I17ea7f42d4b23534221675309c31feeafa75d20c
This commit is contained in:
mhoppal 2017-12-01 13:04:20 -07:00
parent 7cea50abba
commit 9105ce9a29
6 changed files with 181 additions and 75 deletions

View File

@ -2234,6 +2234,7 @@ The annotations the plugin is looking for are -
* monasca.io/usek8slabels: Attach Kubernetes labels of the pod that is being scraped. Default to 'true'
* monasca.io/whitelist: Yaml list of metric names to whitelist against on detected endpoint
* monasca.io/metric_types: Yaml dictionary where key is metric name and value is desired type from 'rate' or 'counter'. Metric name will be appended with '_rate' or '_counter' respectively. If not specified, the scraped value will be passed without modification.
* monasca.io/report_pod_label_owner: If the metrics that are scraped contain pod as a label key we will attempt to get the pod owner and attach that to the metric as another dimension. Very useful for other scraping from other solutions that monitor k8s (Ex. kube-state-metrics). Default to 'false'
These annotations are pulled from the Kubelet for pod autodetection and the Kubernetes API for the service auto detection

View File

@ -754,62 +754,86 @@ def get_pod_dimensions(kubernetes_connector, pod_metadata, kubernetes_labels):
for label in kubernetes_labels:
if label in pod_labels:
pod_dimensions[label] = pod_labels[label]
# Get owner of pod to set as a dimension
# Try to get from pod owner references
pod_owner_references = pod_metadata.get('ownerReferences', None)
if pod_owner_references:
try:
if len(pod_owner_references) > 1:
log.warn("More then one owner for pod {}".format(pod_name))
pod_owner_reference = pod_owner_references[0]
pod_owner_type = pod_owner_reference['kind']
pod_owner_name = pod_owner_reference['name']
_set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name))
# Try to get owner from annotations
else:
try:
pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by'])
pod_owner_type = pod_created_by['reference']['kind']
pod_owner_name = pod_created_by['reference']['name']
_set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name)
except Exception:
log.info("Could not get pod owner from annotations for pod {}".format(pod_name))
pod_owner_dimension_set = get_pod_owner(kubernetes_connector, pod_metadata)
if pod_owner_dimension_set:
pod_dimensions[pod_owner_dimension_set[0]] = pod_owner_dimension_set[1]
return pod_dimensions
def _get_deployment_name(kubernetes_connector, pod_owner_name, pod_namespace):
replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name)
def _attempt_to_get_owner_name(kubernetes_connector, resource_endpoint):
try:
replica_set = kubernetes_connector.get_request(replica_set_endpoint)
replica_set_annotations = replica_set['metadata']['annotations']
if "deployment.kubernetes.io/revision" in replica_set_annotations:
return "-".join(pod_owner_name.split("-")[:-1])
resource = kubernetes_connector.get_request(resource_endpoint)
resource_metadata = resource['metadata']
pod_owner_pair = _parse_manifest_for_owner(resource_metadata)
return None if not pod_owner_pair else pod_owner_pair[1]
except Exception as e:
log.warn("Could not connect to api to get replicaset data - {}".format(e))
log.info("Could not connect to api to get owner data - {}".format(e))
return None
return None
def _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name):
def _parse_manifest_for_owner(resource_metadata):
resource_name = resource_metadata['name']
owner_references = resource_metadata.get('ownerReferences', None)
if owner_references:
try:
if len(owner_references) > 1:
log.warn("More then one owner for resource {}".format(resource_name))
owner_reference = owner_references[0]
resource_owner_type = owner_reference['kind']
resource_owner_name = owner_reference['name']
return resource_owner_type, resource_owner_name
except Exception:
log.info("Could not get owner from ownerReferences for resource {}".format(resource_name))
# Try to get owner from annotations
else:
try:
resource_created_by = json.loads(resource_metadata['annotations']['kubernetes.io/created-by'])
resource_owner_type = resource_created_by['reference']['kind']
resource_owner_name = resource_created_by['reference']['name']
return resource_owner_type, resource_owner_name
except Exception:
log.info("Could not get resource owner from annotations for resource {}".format(resource_name))
return None
def _get_pod_owner_pair(kubernetes_connector, pod_owner_type, pod_owner_name, pod_namespace):
if pod_owner_type == "ReplicationController":
pod_dimensions['replication_controller'] = pod_owner_name
return 'replication_controller', pod_owner_name
elif pod_owner_type == "ReplicaSet":
if not kubernetes_connector:
log.error("Can not set deployment name as connection information to API is not set. "
"Setting ReplicaSet as dimension")
log.info("Can not set deployment name as connection information to API is not set. "
"Setting ReplicaSet as dimension")
deployment_name = None
else:
deployment_name = _get_deployment_name(kubernetes_connector, pod_owner_name, pod_dimensions['namespace'])
replicaset_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name)
deployment_name = _attempt_to_get_owner_name(kubernetes_connector, replicaset_endpoint)
if not deployment_name:
pod_dimensions['replica_set'] = pod_owner_name
return 'replica_set', pod_owner_name
else:
pod_dimensions['deployment'] = deployment_name
return 'deployment', deployment_name
elif pod_owner_type == "DaemonSet":
pod_dimensions['daemon_set'] = pod_owner_name
return 'daemon_set', pod_owner_name
elif pod_owner_type == "Job":
pod_dimensions['job'] = pod_owner_name
if not kubernetes_connector:
log.info("Can not set cronjob name as connection information to API is not set. "
"Setting job as dimension")
cronjob_name = None
else:
job_endpoint = "/apis/batch/v1/namespaces/{}/jobs/{}".format(pod_namespace, pod_owner_name)
cronjob_name = _attempt_to_get_owner_name(kubernetes_connector, job_endpoint)
if not cronjob_name:
return 'job', pod_owner_name
else:
return 'cronjob', cronjob_name
else:
log.info("Unsupported pod owner kind {} as a dimension for pod {}".format(pod_owner_type,
pod_dimensions))
log.warn("Unsupported pod owner kind {}".format(pod_owner_type))
return None
def get_pod_owner(kubernetes_connector, pod_metadata):
    """Resolve the owner dimension pair for a pod.

    Parses the pod's metadata for its owner and maps it to a Monasca dimension,
    e.g. ``('deployment', name)`` or ``('replica_set', name)``.

    Returns a ``(dimension_key, dimension_value)`` tuple, or None when the pod
    has no resolvable owner.
    """
    namespace = pod_metadata['namespace']
    parsed_owner = _parse_manifest_for_owner(pod_metadata)
    if not parsed_owner:
        return None
    owner_type, owner_name = parsed_owner
    return _get_pod_owner_pair(kubernetes_connector, owner_type, owner_name, namespace)

View File

@ -32,7 +32,7 @@ CADVISOR_METRICS = {
"cache": "mem.cache_bytes",
"usage": "mem.used_bytes",
"failcnt": "mem.fail_count",
"working_set": "mem.working_set"
"working_set": "mem.working_set_bytes"
},
"filesystem_metrics": {
"capacity": "fs.total_bytes",
@ -67,7 +67,7 @@ METRIC_TYPES_UNITS = {
"mem.cache_bytes": (["gauge"], ["bytes"]),
"mem.used_bytes": (["gauge"], ["bytes"]),
"mem.fail_count": (["gauge"], ["count"]),
"mem.working_set": (["gauge"], ["bytes"]),
"mem.working_set_bytes": (["gauge"], ["bytes"]),
"fs.total_bytes": (["gauge"], ["bytes"]),
"fs.usage_bytes": (["gauge"], ["bytes"]),
"fs.writes": (["gauge", "rate"], ["bytes", "bytes_per_second"]),

View File

@ -2,6 +2,7 @@
import math
import requests
import six
import time
import yaml
from prometheus_client.parser import text_string_to_metric_families
@ -29,7 +30,9 @@ class Prometheus(checks.AgentCheck):
Additional settings for prometheus endpoints
'monasca.io/usek8slabels': Attach kubernetes labels of the pod that is being scraped. Default to 'true'
'monasca.io/whitelist': Yaml list of metric names to whitelist against on detected endpoint
'monasca.io/report_pod_label_owner': If the metrics that are scraped contain pod as a label key we will attempt to get the
pod owner and attach that to the metric as another dimension. Very useful for other scraping from other solutions
that monitor k8s (Ex. kube-state-metrics). Default to 'false'
"""
def __init__(self, name, init_config, agent_config, instances=None):
@ -44,6 +47,8 @@ class Prometheus(checks.AgentCheck):
raise Exception('Prometheus Client only supports one configured instance if auto detection is set')
if self.detect_method not in ['pod', 'service']:
raise Exception('Invalid detect method {}. Must be either pod or service')
self.k8s_pod_cache = None
self.cache_start_time = None
def check(self, instance):
dimensions = self._set_dimensions(None, instance)
@ -60,6 +65,16 @@ class Prometheus(checks.AgentCheck):
self.kubernetes_labels = instance.get('kubernetes_labels', KUBERNETES_LABELS)
if not self.kubernetes_connector:
self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout)
# Check if we need to clear pod cache so it does not build up over time
if self.k8s_pod_cache is not None:
if not self.cache_start_time:
self.cache_start_time = time.time()
else:
current_time = time.time()
if (current_time - self.cache_start_time) > 86400:
self.cache_start_time = current_time
self.k8s_pod_cache = {}
self.initialize_pod_cache()
if self.detect_method == "pod":
if not self.kubelet_url:
try:
@ -69,19 +84,17 @@ class Prometheus(checks.AgentCheck):
self.log.error("Could not obtain current host from Kubernetes API {}. "
"Skipping check".format(e))
return
metric_endpoints, endpoints_whitelist, endpoint_metric_types = self._get_metric_endpoints_by_pod(dimensions)
prometheus_endpoints = self._get_metric_endpoints_by_pod(dimensions)
# Detect by service
else:
metric_endpoints, endpoints_whitelist, endpoint_metric_types = self._get_metric_endpoints_by_service(dimensions)
for metric_endpoint, endpoint_dimensions in six.iteritems(metric_endpoints):
endpoint_dimensions.update(dimensions)
self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions,
endpoints_whitelist[metric_endpoint], endpoint_metric_types[metric_endpoint])
prometheus_endpoints = self._get_metric_endpoints_by_service(dimensions)
for prometheus_endpoint in prometheus_endpoints:
self.report_endpoint_metrics(prometheus_endpoint.scrape_endpoint, prometheus_endpoint.dimensions,
prometheus_endpoint.whitelist, prometheus_endpoint.metric_types,
prometheus_endpoint.report_pod_label_owner)
def _get_metric_endpoints_by_pod(self, dimensions):
scrape_endpoints = {}
endpoint_whitelist = {}
endpoint_metric_types = {}
prometheus_endpoints = []
# Grab running pods from local Kubelet
try:
pods = requests.get(self.kubelet_url, timeout=self.connection_timeout).json()
@ -121,21 +134,26 @@ class Prometheus(checks.AgentCheck):
pod_dimensions = dimensions.copy()
try:
use_k8s_labels, whitelist, metric_types = self._get_monasca_settings(pod_name, pod_annotations)
use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \
self._get_monasca_settings(pod_name, pod_annotations)
except Exception as e:
error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \
"Skipping scraping metrics".format(endpoints, e)
self.log.error(error_message)
continue
# set global_pod_cache
if report_pod_label_owner and self.k8s_pod_cache is None:
self.k8s_pod_cache = {}
self.initialize_pod_cache()
if use_k8s_labels:
pod_dimensions.update(utils.get_pod_dimensions(
self.kubernetes_connector, pod['metadata'],
self.kubernetes_labels))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(pod_ip, endpoint)
scrape_endpoints[scrape_endpoint] = pod_dimensions
endpoint_whitelist[scrape_endpoint] = whitelist
endpoint_metric_types[scrape_endpoint] = metric_types
prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, pod_dimensions, whitelist, metric_types,
report_pod_label_owner)
prometheus_endpoints.append(prometheus_endpoint)
self.log.info("Detected pod endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
pod_dimensions))
@ -143,12 +161,10 @@ class Prometheus(checks.AgentCheck):
self.log.warn("Error parsing {} to detect for scraping - {}".format(pod, e))
continue
return scrape_endpoints, endpoint_whitelist, endpoint_metric_types
return prometheus_endpoints
def _get_metric_endpoints_by_service(self, dimensions):
scrape_endpoints = {}
endpoint_whitelist = {}
endpoint_metric_types = {}
prometheus_endpoints = []
# Grab services from Kubernetes API
try:
services = self.kubernetes_connector.get_request("/api/v1/services")
@ -182,26 +198,31 @@ class Prometheus(checks.AgentCheck):
cluster_ip = service_spec['clusterIP']
service_dimensions = dimensions.copy()
try:
use_k8s_labels, whitelist, metric_types = self._get_monasca_settings(service_name, service_annotations)
use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \
self._get_monasca_settings(service_name, service_annotations)
except Exception as e:
error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \
"Skipping scraping metrics".format(endpoints, e)
self.log.error(error_message)
continue
# set global_pod_cache
if report_pod_label_owner and self.k8s_pod_cache is None:
self.k8s_pod_cache = {}
self.initialize_pod_cache()
if use_k8s_labels:
service_dimensions.update(
self._get_service_dimensions(service_metadata))
for endpoint in endpoints:
scrape_endpoint = "http://{}:{}".format(cluster_ip, endpoint)
scrape_endpoints[scrape_endpoint] = service_dimensions
endpoint_whitelist[scrape_endpoint] = whitelist
endpoint_metric_types[scrape_endpoint] = metric_types
prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, service_dimensions, whitelist, metric_types,
report_pod_label_owner)
prometheus_endpoints.append(prometheus_endpoint)
self.log.info("Detected service endpoint - {} with metadata "
"of {}".format(scrape_endpoint,
service_dimensions))
return scrape_endpoints, endpoint_whitelist, endpoint_metric_types
return prometheus_endpoints
def _get_monasca_settings(self, service_name, annotations):
def _get_monasca_settings(self, resource_name, annotations):
use_k8s_labels = annotations.get("monasca.io/usek8slabels", "true").lower() == "true"
whitelist = None
if "monasca.io/whitelist" in annotations:
@ -212,9 +233,11 @@ class Prometheus(checks.AgentCheck):
for typ in metric_types:
if metric_types[typ] not in ['rate', 'counter']:
self.log.warn("Ignoring unknown metric type '{}' configured for '{}' on endpoint '{}'".format(
typ, metric_types[typ], service_name))
typ, metric_types[typ], resource_name))
del metric_types[typ]
return use_k8s_labels, whitelist, metric_types
report_pod_label_owner_annotation = annotations.get("monasca.io/report_pod_label_owner", "false").lower()
report_pod_label_owner = True if report_pod_label_owner_annotation == "true" else False
return use_k8s_labels, whitelist, metric_types, report_pod_label_owner
def _get_service_dimensions(self, service_metadata):
service_dimensions = {'service_name': service_metadata['name'],
@ -260,7 +283,8 @@ class Prometheus(checks.AgentCheck):
"{} {} skipped for scraping".format(self.detect_method, name))
return endpoints
def _send_metrics(self, metric_families, dimensions, endpoint_whitelist, endpoint_metric_types):
def _send_metrics(self, metric_families, dimensions, endpoint_whitelist, endpoint_metric_types,
report_pod_label_owner):
for metric_family in metric_families:
for metric in metric_family.samples:
metric_dimensions = dimensions.copy()
@ -286,10 +310,23 @@ class Prometheus(checks.AgentCheck):
elif typ == "counter":
metric_func = self.increment
metric_name += "_counter"
if report_pod_label_owner:
if "pod" in metric_dimensions and "namespace" in metric_dimensions:
pod_name = metric_dimensions["pod"]
if pod_name in self.k8s_pod_cache:
pod_owner, pod_owner_name = self.k8s_pod_cache[pod_name]
metric_dimensions[pod_owner] = pod_owner_name
else:
pod_owner_pair = self.get_pod_owner(pod_name, metric_dimensions['namespace'])
if pod_owner_pair:
pod_owner = pod_owner_pair[0]
pod_owner_name = pod_owner_pair[1]
metric_dimensions[pod_owner] = pod_owner_name
self.k8s_pod_cache[pod_name] = pod_owner, pod_owner_name
metric_func(metric_name, metric_value, dimensions=metric_dimensions, hostname="SUPPRESS")
def report_endpoint_metrics(self, metric_endpoint, endpoint_dimensions, endpoint_whitelist=None,
endpoint_metric_types=None):
endpoint_metric_types=None, report_pod_label_owner=False):
# Hit metric endpoint
try:
result = requests.get(metric_endpoint, timeout=self.connection_timeout)
@ -300,8 +337,53 @@ class Prometheus(checks.AgentCheck):
if "text/plain" in result_content_type:
try:
metric_families = text_string_to_metric_families(result.text)
self._send_metrics(metric_families, endpoint_dimensions, endpoint_whitelist, endpoint_metric_types)
self._send_metrics(metric_families, endpoint_dimensions, endpoint_whitelist, endpoint_metric_types,
report_pod_label_owner)
except Exception as e:
self.log.error("Error parsing data from {} with error {}".format(metric_endpoint, e))
else:
self.log.error("Unsupported content type - {}".format(result_content_type))
def get_pod_owner(self, pod_name, namespace):
    """Fetch a pod from the Kubernetes API and return its owner dimension pair.

    Returns a ``(owner_type, owner_name)`` tuple, or None when the pod cannot
    be fetched or has no resolvable owner.
    """
    try:
        pod = self.kubernetes_connector.get_request(
            "/api/v1/namespaces/{}/pods/{}".format(namespace, pod_name))
        pod_metadata = pod['metadata']
        # utils.get_pod_owner returns a (type, name) pair or None. Check for
        # None before unpacking: the original direct unpack raised TypeError on
        # None, which the broad except swallowed and misreported as an API
        # connection failure.
        pod_owner_pair = utils.get_pod_owner(self.kubernetes_connector, pod_metadata)
    except Exception as e:
        self.log.info("Could not get pod {} from Kubernetes API with error - {}".format(pod_name, e))
        return None
    if not pod_owner_pair:
        self.log.info("Could not get pod owner for pod {}".format(pod_name))
        return None
    return pod_owner_pair
def initialize_pod_cache(self):
    """Build the pod-name -> (owner_type, owner_name) cache from all pods.

    Queries the Kubernetes API for every pod in the cluster and caches each
    pod's owner dimension pair in ``self.k8s_pod_cache``. Pods whose owner
    cannot be determined are skipped. Raises when the pod list itself cannot
    be fetched.
    """
    self.k8s_pod_cache = {}
    try:
        pods = self.kubernetes_connector.get_request("/api/v1/pods")
    except Exception as e:
        exception_message = "Could not get pods from Kubernetes API with error - {}".format(e)
        self.log.exception(exception_message)
        raise Exception(exception_message)
    for pod in pods['items']:
        pod_metadata = pod['metadata']
        pod_name = pod_metadata['name']
        try:
            # utils.get_pod_owner returns a (type, name) pair or None. Check
            # for None before use: the original direct unpack raised TypeError
            # on None, hitting the generic error path instead of the intended
            # "Could not get pod owner" message.
            pod_owner_pair = utils.get_pod_owner(self.kubernetes_connector, pod_metadata)
        except Exception as e:
            self.log.info("Error attempting to get pod {} owner with error {}".format(pod_name, e))
            continue
        if not pod_owner_pair:
            self.log.info("Could not get pod owner for pod {}".format(pod_name))
            continue
        self.k8s_pod_cache[pod_name] = pod_owner_pair
# Class to hold prometheus endpoint metadata
class PrometheusEndpoint(object):
    """Metadata for a single detected Prometheus scrape target.

    Bundles the scrape URL with the per-endpoint settings parsed from the
    monasca.io annotations (dimensions, metric whitelist, metric type
    overrides, and whether to resolve pod-label owners).
    """

    def __init__(self, scrape_endpoint, dimensions, whitelist, metric_types, report_pod_label_owner):
        self.scrape_endpoint = scrape_endpoint
        self.dimensions = dimensions
        self.whitelist = whitelist
        self.metric_types = metric_types
        self.report_pod_label_owner = report_pod_label_owner

    def __repr__(self):
        # Aid debugging of endpoint detection; not relied on by the plugin
        return "{}(scrape_endpoint={!r}, report_pod_label_owner={!r})".format(
            type(self).__name__, self.scrape_endpoint, self.report_pod_label_owner)

View File

@ -29,7 +29,7 @@ class TestAgentCheck(unittest.TestCase):
value_meta=None)
self.assertEqual(len(check.aggregator.metrics), 1)
dimensions = {'A': '()', 'B': 'C', 'D': 'E'}
dimensions = {'A': '{}', 'B': 'C', 'D': 'E'}
check.submit_metric("Bar",
5,
metrics_pkg.Gauge,

View File

@ -166,14 +166,13 @@ class TestMetricsAggregator(unittest.TestCase):
value_meta=value_meta,
exception=metric_validator.InvalidDimensionValue)
def testInvalidDimensionKeyRestrictedCharacters(self):
def testValidDimensionKeyParenthesesCharacter(self):
dimensions = {'A': 'B', 'B': 'C', '(D)': 'E'}
value_meta = {"This is a test": "test, test, test"}
self.submit_metric("Foo",
5,
dimensions=dimensions,
value_meta=value_meta,
exception=metric_validator.InvalidDimensionKey)
value_meta=value_meta)
def testInvalidDimensionValueRestrictedCharacters(self):
dimensions = {'A': 'B;', 'B': 'C', 'D': 'E'}