551 lines
22 KiB
Python
551 lines
22 KiB
Python
#
|
|
# Copyright 2014-2015 eNovance
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from collections import defaultdict
|
|
import hashlib
|
|
import itertools
|
|
import json
|
|
import operator
|
|
import pkg_resources
|
|
import threading
|
|
import uuid
|
|
|
|
from gnocchiclient import exceptions as gnocchi_exc
|
|
from keystoneauth1 import exceptions as ka_exceptions
|
|
from oslo_log import log
|
|
from oslo_utils import fnmatch
|
|
from oslo_utils import timeutils
|
|
import six
|
|
import six.moves.urllib.parse as urlparse
|
|
from stevedore import extension
|
|
|
|
from ceilometer import declarative
|
|
from ceilometer import gnocchi_client
|
|
from ceilometer.i18n import _
|
|
from ceilometer import keystone_client
|
|
from ceilometer import publisher
|
|
|
|
# Module-wide logger.
LOG = log.getLogger(__name__)

# UTF-8 encoded module name, used to seed CACHE_NAMESPACE.
NAME_ENCODED = __name__.encode('utf-8')

# Namespace UUID made from the MD5 digest of the module name (a fingerprint,
# not a security use); it scopes the keys produced by cache_key_mangler.
CACHE_NAMESPACE = uuid.UUID(hashlib.md5(NAME_ENCODED).hexdigest())
|
|
|
|
|
|
def cache_key_mangler(key):
    """Map ``key`` to an opaque, fixed-length cache key.

    The key is turned into a name-based (uuid5) UUID inside CACHE_NAMESPACE
    and returned as its 32-character hex form. Under Python 2 the key is
    first encoded to UTF-8 bytes.
    """
    name = key.encode('utf-8') if six.PY2 else key
    return uuid.uuid5(CACHE_NAMESPACE, name).hex
