Add background thread notifier sending ability
Add ability to hand off notifier event sending to a queue processed by a background thread so as not to block swift proxy. This fixes an issue whereby if ceilometer's RabbitMQ went down then the swift proxy would wait for it to come back up, effectively coupling Swift's availabiilty to that of Ceilometer's RabbitMQ. Background sending is activated by setting config item 'nonblocking_notify' to True. Queue size defaults to 1000, which can be overridden by setting config item 'send_queue_size'. If the queue is full, new events are discarded. Change-Id: I3da2b88b2bc9b7fd8c572a0085fa1d78c4f54701
This commit is contained in:
parent
c962fedb85
commit
c7cba1fe02
|
@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
|
|||
topic = notifications
|
||||
# skip metering of requests from listed project ids
|
||||
ignore_projects = <proj_uuid>, <proj_uuid2>
|
||||
# Whether to send events to messaging driver in a background thread
|
||||
nonblocking_notify = False
|
||||
# Queue size for sending notifications in background thread (0=unlimited).
|
||||
# New notifications will be discarded if the queue is full.
|
||||
send_queue_size = 1000
|
||||
# Logging level control
|
||||
log_level = WARNING
|
||||
"""
|
||||
import functools
|
||||
import logging
|
||||
|
@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
|
|||
from pycadf import metric as cadf_metric
|
||||
from pycadf import resource as cadf_resource
|
||||
import six
|
||||
import six.moves.queue as queue
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import threading
|
||||
|
||||
_LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -99,6 +108,9 @@ class InputProxy(object):
|
|||
class Swift(object):
|
||||
"""Swift middleware used for counting requests."""
|
||||
|
||||
event_queue = None
|
||||
threadLock = threading.Lock()
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self._app = app
|
||||
self.ignore_projects = [
|
||||
|
@ -122,6 +134,27 @@ class Swift(object):
|
|||
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
|
||||
self.reseller_prefix += '_'
|
||||
|
||||
_LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
|
||||
|
||||
# NOTE: If the background thread's send queue fills up, the event will
|
||||
# be discarded
|
||||
#
|
||||
# For backward compatibility we default to False and therefore wait for
|
||||
# sending to complete. This causes swift proxy to hang if the
|
||||
# destination is unavailable.
|
||||
self.nonblocking_notify = conf.get('nonblocking_notify', False)
|
||||
|
||||
# Initialize the sending queue and thread, but only once
|
||||
if self.nonblocking_notify and Swift.event_queue is None:
|
||||
Swift.threadLock.acquire()
|
||||
if Swift.event_queue is None:
|
||||
send_queue_size = int(conf.get('send_queue_size', 1000))
|
||||
Swift.event_queue = queue.Queue(send_queue_size)
|
||||
Swift.event_sender = SendEventThread(self._notifier)
|
||||
Swift.event_sender.start()
|
||||
_LOG.debug('Started sender thread')
|
||||
Swift.threadLock.release()
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response_args = [None]
|
||||
input_proxy = InputProxy(env['wsgi.input'])
|
||||
|
@ -244,7 +277,38 @@ class Swift(object):
|
|||
metric=cadf_metric.Metric(
|
||||
name='storage.objects.outgoing.bytes', unit='B')))
|
||||
|
||||
self._notifier.info({}, 'objectstore.http.request', event.as_dict())
|
||||
if self.nonblocking_notify:
|
||||
try:
|
||||
Swift.event_queue.put(event, False)
|
||||
_LOG.debug('Event %s added to send queue', event.id)
|
||||
except queue.Full:
|
||||
_LOG.warning('Send queue FULL: Event %s not added', event.id)
|
||||
else:
|
||||
Swift.send_notification(self._notifier, event)
|
||||
|
||||
@staticmethod
|
||||
def send_notification(notifier, event):
|
||||
notifier.info({}, 'objectstore.http.request', event.as_dict())
|
||||
|
||||
|
||||
class SendEventThread(threading.Thread):
|
||||
|
||||
def __init__(self, notifier):
|
||||
super(SendEventThread, self).__init__()
|
||||
self.notifier = notifier
|
||||
self.daemon = True
|
||||
|
||||
def run(self):
|
||||
"""Send events without blocking swift proxy."""
|
||||
while True:
|
||||
try:
|
||||
_LOG.debug('Wait for event from send queue')
|
||||
event = Swift.event_queue.get()
|
||||
_LOG.debug('Got event %s from queue - now send it', event.id)
|
||||
Swift.send_notification(self.notifier, event)
|
||||
_LOG.debug('Event %s sent.', event.id)
|
||||
except Exception:
|
||||
_LOG.exception("SendEventThread loop exception")
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
|
|
|
@ -20,6 +20,7 @@ import six
|
|||
|
||||
from ceilometermiddleware import swift
|
||||
from ceilometermiddleware.tests import base as tests_base
|
||||
from threading import Event
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
|
|||
self.assertEqual('obj', metadata['object'])
|
||||
self.assertEqual('get', data[2]['target']['action'])
|
||||
|
||||
def test_get_background(self):
|
||||
notified = Event()
|
||||
app = swift.Swift(FakeApp(),
|
||||
{"nonblocking_notify": "True",
|
||||
"send_queue_size": "1"})
|
||||
req = FakeRequest('/1.0/account/container/obj',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
with mock.patch('oslo_messaging.Notifier.info',
|
||||
side_effect=lambda *args, **kwargs: notified.set()
|
||||
) as notify:
|
||||
resp = app(req.environ, self.start_response)
|
||||
self.assertEqual(["This string is 28 bytes long"], list(resp))
|
||||
notified.wait()
|
||||
self.assertEqual(1, len(notify.call_args_list))
|
||||
data = notify.call_args_list[0][0]
|
||||
self.assertEqual('objectstore.http.request', data[1])
|
||||
self.assertEqual(28, data[2]['measurements'][0]['result'])
|
||||
self.assertEqual('storage.objects.outgoing.bytes',
|
||||
data[2]['measurements'][0]['metric']['name'])
|
||||
metadata = data[2]['target']['metadata']
|
||||
self.assertEqual('1.0', metadata['version'])
|
||||
self.assertEqual('container', metadata['container'])
|
||||
self.assertEqual('obj', metadata['object'])
|
||||
self.assertEqual('get', data[2]['target']['action'])
|
||||
|
||||
def test_put(self):
|
||||
app = swift.Swift(FakeApp(body=['']), {})
|
||||
req = FakeRequest(
|
||||
|
|
Loading…
Reference in New Issue