Add background thread notifier sending ability

Add ability to hand off notifier event sending to a queue processed by
a background thread so as not to block swift proxy. This fixes an
issue whereby if ceilometer's RabbitMQ went down then the swift proxy
would wait for it to come back up, effectively coupling Swift's
availabiilty to that of Ceilometer's RabbitMQ.
Background sending is activated by setting config item
'nonblocking_notify' to True.
Queue size defaults to 1000, which can be overridden by setting config
item 'send_queue_size'. If the queue is full, new events are discarded.

Change-Id: I3da2b88b2bc9b7fd8c572a0085fa1d78c4f54701
This commit is contained in:
Darren Hague 2016-06-13 15:48:30 +01:00
parent c962fedb85
commit c7cba1fe02
2 changed files with 91 additions and 1 deletions

View File

@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
topic = notifications
# skip metering of requests from listed project ids
ignore_projects = <proj_uuid>, <proj_uuid2>
# Whether to send events to messaging driver in a background thread
nonblocking_notify = False
# Queue size for sending notifications in background thread (0=unlimited).
# New notifications will be discarded if the queue is full.
send_queue_size = 1000
# Logging level control
log_level = WARNING
import functools
import logging
@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
import threading
_LOG = logging.getLogger(__name__)
@ -99,6 +108,9 @@ class InputProxy(object):
class Swift(object):
"""Swift middleware used for counting requests."""
event_queue = None
threadLock = threading.Lock()
def __init__(self, app, conf):
self._app = app
self.ignore_projects = [
@ -122,6 +134,27 @@ class Swift(object):
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
_LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
# NOTE: If the background thread's send queue fills up, the event will
# be discarded
# For backward compatibility we default to False and therefore wait for
# sending to complete. This causes swift proxy to hang if the
# destination is unavailable.
self.nonblocking_notify = conf.get('nonblocking_notify', False)
# Initialize the sending queue and thread, but only once
if self.nonblocking_notify and Swift.event_queue is None:
if Swift.event_queue is None:
send_queue_size = int(conf.get('send_queue_size', 1000))
Swift.event_queue = queue.Queue(send_queue_size)
Swift.event_sender = SendEventThread(self._notifier)
_LOG.debug('Started sender thread')
def __call__(self, env, start_response):
start_response_args = [None]
input_proxy = InputProxy(env['wsgi.input'])
@ -244,7 +277,38 @@ class Swift(object):
name='storage.objects.outgoing.bytes', unit='B'))){}, 'objectstore.http.request', event.as_dict())
if self.nonblocking_notify:
Swift.event_queue.put(event, False)
_LOG.debug('Event %s added to send queue',
except queue.Full:
_LOG.warning('Send queue FULL: Event %s not added',
Swift.send_notification(self._notifier, event)
def send_notification(notifier, event):{}, 'objectstore.http.request', event.as_dict())
class SendEventThread(threading.Thread):
def __init__(self, notifier):
super(SendEventThread, self).__init__()
self.notifier = notifier
self.daemon = True
def run(self):
"""Send events without blocking swift proxy."""
while True:
_LOG.debug('Wait for event from send queue')
event = Swift.event_queue.get()
_LOG.debug('Got event %s from queue - now send it',
Swift.send_notification(self.notifier, event)
_LOG.debug('Event %s sent.',
except Exception:
_LOG.exception("SendEventThread loop exception")
def filter_factory(global_conf, **local_conf):

View File

@ -20,6 +20,7 @@ import six
from ceilometermiddleware import swift
from ceilometermiddleware.tests import base as tests_base
from threading import Event
class FakeApp(object):
@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
def test_get_background(self):
notified = Event()
app = swift.Swift(FakeApp(),
{"nonblocking_notify": "True",
"send_queue_size": "1"})
req = FakeRequest('/1.0/account/container/obj',
environ={'REQUEST_METHOD': 'GET'})
with mock.patch('',
side_effect=lambda *args, **kwargs: notified.set()
) as notify:
resp = app(req.environ, self.start_response)
self.assertEqual(["This string is 28 bytes long"], list(resp))
self.assertEqual(1, len(notify.call_args_list))
data = notify.call_args_list[0][0]
self.assertEqual('objectstore.http.request', data[1])
self.assertEqual(28, data[2]['measurements'][0]['result'])
metadata = data[2]['target']['metadata']
self.assertEqual('1.0', metadata['version'])
self.assertEqual('container', metadata['container'])
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
def test_put(self):
app = swift.Swift(FakeApp(body=['']), {})
req = FakeRequest(