Add support for virtual hosts
Adds the 'pseudo_vhost' option which when enabled will incorporate the
virtual host into the address semantics. This creates a 'subnet' like
address space for each virtual host. Use this when the messaging bus
does not provide virtual hosting support. It is enabled by default as
to date none of the supported AMQP 1.0 message buses natively support
virtual hosting.
It also updates SSL support: SSL can either use the connection
hostname or the vhost name when validating a server's
certificate. This is controlled by the 'ssl_verify_vhost' option.
This option is disabled by default as it requires both vhost and SNI
support from the server. By default SSL will use the DNS name from
the TransportURL.
Change-Id: I49bb99d1b19e8e7e6fded76198da92ca5f7d65ab
Closes-Bug: #1700835
Partial-Bug: #1706987
(cherry picked from commit 263dce9ea8
)
This commit is contained in:
parent
459511e8d7
commit
a07d852b23
|
@ -104,40 +104,49 @@ class Addresser(object):
|
|||
"""Address used for shared subscribers (competing consumers)
|
||||
"""
|
||||
|
||||
def _concat(self, sep, items):
|
||||
return sep.join(filter(bool, items))
|
||||
|
||||
|
||||
class LegacyAddresser(Addresser):
|
||||
"""Legacy addresses are in the following format:
|
||||
|
||||
multicast: '$broadcast_prefix.$exchange.$topic.all'
|
||||
unicast: '$server_prefix.$exchange.$topic.$server'
|
||||
anycast: '$group_prefix.$exchange.$topic'
|
||||
multicast: '$broadcast_prefix[.$vhost].$exchange.$topic.all'
|
||||
unicast: '$server_prefix[.$vhost].$exchange.$topic.$server'
|
||||
anycast: '$group_prefix[.$vhost].$exchange.$topic'
|
||||
|
||||
Legacy addresses do not distinguish RPC traffic from Notification traffic
|
||||
"""
|
||||
def __init__(self, default_exchange, server_prefix, broadcast_prefix,
|
||||
group_prefix):
|
||||
group_prefix, vhost):
|
||||
super(LegacyAddresser, self).__init__(default_exchange)
|
||||
self._server_prefix = server_prefix
|
||||
self._broadcast_prefix = broadcast_prefix
|
||||
self._group_prefix = group_prefix
|
||||
self._vhost = vhost
|
||||
|
||||
def multicast_address(self, target, service):
|
||||
return self._concatenate([self._broadcast_prefix,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic, "all"])
|
||||
return self._concat(".",
|
||||
[self._broadcast_prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic,
|
||||
"all"])
|
||||
|
||||
def unicast_address(self, target, service=SERVICE_RPC):
|
||||
return self._concatenate([self._server_prefix,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic, target.server])
|
||||
return self._concat(".",
|
||||
[self._server_prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic,
|
||||
target.server])
|
||||
|
||||
def anycast_address(self, target, service=SERVICE_RPC):
|
||||
return self._concatenate([self._group_prefix,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic])
|
||||
|
||||
def _concatenate(self, items):
|
||||
return ".".join(filter(bool, items))
|
||||
return self._concat(".",
|
||||
[self._group_prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._default_exchange,
|
||||
target.topic])
|
||||
|
||||
# for debug:
|
||||
def _is_multicast(self, address):
|
||||
|
@ -162,21 +171,25 @@ class RoutableAddresser(Addresser):
|
|||
'anycast'. The delivery semantics are followed by information pulled from
|
||||
the Target. The template is:
|
||||
|
||||
$prefix/$semantics/$exchange/$topic[/$server]
|
||||
$prefix/$semantics[/$vhost]/$exchange/$topic[/$server]
|
||||
|
||||
Examples based on the default prefix and semantic values:
|
||||
|
||||
rpc-unicast: "openstack.org/om/rpc/unicast/$exchange/$topic/$server"
|
||||
notify-anycast: "openstack.org/om/notify/anycast/$exchange/$topic"
|
||||
rpc-unicast: "openstack.org/om/rpc/unicast/my-exchange/my-topic/my-server"
|
||||
notify-anycast: "openstack.org/om/notify/anycast/my-vhost/exchange/topic"
|
||||
"""
|
||||
|
||||
def __init__(self, default_exchange, rpc_exchange, rpc_prefix,
|
||||
notify_exchange, notify_prefix, unicast_tag, multicast_tag,
|
||||
anycast_tag):
|
||||
anycast_tag, vhost):
|
||||
super(RoutableAddresser, self).__init__(default_exchange)
|
||||
if not self._default_exchange:
|
||||
self._default_exchange = "openstack"
|
||||
|
||||
# templates for address generation:
|
||||
|
||||
self._vhost = vhost
|
||||
|
||||
_rpc = rpc_prefix + "/"
|
||||
self._rpc_prefix = _rpc
|
||||
self._rpc_unicast = _rpc + unicast_tag
|
||||
|
@ -201,32 +214,34 @@ class RoutableAddresser(Addresser):
|
|||
prefix = self._rpc_multicast
|
||||
else:
|
||||
prefix = self._notify_multicast
|
||||
return "%s/%s/%s" % (prefix,
|
||||
return self._concat("/",
|
||||
[prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._exchange[service],
|
||||
target.topic)
|
||||
target.topic])
|
||||
|
||||
def unicast_address(self, target, service=SERVICE_RPC):
|
||||
if service == SERVICE_RPC:
|
||||
prefix = self._rpc_unicast
|
||||
else:
|
||||
prefix = self._notify_unicast
|
||||
if target.server:
|
||||
return "%s/%s/%s/%s" % (prefix,
|
||||
target.exchange or self._exchange[service],
|
||||
target.topic,
|
||||
target.server)
|
||||
return "%s/%s/%s" % (prefix,
|
||||
return self._concat("/",
|
||||
[prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._exchange[service],
|
||||
target.topic)
|
||||
target.topic,
|
||||
target.server])
|
||||
|
||||
def anycast_address(self, target, service=SERVICE_RPC):
|
||||
if service == SERVICE_RPC:
|
||||
prefix = self._rpc_anycast
|
||||
else:
|
||||
prefix = self._notify_anycast
|
||||
return "%s/%s/%s" % (prefix,
|
||||
return self._concat("/",
|
||||
[prefix,
|
||||
self._vhost,
|
||||
target.exchange or self._exchange[service],
|
||||
target.topic)
|
||||
target.topic])
|
||||
|
||||
# for debug:
|
||||
def _is_multicast(self, address):
|
||||
|
@ -255,7 +270,7 @@ class AddresserFactory(object):
|
|||
self._mode = mode
|
||||
self._kwargs = kwargs
|
||||
|
||||
def __call__(self, remote_properties):
|
||||
def __call__(self, remote_properties, vhost=None):
|
||||
# for backwards compatibility use legacy if dynamic and we're connected
|
||||
# to qpidd or we cannot identify the message bus. This can be
|
||||
# overridden via the configuration.
|
||||
|
@ -275,7 +290,8 @@ class AddresserFactory(object):
|
|||
return LegacyAddresser(self._default_exchange,
|
||||
self._kwargs['legacy_server_prefix'],
|
||||
self._kwargs['legacy_broadcast_prefix'],
|
||||
self._kwargs['legacy_group_prefix'])
|
||||
self._kwargs['legacy_group_prefix'],
|
||||
vhost)
|
||||
else:
|
||||
return RoutableAddresser(self._default_exchange,
|
||||
self._kwargs.get("rpc_exchange"),
|
||||
|
@ -284,4 +300,5 @@ class AddresserFactory(object):
|
|||
self._kwargs["notify_prefix"],
|
||||
self._kwargs["unicast"],
|
||||
self._kwargs["multicast"],
|
||||
self._kwargs["anycast"])
|
||||
self._kwargs["anycast"],
|
||||
vhost)
|
||||
|
|
|
@ -768,11 +768,12 @@ class Hosts(object):
|
|||
configuration and are used only if no username/password/realm is present in
|
||||
the URL.
|
||||
"""
|
||||
def __init__(self, entries=None, default_username=None,
|
||||
def __init__(self, url, default_username=None,
|
||||
default_password=None,
|
||||
default_realm=None):
|
||||
if entries:
|
||||
self._entries = entries[:]
|
||||
self.virtual_host = url.virtual_host
|
||||
if url.hosts:
|
||||
self._entries = url.hosts[:]
|
||||
else:
|
||||
self._entries = [transport.TransportHost(hostname="localhost",
|
||||
port=5672)]
|
||||
|
@ -797,7 +798,8 @@ class Hosts(object):
|
|||
return '<Hosts ' + str(self) + '>'
|
||||
|
||||
def __str__(self):
|
||||
return ", ".join(["%r" % th for th in self._entries])
|
||||
r = ', vhost=%s' % self.virtual_host if self.virtual_host else ''
|
||||
return ", ".join(["%r" % th for th in self._entries]) + r
|
||||
|
||||
|
||||
class Controller(pyngus.ConnectionEventHandler):
|
||||
|
@ -807,7 +809,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||
work is done on the Eventloop thread, allowing the driver to run
|
||||
asynchronously from the messaging clients.
|
||||
"""
|
||||
def __init__(self, hosts, default_exchange, config):
|
||||
def __init__(self, url, default_exchange, config):
|
||||
self.processor = None
|
||||
self._socket_connection = None
|
||||
self._node = platform.node() or "<UNKNOWN>"
|
||||
|
@ -839,10 +841,12 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||
self.ssl_key_password = config.oslo_messaging_amqp.ssl_key_password
|
||||
self.ssl_allow_insecure = \
|
||||
config.oslo_messaging_amqp.allow_insecure_clients
|
||||
self.ssl_verify_vhost = config.oslo_messaging_amqp.ssl_verify_vhost
|
||||
self.pseudo_vhost = config.oslo_messaging_amqp.pseudo_vhost
|
||||
self.sasl_mechanisms = config.oslo_messaging_amqp.sasl_mechanisms
|
||||
self.sasl_config_dir = config.oslo_messaging_amqp.sasl_config_dir
|
||||
self.sasl_config_name = config.oslo_messaging_amqp.sasl_config_name
|
||||
self.hosts = Hosts(hosts, config.oslo_messaging_amqp.username,
|
||||
self.hosts = Hosts(url, config.oslo_messaging_amqp.username,
|
||||
config.oslo_messaging_amqp.password,
|
||||
config.oslo_messaging_amqp.sasl_default_realm)
|
||||
self.conn_retry_interval = \
|
||||
|
@ -974,20 +978,44 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||
host = self.hosts.current
|
||||
conn_props = {'properties': {u'process': self._command,
|
||||
u'pid': self._pid,
|
||||
u'node': self._node},
|
||||
'hostname': host.hostname}
|
||||
u'node': self._node}}
|
||||
# only set hostname in the AMQP 1.0 Open performative if the message
|
||||
# bus can interpret it as the virtual host. We leave it unspecified
|
||||
# since apparently noone can agree on how it should be used otherwise!
|
||||
if self.hosts.virtual_host and not self.pseudo_vhost:
|
||||
conn_props['hostname'] = self.hosts.virtual_host
|
||||
if self.idle_timeout:
|
||||
conn_props["idle-time-out"] = float(self.idle_timeout)
|
||||
if self.trace_protocol:
|
||||
conn_props["x-trace-protocol"] = self.trace_protocol
|
||||
|
||||
# SSL configuration
|
||||
ssl_enabled = False
|
||||
if self.ssl:
|
||||
ssl_enabled = True
|
||||
conn_props["x-ssl"] = self.ssl
|
||||
if self.ssl_ca_file:
|
||||
conn_props["x-ssl-ca-file"] = self.ssl_ca_file
|
||||
ssl_enabled = True
|
||||
if self.ssl_cert_file:
|
||||
ssl_enabled = True
|
||||
conn_props["x-ssl-identity"] = (self.ssl_cert_file,
|
||||
self.ssl_key_file,
|
||||
self.ssl_key_password)
|
||||
if ssl_enabled:
|
||||
# Set the identity of the remote server for SSL to use when
|
||||
# verifying the received certificate. Typically this is the DNS
|
||||
# name used to set up the TCP connections. However some servers
|
||||
# may provide a certificate for the virtual host instead. If that
|
||||
# is the case we need to use the virtual hostname instead.
|
||||
# Refer to SSL Server Name Indication (SNI) for the entire story:
|
||||
# https://tools.ietf.org/html/rfc6066
|
||||
if self.ssl_verify_vhost:
|
||||
if self.hosts.virtual_host:
|
||||
conn_props['x-ssl-peer-name'] = self.hosts.virtual_host
|
||||
else:
|
||||
conn_props['x-ssl-peer-name'] = host.hostname
|
||||
|
||||
# SASL configuration:
|
||||
if self.sasl_mechanisms:
|
||||
conn_props["x-sasl-mechs"] = self.sasl_mechanisms
|
||||
|
@ -1052,9 +1080,12 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||
point, we are ready to receive messages, so start all pending RPC
|
||||
requests.
|
||||
"""
|
||||
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"),
|
||||
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s%(vhost)s)"),
|
||||
{'hostname': self.hosts.current.hostname,
|
||||
'port': self.hosts.current.port})
|
||||
'port': self.hosts.current.port,
|
||||
'vhost': ("/" + self.hosts.virtual_host
|
||||
if self.hosts.virtual_host else "")})
|
||||
|
||||
for sender in itervalues(self._all_senders):
|
||||
sender.attach(self._socket_connection.pyngus_conn,
|
||||
self.reply_link, self.addresser)
|
||||
|
@ -1099,7 +1130,9 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||
# allocate an addresser based on the advertised properties of the
|
||||
# message bus
|
||||
props = connection.remote_properties or {}
|
||||
self.addresser = self.addresser_factory(props)
|
||||
self.addresser = self.addresser_factory(props,
|
||||
self.hosts.virtual_host
|
||||
if self.pseudo_vhost else None)
|
||||
for servers in itervalues(self._servers):
|
||||
for server in itervalues(servers):
|
||||
server.attach(self._socket_connection.pyngus_conn,
|
||||
|
|
|
@ -346,7 +346,15 @@ class Thread(threading.Thread):
|
|||
"""Run the proton event/timer loop."""
|
||||
LOG.debug("Starting Proton thread, container=%s",
|
||||
self._container.name)
|
||||
try:
|
||||
self._main_loop()
|
||||
except Exception:
|
||||
# unknown error - fatal
|
||||
LOG.exception("Fatal unhandled event loop error!")
|
||||
raise
|
||||
|
||||
def _main_loop(self):
|
||||
# Main event loop
|
||||
while not self._shutdown:
|
||||
|
||||
readfds = [self._requests]
|
||||
|
|
|
@ -60,6 +60,17 @@ amqp1_opts = [
|
|||
secret=True,
|
||||
help='Password for decrypting ssl_key_file (if encrypted)'),
|
||||
|
||||
cfg.BoolOpt('ssl_verify_vhost',
|
||||
default=False,
|
||||
help="By default SSL checks that the name in the server's"
|
||||
" certificate matches the hostname in the transport_url. In"
|
||||
" some configurations it may be preferable to use the virtual"
|
||||
" hostname instead, for example if the server uses the Server"
|
||||
" Name Indication TLS extension (rfc6066) to provide a"
|
||||
" certificate per virtual host. Set ssl_verify_vhost to True"
|
||||
" if the server's SSL certificate uses the virtual host name"
|
||||
" instead of the DNS name."),
|
||||
|
||||
cfg.BoolOpt('allow_insecure_clients',
|
||||
default=False,
|
||||
deprecated_group='amqp1',
|
||||
|
@ -172,6 +183,17 @@ amqp1_opts = [
|
|||
"'dynamic' - use legacy addresses if the message bus does not"
|
||||
" support routing otherwise use routable addressing"),
|
||||
|
||||
cfg.BoolOpt('pseudo_vhost',
|
||||
default=True,
|
||||
help="Enable virtual host support for those message buses"
|
||||
" that do not natively support virtual hosting (such as"
|
||||
" qpidd). When set to true the virtual host name will be"
|
||||
" added to all message bus addresses, effectively creating"
|
||||
" a private 'subnet' per virtual host. Set to False if the"
|
||||
" message bus supports virtual hosting using the 'hostname'"
|
||||
" field in the AMQP 1.0 Open performative as the name of the"
|
||||
" virtual host."),
|
||||
|
||||
# Legacy addressing customization:
|
||||
|
||||
cfg.StrOpt('server_request_prefix',
|
||||
|
|
|
@ -202,7 +202,6 @@ class ProtonDriver(base.BaseDriver):
|
|||
conf.register_opts(opts.amqp1_opts, group=opt_group)
|
||||
conf = common.ConfigOptsProxy(conf, url, opt_group.name)
|
||||
|
||||
self._hosts = url.hosts
|
||||
self._conf = conf
|
||||
self._default_exchange = default_exchange
|
||||
|
||||
|
@ -257,7 +256,7 @@ class ProtonDriver(base.BaseDriver):
|
|||
self._ctrl = None
|
||||
# Create a Controller that connects to the messaging
|
||||
# service:
|
||||
self._ctrl = controller.Controller(self._hosts,
|
||||
self._ctrl = controller.Controller(self._url,
|
||||
self._default_exchange,
|
||||
self._conf)
|
||||
self._ctrl.connect()
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import logging
|
||||
import os
|
||||
import select
|
||||
|
@ -1515,19 +1516,12 @@ class TestSSL(test_utils.BaseTestCase):
|
|||
self._tmpdir = None
|
||||
self.skipTest("OpenSSL tools not installed - skipping")
|
||||
|
||||
def test_server_ok(self):
|
||||
# test client authenticates server
|
||||
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
|
||||
sock_addr=self._ssl_config['s_name'],
|
||||
ssl_config=self._ssl_config)
|
||||
url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
|
||||
(self._broker.host,
|
||||
self._broker.port))
|
||||
def _ssl_server_ok(self, url):
|
||||
self._broker.start()
|
||||
|
||||
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
|
||||
group='oslo_messaging_amqp')
|
||||
driver = amqp_driver.ProtonDriver(self.conf, url)
|
||||
tport_url = oslo_messaging.TransportURL.parse(self.conf, url)
|
||||
driver = amqp_driver.ProtonDriver(self.conf, tport_url)
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
listener = _ListenerThread(
|
||||
driver.listen(target, None, None)._poll_style_listener, 1)
|
||||
|
@ -1541,6 +1535,34 @@ class TestSSL(test_utils.BaseTestCase):
|
|||
self.assertFalse(listener.isAlive())
|
||||
driver.cleanup()
|
||||
|
||||
def test_server_ok(self):
|
||||
# test client authenticates server
|
||||
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
|
||||
sock_addr=self._ssl_config['s_name'],
|
||||
ssl_config=self._ssl_config)
|
||||
url = "amqp://%s:%d" % (self._broker.host, self._broker.port)
|
||||
self._ssl_server_ok(url)
|
||||
|
||||
def test_server_ignore_vhost_ok(self):
|
||||
# test client authenticates server and ignores vhost
|
||||
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
|
||||
sock_addr=self._ssl_config['s_name'],
|
||||
ssl_config=self._ssl_config)
|
||||
url = "amqp://%s:%d/my-vhost" % (self._broker.host, self._broker.port)
|
||||
self._ssl_server_ok(url)
|
||||
|
||||
def test_server_check_vhost_ok(self):
|
||||
# test client authenticates server using vhost as CN
|
||||
# Use 'Invalid' from bad_cert CN
|
||||
self.config(ssl_verify_vhost=True, group='oslo_messaging_amqp')
|
||||
self._ssl_config['s_cert'] = self._ssl_config['bad_cert']
|
||||
self._ssl_config['s_key'] = self._ssl_config['bad_key']
|
||||
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
|
||||
sock_addr=self._ssl_config['s_name'],
|
||||
ssl_config=self._ssl_config)
|
||||
url = "amqp://%s:%d/Invalid" % (self._broker.host, self._broker.port)
|
||||
self._ssl_server_ok(url)
|
||||
|
||||
@mock.patch('ssl.get_default_verify_paths')
|
||||
def test_server_ok_with_ssl_set_in_transport_url(self, mock_verify_paths):
|
||||
# test client authenticates server
|
||||
|
@ -1630,6 +1652,90 @@ class TestSSL(test_utils.BaseTestCase):
|
|||
super(TestSSL, self).tearDown()
|
||||
|
||||
|
||||
@testtools.skipUnless(pyngus, "proton modules not present")
|
||||
class TestVHost(_AmqpBrokerTestCaseAuto):
|
||||
"""Verify the pseudo virtual host behavior"""
|
||||
|
||||
def _vhost_test(self):
|
||||
"""Verify that all messaging for a particular vhost stays on that vhost
|
||||
"""
|
||||
self.config(pseudo_vhost=True,
|
||||
group="oslo_messaging_amqp")
|
||||
|
||||
vhosts = ["None", "HOSTA", "HOSTB", "HOSTC"]
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
fanout = oslo_messaging.Target(topic="test-topic", fanout=True)
|
||||
|
||||
listeners = {}
|
||||
ldrivers = {}
|
||||
sdrivers = {}
|
||||
|
||||
replies = {}
|
||||
msgs = {}
|
||||
|
||||
for vhost in vhosts:
|
||||
url = copy.copy(self._broker_url)
|
||||
url.virtual_host = vhost if vhost != "None" else None
|
||||
ldriver = amqp_driver.ProtonDriver(self.conf, url)
|
||||
listeners[vhost] = _ListenerThread(
|
||||
ldriver.listen(target, None, None)._poll_style_listener,
|
||||
10)
|
||||
ldrivers[vhost] = ldriver
|
||||
sdrivers[vhost] = amqp_driver.ProtonDriver(self.conf, url)
|
||||
replies[vhost] = []
|
||||
msgs[vhost] = []
|
||||
|
||||
# send a fanout and a single rpc call to each listener
|
||||
for vhost in vhosts:
|
||||
if vhost == "HOSTC": # expect no messages to HOSTC
|
||||
continue
|
||||
sdrivers[vhost].send(fanout,
|
||||
{"context": vhost},
|
||||
{"vhost": vhost,
|
||||
"fanout": True,
|
||||
"id": vhost})
|
||||
replies[vhost].append(sdrivers[vhost].send(target,
|
||||
{"context": vhost},
|
||||
{"method": "echo",
|
||||
"id": vhost},
|
||||
wait_for_reply=True))
|
||||
time.sleep(1)
|
||||
|
||||
for vhost in vhosts:
|
||||
msgs[vhost] += listeners[vhost].get_messages()
|
||||
if vhost == "HOSTC":
|
||||
# HOSTC should get nothing
|
||||
self.assertEqual(0, len(msgs[vhost]))
|
||||
self.assertEqual(0, len(replies[vhost]))
|
||||
continue
|
||||
|
||||
self.assertEqual(2, len(msgs[vhost]))
|
||||
for m in msgs[vhost]:
|
||||
# the id must match the vhost
|
||||
self.assertEqual(vhost, m.message.get("id"))
|
||||
self.assertEqual(1, len(replies[vhost]))
|
||||
for m in replies[vhost]:
|
||||
# same for correlation id
|
||||
self.assertEqual(vhost, m.get("correlation-id"))
|
||||
|
||||
for vhost in vhosts:
|
||||
listeners[vhost].kill()
|
||||
ldrivers[vhost].cleanup
|
||||
sdrivers[vhost].cleanup()
|
||||
|
||||
def test_vhost_routing(self):
|
||||
"""Test vhost using routable addresses
|
||||
"""
|
||||
self.config(addressing_mode='routable', group="oslo_messaging_amqp")
|
||||
self._vhost_test()
|
||||
|
||||
def test_vhost_legacy(self):
|
||||
"""Test vhost using legacy addresses
|
||||
"""
|
||||
self.config(addressing_mode='legacy', group="oslo_messaging_amqp")
|
||||
self._vhost_test()
|
||||
|
||||
|
||||
class FakeBroker(threading.Thread):
|
||||
"""A test AMQP message 'broker'."""
|
||||
|
||||
|
|
Loading…
Reference in New Issue