470 lines
19 KiB
Python
470 lines
19 KiB
Python
#
|
|
# Copyright 2014-2015 eNovance
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from collections import defaultdict
|
|
from hashlib import md5
|
|
import itertools
|
|
import operator
|
|
import re
|
|
import threading
|
|
import uuid
|
|
|
|
from gnocchiclient import client
|
|
from gnocchiclient import exceptions as gnocchi_exc
|
|
from gnocchiclient import utils as gnocchi_utils
|
|
from keystoneauth1 import exceptions as ka_exceptions
|
|
from keystoneauth1 import session as ka_session
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
from oslo_utils import fnmatch
|
|
import requests
|
|
import retrying
|
|
import six
|
|
from stevedore import extension
|
|
|
|
from ceilometer import declarative
|
|
from ceilometer import dispatcher
|
|
from ceilometer.i18n import _, _LE, _LW
|
|
from ceilometer import keystone_client
|
|
|
|
# Namespace used to turn cache keys into opaque UUIDs; it is derived
# deterministically from this module's dotted name.
NAME_ENCODED = __name__.encode('utf-8')
CACHE_NAMESPACE = uuid.UUID(bytes=md5(NAME_ENCODED).digest())

LOG = log.getLogger(__name__)

# Options registered below under the [dispatcher_gnocchi] group.
dispatcher_opts = [
    cfg.BoolOpt('filter_service_activity',
                help='Filter out samples generated by '
                     'Gnocchi service activity',
                default=True),
    cfg.StrOpt('filter_project',
               help='Gnocchi project used to filter out '
                    'samples generated by Gnocchi service activity',
               default='gnocchi'),
    cfg.StrOpt('url',
               help='URL to Gnocchi. default: autodetection',
               deprecated_for_removal=True),
    cfg.StrOpt('archive_policy',
               help='The archive policy to use when the '
                    'dispatcher create a new metric.'),
    cfg.StrOpt('resources_definition_file',
               help=_('The Yaml file that defines mapping '
                      'between samples and gnocchi resources/metrics'),
               default='gnocchi_resources.yaml'),
]

cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi")
|
|
|
|
|
|
def cache_key_mangler(key):
    """Map *key* to an opaque cache key.

    The key is hashed into a name-based (version 5) UUID inside the module's
    ``CACHE_NAMESPACE`` and returned as its hex string. On Python 2 the key
    is UTF-8 encoded before hashing.
    """
    name = key.encode('utf-8') if six.PY2 else key
    return uuid.uuid5(CACHE_NAMESPACE, name).hex
|
|
|
|
|
|
class ResourcesDefinition(object):
    """One sample-to-Gnocchi-resource mapping from the definitions file.

    ``definition_cfg`` must contain ``resource_type`` (a string) and
    ``metrics`` (a list of metric names or fnmatch patterns). It may
    optionally contain ``attributes`` (name -> attribute definition) and
    ``archive_policy``.
    """

    MANDATORY_FIELDS = {'resource_type': six.string_types,
                        'metrics': list}

    def __init__(self, definition_cfg, default_archive_policy, plugin_manager):
        """Validate *definition_cfg* and precompute attributes and metrics.

        :param definition_cfg: one entry of the ``resources`` list.
        :param default_archive_policy: archive policy used when the entry
            does not set ``archive_policy``; ``None`` means none is sent.
        :param plugin_manager: passed through to ``declarative.Definition``.
        :raises declarative.ResourceDefinitionException: when a mandatory
            field is missing or has the wrong type.
        """
        self._default_archive_policy = default_archive_policy
        self.cfg = definition_cfg

        for field, field_type in self.MANDATORY_FIELDS.items():
            if field not in self.cfg:
                raise declarative.ResourceDefinitionException(
                    _LE("Required field %s not specified") % field, self.cfg)
            if not isinstance(self.cfg[field], field_type):
                raise declarative.ResourceDefinitionException(
                    _LE("Required field %(field)s should be a %(type)s") %
                    {'field': field, 'type': field_type}, self.cfg)

        # A bare "attributes:" key loads from YAML as None; treat it the
        # same as an absent key instead of failing on None.items().
        self._attributes = {}
        for name, attr_cfg in (self.cfg.get('attributes') or {}).items():
            self._attributes[name] = declarative.Definition(name, attr_cfg,
                                                            plugin_manager)

        # The archive policy is identical for every metric of this
        # definition, so resolve it once rather than per metric.
        archive_policy = self.cfg.get('archive_policy',
                                      self._default_archive_policy)
        self.metrics = {}
        for t in self.cfg['metrics']:
            if archive_policy is None:
                self.metrics[t] = {}
            else:
                self.metrics[t] = dict(archive_policy_name=archive_policy)

    def match(self, metric_name):
        """Return True if *metric_name* fnmatches any configured metric."""
        return any(fnmatch.fnmatch(metric_name, t)
                   for t in self.cfg['metrics'])

    def attributes(self, sample):
        """Return the attributes parsed from *sample*, dropping None values.

        :param sample: a sample dict handed to each attribute definition's
            ``parse`` method.
        :returns: dict of attribute name -> parsed value.
        """
        attrs = {}
        for name, definition in self._attributes.items():
            value = definition.parse(sample)
            if value is not None:
                attrs[name] = value
        return attrs
