diff --git a/oslo_messaging/_drivers/amqp1_driver/addressing.py b/oslo_messaging/_drivers/amqp1_driver/addressing.py index 1a99caed1..748a54abd 100644 --- a/oslo_messaging/_drivers/amqp1_driver/addressing.py +++ b/oslo_messaging/_drivers/amqp1_driver/addressing.py @@ -104,40 +104,49 @@ class Addresser(object): """Address used for shared subscribers (competing consumers) """ + def _concat(self, sep, items): + return sep.join(filter(bool, items)) + class LegacyAddresser(Addresser): """Legacy addresses are in the following format: - multicast: '$broadcast_prefix.$exchange.$topic.all' - unicast: '$server_prefix.$exchange.$topic.$server' - anycast: '$group_prefix.$exchange.$topic' + multicast: '$broadcast_prefix[.$vhost].$exchange.$topic.all' + unicast: '$server_prefix[.$vhost].$exchange.$topic.$server' + anycast: '$group_prefix[.$vhost].$exchange.$topic' Legacy addresses do not distinguish RPC traffic from Notification traffic """ def __init__(self, default_exchange, server_prefix, broadcast_prefix, - group_prefix): + group_prefix, vhost): super(LegacyAddresser, self).__init__(default_exchange) self._server_prefix = server_prefix self._broadcast_prefix = broadcast_prefix self._group_prefix = group_prefix + self._vhost = vhost def multicast_address(self, target, service): - return self._concatenate([self._broadcast_prefix, - target.exchange or self._default_exchange, - target.topic, "all"]) + return self._concat(".", + [self._broadcast_prefix, + self._vhost, + target.exchange or self._default_exchange, + target.topic, + "all"]) def unicast_address(self, target, service=SERVICE_RPC): - return self._concatenate([self._server_prefix, - target.exchange or self._default_exchange, - target.topic, target.server]) + return self._concat(".", + [self._server_prefix, + self._vhost, + target.exchange or self._default_exchange, + target.topic, + target.server]) def anycast_address(self, target, service=SERVICE_RPC): - return self._concatenate([self._group_prefix, - target.exchange or self._default_exchange, - target.topic]) - - def _concatenate(self, items): - return ".".join(filter(bool, items)) + return self._concat(".", + [self._group_prefix, + self._vhost, + target.exchange or self._default_exchange, + target.topic]) # for debug: def _is_multicast(self, address): @@ -162,21 +171,25 @@ class RoutableAddresser(Addresser): 'anycast'. The delivery semantics are followed by information pulled from the Target. The template is: - $prefix/$semantics/$exchange/$topic[/$server] + $prefix/$semantics[/$vhost]/$exchange/$topic[/$server] Examples based on the default prefix and semantic values: - rpc-unicast: "openstack.org/om/rpc/unicast/$exchange/$topic/$server" - notify-anycast: "openstack.org/om/notify/anycast/$exchange/$topic" + rpc-unicast: "openstack.org/om/rpc/unicast/my-exchange/my-topic/my-server" + notify-anycast: "openstack.org/om/notify/anycast/my-vhost/exchange/topic" """ def __init__(self, default_exchange, rpc_exchange, rpc_prefix, notify_exchange, notify_prefix, unicast_tag, multicast_tag, - anycast_tag): + anycast_tag, vhost): super(RoutableAddresser, self).__init__(default_exchange) if not self._default_exchange: self._default_exchange = "openstack" + # templates for address generation: + + self._vhost = vhost + _rpc = rpc_prefix + "/" self._rpc_prefix = _rpc self._rpc_unicast = _rpc + unicast_tag @@ -201,32 +214,34 @@ class RoutableAddresser(Addresser): prefix = self._rpc_multicast else: prefix = self._notify_multicast - return "%s/%s/%s" % (prefix, + return self._concat("/", + [prefix, + self._vhost, target.exchange or self._exchange[service], - target.topic) + target.topic]) def unicast_address(self, target, service=SERVICE_RPC): if service == SERVICE_RPC: prefix = self._rpc_unicast else: prefix = self._notify_unicast - if target.server: - return "%s/%s/%s/%s" % (prefix, - target.exchange or self._exchange[service], - target.topic, - target.server) - return "%s/%s/%s" % (prefix, + return self._concat("/", + [prefix, + self._vhost, target.exchange or self._exchange[service], - target.topic) + target.topic, + target.server]) def anycast_address(self, target, service=SERVICE_RPC): if service == SERVICE_RPC: prefix = self._rpc_anycast else: prefix = self._notify_anycast - return "%s/%s/%s" % (prefix, + return self._concat("/", + [prefix, + self._vhost, target.exchange or self._exchange[service], - target.topic) + target.topic]) # for debug: def _is_multicast(self, address): @@ -255,7 +270,7 @@ class AddresserFactory(object): self._mode = mode self._kwargs = kwargs - def __call__(self, remote_properties): + def __call__(self, remote_properties, vhost=None): # for backwards compatibility use legacy if dynamic and we're connected # to qpidd or we cannot identify the message bus. This can be # overridden via the configuration. @@ -275,7 +290,8 @@ class AddresserFactory(object): return LegacyAddresser(self._default_exchange, self._kwargs['legacy_server_prefix'], self._kwargs['legacy_broadcast_prefix'], - self._kwargs['legacy_group_prefix']) + self._kwargs['legacy_group_prefix'], + vhost) else: return RoutableAddresser(self._default_exchange, self._kwargs.get("rpc_exchange"), @@ -284,4 +300,5 @@ class AddresserFactory(object): self._kwargs["notify_prefix"], self._kwargs["unicast"], self._kwargs["multicast"], - self._kwargs["anycast"]) + self._kwargs["anycast"], + vhost) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index d150d99bf..0583d0e13 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -768,11 +768,12 @@ class Hosts(object): configuration and are used only if no username/password/realm is present in the URL. """ - def __init__(self, entries=None, default_username=None, + def __init__(self, url, default_username=None, default_password=None, default_realm=None): - if entries: - self._entries = entries[:] + self.virtual_host = url.virtual_host + if url.hosts: + self._entries = url.hosts[:] else: self._entries = [transport.TransportHost(hostname="localhost", port=5672)] @@ -797,7 +798,8 @@ class Hosts(object): return '' def __str__(self): - return ", ".join(["%r" % th for th in self._entries]) + r = ', vhost=%s' % self.virtual_host if self.virtual_host else '' + return ", ".join(["%r" % th for th in self._entries]) + r class Controller(pyngus.ConnectionEventHandler): @@ -807,7 +809,7 @@ class Controller(pyngus.ConnectionEventHandler): work is done on the Eventloop thread, allowing the driver to run asynchronously from the messaging clients. """ - def __init__(self, hosts, default_exchange, config): + def __init__(self, url, default_exchange, config): self.processor = None self._socket_connection = None self._node = platform.node() or "" @@ -839,10 +841,12 @@ class Controller(pyngus.ConnectionEventHandler): self.ssl_key_password = config.oslo_messaging_amqp.ssl_key_password self.ssl_allow_insecure = \ config.oslo_messaging_amqp.allow_insecure_clients + self.ssl_verify_vhost = config.oslo_messaging_amqp.ssl_verify_vhost + self.pseudo_vhost = config.oslo_messaging_amqp.pseudo_vhost self.sasl_mechanisms = config.oslo_messaging_amqp.sasl_mechanisms self.sasl_config_dir = config.oslo_messaging_amqp.sasl_config_dir self.sasl_config_name = config.oslo_messaging_amqp.sasl_config_name - self.hosts = Hosts(hosts, config.oslo_messaging_amqp.username, + self.hosts = Hosts(url, config.oslo_messaging_amqp.username, config.oslo_messaging_amqp.password, config.oslo_messaging_amqp.sasl_default_realm) self.conn_retry_interval = \ @@ -974,20 +978,44 @@ class Controller(pyngus.ConnectionEventHandler): host = self.hosts.current conn_props = {'properties': {u'process': self._command, u'pid': self._pid, - u'node': self._node}, - 'hostname': host.hostname} + u'node': self._node}} + # only set hostname in the AMQP 1.0 Open performative if the message + # bus can interpret it as the virtual host. We leave it unspecified + # since apparently noone can agree on how it should be used otherwise! + if self.hosts.virtual_host and not self.pseudo_vhost: + conn_props['hostname'] = self.hosts.virtual_host if self.idle_timeout: conn_props["idle-time-out"] = float(self.idle_timeout) if self.trace_protocol: conn_props["x-trace-protocol"] = self.trace_protocol + + # SSL configuration + ssl_enabled = False if self.ssl: + ssl_enabled = True conn_props["x-ssl"] = self.ssl if self.ssl_ca_file: conn_props["x-ssl-ca-file"] = self.ssl_ca_file + ssl_enabled = True if self.ssl_cert_file: + ssl_enabled = True conn_props["x-ssl-identity"] = (self.ssl_cert_file, self.ssl_key_file, self.ssl_key_password) + if ssl_enabled: + # Set the identity of the remote server for SSL to use when + # verifying the received certificate. Typically this is the DNS + # name used to set up the TCP connections. However some servers + # may provide a certificate for the virtual host instead. If that + # is the case we need to use the virtual hostname instead. + # Refer to SSL Server Name Indication (SNI) for the entire story: + # https://tools.ietf.org/html/rfc6066 + if self.ssl_verify_vhost: + if self.hosts.virtual_host: + conn_props['x-ssl-peer-name'] = self.hosts.virtual_host + else: + conn_props['x-ssl-peer-name'] = host.hostname + # SASL configuration: if self.sasl_mechanisms: conn_props["x-sasl-mechs"] = self.sasl_mechanisms @@ -1052,9 +1080,12 @@ class Controller(pyngus.ConnectionEventHandler): point, we are ready to receive messages, so start all pending RPC requests. """ - LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"), + LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s%(vhost)s)"), {'hostname': self.hosts.current.hostname, - 'port': self.hosts.current.port}) + 'port': self.hosts.current.port, + 'vhost': ("/" + self.hosts.virtual_host + if self.hosts.virtual_host else "")}) + for sender in itervalues(self._all_senders): sender.attach(self._socket_connection.pyngus_conn, self.reply_link, self.addresser) @@ -1099,7 +1130,9 @@ class Controller(pyngus.ConnectionEventHandler): # allocate an addresser based on the advertised properties of the # message bus props = connection.remote_properties or {} - self.addresser = self.addresser_factory(props) + self.addresser = self.addresser_factory(props, + self.hosts.virtual_host + if self.pseudo_vhost else None) for servers in itervalues(self._servers): for server in itervalues(servers): server.attach(self._socket_connection.pyngus_conn, diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py index 34added77..dda40a1ff 100644 --- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py +++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py @@ -346,7 +346,15 @@ class Thread(threading.Thread): """Run the proton event/timer loop.""" LOG.debug("Starting Proton thread, container=%s", self._container.name) + try: + self._main_loop() + except Exception: + # unknown error - fatal + LOG.exception("Fatal unhandled event loop error!") + raise + def _main_loop(self): + # Main event loop while not self._shutdown: readfds = [self._requests] diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index 20860306b..d40b6119b 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -60,6 +60,17 @@ amqp1_opts = [ secret=True, help='Password for decrypting ssl_key_file (if encrypted)'), + cfg.BoolOpt('ssl_verify_vhost', + default=False, + help="By default SSL checks that the name in the server's" + " certificate matches the hostname in the transport_url. In" + " some configurations it may be preferable to use the virtual" + " hostname instead, for example if the server uses the Server" + " Name Indication TLS extension (rfc6066) to provide a" + " certificate per virtual host. Set ssl_verify_vhost to True" + " if the server's SSL certificate uses the virtual host name" + " instead of the DNS name."), + cfg.BoolOpt('allow_insecure_clients', default=False, deprecated_group='amqp1', @@ -172,6 +183,17 @@ amqp1_opts = [ "'dynamic' - use legacy addresses if the message bus does not" " support routing otherwise use routable addressing"), + cfg.BoolOpt('pseudo_vhost', + default=True, + help="Enable virtual host support for those message buses" + " that do not natively support virtual hosting (such as" + " qpidd). When set to true the virtual host name will be" + " added to all message bus addresses, effectively creating" + " a private 'subnet' per virtual host. Set to False if the" + " message bus supports virtual hosting using the 'hostname'" + " field in the AMQP 1.0 Open performative as the name of the" + " virtual host."), + # Legacy addressing customization: cfg.StrOpt('server_request_prefix', diff --git a/oslo_messaging/_drivers/impl_amqp1.py b/oslo_messaging/_drivers/impl_amqp1.py index 866cc917c..188bbaa17 100644 --- a/oslo_messaging/_drivers/impl_amqp1.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -202,7 +202,6 @@ class ProtonDriver(base.BaseDriver): conf.register_opts(opts.amqp1_opts, group=opt_group) conf = common.ConfigOptsProxy(conf, url, opt_group.name) - self._hosts = url.hosts self._conf = conf self._default_exchange = default_exchange @@ -257,7 +256,7 @@ class ProtonDriver(base.BaseDriver): self._ctrl = None # Create a Controller that connects to the messaging # service: - self._ctrl = controller.Controller(self._hosts, + self._ctrl = controller.Controller(self._url, self._default_exchange, self._conf) self._ctrl.connect() diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index e245e78c9..09020d458 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import logging import os import select @@ -1515,19 +1516,12 @@ class TestSSL(test_utils.BaseTestCase): self._tmpdir = None self.skipTest("OpenSSL tools not installed - skipping") - def test_server_ok(self): - # test client authenticates server - self._broker = FakeBroker(self.conf.oslo_messaging_amqp, - sock_addr=self._ssl_config['s_name'], - ssl_config=self._ssl_config) - url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" % - (self._broker.host, - self._broker.port)) + def _ssl_server_ok(self, url): self._broker.start() - self.config(ssl_ca_file=self._ssl_config['ca_cert'], group='oslo_messaging_amqp') - driver = amqp_driver.ProtonDriver(self.conf, url) + tport_url = oslo_messaging.TransportURL.parse(self.conf, url) + driver = amqp_driver.ProtonDriver(self.conf, tport_url) target = oslo_messaging.Target(topic="test-topic") listener = _ListenerThread( driver.listen(target, None, None)._poll_style_listener, 1) @@ -1541,6 +1535,34 @@ class TestSSL(test_utils.BaseTestCase): self.assertFalse(listener.isAlive()) driver.cleanup() + def test_server_ok(self): + # test client authenticates server + self._broker = FakeBroker(self.conf.oslo_messaging_amqp, + sock_addr=self._ssl_config['s_name'], + ssl_config=self._ssl_config) + url = "amqp://%s:%d" % (self._broker.host, self._broker.port) + self._ssl_server_ok(url) + + def test_server_ignore_vhost_ok(self): + # test client authenticates server and ignores vhost + self._broker = FakeBroker(self.conf.oslo_messaging_amqp, + sock_addr=self._ssl_config['s_name'], + ssl_config=self._ssl_config) + url = "amqp://%s:%d/my-vhost" % (self._broker.host, self._broker.port) + self._ssl_server_ok(url) + + def test_server_check_vhost_ok(self): + # test client authenticates server using vhost as CN + # Use 'Invalid' from bad_cert CN + self.config(ssl_verify_vhost=True, group='oslo_messaging_amqp') + self._ssl_config['s_cert'] = self._ssl_config['bad_cert'] + self._ssl_config['s_key'] = self._ssl_config['bad_key'] + self._broker = FakeBroker(self.conf.oslo_messaging_amqp, + sock_addr=self._ssl_config['s_name'], + ssl_config=self._ssl_config) + url = "amqp://%s:%d/Invalid" % (self._broker.host, self._broker.port) + self._ssl_server_ok(url) + @mock.patch('ssl.get_default_verify_paths') def test_server_ok_with_ssl_set_in_transport_url(self, mock_verify_paths): # test client authenticates server @@ -1630,6 +1652,90 @@ class TestSSL(test_utils.BaseTestCase): super(TestSSL, self).tearDown() +@testtools.skipUnless(pyngus, "proton modules not present") +class TestVHost(_AmqpBrokerTestCaseAuto): + """Verify the pseudo virtual host behavior""" + + def _vhost_test(self): + """Verify that all messaging for a particular vhost stays on that vhost + """ + self.config(pseudo_vhost=True, + group="oslo_messaging_amqp") + + vhosts = ["None", "HOSTA", "HOSTB", "HOSTC"] + target = oslo_messaging.Target(topic="test-topic") + fanout = oslo_messaging.Target(topic="test-topic", fanout=True) + + listeners = {} + ldrivers = {} + sdrivers = {} + + replies = {} + msgs = {} + + for vhost in vhosts: + url = copy.copy(self._broker_url) + url.virtual_host = vhost if vhost != "None" else None + ldriver = amqp_driver.ProtonDriver(self.conf, url) + listeners[vhost] = _ListenerThread( + ldriver.listen(target, None, None)._poll_style_listener, + 10) + ldrivers[vhost] = ldriver + sdrivers[vhost] = amqp_driver.ProtonDriver(self.conf, url) + replies[vhost] = [] + msgs[vhost] = [] + + # send a fanout and a single rpc call to each listener + for vhost in vhosts: + if vhost == "HOSTC": # expect no messages to HOSTC + continue + sdrivers[vhost].send(fanout, + {"context": vhost}, + {"vhost": vhost, + "fanout": True, + "id": vhost}) + replies[vhost].append(sdrivers[vhost].send(target, + {"context": vhost}, + {"method": "echo", + "id": vhost}, + wait_for_reply=True)) + time.sleep(1) + + for vhost in vhosts: + msgs[vhost] += listeners[vhost].get_messages() + if vhost == "HOSTC": + # HOSTC should get nothing + self.assertEqual(0, len(msgs[vhost])) + self.assertEqual(0, len(replies[vhost])) + continue + + self.assertEqual(2, len(msgs[vhost])) + for m in msgs[vhost]: + # the id must match the vhost + self.assertEqual(vhost, m.message.get("id")) + self.assertEqual(1, len(replies[vhost])) + for m in replies[vhost]: + # same for correlation id + self.assertEqual(vhost, m.get("correlation-id")) + + for vhost in vhosts: + listeners[vhost].kill() + ldrivers[vhost].cleanup + sdrivers[vhost].cleanup() + + def test_vhost_routing(self): + """Test vhost using routable addresses + """ + self.config(addressing_mode='routable', group="oslo_messaging_amqp") + self._vhost_test() + + def test_vhost_legacy(self): + """Test vhost using legacy addresses + """ + self.config(addressing_mode='legacy', group="oslo_messaging_amqp") + self._vhost_test() + + class FakeBroker(threading.Thread): """A test AMQP message 'broker'."""