246 lines
11 KiB
Python
246 lines
11 KiB
Python
# Copyright 2016 AT&T Corp
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import mock
|
|
from oslo_config import cfg
|
|
from oslo_serialization import jsonutils
|
|
import ssl as ssl_module
|
|
|
|
from murano.common.i18n import _
|
|
from murano.common.messaging import mqclient
|
|
from murano.tests.unit import base
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
class MQClientTest(base.MuranoTestCase):
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu')
|
|
def setUp(self, mock_kombu):
|
|
super(MQClientTest, self).setUp()
|
|
self.ssl_client = mqclient.MqClient(login='test_login',
|
|
password='test_password',
|
|
host='test_host', port='test_port',
|
|
virtual_host='test_virtual_host',
|
|
ssl=True, ca_certs=['cert1'],
|
|
insecure=False)
|
|
|
|
mock_kombu.Connection.assert_called_once_with(
|
|
'amqp://{0}:{1}@{2}:{3}/{4}'.format('test_login', 'test_password',
|
|
'test_host', 'test_port',
|
|
'test_virtual_host'),
|
|
ssl={'ca_certs': ['cert1'], 'cert_reqs': ssl_module.CERT_REQUIRED})
|
|
self.assertEqual(mock_kombu.Connection(), self.ssl_client._connection)
|
|
self.assertIsNone(self.ssl_client._channel)
|
|
self.assertFalse(self.ssl_client._connected)
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu', autospec=True)
|
|
def test_client_initialization_with_ssl_version(self, mock_kombu):
|
|
ssl_versions = (
|
|
('tlsv1', getattr(ssl_module, 'PROTOCOL_TLSv1', None)),
|
|
('tlsv1_1', getattr(ssl_module, 'PROTOCOL_TLSv1_1', None)),
|
|
('tlsv1_2', getattr(ssl_module, 'PROTOCOL_TLSv1_2', None)),
|
|
('sslv2', getattr(ssl_module, 'PROTOCOL_SSLv2', None)),
|
|
('sslv23', getattr(ssl_module, 'PROTOCOL_SSLv23', None)),
|
|
('sslv3', getattr(ssl_module, 'PROTOCOL_SSLv3', None)))
|
|
exception_count = 0
|
|
|
|
for ssl_name, ssl_version in ssl_versions:
|
|
ssl_kwargs = {
|
|
'login': 'test_login',
|
|
'password': 'test_password',
|
|
'host': 'test_host',
|
|
'port': 'test_port',
|
|
'virtual_host': 'test_virtual_host',
|
|
'ssl': True,
|
|
'ssl_version': ssl_name,
|
|
'ca_certs': ['cert1'],
|
|
'insecure': False
|
|
}
|
|
|
|
# If a ssl_version is not valid, a RuntimeError is thrown.
|
|
# According to the ssl_version docs in config.py, certain versions
|
|
# of TLS may be available depending on the system. So, just
|
|
# check that at least 1 ssl_version works.
|
|
if ssl_version is None:
|
|
e = self.assertRaises(RuntimeError, mqclient.MqClient,
|
|
**ssl_kwargs)
|
|
self.assertEqual(_('Invalid SSL version: %s') % ssl_name,
|
|
e.__str__())
|
|
exception_count += 1
|
|
continue
|
|
|
|
self.ssl_client = mqclient.MqClient(**ssl_kwargs)
|
|
|
|
mock_kombu.Connection.assert_called_once_with(
|
|
'amqp://{0}:{1}@{2}:{3}/{4}'.format(
|
|
'test_login', 'test_password', 'test_host', 'test_port',
|
|
'test_virtual_host'),
|
|
ssl={'ca_certs': ['cert1'],
|
|
'cert_reqs': ssl_module.CERT_REQUIRED,
|
|
'ssl_version': ssl_version})
|
|
self.assertEqual(
|
|
mock_kombu.Connection(), self.ssl_client._connection)
|
|
self.assertIsNone(self.ssl_client._channel)
|
|
self.assertFalse(self.ssl_client._connected)
|
|
mock_kombu.Connection.reset_mock()
|
|
|
|
# Check that at least one ssl_version worked.
|
|
self.assertGreater(len(ssl_versions), exception_count)
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu')
|
|
def test_alternate_client_initializations(self, mock_kombu):
|
|
for ca_cert in ['cert1', None]:
|
|
client = mqclient.MqClient(login='test_login',
|
|
password='test_password',
|
|
host='test_host', port='test_port',
|
|
virtual_host='test_virtual_host',
|
|
ssl=True,
|
|
ca_certs=ca_cert,
|
|
insecure=True)
|
|
|
|
mock_kombu.Connection.assert_called_once_with(
|
|
'amqp://{0}:{1}@{2}:{3}/{4}'.format('test_login',
|
|
'test_password',
|
|
'test_host', 'test_port',
|
|
'test_virtual_host'),
|
|
ssl={'ca_certs': ca_cert,
|
|
'cert_reqs': ssl_module.CERT_OPTIONAL
|
|
if ca_cert else ssl_module.CERT_NONE})
|
|
self.assertEqual(mock_kombu.Connection(),
|
|
client._connection)
|
|
mock_kombu.Connection.reset_mock()
|
|
|
|
client = mqclient.MqClient(login='test_login',
|
|
password='test_password',
|
|
host='test_host', port='test_port',
|
|
virtual_host='test_virtual_host',
|
|
ssl=False,
|
|
ca_certs=None,
|
|
insecure=False)
|
|
|
|
mock_kombu.Connection.assert_called_once_with(
|
|
'amqp://{0}:{1}@{2}:{3}/{4}'.format('test_login',
|
|
'test_password',
|
|
'test_host', 'test_port',
|
|
'test_virtual_host'),
|
|
ssl=None)
|
|
self.assertEqual(mock_kombu.Connection(),
|
|
client._connection)
|
|
|
|
def test_connect(self):
|
|
self.ssl_client.connect()
|
|
|
|
self.ssl_client._connection.connect.assert_called_once_with()
|
|
self.ssl_client._connection.channel.assert_called_once_with()
|
|
self.assertEqual(self.ssl_client._connection.channel(),
|
|
self.ssl_client._channel)
|
|
self.assertTrue(self.ssl_client._connected)
|
|
|
|
def test_close(self):
|
|
self.ssl_client.close()
|
|
|
|
self.ssl_client._connection.close.assert_called_once_with()
|
|
self.assertFalse(self.ssl_client._connected)
|
|
|
|
def test_enter_and_exit(self):
|
|
with self.ssl_client:
|
|
pass
|
|
|
|
self.ssl_client._connection.connect.assert_called_once_with()
|
|
self.ssl_client._connection.close.assert_called_once_with()
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu')
|
|
def test_declare(self, mock_kombu):
|
|
self.ssl_client.connect()
|
|
self.ssl_client.declare(queue='test_queue', exchange='test_exchange',
|
|
enable_ha=True, ttl=1)
|
|
|
|
queue_args = {
|
|
'x-ha-policy': 'all',
|
|
'x-expires': 1
|
|
}
|
|
mock_kombu.Exchange.assert_called_once_with(
|
|
'test_exchange', type='direct', durable=True)
|
|
mock_kombu.Queue.assert_called_once_with('test_queue',
|
|
mock_kombu.Exchange(),
|
|
'test_queue',
|
|
durable=True,
|
|
queue_arguments=queue_args)
|
|
mock_kombu.Queue()().declare.assert_called_once_with()
|
|
mock_kombu.reset_mock()
|
|
|
|
self.ssl_client.declare(queue='test_queue', exchange='test_exchange',
|
|
enable_ha=False, ttl=0)
|
|
|
|
mock_kombu.Exchange.assert_called_once_with(
|
|
'test_exchange', type='direct', durable=True)
|
|
mock_kombu.Queue.assert_called_once_with('test_queue',
|
|
mock_kombu.Exchange(),
|
|
'test_queue',
|
|
durable=True,
|
|
queue_arguments={})
|
|
|
|
def test_declare_except_runtime_error(self):
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
'Not connected to RabbitMQ'):
|
|
self.ssl_client.declare(None)
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu')
|
|
def test_send(self, mock_kombu):
|
|
mock_message = mock.MagicMock(body='test_message', id=3)
|
|
|
|
self.ssl_client.connect()
|
|
self.ssl_client.send(mock_message, 'test_key', 'test_exchange')
|
|
|
|
mock_kombu.Producer.assert_called_once_with(
|
|
self.ssl_client._connection)
|
|
mock_kombu.Producer().publish.assert_called_once_with(
|
|
exchange='test_exchange', routing_key='test_key',
|
|
body=jsonutils.dumps('test_message'), message_id='3',
|
|
headers=None)
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.kombu')
|
|
def test_send_signed(self, mock_kombu):
|
|
mock_message = mock.MagicMock(body='test_message', id=3)
|
|
|
|
signer = lambda msg: "SIGNATURE"
|
|
self.ssl_client.connect()
|
|
self.ssl_client.send(mock_message, 'test_key', 'test_exchange', signer)
|
|
|
|
mock_kombu.Producer.assert_called_once_with(
|
|
self.ssl_client._connection)
|
|
mock_kombu.Producer().publish.assert_called_once_with(
|
|
exchange='test_exchange', routing_key='test_key',
|
|
body=jsonutils.dumps('test_message'), message_id='3',
|
|
headers={'signature': 'SIGNATURE'})
|
|
|
|
def test_send_except_runtime_error(self):
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
'Not connected to RabbitMQ'):
|
|
self.ssl_client.send(None, None)
|
|
|
|
@mock.patch('murano.common.messaging.mqclient.subscription')
|
|
def test_open(self, mock_subscription):
|
|
self.ssl_client.connect()
|
|
self.ssl_client.open('test_queue', prefetch_count=2)
|
|
mock_subscription.Subscription.assert_called_once_with(
|
|
self.ssl_client._connection, 'test_queue', 2)
|
|
|
|
def test_open_except_runtime_error(self):
|
|
with self.assertRaisesRegex(RuntimeError,
|
|
'Not connected to RabbitMQ'):
|
|
self.ssl_client.open(None)
|