Add necessary constants and methods for RPC notification

As first step of adding notify functionality to masakari, this patch
adds the necessary constants and methods for RPC notification.

Co-Authored-By: Shilpa Devharakar <Shilpa.Devharakar@nttdata.com>

Change-Id: Iad3f487ef14effb41484014e5426117129543f5c
Partial-Implements: bp notifications-in-masakari
This commit is contained in:
Kengo Takahara 2017-06-15 10:41:47 +09:00 committed by shilpa
parent 68b9c42bef
commit 3253b13eb9
2 changed files with 180 additions and 6 deletions

View File

@ -14,6 +14,7 @@
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
import masakari.context
import masakari.exception
@ -32,7 +33,10 @@ __all__ = [
'get_server',
]
CONF = masakari.conf.CONF
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
ALLOWED_EXMODS = [
masakari.exception.__name__,
@ -41,20 +45,33 @@ EXTRA_EXMODS = []
def init(conf):
global TRANSPORT
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_rpc_transport(conf,
allowed_remote_exmods=exmods)
TRANSPORT = create_transport(get_transport_url())
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
conf, allowed_remote_exmods=exmods)
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer,
topics=['versioned_notifications'])
def initialized():
return None not in [TRANSPORT]
return None not in [TRANSPORT,
NOTIFICATION_TRANSPORT,
NOTIFIER]
def cleanup():
global TRANSPORT
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
assert TRANSPORT is not None
assert NOTIFICATION_TRANSPORT is not None
assert NOTIFIER is not None
TRANSPORT.cleanup()
TRANSPORT = None
NOTIFICATION_TRANSPORT.cleanup()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
def set_defaults(control_exchange):
@ -73,6 +90,23 @@ def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
def get_transport_url(url_str=None):
return messaging.TransportURL.parse(CONF, url_str)
def create_transport(url):
exmods = get_allowed_exmods()
return messaging.get_rpc_transport(CONF,
url=url,
allowed_remote_exmods=exmods)
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
@ -116,6 +150,11 @@ def get_server(target, endpoints, serializer=None):
access_policy=access_policy)
def get_versioned_notifier(publisher_id):
assert NOTIFIER is not None
return NOTIFIER.prepare(publisher_id=publisher_id)
class RPCAPI(object):
"""Mixin class aggregating methods related to RPC API compatibility."""

View File

@ -12,9 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import fixtures
import mock
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_serialization import jsonutils
import testtools
from masakari import context
from masakari import rpc
@ -118,6 +123,136 @@ class RPCAPITestCase(test.TestCase):
self.assertEqual('server', server)
class RPCResetFixture(fixtures.Fixture):
def _setUp(self):
self.trans = copy.copy(rpc.TRANSPORT)
self.noti_trans = copy.copy(rpc.NOTIFICATION_TRANSPORT)
self.noti = copy.copy(rpc.NOTIFIER)
self.all_mods = copy.copy(rpc.ALLOWED_EXMODS)
self.ext_mods = copy.copy(rpc.EXTRA_EXMODS)
self.addCleanup(self._reset_everything)
def _reset_everything(self):
rpc.TRANSPORT = self.trans
rpc.NOTIFICATION_TRANSPORT = self.noti_trans
rpc.NOTIFIER = self.noti
rpc.ALLOWED_EXMODS = self.all_mods
rpc.EXTRA_EXMODS = self.ext_mods
class TestRPC(testtools.TestCase):
def setUp(self):
super(TestRPC, self).setUp()
self.useFixture(RPCResetFixture())
@mock.patch.object(rpc, 'get_allowed_exmods')
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'get_notification_transport')
@mock.patch.object(messaging, 'Notifier')
def test_init_versioned(self, mock_notif, mock_noti_trans,
mock_ser, mock_exmods):
expected = [{'topics': ['versioned_notifications']}]
self._test_init(mock_notif, mock_noti_trans, mock_ser,
mock_exmods, 'versioned', expected)
def test_cleanup_transport_null(self):
rpc.TRANSPORT = None
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notification_transport_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = None
rpc.NOTIFIER = mock.Mock()
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup_notifier_null(self):
rpc.TRANSPORT = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.NOTIFIER = None
self.assertRaises(AssertionError, rpc.cleanup)
def test_cleanup(self):
rpc.NOTIFIER = mock.Mock()
rpc.NOTIFICATION_TRANSPORT = mock.Mock()
rpc.TRANSPORT = mock.Mock()
trans_cleanup = mock.Mock()
not_trans_cleanup = mock.Mock()
rpc.TRANSPORT.cleanup = trans_cleanup
rpc.NOTIFICATION_TRANSPORT.cleanup = not_trans_cleanup
rpc.cleanup()
trans_cleanup.assert_called_once_with()
not_trans_cleanup.assert_called_once_with()
self.assertIsNone(rpc.TRANSPORT)
self.assertIsNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNone(rpc.NOTIFIER)
def test_get_versioned_notifier(self):
rpc.NOTIFIER = mock.Mock()
mock_prep = mock.Mock()
mock_prep.return_value = 'notifier'
rpc.NOTIFIER.prepare = mock_prep
notifier = rpc.get_versioned_notifier('service.foo')
mock_prep.assert_called_once_with(publisher_id='service.foo')
self.assertEqual('notifier', notifier)
def _test_init(self, mock_notif, mock_noti_trans, mock_ser,
mock_exmods, notif_format, expected_driver_topic_kwargs,
versioned_notification_topics=['versioned_notifications']):
notifier = mock.Mock()
notif_transport = mock.Mock()
transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
conf.transport_url = None
conf.notification_format = notif_format
mock_exmods.return_value = ['foo']
conf.notifications.versioned_notifications_topics = (
versioned_notification_topics)
mock_noti_trans.return_value = notif_transport
mock_ser.return_value = serializer
mock_notif.side_effect = [notifier]
@mock.patch.object(rpc, 'CONF', new=conf)
@mock.patch.object(rpc, 'create_transport')
@mock.patch.object(rpc, 'get_transport_url')
def _test(get_url, create_transport):
create_transport.return_value = transport
rpc.init(conf)
create_transport.assert_called_once_with(get_url.return_value)
_test()
self.assertTrue(mock_exmods.called)
self.assertIsNotNone(rpc.TRANSPORT)
self.assertIsNotNone(rpc.NOTIFIER)
self.assertEqual(notifier, rpc.NOTIFIER)
expected_calls = []
for kwargs in expected_driver_topic_kwargs:
expected_kwargs = {'serializer': serializer}
expected_kwargs.update(kwargs)
expected_calls.append(((notif_transport,), expected_kwargs))
self.assertEqual(expected_calls, mock_notif.call_args_list,
"The calls to messaging.Notifier() did not create "
"the versioned notifiers properly.")
class TestJsonPayloadSerializer(test.NoDBTestCase):
def test_serialize_entity(self):
with mock.patch.object(jsonutils, 'to_primitive') as mock_prim:
rpc.JsonPayloadSerializer.serialize_entity('context', 'entity')
mock_prim.assert_called_once_with('entity', convert_instances=True)
class TestRequestContextSerializer(test.NoDBTestCase):
def setUp(self):
super(TestRequestContextSerializer, self).setUp()