diff --git a/docs/Plugins.md b/docs/Plugins.md index e7696556..d238fac1 100644 --- a/docs/Plugins.md +++ b/docs/Plugins.md @@ -2234,6 +2234,7 @@ The annotations the plugin is looking for are - * monasca.io/usek8slabels: Attach Kubernetes labels of the pod that is being scraped. Default to 'true' * monasca.io/whitelist: Yaml list of metric names to whitelist against on detected endpoint * monasca.io/metric_types: Yaml dictionary where key is metric name and value is desired type from 'rate' or 'counter'. Metric name will be appended with '_rate' or '_counter' respectively. If not specified, the scraped value will be passed without modification. +* monasca.io/report_pod_label_owner: If the metrics that are scraped contain pod as a label key we will attempt to get the pod owner and attach that to the metric as another dimension. Very useful for other scraping from other solutions that monitor k8s (Ex. kube-state-metrics). Default to 'false' These annotations are pulled from the Kubelet for pod autodetection and the Kubernetes API for the service auto detection diff --git a/monasca_agent/collector/checks/utils.py b/monasca_agent/collector/checks/utils.py index 2db6cbe0..8580eb53 100644 --- a/monasca_agent/collector/checks/utils.py +++ b/monasca_agent/collector/checks/utils.py @@ -754,62 +754,86 @@ def get_pod_dimensions(kubernetes_connector, pod_metadata, kubernetes_labels): for label in kubernetes_labels: if label in pod_labels: pod_dimensions[label] = pod_labels[label] - # Get owner of pod to set as a dimension - # Try to get from pod owner references - pod_owner_references = pod_metadata.get('ownerReferences', None) - if pod_owner_references: - try: - if len(pod_owner_references) > 1: - log.warn("More then one owner for pod {}".format(pod_name)) - pod_owner_reference = pod_owner_references[0] - pod_owner_type = pod_owner_reference['kind'] - pod_owner_name = pod_owner_reference['name'] - _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name) - except Exception: - log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name)) - # Try to get owner from annotations - else: - try: - pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by']) - pod_owner_type = pod_created_by['reference']['kind'] - pod_owner_name = pod_created_by['reference']['name'] - _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name) - except Exception: - log.info("Could not get pod owner from annotations for pod {}".format(pod_name)) + pod_owner_dimension_set = get_pod_owner(kubernetes_connector, pod_metadata) + if pod_owner_dimension_set: + pod_dimensions[pod_owner_dimension_set[0]] = pod_owner_dimension_set[1] return pod_dimensions -def _get_deployment_name(kubernetes_connector, pod_owner_name, pod_namespace): - replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name) +def _attempt_to_get_owner_name(kubernetes_connector, resource_endpoint): try: - replica_set = kubernetes_connector.get_request(replica_set_endpoint) - replica_set_annotations = replica_set['metadata']['annotations'] - if "deployment.kubernetes.io/revision" in replica_set_annotations: - return "-".join(pod_owner_name.split("-")[:-1]) + resource = kubernetes_connector.get_request(resource_endpoint) + resource_metadata = resource['metadata'] + pod_owner_pair = _parse_manifest_for_owner(resource_metadata) + return None if not pod_owner_pair else pod_owner_pair[1] except Exception as e: - log.warn("Could not connect to api to get replicaset data - {}".format(e)) + log.info("Could not connect to api to get owner data - {}".format(e)) return None return None -def _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name): +def _parse_manifest_for_owner(resource_metadata): + resource_name = resource_metadata['name'] + owner_references = resource_metadata.get('ownerReferences', None) + if owner_references: + try: + if len(owner_references) > 1: + log.warn("More then one owner for resource {}".format(resource_name)) + owner_reference = owner_references[0] + resource_owner_type = owner_reference['kind'] + resource_owner_name = owner_reference['name'] + return resource_owner_type, resource_owner_name + except Exception: + log.info("Could not get owner from ownerReferences for resource {}".format(resource_name)) + # Try to get owner from annotations + else: + try: + resource_created_by = json.loads(resource_metadata['annotations']['kubernetes.io/created-by']) + resource_owner_type = resource_created_by['reference']['kind'] + resource_owner_name = resource_created_by['reference']['name'] + return resource_owner_type, resource_owner_name + except Exception: + log.info("Could not get resource owner from annotations for resource {}".format(resource_name)) + return None + + +def _get_pod_owner_pair(kubernetes_connector, pod_owner_type, pod_owner_name, pod_namespace): if pod_owner_type == "ReplicationController": - pod_dimensions['replication_controller'] = pod_owner_name + return 'replication_controller', pod_owner_name elif pod_owner_type == "ReplicaSet": if not kubernetes_connector: - log.error("Can not set deployment name as connection information to API is not set. " - "Setting ReplicaSet as dimension") + log.info("Can not set deployment name as connection information to API is not set. " + "Setting ReplicaSet as dimension") deployment_name = None else: - deployment_name = _get_deployment_name(kubernetes_connector, pod_owner_name, pod_dimensions['namespace']) + replicaset_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(pod_namespace, pod_owner_name) + deployment_name = _attempt_to_get_owner_name(kubernetes_connector, replicaset_endpoint) if not deployment_name: - pod_dimensions['replica_set'] = pod_owner_name + return 'replica_set', pod_owner_name else: - pod_dimensions['deployment'] = deployment_name + return 'deployment', deployment_name elif pod_owner_type == "DaemonSet": - pod_dimensions['daemon_set'] = pod_owner_name + return 'daemon_set', pod_owner_name elif pod_owner_type == "Job": - pod_dimensions['job'] = pod_owner_name + if not kubernetes_connector: + log.info("Can not set cronjob name as connection information to API is not set. " + "Setting job as dimension") + cronjob_name = None + else: + job_endpoint = "/apis/batch/v1/namespaces/{}/jobs/{}".format(pod_namespace, pod_owner_name) + cronjob_name = _attempt_to_get_owner_name(kubernetes_connector, job_endpoint) + if not cronjob_name: + return 'job', pod_owner_name + else: + return 'cronjob', cronjob_name else: - log.info("Unsupported pod owner kind {} as a dimension for pod {}".format(pod_owner_type, - pod_dimensions)) + log.warn("Unsupported pod owner kind {}".format(pod_owner_type)) + return None + + +def get_pod_owner(kubernetes_connector, pod_metadata): + pod_namespace = pod_metadata['namespace'] + owner_pair = _parse_manifest_for_owner(pod_metadata) + if owner_pair: + return _get_pod_owner_pair(kubernetes_connector, owner_pair[0], owner_pair[1], pod_namespace) + return None diff --git a/monasca_agent/collector/checks_d/kubernetes.py b/monasca_agent/collector/checks_d/kubernetes.py index 05241ed9..069d6d15 100644 --- a/monasca_agent/collector/checks_d/kubernetes.py +++ b/monasca_agent/collector/checks_d/kubernetes.py @@ -32,7 +32,7 @@ CADVISOR_METRICS = { "cache": "mem.cache_bytes", "usage": "mem.used_bytes", "failcnt": "mem.fail_count", - "working_set": "mem.working_set" + "working_set": "mem.working_set_bytes" }, "filesystem_metrics": { "capacity": "fs.total_bytes", @@ -67,7 +67,7 @@ METRIC_TYPES_UNITS = { "mem.cache_bytes": (["gauge"], ["bytes"]), "mem.used_bytes": (["gauge"], ["bytes"]), "mem.fail_count": (["gauge"], ["count"]), - "mem.working_set": (["gauge"], ["bytes"]), + "mem.working_set_bytes": (["gauge"], ["bytes"]), "fs.total_bytes": (["gauge"], ["bytes"]), "fs.usage_bytes": (["gauge"], ["bytes"]), "fs.writes": (["gauge", "rate"], ["bytes", "bytes_per_second"]), diff --git a/monasca_agent/collector/checks_d/prometheus.py b/monasca_agent/collector/checks_d/prometheus.py index 74466d21..729055da 100644 --- a/monasca_agent/collector/checks_d/prometheus.py +++ b/monasca_agent/collector/checks_d/prometheus.py @@ -2,6 +2,7 @@ import math import requests import six +import time import yaml from prometheus_client.parser import text_string_to_metric_families @@ -29,7 +30,9 @@ class Prometheus(checks.AgentCheck): Additional settings for prometheus endpoints 'monasca.io/usek8slabels': Attach kubernetes labels of the pod that is being scraped. Default to 'true' 'monasca.io/whitelist': Yaml list of metric names to whitelist against on detected endpoint - + 'monasca.io/report_pod_label_owner': If the metrics that are scraped contain pod as a label key we will attempt to get the + pod owner and attach that to the metric as another dimension. Very useful for other scraping from other solutions + that monitor k8s (Ex. kube-state-metrics). Default to 'false' """ def __init__(self, name, init_config, agent_config, instances=None): @@ -44,6 +47,8 @@ class Prometheus(checks.AgentCheck): raise Exception('Prometheus Client only supports one configured instance if auto detection is set') if self.detect_method not in ['pod', 'service']: raise Exception('Invalid detect method {}. Must be either pod or service') + self.k8s_pod_cache = None + self.cache_start_time = None def check(self, instance): dimensions = self._set_dimensions(None, instance) @@ -60,6 +65,16 @@ class Prometheus(checks.AgentCheck): self.kubernetes_labels = instance.get('kubernetes_labels', KUBERNETES_LABELS) if not self.kubernetes_connector: self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout) + # Check if we need to clear pod cache so it does not build up over time + if self.k8s_pod_cache is not None: + if not self.cache_start_time: + self.cache_start_time = time.time() + else: + current_time = time.time() + if (current_time - self.cache_start_time) > 86400: + self.cache_start_time = current_time + self.k8s_pod_cache = {} + self.initialize_pod_cache() if self.detect_method == "pod": if not self.kubelet_url: try: @@ -69,19 +84,17 @@ class Prometheus(checks.AgentCheck): self.log.error("Could not obtain current host from Kubernetes API {}. " "Skipping check".format(e)) return - metric_endpoints, endpoints_whitelist, endpoint_metric_types = self._get_metric_endpoints_by_pod(dimensions) + prometheus_endpoints = self._get_metric_endpoints_by_pod(dimensions) # Detect by service else: - metric_endpoints, endpoints_whitelist, endpoint_metric_types = self._get_metric_endpoints_by_service(dimensions) - for metric_endpoint, endpoint_dimensions in six.iteritems(metric_endpoints): - endpoint_dimensions.update(dimensions) - self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions, - endpoints_whitelist[metric_endpoint], endpoint_metric_types[metric_endpoint]) + prometheus_endpoints = self._get_metric_endpoints_by_service(dimensions) + for prometheus_endpoint in prometheus_endpoints: + self.report_endpoint_metrics(prometheus_endpoint.scrape_endpoint, prometheus_endpoint.dimensions, + prometheus_endpoint.whitelist, prometheus_endpoint.metric_types, + prometheus_endpoint.report_pod_label_owner) def _get_metric_endpoints_by_pod(self, dimensions): - scrape_endpoints = {} - endpoint_whitelist = {} - endpoint_metric_types = {} + prometheus_endpoints = [] # Grab running pods from local Kubelet try: pods = requests.get(self.kubelet_url, timeout=self.connection_timeout).json() @@ -121,21 +134,26 @@ class Prometheus(checks.AgentCheck): pod_dimensions = dimensions.copy() try: - use_k8s_labels, whitelist, metric_types = self._get_monasca_settings(pod_name, pod_annotations) + use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \ + self._get_monasca_settings(pod_name, pod_annotations) except Exception as e: error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \ "Skipping scraping metrics".format(endpoints, e) self.log.error(error_message) continue + # set global_pod_cache + if report_pod_label_owner and self.k8s_pod_cache is None: + self.k8s_pod_cache = {} + self.initialize_pod_cache() if use_k8s_labels: pod_dimensions.update(utils.get_pod_dimensions( self.kubernetes_connector, pod['metadata'], self.kubernetes_labels)) for endpoint in endpoints: scrape_endpoint = "http://{}:{}".format(pod_ip, endpoint) - scrape_endpoints[scrape_endpoint] = pod_dimensions - endpoint_whitelist[scrape_endpoint] = whitelist - endpoint_metric_types[scrape_endpoint] = metric_types + prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, pod_dimensions, whitelist, metric_types, + report_pod_label_owner) + prometheus_endpoints.append(prometheus_endpoint) self.log.info("Detected pod endpoint - {} with metadata " "of {}".format(scrape_endpoint, pod_dimensions)) @@ -143,12 +161,10 @@ class Prometheus(checks.AgentCheck): self.log.warn("Error parsing {} to detect for scraping - {}".format(pod, e)) continue - return scrape_endpoints, endpoint_whitelist, endpoint_metric_types + return prometheus_endpoints def _get_metric_endpoints_by_service(self, dimensions): - scrape_endpoints = {} - endpoint_whitelist = {} - endpoint_metric_types = {} + prometheus_endpoints = [] # Grab services from Kubernetes API try: services = self.kubernetes_connector.get_request("/api/v1/services") @@ -182,26 +198,31 @@ class Prometheus(checks.AgentCheck): cluster_ip = service_spec['clusterIP'] service_dimensions = dimensions.copy() try: - use_k8s_labels, whitelist, metric_types = self._get_monasca_settings(service_name, service_annotations) + use_k8s_labels, whitelist, metric_types, report_pod_label_owner = \ + self._get_monasca_settings(service_name, service_annotations) except Exception as e: error_message = "Error parsing monasca annotations on endpoints {} with error - {}. " \ "Skipping scraping metrics".format(endpoints, e) self.log.error(error_message) continue + # set global_pod_cache + if report_pod_label_owner and self.k8s_pod_cache is None: + self.k8s_pod_cache = {} + self.initialize_pod_cache() if use_k8s_labels: service_dimensions.update( self._get_service_dimensions(service_metadata)) for endpoint in endpoints: scrape_endpoint = "http://{}:{}".format(cluster_ip, endpoint) - scrape_endpoints[scrape_endpoint] = service_dimensions - endpoint_whitelist[scrape_endpoint] = whitelist - endpoint_metric_types[scrape_endpoint] = metric_types + prometheus_endpoint = PrometheusEndpoint(scrape_endpoint, service_dimensions, whitelist, metric_types, + report_pod_label_owner) + prometheus_endpoints.append(prometheus_endpoint) self.log.info("Detected service endpoint - {} with metadata " "of {}".format(scrape_endpoint, service_dimensions)) - return scrape_endpoints, endpoint_whitelist, endpoint_metric_types + return prometheus_endpoints - def _get_monasca_settings(self, service_name, annotations): + def _get_monasca_settings(self, resource_name, annotations): use_k8s_labels = annotations.get("monasca.io/usek8slabels", "true").lower() == "true" whitelist = None if "monasca.io/whitelist" in annotations: @@ -212,9 +233,11 @@ class Prometheus(checks.AgentCheck): for typ in metric_types: if metric_types[typ] not in ['rate', 'counter']: self.log.warn("Ignoring unknown metric type '{}' configured for '{}' on endpoint '{}'".format( - typ, metric_types[typ], service_name)) + typ, metric_types[typ], resource_name)) del metric_types[typ] - return use_k8s_labels, whitelist, metric_types + report_pod_label_owner_annotation = annotations.get("monasca.io/report_pod_label_owner", "false").lower() + report_pod_label_owner = True if report_pod_label_owner_annotation == "true" else False + return use_k8s_labels, whitelist, metric_types, report_pod_label_owner def _get_service_dimensions(self, service_metadata): service_dimensions = {'service_name': service_metadata['name'], @@ -260,7 +283,8 @@ class Prometheus(checks.AgentCheck): "{} {} skipped for scraping".format(self.detect_method, name)) return endpoints - def _send_metrics(self, metric_families, dimensions, endpoint_whitelist, endpoint_metric_types): + def _send_metrics(self, metric_families, dimensions, endpoint_whitelist, endpoint_metric_types, + report_pod_label_owner): for metric_family in metric_families: for metric in metric_family.samples: metric_dimensions = dimensions.copy() @@ -286,10 +310,23 @@ class Prometheus(checks.AgentCheck): elif typ == "counter": metric_func = self.increment metric_name += "_counter" + if report_pod_label_owner: + if "pod" in metric_dimensions and "namespace" in metric_dimensions: + pod_name = metric_dimensions["pod"] + if pod_name in self.k8s_pod_cache: + pod_owner, pod_owner_name = self.k8s_pod_cache[pod_name] + metric_dimensions[pod_owner] = pod_owner_name + else: + pod_owner_pair = self.get_pod_owner(pod_name, metric_dimensions['namespace']) + if pod_owner_pair: + pod_owner = pod_owner_pair[0] + pod_owner_name = pod_owner_pair[1] + metric_dimensions[pod_owner] = pod_owner_name + self.k8s_pod_cache[pod_name] = pod_owner, pod_owner_name metric_func(metric_name, metric_value, dimensions=metric_dimensions, hostname="SUPPRESS") def report_endpoint_metrics(self, metric_endpoint, endpoint_dimensions, endpoint_whitelist=None, - endpoint_metric_types=None): + endpoint_metric_types=None, report_pod_label_owner=False): # Hit metric endpoint try: result = requests.get(metric_endpoint, timeout=self.connection_timeout) @@ -300,8 +337,53 @@ class Prometheus(checks.AgentCheck): if "text/plain" in result_content_type: try: metric_families = text_string_to_metric_families(result.text) - self._send_metrics(metric_families, endpoint_dimensions, endpoint_whitelist, endpoint_metric_types) + self._send_metrics(metric_families, endpoint_dimensions, endpoint_whitelist, endpoint_metric_types, + report_pod_label_owner) except Exception as e: self.log.error("Error parsing data from {} with error {}".format(metric_endpoint, e)) else: self.log.error("Unsupported content type - {}".format(result_content_type)) + + def get_pod_owner(self, pod_name, namespace): + try: + pod = self.kubernetes_connector.get_request("/api/v1/namespaces/{}/pods/{}".format(namespace, pod_name)) + pod_metadata = pod['metadata'] + pod_owner, pod_owner_name = utils.get_pod_owner(self.kubernetes_connector, pod_metadata) + if not pod_owner: + self.log.info("Could not get pod owner for pod {}".format(pod_name)) + return None + return pod_owner, pod_owner_name + except Exception as e: + self.log.info("Could not get pod {} from Kubernetes API with error - {}".format(pod_name, e)) + return None + + def initialize_pod_cache(self): + self.k8s_pod_cache = {} + try: + pods = self.kubernetes_connector.get_request("/api/v1/pods") + except Exception as e: + exception_message = "Could not get pods from Kubernetes API with error - {}".format(e) + self.log.exception(exception_message) + raise Exception(exception_message) + for pod in pods['items']: + pod_metadata = pod['metadata'] + pod_name = pod_metadata['name'] + try: + pod_owner, pod_owner_name = utils.get_pod_owner(self.kubernetes_connector, pod_metadata) + except Exception as e: + self.log.info("Error attempting to get pod {} owner with error {}".format(pod_name, e)) + continue + if not pod_owner: + self.log.info("Could not get pod owner for pod {}".format(pod_name)) + continue + self.k8s_pod_cache[pod_name] = (pod_owner, pod_owner_name) + + +# Class to hold prometheus endpoint metadata +class PrometheusEndpoint(object): + def __init__(self, scrape_endpoint, dimensions, whitelist, metric_types, report_pod_label_owner): + self.scrape_endpoint = scrape_endpoint + self.dimensions = dimensions + self.whitelist = whitelist + self.metric_types = metric_types + self.report_pod_label_owner = report_pod_label_owner diff --git a/tests/test_agent_check.py b/tests/test_agent_check.py index 1756843d..65645120 100644 --- a/tests/test_agent_check.py +++ b/tests/test_agent_check.py @@ -29,7 +29,7 @@ class TestAgentCheck(unittest.TestCase): value_meta=None) self.assertEqual(len(check.aggregator.metrics), 1) - dimensions = {'A': '()', 'B': 'C', 'D': 'E'} + dimensions = {'A': '{}', 'B': 'C', 'D': 'E'} check.submit_metric("Bar", 5, metrics_pkg.Gauge, diff --git a/tests/test_aggregator.py b/tests/test_aggregator.py index 1c8ea3f0..771d32b6 100644 --- a/tests/test_aggregator.py +++ b/tests/test_aggregator.py @@ -166,14 +166,13 @@ class TestMetricsAggregator(unittest.TestCase): value_meta=value_meta, exception=metric_validator.InvalidDimensionValue) - def testInvalidDimensionKeyRestrictedCharacters(self): + def testValidDimensionKeyParenthesesCharacter(self): dimensions = {'A': 'B', 'B': 'C', '(D)': 'E'} value_meta = {"This is a test": "test, test, test"} self.submit_metric("Foo", 5, dimensions=dimensions, - value_meta=value_meta, - exception=metric_validator.InvalidDimensionKey) + value_meta=value_meta) def testInvalidDimensionValueRestrictedCharacters(self): dimensions = {'A': 'B;', 'B': 'C', 'D': 'E'}