Merge "Add misc unit and functional tests"

This commit is contained in:
Jenkins 2016-09-14 17:32:00 +00:00 committed by Gerrit Code Review
commit ae84ae694f
23 changed files with 776 additions and 19 deletions

View File

@ -23,9 +23,9 @@ from poppy.provider.akamai import utils
AKAMAI_OPTIONS = [
# queue backend configs
cfg.StrOpt(
'queue_backend_type',
help='SAN Cert Queueing backend'),
# cfg.StrOpt(
# 'queue_backend_type',
# help='HTTP policy queueing backend'),
cfg.ListOpt(
'queue_backend_host',
default=['localhost'],

View File

@ -79,6 +79,7 @@ class TestServiceLimits(base.TestBase):
self.assertEqual(json.loads(resp.content)['limit'], limit)
def tearDown(self):
super(TestServiceLimits, self).tearDown()
for service in self.service_list:
self.service_limit_user_client.delete_service(location=service)
self.service_limit_user_client.wait_for_service_delete(

View File

@ -76,6 +76,27 @@ class HealthControllerTest(base.FunctionalTest):
self.assertEqual(200, response.status_code)
self.assertIn('true', str(response.body))
@mock.patch('requests.get')
def test_health_dns(self, mock_requests):
response_object = util.dict2obj(
{'content': '', 'status_code': 200})
mock_requests.return_value = response_object
response = self.app.get('/v1.0/health',
headers={'X-Project-ID': self.project_id})
for name in response.json['dns']:
endpoint = '/v1.0/health/dns/{0}'.format(name)
response = self.app.get(endpoint,
headers={'X-Project-ID': self.project_id})
self.assertEqual(200, response.status_code)
self.assertIn('true', str(response.body))
def test_get_unknown_dns(self):
response = self.app.get('/v1.0/health/dns/unknown',
headers={'X-Project-ID': self.project_id},
expect_errors=True)
self.assertEqual(404, response.status_code)
def test_get_unknown_provider(self):
response = self.app.get('/v1.0/health/provider/unknown',
headers={'X-Project-ID': self.project_id},

View File

@ -140,7 +140,7 @@ class SSLCertificateControllerTest(base.FunctionalTest):
self.assertEqual(200, response.status_code)
def test_create_with_invalid_json(self):
# create with errorenous data: invalid json data
# create with erroneous data: invalid json data
response = self.app.post('/v1.0/ssl_certificate',
params="{",
headers={
@ -151,7 +151,7 @@ class SSLCertificateControllerTest(base.FunctionalTest):
@ddt.file_data("data_create_ssl_certificate_bad_input_json.json")
def test_create_with_bad_input_json(self, ssl_certificate_json):
# create with errorenous data
# create with erroneous data
response = self.app.post('/v1.0/ssl_certificate',
params=json.dumps(ssl_certificate_json),
headers={'Content-Type': 'application/json',
@ -160,14 +160,14 @@ class SSLCertificateControllerTest(base.FunctionalTest):
self.assertEqual(400, response.status_code)
def test_delete_cert(self):
# create with errorenous data: invalid json data
# create with erroneous data: invalid json data
response = self.app.delete('/v1.0/ssl_certificate/blog.test.com',
headers={'X-Project-ID': self.project_id}
)
self.assertEqual(202, response.status_code)
def test_delete_cert_non_exist(self):
# create with errorenous data: invalid json data
# create with erroneous data: invalid json data
response = self.app.delete('/v1.0/ssl_certificate/blog.non_exist.com',
headers={'X-Project-ID': self.project_id},
expect_errors=True)

View File

@ -18,7 +18,9 @@ import random
import ddt
import mock
from oslo_config import cfg
import requests
from poppy.provider.akamai import certificates
from poppy.provider.akamai import driver
from tests.unit import base
@ -191,3 +193,57 @@ class TestDriver(base.TestCase):
mock_connect.return_value = mock.Mock()
provider = driver.CDNProvider(self.conf)
self.assertNotEqual(None, provider.cert_info_storage)
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_certificate_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertTrue(
isinstance(
provider.certificate_controller,
certificates.CertificateController
)
)
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_policy_api_client(self):
provider = driver.CDNProvider(self.conf)
self.assertTrue(
isinstance(provider.policy_api_client, requests.Session))
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_ccu_api_client(self):
provider = driver.CDNProvider(self.conf)
self.assertTrue(
isinstance(provider.ccu_api_client, requests.Session))
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_sps_api_client(self):
provider = driver.CDNProvider(self.conf)
self.assertTrue(
isinstance(provider.sps_api_client, requests.Session))
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_papi_api_client(self):
provider = driver.CDNProvider(self.conf)
self.assertTrue(
isinstance(provider.papi_api_client, requests.Session))
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_papi_property_id_positive(self):
provider = driver.CDNProvider(self.conf)
prop_id = provider.papi_property_id('akamai_http_config_number')
self.assertIsNotNone(prop_id)
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_papi_property_id_positive_spec_returns_list(self):
provider = driver.CDNProvider(self.conf)
prop_id = provider.papi_property_id('akamai_https_san_config_numbers')
self.assertIsNotNone(prop_id)
@mock.patch.object(driver, 'AKAMAI_OPTIONS', new=AKAMAI_OPTIONS)
def test_papi_property_id_invalid_spec(self):
provider = driver.CDNProvider(self.conf)
self.assertRaises(
ValueError, provider.papi_property_id, 'invalid_spec_name')

View File

@ -0,0 +1,142 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from oslo_config import cfg
from zake import fake_client
from poppy.provider.akamai.http_policy_queue import http_policy_queue
from tests.unit import base
AKAMAI_OPTIONS = [
# queue backend configs
cfg.StrOpt(
'queue_backend_type',
help='HTTP policy queueing backend'),
cfg.ListOpt('queue_backend_host', default=['localhost'],
help='default queue backend server hosts'),
cfg.IntOpt('queue_backend_port', default=2181, help='default'
' default queue backend server port (e.g: 2181)'),
cfg.StrOpt(
'http_policy_queue_path',
default='/http_policy_queue',
help='Zookeeper path '
'for http_policy_queue'
),
]
AKAMAI_GROUP = 'drivers:provider:akamai'
class TestHTTPPolicyQueue(base.TestCase):
def setUp(self):
super(TestHTTPPolicyQueue, self).setUp()
self.http_policy_dict = {
"configuration_number": 1,
"policy_name": "www.abc.com",
"project_id": "12345"
}
# Need this fake class bc zake's fake client
# does not take any host parameters
class fake_kz_client(fake_client.FakeClient):
def __init__(self, hosts):
super(self.__class__, self).__init__()
zookeeper_client_patcher = mock.patch(
'kazoo.client.KazooClient',
fake_kz_client
)
zookeeper_client_patcher.start()
self.addCleanup(zookeeper_client_patcher.stop)
self.conf = cfg.ConfigOpts()
self.zk_queue = http_policy_queue.ZookeeperHttpPolicyQueue(self.conf)
def test_enqueue_http_policy(self):
self.zk_queue.enqueue_http_policy(
json.dumps(self.http_policy_dict).encode('utf-8'))
self.assertTrue(len(self.zk_queue.http_policy_queue_backend) == 1)
self.assertTrue(
json.loads(self.zk_queue.http_policy_queue_backend.get().
decode('utf-8')) == self.http_policy_dict)
def test_dequeue_http_policy(self):
self.zk_queue.enqueue_http_policy(
json.dumps(self.http_policy_dict).encode('utf-8'))
res = self.zk_queue.dequeue_http_policy(False).decode('utf-8')
self.assertTrue(len(self.zk_queue.http_policy_queue_backend) == 1)
self.assertTrue(json.loads(res) == self.http_policy_dict)
res = self.zk_queue.dequeue_http_policy().decode('utf-8')
self.assertTrue(len(self.zk_queue.http_policy_queue_backend) == 0)
self.assertTrue(json.loads(res) == self.http_policy_dict)
def test_traverse_queue(self):
self.zk_queue.enqueue_http_policy(
json.dumps(self.http_policy_dict).encode('utf-8'))
res = self.zk_queue.traverse_queue()
self.assertTrue(len(res) == 1)
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == [self.http_policy_dict])
def test_traverse_queue_multiple_records(self):
# Get a list of records to enqueue
policy_obj_list = []
for i in range(10):
policy_object = {
"configuration_number": 1,
"policy_name": "www.abc{0}.com".format(i),
"project_id": "12345{0}".format(i),
}
policy_obj_list.append(policy_object)
for cert_obj in policy_obj_list:
self.zk_queue.enqueue_http_policy(
json.dumps(cert_obj).encode('utf-8'))
res = self.zk_queue.traverse_queue()
self.assertTrue(len(res) == 10)
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == policy_obj_list)
def test_put_queue_data(self):
res = self.zk_queue.put_queue_data([])
self.assertTrue(len(res) == 0)
policy_obj_list = []
for i in range(10):
policy_object = {
"configuration_number": 1,
"policy_name": "www.abc{0}.com".format(i),
"project_id": "12345{0}".format(i),
}
policy_obj_list.append(policy_object)
self.zk_queue.put_queue_data(
[json.dumps(o).encode('utf-8') for o in policy_obj_list])
self.assertTrue(len(self.zk_queue.http_policy_queue_backend) == 10)
res = self.zk_queue.traverse_queue()
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == policy_obj_list)
# test put data to non-empty queue
# should replace all items added above
self.zk_queue.put_queue_data(
[json.dumps(o).encode('utf-8') for o in policy_obj_list])
self.assertTrue(len(self.zk_queue.http_policy_queue_backend) == 10)

View File

@ -131,3 +131,9 @@ class TestModSanQueue(base.TestCase):
res = self.zk_queue.traverse_queue()
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == cert_obj_list)
# test put data to non-empty queue
# should replace all items added above
self.zk_queue.put_queue_data(
[json.dumps(o).encode('utf-8') for o in cert_obj_list])
self.assertTrue(len(self.zk_queue.mod_san_queue_backend) == 10)

View File

@ -0,0 +1,158 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from oslo_config import cfg
from zake import fake_client
from poppy.provider.akamai.domain_san_mapping_queue import zk_san_mapping_queue
from tests.unit import base
class TestSanMappingQueue(base.TestCase):
def setUp(self):
super(TestSanMappingQueue, self).setUp()
self.san_mapping_dict = {
"domain_name": "test-san1.cnamecdn.com",
"flavor_id": "flavor_id",
"project_id": "project_id",
"cert_type": "san",
"cert_details": {
"Akamai": {
"extra_info": {
"san cert": "san1.example.com",
"akamai_spsId": 1
}
}
}
}
# Need this fake class bc zake's fake client
# does not take any host parameters
class fake_kz_client(fake_client.FakeClient):
def __init__(self, hosts):
super(self.__class__, self).__init__()
zookeeper_client_patcher = mock.patch(
'kazoo.client.KazooClient',
fake_kz_client
)
zookeeper_client_patcher.start()
self.addCleanup(zookeeper_client_patcher.stop)
self.conf = cfg.ConfigOpts()
self.zk_queue = zk_san_mapping_queue.ZookeeperSanMappingQueue(
self.conf)
def test_enqueue_san_mapping(self):
self.zk_queue.enqueue_san_mapping(
json.dumps(self.san_mapping_dict).encode('utf-8'))
self.assertTrue(len(self.zk_queue.san_mapping_queue_backend) == 1)
self.assertTrue(
json.loads(self.zk_queue.san_mapping_queue_backend.get().
decode('utf-8')) == self.san_mapping_dict)
def test_dequeue_san_mapping(self):
self.zk_queue.enqueue_san_mapping(
json.dumps(self.san_mapping_dict).encode('utf-8'))
res = self.zk_queue.dequeue_san_mapping(False).decode('utf-8')
self.assertTrue(len(self.zk_queue.san_mapping_queue_backend) == 1)
self.assertTrue(json.loads(res) == self.san_mapping_dict)
res = self.zk_queue.dequeue_san_mapping().decode('utf-8')
self.assertTrue(len(self.zk_queue.san_mapping_queue_backend) == 0)
self.assertTrue(json.loads(res) == self.san_mapping_dict)
def test_traverse_queue(self):
self.zk_queue.enqueue_san_mapping(
json.dumps(self.san_mapping_dict).encode('utf-8'))
res = self.zk_queue.traverse_queue()
self.assertTrue(len(res) == 1)
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == [self.san_mapping_dict])
def test_traverse_queue_consume(self):
self.zk_queue.enqueue_san_mapping(
json.dumps(self.san_mapping_dict).encode('utf-8'))
res = self.zk_queue.traverse_queue(consume=True)
self.assertTrue(len(res) == 1)
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == [self.san_mapping_dict])
def test_traverse_queue_multiple_records(self):
# Get a list of records to enqueue
san_mapping_list = []
for i in range(10):
mapping_object = {
"domain_name": "domain.domain{0}.com".format(i),
"flavor_id": "flavor_id",
"project_id": "project_id",
"cert_type": "san",
"cert_details": {
"Akamai": {
"extra_info": {
"san cert": "san1.example.com",
"akamai_spsId": 1
}
}
}
}
san_mapping_list.append(mapping_object)
for cert_obj in san_mapping_list:
self.zk_queue.enqueue_san_mapping(
json.dumps(cert_obj).encode('utf-8'))
res = self.zk_queue.traverse_queue()
self.assertTrue(len(res) == 10)
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == san_mapping_list)
def test_put_queue_data(self):
res = self.zk_queue.put_queue_data([])
self.assertTrue(len(res) == 0)
san_mapping_list = []
for i in range(10):
mapping_object = {
"domain_name": "domain.domain{0}.com".format(i),
"flavor_id": "flavor_id",
"project_id": "project_id",
"cert_type": "san",
"cert_details": {
"Akamai": {
"extra_info": {
"san cert": "san1.example.com",
"akamai_spsId": 1
}
}
}
}
san_mapping_list.append(mapping_object)
self.zk_queue.put_queue_data(
[json.dumps(o).encode('utf-8') for o in san_mapping_list])
self.assertTrue(len(self.zk_queue.san_mapping_queue_backend) == 10)
res = self.zk_queue.traverse_queue()
res = [json.loads(r.decode('utf-8')) for r in res]
self.assertTrue(res == san_mapping_list)
# test put data to non-empty queue
# should replace all items added above
self.zk_queue.put_queue_data(
[json.dumps(o).encode('utf-8') for o in san_mapping_list])
self.assertTrue(len(self.zk_queue.san_mapping_queue_backend) == 10)

View File

@ -0,0 +1,34 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from poppy.provider.cloudfront import certificates
from tests.unit import base
class TestCertificates(base.TestCase):
def setUp(self):
super(TestCertificates, self).setUp()
self.driver = mock.Mock()
self.driver.provider_name = 'Cloudfront'
self.controller = certificates.CertificateController(self.driver)
def test_create_certificate(self):
self.assertEqual(
NotImplemented, self.controller.create_certificate({}))

View File

@ -83,3 +83,7 @@ class TestDriver(base.TestCase):
def test_service_controller(self, MockController):
provider = driver.CDNProvider(self.conf)
self.assertNotEqual(provider.service_controller, None)
def test_certificate_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertNotEqual(provider.certificate_controller, None)

View File

@ -160,3 +160,14 @@ class TestServices(base.TestCase):
def test_regions(self):
self.assertEqual(self.controller.driver.regions, [])
def test_get_provider_service_id(self):
mock_service = mock.Mock()
mock_service.name = uuid.uuid4()
self.assertEqual(
mock_service.name,
self.controller.get_provider_service_id(mock_service))
def test_get_metrics_by_domain(self):
self.assertEqual([], self.controller.get_metrics_by_domain(
'project_id', 'domain_name', []))

View File

@ -0,0 +1,34 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from poppy.provider.fastly import certificates
from tests.unit import base
class TestCertificates(base.TestCase):
def setUp(self):
super(TestCertificates, self).setUp()
self.driver = mock.Mock()
self.driver.provider_name = 'Fastly'
self.controller = certificates.CertificateController(self.driver)
def test_create_certificate(self):
self.assertEqual(
NotImplemented, self.controller.create_certificate({}))

View File

@ -75,8 +75,12 @@ class TestDriver(base.TestCase):
client = provider.client()
self.assertNotEqual(client, None)
@mock.patch('poppy.provider.fastly.controllers.ServiceController')
@mock.patch.object(driver, 'FASTLY_OPTIONS', new=FASTLY_OPTIONS)
def test_service_controller(self, MockController):
def test_service_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertNotEqual(provider.service_controller, None)
@mock.patch.object(driver, 'FASTLY_OPTIONS', new=FASTLY_OPTIONS)
def test_certificate_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertIsNotNone(provider.certificate_controller)

View File

@ -380,6 +380,32 @@ class TestServices(base.TestCase):
provider_service_id, service_obj)
self.assertIn('id', resp[self.driver.provider_name])
@ddt.file_data('data_service.json')
def test_update_fastly_exception(self, service_json):
provider_service_id = uuid.uuid1()
controller = services.ServiceController(self.driver)
controller.client.list_versions.return_value = [self.version]
controller.client.get_service_details.side_effect = (
fastly.FastlyError('Mock -- Fastly error!')
)
service_obj = service.load_from_json(service_json)
resp = controller.update(
provider_service_id, service_obj)
self.assertIn('error', resp[self.driver.provider_name])
@ddt.file_data('data_service.json')
def test_update_general_exception(self, service_json):
provider_service_id = uuid.uuid1()
controller = services.ServiceController(self.driver)
controller.client.list_versions.return_value = [self.version]
controller.client.get_service_details.side_effect = (
Exception('Mock -- Something went wrong!')
)
service_obj = service.load_from_json(service_json)
resp = controller.update(
provider_service_id, service_obj)
self.assertIn('error', resp[self.driver.provider_name])
def test_purge_with_exception(self):
provider_service_id = uuid.uuid1()
controller = services.ServiceController(self.driver)
@ -408,7 +434,8 @@ class TestServices(base.TestCase):
mock.Mock(name='domain_1'),
mock.Mock(name='domain_2')]
controller.client.purge_url.return_value = 'purge_url_return'
resp = controller.purge(provider_service_id, ['/url_1', '/url_2'])
resp = controller.purge(
provider_service_id, hard=True, purge_url='some_url')
self.assertIn('id', resp[self.driver.provider_name])
def test_client(self):
@ -504,3 +531,19 @@ class TestProviderValidation(base.TestCase):
driver.regions = []
controller = services.ServiceController(driver)
self.assertEqual(controller.driver.regions, [])
def test_get_provider_service_id(self):
driver = mock.Mock()
controller = services.ServiceController(driver)
mock_service = mock.Mock()
mock_service.service_id = uuid.uuid4()
self.assertEqual(
mock_service.service_id,
controller.get_provider_service_id(mock_service))
def test_get_metrics_by_domain(self):
driver = mock.Mock()
controller = services.ServiceController(driver)
self.assertEqual(
[],
controller.get_metrics_by_domain('project_id', 'domain_name', []))

