# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.
"""## Overview.
This document explains how to integrate with the Prometheus charm
for the purpose of providing a metrics endpoint to Prometheus. It
also explains how alternative implementations of the Prometheus charms
may maintain the same interface and be backward compatible with all
currently integrated charms. Finally, this document is the
authoritative reference on the structure of relation data that is
shared between Prometheus charms and any other charm that intends to
provide a scrape target for Prometheus.
## Provider Library Usage
This Prometheus charm interacts with its scrape targets using its
charm library. Charms seeking to expose metric endpoints for the
Prometheus charm must do so using the `MetricsEndpointProvider`
object from this charm library. For the simplest use cases, using the
`MetricsEndpointProvider` object only requires instantiating it,
typically in the constructor of your charm (the one which exposes a
metrics endpoint). The `MetricsEndpointProvider` constructor requires
the name of the relation over which a scrape target (metrics endpoint)
is exposed to the Prometheus charm. This relation must use the
`prometheus_scrape` interface. By default, each unit of the
`MetricsEndpointProvider` charm sets the address of the metrics
endpoint to its own unit IP address. These units set their address in
response to the `PebbleReady` event of each container in the unit,
since container restarts of Kubernetes charms can result in change of
IP addresses. The default name for the metrics endpoint relation is
`metrics-endpoint`. It is strongly recommended to use the same
relation name for consistency across charms and doing so obviates the
need for an additional constructor argument. The
`MetricsEndpointProvider` object may be instantiated as follows:

    from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider

    def __init__(self, *args):
        super().__init__(*args)
        ...
        self.metrics_endpoint = MetricsEndpointProvider(self)
        ...

Note that the first argument (`self`) to `MetricsEndpointProvider` is
always a reference to the parent (scrape target) charm.
An instantiated `MetricsEndpointProvider` object will ensure that each
unit of its parent charm is a scrape target for the
`MetricsEndpointConsumer` (Prometheus) charm. By default,
`MetricsEndpointProvider` assumes each unit of its parent charm
exports its metrics at a path given by `/metrics` on port 80. These
defaults may be changed by providing the `MetricsEndpointProvider`
constructor an optional argument (`jobs`) that represents a
Prometheus scrape job specification using Python standard data
structures. This job specification is a subset of Prometheus' own
[scrape
configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config)
format but represented using Python data structures. More than one job
may be provided using the `jobs` argument. Hence `jobs` accepts a list
of dictionaries where each dictionary represents one `<scrape_config>`
object as described in the Prometheus documentation. The currently
supported configuration subset is: `job_name`, `metrics_path` and
`static_configs`.
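As a rough illustration of how a job specification could be reduced to the supported keys, the sketch below overlays a job on the default job and drops everything else. It loosely mirrors this library's `_sanitize_scrape_configuration` helper; the names `SUPPORTED_KEYS` and `sanitize_job` are hypothetical and not part of the library API.

```python
# Illustrative sketch; SUPPORTED_KEYS and sanitize_job are hypothetical names.
SUPPORTED_KEYS = {"job_name", "metrics_path", "static_configs"}
DEFAULT_JOB = {
    "metrics_path": "/metrics",
    "static_configs": [{"targets": ["*:80"]}],
}


def sanitize_job(job):
    # Start from the defaults, then overlay only the supported keys.
    sanitized = dict(DEFAULT_JOB)
    sanitized.update({k: v for k, v in job.items() if k in SUPPORTED_KEYS})
    return sanitized


job = sanitize_job({"metrics_path": "/custom", "unknown_option": True})
print(job)
```

An empty job therefore falls back to the defaults, and unrecognised keys are silently discarded.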
Suppose it is required to change the port on which scraped metrics are
exposed to 8000. This may be done by providing the following data
structure as the value of `jobs`.
```
[
{
"static_configs": [
{
"targets": ["*:8000"]
}
]
}
]
```
The wildcard ("*") host specification implies that the scrape targets
will automatically be set to the host addresses advertised by each
unit of the provider charm.
It is also possible to change the metrics path and scrape multiple
ports, for example:
```
[
{
"metrics_path": "/my-metrics-path",
"static_configs": [
{
"targets": ["*:8000", "*:8081"],
}
]
}
]
```
More complex scrape configurations are possible. For example:
```
[
{
"static_configs": [
{
"targets": ["10.1.32.215:7000", "*:8000"],
"labels": {
"some-key": "some-value"
}
}
]
}
]
```
This example scrapes the target "10.1.32.215" at port 7000 in addition
to scraping each unit at port 8000. There is however one difference
between wildcard targets (specified using "*") and fully qualified
targets (such as "10.1.32.215"). The Prometheus charm automatically
associates labels with metrics generated by each target. These labels
localise the source of metrics within the Juju topology by specifying
its "model name", "model UUID", "application name" and "unit
name". However, the unit name is associated only with wildcard targets
and not with fully qualified targets.
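The difference between wildcard and fully qualified targets can be sketched as follows. This is only an illustration of the behaviour described above, not the charm's implementation: the helper `expand_targets`, the `juju_unit` label name and all addresses are assumptions made for the example.

```python
# Illustrative sketch; expand_targets and the juju_unit label are assumptions.
def expand_targets(targets, units, topology):
    # units maps unit names to the addresses they advertise.
    expanded = []
    for target in targets:
        host, _, port = target.rpartition(":")
        if host == "*":
            # Wildcard: one entry per unit, labelled with the unit name.
            for unit_name, address in sorted(units.items()):
                labels = dict(topology, juju_unit=unit_name)
                expanded.append({"targets": [address + ":" + port], "labels": labels})
        else:
            # Fully qualified target: topology labels only, no unit name.
            expanded.append({"targets": [target], "labels": dict(topology)})
    return expanded


topology = {
    "juju_model": "demo",
    "juju_model_uuid": "00000000-0000-0000-0000-000000000000",
    "juju_application": "my-app",
}
units = {"my-app/0": "10.1.32.10", "my-app/1": "10.1.32.11"}
configs = expand_targets(["10.1.32.215:7000", "*:8000"], units, topology)
for config in configs:
    print(config["targets"], config["labels"].get("juju_unit"))
```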
Multiple jobs with different metrics paths and labels are allowed, but
each job must be given a unique name:
```
[
{
"job_name": "my-first-job",
"metrics_path": "one-path",
"static_configs": [
{
"targets": ["*:7000"],
"labels": {
"some-key": "some-value"
}
}
]
},
{
"job_name": "my-second-job",
"metrics_path": "another-path",
"static_configs": [
{
"targets": ["*:8000"],
"labels": {
"some-other-key": "some-other-value"
}
}
]
}
]
```
**Important:** `job_name` should be a fixed string (e.g. hardcoded literal).
For instance, if you include variable elements, like your `unit.name`, it may break
the continuity of the metrics time series gathered by Prometheus when the leader unit
changes (e.g. on upgrade or rescale).
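One way to guard against accidental name clashes is to keep the job list as a module-level literal and check it for repeated names before passing it as `jobs`. The sketch below is a suggestion only; `JOBS` and `duplicate_job_names` are hypothetical names and are not provided by this library.

```python
# Illustrative sketch; JOBS and duplicate_job_names are hypothetical names.
from collections import Counter

# Hardcoded literals keep job names stable across leader changes.
JOBS = [
    {"job_name": "my-first-job", "static_configs": [{"targets": ["*:7000"]}]},
    {"job_name": "my-second-job", "static_configs": [{"targets": ["*:8000"]}]},
]


def duplicate_job_names(jobs):
    # Count only the jobs that declare a name and report repeated ones.
    counts = Counter(job["job_name"] for job in jobs if "job_name" in job)
    return sorted(name for name, count in counts.items() if count > 1)


print(duplicate_job_names(JOBS))
```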
It is also technically possible, but **strongly discouraged**, to
configure the following scrape-related settings, which behave as described by the
[Prometheus documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config):
- `static_configs`
- `scrape_interval`
- `scrape_timeout`
- `proxy_url`
- `relabel_configs`
- `metrics_relabel_configs`
- `sample_limit`
- `label_limit`
- `label_name_length_limit`
- `label_value_length_limit`
The settings above are supported by the `prometheus_scrape` library only for the sake of
specialized facilities like the [Prometheus Scrape Config](https://charmhub.io/prometheus-scrape-config-k8s)
charm. Virtually no charms should use these settings, and charmers definitely **should not**
expose them to the Juju administrator via configuration options.
## Consumer Library Usage
The `MetricsEndpointConsumer` object may be used by Prometheus
charms to manage relations with their scrape targets. For this
purpose, a Prometheus charm needs to do two things:
1. Instantiate the `MetricsEndpointConsumer` object by providing it a
reference to the parent (Prometheus) charm and optionally the name of
the relation that the Prometheus charm uses to interact with scrape
targets. This relation must conform to the `prometheus_scrape`
interface and it is strongly recommended that this relation be named
`metrics-endpoint` which is its default value.
For example a Prometheus charm may instantiate the
`MetricsEndpointConsumer` in its constructor as follows:

    from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointConsumer

    def __init__(self, *args):
        super().__init__(*args)
        ...
        self.metrics_consumer = MetricsEndpointConsumer(self)
        ...

2. A Prometheus charm also needs to respond to the
`TargetsChangedEvent` event of the `MetricsEndpointConsumer` by adding itself as
an observer for these events, as in

    self.framework.observe(
        self.metrics_consumer.on.targets_changed,
        self._on_scrape_targets_changed,
    )

In responding to the `TargetsChangedEvent` event the Prometheus
charm must update the Prometheus configuration so that any new scrape
targets are added and/or old ones removed from the list of scraped
endpoints. For this purpose the `MetricsEndpointConsumer` object
exposes a `jobs()` method that returns a list of scrape jobs. Each
element of this list is the Prometheus scrape configuration for that
job. In order to update the Prometheus configuration, the Prometheus
charm needs to replace the current list of jobs with the list provided
by `jobs()` as follows:

    def _on_scrape_targets_changed(self, event):
        ...
        scrape_jobs = self.metrics_consumer.jobs()
        for job in scrape_jobs:
            prometheus_scrape_config.append(job)
        ...

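To make the replacement step concrete, the sketch below rebuilds a Prometheus configuration dictionary from a list of scrape jobs on every call, so that departed targets do not linger. It is a minimal sketch under stated assumptions: `build_prometheus_config` is a hypothetical helper, the job list is a stand-in for the value returned by `jobs()`, and the one minute scrape interval is arbitrary.

```python
# Illustrative sketch; build_prometheus_config is a hypothetical helper.
def build_prometheus_config(scrape_jobs, scrape_interval="1m"):
    # Replace, rather than extend, the list of scrape jobs so that
    # targets that have gone away are dropped from the configuration.
    return {
        "global": {"scrape_interval": scrape_interval},
        "scrape_configs": [dict(job) for job in scrape_jobs],
    }


# Stand-in for the list returned by the consumer's jobs() method.
scrape_jobs = [
    {
        "job_name": "example-job",
        "metrics_path": "/metrics",
        "static_configs": [{"targets": ["10.1.32.10:8000"]}],
    }
]
config = build_prometheus_config(scrape_jobs)
print(config["scrape_configs"][0]["job_name"])
```

The resulting dictionary could then be serialised to YAML and written to the Prometheus configuration file by the charm.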
## Alerting Rules
This charm library also supports gathering alerting rules from all
related `MetricsEndpointProvider` charms and enabling corresponding alerts within the
Prometheus charm. Alert rules are automatically gathered by `MetricsEndpointProvider`
charms when using this library, from a directory conventionally named
`prometheus_alert_rules`. This directory must reside at the top level
in the `src` folder of the provider charm. Each file in this directory
is assumed to be in one of two formats:
- the official Prometheus alert rule format, conforming to the
[Prometheus docs](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/)
- a single rule format, which is a simplified subset of the official format,
comprising a single alert rule per file, using the same YAML fields.
The file name must have the `.rule` or `.rules` extension.
An example of the contents of such a file in the custom single rule
format is shown below.
```
alert: HighRequestLatency
expr: job:request_latency_seconds:mean5m{my_key=my_value} > 0.5
for: 10m
labels:
severity: Medium
type: HighLatency
annotations:
summary: High request latency for {{ $labels.instance }}.
```
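For comparison with the official format, a single rule can be wrapped into a one-group rules file. The sketch below shows the shape of that conversion; `to_official_format` is a hypothetical helper, and the group name is chosen freely here, whereas the library derives it from the file name and Juju topology.

```python
# Illustrative sketch; to_official_format is a hypothetical helper.
def to_official_format(rule, group_name):
    # A single rule needs at least the "alert" and "expr" keys.
    if not {"alert", "expr"} <= set(rule):
        raise ValueError("not a single alert rule")
    return {"groups": [{"name": group_name, "rules": [rule]}]}


rule = {
    "alert": "HighRequestLatency",
    "expr": "job:request_latency_seconds:mean5m > 0.5",
    "for": "10m",
    "labels": {"severity": "Medium"},
}
rules_file = to_official_format(rule, "high_request_latency")
print(rules_file["groups"][0]["name"])
```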
The `MetricsEndpointProvider` will read all available alert rules and
also inject "filtering labels" into the alert expressions. The
filtering labels ensure that alert rules are localised to the metrics
provider charm's Juju topology (application, model and its UUID). Such
a topology filter is essential to ensure that alert rules submitted by
one provider charm generate alerts only for that same charm. When
alert rules are embedded in a charm, and the charm is deployed as a
Juju application, the alert rules from that application have their
expressions automatically updated to filter for metrics coming from
the units of that application alone. This removes the risk of spurious
evaluation, e.g., when you have multiple deployments of the same charm
monitored by the same Prometheus.
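The labelling half of this step can be sketched as below. Rewriting the alert expression itself is not shown, since it requires parsing the expression; `add_topology_labels` is a hypothetical helper and the topology values are made up.

```python
# Illustrative sketch; add_topology_labels is a hypothetical helper.
def add_topology_labels(rule, topology_labels):
    # Copy the rule so the caller's data is left untouched.
    labelled = dict(rule)
    labelled["labels"] = dict(rule.get("labels", {}), **topology_labels)
    return labelled


topology_labels = {
    "juju_model": "demo",
    "juju_model_uuid": "00000000-0000-0000-0000-000000000000",
    "juju_application": "my-app",
}
rule = {"alert": "HighRequestLatency", "expr": "up < 1", "labels": {"severity": "Medium"}}
labelled_rule = add_topology_labels(rule, topology_labels)
print(sorted(labelled_rule["labels"]))
```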
Not all alerts one may want to specify can be embedded in a
charm. Some alert rules will be specific to a user's use case. This is
the case, for example, of alert rules that are based on business
constraints, like expecting a certain number of requests to a specific
API every five minutes. Such alert rules can be specified via the
[COS Config Charm](https://charmhub.io/cos-configuration-k8s),
which allows importing alert rules and other settings like dashboards
from a Git repository.
Gathering alert rules and generating rule files within the Prometheus
charm is easily done using the `alerts()` method of
`MetricsEndpointConsumer`. Alerts generated by Prometheus will
automatically include Juju topology labels in the alerts. These labels
indicate the source of the alert. The following labels are
automatically included with each alert:
- `juju_model`
- `juju_model_uuid`
- `juju_application`
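These three labels are enough to derive a per-application name for a rules file. The sketch below joins them with underscores and adds a `juju_` prefix and `.rules` suffix; `rules_filename` is a hypothetical helper and the label values are made up.

```python
# Illustrative sketch; rules_filename is a hypothetical helper.
def rules_filename(labels):
    # Join the topology labels into one identifier, then derive a file name.
    identifier = "{}_{}_{}".format(
        labels["juju_model"],
        labels["juju_model_uuid"],
        labels["juju_application"],
    )
    return "juju_" + identifier + ".rules"


labels = {
    "juju_model": "demo",
    "juju_model_uuid": "00000000-0000-0000-0000-000000000000",
    "juju_application": "my-app",
}
filename = rules_filename(labels)
print(filename)
```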
## Relation Data
The Prometheus charm uses both application and unit relation data to
obtain information regarding its scrape jobs, alert rules and scrape
targets. This relation data is in JSON format and it closely resembles
the YAML structure of Prometheus
[scrape configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config).
Units of Metrics provider charms advertise their names and addresses
over unit relation data using the `prometheus_scrape_unit_name` and
`prometheus_scrape_unit_address` keys. The `scrape_metadata`,
`scrape_jobs` and `alert_rules` keys in the application relation data
of metrics provider charms hold the eponymous information.
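The overall shape of this data can be sketched with plain dictionaries. The key names come from the description above, while every value, and the field names inside `scrape_metadata`, are assumptions made for the example.

```python
import json

# Illustrative sketch; all values are made up, and the field names inside
# scrape_metadata are assumptions.
unit_data = {
    "prometheus_scrape_unit_name": "my-app/0",
    "prometheus_scrape_unit_address": "10.1.32.10",
}
app_data = {
    "scrape_metadata": json.dumps(
        {
            "model": "demo",
            "model_uuid": "00000000-0000-0000-0000-000000000000",
            "application": "my-app",
        }
    ),
    "scrape_jobs": json.dumps(
        [{"metrics_path": "/metrics", "static_configs": [{"targets": ["*:8000"]}]}]
    ),
    "alert_rules": json.dumps({"groups": []}),
}

# The reading side decodes each application value from JSON before use.
scrape_jobs = json.loads(app_data["scrape_jobs"])
print(scrape_jobs[0]["static_configs"][0]["targets"])
```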
""" # noqa: W505
import copy
import hashlib
import ipaddress
import json
import logging
import os
import platform
import re
import socket
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import yaml
from charms.observability_libs.v0.juju_topology import JujuTopology
from ops.charm import CharmBase, RelationRole
from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents
# The unique Charmhub library identifier, never change it
LIBID = "bc84295fef5f4049878f07b131968ee2"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 22
logger = logging.getLogger(__name__)
ALLOWED_KEYS = {
"job_name",
"metrics_path",
"static_configs",
"scrape_interval",
"scrape_timeout",
"proxy_url",
"relabel_configs",
"metrics_relabel_configs",
"sample_limit",
"label_limit",
"label_name_length_limit",
"label_value_length_limit",
"scheme",
"basic_auth",
"tls_config",
}
DEFAULT_JOB = {
"metrics_path": "/metrics",
"static_configs": [{"targets": ["*:80"]}],
}
DEFAULT_RELATION_NAME = "metrics-endpoint"
RELATION_INTERFACE_NAME = "prometheus_scrape"
DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/prometheus_alert_rules"
class RelationNotFoundError(Exception):
    """Raised if no relation with the given name is found."""

    def __init__(self, relation_name: str):
        self.relation_name = relation_name
        self.message = "No relation named '{}' found".format(relation_name)

        super().__init__(self.message)
class RelationInterfaceMismatchError(Exception):
"""Raised if the relation with the given name has a different interface."""
def __init__(
self,
relation_name: str,
expected_relation_interface: str,
actual_relation_interface: str,
):
self.relation_name = relation_name
self.expected_relation_interface = expected_relation_interface
self.actual_relation_interface = actual_relation_interface
self.message = (
"The '{}' relation has '{}' as interface rather than the expected '{}'".format(
relation_name, actual_relation_interface, expected_relation_interface
)
)
super().__init__(self.message)
class RelationRoleMismatchError(Exception):
    """Raised if the relation with the given name has a different role."""

    def __init__(
        self,
        relation_name: str,
        expected_relation_role: RelationRole,
        actual_relation_role: RelationRole,
    ):
        self.relation_name = relation_name
        # Store the expected role under its own attribute name.
        self.expected_relation_role = expected_relation_role
        self.actual_relation_role = actual_relation_role
        self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format(
            relation_name, repr(actual_relation_role), repr(expected_relation_role)
        )

        super().__init__(self.message)
class InvalidAlertRuleEvent(EventBase):
"""Event emitted when alert rule files are not parsable.
Enables us to set a clear status on the provider.
"""
def __init__(self, handle, errors: str = "", valid: bool = False):
super().__init__(handle)
self.errors = errors
self.valid = valid
def snapshot(self) -> Dict:
"""Save alert rule information."""
return {
"valid": self.valid,
"errors": self.errors,
}
def restore(self, snapshot):
"""Restore alert rule information."""
self.valid = snapshot["valid"]
self.errors = snapshot["errors"]
class MetricsEndpointProviderEvents(ObjectEvents):
    """Events raised by :class:`MetricsEndpointProvider`."""

    alert_rule_status_changed = EventSource(InvalidAlertRuleEvent)
def _validate_relation_by_interface_and_direction(
charm: CharmBase,
relation_name: str,
expected_relation_interface: str,
expected_relation_role: RelationRole,
):
"""Verifies that a relation has the necessary characteristics.
Verifies that the `relation_name` provided: (1) exists in metadata.yaml,
(2) declares as interface the interface name passed as `expected_relation_interface`
and (3) has the right "direction", i.e., it is a relation that `charm`
provides or requires.
Args:
charm: a `CharmBase` object to scan for the matching relation.
relation_name: the name of the relation to be verified.
expected_relation_interface: the interface name to be matched by the
relation named `relation_name`.
expected_relation_role: whether the `relation_name` must be either
provided or required by `charm`.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the same relation interface
as specified via the `expected_relation_interface` argument.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the same role as specified
via the `expected_relation_role` argument.
"""
if relation_name not in charm.meta.relations:
raise RelationNotFoundError(relation_name)
relation = charm.meta.relations[relation_name]
actual_relation_interface = relation.interface_name
if actual_relation_interface != expected_relation_interface:
raise RelationInterfaceMismatchError(
relation_name, expected_relation_interface, actual_relation_interface
)
if expected_relation_role == RelationRole.provides:
if relation_name not in charm.meta.provides:
raise RelationRoleMismatchError(
relation_name, RelationRole.provides, RelationRole.requires
)
elif expected_relation_role == RelationRole.requires:
if relation_name not in charm.meta.requires:
raise RelationRoleMismatchError(
relation_name, RelationRole.requires, RelationRole.provides
)
else:
raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role))
def _sanitize_scrape_configuration(job) -> dict:
"""Restrict permissible scrape configuration options.
If job is empty then a default job is returned. The
default job is
```
{
"metrics_path": "/metrics",
"static_configs": [{"targets": ["*:80"]}],
}
```
Args:
job: a dict containing a single Prometheus job
specification.
Returns:
a dictionary containing a sanitized job specification.
"""
sanitized_job = DEFAULT_JOB.copy()
sanitized_job.update({key: value for key, value in job.items() if key in ALLOWED_KEYS})
return sanitized_job
class InvalidAlertRulePathError(Exception):
"""Raised if the alert rules folder cannot be found or is otherwise invalid."""
def __init__(
self,
alert_rules_absolute_path: Path,
message: str,
):
self.alert_rules_absolute_path = alert_rules_absolute_path
self.message = message
super().__init__(self.message)
def _is_official_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in the upstream format as supported by Prometheus.
Alert rules in dictionary format are in "official" form if they
contain a "groups" key, since this implies they contain a list of
alert rule groups.
Args:
rules_dict: a set of alert rules in Python dictionary format
Returns:
True if alert rules are in official Prometheus file format.
"""
return "groups" in rules_dict
def _is_single_alert_rule_format(rules_dict: dict) -> bool:
"""Are alert rules in single rule format.
The Prometheus charm library supports reading of alert rules in a
custom format that consists of a single alert rule per file. This
does not conform to the official Prometheus alert rule file format
which requires that each alert rules file consists of a list of
alert rule groups and each group consists of a list of alert
rules.
Alert rules in dictionary form are considered to be in single rule
format if in the least it contains two keys corresponding to the
alert rule name and alert expression.
Returns:
True if alert rule is in single rule file format.
"""
# one alert rule per file
return set(rules_dict) >= {"alert", "expr"}
class AlertRules:
"""Utility class for amalgamating prometheus alert rule files and injecting juju topology.
An `AlertRules` object supports aggregating alert rules from files and directories in both
official and single rule file formats using the `add_path()` method. All the alert rules
read are annotated with Juju topology labels and amalgamated into a single data structure
in the form of a Python dictionary using the `as_dict()` method. Such a dictionary can be
easily dumped into JSON format and exchanged over relation data. The dictionary can also
be dumped into YAML format and written directly into an alert rules file that is read by
Prometheus. Note that multiple `AlertRules` objects must not be written into the same file,
since Prometheus allows only a single list of alert rule groups per alert rules file.
The official Prometheus format is a YAML file conforming to the Prometheus documentation
(https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/).
The custom single rule format is a subsection of the official YAML, having a single alert
rule, effectively "one alert per file".
"""
# This class uses the following terminology for the various parts of a rule file:
# - alert rules file: the entire groups[] yaml, including the "groups:" key.
# - alert groups (plural): the list of groups[] (a list, i.e. no "groups:" key) - it is a list
# of dictionaries that have the "name" and "rules" keys.
# - alert group (singular): a single dictionary that has the "name" and "rules" keys.
# - alert rules (plural): all the alerts in a given alert group - a list of dictionaries with
# the "alert" and "expr" keys.
# - alert rule (singular): a single dictionary that has the "alert" and "expr" keys.
def __init__(self, topology: Optional[JujuTopology] = None):
"""Build an alert rule object.
Args:
topology: an optional `JujuTopology` instance that is used to annotate all alert rules.
"""
self.topology = topology
self.tool = CosTool(None)
self.alert_groups = [] # type: List[dict]
def _from_file(self, root_path: Path, file_path: Path) -> List[dict]:
"""Read a rules file from path, injecting juju topology.
Args:
root_path: full path to the root rules folder (used only for generating group name)
file_path: full path to a *.rule file.
Returns:
A list of dictionaries representing the rules file, if file is valid (the structure is
formed by `yaml.safe_load` of the file); an empty list otherwise.
"""
with file_path.open() as rf:
# Load a list of rules from file then add labels and filters
try:
rule_file = yaml.safe_load(rf)
except Exception as e:
logger.error("Failed to read alert rules from %s: %s", file_path.name, e)
return []
if not rule_file:
logger.warning("Empty rules file: %s", file_path.name)
return []
if not isinstance(rule_file, dict):
logger.error("Invalid rules file (must be a dict): %s", file_path.name)
return []
if _is_official_alert_rule_format(rule_file):
alert_groups = rule_file["groups"]
elif _is_single_alert_rule_format(rule_file):
# convert to list of alert groups
# group name is made up from the file name
alert_groups = [{"name": file_path.stem, "rules": [rule_file]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
return []
# update rules with additional metadata
for alert_group in alert_groups:
# update group name with topology and sub-path
alert_group["name"] = self._group_name(
str(root_path),
str(file_path),
alert_group["name"],
)
# add "juju_" topology labels
for alert_rule in alert_group["rules"]:
if "labels" not in alert_rule:
alert_rule["labels"] = {}
if self.topology:
alert_rule["labels"].update(self.topology.label_matcher_dict)
# insert juju topology filters into a prometheus alert rule
alert_rule["expr"] = self.tool.inject_label_matchers(
re.sub(r"%%juju_topology%%,?", "", alert_rule["expr"]),
self.topology.label_matcher_dict,
)
return alert_groups
def _group_name(self, root_path: str, file_path: str, group_name: str) -> str:
"""Generate group name from path and topology.
The group name is made up of the relative path between the root dir_path, the file path,
and topology identifier.
Args:
root_path: path to the root rules dir.
file_path: path to rule file.
group_name: original group name to keep as part of the new augmented group name
Returns:
New group name, augmented by juju topology and relative path.
"""
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend([rel_path, group_name, "alerts"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))
@classmethod
def _multi_suffix_glob(
cls, dir_path: Path, suffixes: List[str], recursive: bool = True
) -> list:
"""Helper function for getting all files in a directory that have a matching suffix.
Args:
dir_path: path to the directory to glob from.
suffixes: list of suffixes to include in the glob (items should begin with a period).
recursive: a flag indicating whether a glob is recursive (nested) or not.
Returns:
List of files in `dir_path` that have one of the suffixes specified in `suffixes`.
"""
all_files_in_dir = dir_path.glob("**/*" if recursive else "*")
return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir))
def _from_dir(self, dir_path: Path, recursive: bool) -> List[dict]:
"""Read all rule files in a directory.
All rules from files for the same directory are loaded into a single
group. The generated name of this group includes juju topology.
By default, only the top directory is scanned; for nested scanning, pass `recursive=True`.
Args:
dir_path: directory containing *.rule files (alert rules without groups).
recursive: flag indicating whether to scan for rule files recursively.
Returns:
a list of dictionaries representing prometheus alert rule groups, each dictionary
representing an alert group (structure determined by `yaml.safe_load`).
"""
alert_groups = [] # type: List[dict]
# Gather all alerts into a list of groups
for file_path in self._multi_suffix_glob(dir_path, [".rule", ".rules"], recursive):
alert_groups_from_file = self._from_file(dir_path, file_path)
if alert_groups_from_file:
logger.debug("Reading alert rule from %s", file_path)
alert_groups.extend(alert_groups_from_file)
return alert_groups
def add_path(self, path: str, *, recursive: bool = False) -> None:
"""Add rules from a dir path.
All rules from files are aggregated into a data structure representing a single rule file.
All group names are augmented with juju topology.
Args:
path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).
Returns:
None. A path that does not exist is logged at debug level and skipped.
"""
path = Path(path) # type: Path
if path.is_dir():
self.alert_groups.extend(self._from_dir(path, recursive))
elif path.is_file():
self.alert_groups.extend(self._from_file(path.parent, path))
else:
logger.debug("Alert rules path does not exist: %s", path)
def as_dict(self) -> dict:
"""Return standard alert rules file in dict representation.
Returns:
a dictionary containing a single list of alert rule groups.
The list of alert rule groups is provided as value of the
"groups" dictionary key.
"""
return {"groups": self.alert_groups} if self.alert_groups else {}
class TargetsChangedEvent(EventBase):
"""Event emitted when Prometheus scrape targets change."""
def __init__(self, handle, relation_id):
super().__init__(handle)
self.relation_id = relation_id
def snapshot(self):
"""Save scrape target relation information."""
return {"relation_id": self.relation_id}
def restore(self, snapshot):
"""Restore scrape target relation information."""
self.relation_id = snapshot["relation_id"]
class MonitoringEvents(ObjectEvents):
"""Event descriptor for events raised by `MetricsEndpointConsumer`."""
targets_changed = EventSource(TargetsChangedEvent)
class MetricsEndpointConsumer(Object):
"""A Prometheus based Monitoring service."""
on = MonitoringEvents()
def __init__(self, charm: CharmBase, relation_name: str = DEFAULT_RELATION_NAME):
"""A Prometheus based Monitoring service.
Args:
charm: a `CharmBase` instance that manages this
instance of the Prometheus service.
relation_name: an optional string name of the relation between `charm`
and the Prometheus charmed service. The default is "metrics-endpoint".
It is strongly advised not to change the default, so that people
deploying your charm will have a consistent experience with all
other charms that consume metrics endpoints.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the `prometheus_scrape` relation
interface.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the `RelationRole.requires`
role.
"""
_validate_relation_by_interface_and_direction(
charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires
)
super().__init__(charm, relation_name)
self._charm = charm
self._relation_name = relation_name
self._tool = CosTool(self._charm)
events = self._charm.on[relation_name]
self.framework.observe(events.relation_changed, self._on_metrics_provider_relation_changed)
self.framework.observe(
events.relation_departed, self._on_metrics_provider_relation_departed
)
def _on_metrics_provider_relation_changed(self, event):
"""Handle changes with related metrics providers.
Anytime there are changes in relations between Prometheus
and metrics provider charms the Prometheus charm is informed,
through a `TargetsChangedEvent` event. The Prometheus charm can
then choose to update its scrape configuration.
Args:
event: a `CharmEvent` in response to which the Prometheus
charm must update its scrape configuration.
"""
rel_id = event.relation.id
self.on.targets_changed.emit(relation_id=rel_id)
def _on_metrics_provider_relation_departed(self, event):
"""Update job config when a metrics provider departs.
When a metrics provider departs the Prometheus charm is informed
through a `TargetsChangedEvent` event so that it can update its
scrape configuration to ensure that the departed metrics provider
is removed from the list of scrape jobs.
Args:
event: a `CharmEvent` that indicates a metrics provider
unit has departed.
"""
rel_id = event.relation.id
self.on.targets_changed.emit(relation_id=rel_id)
def jobs(self) -> list:
"""Fetch the list of scrape jobs.
Returns:
A list consisting of all the static scrape configurations
for each related `MetricsEndpointProvider` that has specified
its scrape targets.
"""
scrape_jobs = []
for relation in self._charm.model.relations[self._relation_name]:
static_scrape_jobs = self._static_scrape_config(relation)
if static_scrape_jobs:
scrape_jobs.extend(static_scrape_jobs)
scrape_jobs = _dedupe_job_names(scrape_jobs)
return scrape_jobs
def alerts(self) -> dict:
"""Fetch alerts for all relations.
A Prometheus alert rules file consists of a list of "groups". Each
group consists of a list of alerts (`rules`) that are sequentially
executed. This method returns all the alert rules provided by each
related metrics provider charm. These rules may be used to generate a
separate alert rules file for each relation since the returned list
of alert groups is indexed by that relation's Juju topology identifier.
The Juju topology identifier string includes substrings that identify
alert rule related metadata such as the Juju model, model UUID and the
application name from where the alert rule originates. Since this
topology identifier is globally unique, it may be used for instance as
the name for the file into which the list of alert rule groups are
written. For each relation, the structure of data returned is a dictionary
representation of a standard prometheus rules file:
{"groups": [{"name": ...}, ...]}
per official prometheus documentation
https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/
The value of the `groups` key is such that it may be used to generate
a Prometheus alert rules file directly using `yaml.dump` but the
`groups` key itself must be included as this is required by Prometheus.
For example the list of alert rule groups returned by this method may
be written into files consumed by Prometheus as follows
```
for topology_identifier, alert_rule_groups in self.metrics_consumer.alerts().items():
    filename = "juju_" + topology_identifier + ".rules"
    path = os.path.join(PROMETHEUS_RULES_DIR, filename)
    rules = yaml.safe_dump(alert_rule_groups)
    container.push(path, rules, make_dirs=True)
```
Returns:
A dictionary mapping the Juju topology identifier of the source charm to
its list of alert rule groups.
"""
alerts = {} # type: Dict[str, dict] # mapping b/w juju identifiers and alert rule files
for relation in self._charm.model.relations[self._relation_name]:
if not relation.units or not relation.app:
continue
alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}"))
if not alert_rules:
continue
try:
scrape_metadata = json.loads(relation.data[relation.app]["scrape_metadata"])
identifier = JujuTopology.from_dict(scrape_metadata).identifier
alerts[identifier] = self._tool.apply_label_matchers(alert_rules)
except KeyError as e:
logger.debug(
"Relation %s has no 'scrape_metadata': %s",
relation.id,
e,
)
identifier = self._get_identifier_by_alert_rules(alert_rules)
if not identifier:
logger.error(
"Alert rules were found but no usable group or identifier was present"
)
continue
alerts[identifier] = alert_rules
_, errmsg = self._tool.validate_alert_rules(alert_rules)
if errmsg:
if alerts[identifier]:
del alerts[identifier]
relation.data[self._charm.app]["event"] = json.dumps({"errors": errmsg})
continue
return alerts
def _get_identifier_by_alert_rules(self, rules: dict) -> Union[str, None]:
"""Determine an appropriate dict key for alert rules.
The key is used as the filename when writing alerts to disk, so its structure
and uniqueness are important.
Args:
rules: a dict of alert rules
"""
if "groups" not in rules:
logger.debug("No alert groups were found in relation data")
return None
# Construct an ID based on what's in the alert rules if they have labels
for group in rules["groups"]:
try:
labels = group["rules"][0]["labels"]
identifier = "{}_{}_{}".format(
labels["juju_model"],
labels["juju_model_uuid"],
labels["juju_application"],
)
return identifier
except KeyError:
logger.debug("Alert rules were found but no usable labels were present")
continue
logger.warning(
"No labeled alert rules were found, and no 'scrape_metadata' "
"was available. Using the alert group name as filename."
)
try:
for group in rules["groups"]:
return group["name"]
except KeyError:
logger.debug("No group name was found to use as identifier")
return None
def _static_scrape_config(self, relation) -> list:
"""Generate the static scrape configuration for a single relation.
If the relation data includes `scrape_metadata` then the value
of this key is used to annotate the scrape jobs with Juju
Topology labels before returning them.
Args:
relation: an `ops.model.Relation` object whose static
scrape configuration is required.
Returns:
A list (possibly empty) of scrape jobs. Each job is a
valid Prometheus scrape configuration for that job,
represented as a Python dictionary.
"""
if not relation.units:
return []
scrape_jobs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]"))
if not scrape_jobs:
return []
scrape_metadata = json.loads(relation.data[relation.app].get("scrape_metadata", "{}"))
if not scrape_metadata:
return scrape_jobs
job_name_prefix = "juju_{}_prometheus_scrape".format(
JujuTopology.from_dict(scrape_metadata).identifier
)
hosts = self._relation_hosts(relation)
labeled_job_configs = []
for job in scrape_jobs:
config = self._labeled_static_job_config(
_sanitize_scrape_configuration(job),
job_name_prefix,
hosts,
scrape_metadata,
)
labeled_job_configs.append(config)
return labeled_job_configs
def _relation_hosts(self, relation) -> dict:
"""Fetch unit names and addresses of all metrics provider units for a single relation.
Args:
relation: An `ops.model.Relation` object for which the unit name to
address mapping is required.
Returns:
A dictionary that maps unit names to unit addresses for
the specified relation.
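
The fallback order used when reading unit relation data can be sketched
with plain dictionaries standing in for Juju databags. This is only an
illustration: `collect_hosts` and its `unit_data` argument are hypothetical
names and are not part of this library.

```python
def collect_hosts(unit_data):
    # unit_data maps each unit's Juju name to a plain dict that stands in
    # for that unit's relation databag (an assumption made for illustration).
    hosts = {}
    for fallback_name, databag in unit_data.items():
        # Prefer the explicitly published name, else fall back to the unit name.
        name = databag.get("prometheus_scrape_unit_name") or fallback_name
        # Prefer the current address key, else fall back to the legacy key.
        address = databag.get("prometheus_scrape_unit_address") or databag.get(
            "prometheus_scrape_host"
        )
        # Units that have not published an address yet are skipped.
        if name and address:
            hosts[name] = address
    return hosts


example = collect_hosts(
    {
        "app/0": {"prometheus_scrape_unit_address": "10.1.2.3"},
        "app/1": {"prometheus_scrape_host": "10.1.2.4"},
        "app/2": {},
    }
)
```

In this sketch `app/2` is omitted from the result because it has not
published any address.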
"""
hosts = {}
for unit in relation.units:
# TODO deprecate and remove unit.name
unit_name = relation.data[unit].get("prometheus_scrape_unit_name") or unit.name
# TODO deprecate and remove "prometheus_scrape_host"
unit_address = relation.data[unit].get(
"prometheus_scrape_unit_address"
) or relation.data[unit].get("prometheus_scrape_host")
if unit_name and unit_address:
hosts.update({unit_name: unit_address})
return hosts
def _labeled_static_job_config(self, job, job_name_prefix, hosts, scrape_metadata) -> dict:
"""Construct labeled job configuration for a single job.
Args:
job: a dictionary representing the job configuration as obtained from
`MetricsEndpointProvider` over relation data.
job_name_prefix: a string that may either be used as the
job name if the job has no associated name or used as a prefix for
the job if it does have a job name.
hosts: a dictionary mapping host names to host addresses for
all units of the relation for which this job configuration
must be constructed.
scrape_metadata: scrape configuration metadata obtained
from `MetricsEndpointProvider` from the same relation for
which this job configuration is being constructed.
Returns:
A dictionary representing a Prometheus job configuration
for a single job.
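
The job naming rule described for `job_name_prefix` can be sketched in
isolation as follows. The helper name `labeled_job_name` and the prefix
value are hypothetical and chosen only for illustration.

```python
def labeled_job_name(job_name_prefix, job):
    # Use the prefix alone when the job carries no name of its own,
    # otherwise append the job's own name to the prefix.
    name = job.get("job_name")
    return "{}_{}".format(job_name_prefix, name) if name else job_name_prefix


# Hypothetical prefix; the real one is derived from the Juju topology identifier.
prefix = "juju_mymodel_1234_myapp_prometheus_scrape"
named = labeled_job_name(prefix, {"job_name": "api"})
unnamed = labeled_job_name(prefix, {"static_configs": []})
```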
"""
name = job.get("job_name")
job_name = "{}_{}".format(job_name_prefix, name) if name else job_name_prefix
labeled_job = job.copy()
labeled_job["job_name"] = job_name
static_configs = job.get("static_configs")
labeled_job["static_configs"] = []
# relabel instance labels so that instance identifiers are globally unique
# and stable over unit recreation
instance_relabel_config = {
"source_labels": ["juju_model", "juju_model_uuid", "juju_application"],
"separator": "_",
"target_label": "instance",
"regex": "(.*)",
}
# label all static configs in the Prometheus job
# labeling inserts Juju topology information and
# sets a relabeling config for instance labels
for static_config in static_configs:
labels = static_config.get("labels", {}) if static_configs else {}
all_targets = static_config.get("targets", [])
# split all targets into those which will have unit labels
# and those which will not
ports = []
unitless_targets = []
for target in all_targets:
host, port = self._target_parts(target)
if host.strip() == "*":
ports.append(port.strip())
else:
unitless_targets.append(target)
# label scrape targets that do not have unit labels
if unitless_targets:
unitless_config = self._labeled_unitless_config(
unitless_targets, labels, scrape_metadata
)
labeled_job["static_configs"].append(unitless_config)
# label scrape targets that do have unit labels
for host_name, host_address in hosts.items():
static_config = self._labeled_unit_config(
host_name, host_address, ports, labels, scrape_metadata
)
labeled_job["static_configs"].append(static_config)
if "juju_unit" not in instance_relabel_config["source_labels"]:
instance_relabel_config["source_labels"].append("juju_unit") # type: ignore
# ensure topology relabeling of instance label is last in order of relabelings
relabel_configs = job.get("relabel_configs", [])
relabel_configs.append(instance_relabel_config)
labeled_job["relabel_configs"] = relabel_configs
return labeled_job
def _target_parts(self, target) -> list:
"""Extract host and port from a wildcard target.
Args:
target: a string specifying a scrape target. A
scrape target is expected to have the format
"host:port". The host part may be a wildcard
"*" and the port part can be missing (along
with ":") in which case port is set to 80.
Returns:
a list with target host and port as in [host, port]
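
The splitting rule, and how wildcard targets are then separated from
fully qualified ones, can be sketched as below. The function names
`split_target` and `partition_targets` are hypothetical and used only
for illustration.

```python
def split_target(target):
    # "host:port" is split on the colon; a bare host defaults to port 80.
    if ":" in target:
        return target.split(":")
    return [target, "80"]


def partition_targets(targets):
    # Separate the ports of wildcard targets from fully qualified targets.
    wildcard_ports, unitless_targets = [], []
    for target in targets:
        host, port = split_target(target)
        if host.strip() == "*":
            wildcard_ports.append(port.strip())
        else:
            unitless_targets.append(target)
    return wildcard_ports, unitless_targets


ports, unitless = partition_targets(["*:8080", "10.1.32.215:7000", "*"])
```

Note that a target containing more than one colon (an IPv6 literal, for
instance) splits into more than two parts, so the two-name unpacking in
this sketch would fail for such input.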
"""
if ":" in target:
parts = target.split(":")
else:
parts = [target, "80"]
return parts
def _set_juju_labels(self, labels, scrape_metadata) -> dict:
"""Create a copy of metric labels with Juju topology information.
Args:
labels: a dictionary containing Prometheus metric labels.
scrape_metadata: scrape related metadata provided by
`MetricsEndpointProvider`.
Returns:
a copy of the `labels` dictionary augmented with Juju
topology information except for unit name.
"""
juju_labels = labels.copy() # deep copy not needed
juju_labels.update(JujuTopology.from_dict(scrape_metadata).label_matcher_dict)
return juju_labels
def _labeled_unitless_config(self, targets, labels, scrape_metadata) -> dict:
"""Static scrape configuration for fully qualified host addresses.
Fully qualified hosts are those scrape targets for which the
addresses are specified by the `MetricsEndpointProvider` as part
of the scrape job specification set in application relation data.
The address specified need not belong to any unit of the
`MetricsEndpointProvider` charm. As a result there is no reliable
way to determine the name (Juju topology unit name) for such a
target.
Args:
targets: a list of addresses of fully qualified hosts.
labels: labels specified by `MetricsEndpointProvider` clients
which are associated with `targets`.
scrape_metadata: scrape related metadata provided by `MetricsEndpointProvider`.
Returns:
A dictionary containing the static scrape configuration
for a list of fully qualified hosts.
"""
juju_labels = self._set_juju_labels(labels, scrape_metadata)
unitless_config = {"targets": targets, "labels": juju_labels}
return unitless_config
def _labeled_unit_config(
self, unit_name, host_address, ports, labels, scrape_metadata
) -> dict:
"""Static scrape configuration for a wildcard host.
Wildcard hosts are those scrape targets whose name (Juju unit
name) and address (unit IP address) are set into unit relation
data by the `MetricsEndpointProvider` charm, which sets this
data for ALL its units.
Args:
unit_name: a string representing the unit name of the wildcard host.
host_address: a string representing the address of the wildcard host.
ports: list of ports on which this wildcard host exposes its metrics.
labels: a dictionary of labels provided by
`MetricsEndpointProvider` intended to be associated with
this wildcard host.
scrape_metadata: scrape related metadata provided by `MetricsEndpointProvider`.
Returns:
A dictionary containing the static scrape configuration
for a single wildcard host.
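
The expansion of a wildcard host into concrete targets can be sketched
without any Juju objects as follows. The helper `unit_static_config` and
the sample values are hypothetical; topology labels other than
`juju_unit` are assumed to be present in `labels` already.

```python
def unit_static_config(unit_name, host_address, ports, labels):
    # Copy the labels so the caller's dictionary is left untouched.
    unit_labels = dict(labels)
    unit_labels["juju_unit"] = unit_name
    if ports:
        # One target per wildcard port requested for this unit.
        targets = ["{}:{}".format(host_address, port) for port in ports]
    else:
        # No wildcard port was requested, so the bare address is used.
        targets = [host_address]
    return {"labels": unit_labels, "targets": targets}


config = unit_static_config("myapp/0", "10.1.2.3", ["8080", "9090"], {"juju_model": "dev"})
```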
"""
juju_labels = self._set_juju_labels(labels, scrape_metadata)
juju_labels["juju_unit"] = unit_name
static_config = {"labels": juju_labels}
if ports:
targets = []
for port in ports:
targets.append("{}:{}".format(host_address, port))
static_config["targets"] = targets # type: ignore
else:
static_config["targets"] = [host_address] # type: ignore
return static_config
def _dedupe_job_names(jobs: List[dict]):
"""Deduplicate a list of dicts by appending a hash to the value of the 'job_name' key.
Additionally, fully de-duplicate any identical jobs.
Args:
jobs: A list of prometheus scrape jobs
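
The two steps (suffix clashing names with a hash of the job, then drop
jobs that are fully identical) can be sketched as below. This is a
simplified illustration that preserves input order rather than grouping
jobs by name; `rename_and_dedupe` is a hypothetical name.

```python
import hashlib
import json


def rename_and_dedupe(jobs):
    # Count how often each job name occurs.
    counts = {}
    for job in jobs:
        counts[job["job_name"]] = counts.get(job["job_name"], 0) + 1

    renamed = []
    for job in jobs:
        job = dict(job)  # shallow copy so the input list is not modified
        if counts[job["job_name"]] > 1:
            # Hash the job under its original name, then append the digest.
            digest = hashlib.sha256(json.dumps(job).encode()).hexdigest()
            job["job_name"] = "{}_{}".format(job["job_name"], digest)
        renamed.append(job)

    # Identical jobs produce identical digests, so they collapse to one entry.
    unique = []
    for job in renamed:
        if job not in unique:
            unique.append(job)
    return unique


result = rename_and_dedupe(
    [
        {"job_name": "a", "metrics_path": "/one"},
        {"job_name": "a", "metrics_path": "/two"},
        {"job_name": "b"},
        {"job_name": "a", "metrics_path": "/one"},
    ]
)
```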
"""
jobs_copy = copy.deepcopy(jobs)
# Convert to a dict with job names as keys
# I think this line is O(n^2) but it should be okay given the list sizes
jobs_dict = {
job["job_name"]: list(filter(lambda x: x["job_name"] == job["job_name"], jobs_copy))
for job in jobs_copy
}
# If multiple jobs have the same name, convert the name to "name_<hash-of-job>"
for key in jobs_dict:
if len(jobs_dict[key]) > 1:
for job in jobs_dict[key]:
job_json = json.dumps(job)
hashed = hashlib.sha256(job_json.encode()).hexdigest()
job["job_name"] = "{}_{}".format(job["job_name"], hashed)
new_jobs = []
for key in jobs_dict:
new_jobs.extend([i for i in jobs_dict[key]])
# Deduplicate jobs which are equal
# Again this is O(n^2) but it should be okay
deduped_jobs = []
seen = []
for job in new_jobs:
job_json = json.dumps(job)
hashed = hashlib.sha256(job_json.encode()).hexdigest()
if hashed in seen:
continue
seen.append(hashed)
deduped_jobs.append(job)
return deduped_jobs
def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str:
"""Resolve the provided path items against the directory of the main file.
Look up the directory of the `main.py` file being executed. This is normally
going to be the charm.py file of the charm including this library. Then, resolve
the provided path elements and, if the result path exists and is a directory,
return its absolute path; otherwise, raise an exception.
Raises:
InvalidAlertRulePathError, if the path does not exist or is not a directory.
"""
charm_dir = Path(str(charm.charm_dir))
if not charm_dir.exists() or not charm_dir.is_dir():
# Operator Framework does not currently expose a robust
# way to determine the top level charm source directory
# that is consistent across deployed charms and unit tests
# Hence for unit tests the current working directory is used
# TODO: update this logic when the following ticket is resolved
# https://github.com/canonical/operator/issues/643
charm_dir = Path(os.getcwd())
alerts_dir_path = charm_dir.absolute().joinpath(*path_elements)
if not alerts_dir_path.exists():
raise InvalidAlertRulePathError(alerts_dir_path, "directory does not exist")
if not alerts_dir_path.is_dir():
raise InvalidAlertRulePathError(alerts_dir_path, "is not a directory")
return str(alerts_dir_path)
class MetricsEndpointProvider(Object):
"""A metrics endpoint for Prometheus."""
on = MetricsEndpointProviderEvents()
def __init__(
self,
charm,
relation_name: str = DEFAULT_RELATION_NAME,
jobs=None,
alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
external_hostname: Optional[str] = None,
):
"""Construct a metrics provider for a Prometheus charm.
If your charm exposes a Prometheus metrics endpoint, the
`MetricsEndpointProvider` object enables your charm to easily
communicate how to reach that metrics endpoint.
By default, a charm instantiating this object has the metrics
endpoints of each of its units scraped by the related Prometheus
charms. The scraped metrics are automatically tagged by the
Prometheus charms with Juju topology data via the
`juju_model`, `juju_model_uuid`, `juju_application`
and `juju_unit` labels. To support such tagging `MetricsEndpointProvider`
automatically forwards scrape metadata to a `MetricsEndpointConsumer`
(Prometheus charm).
Scrape targets provided by `MetricsEndpointProvider` can be
customized when instantiating this object. For example in the
case of a charm exposing the metrics endpoint for each of its
units on port 8080 and the `/metrics` path, the
`MetricsEndpointProvider` can be instantiated as follows:
    self.metrics_endpoint_provider = MetricsEndpointProvider(
        self,
        jobs=[{
            "static_configs": [{"targets": ["*:8080"]}],
        }])
The notation `*:<port>` means "scrape each unit of this charm on port
`<port>`".
In case the metrics endpoints are not on the standard `/metrics` path,
a custom path can be specified as follows:
    self.metrics_endpoint_provider = MetricsEndpointProvider(
        self,
        jobs=[{
            "metrics_path": "/my/strange/metrics/path",
            "static_configs": [{"targets": ["*:8080"]}],
        }])
Note how the `jobs` argument is a list: this allows you to expose multiple
combinations of `metrics_path` and `static_configs` in case your charm
exposes multiple endpoints, which could happen, for example, when you have
multiple workload containers, with applications in each needing to be scraped.
The structure of the objects in the `jobs` list is one-to-one with the
`scrape_config` configuration item of Prometheus' own configuration (see
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
), but with only a subset of the fields allowed. The permitted fields are
listed in the `ALLOWED_KEYS` object in this charm library module.
It is also possible to specify alert rules. By default, this library will look
in the `<charm_parent_dir>/prometheus_alert_rules` directory, which in a standard charm
layout resolves to `src/prometheus_alert_rules`. Each alert rule goes into a
separate `*.rule` file. If the syntax of a rule is invalid,
the `MetricsEndpointProvider` logs an error and does not load the particular
rule.
To avoid false positives and negatives in the evaluation of alert rules,
all ingested alert rule expressions are automatically qualified using Juju
Topology filters. This ensures that alert rules provided by your charm trigger
alerts based only on data scraped from your charm. For example an alert rule
such as the following
alert: UnitUnavailable
expr: up < 1
for: 0m
will be automatically transformed into something along the lines of the following
alert: UnitUnavailable
expr: up{juju_model=<model>, juju_model_uuid=<uuid-prefix>, juju_application=<app>} < 1
for: 0m
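
The shape of the injected label matchers can be illustrated with the
sketch below. It only shows how such a selector might be rendered for a
bare metric name and is not how this library rewrites expressions;
`topology_matchers` and the sample topology values are hypothetical.

```python
def topology_matchers(topology):
    # Render label matchers in the form accepted inside a PromQL selector.
    keys = ("juju_model", "juju_model_uuid", "juju_application")
    return ", ".join('{}="{}"'.format(key, topology[key]) for key in keys)


# Hypothetical topology values.
topology = {"juju_model": "dev", "juju_model_uuid": "1234", "juju_application": "myapp"}
expr = "up{" + topology_matchers(topology) + "} < 1"
```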
An attempt will be made to validate alert rules prior to loading them into Prometheus.
If they are invalid, an event will be emitted from this object which charms can respond
to in order to set a meaningful status for administrators.
This can be observed via `consumer.on.alert_rule_status_changed` which contains:
- The error(s) encountered when validating as `errors`
- A `valid` attribute, which can be used to reset the state of charms if alert rules
are updated via another mechanism (e.g. `cos-config`) and refreshed.
Args:
charm: a `CharmBase` object that manages this
`MetricsEndpointProvider` object. Typically, this is
`self` in the instantiating class.
relation_name: an optional string name of the relation between `charm`
and the Prometheus charmed service. The default is "metrics-endpoint".
It is strongly advised not to change the default, so that people
deploying your charm will have a consistent experience with all
other charms that provide metrics endpoints.
jobs: an optional list of dictionaries where each
dictionary represents the Prometheus scrape
configuration for a single job. When not provided, a
default scrape configuration is provided for the
`/metrics` endpoint polling all units of the charm on port `80`
using the `MetricsEndpointProvider` object.
alert_rules_path: an optional path for the location of alert rules
files. Defaults to "./prometheus_alert_rules",
resolved relative to the directory hosting the charm entry file.
The alert rules are automatically updated on charm upgrade.
refresh_event: an optional bound event or list of bound events which
will be observed to re-set scrape job data (IP address and others).
external_hostname: an optional argument that represents an external hostname that
can be generated by an Ingress or a Proxy.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
with the same name as provided via `relation_name` argument.
RelationInterfaceMismatchError: The relation with the same name as provided
via `relation_name` argument does not have the `prometheus_scrape` relation
interface.
RelationRoleMismatchError: If the relation with the same name as provided
via `relation_name` argument does not have the `RelationRole.provides`
role.
"""
_validate_relation_by_interface_and_direction(
charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
)
try:
alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path)
except InvalidAlertRulePathError as e:
logger.debug(
"Invalid Prometheus alert rules folder at %s: %s",
e.alert_rules_absolute_path,
e.message,
)
super().__init__(charm, relation_name)
self.topology = JujuTopology.from_charm(charm)
self._charm = charm
self._alert_rules_path = alert_rules_path
self._relation_name = relation_name
# sanitize job configurations to the supported subset of parameters
jobs = [] if jobs is None else jobs
self._jobs = [_sanitize_scrape_configuration(job) for job in jobs]
self.external_hostname = external_hostname
events = self._charm.on[self._relation_name]
self.framework.observe(events.relation_joined, self._set_scrape_job_spec)
self.framework.observe(events.relation_changed, self._on_relation_changed)
if not refresh_event:
if len(self._charm.meta.containers) == 1:
if "kubernetes" in self._charm.meta.series:
# This is a podspec charm
refresh_event = [self._charm.on.update_status]
else:
# This is a sidecar/pebble charm
container = list(self._charm.meta.containers.values())[0]
refresh_event = [self._charm.on[container.name.replace("-", "_")].pebble_ready]
else:
logger.warning(
"%d containers are present in metadata.yaml and "
"refresh_event was not specified. Defaulting to update_status. "
"Metrics IP may not be set in a timely fashion.",
len(self._charm.meta.containers),
)
refresh_event = [self._charm.on.update_status]
else:
if not isinstance(refresh_event, list):
refresh_event = [refresh_event]
for ev in refresh_event:
self.framework.observe(ev, self._set_scrape_job_spec)
self.framework.observe(self._charm.on.upgrade_charm, self._set_scrape_job_spec)
# If there is no leader during relation_joined we will still need to set alert rules.
self.framework.observe(self._charm.on.leader_elected, self._set_scrape_job_spec)
def _on_relation_changed(self, event):
"""Check for alert rule messages in the relation data before moving on."""
if self._charm.unit.is_leader():
ev = json.loads(event.relation.data[event.app].get("event", "{}"))
if ev:
valid = bool(ev.get("valid", True))
errors = ev.get("errors", "")
if valid and not errors:
self.on.alert_rule_status_changed.emit(valid=valid)
else:
self.on.alert_rule_status_changed.emit(valid=valid, errors=errors)
self._set_scrape_job_spec(event)
def _set_scrape_job_spec(self, event):
"""Ensure scrape target information is made available to prometheus.
When a metrics provider charm is related to a prometheus charm, the
metrics provider sets specification and metadata related to its own
scrape configuration. This information is set using Juju application
data. In addition, each of the metrics provider units also sets its own
host address in Juju unit relation data.
"""
self._set_unit_ip(event)
if not self._charm.unit.is_leader():
return
alert_rules = AlertRules(topology=self.topology)
alert_rules.add_path(self._alert_rules_path, recursive=True)
alert_rules_as_dict = alert_rules.as_dict()
for relation in self._charm.model.relations[self._relation_name]:
relation.data[self._charm.app]["scrape_metadata"] = json.dumps(self._scrape_metadata)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(self._scrape_jobs)
if alert_rules_as_dict:
# Update relation data with the string representation of the rule file.
# Juju topology is already included in the "scrape_metadata" field above.
# The consumer side of the relation uses this information to name the rules file
# that is written to the filesystem.
relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict)
def _set_unit_ip(self, _):
"""Set unit host address.
Each time a metrics provider charm container is restarted it updates its own
host address in the unit relation data for the prometheus charm.
The only argument specified is an event, and it is ignored. This is for expediency
to be able to use this method as an event handler, although no access to the
event is actually needed.
"""
for relation in self._charm.model.relations[self._relation_name]:
unit_ip = str(self._charm.model.get_binding(relation).network.bind_address)
if self.external_hostname:
unit_address = self.external_hostname
elif self._is_valid_unit_address(unit_ip):
unit_address = unit_ip
else:
unit_address = socket.getfqdn()
relation.data[self._charm.unit]["prometheus_scrape_unit_address"] = unit_address
relation.data[self._charm.unit]["prometheus_scrape_unit_name"] = str(
self._charm.model.unit.name
)
def _is_valid_unit_address(self, address: str) -> bool:
"""Validate a unit address.
At present only IP address validation is supported, but
this may be extended to DNS addresses also, as needed.
Args:
address: a string representing a unit address
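
How this check fits into choosing the address that a unit publishes can
be sketched as follows: an external hostname wins, then a valid bind IP
address, then the host's fully qualified domain name. The function names
below are hypothetical and used only for illustration.

```python
import ipaddress
import socket


def is_ip_address(address):
    # ipaddress.ip_address raises ValueError for anything that is not
    # a valid IPv4 or IPv6 address.
    try:
        ipaddress.ip_address(address)
    except ValueError:
        return False
    return True


def choose_unit_address(bind_address, external_hostname=None):
    # An external hostname (for example from an ingress) takes precedence.
    if external_hostname:
        return external_hostname
    if is_ip_address(bind_address):
        return bind_address
    # Otherwise fall back to the fully qualified domain name of this host.
    return socket.getfqdn()
```

A bind address that is missing and has been converted to the string
"None" fails validation, so the sketch falls through to the FQDN.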
"""
try:
_ = ipaddress.ip_address(address)
except ValueError:
return False
return True
@property
def _scrape_jobs(self) -> list:
"""Fetch list of scrape jobs.
Returns:
A list of dictionaries, where each dictionary specifies a
single scrape job for Prometheus.
"""
return self._jobs if self._jobs else [DEFAULT_JOB]
@property
def _scrape_metadata(self) -> dict:
"""Generate scrape metadata.
Returns:
Scrape configuration metadata for this metrics provider charm.
"""
return self.topology.as_dict()
class PrometheusRulesProvider(Object):
"""Forward rules to Prometheus.
This object may be used to forward rules to Prometheus. At present it only supports
forwarding alert rules. This is unlike :class:`MetricsEndpointProvider`, which
is used for forwarding both scrape targets and associated alert rules. This object
is typically used when there is a desire to forward rules that apply globally (across
all deployed charms and units) rather than to a single charm. All rule files are
forwarded using the same 'prometheus_scrape' interface that is also used by
`MetricsEndpointProvider`.
Args:
charm: A charm instance that `provides` a relation with the `prometheus_scrape` interface.
relation_name: Name of the relation in `metadata.yaml` that
has the `prometheus_scrape` interface.
dir_path: Root directory for the collection of rule files.
recursive: Whether to scan for rule files recursively.
"""
def __init__(
self,
charm: CharmBase,
relation_name: str = DEFAULT_RELATION_NAME,
dir_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH,
recursive=True,
):
super().__init__(charm, relation_name)
self._charm = charm
self._relation_name = relation_name
self._recursive = recursive
try:
dir_path = _resolve_dir_against_charm_path(charm, dir_path)
except InvalidAlertRulePathError as e:
logger.debug(
"Invalid Prometheus alert rules folder at %s: %s",
e.alert_rules_absolute_path,
e.message,
)
self.dir_path = dir_path
events = self._charm.on[self._relation_name]
event_sources = [
events.relation_joined,
events.relation_changed,
self._charm.on.leader_elected,
self._charm.on.upgrade_charm,
]
for event_source in event_sources:
self.framework.observe(event_source, self._update_relation_data)
def _reinitialize_alert_rules(self):
"""Reloads alert rules and updates all relations."""
self._update_relation_data(None)
def _update_relation_data(self, _):
"""Update application relation data with alert rules for all relations."""
if not self._charm.unit.is_leader():
return
alert_rules = AlertRules()
alert_rules.add_path(self.dir_path, recursive=self._recursive)
alert_rules_as_dict = alert_rules.as_dict()
logger.info("Updating relation data with rule files from disk")
for relation in self._charm.model.relations[self._relation_name]:
relation.data[self._charm.app]["alert_rules"] = json.dumps(
alert_rules_as_dict,
sort_keys=True, # sort, to prevent unnecessary relation_changed events
)
class MetricsEndpointAggregator(Object):
"""Aggregate metrics from multiple scrape targets.
`MetricsEndpointAggregator` collects scrape target information from one
or more related charms and forwards this to a `MetricsEndpointConsumer`
charm, which may be in a different Juju model. However, it is
essential that `MetricsEndpointAggregator` itself resides in the same
model as its scrape targets, as this is currently the only way to
ensure in Juju that the `MetricsEndpointAggregator` will be able to
determine the model name and uuid of the scrape targets.
`MetricsEndpointAggregator` should be used in place of
`MetricsEndpointProvider` in the following two use cases:
1. Integrating one or more scrape targets that do not support the
`prometheus_scrape` interface.
2. Integrating one or more scrape targets through cross model
relations, although the [Scrape Config Operator](https://charmhub.io/cos-configuration-k8s)
may also be used for the purpose of supporting cross model
relations.
Using `MetricsEndpointAggregator` to build a Prometheus charm client
only requires instantiating it. Instantiating
`MetricsEndpointAggregator` is similar to `MetricsEndpointProvider` except
that it requires specifying the names of three relations: the
relation with scrape targets, the relation for alert rules, and
that with the Prometheus charms. For example
```python
self._aggregator = MetricsEndpointAggregator(
    self,
    {
        "prometheus": "monitoring",
        "scrape_target": "prometheus-target",
        "alert_rules": "prometheus-rules"
    }
)
```
`MetricsEndpointAggregator` assumes that each unit of a scrape target
sets in its unit-level relation data two entries with keys
"hostname" and "port". If it is required to integrate with charms
that do not honor these assumptions, it is always possible to
derive from `MetricsEndpointAggregator` overriding the `_get_targets()`
method, which is responsible for aggregating the unit name, host
address ("hostname") and port of the scrape target.
`MetricsEndpointAggregator` also assumes that each unit of a
scrape target sets in its unit-level relation data a key named
"groups". The value of this key is expected to be the string
representation of a list of Prometheus alert rules in YAML format.
An example of a single such alert rule is
```yaml
- alert: HighRequestLatency
  expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
  for: 10m
  labels:
    severity: page
  annotations:
    summary: High request latency
```
Once again if it is required to integrate with charms that do not
honour these assumptions about alert rules then an object derived
from `MetricsEndpointAggregator` may be used by overriding the
`_get_alert_rules()` method.
`MetricsEndpointAggregator` ensures that Prometheus scrape job
specifications and alert rules are annotated with Juju topology
information, just like `MetricsEndpointProvider` and
`MetricsEndpointConsumer` do.
By default, `MetricsEndpointAggregator` ensures that Prometheus
"instance" labels refer to Juju topology. This ensures that
instance labels are stable over unit recreation. While it is not
advisable to change this option, if required it can be done by
setting the "relabel_instance" keyword argument to `False` when
constructing an aggregator object.
"""
def __init__(self, charm, relation_names, relabel_instance=True):
"""Construct a `MetricsEndpointAggregator`.
Args:
charm: a `CharmBase` object that manages this
`MetricsEndpointAggregator` object. Typically, this is
`self` in the instantiating class.
relation_names: a dictionary with three keys. The value
of the "scrape_target" and "alert_rules" keys are
the relation names over which scrape job and alert rule
information is gathered by this `MetricsEndpointAggregator`.
And the value of the "prometheus" key is the name of
the relation with a `MetricsEndpointConsumer` such as
the Prometheus charm.
relabel_instance: A boolean flag indicating if Prometheus
scrape job "instance" labels must refer to Juju Topology.
"""
super().__init__(charm, relation_names["prometheus"])
self._charm = charm
self._target_relation = relation_names["scrape_target"]
self._prometheus_relation = relation_names["prometheus"]
self._alert_rules_relation = relation_names["alert_rules"]
self._relabel_instance = relabel_instance
# manage Prometheus charm relation events
prometheus_events = self._charm.on[self._prometheus_relation]
self.framework.observe(prometheus_events.relation_joined, self._set_prometheus_data)
# manage list of Prometheus scrape jobs from related scrape targets
target_events = self._charm.on[self._target_relation]
self.framework.observe(target_events.relation_changed, self._update_prometheus_jobs)
self.framework.observe(target_events.relation_departed, self._remove_prometheus_jobs)
# manage alert rules for Prometheus from related scrape targets
alert_rule_events = self._charm.on[self._alert_rules_relation]
self.framework.observe(alert_rule_events.relation_changed, self._update_alert_rules)
self.framework.observe(alert_rule_events.relation_departed, self._remove_alert_rules)
def _set_prometheus_data(self, event):
"""Ensure every new Prometheus instance is updated.
Any time a new Prometheus unit joins the relation with
`MetricsEndpointAggregator`, that Prometheus unit is provided
with the complete set of existing scrape jobs and alert rules.
"""
jobs = [] # list of scrape jobs, one per relation
for relation in self.model.relations[self._target_relation]:
targets = self._get_targets(relation)
if targets and relation.app:
jobs.append(self._static_scrape_job(targets, relation.app.name))
groups = [] # list of alert rule groups, one group per relation
for relation in self.model.relations[self._alert_rules_relation]:
unit_rules = self._get_alert_rules(relation)
if unit_rules and relation.app:
appname = relation.app.name
rules = self._label_alert_rules(unit_rules, appname)
group = {"name": self._group_name(appname), "rules": rules}
groups.append(group)
event.relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
event.relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups})
def _set_target_job_data(self, targets: dict, app_name: str, **kwargs) -> None:
"""Update scrape jobs in response to scrape target changes.
When relation data changes for any scrape target, the Prometheus
scrape job for that specific target is updated. Calling this
method manually has the same effect.
Args:
targets: a `dict` containing target information
app_name: a `str` identifying the application
**kwargs: optional `relabel_configs` (a list of extra relabeling
configurations) and `updates` (a `dict` merged into the job),
both passed through to `_static_scrape_job`.
"""
# new scrape job for the relation that has changed
updated_job = self._static_scrape_job(targets, app_name, **kwargs)
for relation in self.model.relations[self._prometheus_relation]:
jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]"))
# list of scrape jobs that have not changed
jobs = [job for job in jobs if updated_job["job_name"] != job["job_name"]]
jobs.append(updated_job)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
def _update_prometheus_jobs(self, event):
"""Update scrape jobs in response to scrape target changes.
When relation data changes for any scrape target, the Prometheus
scrape job for that specific target is updated.
"""
targets = self._get_targets(event.relation)
if not targets:
return
# new scrape job for the relation that has changed
updated_job = self._static_scrape_job(targets, event.relation.app.name)
for relation in self.model.relations[self._prometheus_relation]:
jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]"))
# list of scrape jobs that have not changed
jobs = [job for job in jobs if updated_job["job_name"] != job["job_name"]]
jobs.append(updated_job)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
def _remove_prometheus_jobs(self, event):
"""Remove scrape jobs when a target departs.
Any time a scrape target departs, any Prometheus scrape job
associated with that specific scrape target is removed.
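Example (an illustrative sketch, not part of this library; the
`drop_unit` helper and the sample job are hypothetical). Only the
static config of the departing unit is dropped, and the job itself
disappears once no unit is left:

```python
def drop_unit(job, unit_name):
    # Hypothetical helper: keep only the static configs of other units.
    kept = [
        config
        for config in job["static_configs"]
        if config.get("labels", {}).get("juju_unit") != unit_name
    ]
    # Return None when no unit is left, meaning the whole job is dropped.
    return dict(job, static_configs=kept) if kept else None


job = {
    "job_name": "juju_lma_12de4fa_app_prometheus_scrape",
    "static_configs": [
        {"targets": ["10.1.0.5:9100"], "labels": {"juju_unit": "app/0"}},
        {"targets": ["10.1.0.6:9100"], "labels": {"juju_unit": "app/1"}},
    ],
}
remaining = drop_unit(job, "app/0")
```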
"""
job_name = self._job_name(event.relation.app.name)
unit_name = event.unit.name
for relation in self.model.relations[self._prometheus_relation]:
jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]"))
if not jobs:
continue
changed_job = [j for j in jobs if j.get("job_name") == job_name]
if not changed_job:
continue
changed_job = changed_job[0]
# list of scrape jobs that have not changed
jobs = [job for job in jobs if job.get("job_name") != job_name]
# list of scrape jobs for units of the same application that still exist
configs_kept = [
config
for config in changed_job["static_configs"] # type: ignore
if config.get("labels", {}).get("juju_unit") != unit_name
]
if configs_kept:
changed_job["static_configs"] = configs_kept # type: ignore
jobs.append(changed_job)
relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs)
def _update_alert_rules(self, event):
"""Update alert rules in response to scrape target changes.
When there is any change in alert rule relation data for any
scrape target, the list of alert rules for that specific
target is updated.
"""
unit_rules = self._get_alert_rules(event.relation)
if not unit_rules:
return
appname = event.relation.app.name
rules = self._label_alert_rules(unit_rules, appname)
# the alert rule group that has changed
updated_group = {"name": self._group_name(appname), "rules": rules}
for relation in self.model.relations[self._prometheus_relation]:
alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}"))
groups = alert_rules.get("groups", [])
# list of alert rule groups that have not changed
groups = [group for group in groups if updated_group["name"] != group["name"]]
groups.append(updated_group)
relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups})
def _remove_alert_rules(self, event):
"""Remove alert rules for departed targets.
Any time a scrape target departs, any alert rules associated
with that specific scrape target are removed.
"""
group_name = self._group_name(event.relation.app.name)
unit_name = event.unit.name
for relation in self.model.relations[self._prometheus_relation]:
alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}"))
if not alert_rules:
continue
groups = alert_rules.get("groups", [])
if not groups:
continue
changed_group = [group for group in groups if group["name"] == group_name]
if not changed_group:
continue
changed_group = changed_group[0]
# list of alert rule groups that have not changed
groups = [group for group in groups if group["name"] != group_name]
# list of alert rules not associated with departing unit
rules_kept = [
rule
for rule in changed_group.get("rules", [])  # type: ignore
if rule.get("labels", {}).get("juju_unit") != unit_name
]
if rules_kept:
changed_group["rules"] = rules_kept # type: ignore
groups.append(changed_group)
relation.data[self._charm.app]["alert_rules"] = (
json.dumps({"groups": groups}) if groups else "{}"
)
def _get_targets(self, relation) -> dict:
"""Fetch scrape targets for a relation.
Scrape target information is returned for each unit in the
relation. This information contains the unit name, network
hostname (or address) for that unit, and port on which a
metrics endpoint is exposed in that unit.
Args:
relation: an `ops.model.Relation` object for which scrape
targets are required.
Returns:
a dictionary whose keys are the names of the units in the
relation. The value associated with each key is itself
a dictionary of the form
```
{"hostname": hostname, "port": port}
```
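Example (an illustrative sketch only; plain dictionaries stand in
for `relation.data`, and the `collect_targets` helper is
hypothetical). Units without a hostname are skipped and a missing
port falls back to 80:

```python
# Hypothetical stand-in for relation data: unit name -> databag contents.
unit_data = {
    "app/0": {"hostname": "10.1.0.5", "port": "9100"},
    "app/1": {"hostname": "10.1.0.6"},  # no port set, falls back to 80
    "app/2": {"port": "9100"},  # no hostname set, unit is skipped
}


def collect_targets(unit_data):
    # Mirror the lookup: skip units without a hostname, default the port to 80.
    targets = {}
    for unit_name, data in unit_data.items():
        hostname = data.get("hostname")
        if hostname:
            targets[unit_name] = {"hostname": hostname, "port": data.get("port", 80)}
    return targets


targets = collect_targets(unit_data)
```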
"""
targets = {}
for unit in relation.units:
port = relation.data[unit].get("port", 80)
hostname = relation.data[unit].get("hostname")
if hostname:
targets.update({unit.name: {"hostname": hostname, "port": port}})
return targets
def _get_alert_rules(self, relation) -> dict:
"""Fetch alert rules for a relation.
Each unit of the related scrape target may have its own
associated alert rules. Alert rules for all units are returned
indexed by unit name.
Args:
relation: an `ops.model.Relation` object for which alert
rules are required.
Returns:
a dictionary whose keys are the names of the units in the
relation. The value associated with each key is a list
of alert rules. Each rule is in dictionary format, and
each "rule dictionary" corresponds to a single
Prometheus alert rule.
"""
rules = {}
for unit in relation.units:
unit_rules = yaml.safe_load(relation.data[unit].get("groups", ""))
if unit_rules:
rules.update({unit.name: unit_rules})
return rules
def _job_name(self, appname) -> str:
"""Construct a scrape job name.
Each relation has its own unique scrape job name. All units in
the relation are scraped as part of the same scrape job.
Args:
appname: string name of a related application.
Returns:
a string Prometheus scrape job name for the application.
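Example (a sketch with hypothetical model name, model UUID and
application name). Only the first seven characters of the model
UUID end up in the job name:

```python
# Hypothetical model name, model UUID and application name.
model_name = "lma"
model_uuid = "12de4fae-06cc-4ceb-9089-567be09fec78"
appname = "ceph-mon"

job_name = "juju_{}_{}_{}_prometheus_scrape".format(model_name, model_uuid[:7], appname)
```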
"""
return "juju_{}_{}_{}_prometheus_scrape".format(
self.model.name, self.model.uuid[:7], appname
)
def _group_name(self, appname) -> str:
"""Construct name for an alert rule group.
Each unit in a relation may define its own alert rules. All
rules for all units in a relation are grouped together and
given a single alert rule group name.
Args:
appname: string name of a related application.
Returns:
a string Prometheus alert rules group name for the application.
"""
return "juju_{}_{}_{}_alert_rules".format(self.model.name, self.model.uuid[:7], appname)
def _label_alert_rules(self, unit_rules, appname) -> list:
"""Apply juju topology labels to alert rules.
Args:
unit_rules: a dictionary mapping unit names to lists of alert
rules, where each rule is in dictionary format.
appname: a string name of the application to which the
alert rules belong.
Returns:
a list of alert rules with Juju topology labels.
"""
labeled_rules = []
for unit_name, rules in unit_rules.items():
for rule in rules:
# the new JujuTopology removed this, so build it up by hand
matchers = {
"juju_{}".format(k): v
for k, v in JujuTopology(self.model.name, self.model.uuid, appname, unit_name)
.as_dict(excluded_keys=["charm_name"])
.items()
}
# rules without a "labels" section are valid, so create one on demand
rule.setdefault("labels", {}).update(matchers)
labeled_rules.append(rule)
return labeled_rules
def _static_scrape_job(self, targets, application_name, **kwargs) -> dict:
"""Construct a static scrape job for an application.
Args:
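targets: a dictionary providing hostname and port for all
scrape targets. The keys of this dictionary are unit
names. The values corresponding to these keys are
themselves dictionaries with keys "hostname" and
"port".
application_name: a string name of the application for
which this static scrape job is being constructed.
**kwargs: optional `relabel_configs` (a list appended to the
Juju topology relabeling configuration) and `updates` (a
dictionary merged into the resulting job).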
Returns:
A dictionary corresponding to a Prometheus static scrape
job configuration for one application. The returned
dictionary may be transformed into YAML and appended to
any existing list of Prometheus scrape jobs.
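Example (an illustrative sketch; `build_static_job` is a
hypothetical standalone helper that leaves out relabeling and
`**kwargs`, and all sample values are made up). One static
config is produced per unit:

```python
def build_static_job(job_name, model, model_uuid, app, targets):
    # Hypothetical standalone version: one static config per unit.
    return {
        "job_name": job_name,
        "static_configs": [
            {
                "targets": ["{}:{}".format(t["hostname"], t["port"])],
                "labels": {
                    "juju_model": model,
                    "juju_model_uuid": model_uuid,
                    "juju_application": app,
                    "juju_unit": unit_name,
                    "host": t["hostname"],
                },
            }
            for unit_name, t in targets.items()
        ],
    }


job = build_static_job(
    "juju_lma_12de4fa_app_prometheus_scrape",
    "lma",
    "12de4fae-06cc-4ceb-9089-567be09fec78",
    "app",
    {"app/0": {"hostname": "10.1.0.5", "port": 9100}},
)
```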
"""
juju_model = self.model.name
juju_model_uuid = self.model.uuid
job = {
"job_name": self._job_name(application_name),
"static_configs": [
{
"targets": ["{}:{}".format(target["hostname"], target["port"])],
"labels": {
"juju_model": juju_model,
"juju_model_uuid": juju_model_uuid,
"juju_application": application_name,
"juju_unit": unit_name,
"host": target["hostname"],
},
}
for unit_name, target in targets.items()
],
"relabel_configs": self._relabel_configs + kwargs.get("relabel_configs", []),
}
job.update(kwargs.get("updates", {}))
return job
@property
def _relabel_configs(self) -> list:
"""Create Juju topology relabeling configuration.
Using Juju topology for instance labels ensures that these
labels are stable across unit recreation.
Returns:
a list of Prometheus relabeling configurations. Each item in
this list is one relabel configuration.
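Example (a sketch of the effect, assuming Prometheus defaults
for the relabel action and replacement; the label values are
hypothetical). The source label values are joined with the
separator, and the whole joined string matched by `(.*)`
becomes the `instance` label:

```python
# Hypothetical topology labels attached to one scrape target.
labels = {
    "juju_model": "lma",
    "juju_model_uuid": "12de4fae-06cc-4ceb-9089-567be09fec78",
    "juju_application": "ceph-mon",
    "juju_unit": "ceph-mon/0",
}
source_labels = ["juju_model", "juju_model_uuid", "juju_application", "juju_unit"]

# Join the source label values with the "_" separator.
instance = "_".join(labels[name] for name in source_labels)
```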
"""
return (
[
{
"source_labels": [
"juju_model",
"juju_model_uuid",
"juju_application",
"juju_unit",
],
"separator": "_",
"target_label": "instance",
"regex": "(.*)",
}
]
if self._relabel_instance
else []
)
class CosTool:
"""Uses cos-tool to inject label matchers into alert rule expressions and validate rules."""
_path = None
_disabled = False
def __init__(self, charm):
self._charm = charm
@property
def path(self):
"""Lazy lookup of the path of cos-tool."""
if self._disabled:
return None
if not self._path:
self._path = self._get_tool_path()
if not self._path:
logger.debug("Skipping injection of juju topology as label matchers")
self._disabled = True
return self._path
def apply_label_matchers(self, rules) -> dict:
"""Apply label matchers to the expression of all alerts in all supplied groups."""
if not self.path:
return rules
for group in rules["groups"]:
rules_in_group = group.get("rules", [])
for rule in rules_in_group:
topology = {}
# if the user has for some reason provided juju_unit, we need to honor it;
# in most cases, however, it will be absent
for label in [
"juju_model",
"juju_model_uuid",
"juju_application",
"juju_charm",
"juju_unit",
]:
if label in rule.get("labels", {}):
topology[label] = rule["labels"][label]
rule["expr"] = self.inject_label_matchers(rule["expr"], topology)
return rules
def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]:
"""Validate correctness of alert rules, returning a boolean and any errors."""
if not self.path:
logger.debug("`cos-tool` unavailable. Not validating alert correctness.")
return True, ""
with tempfile.TemporaryDirectory() as tmpdir:
rule_path = Path(tmpdir) / "validate_rule.yaml"
rule_path.write_text(yaml.dump(rules))
args = [str(self.path), "validate", str(rule_path)]
# noinspection PyBroadException
try:
self._exec(args)
return True, ""
except subprocess.CalledProcessError as e:
logger.debug("Validating the rules failed: %s", e.output)
return False, ", ".join(
[
line
for line in e.output.decode("utf8").splitlines()
if "error validating" in line
]
)
def inject_label_matchers(self, expression, topology) -> str:
"""Add label matchers to an expression."""
if not topology:
return expression
if not self.path:
logger.debug("`cos-tool` unavailable. Leaving expression unchanged: %s", expression)
return expression
args = [str(self.path), "transform"]
args.extend(
["--label-matcher={}={}".format(key, value) for key, value in topology.items()]
)
args.extend(["{}".format(expression)])
# noinspection PyBroadException
try:
return self._exec(args)
except subprocess.CalledProcessError as e:
logger.debug('Applying the expression failed: "%s", falling back to the original', e)
return expression
def _get_tool_path(self) -> Optional[Path]:
arch = platform.machine()
arch = "amd64" if arch == "x86_64" else arch
res = "cos-tool-{}".format(arch)
try:
path = Path(res).resolve()
path.chmod(0o777)
return path
except NotImplementedError:
logger.debug("System lacks support for chmod")
except FileNotFoundError:
logger.debug('Could not locate cos-tool at: "%s"', res)
return None
def _exec(self, cmd) -> str:
result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return result.stdout.decode("utf-8").strip()