Add necessary constants and methods for RPC notification
As first step of adding notify functionality to masakari, this patch adds the necessary constants and methods for RPC notification. Co-Authored-By: Shilpa Devharakar <Shilpa.Devharakar@nttdata.com> Change-Id: Iad3f487ef14effb41484014e5426117129543f5c Partial-Implements: bp notifications-in-masakari
This commit is contained in:
parent
68b9c42bef
commit
3253b13eb9
|
@ -14,6 +14,7 @@
|
|||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
import masakari.context
|
||||
import masakari.exception
|
||||
|
@ -32,7 +33,10 @@ __all__ = [
|
|||
'get_server',
|
||||
]
|
||||
|
||||
CONF = masakari.conf.CONF
|
||||
TRANSPORT = None
|
||||
NOTIFICATION_TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
ALLOWED_EXMODS = [
|
||||
masakari.exception.__name__,
|
||||
|
@ -41,20 +45,33 @@ EXTRA_EXMODS = []
|
|||
|
||||
|
||||
def init(conf):
|
||||
global TRANSPORT
|
||||
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
|
||||
exmods = get_allowed_exmods()
|
||||
TRANSPORT = messaging.get_rpc_transport(conf,
|
||||
allowed_remote_exmods=exmods)
|
||||
|
||||
TRANSPORT = create_transport(get_transport_url())
|
||||
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
|
||||
conf, allowed_remote_exmods=exmods)
|
||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
|
||||
serializer=serializer,
|
||||
topics=['versioned_notifications'])
|
||||
|
||||
|
||||
def initialized():
|
||||
return None not in [TRANSPORT]
|
||||
return None not in [TRANSPORT,
|
||||
NOTIFICATION_TRANSPORT,
|
||||
NOTIFIER]
|
||||
|
||||
|
||||
def cleanup():
|
||||
global TRANSPORT
|
||||
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
|
||||
assert TRANSPORT is not None
|
||||
assert NOTIFICATION_TRANSPORT is not None
|
||||
assert NOTIFIER is not None
|
||||
|
||||
TRANSPORT.cleanup()
|
||||
TRANSPORT = None
|
||||
NOTIFICATION_TRANSPORT.cleanup()
|
||||
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
|
||||
|
||||
|
||||
def set_defaults(control_exchange):
|
||||
|
@ -73,6 +90,23 @@ def get_allowed_exmods():
|
|||
return ALLOWED_EXMODS + EXTRA_EXMODS
|
||||
|
||||
|
||||
def get_transport_url(url_str=None):
|
||||
return messaging.TransportURL.parse(CONF, url_str)
|
||||
|
||||
|
||||
def create_transport(url):
|
||||
exmods = get_allowed_exmods()
|
||||
return messaging.get_rpc_transport(CONF,
|
||||
url=url,
|
||||
allowed_remote_exmods=exmods)
|
||||
|
||||
|
||||
class JsonPayloadSerializer(messaging.NoOpSerializer):
|
||||
@staticmethod
|
||||
def serialize_entity(context, entity):
|
||||
return jsonutils.to_primitive(entity, convert_instances=True)
|
||||
|
||||
|
||||
class RequestContextSerializer(messaging.Serializer):
|
||||
|
||||
def __init__(self, base):
|
||||
|
@ -116,6 +150,11 @@ def get_server(target, endpoints, serializer=None):
|
|||
access_policy=access_policy)
|
||||
|
||||
|
||||
def get_versioned_notifier(publisher_id):
|
||||
assert NOTIFIER is not None
|
||||
return NOTIFIER.prepare(publisher_id=publisher_id)
|
||||
|
||||
|
||||
class RPCAPI(object):
|
||||
"""Mixin class aggregating methods related to RPC API compatibility."""
|
||||
|
||||
|
|
|
@ -12,9 +12,14 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import oslo_messaging as messaging
|
||||
from oslo_messaging.rpc import dispatcher
|
||||
from oslo_serialization import jsonutils
|
||||
import testtools
|
||||
|
||||
from masakari import context
|
||||
from masakari import rpc
|
||||
|
@ -118,6 +123,136 @@ class RPCAPITestCase(test.TestCase):
|
|||
self.assertEqual('server', server)
|
||||
|
||||
|
||||
class RPCResetFixture(fixtures.Fixture):
|
||||
def _setUp(self):
|
||||
self.trans = copy.copy(rpc.TRANSPORT)
|
||||
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
|
||||
self.noti = copy.copy(rpc.NOTIFIER)
|
||||
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
|
||||
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
|
||||
self.addCleanup(self._reset_everything)
|
||||
|
||||
def _reset_everything(self):
|
||||
rpc.TRANSPORT = self.trans
|
||||
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
|
||||
rpc.NOTIFIER = self.noti
|
||||
rpc.ALLOWED_EXMODS = self.all_mods
|
||||
rpc.EXTRA_EXMODS = self.ext_mods
|
||||
|
||||
|
||||
class TestRPC(testtools.TestCase):
|
||||
def setUp(self):
|
||||
super(TestRPC, self).setUp()
|
||||
self.useFixture(RPCResetFixture())
|
||||
|
||||
@mock.patch.object(rpc, 'get_allowed_exmods')
|
||||
@mock.patch.object(rpc, 'RequestContextSerializer')
|
||||
@mock.patch.object(messaging, 'get_notification_transport')
|
||||
@mock.patch.object(messaging, 'Notifier')
|
||||
def test_init_versioned(self, mock_notif, mock_noti_trans,
|
||||
mock_ser, mock_exmods):
|
||||
expected = [{'topics': ['versioned_notifications']}]
|
||||
self._test_init(mock_notif, mock_noti_trans, mock_ser,
|
||||
mock_exmods, 'versioned', expected)
|
||||
|
||||
def test_cleanup_transport_null(self):
|
||||
rpc.TRANSPORT = None
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup_notification_transport_null(self):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = None
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup_notifier_null(self):
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
rpc.NOTIFIER = None
|
||||
self.assertRaises(AssertionError, rpc.cleanup)
|
||||
|
||||
def test_cleanup(self):
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
|
||||
rpc.TRANSPORT = mock.Mock()
|
||||
trans_cleanup = mock.Mock()
|
||||
not_trans_cleanup = mock.Mock()
|
||||
rpc.TRANSPORT.cleanup = trans_cleanup
|
||||
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
|
||||
|
||||
rpc.cleanup()
|
||||
|
||||
trans_cleanup.assert_called_once_with()
|
||||
not_trans_cleanup.assert_called_once_with()
|
||||
self.assertIsNone(rpc.TRANSPORT)
|
||||
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
|
||||
self.assertIsNone(rpc.NOTIFIER)
|
||||
|
||||
def test_get_versioned_notifier(self):
|
||||
rpc.NOTIFIER = mock.Mock()
|
||||
mock_prep = mock.Mock()
|
||||
mock_prep.return_value = 'notifier'
|
||||
rpc.NOTIFIER.prepare = mock_prep
|
||||
|
||||
notifier = rpc.get_versioned_notifier('service.foo')
|
||||
|
||||
mock_prep.assert_called_once_with(publisher_id='service.foo')
|
||||
self.assertEqual('notifier', notifier)
|
||||
|
||||
def _test_init(self, mock_notif, mock_noti_trans, mock_ser,
|
||||
mock_exmods, notif_format, expected_driver_topic_kwargs,
|
||||
versioned_notification_topics=['versioned_notifications']):
|
||||
notifier = mock.Mock()
|
||||
notif_transport = mock.Mock()
|
||||
transport = mock.Mock()
|
||||
serializer = mock.Mock()
|
||||
conf = mock.Mock()
|
||||
|
||||
conf.transport_url = None
|
||||
conf.notification_format = notif_format
|
||||
mock_exmods.return_value = ['foo']
|
||||
conf.notifications.versioned_notifications_topics = (
|
||||
versioned_notification_topics)
|
||||
mock_noti_trans.return_value = notif_transport
|
||||
mock_ser.return_value = serializer
|
||||
mock_notif.side_effect = [notifier]
|
||||
|
||||
@mock.patch.object(rpc, 'CONF', new=conf)
|
||||
@mock.patch.object(rpc, 'create_transport')
|
||||
@mock.patch.object(rpc, 'get_transport_url')
|
||||
def _test(get_url, create_transport):
|
||||
create_transport.return_value = transport
|
||||
rpc.init(conf)
|
||||
create_transport.assert_called_once_with(get_url.return_value)
|
||||
|
||||
_test()
|
||||
|
||||
self.assertTrue(mock_exmods.called)
|
||||
self.assertIsNotNone(rpc.TRANSPORT)
|
||||
self.assertIsNotNone(rpc.NOTIFIER)
|
||||
self.assertEqual(notifier, rpc.NOTIFIER)
|
||||
|
||||
expected_calls = []
|
||||
for kwargs in expected_driver_topic_kwargs:
|
||||
expected_kwargs = {'serializer': serializer}
|
||||
expected_kwargs.update(kwargs)
|
||||
expected_calls.append(((notif_transport,), expected_kwargs))
|
||||
|
||||
self.assertEqual(expected_calls, mock_notif.call_args_list,
|
||||
"The calls to messaging.Notifier() did not create "
|
||||
"the versioned notifiers properly.")
|
||||
|
||||
|
||||
class TestJsonPayloadSerializer(test.NoDBTestCase):
|
||||
def test_serialize_entity(self):
|
||||
with mock.patch.object(jsonutils, 'to_primitive') as mock_prim:
|
||||
rpc.JsonPayloadSerializer.serialize_entity('context', 'entity')
|
||||
|
||||
mock_prim.assert_called_once_with('entity', convert_instances=True)
|
||||
|
||||
|
||||
class TestRequestContextSerializer(test.NoDBTestCase):
|
||||
def setUp(self):
|
||||
super(TestRequestContextSerializer, self).setUp()
|
||||
|
|
Loading…
Reference in New Issue