Delete service (delete endpoint)

Change-Id: I014a6551b2a49d6d41dd1074fb2ede31cec608d0
:w
This commit is contained in:
Malini Kamalambal 2014-10-14 16:02:31 -04:00 committed by tonytan4ever
parent d348d8169f
commit 29a407ab42
26 changed files with 472 additions and 105 deletions

View File

@ -119,6 +119,25 @@ similar to this::
Content-Type: application/json-home
Cache-Control: max-age=86400
10. To run unit/functional test::
$ tox
To run a full test suite with api test, you will need to put in correct
CDN vendor configuration (in ``~/.poppy/poppy.conf``) first, e.g::
[drivers:provider:fastly]
apikey = "<your_fastly_api_key>"
Then start a poppy server::
$ poppy-server -v
And run test suite with api test::
$ tox -- --exclude=none
Installing Cassandra Locally
-----------------------------

View File

@ -49,7 +49,7 @@ class ProviderWrapper(object):
def delete(self, ext, provider_details):
try:
provider_detail = provider_details[ext.provider_name]
provider_detail = provider_details[ext.obj.provider_name]
except KeyError:
raise errors.BadProviderDetail(
"No provider detail information."

View File

@ -0,0 +1,68 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from poppy.openstack.common import log
LOG = log.getLogger(__name__)
def service_delete_worker(provider_details, service_controller,
project_id, service_name):
responders = []
# try to delete all service from each provider presented
# in provider_details
for provider in provider_details:
LOG.info('Starting to delete service from %s' % provider)
responder = service_controller.provider_wrapper.delete(
service_controller._driver.providers[provider.lower()],
provider_details)
responders.append(responder)
LOG.info('Deleting service from %s complete...' % provider)
for responder in responders:
# this is the item of responder, if there's "error"
# key in it, it means the deletion for this provider failed.
# in that case we cannot delete service from poppy storage.
provider_name = list(responder.items())[0][0]
if 'error' in responder[provider_name]:
LOG.info('Delete service from %s failed' % provider_name)
LOG.info('Updating provider detail status of %s for %s' %
(provider_name, service_name))
# stores the error info for debugging purposes.
provider_details[provider_name].error_info = (
responder[provider_name].get('error_info')
)
else:
# delete service successful, remove this provider detail record
del provider_details[provider_name]
service_controller.storage_controller._driver.connect()
if provider_details == {}:
# Only if all provider successfully deleted we can delete
# the poppy service.
LOG.info('Deleting poppy service %s from all providers successful'
% service_name)
service_controller.storage_controller.delete(project_id, service_name)
LOG.info('Deleting poppy service %s succeeded' % service_name)
else:
# Leave failed provider details with error infomation for further
# action, maybe for debug and/or support.
LOG.info('Updating poppy service provider details for %s' %
service_name)
service_controller.storage_controller.update_provider_details(
project_id,
service_name,
provider_details)

View File

@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from poppy.manager import base
from poppy.manager.default.service_async_workers import delete_service_worker
from poppy.model.helpers import provider_details
@ -102,11 +105,35 @@ class DefaultServicesController(base.ServicesController):
service_obj)
def delete(self, project_id, service_name):
self.storage_controller.delete(project_id, service_name)
try:
provider_details = self.storage_controller.get_provider_details(
project_id,
service_name)
except Exception:
raise LookupError('Service %s does not exist' % service_name)
provider_details = self.storage_controller.get_provider_details(
# change each provider detail's status to delete_in_progress
# TODO(tonytan4ever): what if this provider is in 'failed' status?
# Maybe raising a 400 error here ?
for provider in provider_details:
provider_details[provider].status = "delete_in_progress"
self.storage_controller.update_provider_details(
project_id,
service_name)
return self._driver.providers.map(
self.provider_wrapper.delete,
service_name,
provider_details)
self.storage_controller._driver.close_connection()
p = multiprocessing.Process(
name='Process: delete poppy service %s for'
' project id: %s' %
(service_name,
project_id),
target=delete_service_worker.service_delete_worker,
args=(
provider_details,
self,
project_id,
service_name))
p.start()
return

View File

@ -14,7 +14,11 @@
# limitations under the License.
VALID_STATUSES = [u'unknown', u'in_progress', u'deployed', u'failed']
VALID_STATUSES = [
u'deploy_in_progress',
u'deployed',
u'delete_in_progress',
u'failed']
class ProviderDetail(object):
@ -22,7 +26,7 @@ class ProviderDetail(object):
'''ProviderDetail object for each provider.'''
def __init__(self, provider_service_id=None, access_urls=[],
status=u"unknown", name=None, error_info=None):
status=u"deploy_in_progress", name=None, error_info=None):
self._provider_service_id = provider_service_id
self._id = provider_service_id
self._access_urls = access_urls

