poppy/tests/unit/manager/default/test_ssl_certificate.py

470 lines
17 KiB
Python

# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
import json
import mock
from oslo_config import cfg
import testtools
from poppy.manager.default import driver
from poppy.manager.default import ssl_certificate
from poppy.model import ssl_certificate as ssl_cert_model
from poppy.transport.validators import helpers as validators
from tests.unit import base
@ddt.ddt
class DefaultSSLCertificateControllerTests(base.TestCase):
@mock.patch('poppy.bootstrap.Bootstrap')
@mock.patch('poppy.notification.base.driver.NotificationDriverBase')
@mock.patch('poppy.dns.base.driver.DNSDriverBase')
@mock.patch('poppy.storage.base.driver.StorageDriverBase')
@mock.patch('poppy.distributed_task.base.driver.DistributedTaskDriverBase')
@mock.patch('poppy.metrics.base.driver.MetricsDriverBase')
def setUp(self, mock_metrics, mock_distributed_task, mock_storage,
mock_dns, mock_notification, mock_bootstrap):
super(DefaultSSLCertificateControllerTests, self).setUp()
conf = cfg.ConfigOpts()
self.provider_mocks = {
'akamai': mock.Mock(provider_name="Akamai"),
'maxcdn': mock.Mock(provider_name='MaxCDN'),
'cloudfront': mock.Mock(provider_name='CloudFront'),
'fastly': mock.Mock(provider_name='Fastly'),
'mock': mock.Mock(provider_name='Mock')
}
# mock a stevedore provider extension
def get_provider_by_name(name):
obj_mock = self.provider_mocks[name]
obj_mock.san_cert_cnames = ["san1", "san2"]
obj_mock.akamai_sps_api_base_url = 'akamai_base_url/{spsId}'
provider = mock.Mock(obj=obj_mock)
return provider
def provider_membership(key):
return True if key in self.provider_mocks else False
self.mock_providers = mock.MagicMock()
self.mock_providers.__getitem__.side_effect = get_provider_by_name
self.mock_providers.__contains__.side_effect = provider_membership
self.manager_driver = driver.DefaultManagerDriver(
conf,
mock_storage,
self.mock_providers,
mock_dns,
mock_distributed_task,
mock_notification,
mock_metrics
)
self.scc = ssl_certificate.DefaultSSLCertificateController(
self.manager_driver
)
def test_create_ssl_certificate_exception_validation(self):
cert_obj = ssl_cert_model.SSLCertificate(
'flavor_id',
'invalid_domain',
'san',
project_id='project_id'
)
with testtools.ExpectedException(ValueError):
self.scc.create_ssl_certificate('project_id', cert_obj=cert_obj)
def test_create_ssl_certificate_invalid_domain(self):
cert_obj = ssl_cert_model.SSLCertificate(
'premium',
'www.krusty.happyclowns',
'san',
project_id='000'
)
validators.is_valid_tld = mock.Mock(return_value=False)
with testtools.ExpectedException(ValueError):
self.scc.create_ssl_certificate('project_id', cert_obj=cert_obj)
def test_create_ssl_certificate_exception_storage_create_cert(self):
cert_obj = ssl_cert_model.SSLCertificate(
'flavor_id',
'www.valid-domain.com',
'san',
project_id='project_id'
)
self.scc.storage.create_certificate.side_effect = ValueError(
'Mock -- Cert already exists!')
with testtools.ExpectedException(ValueError):
self.scc.create_ssl_certificate('project_id', cert_obj=cert_obj)
def test_get_san_cert_configuration_positive(self):
resp = self.scc.get_san_cert_configuration("san1")
self.assertIsNotNone(resp)
def test_get_san_cert_configuration_positive_no_akamai_provider(self):
del self.provider_mocks['akamai']
resp = self.scc.get_san_cert_configuration("san1")
self.assertEqual({}, resp)
def test_get_san_cert_configuration_invalid_san_cert_cname(self):
with testtools.ExpectedException(ValueError):
self.scc.get_san_cert_configuration("non-existant")
def test_set_san_cert_hostname_limit_positive(self):
resp = mock.Mock()
resp.status_code = 200
resp.json.return_value = {
'requestList': [
{
'jobId': '5555'
}
]
}
api_client = self.mock_providers['akamai'].obj.sps_api_client
api_client.get.return_value = resp
self.scc.update_san_cert_configuration("san1", {"spsId": '1234'})
self.assertTrue(api_client.get.called)
api_client.get.assert_called_once_with(
self.mock_providers['akamai'].obj.akamai_sps_api_base_url.format(
spsId=1234
)
)
def test_update_san_cert_configuration_positive(self):
self.scc.set_san_cert_hostname_limit(
{"san_cert_hostname_limit": '1234'}
)
cert_info_storage = self.mock_providers['akamai'].obj.cert_info_storage
cert_info_storage.set_san_cert_hostname_limit.\
assert_called_once_with('1234')
def test_update_san_cert_configuration_negative(self):
with testtools.ExpectedException(ValueError):
self.scc.set_san_cert_hostname_limit(
{"invalid_setting_name": '1234'}
)
cert_info_storage = self.mock_providers['akamai'].obj.cert_info_storage
self.assertFalse(
cert_info_storage.set_san_cert_hostname_limit.called)
def test_update_san_cert_configuration_no_sps_id(self):
api_client = self.mock_providers['akamai'].obj.sps_api_client
self.scc.update_san_cert_configuration("san1", {"spsId": None})
self.assertEqual(False, api_client.called)
def test_update_san_cert_invalid_san_cert_cname(self):
with testtools.ExpectedException(ValueError):
self.scc.update_san_cert_configuration("non-existant",
{"spsId": '1234'})
def test_update_san_cert_configuration_api_failure(self):
resp = mock.Mock()
resp.status_code = 404
resp.text = 'spsId Not Found'
api_client = self.mock_providers['akamai'].obj.sps_api_client
api_client.get.return_value = resp
err_text = "SPS GET Request failed. Exception: spsId Not Found"
with testtools.ExpectedException(RuntimeError, value_re=err_text):
self.scc.update_san_cert_configuration("san1", {"spsId": '1234'})
self.assertTrue(api_client.get.called)
api_client.get.assert_called_once_with(
self.mock_providers['akamai'].obj.akamai_sps_api_base_url.format(
spsId=1234
)
)
def test_get_san_retry_list(self):
self.manager_driver.providers[
'akamai'].obj.mod_san_queue.traverse_queue.return_value = [
json.dumps({
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
})
]
self.assertEqual(1, len(self.scc.get_san_retry_list()))
def test_update_san_retry_list(self):
original_queue_data = [{
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}]
akamai_driver = self.manager_driver.providers[
'akamai'].obj
akamai_driver.mod_san_queue.traverse_queue.return_value = [
json.dumps(original_queue_data)
]
akamai_driver.mod_san_queue.put_queue_data.side_effect = lambda \
value: value
new_queue_data = [
{
"domain_name": "b_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}
]
cert_domain_mock = mock.Mock()
cert_domain_mock.get_cert_status.return_value = "create_in_progress"
self.scc.storage.get_certs_by_domain. \
return_value = cert_domain_mock
res, deleted = self.scc.update_san_retry_list(new_queue_data)
self.assertEqual(new_queue_data, res)
self.assertEqual(
True,
self.scc.storage.get_certs_by_domain.called
)
self.assertEqual(
True,
akamai_driver.mod_san_queue.traverse_queue.called
)
self.assertEqual(
True,
akamai_driver.mod_san_queue.put_queue_data.called
)
def test_update_san_retry_list_cert_deployed_error(self):
queue_data = [
{
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}
]
cert_domain_mock = mock.Mock()
cert_domain_mock.get_cert_status.return_value = "deployed"
self.scc.storage.get_certs_by_domain. \
return_value = cert_domain_mock
with testtools.ExpectedException(ValueError):
self.scc.update_san_retry_list(queue_data)
self.assertEqual(
True,
self.scc.storage.get_certs_by_domain.called
)
def test_rerun_san_retry_list(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps({
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}), encoding='utf-8')
]
self.scc.storage.get_certs_by_domain. \
return_value = []
self.scc.rerun_san_retry_list()
self.assertEqual(
True,
self.scc.distributed_task_controller.submit_task.called
)
def test_rerun_san_retry_list_exception_service_validation(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
test_queue_item = {
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps(test_queue_item), encoding='utf-8')
]
self.scc.service_storage.get_service_details_by_domain_name. \
return_value = None
with mock.patch('oslo_log.log.KeywordArgumentAdapter.error') as logger:
self.scc.rerun_san_retry_list()
self.assertTrue(logger.called)
args, _ = logger.call_args
self.assertTrue(test_queue_item["domain_name"] in args[0])
self.assertTrue(test_queue_item["project_id"] in args[0])
self.assertTrue(test_queue_item["flavor_id"] in args[0])
self.assertTrue(
str(test_queue_item["validate_service"]) in args[0]
)
self.assertEqual(
False,
self.scc.distributed_task_controller.submit_task.called
)
def test_rerun_san_retry_list_exception_cert_already_deployed(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
test_domain = "a_domain"
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps({
"domain_name": test_domain,
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}), encoding='utf-8')
]
cert_domain_mock = mock.Mock()
cert_domain_mock.get_cert_status.return_value = "deployed"
self.scc.storage.get_certs_by_domain. \
return_value = cert_domain_mock
with mock.patch('oslo_log.log.KeywordArgumentAdapter.error') as logger:
self.scc.rerun_san_retry_list()
self.assertTrue(logger.called)
args, _ = logger.call_args
self.assertEqual(
(u'Certificate on {0} has already been provisioned '
'successfully.'.format(test_domain), ),
args
)
self.assertEqual(
False,
self.scc.distributed_task_controller.submit_task.called
)
def test_rerun_san_retry_list_exception_flavor_get(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps({
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}), encoding='utf-8')
]
self.scc.flavor_controller.get.side_effect = LookupError(
"Mock -- Flavor not found!")
self.scc.rerun_san_retry_list()
self.assertEqual(True, mod_san_queue.enqueue_mod_san_request.called)
self.assertEqual(
False,
self.scc.distributed_task_controller.submit_task.called
)
def test_rerun_san_retry_list_cert_deployed_second_check(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps({
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}), encoding='utf-8')
]
cert_domain_mock = mock.Mock()
cert_domain_mock.get_cert_status.return_value = "create_in_progress"
cert_domain_mock_2 = mock.Mock()
cert_domain_mock_2.get_cert_status.return_value = "deployed"
self.scc.storage.get_certs_by_domain. \
side_effect = [cert_domain_mock, cert_domain_mock_2]
self.scc.rerun_san_retry_list()
self.assertEqual(
False,
self.scc.distributed_task_controller.submit_task.called
)
def test_rerun_san_retry_list_cert_create_in_progress_second_check(self):
mod_san_queue = self.manager_driver.providers[
'akamai'].obj.mod_san_queue
mod_san_queue.mod_san_queue_backend = mock.MagicMock()
mod_san_queue.mod_san_queue_backend.__len__.side_effect = [1, 0]
mod_san_queue.dequeue_mod_san_request.side_effect = [
bytearray(json.dumps({
"domain_name": "a_domain",
"project_id": "00000",
"flavor_id": "flavor",
"validate_service": True
}), encoding='utf-8')
]
cert_domain_mock = mock.Mock()
cert_domain_mock.get_cert_status.return_value = "create_in_progress"
cert_domain_mock_2 = mock.Mock()
cert_domain_mock_2.get_cert_status.return_value = "create_in_progress"
self.scc.storage.get_certs_by_domain. \
side_effect = [cert_domain_mock, cert_domain_mock_2]
self.scc.rerun_san_retry_list()
self.assertEqual(
True,
self.scc.distributed_task_controller.submit_task.called
)
def test_get_certs_by_status(self):
result_list_mock = [mock.Mock()]
self.scc.storage.get_certs_by_status.return_value = result_list_mock
results = self.scc.get_certs_by_status("create_in_progress")
self.assertEqual(result_list_mock, results)