Make the dispatcher responsible to listen()

The dispatcher is now responsible to configure and to get the listener from
the transport.

The server just ask to the dispatcher to build and return a configured
listener for a provider transport.

Partial implements blueprint notification-subscriber-server

Change-Id: I4a6d9620b8239f6d377bc5788b8a90a860b2f02c
This commit is contained in:
Mehdi Abaakouk 2013-12-17 19:44:51 +01:00 committed by Mehdi Abaakouk
parent 11a90eabc9
commit 86e5737bf6
5 changed files with 23 additions and 15 deletions

View File

@ -29,7 +29,7 @@ from oslo.messaging import _utils as utils
from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer
from oslo.messaging import server as msg_server
from oslo.messaging import target
from oslo.messaging import target as msg_target
class RPCDispatcherError(msg_server.MessagingServerError):
@ -68,12 +68,24 @@ class RPCDispatcher(object):
Endpoints may have a target attribute describing the namespace and version
of the methods exposed by that object. All public methods on an endpoint
object are remotely invokable by clients.
"""
def __init__(self, endpoints, serializer):
def __init__(self, target, endpoints, serializer):
"""Construct a rpc server dispatcher.
:param target: the exchange, topic and server to listen on
:type target: Target
"""
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self._default_target = target.Target()
self._default_target = msg_target.Target()
self._target = target
def _listen(self, transport):
return transport._listen(self._target)
@staticmethod
def _is_namespace(target, namespace):

View File

@ -121,9 +121,8 @@ def get_rpc_server(transport, target, endpoints,
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
dispatcher = rpc_dispatcher.RPCDispatcher(endpoints, serializer)
return msg_server.MessageHandlingServer(transport, target,
dispatcher, executor)
dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
class ExpectedException(Exception):

View File

@ -61,7 +61,7 @@ class MessageHandlingServer(object):
new tasks.
"""
def __init__(self, transport, target, dispatcher, executor='blocking'):
def __init__(self, transport, dispatcher, executor='blocking'):
"""Construct a message handling server.
The dispatcher parameter is a callable which is invoked with context
@ -73,8 +73,6 @@ class MessageHandlingServer(object):
:param transport: the messaging transport
:type transport: Transport
:param target: the exchange, topic and server to listen on
:type target: Target
:param dispatcher: a callable which is invoked for each method
:type dispatcher: callable
:param executor: name of message executor - e.g. 'eventlet', 'blocking'
@ -83,7 +81,6 @@ class MessageHandlingServer(object):
self.conf = transport.conf
self.transport = transport
self.target = target
self.dispatcher = dispatcher
self.executor = executor
@ -116,9 +113,8 @@ class MessageHandlingServer(object):
"""
if self._executor is not None:
return
try:
listener = self.transport._listen(self.target)
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)

View File

@ -97,7 +97,8 @@ class TestDispatcher(test_utils.BaseTestCase):
endpoints.append(_FakeEndpoint(target))
serializer = None
dispatcher = messaging.RPCDispatcher(endpoints, serializer)
target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, endpoints, serializer)
if self.dispatch_to is not None:
endpoint = endpoints[self.dispatch_to['endpoint']]
@ -139,7 +140,8 @@ class TestSerializer(test_utils.BaseTestCase):
def test_serializer(self):
endpoint = _FakeEndpoint()
serializer = msg_serializer.NoOpSerializer()
dispatcher = messaging.RPCDispatcher([endpoint], serializer)
target = messaging.Target()
dispatcher = messaging.RPCDispatcher(target, [endpoint], serializer)
self.mox.StubOutWithMock(endpoint, 'foo')
args = dict([(k, 'd' + v) for k, v in self.args.items()])

View File

@ -105,7 +105,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport)
self.assertIs(server.target, target)
self.assertIsInstance(server.dispatcher, messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)