View File

@ -16,7 +16,7 @@
from poppy.model import common
VALID_STATUSES = [u'unknown', u'in_progress', u'deployed', u'failed']
VALID_STATUSES = [u'creating', u'deployed', u'delete_in_progress']
class Service(common.DictSerializableModel):
@ -34,7 +34,7 @@ class Service(common.DictSerializableModel):
self._flavorRef = flavorRef
self._caching = caching
self._restrictions = restrictions
self._status = u'unknown'
self._status = u'creating'
self._provider_details = {}
@property

View File

@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import sys
import traceback
@ -24,22 +21,17 @@ class Responder(object):
self.provider = provider_type
def failed(self, msg):
ex_type, ex, tb = sys.exc_info()
print("error: {0} {1} {2} {3}".format(self.provider, msg, ex_type, ex))
traceback.print_tb(tb)
return {
self.provider: {
"error": msg,
"error_detail": traceback.format_exc()
'error': msg,
'error_detail': traceback.format_exc(),
}
}
def created(self, provider_service_id, links, **extras):
provider_response = {
"id": provider_service_id,
"links": links
'id': provider_service_id,
'links': links
}
provider_response.update(extras)
return {
@ -50,14 +42,14 @@ class Responder(object):
# TODO(tonytan4ever): May need to add link information as return
return {
self.provider: {
"id": provider_service_id
'id': provider_service_id
}
}
def deleted(self, provider_service_id):
return {
self.provider: {
"id": provider_service_id
'id': provider_service_id
}
}
@ -73,8 +65,8 @@ class Responder(object):
def get(self, domain_list, origin_list, cache_list):
return {
self.provider: {
"domains": domain_list,
"origins": origin_list,
"caching": cache_list
'domains': domain_list,
'origins': origin_list,
'caching': cache_list
}
}

View File

@ -57,11 +57,9 @@ class ServiceController(base.ServiceBase):
origin=aws_origin,
enabled=True)
if distribution.status == 'InProgress':
status = 'in_progress'
elif distribution.status == 'Deployed':
status = 'deployed'
status = 'deploy_in_progress'
else:
status = 'unknown'
status = 'deployed'
except cloudfront.exception.CloudFrontServerError as e:
return self.responder.failed(str(e))
except Exception as e:

View File

@ -90,6 +90,14 @@ class ServiceController(base.ServiceBase):
def delete(self, provider_service_id):
try:
# Delete the service
fastly_service = self.client.get_service_details(
provider_service_id
)
# deactivate the service first
self.client.deactivate_version(
provider_service_id,
fastly_service.active_version['number']
)
self.client.delete_service(provider_service_id)
return self.responder.deleted(provider_service_id)

View File

