Merge "Add background thread notifier sending ability"
This commit is contained in:
commit
bf17fdab16
|
@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
|
|||
topic = notifications
|
||||
# skip metering of requests from listed project ids
|
||||
ignore_projects = <proj_uuid>, <proj_uuid2>
|
||||
# Whether to send events to messaging driver in a background thread
|
||||
nonblocking_notify = False
|
||||
# Queue size for sending notifications in background thread (0=unlimited).
|
||||
# New notifications will be discarded if the queue is full.
|
||||
send_queue_size = 1000
|
||||
# Logging level control
|
||||
log_level = WARNING
|
||||
"""
|
||||
import functools
|
||||
import logging
|
||||
|
@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
|
|||
from pycadf import metric as cadf_metric
|
||||
from pycadf import resource as cadf_resource
|
||||
import six
|
||||
import six.moves.queue as queue
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import threading
|
||||
|
||||
_LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -99,6 +108,9 @@ class InputProxy(object):
|
|||
class Swift(object):
|
||||
"""Swift middleware used for counting requests."""
|
||||
|
||||
event_queue = None
|
||||
threadLock = threading.Lock()
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self._app = app
|
||||
self.ignore_projects = [
|
||||
|
@ -122,6 +134,27 @@ class Swift(object):
|
|||
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
|
||||
self.reseller_prefix += '_'
|
||||
|
||||
_LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
|
||||
|
||||
# NOTE: If the background thread's send queue fills up, the event will
|
||||
# be discarded
|
||||
#
|
||||
# For backward compatibility we default to False and therefore wait for
|
||||
# sending to complete. This causes swift proxy to hang if the
|
||||
# destination is unavailable.
|
||||
self.nonblocking_notify = conf.get('nonblocking_notify', False)
|
||||
|
||||
# Initialize the sending queue and thread, but only once
|
||||
if self.nonblocking_notify and Swift.event_queue is None:
|
||||
Swift.threadLock.acquire()
|
||||
if Swift.event_queue is None:
|
||||
send_queue_size = int(conf.get('send_queue_size', 1000))
|
||||
Swift.event_queue = queue.Queue(send_queue_size)
|
||||
Swift.event_sender = SendEventThread(self._notifier)
|
||||
Swift.event_sender.start()
|
||||
_LOG.debug('Started sender thread')
|
||||
Swift.threadLock.release()
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response_args = [None]
|
||||
input_proxy = InputProxy(env['wsgi.input'])
|
||||
|
@ -244,7 +277,38 @@ class Swift(object):
|
|||
metric=cadf_metric.Metric(
|
||||
name='storage.objects.outgoing.bytes', unit='B')))
|
||||
|
||||
self._notifier.info({}, 'objectstore.http.request', event.as_dict())
|
||||
if self.nonblocking_notify:
|
||||
try:
|
||||
Swift.event_queue.put(event, False)
|
||||
_LOG.debug('Event %s added to send queue', event.id)
|
||||
except queue.Full:
|
||||
_LOG.warning('Send queue FULL: Event %s not added', event.id)
|
||||
else:
|
||||
Swift.send_notification(self._notifier, event)
|
||||
|
||||
@staticmethod
|
||||
def send_notification(notifier, event):
|
||||
notifier.info({}, 'objectstore.http.request', event.as_dict())
|
||||
|
||||
|
||||
class SendEventThread(threading.Thread):
|
||||
|
||||
def __init__(self, notifier):
|
||||
super(SendEventThread, self).__init__()
|
||||
self.notifier = notifier
|
||||
self.daemon = True
|
||||
|
||||
def run(self):
|
||||
"""Send events without blocking swift proxy."""
|
||||
while True:
|
||||
try:
|
||||
_LOG.debug('Wait for event from send queue')
|
||||
event = Swift.event_queue.get()
|
||||
_LOG.debug('Got event %s from queue - now send it', event.id)
|
||||
Swift.send_notification(self.notifier, event)
|
||||
_LOG.debug('Event %s sent.', event.id)
|
||||
except Exception:
|
||||
_LOG.exception("SendEventThread loop exception")
|
||||
|
||||
|
||||
def filter_factory(global_conf, **local_conf):
|
||||
|
|
|
@ -20,6 +20,7 @@ import six
|
|||
|
||||
from ceilometermiddleware import swift
|
||||
from ceilometermiddleware.tests import base as tests_base
|
||||
from threading import Event
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
|
|||
self.assertEqual('obj', metadata['object'])
|
||||
self.assertEqual('get', data[2]['target']['action'])
|
||||
|
||||
def test_get_background(self):
|
||||
notified = Event()
|
||||
app = swift.Swift(FakeApp(),
|
||||
{"nonblocking_notify": "True",
|
||||
"send_queue_size": "1"})
|
||||
req = FakeRequest('/1.0/account/container/obj',
|
||||
environ={'REQUEST_METHOD': 'GET'})
|
||||
with mock.patch('oslo_messaging.Notifier.info',
|
||||
side_effect=lambda *args, **kwargs: notified.set()
|
||||
) as notify:
|
||||
resp = app(req.environ, self.start_response)
|
||||
self.assertEqual(["This string is 28 bytes long"], list(resp))
|
||||
notified.wait()
|
||||
self.assertEqual(1, len(notify.call_args_list))
|
||||
data = notify.call_args_list[0][0]
|
||||
self.assertEqual('objectstore.http.request', data[1])
|
||||
self.assertEqual(28, data[2]['measurements'][0]['result'])
|
||||
self.assertEqual('storage.objects.outgoing.bytes',
|
||||
data[2]['measurements'][0]['metric']['name'])
|
||||
metadata = data[2]['target']['metadata']
|
||||
self.assertEqual('1.0', metadata['version'])
|
||||
self.assertEqual('container', metadata['container'])
|
||||
self.assertEqual('obj', metadata['object'])
|
||||
self.assertEqual('get', data[2]['target']['action'])
|
||||
|
||||
def test_put(self):
|
||||
app = swift.Swift(FakeApp(body=['']), {})
|
||||
req = FakeRequest(
|
||||
|
|
Loading…
Reference in New Issue