fake_notifier: Refactor wait_for_versioned_notification

A subsequent change will add wait_for_legacy_notifications(). This
refactor achieves 2 things:

* Make it easier to return the notifications we were waiting for,
  required in both current tests and subsequent legacy tests.
* Allow code re-use between fake legacy and versioned notifications.

It incidentally fixes a minor bug in
NotificationSampleTestBase._wait_for_notifications where we may wait for
up to double the specified timeout. It is also thread safe without
having to audit all code paths for potential eventlet context switch
points.

Change-Id: Ie4676eed0039c927b35af7573f0b57fd762adbaa
This commit is contained in:
Matthew Booth 2017-08-01 15:35:02 +01:00 committed by Balazs Gibizer
parent 3f6447120b
commit ea00db9cca
2 changed files with 39 additions and 32 deletions

View File

@ -13,7 +13,6 @@
# under the License.
import os
import time
from oslo_config import cfg
from oslo_serialization import jsonutils
@ -208,26 +207,15 @@ class NotificationSampleTestBase(test.TestCase,
if notification['event_type'] == event_type]
def _wait_for_notification(self, event_type, timeout=1.0):
received = fake_notifier.wait_for_versioned_notification(
event_type, timeout)
notifications = fake_notifier.wait_for_versioned_notifications(
event_type, timeout=timeout)
self.assertTrue(
received,
len(notifications) > 0,
'notification %s hasn\'t been received' % event_type)
def _wait_for_notifications(self, event_type, expected_count, timeout=1.0):
notifications = []
start_time = time.clock()
while (len(notifications) < expected_count
and time.clock() - start_time < timeout):
fake_notifier.wait_for_versioned_notification(event_type, timeout)
notifications += self._get_notifications(event_type)
# NOTE(gibi): reading and then resetting the fake_notifier without
# synchronization doesn't lead to race condition as the only
# parallelism is due to eventlet.
fake_notifier.reset()
notifications = fake_notifier.wait_for_versioned_notifications(
event_type, n_events=expected_count, timeout=timeout)
self.assertEqual(expected_count, len(notifications),
'Unexpected number of %s notifications '
'within the given timeout. '

View File

@ -13,24 +13,52 @@
# under the License.
import collections
import copy
import functools
import threading
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from nova import rpc
SUBSCRIBERS = collections.defaultdict(threading.Event)
NOTIFICATIONS = []
class _Sub(object):
"""Allow a subscriber to efficiently wait for an event to occur, and
retrieve events which have occured.
"""
def __init__(self):
self._cond = threading.Condition()
self._notifications = []
def received(self, notification):
with self._cond:
self._notifications.append(notification)
self._cond.notifyAll()
def wait_n(self, n, timeout=1.0):
"""Wait until at least n notifications have been received, and return
them. May return less than n notifications if timeout is reached.
"""
with timeutils.StopWatch(timeout) as timer:
with self._cond:
while len(self._notifications) < n and not timer.expired():
self._cond.wait(timer.leftover())
return copy.copy(self._notifications)
VERSIONED_SUBS = collections.defaultdict(_Sub)
VERSIONED_NOTIFICATIONS = []
NOTIFICATIONS = []
def reset():
del NOTIFICATIONS[:]
del VERSIONED_NOTIFICATIONS[:]
SUBSCRIBERS.clear()
VERSIONED_SUBS.clear()
FakeMessage = collections.namedtuple('Message',
@ -81,7 +109,7 @@ class FakeVersionedNotifier(FakeNotifier):
'event_type': event_type,
'payload': payload}
VERSIONED_NOTIFICATIONS.append(notification)
_notify_subscribers(notification)
VERSIONED_SUBS[event_type].received(notification)
def stub_notifier(test):
@ -101,14 +129,5 @@ def stub_notifier(test):
None)))
def wait_for_versioned_notification(event_type, timeout=1.0):
# NOTE: The event stored in SUBSCRIBERS is not used for synchronizing
# the access to shared state as there is no parallel access to
# SUBSCRIBERS because the only parallelism is due to eventlet.
# The event is simply used to avoid polling the list of received
# notifications
return SUBSCRIBERS[event_type].wait(timeout)
def _notify_subscribers(notification):
SUBSCRIBERS[notification['event_type']].set()
def wait_for_versioned_notifications(event_type, n_events=1, timeout=1.0):
return VERSIONED_SUBS[event_type].wait_n(n_events, timeout=timeout)