View File

@ -0,0 +1,34 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from poppy.provider.maxcdn import certificates
from tests.unit import base
class TestCertificates(base.TestCase):
def setUp(self):
super(TestCertificates, self).setUp()
self.driver = mock.Mock()
self.driver.provider_name = 'Maxcdn'
self.controller = certificates.CertificateController(self.driver)
def test_create_certificate(self):
self.assertEqual(
NotImplemented, self.controller.create_certificate({}))

View File

@ -76,8 +76,12 @@ class TestDriver(base.TestCase):
client = provider.client
self.assertNotEqual(client, None)
@mock.patch('poppy.provider.maxcdn.controllers.ServiceController')
@mock.patch.object(driver, 'MAXCDN_OPTIONS', new=MAXCDN_OPTIONS)
def test_service_controller(self, MockController):
def test_service_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertNotEqual(provider.service_controller, None)
@mock.patch.object(driver, 'MAXCDN_OPTIONS', new=MAXCDN_OPTIONS)
def test_certificate_controller(self):
provider = driver.CDNProvider(self.conf)
self.assertIsNotNone(provider.certificate_controller)

View File

@ -218,8 +218,35 @@ class TestServices(base.TestCase):
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider.client')
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider')
def test_delete_with_exception(self, mock_controllerclient,
mock_driver):
def test_delete_failed(self, mock_controllerclient, mock_driver):
# test create with exceptions
mock_response = mock.MagicMock()
mock_response.text = 'Mock -- something went wrong!'
mock_dict = {u'code': 500}
mock_response.__getitem__.side_effect = mock_dict.__getitem__
driver = mock_driver()
driver.attach_mock(mock_controllerclient, 'client')
driver.client.configure_mock(
**{'delete.return_value': mock_response})
controller = services.ServiceController(driver)
service_name = 'test_service_name'
service_id = str(uuid.uuid4())
current_domain = str(uuid.uuid1())
domains_old = domain.Domain(domain=current_domain)
current_origin = origin.Origin(origin='poppy.org')
service_obj = Service(service_id=service_id,
name='poppy cdn service',
domains=[domains_old],
origins=[current_origin],
flavor_id='cdn',
project_id=str(uuid.uuid4()))
resp = controller.delete(service_obj, service_name)
self.assertIn('error', resp[driver.provider_name])
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider.client')
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider')
def test_delete_with_exception(self, mock_controllerclient, mock_driver):
# test create with exceptions
driver = mock_driver()
driver.attach_mock(mock_controllerclient, 'client')
@ -280,13 +307,33 @@ class TestServices(base.TestCase):
resp = controller_purge_with_error.purge(pullzone_id)
self.assertIn('id', resp[driver.provider_name])
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider.client')
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider')
def test_purge_failed(self, mock_controllerclient, mock_driver):
# test create with exceptions
mock_response = mock.MagicMock()
mock_response.text = 'Mock -- something went wrong!'
mock_dict = {u'code': 500}
mock_response.__getitem__.side_effect = mock_dict.__getitem__
driver = mock_driver()
driver.attach_mock(mock_controllerclient, 'client')
driver.client.configure_mock(
**{'purge.return_value': mock_response})
controller = services.ServiceController(driver)
pullzone_id = 'test_random_pullzone_id'
resp = controller.purge(pullzone_id)
self.assertIn('error', resp[driver.provider_name])
@ddt.data('good-service-name', 'yahooservice')
@mock.patch.object(driver.CDNProvider, 'client',
new=fake_maxcdn_api_client())
def test_map_service_name_no_hash(self, service_name):
maxcdn_driver = driver.CDNProvider(self.conf)
controller = services.ServiceController(maxcdn_driver)
self.assertEqual(controller._map_service_name(service_name),
mock_service = mock.Mock()
mock_service.name = service_name
self.assertEqual(controller.get_provider_service_id(mock_service),
service_name)
@ddt.data(u'www.düsseldorf-Lörick.com'.encode("utf-8"),
@ -337,3 +384,13 @@ class TestServices(base.TestCase):
driver.attach_mock(mock_controllerclient, 'client')
controller = services.ServiceController(driver)
self.assertEqual(controller.driver.regions, [])
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider.client')
@mock.patch('poppy.provider.maxcdn.driver.CDNProvider')
def test_get_metrics_by_domain(self, mock_controllerclient, mock_driver):
driver = mock_driver()
driver.attach_mock(mock_controllerclient, 'client')
controller = services.ServiceController(driver)
self.assertEqual(
[],
controller.get_metrics_by_domain('project_id', 'domain_name', []))

View File

@ -0,0 +1,33 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from poppy.provider.mock import certificates
from tests.unit import base
class TestCertificates(base.TestCase):
def setUp(self):
super(TestCertificates, self).setUp()
self.driver = mock.Mock()
self.driver.provider_name = 'Mock'
self.controller = certificates.CertificateController(self.driver)
def test_create_certificate(self):
self.assertIsNone(self.controller.create_certificate({}))

View File

@ -28,6 +28,7 @@ class MockProviderDriverTest(base.TestCase):
self.assertTrue(self.driver.is_alive())
def test_mock_provider_service_controller(self):
self.assertTrue(
self.driver.service_controller is not None
)
self.assertIsNotNone(self.driver.service_controller)
def test_mock_provider_certificate_controller(self):
self.assertIsNotNone(self.driver.certificate_controller)

View File

@ -85,3 +85,10 @@ class MockProviderServicesTest(base.TestCase):
def test_regions(self):
self.assertEqual(self.sc._driver.regions, [])
def test_get_provider_service_id(self):
self.assertEqual([], self.sc.get_provider_service_id({}))
def test_get_metrics_by_domain(self):
self.assertEqual(
[], self.sc.get_metrics_by_domain('project_id', 'domain_name', []))

View File

@ -70,13 +70,25 @@ class CassandraStorageFlavorsTests(base.TestCase):
self.assertEqual(
len(actual_response.providers), len(value[0]['providers']))
@ddt.file_data('data_get_flavor.json')
@mock.patch.object(flavors.FlavorsController, 'session')
@mock.patch.object(cassandra.cluster.Session, 'execute')
def test_get_flavor_multiple(self, value, mock_session, mock_execute):
# mock the response from cassandra
# more than one flavor returned
mock_execute.execute.return_value = [value[0], value[0]]
self.assertRaises(
LookupError, lambda: self.fc.get(value[0]['flavor_id']))
@ddt.file_data('data_get_flavor_bad.json')
@mock.patch.object(flavors.FlavorsController, 'session')
@mock.patch.object(cassandra.cluster.Session, 'execute')
def test_get_flavor_error(self, value, mock_session, mock_execute):
# mock the response from cassandra
mock_execute.execute.return_value = value
mock_execute.execute.return_value = []
self.assertRaises(
LookupError, lambda: self.fc.get(value[0]['flavor_id']))

View File

@ -0,0 +1,59 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.transport.pecan import controllers
import poppy.transport.pecan.controllers.v1.admin
from poppy.transport.pecan.models.response import health
from tests.unit.transport.pecan.controllers import base
class HealthControllerTests(base.BasePecanControllerUnitTest):
def setUp(self):
super(HealthControllerTests, self).setUp(
poppy.transport.pecan.controllers.v1.admin
)
self.controller = controllers.v1.health.HealthController(self.driver)
self.manager = self.driver.manager
self.response.status = 0
def test_health_not_alive(self):
self.controller.base_url = 'base_url'
self.manager.health_controller.health.return_value = (
False, {
'dns': {
'is_alive': True,
'dns_name': 'example_cloud_dns'
},
'storage': {
'is_alive': True,
'storage_name': 'example_storage'
},
'distributed_task': {
'is_alive': True,
'distributed_task_name': 'example_distributed_task'
},
'providers': [
{
'is_alive': True,
'provider_name': 'example_provider'
}
],
}
)
response = self.controller.get()
self.assertIsInstance(response, health.HealthModel)
self.assertEqual(503, self.response.status)

View File

@ -0,0 +1,36 @@
# Copyright (c) 2016 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.transport.pecan import controllers
import poppy.transport.pecan.controllers.v1.admin
from tests.unit.transport.pecan.controllers import base
class PingControllerTests(base.BasePecanControllerUnitTest):
def setUp(self):
super(PingControllerTests, self).setUp(
poppy.transport.pecan.controllers.v1.admin
)
self.controller = controllers.v1.ping.PingController(self.driver)
self.manager = self.driver.manager
def test_ping_not_alive(self):
self.manager.health_controller.ping_check.return_value = (
{}, False
)
response = self.controller.get()
self.assertEqual(503, response.status_code)