Add support for virtual hosts

Adds the 'pseudo_vhost' option which when enabled will incorporate the
virtual host into the address semantics.  This creates a 'subnet' like
address space for each virtual host.  Use this when the messaging bus
does not provide virtual hosting support.  It is enabled by default as
to date none of the supported AMQP 1.0 message buses natively support
virtual hosting.

It also updates SSL support: SSL can either use the connection
hostname or the vhost name when validating a server's
certificate. This is controlled by the 'ssl_verify_vhost' option.
This option is disabled by default as it requires both vhost and SNI
support from the server.  By default SSL will use the DNS name from
the TransportURL.

Change-Id: I49bb99d1b19e8e7e6fded76198da92ca5f7d65ab
Closes-Bug: #1700835
Partial-Bug: #1706987
(cherry picked from commit 263dce9ea8)
This commit is contained in:
Kenneth Giusti 2017-06-27 18:06:16 -04:00
parent 459511e8d7
commit a07d852b23
6 changed files with 242 additions and 57 deletions

View File

@ -104,40 +104,49 @@ class Addresser(object):
"""Address used for shared subscribers (competing consumers)
"""
def _concat(self, sep, items):
return sep.join(filter(bool, items))
class LegacyAddresser(Addresser):
"""Legacy addresses are in the following format:
multicast: '$broadcast_prefix.$exchange.$topic.all'
unicast: '$server_prefix.$exchange.$topic.$server'
anycast: '$group_prefix.$exchange.$topic'
multicast: '$broadcast_prefix[.$vhost].$exchange.$topic.all'
unicast: '$server_prefix[.$vhost].$exchange.$topic.$server'
anycast: '$group_prefix[.$vhost].$exchange.$topic'
Legacy addresses do not distinguish RPC traffic from Notification traffic
"""
def __init__(self, default_exchange, server_prefix, broadcast_prefix,
group_prefix):
group_prefix, vhost):
super(LegacyAddresser, self).__init__(default_exchange)
self._server_prefix = server_prefix
self._broadcast_prefix = broadcast_prefix
self._group_prefix = group_prefix
self._vhost = vhost
def multicast_address(self, target, service):
return self._concatenate([self._broadcast_prefix,
target.exchange or self._default_exchange,
target.topic, "all"])
return self._concat(".",
[self._broadcast_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
"all"])
def unicast_address(self, target, service=SERVICE_RPC):
return self._concatenate([self._server_prefix,
target.exchange or self._default_exchange,
target.topic, target.server])
return self._concat(".",
[self._server_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
return self._concatenate([self._group_prefix,
target.exchange or self._default_exchange,
target.topic])
def _concatenate(self, items):
return ".".join(filter(bool, items))
return self._concat(".",
[self._group_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic])
# for debug:
def _is_multicast(self, address):
@ -162,21 +171,25 @@ class RoutableAddresser(Addresser):
'anycast'. The delivery semantics are followed by information pulled from
the Target. The template is:
$prefix/$semantics/$exchange/$topic[/$server]
$prefix/$semantics[/$vhost]/$exchange/$topic[/$server]
Examples based on the default prefix and semantic values:
rpc-unicast: "openstack.org/om/rpc/unicast/$exchange/$topic/$server"
notify-anycast: "openstack.org/om/notify/anycast/$exchange/$topic"
rpc-unicast: "openstack.org/om/rpc/unicast/my-exchange/my-topic/my-server"
notify-anycast: "openstack.org/om/notify/anycast/my-vhost/exchange/topic"
"""
def __init__(self, default_exchange, rpc_exchange, rpc_prefix,
notify_exchange, notify_prefix, unicast_tag, multicast_tag,
anycast_tag):
anycast_tag, vhost):
super(RoutableAddresser, self).__init__(default_exchange)
if not self._default_exchange:
self._default_exchange = "openstack"
# templates for address generation:
self._vhost = vhost
_rpc = rpc_prefix + "/"
self._rpc_prefix = _rpc
self._rpc_unicast = _rpc + unicast_tag
@ -201,32 +214,34 @@ class RoutableAddresser(Addresser):
prefix = self._rpc_multicast
else:
prefix = self._notify_multicast
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic])
def unicast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_unicast
else:
prefix = self._notify_unicast
if target.server:
return "%s/%s/%s/%s" % (prefix,
target.exchange or self._exchange[service],
target.topic,
target.server)
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_anycast
else:
prefix = self._notify_anycast
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic])
# for debug:
def _is_multicast(self, address):
@ -255,7 +270,7 @@ class AddresserFactory(object):
self._mode = mode
self._kwargs = kwargs
def __call__(self, remote_properties):
def __call__(self, remote_properties, vhost=None):
# for backwards compatibility use legacy if dynamic and we're connected
# to qpidd or we cannot identify the message bus. This can be
# overridden via the configuration.
@ -275,7 +290,8 @@ class AddresserFactory(object):
return LegacyAddresser(self._default_exchange,
self._kwargs['legacy_server_prefix'],
self._kwargs['legacy_broadcast_prefix'],
self._kwargs['legacy_group_prefix'])
self._kwargs['legacy_group_prefix'],
vhost)
else:
return RoutableAddresser(self._default_exchange,
self._kwargs.get("rpc_exchange"),
@ -284,4 +300,5 @@ class AddresserFactory(object):
self._kwargs["notify_prefix"],
self._kwargs["unicast"],
self._kwargs["multicast"],
self._kwargs["anycast"])
self._kwargs["anycast"],
vhost)

View File

@ -768,11 +768,12 @@ class Hosts(object):
configuration and are used only if no username/password/realm is present in
the URL.
"""
def __init__(self, entries=None, default_username=None,
def __init__(self, url, default_username=None,
default_password=None,
default_realm=None):
if entries:
self._entries = entries[:]
self.virtual_host = url.virtual_host
if url.hosts:
self._entries = url.hosts[:]
else:
self._entries = [transport.TransportHost(hostname="localhost",
port=5672)]
@ -797,7 +798,8 @@ class Hosts(object):
return '<Hosts ' + str(self) + '>'
def __str__(self):
return ", ".join(["%r" % th for th in self._entries])
r = ', vhost=%s' % self.virtual_host if self.virtual_host else ''
return ", ".join(["%r" % th for th in self._entries]) + r
class Controller(pyngus.ConnectionEventHandler):
@ -807,7 +809,7 @@ class Controller(pyngus.ConnectionEventHandler):
work is done on the Eventloop thread, allowing the driver to run
asynchronously from the messaging clients.
"""
def __init__(self, hosts, default_exchange, config):
def __init__(self, url, default_exchange, config):
self.processor = None
self._socket_connection = None
self._node = platform.node() or "<UNKNOWN>"
@ -839,10 +841,12 @@ class Controller(pyngus.ConnectionEventHandler):
self.ssl_key_password = config.oslo_messaging_amqp.ssl_key_password
self.ssl_allow_insecure = \
config.oslo_messaging_amqp.allow_insecure_clients
self.ssl_verify_vhost = config.oslo_messaging_amqp.ssl_verify_vhost
self.pseudo_vhost = config.oslo_messaging_amqp.pseudo_vhost
self.sasl_mechanisms = config.oslo_messaging_amqp.sasl_mechanisms
self.sasl_config_dir = config.oslo_messaging_amqp.sasl_config_dir
self.sasl_config_name = config.oslo_messaging_amqp.sasl_config_name
self.hosts = Hosts(hosts, config.oslo_messaging_amqp.username,
self.hosts = Hosts(url, config.oslo_messaging_amqp.username,
config.oslo_messaging_amqp.password,
config.oslo_messaging_amqp.sasl_default_realm)
self.conn_retry_interval = \
@ -974,20 +978,44 @@ class Controller(pyngus.ConnectionEventHandler):
host = self.hosts.current
conn_props = {'properties': {u'process': self._command,
u'pid': self._pid,
u'node': self._node},
'hostname': host.hostname}
u'node': self._node}}
# only set hostname in the AMQP 1.0 Open performative if the message
# bus can interpret it as the virtual host. We leave it unspecified
# since apparently noone can agree on how it should be used otherwise!
if self.hosts.virtual_host and not self.pseudo_vhost:
conn_props['hostname'] = self.hosts.virtual_host
if self.idle_timeout:
conn_props["idle-time-out"] = float(self.idle_timeout)
if self.trace_protocol:
conn_props["x-trace-protocol"] = self.trace_protocol
# SSL configuration
ssl_enabled = False
if self.ssl:
ssl_enabled = True
conn_props["x-ssl"] = self.ssl
if self.ssl_ca_file:
conn_props["x-ssl-ca-file"] = self.ssl_ca_file
ssl_enabled = True
if self.ssl_cert_file:
ssl_enabled = True
conn_props["x-ssl-identity"] = (self.ssl_cert_file,
self.ssl_key_file,
self.ssl_key_password)
if ssl_enabled:
# Set the identity of the remote server for SSL to use when
# verifying the received certificate. Typically this is the DNS
# name used to set up the TCP connections. However some servers
# may provide a certificate for the virtual host instead. If that
# is the case we need to use the virtual hostname instead.
# Refer to SSL Server Name Indication (SNI) for the entire story:
# https://tools.ietf.org/html/rfc6066
if self.ssl_verify_vhost:
if self.hosts.virtual_host:
conn_props['x-ssl-peer-name'] = self.hosts.virtual_host
else:
conn_props['x-ssl-peer-name'] = host.hostname
# SASL configuration:
if self.sasl_mechanisms:
conn_props["x-sasl-mechs"] = self.sasl_mechanisms
@ -1052,9 +1080,12 @@ class Controller(pyngus.ConnectionEventHandler):
point, we are ready to receive messages, so start all pending RPC
requests.
"""
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"),
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s%(vhost)s)"),
{'hostname': self.hosts.current.hostname,
'port': self.hosts.current.port})
'port': self.hosts.current.port,
'vhost': ("/" + self.hosts.virtual_host
if self.hosts.virtual_host else "")})
for sender in itervalues(self._all_senders):
sender.attach(self._socket_connection.pyngus_conn,
self.reply_link, self.addresser)
@ -1099,7 +1130,9 @@ class Controller(pyngus.ConnectionEventHandler):
# allocate an addresser based on the advertised properties of the
# message bus
props = connection.remote_properties or {}
self.addresser = self.addresser_factory(props)
self.addresser = self.addresser_factory(props,
self.hosts.virtual_host
if self.pseudo_vhost else None)
for servers in itervalues(self._servers):
for server in itervalues(servers):
server.attach(self._socket_connection.pyngus_conn,

View File

@ -346,7 +346,15 @@ class Thread(threading.Thread):
"""Run the proton event/timer loop."""
LOG.debug("Starting Proton thread, container=%s",
self._container.name)
try:
self._main_loop()
except Exception:
# unknown error - fatal
LOG.exception("Fatal unhandled event loop error!")
raise
def _main_loop(self):
# Main event loop
while not self._shutdown:
readfds = [self._requests]

View File

@ -60,6 +60,17 @@ amqp1_opts = [
secret=True,
help='Password for decrypting ssl_key_file (if encrypted)'),
cfg.BoolOpt('ssl_verify_vhost',
default=False,
help="By default SSL checks that the name in the server's"
" certificate matches the hostname in the transport_url. In"
" some configurations it may be preferable to use the virtual"
" hostname instead, for example if the server uses the Server"
" Name Indication TLS extension (rfc6066) to provide a"
" certificate per virtual host. Set ssl_verify_vhost to True"
" if the server's SSL certificate uses the virtual host name"
" instead of the DNS name."),
cfg.BoolOpt('allow_insecure_clients',
default=False,
deprecated_group='amqp1',
@ -172,6 +183,17 @@ amqp1_opts = [
"'dynamic' - use legacy addresses if the message bus does not"
" support routing otherwise use routable addressing"),
cfg.BoolOpt('pseudo_vhost',
default=True,
help="Enable virtual host support for those message buses"
" that do not natively support virtual hosting (such as"
" qpidd). When set to true the virtual host name will be"
" added to all message bus addresses, effectively creating"
" a private 'subnet' per virtual host. Set to False if the"
" message bus supports virtual hosting using the 'hostname'"
" field in the AMQP 1.0 Open performative as the name of the"
" virtual host."),
# Legacy addressing customization:
cfg.StrOpt('server_request_prefix',

View File

@ -202,7 +202,6 @@ class ProtonDriver(base.BaseDriver):
conf.register_opts(opts.amqp1_opts, group=opt_group)
conf = common.ConfigOptsProxy(conf, url, opt_group.name)
self._hosts = url.hosts
self._conf = conf
self._default_exchange = default_exchange
@ -257,7 +256,7 @@ class ProtonDriver(base.BaseDriver):
self._ctrl = None
# Create a Controller that connects to the messaging
# service:
self._ctrl = controller.Controller(self._hosts,
self._ctrl = controller.Controller(self._url,
self._default_exchange,
self._conf)
self._ctrl.connect()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
import os
import select
@ -1515,19 +1516,12 @@ class TestSSL(test_utils.BaseTestCase):
self._tmpdir = None
self.skipTest("OpenSSL tools not installed - skipping")
def test_server_ok(self):
# test client authenticates server
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
(self._broker.host,
self._broker.port))
def _ssl_server_ok(self, url):
self._broker.start()
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
group='oslo_messaging_amqp')
driver = amqp_driver.ProtonDriver(self.conf, url)
tport_url = oslo_messaging.TransportURL.parse(self.conf, url)
driver = amqp_driver.ProtonDriver(self.conf, tport_url)
target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 1)
@ -1541,6 +1535,34 @@ class TestSSL(test_utils.BaseTestCase):
self.assertFalse(listener.isAlive())
driver.cleanup()
def test_server_ok(self):
# test client authenticates server
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
def test_server_ignore_vhost_ok(self):
# test client authenticates server and ignores vhost
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d/my-vhost" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
def test_server_check_vhost_ok(self):
# test client authenticates server using vhost as CN
# Use 'Invalid' from bad_cert CN
self.config(ssl_verify_vhost=True, group='oslo_messaging_amqp')
self._ssl_config['s_cert'] = self._ssl_config['bad_cert']
self._ssl_config['s_key'] = self._ssl_config['bad_key']
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d/Invalid" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
@mock.patch('ssl.get_default_verify_paths')
def test_server_ok_with_ssl_set_in_transport_url(self, mock_verify_paths):
# test client authenticates server
@ -1630,6 +1652,90 @@ class TestSSL(test_utils.BaseTestCase):
super(TestSSL, self).tearDown()
@testtools.skipUnless(pyngus, "proton modules not present")
class TestVHost(_AmqpBrokerTestCaseAuto):
"""Verify the pseudo virtual host behavior"""
def _vhost_test(self):
"""Verify that all messaging for a particular vhost stays on that vhost
"""
self.config(pseudo_vhost=True,
group="oslo_messaging_amqp")
vhosts = ["None", "HOSTA", "HOSTB", "HOSTC"]
target = oslo_messaging.Target(topic="test-topic")
fanout = oslo_messaging.Target(topic="test-topic", fanout=True)
listeners = {}
ldrivers = {}
sdrivers = {}
replies = {}
msgs = {}
for vhost in vhosts:
url = copy.copy(self._broker_url)
url.virtual_host = vhost if vhost != "None" else None
ldriver = amqp_driver.ProtonDriver(self.conf, url)
listeners[vhost] = _ListenerThread(
ldriver.listen(target, None, None)._poll_style_listener,
10)
ldrivers[vhost] = ldriver
sdrivers[vhost] = amqp_driver.ProtonDriver(self.conf, url)
replies[vhost] = []
msgs[vhost] = []
# send a fanout and a single rpc call to each listener
for vhost in vhosts:
if vhost == "HOSTC": # expect no messages to HOSTC
continue
sdrivers[vhost].send(fanout,
{"context": vhost},
{"vhost": vhost,
"fanout": True,
"id": vhost})
replies[vhost].append(sdrivers[vhost].send(target,
{"context": vhost},
{"method": "echo",
"id": vhost},
wait_for_reply=True))
time.sleep(1)
for vhost in vhosts:
msgs[vhost] += listeners[vhost].get_messages()
if vhost == "HOSTC":
# HOSTC should get nothing
self.assertEqual(0, len(msgs[vhost]))
self.assertEqual(0, len(replies[vhost]))
continue
self.assertEqual(2, len(msgs[vhost]))
for m in msgs[vhost]:
# the id must match the vhost
self.assertEqual(vhost, m.message.get("id"))
self.assertEqual(1, len(replies[vhost]))
for m in replies[vhost]:
# same for correlation id
self.assertEqual(vhost, m.get("correlation-id"))
for vhost in vhosts:
listeners[vhost].kill()
ldrivers[vhost].cleanup
sdrivers[vhost].cleanup()
def test_vhost_routing(self):
"""Test vhost using routable addresses
"""
self.config(addressing_mode='routable', group="oslo_messaging_amqp")
self._vhost_test()
def test_vhost_legacy(self):
"""Test vhost using legacy addresses
"""
self.config(addressing_mode='legacy', group="oslo_messaging_amqp")
self._vhost_test()
class FakeBroker(threading.Thread):
"""A test AMQP message 'broker'."""