From f4ebbea9bd55b2288a918f02f631b6eaab76a6b8 Mon Sep 17 00:00:00 2001 From: ptoohill1 Date: Fri, 5 Jun 2015 03:25:44 -0500 Subject: [PATCH] Registering Barbican consumers The patch will register the consumer for the Barbican service upon validation. If an error is caught it will also de-reg the consumer. Change-Id: I0377222c9492185a7b0462eb935179963027ed37 --- etc/neutron_lbaas.conf | 12 +- .../cert_manager/barbican_cert_manager.py | 48 +-- .../common/cert_manager/cert_manager.py | 5 +- neutron_lbaas/common/keystone.py | 113 ++++++ neutron_lbaas/extensions/loadbalancerv2.py | 4 + neutron_lbaas/services/loadbalancer/plugin.py | 25 +- .../unit/common/cert_manager/test_barbican.py | 328 +----------------- .../db/loadbalancer/test_db_loadbalancerv2.py | 90 ++++- 8 files changed, 256 insertions(+), 369 deletions(-) create mode 100644 neutron_lbaas/common/keystone.py diff --git a/etc/neutron_lbaas.conf b/etc/neutron_lbaas.conf index 4d188f713..88fcfc215 100644 --- a/etc/neutron_lbaas.conf +++ b/etc/neutron_lbaas.conf @@ -43,6 +43,16 @@ # quota_health_monitor. # quota_healthmonitor = -1 +[service_auth] +# auth_url = http://127.0.0.1:5000/v2.0 +# admin_tenant_name = %SERVICE_TENANT_NAME% +# admin_user = %SERVICE_USER% +# admin_password = %SERVICE_PASSWORD% +# admin_user_domain = %SERVICE_USER_DOMAIN% +# admin_project_domain = %SERVICE_PROJECT_DOMAIN% +# region = %REGION% +# service_name = lbaas +# auth_version = 2 [service_providers] # Must be in form: @@ -63,7 +73,7 @@ service_provider=LOADBALANCER:Haproxy:neutron_lbaas.services.loadbalancer.driver # service_provider = LOADBALANCERV2:Octavia:neutron_lbaas.drivers.octavia.driver.OctaviaDriver:default # service_provider = LOADBALANCERV2:LoggingNoop:neutron_lbaas.drivers.logging_noop.driver.LoggingNoopLoadBalancerDriver:default # service_provider=LOADBALANCERV2:Haproxy:neutron_lbaas.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default -# service_provider = LOADBALANCERV2:A10Networks:neutron_lbaas.drivers.a10networks.driver_v2.ThunderDriver:default +# service_provider = LOADBALANCERV2:A10Networks:neutron_lbaas.drivers.a10networks.driver_v2.ThunderDriver:default # service_provider = LOADBALANCERV2:brocade:neutron_lbaas.drivers.brocade.driver_v2.BrocadeLoadBalancerDriver:default # service_provider = LOADBALANCERV2:kemptechnologies:neutron_lbaas.drivers.kemptechnologies.driver_v2.KempLoadMasterDriver:default diff --git a/neutron_lbaas/common/cert_manager/barbican_cert_manager.py b/neutron_lbaas/common/cert_manager/barbican_cert_manager.py index 1ad38ee0f..1f24d177e 100644 --- a/neutron_lbaas/common/cert_manager/barbican_cert_manager.py +++ b/neutron_lbaas/common/cert_manager/barbican_cert_manager.py @@ -13,20 +13,17 @@ # under the License. from barbicanclient import client as barbican_client -from keystoneclient import session -from keystoneclient.v2_0 import client as v2_client -from keystoneclient.v3 import client as v3_client from neutron.i18n import _LI, _LW, _LE from oslo_config import cfg from oslo_log import log as logging from oslo_utils import excutils from neutron_lbaas.common.cert_manager import cert_manager +from neutron_lbaas.common import keystone LOG = logging.getLogger(__name__) CONF = cfg.CONF -cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token') class Cert(cert_manager.Cert): @@ -63,40 +60,6 @@ class BarbicanKeystoneAuth(object): _keystone_session = None _barbican_client = None - @classmethod - def _get_keystone_session(cls): - """Initializes a Keystone session. - - :return: a Keystone Session object - :raises Exception: if the session cannot be established - """ - if not cls._keystone_session: - try: - if CONF.keystone_authtoken.auth_version.lower() == 'v2': - kc = v2_client.Client( - username=CONF.keystone_authtoken.admin_user, - password=CONF.keystone_authtoken.admin_password, - tenant_name=CONF.keystone_authtoken.admin_tenant_name, - auth_url=CONF.keystone_authtoken.auth_uri - ) - elif CONF.keystone_authtoken.auth_version.lower() == 'v3': - kc = v3_client.Client( - username=CONF.keystone_authtoken.admin_user, - password=CONF.keystone_authtoken.admin_password, - tenant_name=CONF.keystone_authtoken.admin_tenant_name, - auth_url=CONF.keystone_authtoken.auth_uri - ) - else: - raise Exception('Unknown authentication version') - cls._keystone_session = session.Session(auth=kc) - except Exception: - # Keystone sometimes masks exceptions strangely -- this will - # reraise the original exception, while also providing useful - # feedback in the logs for debugging - with excutils.save_and_reraise_exception(): - LOG.exception(_LE("Error creating Keystone session")) - return cls._keystone_session - @classmethod def get_barbican_client(cls): """Creates a Barbican client object. @@ -106,8 +69,9 @@ class BarbicanKeystoneAuth(object): """ if not cls._barbican_client: try: + cls._keystone_session = keystone.get_session() cls._barbican_client = barbican_client.Client( - session=cls._get_keystone_session() + session=cls._keystone_session ) except Exception: # Barbican (because of Keystone-middleware) sometimes masks @@ -204,7 +168,8 @@ class CertManager(cert_manager.CertManager): LOG.exception(_LE("Error storing certificate data")) @staticmethod - def get_cert(cert_ref, service_name='Octavia', resource_ref=None, + def get_cert(cert_ref, service_name='lbaas', + resource_ref=None, check_only=False, **kwargs): """Retrieves the specified cert and registers as a consumer. @@ -239,8 +204,7 @@ class CertManager(cert_manager.CertManager): LOG.exception(_LE("Error getting {0}").format(cert_ref)) @staticmethod - def delete_cert(cert_ref, service_name='Octavia', resource_ref=None, - **kwargs): + def delete_cert(cert_ref, resource_ref, service_name='lbaas', **kwargs): """Deregister as a consumer for the specified cert. :param cert_ref: the UUID of the cert to retrieve diff --git a/neutron_lbaas/common/cert_manager/cert_manager.py b/neutron_lbaas/common/cert_manager/cert_manager.py index 7c54250e0..6baca073b 100644 --- a/neutron_lbaas/common/cert_manager/cert_manager.py +++ b/neutron_lbaas/common/cert_manager/cert_manager.py @@ -65,7 +65,8 @@ class CertManager(object): pass @abc.abstractmethod - def get_cert(self, cert_ref, check_only=False, **kwargs): + def get_cert(self, cert_ref, check_only=False, + resource_ref=None, **kwargs): """Retrieves the specified cert. If check_only is True, don't perform any sort of registration. @@ -75,7 +76,7 @@ class CertManager(object): pass @abc.abstractmethod - def delete_cert(self, cert_ref, **kwargs): + def delete_cert(self, cert_ref, resource_ref, **kwargs): """Deletes the specified cert. If the specified cert does not exist, a CertificateStorageException diff --git a/neutron_lbaas/common/keystone.py b/neutron_lbaas/common/keystone.py new file mode 100644 index 000000000..f7f8f3bb7 --- /dev/null +++ b/neutron_lbaas/common/keystone.py @@ -0,0 +1,113 @@ +# Copyright 2015 Rackspace +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystoneclient.auth.identity import v2 as v2_client +from keystoneclient.auth.identity import v3 as v3_client +from keystoneclient import session +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import excutils + +from neutron.i18n import _LE + + +LOG = logging.getLogger(__name__) + +_SESSION = None +OPTS = [ + cfg.StrOpt( + 'auth_url', + default='http://127.0.0.1:5000/v2.0', + help=_('Authentication endpoint'), + ), + cfg.StrOpt( + 'admin_user', + default='admin', + help=_('The service admin user name'), + ), + cfg.StrOpt( + 'admin_tenant_name', + default='admin', + help=_('The service admin tenant name'), + ), + cfg.StrOpt( + 'admin_password', + default='password', + help=_('The service admin password'), + ), + cfg.StrOpt( + 'admin_user_domain', + default='admin', + help=_('The admin user domain name'), + ), + cfg.StrOpt( + 'admin_project_domain', + default='admin', + help=_('The admin project domain name'), + ), + cfg.StrOpt( + 'region', + default='RegionOne', + help=_('The deployment region'), + ), + cfg.StrOpt( + 'service_name', + default='lbaas', + help=_('The name of the service'), + ), + cfg.StrOpt( + 'auth_version', + default='2', + help=_('The auth version used to authenticate'), + ) +] + +cfg.CONF.register_opts(OPTS, 'service_auth') + + +def get_session(): + """Initializes a Keystone session. + + :return: a Keystone Session object + :raises Exception: if the session cannot be established + """ + global _SESSION + if not _SESSION: + + auth_url = cfg.CONF.service_auth.auth_url + kwargs = {'auth_url': auth_url, + 'username': cfg.CONF.service_auth.admin_user, + 'password': cfg.CONF.service_auth.admin_password} + + if cfg.CONF.service_auth.auth_version == '2': + client = v2_client + kwargs['tenant_name'] = cfg.CONF.service_auth.admin_tenant_name + elif cfg.CONF.service_auth.auth_version == '3': + client = v3_client + kwargs['project_name'] = cfg.CONF.service_auth.admin_tenant_name + kwargs['user_domain_name'] = (cfg.CONF.service_auth. + admin_user_domain) + kwargs['project_domain_name'] = (cfg.CONF.service_auth. + admin_project_domain) + else: + raise Exception('Unknown keystone version!') + + try: + kc = client.Password(**kwargs) + _SESSION = session.Session(auth=kc) + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(_LE("Error creating Keystone session.")) + + return _SESSION diff --git a/neutron_lbaas/extensions/loadbalancerv2.py b/neutron_lbaas/extensions/loadbalancerv2.py index 60e4ba0c0..03ff506b3 100644 --- a/neutron_lbaas/extensions/loadbalancerv2.py +++ b/neutron_lbaas/extensions/loadbalancerv2.py @@ -120,6 +120,10 @@ class TLSContainerInvalid(nexception.NeutronException): message = _("TLS container %(container_id)s is invalid. %(reason)s") +class CertManagerError(nexception.NeutronException): + message = _("Could not process TLS container %(ref)s, %(reason)s") + + RESOURCE_ATTRIBUTE_MAP = { 'loadbalancers': { 'id': {'allow_post': False, 'allow_put': False, diff --git a/neutron_lbaas/services/loadbalancer/plugin.py b/neutron_lbaas/services/loadbalancer/plugin.py index 7ae7c46da..7d552e407 100644 --- a/neutron_lbaas/services/loadbalancer/plugin.py +++ b/neutron_lbaas/services/loadbalancer/plugin.py @@ -573,10 +573,17 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2): cert_container = None try: cert_container = CERT_MANAGER_PLUGIN.CertManager.get_cert( - container_ref, check_only=True) - except Exception: - raise loadbalancerv2.TLSContainerNotFound( - container_id=container_ref) + container_ref, + resource_ref=self._get_service_url(listener)) + except Exception as e: + if hasattr(e, 'status_code') and e.status_code == 404: + raise loadbalancerv2.TLSContainerNotFound( + container_id=container_ref) + else: + # Could be a keystone configuration error... + raise loadbalancerv2.CertManagerError( + ref=container_ref, reason=e.message + ) try: cert_parser.validate_cert( @@ -586,6 +593,8 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2): cert_container.get_private_key_passphrase()), intermediates=cert_container.get_intermediates()) except Exception as e: + CERT_MANAGER_PLUGIN.CertManager.delete_cert( + container_ref, self._get_service_url(listener)) raise loadbalancerv2.TLSContainerInvalid( container_id=container_ref, reason=str(e)) @@ -620,6 +629,14 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2): return len(to_validate) > 0 + def _get_service_url(self, listener): + # Format: ://// + return "{0}://{1}/{2}/{3}".format( + cfg.CONF.service_auth.service_name, + cfg.CONF.service_auth.region, + constants.LOADBALANCER, + listener.get('loadbalancer_id')) + def create_listener(self, context, listener): listener = listener.get('listener') lb_id = listener.get('loadbalancer_id') diff --git a/neutron_lbaas/tests/unit/common/cert_manager/test_barbican.py b/neutron_lbaas/tests/unit/common/cert_manager/test_barbican.py index 15277e3b6..2bd444d29 100644 --- a/neutron_lbaas/tests/unit/common/cert_manager/test_barbican.py +++ b/neutron_lbaas/tests/unit/common/cert_manager/test_barbican.py @@ -1,4 +1,4 @@ -# Copyright 2014, 2015 Rackspace US, Inc. +# Copyright 2014 Rackspace # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,109 +12,43 @@ # License for the specific language governing permissions and limitations # under the License. -import uuid - from barbicanclient import client as barbican_client -from barbicanclient import containers -from barbicanclient import secrets -from keystoneclient import session import mock -from oslo_config import fixture -from neutron_lbaas.common.cert_manager import barbican_cert_manager as bcm -from neutron_lbaas.common.cert_manager import cert_manager -from neutron_lbaas.tests import base +import neutron_lbaas.common.cert_manager.barbican_cert_manager as bbq_common +from neutron_lbaas.common import keystone +import neutron_lbaas.tests.base as base class TestBarbicanAuth(base.BaseTestCase): def setUp(self): - # Reset the session and client - bcm.BarbicanKeystoneAuth._keystone_session = None - bcm.BarbicanKeystoneAuth._barbican_client = None + # Reset the client + bbq_common.BarbicanKeystoneAuth._barbican_client = None + keystone._SESSION = None super(TestBarbicanAuth, self).setUp() - def test_get_keystone_client_v2(self): - # self.skip("Temporarily skipping test until a workaround is found.") - bcm.v2_client = mock.MagicMock() - - fixture.Config().config(group='keystone_authtoken', auth_version='v2') - - # There should be no existing session - self.assertIsNone( - bcm.BarbicanKeystoneAuth._keystone_session - ) - - # Get us a session - ks1 = bcm.BarbicanKeystoneAuth._get_keystone_session() - - # Our returned session should also be the saved session - self.assertIsInstance( - bcm.BarbicanKeystoneAuth._keystone_session, - session.Session - ) - self.assertIs( - bcm.BarbicanKeystoneAuth._keystone_session, - ks1 - ) - - # Getting the session again should return the same object - ks2 = bcm.BarbicanKeystoneAuth._get_keystone_session() - self.assertIs(ks1, ks2) - - def test_get_keystone_client_v3(self): - # self.skip("Temporarily skipping test until a workaround is found.") - bcm.v3_client = mock.MagicMock() - - fixture.Config().config(group='keystone_authtoken', auth_version='v3') - - # There should be no existing session - self.assertIsNone( - bcm.BarbicanKeystoneAuth._keystone_session - ) - - # Get us a session - ks1 = bcm.BarbicanKeystoneAuth._get_keystone_session() - - # Our returned session should also be the saved session - self.assertIsInstance( - bcm.BarbicanKeystoneAuth._keystone_session, - session.Session - ) - self.assertIs( - bcm.BarbicanKeystoneAuth._keystone_session, - ks1 - ) - - # Getting the session again should return the same object - ks2 = bcm.BarbicanKeystoneAuth._get_keystone_session() - self.assertIs(ks1, ks2) - def test_get_barbican_client(self): # There should be no existing client - self.assertIsNone( - bcm.BarbicanKeystoneAuth._barbican_client - ) + self.assertIsNone(keystone._SESSION) # Mock out the keystone session and get the client - bcm.BarbicanKeystoneAuth._keystone_session = ( - mock.MagicMock() - ) - bc1 = bcm.BarbicanKeystoneAuth.get_barbican_client() + keystone._SESSION = mock.MagicMock() + bc1 = bbq_common.BarbicanKeystoneAuth.get_barbican_client() # Our returned client should also be the saved client self.assertIsInstance( - bcm.BarbicanKeystoneAuth._barbican_client, + bbq_common.BarbicanKeystoneAuth._barbican_client, barbican_client.Client ) self.assertIs( - bcm.BarbicanKeystoneAuth._barbican_client, + bbq_common.BarbicanKeystoneAuth._barbican_client, bc1 ) # Getting the session again should return the same object - bc2 = bcm.BarbicanKeystoneAuth.get_barbican_client() + bc2 = bbq_common.BarbicanKeystoneAuth.get_barbican_client() self.assertIs(bc1, bc2) @@ -155,7 +89,7 @@ class TestBarbicanCert(base.BaseTestCase): private_key_passphrase=self.private_key_passphrase_secret ) # Create a cert - cert = bcm.Cert( + cert = bbq_common.Cert( cert_container=container ) @@ -165,237 +99,3 @@ class TestBarbicanCert(base.BaseTestCase): self.assertEqual(cert.get_private_key(), self.private_key) self.assertEqual(cert.get_private_key_passphrase(), self.private_key_passphrase) - - def test_barbican_cert_none_values(self): - container = barbican_client.containers.CertificateContainer( - api=mock.MagicMock(), - certificate=None, - intermediates=None, - private_key=None, - private_key_passphrase=None - ) - # Create a cert - cert = bcm.Cert( - cert_container=container - ) - - # Validate the cert functions - self.assertEqual(cert.get_certificate(), None) - self.assertEqual(cert.get_intermediates(), None) - self.assertEqual(cert.get_private_key(), None) - self.assertEqual(cert.get_private_key_passphrase(), None) - - -class TestBarbicanManager(base.BaseTestCase): - - def setUp(self): - # Make a fake Container and contents - self.barbican_endpoint = 'http://localhost:9311/v1' - self.container_uuid = uuid.uuid4() - - self.container_ref = '{0}/containers/{1}'.format( - self.barbican_endpoint, self.container_uuid - ) - - self.name = 'My Fancy Cert' - self.private_key = mock.Mock(spec=secrets.Secret) - self.certificate = mock.Mock(spec=secrets.Secret) - self.intermediates = mock.Mock(spec=secrets.Secret) - self.private_key_passphrase = mock.Mock(spec=secrets.Secret) - - container = mock.Mock(spec=containers.CertificateContainer) - container.container_ref = self.container_ref - container.name = self.name - container.private_key = self.private_key - container.certificate = self.certificate - container.intermediates = self.intermediates - container.private_key_passphrase = self.private_key_passphrase - self.container = container - - self.empty_container = mock.Mock(spec=containers.CertificateContainer) - - self.secret1 = mock.Mock(spec=secrets.Secret) - self.secret2 = mock.Mock(spec=secrets.Secret) - self.secret3 = mock.Mock(spec=secrets.Secret) - self.secret4 = mock.Mock(spec=secrets.Secret) - - super(TestBarbicanManager, self).setUp() - - def test_store_cert(self): - # Mock out the client - bc = mock.MagicMock() - bc.containers.create_certificate.return_value = self.empty_container - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Attempt to store a cert - bcm.CertManager.store_cert( - certificate=self.certificate, - private_key=self.private_key, - intermediates=self.intermediates, - private_key_passphrase=self.private_key_passphrase, - name=self.name - ) - - # create_secret should be called four times with our data - calls = [ - mock.call(payload=self.certificate, expiration=None, - name=mock.ANY), - mock.call(payload=self.private_key, expiration=None, - name=mock.ANY), - mock.call(payload=self.intermediates, expiration=None, - name=mock.ANY), - mock.call(payload=self.private_key_passphrase, expiration=None, - name=mock.ANY) - ] - bc.secrets.create.assert_has_calls(calls, any_order=True) - - # create_certificate should be called once - self.assertEqual(bc.containers.create_certificate.call_count, 1) - - # Container should be stored once - self.empty_container.store.assert_called_once_with() - - def test_store_cert_failure(self): - # Mock out the client - bc = mock.MagicMock() - bc.containers.create_certificate.return_value = self.empty_container - test_secrets = [ - self.secret1, - self.secret2, - self.secret3, - self.secret4 - ] - bc.secrets.create.side_effect = test_secrets - self.empty_container.store.side_effect = ValueError() - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Attempt to store a cert - self.assertRaises( - ValueError, - bcm.CertManager.store_cert, - certificate=self.certificate, - private_key=self.private_key, - intermediates=self.intermediates, - private_key_passphrase=self.private_key_passphrase, - name=self.name - ) - - # create_secret should be called four times with our data - calls = [ - mock.call(payload=self.certificate, expiration=None, - name=mock.ANY), - mock.call(payload=self.private_key, expiration=None, - name=mock.ANY), - mock.call(payload=self.intermediates, expiration=None, - name=mock.ANY), - mock.call(payload=self.private_key_passphrase, expiration=None, - name=mock.ANY) - ] - bc.secrets.create.assert_has_calls(calls, any_order=True) - - # create_certificate should be called once - self.assertEqual(bc.containers.create_certificate.call_count, 1) - - # Container should be stored once - self.empty_container.store.assert_called_once_with() - - # All secrets should be deleted (or at least an attempt made) - for s in test_secrets: - s.delete.assert_called_once_with() - - def test_get_cert(self): - # Mock out the client - bc = mock.MagicMock() - bc.containers.register_consumer.return_value = self.container - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Get the container data - data = bcm.CertManager.get_cert( - cert_ref=self.container_ref, - resource_ref=self.container_ref, - service_name='Octavia' - ) - - # 'register_consumer' should be called once with the container_ref - bc.containers.register_consumer.assert_called_once_with( - container_ref=self.container_ref, - url=self.container_ref, - name='Octavia' - ) - - # The returned data should be a Cert object with the correct values - self.assertIsInstance(data, cert_manager.Cert) - self.assertEqual(data.get_private_key(), - self.private_key.payload) - self.assertEqual(data.get_certificate(), - self.certificate.payload) - self.assertEqual(data.get_intermediates(), - self.intermediates.payload) - self.assertEqual(data.get_private_key_passphrase(), - self.private_key_passphrase.payload) - - def test_get_cert_no_registration(self): - # Mock out the client - bc = mock.MagicMock() - bc.containers.get.return_value = self.container - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Get the container data - data = bcm.CertManager.get_cert( - cert_ref=self.container_ref, check_only=True - ) - - # 'get' should be called once with the container_ref - bc.containers.get.assert_called_once_with( - container_ref=self.container_ref - ) - - # The returned data should be a Cert object with the correct values - self.assertIsInstance(data, cert_manager.Cert) - self.assertEqual(data.get_private_key(), - self.private_key.payload) - self.assertEqual(data.get_certificate(), - self.certificate.payload) - self.assertEqual(data.get_intermediates(), - self.intermediates.payload) - self.assertEqual(data.get_private_key_passphrase(), - self.private_key_passphrase.payload) - - def test_delete_cert(self): - # Mock out the client - bc = mock.MagicMock() - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Attempt to deregister as a consumer - bcm.CertManager.delete_cert( - cert_ref=self.container_ref, - resource_ref=self.container_ref, - service_name='Octavia' - ) - - # remove_consumer should be called once with the container_ref - bc.containers.remove_consumer.assert_called_once_with( - container_ref=self.container_ref, - url=self.container_ref, - name='Octavia' - ) - - def test_actually_delete_cert(self): - # Mock out the client - bc = mock.MagicMock() - bc.containers.get.return_value = self.container - bcm.BarbicanKeystoneAuth._barbican_client = bc - - # Attempt to store a cert - bcm.CertManager._actually_delete_cert( - cert_ref=self.container_ref - ) - - # All secrets should be deleted - self.container.certificate.delete.assert_called_once_with() - self.container.private_key.delete.assert_called_once_with() - self.container.intermediates.delete.assert_called_once_with() - self.container.private_key_passphrase.delete.assert_called_once_with() - - # Container should be deleted once - self.container.delete.assert_called_once_with() diff --git a/neutron_lbaas/tests/unit/db/loadbalancer/test_db_loadbalancerv2.py b/neutron_lbaas/tests/unit/db/loadbalancer/test_db_loadbalancerv2.py index da65b8cbd..703916ba5 100644 --- a/neutron_lbaas/tests/unit/db/loadbalancer/test_db_loadbalancerv2.py +++ b/neutron_lbaas/tests/unit/db/loadbalancer/test_db_loadbalancerv2.py @@ -15,6 +15,7 @@ import contextlib import copy +import exceptions as ex import mock import six @@ -27,6 +28,7 @@ from neutron import context import neutron.db.l3_db # noqa from neutron.plugins.common import constants from neutron.tests.unit.db import test_db_base_plugin_v2 +from oslo_config import cfg from oslo_utils import uuidutils import testtools import webob.exc @@ -780,6 +782,12 @@ class CertMock(cert_manager.Cert): return "mock" +class Exceptions(object): + def __iter__(self): + return self + pass + + class LbaasListenerTests(ListenerTestBase): def test_create_listener(self, **extras): @@ -831,6 +839,19 @@ class LbaasListenerTests(ListenerTestBase): def test_create_listener_with_tls_missing_container(self, **extras): default_tls_container_ref = uuidutils.generate_uuid() + + class ReplaceClass(ex.Exception): + def __init__(self, status_code, message): + self.status_code = status_code + self.message = message + pass + + cfg.CONF.set_override('service_name', + 'lbaas', + 'service_auth') + cfg.CONF.set_override('region', + 'RegionOne', + 'service_auth') listener_data = { 'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS, 'default_tls_container_ref': default_tls_container_ref, @@ -842,18 +863,70 @@ class LbaasListenerTests(ListenerTestBase): } listener_data.update(extras) - with mock.patch( - 'neutron_lbaas.services.loadbalancer.plugin.' - 'CERT_MANAGER_PLUGIN.CertManager.get_cert') as get_cert_mock: - get_cert_mock.side_effect = LookupError + with contextlib.nested( + mock.patch('neutron_lbaas.services.loadbalancer.plugin.' + 'CERT_MANAGER_PLUGIN.CertManager.get_cert'), + mock.patch('neutron_lbaas.services.loadbalancer.plugin.' + 'CERT_MANAGER_PLUGIN.CertManager.delete_cert') + ) as (get_cert_mock, rm_consumer_mock): + ex.Exception = ReplaceClass(status_code=404, + message='Cert Not Found') + get_cert_mock.side_effect = ex.Exception self.assertRaises(loadbalancerv2.TLSContainerNotFound, self.plugin.create_listener, context.get_admin_context(), {'listener': listener_data}) + def test_create_listener_with_tls_invalid_service_acct(self, **extras): + default_tls_container_ref = uuidutils.generate_uuid() + listener_data = { + 'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS, + 'default_tls_container_ref': default_tls_container_ref, + 'sni_container_refs': [], + 'protocol_port': 443, + 'admin_state_up': True, + 'tenant_id': self._tenant_id, + 'loadbalancer_id': self.lb_id + } + listener_data.update(extras) + + with contextlib.nested( + mock.patch('neutron_lbaas.services.loadbalancer.plugin.' + 'CERT_MANAGER_PLUGIN.CertManager.get_cert'), + mock.patch('neutron_lbaas.services.loadbalancer.plugin.' + 'CERT_MANAGER_PLUGIN.CertManager.delete_cert') + ) as (get_cert_mock, rm_consumer_mock): + get_cert_mock.side_effect = Exception('RandomFailure') + + self.assertRaises(loadbalancerv2.CertManagerError, + self.plugin.create_listener, + context.get_admin_context(), + {'listener': listener_data}) + + def test_get_service_url(self): + # Format: ://// + cfg.CONF.set_override('service_name', + 'lbaas', + 'service_auth') + cfg.CONF.set_override('region', + 'RegionOne', + 'service_auth') + listner = { + 'loadbalancer_id': self.lb_id + } + self.assertEqual( + 'lbaas://RegionOne/LOADBALANCER/{0}'.format(self.lb_id), + self.plugin._get_service_url(listner)) + def test_create_listener_with_tls_invalid_container(self, **extras): default_tls_container_ref = uuidutils.generate_uuid() + cfg.CONF.set_override('service_name', + 'lbaas', + 'service_auth') + cfg.CONF.set_override('region', + 'RegionOne', + 'service_auth') listener_data = { 'protocol': lb_const.PROTOCOL_TERMINATED_HTTPS, 'default_tls_container_ref': default_tls_container_ref, @@ -869,8 +942,10 @@ class LbaasListenerTests(ListenerTestBase): mock.patch('neutron_lbaas.services.loadbalancer.plugin.' 'cert_parser.validate_cert'), mock.patch('neutron_lbaas.services.loadbalancer.plugin.' - 'CERT_MANAGER_PLUGIN.CertManager.get_cert') - ) as (validate_cert_mock, get_cert_mock): + 'CERT_MANAGER_PLUGIN.CertManager.get_cert'), + mock.patch('neutron_lbaas.services.loadbalancer.plugin.' + 'CERT_MANAGER_PLUGIN.CertManager.delete_cert') + ) as (validate_cert_mock, get_cert_mock, rm_consumer_mock): get_cert_mock.start().return_value = CertMock( 'mock_cert') validate_cert_mock.side_effect = exceptions.MisMatchedKey @@ -879,6 +954,9 @@ class LbaasListenerTests(ListenerTestBase): self.plugin.create_listener, context.get_admin_context(), {'listener': listener_data}) + rm_consumer_mock.assert_called_once_with( + listener_data['default_tls_container_ref'], + 'lbaas://RegionOne/LOADBALANCER/{0}'.format(self.lb_id)) def test_create_listener_with_tls(self, **extras): default_tls_container_ref = uuidutils.generate_uuid()