|
|
|
|
|
|
# Operations an event type can be mapped to by ResourcesDefinition.event_match.
EVENT_CREATE = "create"
EVENT_UPDATE = "update"
EVENT_DELETE = "delete"
|
|
|
|
|
|
class ResourcesDefinition(object):
    """One resource type entry of the Gnocchi resources definition file.

    Validates the entry, resolves the archive policy of each of its
    metrics, and provides helpers that map samples and events onto Gnocchi
    resource attributes.
    """

    # Top-level keys every definition must provide, with accepted types.
    MANDATORY_FIELDS = {'resource_type': six.string_types,
                        'metrics': (dict, list)}

    # Keys required in ``event_attributes`` once the definition declares
    # any ``event_create``/``event_delete``/``event_update`` matcher.
    MANDATORY_EVENT_FIELDS = {'id': six.string_types}

    def __init__(self, definition_cfg, archive_policy_default,
                 archive_policy_override, plugin_manager):
        """Validate ``definition_cfg`` and build the per-metric settings.

        :param definition_cfg: one entry of the definition file's
            ``resources`` list. A legacy list-form ``metrics`` value is
            normalized in place to a dict; the per-metric dicts themselves
            are copied, never modified.
        :param archive_policy_default: policy used by metrics that do not
            set ``archive_policy_name``.
        :param archive_policy_override: when truthy, policy forced on every
            metric.
        :param plugin_manager: trait plugin manager handed to
            ``declarative.Definition``.
        :raises declarative.ResourceDefinitionException: when a mandatory
            field is missing or has the wrong type.
        """
        self.cfg = definition_cfg

        self._check_required_and_types(self.MANDATORY_FIELDS, self.cfg)

        if self.support_events():
            # .get() so a missing ``event_attributes`` section is reported
            # as a definition error instead of escaping as a bare KeyError.
            self._check_required_and_types(
                self.MANDATORY_EVENT_FIELDS,
                self.cfg.get('event_attributes', {}))

        self._attributes = {}
        for name, attr_cfg in self.cfg.get('attributes', {}).items():
            self._attributes[name] = declarative.Definition(name, attr_cfg,
                                                            plugin_manager)

        self._event_attributes = {}
        for name, attr_cfg in self.cfg.get('event_attributes', {}).items():
            self._event_attributes[name] = declarative.Definition(
                name, attr_cfg, plugin_manager)

        # NOTE(sileht): Convert old list to new dict format
        if isinstance(self.cfg['metrics'], list):
            self.cfg['metrics'] = dict.fromkeys(self.cfg['metrics'])

        # NOTE(sileht): For backward compat, the resource-level
        # archive_policy is applied after the override to preserve the
        # weird previous behavior. We don't really care as we deprecate it.
        legacy_policy_set = 'archive_policy' in self.cfg
        if legacy_policy_set:
            # Warn once per resource type rather than once per metric.
            LOG.warning("archive_policy '%s' for a resource-type (%s) is "
                        "deprecated, set it for each metric instead.",
                        self.cfg["archive_policy"],
                        self.cfg["resource_type"])

        self.metrics = {}
        for m, extra in self.cfg['metrics'].items():
            # Work on a copy: writing the resolved policy (possibly an
            # override) into the configured dict would leak it to any other
            # consumer of the same definition data.
            extra = dict(extra or {})

            if not extra.get("archive_policy_name"):
                extra["archive_policy_name"] = archive_policy_default

            if archive_policy_override:
                extra["archive_policy_name"] = archive_policy_override

            if legacy_policy_set:
                extra["archive_policy_name"] = self.cfg['archive_policy']

            self.metrics[m] = extra

    @staticmethod
    def _check_required_and_types(expected, definition):
        """Raise ResourceDefinitionException unless every key of
        ``expected`` is in ``definition`` with a value of the mapped type(s).
        """
        for field, field_types in expected.items():
            if field not in definition:
                raise declarative.ResourceDefinitionException(
                    _("Required field %s not specified") % field, definition)
            if not isinstance(definition[field], field_types):
                raise declarative.ResourceDefinitionException(
                    _("Required field %(field)s should be a %(type)s") %
                    {'field': field, 'type': field_types}, definition)

    @staticmethod
    def _ensure_list(value):
        """Return ``value`` unchanged if it is a list, else wrap it in one."""
        if isinstance(value, list):
            return value
        return [value]

    def support_events(self):
        """Return True if the definition declares any event matcher."""
        return any(key in self.cfg
                   for key in ("event_create", "event_delete",
                               "event_update"))

    def event_match(self, event_type):
        """Return the operation whose fnmatch patterns match ``event_type``.

        Patterns are checked in the order create, delete, update; the first
        match wins. Returns None when nothing matches.
        """
        for key, operation in (('event_create', EVENT_CREATE),
                               ('event_delete', EVENT_DELETE),
                               ('event_update', EVENT_UPDATE)):
            for pattern in self._ensure_list(self.cfg.get(key, [])):
                if fnmatch.fnmatch(event_type, pattern):
                    return operation
        return None

    def sample_attributes(self, sample):
        """Extract the configured resource attributes from ``sample``.

        Attributes whose definition yields None are omitted.
        """
        sample_dict = sample.as_dict()
        attrs = {}
        for name, definition in self._attributes.items():
            value = definition.parse(sample_dict)
            if value is not None:
                attrs[name] = value
        return attrs

    def event_attributes(self, event):
        """Build a resource dict from the traits of ``event``.

        Always contains ``type`` (the resource type); every other key of
        ``event_attributes`` maps to the value of the named trait, and is
        omitted when the trait is absent or None.
        """
        attrs = {'type': self.cfg['resource_type']}
        traits = {trait.name: trait.value for trait in event.traits}
        for attr, field in self.cfg.get('event_attributes', {}).items():
            value = traits.get(field)
            if value is not None:
                attrs[attr] = value
        return attrs
|
|
|
|
|
|
class LockedDefaultDict(defaultdict):
    """defaultdict guarded by a lock, whose values are themselves locks.

    Lookups (including creation of missing entries by the default factory)
    and ``pop`` are serialized by ``self.lock``. ``pop`` only removes an
    entry when the lock stored under that key can be acquired without
    blocking; if that lock is currently held, the removal is skipped.
    """

    def __init__(self, *args, **kwargs):
        self.lock = threading.Lock()
        super(LockedDefaultDict, self).__init__(*args, **kwargs)

    def __getitem__(self, key):
        with self.lock:
            return super(LockedDefaultDict, self).__getitem__(key)

    def pop(self, key, *args):
        """Remove ``key`` if its lock is free and return the removed value.

        Unlike ``dict.pop`` this never raises KeyError. When ``key`` is
        missing, or its lock is held by someone (so the entry is kept), the
        optional default is returned, or None if no default was given.
        """
        if len(args) > 1:
            raise TypeError('pop expected at most 2 arguments, got %d'
                            % (len(args) + 1))
        default = args[0] if args else None
        with self.lock:
            # Plain membership test: unlike self[key] it does not invoke the
            # default factory, so no throwaway lock is created.
            if key not in self:
                return default
            key_lock = dict.__getitem__(self, key)
            if not key_lock.acquire(False):
                return default
            try:
                return super(LockedDefaultDict, self).pop(key)
            finally:
                key_lock.release()
