From c7cba1fe02e03744e5b45e54c84f3e971b450698 Mon Sep 17 00:00:00 2001 From: Darren Hague Date: Mon, 13 Jun 2016 15:48:30 +0100 Subject: [PATCH] Add background thread notifier sending ability Add ability to hand off notifier event sending to a queue processed by a background thread so as not to block swift proxy. This fixes an issue whereby if ceilometer's RabbitMQ went down then the swift proxy would wait for it to come back up, effectively coupling Swift's availabiilty to that of Ceilometer's RabbitMQ. Background sending is activated by setting config item 'nonblocking_notify' to True. Queue size defaults to 1000, which can be overridden by setting config item 'send_queue_size'. If the queue is full, new events are discarded. Change-Id: I3da2b88b2bc9b7fd8c572a0085fa1d78c4f54701 --- ceilometermiddleware/swift.py | 66 +++++++++++++++++++++++- ceilometermiddleware/tests/test_swift.py | 26 ++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/ceilometermiddleware/swift.py b/ceilometermiddleware/swift.py index 411351a..f9c7605 100644 --- a/ceilometermiddleware/swift.py +++ b/ceilometermiddleware/swift.py @@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file: topic = notifications # skip metering of requests from listed project ids ignore_projects = , + # Whether to send events to messaging driver in a background thread + nonblocking_notify = False + # Queue size for sending notifications in background thread (0=unlimited). + # New notifications will be discarded if the queue is full. + send_queue_size = 1000 + # Logging level control + log_level = WARNING """ import functools import logging @@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement from pycadf import metric as cadf_metric from pycadf import resource as cadf_resource import six +import six.moves.queue as queue import six.moves.urllib.parse as urlparse +import threading _LOG = logging.getLogger(__name__) @@ -99,6 +108,9 @@ class InputProxy(object): class Swift(object): """Swift middleware used for counting requests.""" + event_queue = None + threadLock = threading.Lock() + def __init__(self, app, conf): self._app = app self.ignore_projects = [ @@ -122,6 +134,27 @@ class Swift(object): if self.reseller_prefix and self.reseller_prefix[-1] != '_': self.reseller_prefix += '_' + _LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING'))) + + # NOTE: If the background thread's send queue fills up, the event will + # be discarded + # + # For backward compatibility we default to False and therefore wait for + # sending to complete. This causes swift proxy to hang if the + # destination is unavailable. + self.nonblocking_notify = conf.get('nonblocking_notify', False) + + # Initialize the sending queue and thread, but only once + if self.nonblocking_notify and Swift.event_queue is None: + Swift.threadLock.acquire() + if Swift.event_queue is None: + send_queue_size = int(conf.get('send_queue_size', 1000)) + Swift.event_queue = queue.Queue(send_queue_size) + Swift.event_sender = SendEventThread(self._notifier) + Swift.event_sender.start() + _LOG.debug('Started sender thread') + Swift.threadLock.release() + def __call__(self, env, start_response): start_response_args = [None] input_proxy = InputProxy(env['wsgi.input']) @@ -244,7 +277,38 @@ class Swift(object): metric=cadf_metric.Metric( name='storage.objects.outgoing.bytes', unit='B'))) - self._notifier.info({}, 'objectstore.http.request', event.as_dict()) + if self.nonblocking_notify: + try: + Swift.event_queue.put(event, False) + _LOG.debug('Event %s added to send queue', event.id) + except queue.Full: + _LOG.warning('Send queue FULL: Event %s not added', event.id) + else: + Swift.send_notification(self._notifier, event) + + @staticmethod + def send_notification(notifier, event): + notifier.info({}, 'objectstore.http.request', event.as_dict()) + + +class SendEventThread(threading.Thread): + + def __init__(self, notifier): + super(SendEventThread, self).__init__() + self.notifier = notifier + self.daemon = True + + def run(self): + """Send events without blocking swift proxy.""" + while True: + try: + _LOG.debug('Wait for event from send queue') + event = Swift.event_queue.get() + _LOG.debug('Got event %s from queue - now send it', event.id) + Swift.send_notification(self.notifier, event) + _LOG.debug('Event %s sent.', event.id) + except Exception: + _LOG.exception("SendEventThread loop exception") def filter_factory(global_conf, **local_conf): diff --git a/ceilometermiddleware/tests/test_swift.py b/ceilometermiddleware/tests/test_swift.py index 2e7287a..4c0f43e 100644 --- a/ceilometermiddleware/tests/test_swift.py +++ b/ceilometermiddleware/tests/test_swift.py @@ -20,6 +20,7 @@ import six from ceilometermiddleware import swift from ceilometermiddleware.tests import base as tests_base +from threading import Event class FakeApp(object): @@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase): self.assertEqual('obj', metadata['object']) self.assertEqual('get', data[2]['target']['action']) + def test_get_background(self): + notified = Event() + app = swift.Swift(FakeApp(), + {"nonblocking_notify": "True", + "send_queue_size": "1"}) + req = FakeRequest('/1.0/account/container/obj', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('oslo_messaging.Notifier.info', + side_effect=lambda *args, **kwargs: notified.set() + ) as notify: + resp = app(req.environ, self.start_response) + self.assertEqual(["This string is 28 bytes long"], list(resp)) + notified.wait() + self.assertEqual(1, len(notify.call_args_list)) + data = notify.call_args_list[0][0] + self.assertEqual('objectstore.http.request', data[1]) + self.assertEqual(28, data[2]['measurements'][0]['result']) + self.assertEqual('storage.objects.outgoing.bytes', + data[2]['measurements'][0]['metric']['name']) + metadata = data[2]['target']['metadata'] + self.assertEqual('1.0', metadata['version']) + self.assertEqual('container', metadata['container']) + self.assertEqual('obj', metadata['object']) + self.assertEqual('get', data[2]['target']['action']) + def test_put(self): app = swift.Swift(FakeApp(body=['']), {}) req = FakeRequest(