Merge "Add background thread notifier sending ability"

This commit is contained in:
Jenkins 2016-07-13 09:54:25 +00:00 committed by Gerrit Code Review
commit bf17fdab16
2 changed files with 91 additions and 1 deletions

View File

@ -37,6 +37,13 @@ before "proxy-server" and add the following filter in the file:
topic = notifications
# skip metering of requests from listed project ids
ignore_projects = <proj_uuid>, <proj_uuid2>
# Whether to send events to messaging driver in a background thread
nonblocking_notify = False
# Queue size for sending notifications in background thread (0=unlimited).
# New notifications will be discarded if the queue is full.
send_queue_size = 1000
# Logging level control
log_level = WARNING
"""
import functools
import logging
@ -50,7 +57,9 @@ from pycadf import measurement as cadf_measurement
from pycadf import metric as cadf_metric
from pycadf import resource as cadf_resource
import six
import six.moves.queue as queue
import six.moves.urllib.parse as urlparse
import threading
_LOG = logging.getLogger(__name__)
@ -99,6 +108,9 @@ class InputProxy(object):
class Swift(object):
"""Swift middleware used for counting requests."""
event_queue = None
threadLock = threading.Lock()
def __init__(self, app, conf):
self._app = app
self.ignore_projects = [
@ -122,6 +134,27 @@ class Swift(object):
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
_LOG.setLevel(getattr(logging, conf.get('log_level', 'WARNING')))
# NOTE: If the background thread's send queue fills up, the event will
# be discarded
#
# For backward compatibility we default to False and therefore wait for
# sending to complete. This causes swift proxy to hang if the
# destination is unavailable.
self.nonblocking_notify = conf.get('nonblocking_notify', False)
# Initialize the sending queue and thread, but only once
if self.nonblocking_notify and Swift.event_queue is None:
Swift.threadLock.acquire()
if Swift.event_queue is None:
send_queue_size = int(conf.get('send_queue_size', 1000))
Swift.event_queue = queue.Queue(send_queue_size)
Swift.event_sender = SendEventThread(self._notifier)
Swift.event_sender.start()
_LOG.debug('Started sender thread')
Swift.threadLock.release()
def __call__(self, env, start_response):
start_response_args = [None]
input_proxy = InputProxy(env['wsgi.input'])
@ -244,7 +277,38 @@ class Swift(object):
metric=cadf_metric.Metric(
name='storage.objects.outgoing.bytes', unit='B')))
self._notifier.info({}, 'objectstore.http.request', event.as_dict())
if self.nonblocking_notify:
try:
Swift.event_queue.put(event, False)
_LOG.debug('Event %s added to send queue', event.id)
except queue.Full:
_LOG.warning('Send queue FULL: Event %s not added', event.id)
else:
Swift.send_notification(self._notifier, event)
@staticmethod
def send_notification(notifier, event):
notifier.info({}, 'objectstore.http.request', event.as_dict())
class SendEventThread(threading.Thread):
def __init__(self, notifier):
super(SendEventThread, self).__init__()
self.notifier = notifier
self.daemon = True
def run(self):
"""Send events without blocking swift proxy."""
while True:
try:
_LOG.debug('Wait for event from send queue')
event = Swift.event_queue.get()
_LOG.debug('Got event %s from queue - now send it', event.id)
Swift.send_notification(self.notifier, event)
_LOG.debug('Event %s sent.', event.id)
except Exception:
_LOG.exception("SendEventThread loop exception")
def filter_factory(global_conf, **local_conf):

View File

@ -20,6 +20,7 @@ import six
from ceilometermiddleware import swift
from ceilometermiddleware.tests import base as tests_base
from threading import Event
class FakeApp(object):
@ -90,6 +91,31 @@ class TestSwift(tests_base.TestCase):
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
def test_get_background(self):
notified = Event()
app = swift.Swift(FakeApp(),
{"nonblocking_notify": "True",
"send_queue_size": "1"})
req = FakeRequest('/1.0/account/container/obj',
environ={'REQUEST_METHOD': 'GET'})
with mock.patch('oslo_messaging.Notifier.info',
side_effect=lambda *args, **kwargs: notified.set()
) as notify:
resp = app(req.environ, self.start_response)
self.assertEqual(["This string is 28 bytes long"], list(resp))
notified.wait()
self.assertEqual(1, len(notify.call_args_list))
data = notify.call_args_list[0][0]
self.assertEqual('objectstore.http.request', data[1])
self.assertEqual(28, data[2]['measurements'][0]['result'])
self.assertEqual('storage.objects.outgoing.bytes',
data[2]['measurements'][0]['metric']['name'])
metadata = data[2]['target']['metadata']
self.assertEqual('1.0', metadata['version'])
self.assertEqual('container', metadata['container'])
self.assertEqual('obj', metadata['object'])
self.assertEqual('get', data[2]['target']['action'])
def test_put(self):
app = swift.Swift(FakeApp(body=['']), {})
req = FakeRequest(