diff --git a/conf.d/prometheus.yaml.example b/conf.d/prometheus.yaml.example new file mode 100644 index 00000000..fee4a068 --- /dev/null +++ b/conf.d/prometheus.yaml.example @@ -0,0 +1,20 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP + +init_config: + timeout: 3 + auto_detect_endpoints: True + # Detection method can be either service or pod. Default is pod + detect_method: "pod" + +instances: + # If configuring each metric_endpoint + - metric_endpoint: "http://127.0.0.1:8000" + # Dimensions to add to every metric coming out of the plugin + default_dimensions: + app: my_app + + - metric_endpoint: "http://127.0.0.1:9000" + + # Kubernetes labels to match on when using auto detection in a Kubernetes environment. + # There can only be one instance when auto_detection_endpoints is set to true + - kubernetes_labels: ['app'] diff --git a/docs/Plugins.md b/docs/Plugins.md index e4be67dd..f03f19b9 100644 --- a/docs/Plugins.md +++ b/docs/Plugins.md @@ -76,6 +76,7 @@ - [Postfix Checks](#postfix-checks) - [PostgreSQL](#postgresql) - [Process Checks](#process-checks) + - [Prometheus](#prometheus) - [RabbitMQ Checks](#rabbitmq-checks) - [RedisDB](#redisdb) - [Riak](#riak) @@ -179,6 +180,7 @@ The following plugins are delivered via setup as part of the standard plugin che | postfix | | Provides metrics on the number of messages in a given postfix queue| | postgres | | | | process | | | +| prometheus | | | | rabbitmq | /root/.rabbitmq.cnf | | redisdb | | | | riak | | | @@ -2044,7 +2046,7 @@ monasca-setup -d ProcessCheck -a "conf_file_path=/home/stack/myprocess.yaml" ``` Example yaml input file format for process check by process names: ``` ---- + process_config: - process_names: - monasca-notification @@ -2054,7 +2056,7 @@ process_config: ``` Example yaml input file format for multiple process_names entries: ``` ---- + process_config: - process_names: - monasca-notification @@ -2120,6 +2122,72 @@ The process checks return the following metrics ( if detailed is 
set to true, ot On Linux, if the Agent is not run as root or the owner of the process the io metrics and the open_file_descriptors metric will fail to be reported if the mon-agent user does not have permission to get it for the process. +## Prometheus Client +This plugin is for scraping metrics from endpoints that are created by prometheus client libraries - https://prometheus.io/docs/instrumenting/clientlibs/ + +It can be configured in two ways. One being manually setting all the endpoints that you want to scrape. The other being +running in a Kubernetes environment where we autodetect on either services or pods based on annotations set. + +### Manually Configuring Endpoints +In this instance the plugin goes to a configured list of prometheus client endpoints and scrapes the posted metrics from each. + +When configuring each endpoint you can define a set of dimensions that is attached to each metric being scraped. + +By default we grab the defined labels on each metric as dimensions. + +Example yaml file: + +``` +init_config: + # Timeout on connections to each endpoint + timeout: 3 +instances: + - metric_endpoint: "http://127.0.0.1:8000" + # Dimensions to add to every metric coming out of the plugin + default_dimensions: + app: my_app + + - metric_endpoint: "http://127.0.0.1:9000" +``` + +### Running in a Kubernetes Environment with autodetection +There are two ways for the autodetection to be set up. One for auto detecting based on pods and the other auto detecting +for services. In both cases it is looking for the annotations set for the Kubernetes service or pod. + +The annotations the plugin is looking for are - +* prometheus.io/scrape: Only scrape pods that have a value of 'true' +* prometheus.io/path: If the metrics path is not '/metrics' override this. +* prometheus.io/port: Scrape the pod on the indicated port instead of the default of '9102'. 
+ +These annotations are pulled from the Kubelet for pod autodetection and the Kubernetes API for the service auto detection + +There is also configuration parameter of "kubernetes_labels" where it will look for Kubernetes tags to use as dimensions +for metrics coming out. By default that will be set to "app" + +Example yaml file (by pod): + +``` +init_config: + timeout: 3 + auto_detect_endpoints: True + detect_method: "pod" +instances: +- kubernetes_labels: ['app'] +``` + +Example yaml file (by service): + +``` +init_config: + timeout: 3 + auto_detect_endpoints: True + detect_method: "service" +instances: +- kubernetes_labels: ['app'] +``` + +**NOTE** This Plugin can only have one configured instance + ## RabbitMQ Checks This section describes the RabbitMQ check that can be performed by the Agent. The RabbitMQ check gathers metrics on Nodes, Exchanges and Queues from the rabbit server. The RabbitMQ check requires a configuration file called rabbitmq.yaml to be available in the agent conf.d configuration directory. The config file must contain the names of the Exchanges and Queues that you are interested in monitoring. 
def get_pod_dimensions(kubernetes_connector, pod_metadata, kubernetes_labels):
    """Build the dimension dict for a pod from its Kubernetes metadata.

    Always includes 'pod_name' and 'namespace'; copies over any pod labels
    listed in kubernetes_labels; and, best effort, adds the pod's owner
    (replication controller / replica set / deployment / daemon set) as a
    dimension.

    :param kubernetes_connector: connector used to query the Kubernetes API
        (may be None; a ReplicaSet owner then cannot be resolved to its
        deployment and is reported as 'replica_set' instead)
    :param pod_metadata: the pod's 'metadata' object as returned by the API
    :param kubernetes_labels: list of label names to copy into the dimensions
    :return: dict of dimensions for the pod
    """
    pod_name = pod_metadata['name']
    pod_dimensions = {'pod_name': pod_name, 'namespace': pod_metadata['namespace']}
    pod_labels = pod_metadata.get('labels', {})
    for label in kubernetes_labels:
        if label in pod_labels:
            pod_dimensions[label] = pod_labels[label]
    # Get owner of pod to set as a dimension.
    # Prefer the explicit ownerReferences metadata field when present...
    pod_owner_references = pod_metadata.get('ownerReferences', None)
    if pod_owner_references:
        try:
            if len(pod_owner_references) > 1:
                # logging.warn is deprecated; message typo ("then") fixed
                log.warning("More than one owner for pod {}".format(pod_name))
            pod_owner_reference = pod_owner_references[0]
            _set_pod_owner_dimension(kubernetes_connector, pod_dimensions,
                                     pod_owner_reference['kind'],
                                     pod_owner_reference['name'])
        except Exception:
            # Best effort only - a pod without a resolvable owner is still reported
            log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name))
    else:
        # ...otherwise fall back to the legacy 'kubernetes.io/created-by' annotation
        try:
            pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by'])
            _set_pod_owner_dimension(kubernetes_connector, pod_dimensions,
                                     pod_created_by['reference']['kind'],
                                     pod_created_by['reference']['name'])
        except Exception:
            log.info("Could not get pod owner from annotations for pod {}".format(pod_name))
    return pod_dimensions


