Add one more functional test for MessagingTimeout

Add a check, that MessagingTimeout raises on long-running queries,
if client sends another queries at the same time.

Added a long_running_task() to TestServerEndpoint and allowed to pass a
message executor into the RpcServerFixture.

Related bug: #1338732

Co-Authored-By: Roman Podoliaka <rpodolyaka@mirantis.com>
Change-Id: Icafb6838e2d9fb76b6d1c202465c09c174a3bed9
This commit is contained in:
Victor Sergeyev 2015-05-20 15:49:53 +03:00
parent 3d483fda70
commit cc4ca1f9ef
2 changed files with 36 additions and 5 deletions

View File

@ -11,10 +11,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
import time
import uuid
import concurrent.futures
from testtools import matchers
import oslo_messaging
from oslo_messaging.tests.functional import utils
@ -103,6 +106,27 @@ class CallTestCase(utils.SkipIfNoTransportURL):
f = lambda: client.subtract(increment=3)
self.assertThat(f, matchers.raises(ValueError))
def test_timeout_with_concurrently_queues(self):
transport = self.useFixture(utils.TransportFixture(self.url))
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
server="server_" + str(uuid.uuid4()))
server = self.useFixture(
utils.RpcServerFixture(self.url, target, executor="threading"))
client = utils.ClientStub(transport.transport, target,
cast=False, timeout=5)
def short_periodical_tasks():
for i in range(10):
client.add(increment=1)
time.sleep(1)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(client.long_running_task, seconds=10)
executor.submit(short_periodical_tasks)
self.assertRaises(oslo_messaging.MessagingTimeout, future.result)
self.assertEqual(10, server.endpoint.ival)
class CastTestCase(utils.SkipIfNoTransportURL):
# Note: casts return immediately, so these tests utilise a special

View File

@ -46,6 +46,9 @@ class TestServerEndpoint(object):
self.sval += text
return self.sval
def long_running_task(self, ctxt, seconds):
time.sleep(seconds)
class TransportFixture(fixtures.Fixture):
"""Fixture defined to setup the oslo_messaging transport."""
@ -69,11 +72,13 @@ class TransportFixture(fixtures.Fixture):
class RpcServerFixture(fixtures.Fixture):
"""Fixture to setup the TestServerEndpoint."""
def __init__(self, url, target, endpoint=None, ctrl_target=None):
def __init__(self, url, target, endpoint=None, ctrl_target=None,
executor='blocking'):
super(RpcServerFixture, self).__init__()
self.url = url
self.target = target
self.endpoint = endpoint or TestServerEndpoint()
self.executor = executor
self.syncq = moves.queue.Queue()
self.ctrl_target = ctrl_target or self.target
@ -81,9 +86,11 @@ class RpcServerFixture(fixtures.Fixture):
super(RpcServerFixture, self).setUp()
endpoints = [self.endpoint, self]
transport = self.useFixture(TransportFixture(self.url))
self.server = oslo_messaging.get_rpc_server(transport.transport,
self.target,
endpoints)
self.server = oslo_messaging.get_rpc_server(
transport=transport.transport,
target=self.target,
endpoints=endpoints,
executor=self.executor)
self._ctrl = oslo_messaging.RPCClient(transport.transport,
self.ctrl_target)
self._start()