From bcdc0e88ecdb285dc3b21ce6ed858ddc1ac2c628 Mon Sep 17 00:00:00 2001 From: Victor Sergeyev Date: Thu, 9 Jul 2015 11:39:31 +0300 Subject: [PATCH] ZMQ: Allow to raise remote exception This patch adds possibility to re-raise on client's side exception, that was raised on server side - serialize it on server side, restore and re-raise on client. Allowed to pass `allowed_remote_exmods` parameter from impl_zmq to CallRequest class Functional test CallTestCase.test_exception() passes now, so added it to tox.ini. Modified zmq_receiver to be able run functional tests. Change-Id: Ic055f3574962f3e80a0528d5d99320386303634e --- oslo_messaging/_cmd/zmq_receiver.py | 8 ++++++-- oslo_messaging/_drivers/impl_zmq.py | 8 +++++++- .../_drivers/zmq_driver/broker/zmq_base_proxy.py | 1 + .../_drivers/zmq_driver/poller/green_poller.py | 3 --- .../zmq_driver/rpc/client/zmq_call_request.py | 11 +++++++++-- .../_drivers/zmq_driver/rpc/client/zmq_client.py | 8 +++++--- .../zmq_driver/rpc/server/zmq_call_responder.py | 4 ++++ .../_drivers/zmq_driver/rpc/server/zmq_server.py | 2 +- oslo_messaging/tests/functional/test_functional.py | 3 +-- oslo_messaging/tests/functional/utils.py | 9 +++++++-- tox.ini | 3 ++- 11 files changed, 43 insertions(+), 17 deletions(-) diff --git a/oslo_messaging/_cmd/zmq_receiver.py b/oslo_messaging/_cmd/zmq_receiver.py index cbcdfe88d..f259299f9 100644 --- a/oslo_messaging/_cmd/zmq_receiver.py +++ b/oslo_messaging/_cmd/zmq_receiver.py @@ -24,6 +24,7 @@ import sys from oslo_config import cfg from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker import zmq_broker from oslo_messaging._executors import base # FIXME(markmc) CONF = cfg.CONF @@ -35,6 +36,9 @@ def main(): CONF(sys.argv[1:], project='oslo') logging.basicConfig(level=logging.DEBUG) - with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: - reactor.consume_in_thread() + with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor: + reactor.start() reactor.wait() + +if __name__ == "__main__": + main() diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 7357aa3e3..b75bf8f9c 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -71,6 +71,11 @@ zmq_opts = [ default=30, help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), + + cfg.IntOpt('rpc_poll_timeout', + default=1, + help='The default number of seconds that poll should wait. ' + 'Poll raises timeout exception when timeout expired.'), ] @@ -95,7 +100,8 @@ class ZmqDriver(base.BaseDriver): def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): if self.client is None: - self.client = zmq_client.ZmqClient(self.conf, self.matchmaker) + self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, + self._allowed_remote_exmods) if wait_for_reply: return self.client.call(target, ctxt, message, timeout, retry) else: diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py index d591d94eb..59cd42a79 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py @@ -104,6 +104,7 @@ class BaseTcpFrontend(object): def receive_incoming(self): message, socket = self.poller.poll(1) + LOG.info(_LI("Message %s received."), message) return message diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index b2c26c8a8..f09bb016a 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -19,13 +19,10 @@ import eventlet import six from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_poller LOG = logging.getLogger(__name__) -zmq = zmq_async.import_zmq() - class GreenPoller(zmq_poller.ZmqPoller): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index 682b46fb9..460275378 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -14,6 +14,7 @@ import logging +from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_serializer @@ -28,7 +29,8 @@ zmq = zmq_async.import_zmq() class CallRequest(Request): def __init__(self, conf, target, context, message, timeout=None, - retry=None): + retry=None, allowed_remote_exmods=None): + self.allowed_remote_exmods = allowed_remote_exmods or [] try: self.zmq_context = zmq.Context() socket = self.zmq_context.socket(zmq.REQ) @@ -44,9 +46,14 @@ class CallRequest(Request): self.socket.connect(self.connect_address) except zmq.ZMQError as e: LOG.error(_LE("Error connecting to socket: %s") % str(e)) + raise def receive_reply(self): # NOTE(ozamiatin): Check for retry here (no retries now) self.socket.setsockopt(zmq.RCVTIMEO, self.timeout) reply = self.socket.recv_json() - return reply[u'reply'] + if reply['failure']: + raise rpc_common.deserialize_remote_exception( + reply['failure'], self.allowed_remote_exmods) + else: + return reply['reply'] diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py index a4eed4953..ec00cb912 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py @@ -19,14 +19,16 @@ from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer class ZmqClient(object): - def __init__(self, conf, matchmaker=None): + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): self.conf = conf + self.allowed_remote_exmods = allowed_remote_exmods or [] self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf, matchmaker) def call(self, target, context, message, timeout=None, retry=None): - request = zmq_call_request.CallRequest(self.conf, target, context, - message, timeout, retry) + request = zmq_call_request.CallRequest( + self.conf, target, context, message, timeout, retry, + self.allowed_remote_exmods) return request() def cast(self, target, context, message, timeout=None, retry=None): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py index 9431b8f67..59b46e535 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py @@ -16,6 +16,7 @@ import logging from oslo_messaging._drivers import base +from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_base_consumer from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_topic as topic_utils @@ -37,6 +38,9 @@ class ZmqIncomingRequest(base.IncomingMessage): self.poller = poller def reply(self, reply=None, failure=None, log_failure=True): + if failure is not None: + failure = rpc_common.serialize_remote_exception(failure, + log_failure) message_reply = {u'reply': reply, u'failure': failure, u'log_failure': log_failure} diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index a5540ec59..b51ff0187 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -39,7 +39,7 @@ class ZmqServer(base.Listener): self.context) def poll(self, timeout=None): - incoming = self.poller.poll(timeout) + incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout) return incoming[0] def stop(self): diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 32cc0190c..962d473fe 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -103,8 +103,7 @@ class CallTestCase(utils.SkipIfNoTransportURL): group = self.useFixture(utils.RpcServerGroupFixture(self.url)) client = group.client(1) client.add(increment=2) - f = lambda: client.subtract(increment=3) - self.assertThat(f, matchers.raises(ValueError)) + self.assertRaises(ValueError, client.subtract, increment=3) def test_timeout_with_concurrently_queues(self): transport = self.useFixture(utils.TransportFixture(self.url)) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 8ac087bc4..de1673839 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -125,8 +125,13 @@ class RpcServerGroupFixture(fixtures.Fixture): # NOTE(sileht): topic and servier_name must be uniq # to be able to run all tests in parallel self.topic = topic or str(uuid.uuid4()) - self.names = names or ["server_%i_%s" % (i, uuid.uuid4()) - for i in range(3)] + if self.url.startswith('zmq'): + # NOTE(viktors): We need to pass correct hots name to the to + # get_tcp_.*() methods. Should we use nameserver here? + self.names = names or [cfg.CONF.rpc_zmq_host for i in range(3)] + else: + self.names = names or ["server_%i_%s" % (i, uuid.uuid4()) + for i in range(3)] self.exchange = exchange self.targets = [self._target(server=n) for n in self.names] self.use_fanout_ctrl = use_fanout_ctrl diff --git a/tox.ini b/tox.ini index 6c86be990..7d0d665cb 100644 --- a/tox.ini +++ b/tox.ini @@ -41,7 +41,8 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-zeromq] -commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception +# commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [flake8] show-source = True