def _get_deployment_name(kubernetes_connector, pod_owner_name, pod_namespace):
    """Derive the deployment name owning the given ReplicaSet, if any.

    Deployment-managed ReplicaSets carry the
    'deployment.kubernetes.io/revision' annotation and are named
    '<deployment>-<hash>'; the trailing hash segment is stripped to recover
    the deployment name.

    :return: deployment name, or None when the ReplicaSet is not
        deployment-managed or the API request fails
    """
    replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}/replicasets/{}".format(
        pod_namespace, pod_owner_name)
    try:
        replica_set = kubernetes_connector.get_request(replica_set_endpoint)
        replica_set_annotations = replica_set['metadata']['annotations']
        if "deployment.kubernetes.io/revision" in replica_set_annotations:
            return "-".join(pod_owner_name.split("-")[:-1])
    except Exception as e:
        log.warning("Could not connect to api to get replicaset data - {}".format(e))
    # Single fall-through return (original had a duplicated 'return None')
    return None


def _set_pod_owner_dimension(kubernetes_connector, pod_dimensions, pod_owner_type, pod_owner_name):
    """Add the pod's owner to pod_dimensions, keyed by the owner kind.

    ReplicaSet owners are resolved to their deployment when possible;
    unsupported owner kinds are logged and skipped.
    """
    if pod_owner_type == "ReplicationController":
        pod_dimensions['replication_controller'] = pod_owner_name
    elif pod_owner_type == "ReplicaSet":
        if not kubernetes_connector:
            log.error("Can not set deployment name as connection information to API is not set. "
                      "Setting ReplicaSet as dimension")
            deployment_name = None
        else:
            deployment_name = _get_deployment_name(kubernetes_connector, pod_owner_name,
                                                   pod_dimensions['namespace'])
        if deployment_name:
            pod_dimensions['deployment'] = deployment_name
        else:
            pod_dimensions['replica_set'] = pod_owner_name
    elif pod_owner_type == "DaemonSet":
        pod_dimensions['daemon_set'] = pod_owner_name
    else:
        log.info("Unsupported pod owner kind {} as a dimension for pod {}".format(
            pod_owner_type, pod_dimensions))
