#
# Copyright 2012 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Telemetry Middleware for Swift Proxy

Configuration:
In /etc/swift/proxy-server.conf on the main pipeline add "ceilometer" just
before "proxy-server" and add the following filter in the file:

.. code-block:: python

    [filter:ceilometer]
    paste.filter_factory = ceilometermiddleware.swift:filter_factory
    # Optional configuration that allows publishing additional metadata
    metadata_headers = X-TEST
    # Set reseller prefix (defaults to "AUTH_" if not set)
    reseller_prefix = AUTH_
    # Set control_exchange to publish to.
    control_exchange = swift
    # Set transport url
    url = rabbit://me:passwd@host:5672/virtual_host
    # set messaging driver
    driver = messagingv2
    # set topic
    topic = notifications
    # skip metering of requests from listed project ids (project names are
    # also accepted when the keystoneauth1 options below are set)
    ignore_projects = <project_uuid>, <project_uuid2>, <project_name>
    # Whether to send events to messaging driver in a background thread
    nonblocking_notify = False
    # Queue size for sending notifications in background thread (0=unlimited).
    # New notifications will be discarded if the queue is full.
    send_queue_size = 1000
    # Logging level control
    log_level = WARNING

    # All keystoneauth1 options can be set to query project name for
    # ignore_projects option, here is just an example:
    auth_type = password
    auth_url = https://[::1]:5000
    project_name = service
    project_domain_name = Default
    username = user
    user_domain_name = Default
    password = a_big_secret
    interface = public
"""
import datetime
import functools
import logging

from keystoneauth1 import exceptions as ksa_exc
from keystoneauth1.loading import adapter as ksa_adapter
from keystoneauth1.loading import base as ksa_base
from keystoneauth1.loading import session as ksa_session
from keystoneclient.v3 import client as ks_client
from oslo_config import cfg
import oslo_messaging
from oslo_utils import strutils
from pycadf import event as cadf_event
from pycadf.helper import api
from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
import threading

LOG = logging.getLogger(__name__)


def list_from_csv(comma_separated_str):
    """Split a comma separated string into a list of non-empty items.

    Each item is stripped of surrounding whitespace and empty items are
    dropped. A falsy input (``None`` or ``''``) yields an empty list.
    """
    if not comma_separated_str:
        return []
    stripped = (item.strip() for item in comma_separated_str.split(','))
    return [item for item in stripped if item]


def _log_and_ignore_error(fn):
    """Decorator that logs any ``Exception`` raised by *fn* and swallows it.

    The wrapped callable returns ``None`` when *fn* raises, so a failure
    while building or sending a notification never propagates to the caller.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            LOG.exception('An exception occurred processing '
                          'the API call: %s ', e)
    return wrapper


class InputProxy(object):
    """File-like object that counts bytes read.

    To be swapped in for wsgi.input for accounting purposes.

    Borrowed from swift.common.utils. Duplicated here to avoid
    dependency on swift package.
    """
    def __init__(self, wsgi_input):
        self.wsgi_input = wsgi_input
        self.bytes_received = 0

    def read(self, *args, **kwargs):
        """Pass read request to the underlying file-like object

        Add bytes read to total.
        """
        chunk = self.wsgi_input.read(*args, **kwargs)
        self.bytes_received += len(chunk)
        return chunk

    def readline(self, *args, **kwargs):
        """Pass readline request to the underlying file-like object

        Add bytes read to total.
        """
        line = self.wsgi_input.readline(*args, **kwargs)
        self.bytes_received += len(line)
        return line

    def close(self):
        """Close the wrapped input if it exposes a callable ``close``."""
        close_method = getattr(self.wsgi_input, 'close', None)
        if callable(close_method):
            close_method()


class KeystoneClientLoader(ksa_adapter.Adapter):
    """Keystone client adapter loader.

    Keystone client and Keystoneauth1 adapter take exactly the same options, so
    it's safe to create a keystone client with keystoneauth adapter options.
    """

    @property
    def plugin_class(self):
        return ks_client.Client