|
|
|
|
|
|
class GnocchiPublisher(publisher.ConfigPublisherBase):
    """Publisher class for recording metering data into the Gnocchi service.

    The publisher class records each meter into the gnocchi service
    configured in Ceilometer pipeline file. An example target may
    look like the following:

      gnocchi://?archive_policy=low&filter_project=gnocchi

    Recognized URL query options: filter_project, filter_domain,
    resources_definition_file, archive_policy and timeout (when an option
    is repeated, the last value wins).
    """

    def __init__(self, conf, parsed_url):
        super(GnocchiPublisher, self).__init__(conf, parsed_url)
        # TODO(jd) allow to override Gnocchi endpoint via the host in the URL
        options = urlparse.parse_qs(parsed_url.query)

        self.filter_project = options.get('filter_project', ['service'])[-1]
        self.filter_domain = options.get('filter_domain', ['Default'])[-1]

        resources_definition_file = options.get(
            'resources_definition_file', ['gnocchi_resources.yaml'])[-1]

        archive_policy_override = options.get('archive_policy', [None])[-1]
        self.resources_definition, self.archive_policies_definition = (
            self._load_definitions(conf, archive_policy_override,
                                   resources_definition_file))
        # metric name -> ResourcesDefinition owning it (a later definition
        # wins if two declare the same metric).
        self.metric_map = dict((metric, rd) for rd in self.resources_definition
                               for metric in rd.metrics)

        timeout = options.get('timeout', [6.05])[-1]
        self._ks_client = keystone_client.get_client(conf)

        self.cache = None
        try:
            import oslo_cache
            oslo_cache.configure(conf)
            # NOTE(cdent): The default cache backend is a real but
            # noop backend. We don't want to use that here because
            # we want to avoid the cache pathways entirely if the
            # cache has not been configured explicitly.
            if conf.cache.enabled:
                cache_region = oslo_cache.create_region()
                self.cache = oslo_cache.configure_cache_region(
                    conf, cache_region)
                self.cache.key_mangler = cache_key_mangler
        except ImportError:
            pass
        except oslo_cache.exception.ConfigurationError as exc:
            LOG.warning('unable to configure oslo_cache: %s', exc)

        self._gnocchi_project_id = None
        self._gnocchi_project_id_lock = threading.Lock()
        # One lock per resource id, serializing cached resource updates.
        self._gnocchi_resource_lock = LockedDefaultDict(threading.Lock)

        self._gnocchi = gnocchi_client.get_gnocchiclient(
            conf, request_timeout=timeout)
        self._already_logged_event_types = set()
        self._already_logged_metric_names = set()

        self._already_configured_archive_policies = False

    @staticmethod
    def _load_definitions(conf, archive_policy_override,
                          resources_definition_file):
        """Load resource definitions and archive policies from the file.

        Entries that fail to load are logged and skipped.

        :returns: (list of ResourcesDefinition, list of archive policies)
        """
        plugin_manager = extension.ExtensionManager(
            namespace='ceilometer.event.trait_plugin')
        data = declarative.load_definitions(
            conf, {}, resources_definition_file,
            pkg_resources.resource_filename(__name__,
                                            "data/gnocchi_resources.yaml"))

        archive_policy_default = data.get("archive_policy_default",
                                          "ceilometer-low")
        resource_defs = []
        for resource in data.get('resources', []):
            try:
                resource_defs.append(ResourcesDefinition(
                    resource,
                    archive_policy_default,
                    archive_policy_override,
                    plugin_manager))
            except Exception:
                LOG.error("Failed to load resource due to error",
                          exc_info=True)
        return resource_defs, data.get("archive_policies", [])

    def ensures_archives_policies(self):
        """Create the configured archive policies in Gnocchi once.

        Policies that already exist are tolerated. The done-flag is only
        set once every policy was created or found to exist.
        """
        if self._already_configured_archive_policies:
            return
        for ap in self.archive_policies_definition:
            try:
                self._gnocchi.archive_policy.create(ap)
            except gnocchi_exc.ArchivePolicyAlreadyExists:
                # created in the meantime by another worker
                pass
        self._already_configured_archive_policies = True

    @property
    def gnocchi_project_id(self):
        """Keystone id of the ``filter_project`` project, looked up lazily.

        If the project is not found, ``filter_project`` is cleared and None
        is returned. Other lookup errors are logged and re-raised.
        """
        if self._gnocchi_project_id is not None:
            return self._gnocchi_project_id
        with self._gnocchi_project_id_lock:
            if self._gnocchi_project_id is None:
                try:
                    project = self._ks_client.projects.find(
                        name=self.filter_project,
                        domain=self.filter_domain)
                except ka_exceptions.NotFound:
                    LOG.warning('filtered project not found in keystone,'
                                ' ignoring the filter_project '
                                'option')
                    self.filter_project = None
                    return None
                except Exception:
                    LOG.exception('fail to retrieve filtered project ')
                    raise
                self._gnocchi_project_id = project.id
                LOG.debug("filtered project found: %s",
                          self._gnocchi_project_id)
            return self._gnocchi_project_id

    def _is_swift_account_sample(self, sample):
        """True if the sample's metric belongs to a swift_account resource."""
        try:
            return (self.metric_map[sample.name].cfg['resource_type']
                    == 'swift_account')
        except KeyError:
            return False

    def _is_gnocchi_activity(self, sample):
        """Truthy when the sample was produced by Gnocchi's own activity."""
        return (self.filter_project and self.gnocchi_project_id and (
            # avoid anything from the user used by gnocchi
            sample.project_id == self.gnocchi_project_id or
            # avoid anything in the swift account used by gnocchi
            (sample.resource_id == self.gnocchi_project_id and
             self._is_swift_account_sample(sample))
        ))

    def _get_resource_definition_from_event(self, event_type):
        """Return (definition, operation) for the first matching definition,
        or None when no definition handles ``event_type``.
        """
        for rd in self.resources_definition:
            operation = rd.event_match(event_type)
            if operation:
                return rd, operation
        return None

    def publish_samples(self, data):
        """Group samples by resource and push their measures to Gnocchi.

        Samples for metrics with no definition are skipped (warned once per
        metric name). After the batch post, changed resource attributes are
        pushed with _if_not_cached. Errors are logged, not raised.
        """
        self.ensures_archives_policies()

        # NOTE(sileht): skip sample generated by gnocchi itself
        data = [s for s in data if not self._is_gnocchi_activity(s)]
        data.sort(key=operator.attrgetter('resource_id'))
        resource_grouped_samples = itertools.groupby(
            data, key=operator.attrgetter('resource_id'))

        gnocchi_data = {}
        measures = {}
        for resource_id, samples_of_resource in resource_grouped_samples:
            # NOTE(sileht): / is forbidden by Gnocchi
            resource_id = resource_id.replace('/', '_')
            for sample in samples_of_resource:
                metric_name = sample.name
                LOG.debug("Processing sample [%s] for resource ID [%s].",
                          sample, resource_id)
                rd = self.metric_map.get(metric_name)
                if rd is None:
                    if metric_name not in self._already_logged_metric_names:
                        LOG.warning("metric %s is not handled by Gnocchi",
                                    metric_name)
                        self._already_logged_metric_names.add(metric_name)
                    continue

                if resource_id not in gnocchi_data:
                    gnocchi_data[resource_id] = {
                        'resource_type': rd.cfg['resource_type'],
                        'resource': {"id": resource_id,
                                     "user_id": sample.user_id,
                                     "project_id": sample.project_id}}

                gnocchi_data[resource_id].setdefault(
                    "resource_extra", {}).update(rd.sample_attributes(sample))
                measures.setdefault(resource_id, {}).setdefault(
                    metric_name,
                    {"measures": [],
                     "archive_policy_name":
                     rd.metrics[metric_name]["archive_policy_name"],
                     "unit": sample.unit}
                )["measures"].append(
                    {'timestamp': sample.timestamp,
                     'value': sample.volume}
                )

        if not measures:
            # Nothing in this batch maps to a Gnocchi metric (gnocchi_data
            # is filled in lockstep, so it is empty too): skip the API call.
            return

        try:
            self.batch_measures(measures, gnocchi_data)
        except gnocchi_exc.ClientException as e:
            LOG.error(six.text_type(e))
        except Exception as e:
            LOG.error(six.text_type(e), exc_info=True)

        for info in gnocchi_data.values():
            resource = info["resource"]
            resource_type = info["resource_type"]
            resource_extra = info["resource_extra"]
            if not resource_extra:
                continue
            try:
                self._if_not_cached(resource_type, resource['id'],
                                    resource_extra)
            except gnocchi_exc.ClientException as e:
                LOG.error(six.text_type(e))
            except Exception as e:
                LOG.error(six.text_type(e), exc_info=True)

    @staticmethod
    def _extract_resources_from_error(e, resource_infos):
        """Map an 'Unknown resources' error back to local resource infos.

        Reads ``original_resource_id`` from each entry of
        ``e.message['detail']`` and returns a list of
        (resource_type, resource, resource_extra) tuples.
        """
        resource_ids = set([r['original_resource_id']
                            for r in e.message['detail']])
        return [(resource_infos[rid]['resource_type'],
                 resource_infos[rid]['resource'],
                 resource_infos[rid]['resource_extra'])
                for rid in resource_ids]

    def batch_measures(self, measures, resource_infos):
        """Post ``measures`` in one batch, creating unknown resources.

        When Gnocchi answers 'Unknown resources', the missing resources are
        created and the batch is posted again. Resources that cannot be
        created are dropped from both ``measures`` and ``resource_infos``
        (both dicts are modified in place). Other errors propagate.
        """
        # NOTE(sileht): We don't care about error here, we want
        # resources metadata always been updated
        try:
            self._gnocchi.metric.batch_resources_metrics_measures(
                measures, create_metrics=True)
        except gnocchi_exc.BadRequest as e:
            if not isinstance(e.message, dict):
                raise
            if e.message.get('cause') != 'Unknown resources':
                raise

            resources = self._extract_resources_from_error(e, resource_infos)
            for resource_type, resource, resource_extra in resources:
                try:
                    resource.update(resource_extra)
                    self._create_resource(resource_type, resource)
                except gnocchi_exc.ResourceAlreadyExists:
                    # NOTE(sileht): resource created in the meantime
                    pass
                except gnocchi_exc.ClientException as create_err:
                    LOG.error('Error creating resource %(id)s: %(err)s',
                              {'id': resource['id'],
                               'err': six.text_type(create_err)})
                    # We cannot post measures for this resource
                    # and we can't patch it later
                    del measures[resource['id']]
                    del resource_infos[resource['id']]
                else:
                    if self.cache and resource_extra:
                        self.cache.set(resource['id'],
                                       self._hash_resource(resource_extra))

            # NOTE(sileht): we have created missing resources/metrics,
            # now retry to post measures
            self._gnocchi.metric.batch_resources_metrics_measures(
                measures, create_metrics=True)

        LOG.debug(
            "%d measures posted against %d metrics through %d resources",
            sum(len(m["measures"])
                for rid in measures
                for m in measures[rid].values()),
            sum(len(m) for m in measures.values()), len(resource_infos))

    def _create_resource(self, resource_type, resource):
        """Create ``resource`` (a dict with an ``id``) in Gnocchi."""
        self._gnocchi.resource.create(resource_type, resource)
        LOG.debug('Resource %s created', resource["id"])

    def _update_resource(self, resource_type, res_id, resource_extra):
        """Patch resource ``res_id`` with the ``resource_extra`` attributes."""
        self._gnocchi.resource.update(resource_type, res_id, resource_extra)
        LOG.debug('Resource %s updated', res_id)

    def _if_not_cached(self, resource_type, res_id, resource_extra):
        """Update the resource unless the cache says it is unchanged.

        Without a cache the resource is always updated. With one, the
        attribute fingerprint is compared before and again after taking the
        per-resource lock, and stored after a successful update.
        """
        if not self.cache:
            self._update_resource(resource_type, res_id, resource_extra)
            return

        attribute_hash = self._hash_resource(resource_extra)
        if not self._resource_cache_diff(res_id, attribute_hash):
            LOG.debug('Resource cache hit for %s', res_id)
            return

        try:
            with self._gnocchi_resource_lock[res_id]:
                # NOTE(luogangyi): there is a possibility that the
                # resource was already built in cache by another
                # ceilometer-notification-agent when we get the lock here.
                if self._resource_cache_diff(res_id, attribute_hash):
                    self._update_resource(resource_type, res_id,
                                          resource_extra)
                    self.cache.set(res_id, attribute_hash)
                else:
                    LOG.debug('Resource cache hit for %s', res_id)
        finally:
            # Must run after the per-resource lock is released: pop() only
            # removes the entry when it can acquire that lock without
            # blocking, so calling it while holding the lock never frees it.
            self._gnocchi_resource_lock.pop(res_id, None)

    @staticmethod
    def _hash_resource(resource):
        """Return a stable fingerprint of a resource's attributes.

        The ``metrics`` key is ignored. A SHA-256 digest of canonical JSON
        (sorted keys) is used instead of the builtin ``hash()``, which is
        salted per interpreter process for strings and rejects unhashable
        values. A stable digest keeps fingerprints comparable when the cache
        is shared with other agents. Values JSON cannot encode are
        stringified.
        """
        attributes = dict((k, v) for k, v in resource.items()
                          if k != 'metrics')
        canonical = json.dumps(attributes, sort_keys=True, default=str)
        return hashlib.sha256(canonical.encode('utf-8')).hexdigest()

    def _resource_cache_diff(self, key, attribute_hash):
        """True when the cached fingerprint for ``key`` is absent or differs."""
        cached_hash = self.cache.get(key)
        return not cached_hash or cached_hash != attribute_hash

    def publish_events(self, events):
        """Apply create/delete events to Gnocchi resources.

        Event types without a matching definition are logged once (debug)
        and skipped.
        """
        for event in events:
            match = self._get_resource_definition_from_event(
                event.event_type)
            if not match:
                if event.event_type not in self._already_logged_event_types:
                    LOG.debug("No gnocchi definition for event type: %s",
                              event.event_type)
                    self._already_logged_event_types.add(event.event_type)
                continue

            rd, operation = match
            if operation == EVENT_DELETE:
                self._delete_event(rd, event)
            elif operation == EVENT_CREATE:
                self._create_event(rd, event)
            # EVENT_UPDATE matches are recognized but not acted upon here.

    def _delete_event(self, rd, event):
        """Mark the event's resource, and any associated ones, as ended now.

        Events whose traits give no resource id are logged and skipped.
        """
        ended_at = timeutils.utcnow().isoformat()

        resource = rd.event_attributes(event)
        if 'id' not in resource:
            LOG.warning("Delete event %s carries no resource id, ignore it.",
                        event.event_type)
            return

        associated_resources = rd.cfg.get('event_associated_resources', {})

        if associated_resources:
            # NOTE(review): the id is interpolated into the JSON query text
            # unescaped; an id containing '"' would corrupt the query.
            to_end = itertools.chain([resource], *[
                self._search_resource(resource_type, query % resource['id'])
                for resource_type, query in associated_resources.items()
            ])
        else:
            to_end = [resource]

        for target in to_end:
            self._set_ended_at(target, ended_at)

    def _create_event(self, rd, event):
        """Create the resource described by ``event``; failures are logged."""
        resource = rd.event_attributes(event)
        resource_type = resource.pop('type')

        try:
            self._create_resource(resource_type, resource)
        except gnocchi_exc.ResourceAlreadyExists:
            LOG.debug("Create event received on existing resource (%s), "
                      "ignore it.", resource['id'])
        except Exception:
            LOG.error("Failed to create resource %s", resource,
                      exc_info=True)

    def _search_resource(self, resource_type, query):
        """Search Gnocchi with a JSON ``query``; [] on any error (logged)."""
        try:
            return self._gnocchi.resource.search(
                resource_type, json.loads(query))
        except Exception:
            LOG.error("Fail to search resource type %(resource_type)s "
                      "with '%(query)s'",
                      {'resource_type': resource_type, 'query': query},
                      exc_info=True)
        return []

    def _set_ended_at(self, resource, ended_at):
        """Set ``ended_at`` on ``resource``; errors are logged, not raised."""
        try:
            self._gnocchi.resource.update(resource['type'], resource['id'],
                                          {'ended_at': ended_at})
        except gnocchi_exc.ResourceNotFound:
            LOG.debug("Delete event received on unexisting resource (%s), "
                      "ignore it.", resource['id'])
        except Exception:
            LOG.error("Fail to update the resource %s", resource,
                      exc_info=True)
        else:
            # Only claim success when the update actually went through.
            LOG.debug('Resource %s ended at %s', resource["id"], ended_at)
|