Use get_notification_transport() for notifications

In oslo.messaging 2.9.0, the RPC and notification buses were decoupled
into separate transports. This changes over the Nova notifier to use the
notification transport instead of the original transport (which is now
used for RPC).

Change-Id: I595d9dd7986a87dfc93e3579d4498b2d9838a4d8
Partially-Implements: bp oslo-for-mitaka
This commit is contained in:
Ryan Rossiter 2016-01-12 22:21:56 +00:00
parent 9a46586dcf
commit 2356f90e0b
3 changed files with 57 additions and 25 deletions

View File

@ -47,6 +47,7 @@ CONF.register_opts(notification_opts)
TRANSPORT = None
LEGACY_NOTIFIER = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
@ -68,34 +69,43 @@ TRANSPORT_ALIASES = {
def init(conf):
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer(JsonPayloadSerializer())
if conf.notification_format == 'unversioned':
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
driver='noop')
LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer, driver='noop')
elif conf.notification_format == 'both':
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
topic='versioned_notifications')
else:
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
driver='noop')
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer,
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
topic='versioned_notifications')
def cleanup():
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
assert TRANSPORT is not None
assert NOTIFICATION_TRANSPORT is not None
assert LEGACY_NOTIFIER is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None
NOTIFICATION_TRANSPORT.cleanup()
TRANSPORT = NOTIFICATION_TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None
def set_defaults(control_exchange):

View File

@ -22,9 +22,11 @@ from nova import test
class TestNotifier(test.NoDBTestCase):
@mock.patch('oslo_messaging.get_transport')
@mock.patch('oslo_messaging.get_notification_transport')
@mock.patch('oslo_messaging.Notifier')
def test_notification_format_affects_notification_driver(self,
mock_notifier,
mock_noti_trans,
mock_transport):
conf = mock.Mock()

View File

@ -28,6 +28,7 @@ from nova import test
class RPCResetFixture(fixtures.Fixture):
def _setUp(self):
self.trans = copy.copy(rpc.TRANSPORT)
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
self.noti = copy.copy(rpc.NOTIFIER)
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
@ -35,6 +36,7 @@ class RPCResetFixture(fixtures.Fixture):
def _reset_everything(self):
rpc.TRANSPORT = self.trans
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
rpc.NOTIFIER = self.noti
rpc.ALLOWED_EXMODS = self.all_mods
rpc.EXTRA_EXMODS = self.ext_mods
@ -50,60 +52,76 @@ class TestRPC(testtools.TestCase):
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_unversioned(self, mock_notif, mock_trans, mock_ser,
mock_exmods):
def test_init_unversioned(self, mock_notif, mock_noti_trans, mock_trans,
mock_ser, mock_exmods):
# The expected call to get the legacy notifier will require no new
# kwargs, and we expect the new notifier will need the noop driver
expected = [{}, {'driver': 'noop'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods,
'unversioned', expected)
self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
mock_exmods, 'unversioned', expected)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_both(self, mock_notif, mock_trans, mock_ser, mock_exmods):
def test_init_both(self, mock_notif, mock_noti_trans, mock_trans,
mock_ser, mock_exmods):
expected = [{}, {'topic': 'versioned_notifications'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods,
'both', expected)
self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
mock_exmods, 'both', expected)
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_versioned(self, mock_notif, mock_trans, mock_ser,
mock_exmods):
def test_init_versioned(self, mock_notif, mock_noti_trans, mock_trans,
mock_ser, mock_exmods):
expected = [{'driver': 'noop'}, {'topic': 'versioned_notifications'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods,
'versioned', expected)
self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
mock_exmods, 'versioned', expected)
def test_cleanup_transport_null(self):
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notification_transport_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_legacy_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup(self):
rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.TRANSPORT = mock.Mock()
trans_cleanup = mock.Mock()
not_trans_cleanup = mock.Mock()
rpc.TRANSPORT.cleanup = trans_cleanup
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
rpc.cleanup()
trans_cleanup.assert_called_once_with()
not_trans_cleanup.assert_called_once_with()
self.assertIsNone(rpc.TRANSPORT)
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNone(rpc.LEGACY_NOTIFIER)
self.assertIsNone(rpc.NOTIFIER)
@ -228,10 +246,11 @@ class TestRPC(testtools.TestCase):
mock_prep.assert_called_once_with(publisher_id='service.foo')
self.assertEqual('notifier', notifier)
def _test_init(self, mock_notif, mock_trans, mock_ser, mock_exmods,
notif_format, expected_driver_topic_kwargs):
def _test_init(self, mock_notif, mock_noti_trans, mock_trans, mock_ser,
mock_exmods, notif_format, expected_driver_topic_kwargs):
legacy_notifier = mock.Mock()
notifier = mock.Mock()
notif_transport = mock.Mock()
transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
@ -239,6 +258,7 @@ class TestRPC(testtools.TestCase):
conf.notification_format = notif_format
mock_exmods.return_value = ['foo']
mock_trans.return_value = transport
mock_noti_trans.return_value = notif_transport
mock_ser.return_value = serializer
mock_notif.side_effect = [legacy_notifier, notifier]
@ -258,7 +278,7 @@ class TestRPC(testtools.TestCase):
for kwargs in expected_driver_topic_kwargs:
expected_kwargs = {'serializer': serializer}
expected_kwargs.update(kwargs)
expected_calls.append(((transport,), expected_kwargs))
expected_calls.append(((notif_transport,), expected_kwargs))
self.assertEqual(expected_calls, mock_notif.call_args_list,
"The calls to messaging.Notifier() did not create "