diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py index 411351a..f9c7605 100644 --- a/ceilometermiddleware/swift.py +++ b/ceilometermiddleware/swift.py @@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file: topic = notifications # skip metering of requests from listed project ids ignore_projects = , + # Whether to send events to messaging driver in a background thread + nonblocking_notify = False + # Queue size for sending notifications in background thread (0=unlimited). + # New notifications will be discarded if the queue is full. + send_queue_size = 1000 + # Logging level control + log_level = WARNING """ import functools import logging @@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement from pycadf import metric as cadf_metric from pycadf import resource as cadf_resource import six +import six.moves.queue as queue import six.moves.urllib.parse as urlparse +import threading _LOG = logging.getLogger(__name__) @@ -99,6 +108,9 @@ class InputProxy(object): class Swift(object): """Swift middleware used for counting requests.""" + event_queue = None + threadLock = threading.Lock() + def __init__(self, app, conf): self._app = app self.ignore_projects = [ @@ -122,6 +134,27 @@ class Swift(object): if self.reseller_prefix and self.reseller_prefix[-1] != '_': self.reseller_prefix += '_' + _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING'))) + + # NOTE: If the background thread's send queue fills up, the event will + # be discarded + # + # For backward compatibility we default to False and therefore wait for + # sending to complete. This causes swift proxy to hang if the + # destination is unavailable. + self.nonblocking_notify = conf.get('nonblocking_notify', False) + + # Initialize the sending queue and thread, but only once + if self.nonblocking_notify and Swift.event_queue is None: + Swift.threadLock.acquire() + if Swift.event_queue is None: + send_queue_size = int(conf.get('send_queue_size', 1000)) + Swift.event_queue = queue.Queue(send_queue_size) + Swift.event_sender = SendEventThread(self._notifier) + Swift.event_sender.start() + _LOG.debug('Started sender thread') + Swift.threadLock.release() + def __call__(self, env, start_response): start_response_args = [None] input_proxy = InputProxy(env['wsgi.input']) @@ -244,7 +277,38 @@ class Swift(object): metric=cadf_metric.Metric( name='storage.objects.outgoing.bytes', unit='B'))) - self._notifier.info({}, 'objectstore.http.request', event.as_dict()) + if self.nonblocking_notify: + try: + Swift.event_queue.put(event, False) + _LOG.debug('Event %s added to send queue', event.id) + except queue.Full: + _LOG.warning('Send queue FULL: Event %s not added', event.id) + else: + Swift.send_notification(self._notifier, event) + + @staticmethod + def send_notification(notifier, event): + notifier.info({}, 'objectstore.http.request', event.as_dict()) + + +class SendEventThread(threading.Thread): + + def __init__(self, notifier): + super(SendEventThread, self).__init__() + self.notifier = notifier + self.daemon = True + + def run(self): + """Send events without blocking swift proxy.""" + while True: + try: + _LOG.debug('Wait for event from send queue') + event = Swift.event_queue.get() + _LOG.debug('Got event %s from queue - now send it', event.id) + Swift.send_notification(self.notifier, event) + _LOG.debug('Event %s sent.', event.id) + except Exception: + _LOG.exception("SendEventThread loop exception") def filter_factory(global_conf, **local_conf): diff --git a/ceilometermiddleware/tests/test_swift.py b/ceilometermiddleware/tests/test_swift.py index 2e7287a..4c0f43e 100644 --- a/ceilometermiddleware/tests/test_swift.py +++ b/ceilometermiddleware/tests/test_swift.py @@ -20,6 +20,7 @@ import six from ceilometermiddleware import swift from ceilometermiddleware.tests import base as tests_base +from threading import Event class FakeApp(object): @@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase): self.assertEqual('obj', metadata['object']) self.assertEqual('get', data[2]['target']['action']) + def test_get_background(self): + notified = Event() + app = swift.Swift(FakeApp(), + {"nonblocking_notify": "True", + "send_queue_size": "1"}) + req = FakeRequest('/1.0/account/container/obj', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('oslo_messaging.Notifier.info', + side_effect=lambda *args, **kwargs: notified.set() + ) as notify: + resp = app(req.environ, self.start_response) + self.assertEqual(["This string is 28 bytes long"], list(resp)) + notified.wait() + self.assertEqual(1, len(notify.call_args_list)) + data = notify.call_args_list[0][0] + self.assertEqual('objectstore.http.request', data[1]) + self.assertEqual(28, data[2]['measurements'][0]['result']) + self.assertEqual('storage.objects.outgoing.bytes', + data[2]['measurements'][0]['metric']['name']) + metadata = data[2]['target']['metadata'] + self.assertEqual('1.0', metadata['version']) + self.assertEqual('container', metadata['container']) + self.assertEqual('obj', metadata['object']) + self.assertEqual('get', data[2]['target']['action']) + def test_put(self): app = swift.Swift(FakeApp(body=['']), {}) req = FakeRequest(