Fix work with timeout in CallRequest.receive_reply()

Refactored CallRequest.receive_reply() method to raise MessagingTimeout
exception, when timeout is reached.  Removed unused _to_milliseconds() method

Functional test CallTestCase.test_timeout() passes now

Change-Id: Idc3224646c3626a56606d019ff7ff155d3e3201a
This commit is contained in:
Victor Sergeyev 2015-07-09 16:04:01 +03:00
parent bcdc0e88ec
commit 48f2a87a27
4 changed files with 16 additions and 10 deletions

View File

@ -14,6 +14,7 @@
import logging
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request
from oslo_messaging._drivers.zmq_driver import zmq_async
@ -31,10 +32,10 @@ class CallRequest(Request):
def __init__(self, conf, target, context, message, timeout=None,
retry=None, allowed_remote_exmods=None):
self.allowed_remote_exmods = allowed_remote_exmods or []
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
super(CallRequest, self).__init__(conf, target, context,
message, socket,
zmq_serializer.CALL_TYPE,
@ -50,8 +51,15 @@ class CallRequest(Request):
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
self.socket.setsockopt(zmq.RCVTIMEO, self.timeout)
reply = self.socket.recv_json()
poller = zmq_async.get_reply_poller()
poller.register(self.socket,
recv_method=lambda socket: socket.recv_json())
reply, socket = poller.poll(timeout=self.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % self.timeout)
if reply['failure']:
raise rpc_common.deserialize_remote_exception(
reply['failure'], self.allowed_remote_exmods)

View File

@ -47,16 +47,12 @@ class Request(object):
self.target = target
self.context = context
self.message = message
self.timeout = self._to_milliseconds(conf, timeout)
self.timeout = timeout or conf.rpc_response_timeout
self.retry = retry
self.reply = None
self.socket = socket
self.topic = zmq_topic.Topic.from_target(conf, target)
@staticmethod
def _to_milliseconds(conf, timeout):
return timeout * 1000 if timeout else conf.rpc_response_timeout * 1000
@property
def is_replied(self):
return self.reply is not None

View File

@ -84,4 +84,4 @@ class TimerTestCase(test_utils.BaseTestCase):
callback = mock.Mock()
remaining = t.check_return(callback)
self.assertEqual(0, remaining)
callback.assert_called_once
self.assertEqual(1, callback.call_count)

View File

@ -41,7 +41,9 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:py27-func-zeromq]
commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception
commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run \
oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception \
oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout
# commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[flake8]