Use get_notification_transport() for notifications

In oslo.messaging 2.9.0, the RPC and notification buses were decoupled
into separate transports. This changes over the Nova notifier to use the
notification transport instead of the original transport (which is now
used for RPC).

Change-Id: I595d9dd7986a87dfc93e3579d4498b2d9838a4d8
Partially-Implements: bp oslo-for-mitaka
This commit is contained in:
Ryan Rossiter 2016-01-12 22:21:56 +00:00
parent 9a46586dcf
commit 2356f90e0b
3 changed files with 57 additions and 25 deletions

View File

@ -47,6 +47,7 @@ CONF.register_opts(notification_opts)
TRANSPORT = None TRANSPORT = None
LEGACY_NOTIFIER = None LEGACY_NOTIFIER = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None NOTIFIER = None
ALLOWED_EXMODS = [ ALLOWED_EXMODS = [
@ -68,34 +69,43 @@ TRANSPORT_ALIASES = {
def init(conf): def init(conf):
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
exmods = get_allowed_exmods() exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf, TRANSPORT = messaging.get_transport(conf,
allowed_remote_exmods=exmods, allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES) aliases=TRANSPORT_ALIASES)
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
conf, allowed_remote_exmods=exmods, aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer(JsonPayloadSerializer()) serializer = RequestContextSerializer(JsonPayloadSerializer())
if conf.notification_format == 'unversioned': if conf.notification_format == 'unversioned':
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, serializer=serializer)
driver='noop') NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer, driver='noop')
elif conf.notification_format == 'both': elif conf.notification_format == 'both':
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, serializer=serializer)
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
topic='versioned_notifications') topic='versioned_notifications')
else: else:
LEGACY_NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
driver='noop') driver='noop')
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
topic='versioned_notifications') topic='versioned_notifications')
def cleanup(): def cleanup():
global TRANSPORT, LEGACY_NOTIFIER, NOTIFIER global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
assert TRANSPORT is not None assert TRANSPORT is not None
assert NOTIFICATION_TRANSPORT is not None
assert LEGACY_NOTIFIER is not None assert LEGACY_NOTIFIER is not None
assert NOTIFIER is not None assert NOTIFIER is not None
TRANSPORT.cleanup() TRANSPORT.cleanup()
TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None NOTIFICATION_TRANSPORT.cleanup()
TRANSPORT = NOTIFICATION_TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None
def set_defaults(control_exchange): def set_defaults(control_exchange):

View File

@ -22,9 +22,11 @@ from nova import test
class TestNotifier(test.NoDBTestCase): class TestNotifier(test.NoDBTestCase):
@mock.patch('oslo_messaging.get_transport') @mock.patch('oslo_messaging.get_transport')
@mock.patch('oslo_messaging.get_notification_transport')
@mock.patch('oslo_messaging.Notifier') @mock.patch('oslo_messaging.Notifier')
def test_notification_format_affects_notification_driver(self, def test_notification_format_affects_notification_driver(self,
mock_notifier, mock_notifier,
mock_noti_trans,
mock_transport): mock_transport):
conf = mock.Mock() conf = mock.Mock()

View File