class Swift(object):
    """Swift middleware used for counting requests."""

    # Class-level state: the send queue, its sender thread and the lock
    # guarding their creation are shared by every Swift instance.
    event_queue = None
    event_sender = None
    threadLock = threading.Lock()

    DEFAULT_IGNORE_PROJECT_NAMES = ['service']

    def __init__(self, app, conf):
        """Wrap *app* and configure the notifier from the *conf* mapping."""
        self._app = app

        self.ignore_projects = self._get_ignore_projects(conf)

        oslo_messaging.set_transport_defaults(conf.get('control_exchange',
                                                       'swift'))
        self._notifier = oslo_messaging.Notifier(
            oslo_messaging.get_notification_transport(cfg.CONF,
                                                      url=conf.get('url')),
            publisher_id='ceilometermiddleware',
            driver=conf.get('driver', 'messagingv2'),
            topics=[conf.get('topic', 'notifications')])

        # Normalise configured header names ("X-Test" -> "x_test").
        self.metadata_headers = [
            h.strip().replace('-', '_').lower()
            for h in conf.get("metadata_headers", "").split(",")
            if h.strip()]

        self.reseller_prefix = conf.get('reseller_prefix', 'AUTH_')
        if self.reseller_prefix and self.reseller_prefix[-1] != '_':
            self.reseller_prefix += '_'

        # Upper-case the name so that e.g. "debug" resolves to logging.DEBUG
        # instead of the module-level function logging.debug.
        LOG.setLevel(getattr(logging,
                             conf.get('log_level', 'WARNING').upper()))

        # NOTE: If the background thread's send queue fills up, the event will
        #  be discarded
        #
        # For backward compatibility we default to False and therefore wait for
        #  sending to complete. This causes swift proxy to hang if the
        #  destination is unavailable.
        self.nonblocking_notify = strutils.bool_from_string(
            conf.get('nonblocking_notify', False))

        # Initialize the sending queue and thread, but only once
        if self.nonblocking_notify and Swift.event_queue is None:
            # "with" guarantees the lock is released even if the queue size
            # is not a valid integer and int() raises.
            with Swift.threadLock:
                if Swift.event_queue is None:
                    send_queue_size = int(conf.get('send_queue_size', 1000))
                    Swift.event_queue = queue.Queue(send_queue_size)
                    self.start_sender_thread()

    def _get_ignore_projects(self, conf):
        """Return the list of project ids whose requests are not metered.

        Without ``auth_type`` in *conf* the ``ignore_projects`` entries are
        returned as given. Otherwise a keystone client is built from the
        options in *conf* and every entry is resolved to project ids.
        """
        if 'auth_type' not in conf:
            LOG.info("'auth_type' is not set assuming ignore_projects are "
                     "only project uuid.")
            return list_from_csv(conf.get('ignore_projects'))

        if 'ignore_projects' in conf:
            ignore_projects = list_from_csv(conf.get('ignore_projects'))
        else:
            ignore_projects = self.DEFAULT_IGNORE_PROJECT_NAMES

        if not ignore_projects:
            return []

        def opt_getter(opt):
            # TODO(sileht): This method does not support deprecated opt names
            val = conf.get(opt.name)
            if val is None:
                val = conf.get(opt.dest)
            return val

        auth_type = conf.get('auth_type')
        plugin = ksa_base.get_plugin_loader(auth_type)

        auth = plugin.load_from_options_getter(opt_getter)
        session = ksa_session.Session().load_from_options_getter(
            opt_getter, auth=auth)
        client = KeystoneClientLoader().load_from_options_getter(
            opt_getter, session=session)

        projects = []
        for name_or_id in ignore_projects:
            projects.extend(self._get_keystone_projects(client, name_or_id))
        return projects

    @staticmethod
    def _get_keystone_projects(client, name_or_id):
        """Resolve a project name or id to a list of project ids.

        The value is first tried as an id; if keystone answers NotFound it is
        looked up as a name. A warning is logged and an empty list returned
        when nothing matches.
        """
        try:
            # Return the id, not the project object: emit_event() compares
            # the entries with the project id strings found in the request
            # headers.
            return [client.projects.get(name_or_id).id]
        except ksa_exc.NotFound:
            pass
        if isinstance(name_or_id, six.binary_type):
            name_or_id = name_or_id.decode('utf-8', 'strict')
        projects = client.projects.list(name=name_or_id)
        if not projects:
            LOG.warning("fail to find project '%s' in keystone", name_or_id)
        return [p.id for p in projects]

    def __call__(self, env, start_response):
        """WSGI entry point: run the wrapped app and emit an event.

        The request body is read through an InputProxy and the response is
        streamed through a generator so that received and sent byte counts
        are available when the event is emitted.
        """
        start_response_args = [None]
        input_proxy = InputProxy(env['wsgi.input'])
        env['wsgi.input'] = input_proxy

        def my_start_response(status, headers, exc_info=None):
            # Only record the arguments; the real start_response is invoked
            # from iter_response once the first chunk has been fetched.
            start_response_args[0] = (status, list(headers), exc_info)

        def iter_response(iterable):
            iterator = iter(iterable)
            try:
                # Skip leading empty chunks.
                chunk = next(iterator)
                while not chunk:
                    chunk = next(iterator)
            except StopIteration:
                chunk = ''

            if start_response_args[0]:
                start_response(*start_response_args[0])
            bytes_sent = 0
            try:
                while chunk:
                    bytes_sent += len(chunk)
                    yield chunk
                    try:
                        chunk = next(iterator)
                    except StopIteration:
                        chunk = ''
            finally:
                close_method = getattr(iterable, 'close', None)
                if callable(close_method):
                    close_method()
                self.emit_event(env, input_proxy.bytes_received, bytes_sent)

        try:
            iterable = self._app(env, my_start_response)
        except Exception:
            self.emit_event(env, input_proxy.bytes_received, 0, 'failure')
            raise
        else:
            return iter_response(iterable)

    @_log_and_ignore_error
    def emit_event(self, env, bytes_received, bytes_sent, outcome='success'):
        """Build a CADF event for the request in *env* and send it.

        Nothing is sent when the request belongs to an ignored project, when
        ``swift.source`` is set in *env*, or when the path does not contain
        at least a version and an account. Errors are logged and swallowed
        by the decorator.
        """
        if (
                (env.get('HTTP_X_SERVICE_PROJECT_ID') or
                 env.get('HTTP_X_PROJECT_ID') or
                 env.get('HTTP_X_TENANT_ID')) in self.ignore_projects or
                env.get('swift.source') is not None):
            return

        path = urlparse.quote(env.get('swift.backend_path', env['PATH_INFO']))
        method = env['REQUEST_METHOD']
        headers = {}
        for header in env:
            if header.startswith('HTTP_') and env[header]:
                key = header[5:]
                if isinstance(env[header], six.text_type):
                    headers[key] = six.text_type(env[header])
                else:
                    headers[key] = str(env[header])

        # Split "/<version>/<account>[/<container>[/<object>]]".
        container = obj = None
        path = path.replace('/', '', 1)
        try:
            version, account, remainder = path.split('/', 2)
        except ValueError:
            try:
                version, account = path.split('/', 1)
                remainder = None
            except ValueError:
                return

        if not version or not account:
            return
        if remainder:
            if '/' in remainder:
                container, obj = remainder.split('/', 1)
            else:
                container = remainder

        now = datetime.datetime.utcnow().isoformat()

        resource_metadata = {
            "path": path,
            "version": version,
            "container": container,
            "object": obj,
        }

        for header in self.metadata_headers:
            if header.upper() in headers:
                resource_metadata['http_header_%s' % header] = headers.get(
                    header.upper())

        # build object store details
        target = cadf_resource.Resource(
            typeURI='service/storage/object',
            id=account.partition(self.reseller_prefix)[2] or path)
        target.metadata = resource_metadata
        target.action = method.lower()

        # build user details
        initiator = cadf_resource.Resource(
            typeURI='service/security/account/user',
            id=env.get('HTTP_X_USER_ID'))
        initiator.project_id = (env.get('HTTP_X_PROJECT_ID') or
                                env.get('HTTP_X_TENANT_ID'))

        # build notification body
        event = cadf_event.Event(eventTime=now, outcome=outcome,
                                 action=api.convert_req_action(method),
                                 initiator=initiator, target=target,
                                 observer=cadf_resource.Resource(id='target'))

        # measurements
        if bytes_received:
            event.add_measurement(cadf_measurement.Measurement(
                result=bytes_received,
                metric=cadf_metric.Metric(
                    name='storage.objects.incoming.bytes', unit='B')))
        if bytes_sent:
            event.add_measurement(cadf_measurement.Measurement(
                result=bytes_sent,
                metric=cadf_metric.Metric(
                    name='storage.objects.outgoing.bytes', unit='B')))

        if self.nonblocking_notify:
            try:
                Swift.event_queue.put(event, False)
            except queue.Full:
                LOG.warning('Send queue FULL: Event %s not added', event.id)
            # Checked on both paths: a full queue combined with a dead
            # sender thread would otherwise never drain again.
            self._ensure_sender_thread()
        else:
            Swift.send_notification(self._notifier, event)

    def _ensure_sender_thread(self):
        """Start a sender thread if none is currently alive."""
        sender = Swift.event_sender
        if sender is not None and sender.is_alive():
            return
        with Swift.threadLock:
            # Re-check under the lock so that concurrent callers do not each
            # start their own thread.
            sender = Swift.event_sender
            if sender is None or not sender.is_alive():
                self.start_sender_thread()

    def start_sender_thread(self):
        """Create and start the daemon thread that drains the send queue."""
        Swift.event_sender = SendEventThread(self._notifier)
        Swift.event_sender.daemon = True
        Swift.event_sender.start()

    @staticmethod
    def send_notification(notifier, event):
        """Publish *event* as an 'objectstore.http.request' notification."""
        notifier.info({}, 'objectstore.http.request', event.as_dict())


class SendEventThread(threading.Thread):
    """Thread that sends the events queued on ``Swift.event_queue``."""

    def __init__(self, notifier):
        super(SendEventThread, self).__init__()
        self.notifier = notifier

    def run(self):
        """Send events without blocking swift proxy."""
        while True:
            try:
                LOG.debug('Wait for event from send queue')
                event = Swift.event_queue.get()
                LOG.debug('Got event %s from queue - now send it', event.id)
                Swift.send_notification(self.notifier, event)
                LOG.debug('Event %s sent.', event.id)
            except BaseException:
                # Anything raised here is logged and the loop continues.
                LOG.exception("SendEventThread loop exception")


def filter_factory(global_conf, **local_conf):
    """paste.deploy filter factory returning a Swift middleware builder.

    *local_conf* entries override those of *global_conf*.
    """
    conf = global_conf.copy()
    conf.update(local_conf)

    def swift_filter(app):
        return Swift(app, conf)
    return swift_filter