pod['metadata'], + kubernetes_labels)) pod_key = pod_dimensions['pod_name'] + pod_dimensions['namespace'] pod_dimensions_map[pod_key] = pod_dimensions pod_retry_count = 0 @@ -243,70 +244,6 @@ class Kubernetes(checks.AgentCheck): return cpu / 1000 return float(cpu_string) - def _get_pod_dimensions(self, pod_metadata, kubernetes_labels): - pod_name = pod_metadata['name'] - pod_dimensions = {'pod_name': pod_name, 'namespace': pod_metadata['namespace']} - if "labels" in pod_metadata: - pod_labels = pod_metadata['labels'] - for label in kubernetes_labels: - if label in pod_labels: - pod_dimensions[label] = pod_labels[label] - # Get owner of pod to set as a dimension - # Try to get from pod owner references - pod_owner_references = pod_metadata.get('ownerReferences', None) - if pod_owner_references: - try: - if len(pod_owner_references) > 1: - self.log.warn("More then one owner for pod {}".format(pod_name)) - pod_owner_reference = pod_owner_references[0] - pod_owner_type = pod_owner_reference['kind'] - pod_owner_name = pod_owner_reference['name'] - self._set_pod_owner_dimension(pod_dimensions, pod_owner_type, pod_owner_name) - except Exception: - self.log.info("Could not get pod owner from ownerReferences for pod {}".format(pod_name)) - # Try to get owner from annotations - else: - try: - pod_created_by = json.loads(pod_metadata['annotations']['kubernetes.io/created-by']) - pod_owner_type = pod_created_by['reference']['kind'] - pod_owner_name = pod_created_by['reference']['name'] - self._set_pod_owner_dimension(pod_dimensions, pod_owner_type, pod_owner_name) - except Exception: - self.log.info("Could not get pod owner from annotations for pod {}".format(pod_name)) - return pod_dimensions - - def _get_deployment_name(self, pod_owner_name, pod_namespace): - replica_set_endpoint = "/apis/extensions/v1beta1/namespaces/{}" \ - "/replicasets/{}".format(pod_namespace, - pod_owner_name) - try: - replica_set = self.kubernetes_connector.get_request(replica_set_endpoint) - 
class Prometheus(checks.AgentCheck):
    """Scrapes metrics from Prometheus endpoints.

    Can be configured three ways:
        1. Autodetect endpoints by pod annotations
        2. Autodetect endpoints by services
        3. Manually configure each prometheus endpoint to scrape

    We autodetect based on the annotations assigned to pods/services.

    We look for the following entries:
        'prometheus.io/scrape': Only scrape pods that have a value of 'true'
        'prometheus.io/path': If the metrics path is not '/metrics' override this.
        'prometheus.io/port': Scrape the pod on the indicated port instead of the
            default of '9102'.
    """

    def __init__(self, name, init_config, agent_config, instances=None):
        super(Prometheus, self).__init__(name, init_config, agent_config, instances)
        # Seconds to wait on every HTTP request this plugin makes
        self.connection_timeout = init_config.get("timeout", 3)
        self.auto_detect_endpoints = init_config.get("auto_detect_endpoints", False)
        if self.auto_detect_endpoints:
            # Lazily initialized on the first check run
            self.kubernetes_connector = None
            self.detect_method = init_config.get("detect_method", "pod").lower()
            self.kubelet_url = None
            if instances is not None and len(instances) > 1:
                raise Exception('Prometheus Client only supports one configured instance '
                                'if auto detection is set')
            if self.detect_method not in ['pod', 'service']:
                # BUG FIX: original never called .format(), so the literal
                # '{}' placeholder was raised instead of the bad value
                raise Exception('Invalid detect method {}. Must be either pod '
                                'or service'.format(self.detect_method))

    def check(self, instance):
        """Scrape every configured or auto-detected endpoint and report metrics."""
        dimensions = self._set_dimensions(None, instance)
        # Metrics are attributed to the scraped endpoint, not the agent host
        del dimensions['hostname']
        if not self.auto_detect_endpoints:
            metric_endpoint = instance.get("metric_endpoint", None)
            if not metric_endpoint:
                self.log.error("metric_endpoint must be defined for each instance")
                return
            # BUG FIX: copy before updating - the original mutated the shared
            # instance config dict, accumulating dimensions across runs
            endpoint_dimensions = dict(instance.get("default_dimensions", {}))
            endpoint_dimensions.update(dimensions)
            self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions)
        else:
            self.kubernetes_labels = instance.get('kubernetes_labels', KUBERNETES_LABELS)
            if not self.kubernetes_connector:
                self.kubernetes_connector = utils.KubernetesConnector(self.connection_timeout)
            if self.detect_method == "pod":
                if not self.kubelet_url:
                    try:
                        host = self.kubernetes_connector.get_agent_pod_host()
                        self.kubelet_url = "http://{}:10255/pods".format(host)
                    except Exception as e:
                        self.log.error("Could not obtain current host from Kubernetes API {}. "
                                       "Skipping check".format(e))
                        return
                metric_endpoints = self._get_metric_endpoints_by_pod(dimensions)
            # Detect by service
            else:
                metric_endpoints = self._get_metric_endpoints_by_service(dimensions)
            for metric_endpoint, endpoint_dimensions in six.iteritems(metric_endpoints):
                endpoint_dimensions.update(dimensions)
                self.report_endpoint_metrics(metric_endpoint, endpoint_dimensions)

    def _get_metric_endpoints_by_pod(self, dimensions):
        """Find scrape targets from the pods running on the local Kubelet.

        :return: dict mapping scrape URL -> dimensions for that pod
        """
        scrape_endpoints = {}
        # Grab running pods from local Kubelet
        try:
            pods = requests.get(self.kubelet_url, timeout=self.connection_timeout).json()
        except Exception as e:
            exception_message = "Could not get pods from local kubelet with error - {}".format(e)
            self.log.exception(exception_message)
            raise Exception(exception_message)

        # Iterate through each pod and check if it contains a scrape endpoint
        for pod in pods['items']:
            try:
                pod_metadata = pod['metadata']
                pod_spec = pod['spec']
                pod_status = pod['status']
                if "annotations" not in pod_metadata or not ('containers' in pod_spec and
                                                             'podIP' in pod_status):
                    # No annotations, containers, or endpoints skipping pod
                    continue

                # Check pod annotations if we should scrape pod
                pod_annotations = pod_metadata['annotations']
                prometheus_scrape = pod_annotations.get("prometheus.io/scrape", "false").lower()
                if prometheus_scrape != "true":
                    continue
                pod_ports = []
                pod_containers = pod_spec['containers']
                for container in pod_containers:
                    if "ports" in container:
                        pod_ports += container['ports']
                pod_name = pod_metadata['name']
                endpoints = self._get_prometheus_endpoint(pod_annotations, pod_ports, pod_name)
                if not endpoints:
                    continue

                # Add pod endpoint to scrape endpoints
                pod_ip = pod_status['podIP']
                # Loop through list of ports and build list of endpoints
                pod_dimensions = dimensions.copy()
                pod_dimensions.update(utils.get_pod_dimensions(
                    self.kubernetes_connector, pod['metadata'],
                    self.kubernetes_labels))
                for endpoint in endpoints:
                    scrape_endpoint = "http://{}:{}".format(pod_ip, endpoint)
                    scrape_endpoints[scrape_endpoint] = pod_dimensions
                    self.log.info("Detected pod endpoint - {} with metadata "
                                  "of {}".format(scrape_endpoint,
                                                 pod_dimensions))
            except Exception as e:
                # Best effort: one malformed pod must not stop detection
                self.log.warning("Error parsing {} to detect for scraping - {}".format(pod, e))
                continue

        return scrape_endpoints

    def _get_metric_endpoints_by_service(self, dimensions):
        """Find scrape targets from annotated services via the Kubernetes API.

        :return: dict mapping scrape URL -> dimensions for that service
        """
        scrape_endpoints = {}
        # Grab services from Kubernetes API
        try:
            services = self.kubernetes_connector.get_request("/api/v1/services")
        except Exception as e:
            exception_message = "Could not get services from Kubernetes API with error - {}".format(e)
            self.log.exception(exception_message)
            raise Exception(exception_message)

        # Iterate through each service and check if it is a scrape endpoint
        for service in services['items']:
            service_metadata = service['metadata']
            service_spec = service['spec']
            if "annotations" not in service_metadata or "ports" not in service_spec:
                # No annotations or pods skipping service
                continue

            # Check service annotations if we should scrape service
            service_annotations = service_metadata['annotations']
            prometheus_scrape = service_annotations.get("prometheus.io/scrape", "false").lower()
            if prometheus_scrape != "true":
                continue
            service_name = service_metadata['name']
            service_ports = service_spec['ports']
            endpoints = self._get_prometheus_endpoint(service_annotations,
                                                      service_ports,
                                                      service_name)
            if not endpoints:
                continue

            # Add service endpoint to scrape endpoints
            cluster_ip = service_spec['clusterIP']
            service_dimensions = dimensions.copy()
            service_dimensions.update(
                self._get_service_dimensions(service_metadata))
            for endpoint in endpoints:
                scrape_endpoint = "http://{}:{}".format(cluster_ip, endpoint)
                scrape_endpoints[scrape_endpoint] = service_dimensions
                self.log.info("Detected service endpoint - {} with metadata "
                              "of {}".format(scrape_endpoint,
                                             service_dimensions))
        return scrape_endpoints

    def _get_service_dimensions(self, service_metadata):
        """Build dimensions (name, namespace, configured labels) for a service."""
        service_dimensions = {'service_name': service_metadata['name'],
                              'namespace': service_metadata['namespace']}
        if "labels" in service_metadata:
            service_labels = service_metadata['labels']
            for label in self.kubernetes_labels:
                if label in service_labels:
                    service_dimensions[label] = service_labels[label]
        return service_dimensions

    def _get_prometheus_endpoint(self, annotations, ports, name):
        """Analyzes annotations and ports to generate a scrape target.

        :param annotations: pod/service annotations ('prometheus.io/*' keys)
        :param ports: list of port dicts from the pod spec or service spec
        :param name: pod/service name, for logging only
        :return: list of '<port>/<path>' strings (may be empty)
        """
        pod_index = "containerPort" if self.detect_method == "pod" else "port"
        configured_ports = []
        if "prometheus.io/port" in annotations:
            configured_ports = annotations.get("prometheus.io/port").split(',')
            configured_ports = [int(i) for i in configured_ports]

        if self.detect_method == "pod" and not configured_ports:
            # Prometheus default port
            configured_ports = [9102]
        prometheus_endpoint = annotations.get("prometheus.io/path", "/metrics")

        endpoints = []
        for port in ports:
            for configured_port in configured_ports:
                if port[pod_index] == configured_port:
                    # Build up list of ports and prometheus endpoints to return.
                    # BUG FIX: use append - 'list += str' extends the list with
                    # the string's individual characters
                    endpoints.append("{}/{}".format(configured_port,
                                                    prometheus_endpoint))

        if len(ports) == 1 and not endpoints:
            self.log.info("Could not find matching port using only port "
                          "configured")
            # BUG FIX: ports is a list of port dicts - index the single entry
            # before looking up the port number (original did ports[pod_index])
            endpoints.append("{}/{}".format(ports[0][pod_index], prometheus_endpoint))

        if not endpoints:
            self.log.error("Can not derive which port to use. Due to more "
                           "than one port configured and none of them "
                           "selected via configurations. {} {} skipped for "
                           "scraping".format(self.detect_method, name))
        return endpoints

    def _send_metrics(self, metric_families, dimensions):
        """Report every sample in the parsed metric families as a gauge."""
        for metric_family in metric_families:
            for metric in metric_family.samples:
                metric_dimensions = dimensions.copy()
                metric_name = metric[0]
                metric_labels = metric[1]
                metric_value = float(metric[2])
                if math.isnan(metric_value):
                    self.log.debug('filtering out NaN value provided for metric %s{%s}',
                                   metric_name, metric_labels)
                    continue
                # remove empty string dimensions from prometheus labels
                for dim_key, dim_value in metric_labels.items():
                    if len(dim_value) > 0:
                        metric_dimensions[dim_key] = dim_value
                self.gauge(metric_name, metric_value, dimensions=metric_dimensions,
                           hostname="SUPPRESS")

    def report_endpoint_metrics(self, metric_endpoint, endpoint_dimensions):
        """Scrape one endpoint, parse the exposition text and report metrics."""
        # Hit metric endpoint
        try:
            result = requests.get(metric_endpoint, timeout=self.connection_timeout)
        except Exception as e:
            self.log.error("Could not get metrics from {} with error {}".format(metric_endpoint, e))
        else:
            result_content_type = result.headers['Content-Type']
            if "text/plain" in result_content_type:
                try:
                    metric_families = text_string_to_metric_families(result.text)
                    self._send_metrics(metric_families, endpoint_dimensions)
                except Exception as e:
                    self.log.error("Error parsing data from {} with error {}".format(
                        metric_endpoint, e))
            else:
                self.log.error("Unsupported content type - {}".format(result_content_type))