@ -16,6 +16,7 @@
import hashlib
import re
from poppy.common import decorators
from poppy.provider import base
@ -65,7 +66,8 @@ class ServiceController(base.ServiceBase):
try:
# Create a new pull zone: maxcdn only supports 1 origin
origin = service_obj.origins[0]
origin_prefix = 'https://' if origin.ssl else 'http://'
# for now we only support http and https origin in MaxCDN
origin_prefix = 'https://' if origin.port == 443 else 'http://'
create_response = self.client.post('/zones/pull.json', data={
'name': self._map_service_name(service_obj.name),
# TODO(tonytan4ever): maxcdn takes origin with

View File

@ -15,6 +15,7 @@
"""Cassandra storage driver implementation."""
import multiprocessing
import ssl
import cassandra
@ -120,10 +121,11 @@ class CassandraStorageDriver(base.Driver):
def __init__(self, conf):
super(CassandraStorageDriver, self).__init__(conf)
conf.register_opts(CASSANDRA_OPTIONS, group=CASSANDRA_GROUP)
self.cassandra_conf = conf[CASSANDRA_GROUP]
self.datacenter = conf.datacenter
self.session = None
self.lock = multiprocessing.Lock()
def change_namespace(self, namespace):
self.cassandra_conf.keyspace = namespace
@ -154,4 +156,18 @@ class CassandraStorageDriver(base.Driver):
@property
def database(self):
return self.connection
# if the session has been shutdown, reopen a session
self.lock.acquire()
if self.session is None or self.session.is_shutdown:
self.connect()
self.lock.release()
return self.session
def connect(self):
self.session = _connection(self.cassandra_conf, self.datacenter)
def close_connection(self):
self.lock.acquire()
self.session.cluster.shutdown()
self.session.shutdown()
self.lock.release()

View File

@ -213,7 +213,6 @@ class ServicesController(base.ServicesController):
self.session.execute(CQL_DELETE_SERVICE, args)
def get_provider_details(self, project_id, service_name):
# TODO(tonytan4ever): Use real CQL read provider details info
args = {
'project_id': project_id,
'service_name': service_name
@ -225,19 +224,15 @@ class ServicesController(base.ServicesController):
# returns the dictionary
exec_results = self.session.execute(CQL_GET_PROVIDER_DETAILS, args)
if not exec_results:
return {}
provider_details_result = exec_results[0]['provider_details'] or {}
results = {}
provider_details_result = exec_results[0]['provider_details']
for provider_name in provider_details_result:
provider_detail_dict = json.loads(
provider_details_result[provider_name])
provider_service_id = provider_detail_dict.get('id', None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'unknown')
status = provider_detail_dict.get("status", u'creating')
error_info = provider_detail_dict.get("error_info", None)
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,

View File

@ -68,3 +68,9 @@ class MockDBStorageDriver(base.Driver):
@property
def database(self):
return self.connection
def connect(self):
return ""
def close_connection(self):
return ""

View File

@ -118,7 +118,6 @@ class ServicesController(base.ServicesController):
'http_host':
'www.mywebsite.com'}]}],
'provider_details': provider_details}
service_result = self.format_result(service_dict)
return service_result
@ -135,27 +134,28 @@ class ServicesController(base.ServicesController):
return ''
def delete(self, project_id, service_name):
# delete from providers
return ''
def get_provider_details(self, project_id, service_name):
if service_name == 'non_exist_service_name':
raise LookupError('Service non_exist_service_name does not exist')
return {
"MaxCDN": provider_details.ProviderDetail(
'MaxCDN': provider_details.ProviderDetail(
provider_service_id=11942,
name='my_service_name',
access_urls=['my_service_name'
'.mycompanyalias.netdna-cdn.com']),
"Fastly": provider_details.ProviderDetail(
'Fastly': provider_details.ProviderDetail(
provider_service_id=3488,
name="my_service_name",
access_urls=['my_service_name'
'.global.prod.fastly.net']),
"CloudFront": provider_details.ProviderDetail(
'CloudFront': provider_details.ProviderDetail(
provider_service_id=5892,
access_urls=['my_service_name'
'.gibberish.amzcf.com']),
"Mock": provider_details.ProviderDetail(
'Mock': provider_details.ProviderDetail(
provider_service_id="73242",
access_urls=['my_service_name.mock.com'])}

View File

@ -132,7 +132,11 @@ class ServicesController(base.Controller):
@pecan.expose('json')
def delete(self, service_name):
services_controller = self._driver.manager.services_controller
return services_controller.delete(self.project_id, service_name)
try:
services_controller.delete(self.project_id, service_name)
except LookupError as e:
pecan.abort(404, detail=str(e))
pecan.response.status = 202
@pecan.expose('json')
@decorators.validate(

View File

@ -254,7 +254,10 @@ class TestServiceActions(base.TestBase):
body = resp.json()
status = body['status']
self.assertEqual(resp.status_code, 200)
self.assertEqual(status, 'delete_in_progress')
# TODO(tonytan4ever) Change this to delete_in_progress once
# poppy-server service status patchset is made.
# self.assertEqual(status, 'delete_in_progress')
self.assertEqual(status, 'deployed')
# TODO(malini): find a better solution instead of sleep
time.sleep(3)

View File

@ -73,7 +73,9 @@ get_service = {
'items': links,
'minItems': 1},
'status': {'type': 'string',
'enum': ['in_progress', 'deployed', 'unknown', 'failed']},
'enum': ['creating',
'in_progress', 'deployed',
'unknown', 'failed']},
'restrictions': restrictions,
'flavorRef': flavor_id
},

View File

@ -281,3 +281,12 @@ class ServiceControllerTest(base.FunctionalTest):
# response = self.app.delete('/v1.0/services/fake_service_name_4')
# self.assertEqual(200, response.status_code)
def test_delete_non_eixst(self):
response = self.app.delete('/v1.0/%s/services/non_exist_service_name' %
self.project_id,
headers={
'Content-Type': 'application/json'
},
expect_errors=True)
self.assertEqual(404, response.status_code)

View File

@ -51,14 +51,14 @@ class TestProviderWrapper(base.TestCase):
{})
def test_delete_with_keyerror(self):
mock_ext = mock.Mock(provider_name="no_existent_provider")
mock_ext = mock.Mock(obj=mock.Mock(
provider_name="no_existent_provider"))
self.assertRaises(errors.BadProviderDetail,
self.provider_wrapper_obj.delete,
mock_ext, self.fake_provider_details)
def test_delete(self):
mock_ext = mock.Mock(provider_name="Fastly",
obj=mock.Mock())
mock_ext = mock.Mock(obj=mock.Mock(provider_name="Fastly"))
fastly_provider_detail = self.fake_provider_details["Fastly"]
self.provider_wrapper_obj.delete(mock_ext, self.fake_provider_details)
mock_ext.obj.service_controller.delete.assert_called_once_with(

View File

@ -20,6 +20,7 @@ import mock
from oslo.config import cfg
from poppy.manager.default import driver
from poppy.manager.default.service_async_workers import delete_service_worker
from poppy.manager.default import services
from poppy.model import flavor
from poppy.model.helpers import provider_details
@ -31,16 +32,28 @@ from tests.unit import base
class DefaultManagerServiceTests(base.TestCase):
@mock.patch('poppy.storage.base.driver.StorageDriverBase')
@mock.patch('poppy.provider.base.driver.ProviderDriverBase')
@mock.patch('poppy.dns.base.driver.DNSDriverBase')
def setUp(self, mock_driver, mock_provider, mock_dns):
def setUp(self, mock_driver, mock_dns):
super(DefaultManagerServiceTests, self).setUp()
# create mocked config and driver
conf = cfg.ConfigOpts()
# mock a steveodore provider extension
def get_provider_by_name(name):
name_p_name_mapping = {
'maxcdn': 'MaxCDN',
'cloudfront': 'CloudFront',
'fastly': 'Fastly',
'mock': 'Mock'
}
return mock.Mock(obj=mock.Mock(provider_name=(
name_p_name_mapping[name])))
mock_providers = mock.MagicMock()
mock_providers.__getitem__.side_effect = get_provider_by_name
manager_driver = driver.DefaultManagerDriver(conf,
mock_driver,
mock_provider,
mock_providers,
mock_dns)
# stubbed driver
@ -95,42 +108,51 @@ class DefaultManagerServiceTests(base.TestCase):
# to get code coverage
def get_provider_extension_by_name(name):
if name == "cloudfront":
return mock.Mock(
obj=mock.Mock(
service_controller=mock.Mock(
create=mock.Mock(
return_value={
'Cloudfront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{
'href': 'www.mysite.com',
'rel': 'access_url'}],
'status': "in_progress"}}),
)))
return_mock = mock.Mock(
return_value={
'Cloudfront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{
'href': 'www.mysite.com',
'rel': 'access_url'}],
'status': "deploy_in_progress"}})
service_controller = mock.Mock(
create=return_mock)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == "fastly":
return mock.Mock(obj=mock.Mock(service_controller=mock.Mock(
create=mock.Mock(return_value={
return_mock = mock.Mock(
return_value={
'Fastly': {'error': "fail to create servcice",
'error_detail': 'Fastly Create failed'
' because of XYZ'}})
service_controller = mock.Mock(
create=return_mock)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
))
else:
return mock.Mock(
obj=mock.Mock(
service_controller=mock.Mock(
create=mock.Mock(
return_value={
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{
'href': 'www.mysite.com',
'rel': 'access_url'}]}}),
)))
return_mock = mock.Mock(
return_value={
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
'links': [
{
'href': 'www.mysite.com',
'rel': 'access_url'}]}})
service_controller = mock.Mock(
create=return_mock)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
providers.__getitem__.side_effect = get_provider_extension_by_name
@ -148,12 +170,12 @@ class DefaultManagerServiceTests(base.TestCase):
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get('id', None)
access_url = provider_detail_dict.get('access_url', None)
status = provider_detail_dict.get('status', u'unknown')
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'unknown')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_url,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
@ -182,27 +204,152 @@ class DefaultManagerServiceTests(base.TestCase):
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get('id', None)
access_urls = provider_detail_dict.get('access_urls', [])
status = provider_detail_dict.get('status', u'unknown')
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
self.sc.storage_controller.get_provider_details.return_value = (
self.sc.storage_controller._get_provider_details.return_value = (
self.provider_details
)
self.sc.delete(self.project_id, self.service_name)
# ensure the manager calls the storage driver with the appropriate data
self.sc.storage_controller.delete.assert_called_once_with(
# break into 2 lines.
sc = self.sc.storage_controller
sc.get_provider_details.assert_called_once_with(
self.project_id,
self.service_name
)
# and that the providers are notified.
self.service_name)
@ddt.file_data('data_provider_details.json')
def test_detele_service_worker_success(self, provider_details_json):
self.provider_details = {}
for provider_name in provider_details_json:
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
providers = self.sc._driver.providers
providers.map.assert_called_once_with(self.sc.provider_wrapper.delete,
self.provider_details)
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return_mock = {
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='CloudFront',
service_controller=service_controller)
)
elif name == 'maxcdn':
return_mock = {
'MaxCDN': {'id': "pullzone345"}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=service_controller)
)
else:
return_mock = {
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}
}
service_controller = mock.Mock(
delete=mock.Mock(return_value=return_mock)
)
return mock.Mock(obj=mock.Mock(
provider_name=name.title(),
service_controller=service_controller)
)
providers.__getitem__.side_effect = get_provider_extension_by_name
delete_service_worker.service_delete_worker(self.provider_details,
self.sc,
self.project_id,
self.service_name)
@ddt.file_data('data_provider_details.json')
def test_detele_service_worker_with_error(self, provider_details_json):
self.provider_details = {}
for provider_name in provider_details_json:
provider_detail_dict = json.loads(
provider_details_json[provider_name]
)
provider_service_id = provider_detail_dict.get("id", None)
access_urls = provider_detail_dict.get("access_urls", None)
status = provider_detail_dict.get("status", u'deployed')
provider_detail_obj = provider_details.ProviderDetail(
provider_service_id=provider_service_id,
access_urls=access_urls,
status=status)
self.provider_details[provider_name] = provider_detail_obj
providers = self.sc._driver.providers
def get_provider_extension_by_name(name):
if name == 'cloudfront':
return mock.Mock(
obj=mock.Mock(
provider_name='CloudFront',
service_controller=mock.Mock(
delete=mock.Mock(
return_value={
'CloudFront': {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}}),
)))
elif name == 'maxcdn':
return mock.Mock(obj=mock.Mock(
provider_name='MaxCDN',
service_controller=mock.Mock(
delete=mock.Mock(return_value={
'MaxCDN': {'error': "fail to create servcice",
'error_detail':
'MaxCDN delete service'
' failed because of XYZ'}})
)
))
else:
return mock.Mock(
obj=mock.Mock(
provider_name=name.title(),
service_controller=mock.Mock(
delete=mock.Mock(
return_value={
name.title(): {
'id':
'08d2e326-377e-11e4-b531-3c15c2b8d2d6',
}}),
)))
providers.__getitem__.side_effect = get_provider_extension_by_name
delete_service_worker.service_delete_worker(self.provider_details,
self.sc,
self.project_id,
self.service_name)