@ -28,6 +28,7 @@ from nova import test
class RPCResetFixture(fixtures.Fixture): class RPCResetFixture(fixtures.Fixture):
def _setUp(self): def _setUp(self):
self.trans = copy.copy(rpc.TRANSPORT) self.trans = copy.copy(rpc.TRANSPORT)
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
self.noti = copy.copy(rpc.NOTIFIER) self.noti = copy.copy(rpc.NOTIFIER)
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS) self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS) self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
@ -35,6 +36,7 @@ class RPCResetFixture(fixtures.Fixture):
def _reset_everything(self): def _reset_everything(self):
rpc.TRANSPORT = self.trans rpc.TRANSPORT = self.trans
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
rpc.NOTIFIER = self.noti rpc.NOTIFIER = self.noti
rpc.ALLOWED_EXMODS = self.all_mods rpc.ALLOWED_EXMODS = self.all_mods
rpc.EXTRA_EXMODS = self.ext_mods rpc.EXTRA_EXMODS = self.ext_mods
@ -50,60 +52,76 @@ class TestRPC(testtools.TestCase):
@mock.patch.object(rpc, 'get_allowed_exmods') @mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer') @mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport') @mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier') @mock.patch.object(messaging, 'Notifier')
def test_init_unversioned(self, mock_notif, mock_trans, mock_ser, def test_init_unversioned(self, mock_notif, mock_noti_trans, mock_trans,
mock_exmods): mock_ser, mock_exmods):
# The expected call to get the legacy notifier will require no new # The expected call to get the legacy notifier will require no new
# kwargs, and we expect the new notifier will need the noop driver # kwargs, and we expect the new notifier will need the noop driver
expected = [{}, {'driver': 'noop'}] expected = [{}, {'driver': 'noop'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods, self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
'unversioned', expected) mock_exmods, 'unversioned', expected)
@mock.patch.object(rpc, 'get_allowed_exmods') @mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer') @mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport') @mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier') @mock.patch.object(messaging, 'Notifier')
def test_init_both(self, mock_notif, mock_trans, mock_ser, mock_exmods): def test_init_both(self, mock_notif, mock_noti_trans, mock_trans,
mock_ser, mock_exmods):
expected = [{}, {'topic': 'versioned_notifications'}] expected = [{}, {'topic': 'versioned_notifications'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods, self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
'both', expected) mock_exmods, 'both', expected)
@mock.patch.object(rpc, 'get_allowed_exmods') @mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer') @mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_transport') @mock.patch.object(messaging, 'get_transport')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier') @mock.patch.object(messaging, 'Notifier')
def test_init_versioned(self, mock_notif, mock_trans, mock_ser, def test_init_versioned(self, mock_notif, mock_noti_trans, mock_trans,
mock_exmods): mock_ser, mock_exmods):
expected = [{'driver': 'noop'}, {'topic': 'versioned_notifications'}] expected = [{'driver': 'noop'}, {'topic': 'versioned_notifications'}]
self._test_init(mock_notif, mock_trans, mock_ser, mock_exmods, self._test_init(mock_notif, mock_noti_trans, mock_trans, mock_ser,
'versioned', expected) mock_exmods, 'versioned', expected)
def test_cleanup_transport_null(self): def test_cleanup_transport_null(self):
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock() rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock() rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup) self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notification_transport_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_legacy_notifier_null(self): def test_cleanup_legacy_notifier_null(self):
rpc.TRANSPORT = mock.Mock() rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock() rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notifier_null(self): def test_cleanup_notifier_null(self):
rpc.TRANSPORT = mock.Mock() rpc.TRANSPORT = mock.Mock()
rpc.LEGACY_NOTIFIER = mock.Mock() rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup) self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup(self): def test_cleanup(self):
rpc.LEGACY_NOTIFIER = mock.Mock() rpc.LEGACY_NOTIFIER = mock.Mock()
rpc.NOTIFIER = mock.Mock() rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.TRANSPORT = mock.Mock() rpc.TRANSPORT = mock.Mock()
trans_cleanup = mock.Mock() trans_cleanup = mock.Mock()
not_trans_cleanup = mock.Mock()
rpc.TRANSPORT.cleanup = trans_cleanup rpc.TRANSPORT.cleanup = trans_cleanup
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
rpc.cleanup() rpc.cleanup()
trans_cleanup.assert_called_once_with() trans_cleanup.assert_called_once_with()
not_trans_cleanup.assert_called_once_with()
self.assertIsNone(rpc.TRANSPORT) self.assertIsNone(rpc.TRANSPORT)
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNone(rpc.LEGACY_NOTIFIER) self.assertIsNone(rpc.LEGACY_NOTIFIER)
self.assertIsNone(rpc.NOTIFIER) self.assertIsNone(rpc.NOTIFIER)
@ -228,10 +246,11 @@ class TestRPC(testtools.TestCase):
mock_prep.assert_called_once_with(publisher_id='service.foo') mock_prep.assert_called_once_with(publisher_id='service.foo')
self.assertEqual('notifier', notifier) self.assertEqual('notifier', notifier)
def _test_init(self, mock_notif, mock_trans, mock_ser, mock_exmods, def _test_init(self, mock_notif, mock_noti_trans, mock_trans, mock_ser,
notif_format, expected_driver_topic_kwargs): mock_exmods, notif_format, expected_driver_topic_kwargs):
legacy_notifier = mock.Mock() legacy_notifier = mock.Mock()
notifier = mock.Mock() notifier = mock.Mock()
notif_transport = mock.Mock()
transport = mock.Mock() transport = mock.Mock()
serializer = mock.Mock() serializer = mock.Mock()
conf = mock.Mock() conf = mock.Mock()
@ -239,6 +258,7 @@ class TestRPC(testtools.TestCase):
conf.notification_format = notif_format conf.notification_format = notif_format
mock_exmods.return_value = ['foo'] mock_exmods.return_value = ['foo']
mock_trans.return_value = transport mock_trans.return_value = transport
mock_noti_trans.return_value = notif_transport
mock_ser.return_value = serializer mock_ser.return_value = serializer
mock_notif.side_effect = [legacy_notifier, notifier] mock_notif.side_effect = [legacy_notifier, notifier]
@ -258,7 +278,7 @@ class TestRPC(testtools.TestCase):
for kwargs in expected_driver_topic_kwargs: for kwargs in expected_driver_topic_kwargs:
expected_kwargs = {'serializer': serializer} expected_kwargs = {'serializer': serializer}
expected_kwargs.update(kwargs) expected_kwargs.update(kwargs)
expected_calls.append(((transport,), expected_kwargs)) expected_calls.append(((notif_transport,), expected_kwargs))
self.assertEqual(expected_calls, mock_notif.call_args_list, self.assertEqual(expected_calls, mock_notif.call_args_list,
"The calls to messaging.Notifier() did not create " "The calls to messaging.Notifier() did not create "