|
|
|
|
|
|
def get_gnocchiclient(conf):
    """Build a Gnocchi v1 client that authenticates through Keystone.

    A dedicated ``requests`` session is created. Every adapter it already
    has is replaced by a keystoneauth ``TCPKeepAliveAdapter`` created with
    ``pool_block=True``. That session is then wrapped in a Keystone session.

    :param conf: ceilometer configuration; ``service_credentials.interface``,
        ``service_credentials.region_name`` and ``dispatcher_gnocchi.url``
        are read from it.
    :returns: a ``gnocchiclient`` client for API version '1'.
    """
    requests_session = requests.session()
    # Iterate over a snapshot of the prefixes. Session.mount() keeps its
    # adapters OrderedDict sorted by prefix length by popping and
    # re-inserting shorter keys, and mutating it while walking the live
    # keys view raises RuntimeError on Python 3.
    for scheme in list(requests_session.adapters):
        requests_session.mount(scheme, ka_session.TCPKeepAliveAdapter(
            pool_block=True))

    session = keystone_client.get_session(requests_session=requests_session)
    return client.Client('1', session,
                         interface=conf.service_credentials.interface,
                         region_name=conf.service_credentials.region_name,
                         endpoint_override=conf.dispatcher_gnocchi.url)
|
|
|
|
|
|
class LockedDefaultDict(defaultdict):
    """A defaultdict whose lookups and removals are serialized by a lock.

    It is intended to hold one lock object per key (the default factory
    creates them). ``pop`` removes an entry only when its value can be
    acquired without blocking, i.e. nobody currently holds that per-key
    lock. Otherwise the entry is left in place.

    NOTE(review): ``__getitem__`` hands out the per-key lock and releases
    the dict lock before the caller acquires it, so a concurrent ``pop`` can
    still remove a lock that was fetched but not yet acquired.
    """

    def __init__(self, *args, **kwargs):
        self.lock = threading.Lock()
        super(LockedDefaultDict, self).__init__(*args, **kwargs)

    def __getitem__(self, key):
        # Creation of a missing entry (via __missing__) also happens under
        # the dict-wide lock.
        with self.lock:
            value = super(LockedDefaultDict, self).__getitem__(key)
        return value

    def pop(self, key, *args):
        """Remove *key* if its lock is free; always returns None."""
        base = super(LockedDefaultDict, self)
        with self.lock:
            entry = base.__getitem__(key)
            if not entry.acquire(False):
                # Someone holds the per-key lock: keep the entry.
                return None
            try:
                base.pop(key, *args)
            finally:
                entry.release()