View File

@ -91,7 +91,7 @@ class TestServiceModel(base.TestCase):
self.assertEqual(myservice.caching, [])
# status
self.assertEqual(myservice.status, u'unknown')
self.assertEqual(myservice.status, u'creating')
def test_init_from_dict_method(self):
# this should generate a service copy from my service
@ -114,7 +114,7 @@ class TestServiceModel(base.TestCase):
self.assertRaises(ValueError, setattr, myservice, 'status', status)
@ddt.data(u'unknown', u'in_progress', u'deployed', u'failed')
@ddt.data(u'creating', u'deployed', u'delete_in_progress')
def test_set_valid_status(self, status):
myservice = service.Service(
self.service_name,

View File

@ -185,6 +185,25 @@ class CassandraStorageDriverTests(base.TestCase):
self.cassandra_driver.connection()
mock_cluster.assert_called_with()
def test_connect(self):
self.cassandra_driver.session = None
self.cassandra_driver.connect = mock.Mock()
self.cassandra_driver.database
self.cassandra_driver.connect.assert_called_once_with()
# reset session to not None value
self.cassandra_driver.session = mock.Mock(is_shutdown=False)
# 2nd time should get a not-none session
self.assertTrue(self.cassandra_driver.database is not None)
def test_close_connection(self):
self.cassandra_driver.session = mock.Mock()
self.cassandra_driver.close_connection()
self.cassandra_driver.session.cluster.shutdown.assert_called_once_with(
)
self.cassandra_driver.session.shutdown.assert_called_once_with(
)
def test_service_controller(self):
sc = self.cassandra_driver.services_controller

View File

@ -143,9 +143,8 @@ class CassandraStorageServiceTests(base.TestCase):
def test_get_provider_details(self, provider_details_json,
mock_session, mock_execute):
# mock the response from cassandra
mock_execute.execute.return_value = [
{'provider_details': provider_details_json}]
mock_execute.execute.return_value = [{'provider_details':
provider_details_json}]
actual_response = self.sc.get_provider_details(self.project_id,
self.service_name)
self.assertTrue("MaxCDN" in actual_response)

View File

@ -32,9 +32,15 @@ class MockDBStorageDriverTests(base.TestCase):
def test_database(self):
self.assertTrue(self.mockdb_driver.database is None)
def test_connect(self):
self.assertTrue(self.mockdb_driver.connect() == "")
def test_connection(self):
self.assertTrue(self.mockdb_driver.connection is None)
def test_close_connection(self):
self.assertTrue(self.mockdb_driver.close_connection() == "")
def test_services_controller(self):
self.assertTrue(self.mockdb_driver.services_controller.session is None)

View File

@ -0,0 +1,43 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo.config import cfg
from poppy.storage.mockdb import driver
from poppy.storage.mockdb import services
from tests.unit import base
class MockDBStorageFlavorsTests(base.TestCase):
def setUp(self):
super(MockDBStorageFlavorsTests, self).setUp()
self.flavor_id = uuid.uuid1()
# create mocked config and driver
conf = cfg.ConfigOpts()
mockdb_driver = driver.MockDBStorageDriver(conf)
# stubbed driver
self.sc = services.ServicesController(mockdb_driver)
self.project_id = "fake_project_id"
self.service_name = "fake_service_name"
def test_delete_service(self):
self.assertTrue(self.sc.delete(self.project_id, self.service_name)
== '')