Class-level _exchanges in FakeExchangeManager

The FakeExchangeManager uses an instance-level storage for FakeExchanges
mapping[1].  When a client--server pair is created, each keeps their own
instance of FakeDriver -> FakeExchangeManager -> FakeExchange, each of
which has their own (instance-level) copy of e.g _server_queues[2], making
it impossible for them to communicate.

This patch makes the _exchanges mapping a class-level attribute in order
to keep the registered exchanges shared between all Manager instances,
allowing client and server communication (within a single process).

The test_server unit-tests had to be refactored to explicitly pass an exchange
name when building a target. This is required for an exchange name change to
have any effect during a test case run time when compared to passing the
exchange name through the URL. This issue was revealed with this patch.

[1] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L145,#L148
[2] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L88,#L92

Change-Id: I8dff66f4cafeb1f4c57dbfbfaba5d49e50f55fee
Closes-Bug: #1714055
This commit is contained in:
dparalen 2017-08-31 16:32:53 +02:00
parent ba30a3067d
commit d1dac1c11d
3 changed files with 55 additions and 29 deletions

View File

@ -142,10 +142,11 @@ class FakeExchange(object):
class FakeExchangeManager(object):
_exchanges_lock = threading.Lock()
_exchanges = {}
def __init__(self, default_exchange):
self._default_exchange = default_exchange
self._exchanges_lock = threading.Lock()
self._exchanges = {}
def get_exchange(self, name):
if name is None:

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import threading
from oslo_config import cfg
@ -131,6 +132,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
def setUp(self):
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
ListenerSetupMixin.setUp(self)
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
@mock.patch('debtcollector.deprecate')
def test_constructor(self, deprecate):

View File

@ -35,9 +35,11 @@ load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class Server(object):
def __init__(self, transport, topic, server, endpoint, serializer):
def __init__(self, transport, topic, server, endpoint, serializer,
exchange):
self.controller = ServerSetupMixin.ServerController()
target = oslo_messaging.Target(topic=topic, server=server)
target = oslo_messaging.Target(topic=topic, server=server,
exchange=exchange)
self.server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint,
@ -81,25 +83,25 @@ class ServerSetupMixin(object):
def __init__(self):
self.serializer = self.TestSerializer()
def _setup_server(self, transport, endpoint, topic=None, server=None):
def _setup_server(self, transport, endpoint, topic=None, server=None,
exchange=None):
server = self.Server(transport,
topic=topic or 'testtopic',
server=server or 'testserver',
endpoint=endpoint,
serializer=self.serializer)
serializer=self.serializer,
exchange=exchange)
server.start()
return server
def _stop_server(self, client, server, topic=None):
if topic is not None:
client = client.prepare(topic=topic)
def _stop_server(self, client, server, topic=None, exchange=None):
client.cast({}, 'stop')
server.wait()
def _setup_client(self, transport, topic='testtopic'):
return oslo_messaging.RPCClient(transport,
oslo_messaging.Target(topic=topic),
def _setup_client(self, transport, topic='testtopic', exchange=None):
target = oslo_messaging.Target(topic=topic, exchange=exchange)
return oslo_messaging.RPCClient(transport, target=target,
serializer=self.serializer)
@ -111,6 +113,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def setUp(self):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
# FakeExchangeManager uses a class-level exchanges mapping; "reset" it
# before tests assert amount of items stored
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
@mock.patch('warnings.warn')
def test_constructor(self, warn):
@ -300,14 +307,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
# NOTE(milan): using a separate transport instance for each the client
# and the server to be able to check independent transport instances
# can communicate over same exchange&topic
transport_srv = oslo_messaging.get_rpc_transport(self.conf,
url='fake:')
transport_cli = oslo_messaging.get_rpc_transport(self.conf,
url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
return arg
server_thread = self._setup_server(transport, TestEndpoint())
client = self._setup_client(transport)
server_thread = self._setup_server(transport_srv, TestEndpoint())
client = self._setup_client(transport_cli)
self.assertIsNone(client.call({}, 'ping', arg=None))
self.assertEqual(0, client.call({}, 'ping', arg=0))
@ -498,8 +511,8 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
single_server = params['server1'] == params['server2']
return not (single_topic and single_server)
# fanout to multiple servers on same topic and exchange
# each endpoint will receive both messages
# fanout to multiple servers on same topic and exchange each endpoint
# will receive both messages
def fanout_to_servers(scenario):
params = scenario[1]
fanout = params['fanout1'] or params['fanout2']
@ -536,14 +549,16 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
def setUp(self):
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
def test_multiple_servers(self):
url1 = 'fake:///' + (self.exchange1 or '')
url2 = 'fake:///' + (self.exchange2 or '')
transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
if url1 != url2:
transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
transport1 = oslo_messaging.get_rpc_transport(self.conf,
url='fake:')
if self.exchange1 != self.exchange2:
transport2 = oslo_messaging.get_rpc_transport(self.conf,
url='fake:')
else:
transport2 = transport1
@ -563,12 +578,18 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
endpoint1 = endpoint2 = TestEndpoint()
server1 = self._setup_server(transport1, endpoint1,
topic=self.topic1, server=self.server1)
topic=self.topic1,
exchange=self.exchange1,
server=self.server1)
server2 = self._setup_server(transport2, endpoint2,
topic=self.topic2, server=self.server2)
topic=self.topic2,
exchange=self.exchange2,
server=self.server2)
client1 = self._setup_client(transport1, topic=self.topic1)
client2 = self._setup_client(transport2, topic=self.topic2)
client1 = self._setup_client(transport1, topic=self.topic1,
exchange=self.exchange1)
client2 = self._setup_client(transport2, topic=self.topic2,
exchange=self.exchange2)
client1 = client1.prepare(server=self.server1)
client2 = client2.prepare(server=self.server2)
@ -584,9 +605,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
self._stop_server(client1.prepare(fanout=None),
server1, topic=self.topic1)
server1, topic=self.topic1, exchange=self.exchange1)
self._stop_server(client2.prepare(fanout=None),
server2, topic=self.topic2)
server2, topic=self.topic2, exchange=self.exchange2)
def check(pings, expect):
self.assertEqual(len(expect), len(pings))