|
|
|
|
|
|
class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
    """Dispatcher class for recording metering data into Gnocchi.

    The dispatcher class records each meter into the gnocchi service
    configured in ceilometer configuration file. An example configuration may
    look like the following:

      [dispatcher_gnocchi]
      url = http://localhost:8041
      archive_policy = low

    To enable this dispatcher, the following section needs to be present in
    ceilometer.conf file

      [DEFAULT]
      meter_dispatchers = gnocchi
    """

    def __init__(self, conf):
        """Set up Keystone/Gnocchi clients, the optional cache and locks.

        :param conf: ceilometer configuration object.
        :raises: re-raises the error of the initial Gnocchi connectivity
            check (retried per ``conf.storage``) after logging it.
        """
        super(GnocchiDispatcher, self).__init__(conf)
        self.conf = conf
        self.filter_service_activity = (
            conf.dispatcher_gnocchi.filter_service_activity)
        self._ks_client = keystone_client.get_client()
        self.resources_definition = self._load_resources_definitions(conf)

        self.cache = None
        try:
            import oslo_cache
            oslo_cache.configure(self.conf)
            # NOTE(cdent): The default cache backend is a real but
            # noop backend. We don't want to use that here because
            # we want to avoid the cache pathways entirely if the
            # cache has not been configured explicitly.
            if 'null' not in self.conf.cache.backend:
                cache_region = oslo_cache.create_region()
                self.cache = oslo_cache.configure_cache_region(
                    self.conf, cache_region)
                self.cache.key_mangler = cache_key_mangler
        except ImportError:
            pass
        except oslo_cache.exception.ConfigurationError as exc:
            LOG.warning(_LW('unable to configure oslo_cache: %s') % exc)

        self._gnocchi_project_id = None
        self._gnocchi_project_id_lock = threading.Lock()
        # One lock per resource id, used by _if_not_cached.
        self._gnocchi_resource_lock = LockedDefaultDict(threading.Lock)

        self._gnocchi = get_gnocchiclient(conf)
        retries = conf.storage.max_retries

        # Convert retry_interval secs to msecs for retry decorator; a
        # negative max_retries disables the attempt limit.
        @retrying.retry(wait_fixed=conf.storage.retry_interval * 1000,
                        stop_max_attempt_number=(retries if retries >= 0
                                                 else None))
        def _get_connection():
            self._gnocchi.capabilities.list()

        try:
            _get_connection()
        except Exception:
            LOG.error(_LE('Failed to connect to Gnocchi.'))
            raise

    @classmethod
    def _load_resources_definitions(cls, conf):
        """Load ResourcesDefinition objects from the definitions file.

        Entries that fail to load are logged and skipped.
        """
        plugin_manager = extension.ExtensionManager(
            namespace='ceilometer.event.trait_plugin')
        data = declarative.load_definitions(
            {}, conf.dispatcher_gnocchi.resources_definition_file)
        resource_defs = []
        for resource in data.get('resources', []):
            try:
                resource_defs.append(ResourcesDefinition(
                    resource,
                    conf.dispatcher_gnocchi.archive_policy, plugin_manager))
            except Exception as exc:
                LOG.error(_LE("Failed to load resource due to error %s") %
                          exc)
        return resource_defs

    @property
    def gnocchi_project_id(self):
        """Keystone id of the project configured as ``filter_project``.

        The id is looked up lazily once and memoized. If the project does
        not exist, service-activity filtering is disabled and None is
        returned.
        """
        if self._gnocchi_project_id is not None:
            return self._gnocchi_project_id
        with self._gnocchi_project_id_lock:
            if self._gnocchi_project_id is None:
                try:
                    project = self._ks_client.projects.find(
                        name=self.conf.dispatcher_gnocchi.filter_project)
                except ka_exceptions.NotFound:
                    LOG.warning(_LW('gnocchi project not found in keystone,'
                                    ' ignoring the filter_service_activity '
                                    'option'))
                    self.filter_service_activity = False
                    return None
                except Exception:
                    LOG.exception('fail to retrieve user of Gnocchi service')
                    raise
                self._gnocchi_project_id = project.id
                # Log the value directly rather than re-entering this
                # property while holding the (non-reentrant) lock.
                LOG.debug("gnocchi project found: %s", project.id)
            return self._gnocchi_project_id

    def _is_swift_account_sample(self, sample):
        """True if a swift_account definition handles the sample's meter."""
        return any(rd.cfg['resource_type'] == 'swift_account'
                   and rd.match(sample['counter_name'])
                   for rd in self.resources_definition)

    def _is_gnocchi_activity(self, sample):
        """True if *sample* was generated by Gnocchi's own activity."""
        return (self.filter_service_activity and self.gnocchi_project_id and (
            # avoid anything from the user used by gnocchi
            sample['project_id'] == self.gnocchi_project_id or
            # avoid anything in the swift account used by gnocchi
            (sample['resource_id'] == self.gnocchi_project_id and
             self._is_swift_account_sample(sample))
        ))

    def _get_resource_definition(self, metric_name):
        """Return the first definition matching *metric_name*, or None."""
        for rd in self.resources_definition:
            if rd.match(metric_name):
                return rd
        return None

    def record_metering_data(self, data):
        """Push samples to Gnocchi, then update changed resource attributes.

        :param data: one sample dict or a list of them. Each sample is read
            for ``resource_id``, ``counter_name``, ``counter_volume``,
            ``counter_unit``, ``timestamp``, ``user_id`` and ``project_id``.

        Errors from Gnocchi are logged rather than raised.
        """
        # We may have receive only one counter on the wire
        if not isinstance(data, list):
            data = [data]
        # NOTE(sileht): skip sample generated by gnocchi itself
        data = [s for s in data if not self._is_gnocchi_activity(s)]

        # FIXME(sileht): This method bulk the processing of samples
        # grouped by resource_id and metric_name but this is not
        # efficient yet because the data received here doesn't often
        # contains a lot of different kind of samples
        # So perhaps the next step will be to pool the received data from
        # message bus.
        data.sort(key=lambda s: (s['resource_id'], s['counter_name']))

        resource_grouped_samples = itertools.groupby(
            data, key=operator.itemgetter('resource_id'))

        gnocchi_data = {}
        measures = {}
        stats = dict(measures=0, resources=0, metrics=0)
        for resource_id, samples_of_resource in resource_grouped_samples:
            stats['resources'] += 1
            metric_grouped_samples = itertools.groupby(
                list(samples_of_resource),
                key=operator.itemgetter('counter_name'))

            # NOTE(sileht): We convert resource id to Gnocchi format
            # because batch_resources_metrics_measures exception
            # returns this id and not the ceilometer one
            gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
            res_info = {}
            for metric_name, samples in metric_grouped_samples:
                stats['metrics'] += 1

                samples = list(samples)
                rd = self._get_resource_definition(metric_name)
                if rd is None:
                    LOG.warning(_LW("metric %s is not handled by Gnocchi") %
                                metric_name)
                    continue
                if rd.cfg.get("ignore"):
                    continue

                res_info['resource_type'] = rd.cfg['resource_type']
                res_info.setdefault("resource", {}).update({
                    "id": resource_id,
                    "user_id": samples[0]['user_id'],
                    "project_id": samples[0]['project_id'],
                    "metrics": rd.metrics,
                })

                resource_extra = res_info.setdefault("resource_extra", {})
                metric_measures = measures.setdefault(
                    gnocchi_id, {}).setdefault(metric_name, [])
                for sample in samples:
                    resource_extra.update(rd.attributes(sample))
                    metric_measures.append(
                        {'timestamp': sample['timestamp'],
                         'value': sample['counter_volume']})
                    unit = sample['counter_unit']
                    metric = sample['counter_name']
                    # NOTE(review): this writes into rd.metrics, which is
                    # shared by every resource of this definition, and
                    # raises KeyError if the definition matched the name
                    # through a wildcard pattern — confirm intended.
                    res_info['resource']['metrics'][metric]['unit'] = unit

                stats['measures'] += len(metric_measures)
                res_info["resource"].update(resource_extra)

            # Only keep resources for which at least one metric was handled:
            # an empty entry would make the update loop below fail with a
            # KeyError on info["resource"] and abort the whole batch.
            if res_info:
                gnocchi_data[gnocchi_id] = res_info

        try:
            self.batch_measures(measures, gnocchi_data, stats)
        except gnocchi_exc.ClientException as e:
            LOG.error(six.text_type(e))
        except Exception as e:
            LOG.error(six.text_type(e), exc_info=True)

        for gnocchi_id, info in gnocchi_data.items():
            resource = info["resource"]
            resource_type = info["resource_type"]
            resource_extra = info["resource_extra"]
            if not resource_extra:
                continue
            try:
                self._if_not_cached("update", resource_type, resource,
                                    self._update_resource, resource_extra)
            except gnocchi_exc.ClientException as e:
                LOG.error(six.text_type(e))
            except Exception as e:
                LOG.error(six.text_type(e), exc_info=True)

    # Gnocchi's 400 message for unknown metrics, and the
    # "<resource_id>/<metric_name>" pairs listed in it. Raw strings keep the
    # same patterns without relying on invalid "\(" string escapes.
    RE_UNKNOW_METRICS = re.compile(r"Unknown metrics: (.*) \(HTTP 400\)")
    RE_UNKNOW_METRICS_LIST = re.compile(r"([^/ ,]*)/([^,]*)")

    def batch_measures(self, measures, resource_infos, stats):
        """Post all measures in one batch, creating missing objects once.

        If Gnocchi rejects the batch with an "Unknown metrics" 400 error,
        the listed resources (or, if they already exist, the individual
        metrics) are created, then the remaining measures are posted again.

        :param measures: {gnocchi_id: {metric_name: [measure, ...]}}; entries
            that cannot be created are deleted from it in place.
        :param resource_infos: {gnocchi_id: {'resource', 'resource_type',
            ...}} as built by record_metering_data.
        :param stats: counters used only for the debug log line.
        """
        # NOTE(sileht): We don't care about error here, we want
        # resources metadata always been updated
        try:
            self._gnocchi.metric.batch_resources_metrics_measures(measures)
        except gnocchi_exc.BadRequest as e:
            m = self.RE_UNKNOW_METRICS.match(six.text_type(e))
            if m is None:
                raise

            # NOTE(sileht): Create all missing resources and metrics
            metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1))
            gnocchi_ids_freshly_handled = set()
            for gnocchi_id, metric_name in metric_list:
                if gnocchi_id in gnocchi_ids_freshly_handled:
                    continue
                resource = resource_infos[gnocchi_id]['resource']
                resource_type = resource_infos[gnocchi_id]['resource_type']
                try:
                    self._if_not_cached("create", resource_type, resource,
                                        self._create_resource)
                except gnocchi_exc.ResourceAlreadyExists:
                    metric = {'resource_id': resource['id'],
                              'name': metric_name}
                    metric.update(resource["metrics"][metric_name])
                    try:
                        self._gnocchi.metric.create(metric)
                    except gnocchi_exc.NamedMetricAlreadyExists:
                        # NOTE(sileht): metric created in the meantime
                        pass
                    except gnocchi_exc.ClientException as e:
                        LOG.error(six.text_type(e))
                        # We cannot post measures for this metric
                        del measures[gnocchi_id][metric_name]
                        if not measures[gnocchi_id]:
                            del measures[gnocchi_id]
                except gnocchi_exc.ClientException as e:
                    LOG.error(six.text_type(e))
                    # We cannot post measures for this resource
                    del measures[gnocchi_id]
                    gnocchi_ids_freshly_handled.add(gnocchi_id)
                else:
                    # The resource was created together with all its
                    # metrics, so its other unknown metrics need no work.
                    gnocchi_ids_freshly_handled.add(gnocchi_id)

            # NOTE(sileht): we have created missing resources/metrics,
            # now retry to post measures (unless everything was dropped)
            if measures:
                self._gnocchi.metric.batch_resources_metrics_measures(
                    measures)

        # FIXME(sileht): take care of measures removed in stats
        LOG.debug("%(measures)d measures posted against %(metrics)d "
                  "metrics through %(resources)d resources", stats)

    def _create_resource(self, resource_type, resource):
        """Create *resource* of *resource_type* in Gnocchi."""
        self._gnocchi.resource.create(resource_type, resource)
        LOG.debug('Resource %s created', resource["id"])

    def _update_resource(self, resource_type, resource, resource_extra):
        """Update the attributes of an existing Gnocchi resource."""
        self._gnocchi.resource.update(resource_type,
                                      resource["id"],
                                      resource_extra)
        LOG.debug('Resource %s updated', resource["id"])

    def _if_not_cached(self, operation, resource_type, resource, method,
                       *args, **kwargs):
        """Call *method* unless the cache shows the resource is unchanged.

        Without a configured cache, *method* is always called. With a cache,
        a hit skips the call. For ``operation == "create"`` a hit raises
        ``gnocchi_exc.ResourceAlreadyExists`` so the caller falls back to
        creating individual metrics.
        """
        if self.cache:
            cache_key = resource['id']
            attribute_hash = self._check_resource_cache(cache_key, resource)
            hit = False
            if attribute_hash:
                try:
                    with self._gnocchi_resource_lock[cache_key]:
                        # NOTE(luogangyi): there is a possibility that the
                        # resource was already built in cache by another
                        # ceilometer-collector when we get the lock here.
                        attribute_hash = self._check_resource_cache(
                            cache_key, resource)
                        if attribute_hash:
                            method(resource_type, resource, *args, **kwargs)
                            self.cache.set(cache_key, attribute_hash)
                        else:
                            hit = True
                            LOG.debug('resource cache recheck hit for '
                                      '%s %s', operation, cache_key)
                finally:
                    # Drop the per-resource lock entry even when `method`
                    # raised; otherwise one entry leaks per failure.
                    self._gnocchi_resource_lock.pop(cache_key, None)
            else:
                hit = True
                LOG.debug('Resource cache hit for %s %s', operation, cache_key)
            if hit and operation == "create":
                raise gnocchi_exc.ResourceAlreadyExists()
        else:
            method(resource_type, resource, *args, **kwargs)

    def _check_resource_cache(self, key, resource_data):
        """Return the attribute digest of *resource_data* if not cached.

        The digest covers every item except ``metrics``. It is an md5 of a
        key-sorted ``repr``, so it is identical across processes and
        restarts; the builtin ``hash()`` of strings is randomized per
        process on Python 3. It also tolerates unhashable values.

        :returns: the new digest when it differs from the cached one, else
            None.
        """
        cached_hash = self.cache.get(key)
        attributes = sorted(
            ((name, value) for name, value in resource_data.items()
             if name != "metrics"),
            key=operator.itemgetter(0))
        attribute_hash = md5(repr(attributes).encode('utf-8')).hexdigest()
        if cached_hash != attribute_hash:
            return attribute_hash
        return None
|