Merge "Cleanup DB API unit tests"

This commit is contained in:
Jenkins 2015-07-28 17:12:48 +00:00 committed by Gerrit Code Review
commit 4a1b70bf64
9 changed files with 1016 additions and 1063 deletions

View File

@ -25,6 +25,7 @@ from manila.share import api as share_api
from manila import test
from manila.tests.api.contrib import stubs
from manila.tests.api import fakes
from manila.tests import db_utils
CONF = cfg.CONF
@ -48,8 +49,7 @@ class AdminActionsTest(test.TestCase):
def test_reset_status_as_admin(self):
# current status is available
share = db.share_create(
self.admin_context, {'status': constants.STATUS_AVAILABLE})
share = db_utils.create_share(status=constants.STATUS_AVAILABLE)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -67,8 +67,7 @@ class AdminActionsTest(test.TestCase):
def test_reset_status_as_non_admin(self):
# current status is 'error'
share = db.share_create(context.get_admin_context(),
{'status': constants.STATUS_ERROR})
share = db_utils.create_share(status=constants.STATUS_ERROR)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -86,8 +85,7 @@ class AdminActionsTest(test.TestCase):
def test_malformed_reset_status_body(self):
# current status is available
share = db.share_create(
self.admin_context, {'status': constants.STATUS_AVAILABLE})
share = db_utils.create_share(status=constants.STATUS_AVAILABLE)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -104,8 +102,7 @@ class AdminActionsTest(test.TestCase):
def test_invalid_status_for_share(self):
# current status is available
share = db.share_create(
self.admin_context, {'status': constants.STATUS_AVAILABLE})
share = db_utils.create_share(status=constants.STATUS_AVAILABLE)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -141,11 +138,11 @@ class AdminActionsTest(test.TestCase):
def test_snapshot_reset_status(self):
# snapshot in 'error_deleting'
share = db.share_create(self.admin_context, {})
snapshot = db.share_snapshot_create(
self.admin_context,
{'status': constants.STATUS_ERROR_DELETING,
'share_id': share['id']})
share = db_utils.create_share()
snapshot = db_utils.create_snapshot(
status=constants.STATUS_ERROR_DELETING,
share_id=share['id']
)
req = webob.Request.blank('/v1/fake/snapshots/%s/action' %
snapshot['id'])
req.method = 'POST'
@ -164,10 +161,11 @@ class AdminActionsTest(test.TestCase):
def test_invalid_status_for_snapshot(self):
# snapshot in 'available'
share = db.share_create(self.admin_context, {})
snapshot = db.share_snapshot_create(
self.admin_context,
{'status': constants.STATUS_AVAILABLE, 'share_id': share['id']})
share = db_utils.create_share()
snapshot = db_utils.create_snapshot(
status=constants.STATUS_AVAILABLE,
share_id=share['id']
)
req = webob.Request.blank('/v1/fake/snapshots/%s/action' %
snapshot['id'])
req.method = 'POST'
@ -185,7 +183,7 @@ class AdminActionsTest(test.TestCase):
self.assertEqual(snapshot['status'], constants.STATUS_AVAILABLE)
def test_admin_force_delete_share(self):
share = db.share_create(self.admin_context, {'size': 1})
share = db_utils.create_share(size=1, override_defaults=True)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@ -199,7 +197,7 @@ class AdminActionsTest(test.TestCase):
share['id'])
def test_member_force_delete_share(self):
share = db.share_create(self.admin_context, {'size': 1})
share = db_utils.create_share(size=1)
req = webob.Request.blank('/v1/fake/shares/%s/action' % share['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'

View File

@ -1,3 +1,5 @@
# Copyright 2013 OpenStack Foundation
# Copyright (c) 2014 NetApp, Inc.
# Copyright (c) 2015 Rushil Chugh
# All Rights Reserved.
#
@ -16,21 +18,44 @@
"""Testing of SQLAlchemy backend."""
import ddt
from oslo_db import exception as db_exception
from oslo_utils import uuidutils
import six
from manila.common import constants
from manila import context
from manila.db.sqlalchemy import api
from manila.db.sqlalchemy import api as db_api
from manila.db.sqlalchemy import models
from manila import exception
from manila import test
from manila.tests import db_utils
security_service_dict = {
'id': 'fake id',
'project_id': 'fake project',
'type': 'ldap',
'dns_ip': 'fake dns',
'server': 'fake ldap server',
'domain': 'fake ldap domain',
'user': 'fake user',
'password': 'fake password',
'name': 'whatever',
'description': 'nevermind',
}
class BaseDatabaseAPITestCase(test.TestCase):
def _check_fields(self, expected, actual):
for key in expected:
self.assertEqual(actual[key], expected[key])
@ddt.ddt
class SQLAlchemyAPIShareTestCase(test.TestCase):
class GenericDatabaseAPITestCase(test.TestCase):
def setUp(self):
"""Run before each test."""
super(SQLAlchemyAPIShareTestCase, self).setUp()
super(GenericDatabaseAPITestCase, self).setUp()
self.ctxt = context.get_admin_context()
@ddt.unpack
@ -43,61 +68,96 @@ class SQLAlchemyAPIShareTestCase(test.TestCase):
def test_ensure_model_values_has_id(self, values, call_count):
self.mock_object(uuidutils, 'generate_uuid')
api.ensure_model_dict_has_id(values)
db_api.ensure_model_dict_has_id(values)
self.assertEqual(call_count, uuidutils.generate_uuid.call_count)
self.assertIn('id', values)
def test_custom_query(self):
share = db_utils.create_share()
share_access = db_utils.create_access(share_id=share['id'])
db_api.share_access_delete(self.ctxt, share_access.id)
self.assertRaises(exception.NotFound, db_api.share_access_get,
self.ctxt, share_access.id)
class ShareDatabaseAPITestCase(test.TestCase):
def setUp(self):
"""Run before each test."""
super(ShareDatabaseAPITestCase, self).setUp()
self.ctxt = context.get_admin_context()
def test_share_filter_by_host_with_pools(self):
shares = [[api.share_create(self.ctxt, {'host': value})
shares = [[db_api.share_create(self.ctxt, {'host': value})
for value in ('foo', 'foo#pool0')]]
api.share_create(self.ctxt, {'host': 'foobar'})
db_utils.create_share()
self._assertEqualListsOfObjects(shares[0],
api.share_get_all_by_host(
db_api.share_get_all_by_host(
self.ctxt, 'foo'),
ignored_keys=['share_type',
'share_type_id',
'export_locations'])
def test_share_filter_all_by_host_with_pools_multiple_hosts(self):
shares = [[api.share_create(self.ctxt, {'host': value})
shares = [[db_api.share_create(self.ctxt, {'host': value})
for value in ('foo', 'foo#pool0', 'foo', 'foo#pool1')]]
api.share_create(self.ctxt, {'host': 'foobar'})
db_utils.create_share()
self._assertEqualListsOfObjects(shares[0],
api.share_get_all_by_host(
db_api.share_get_all_by_host(
self.ctxt, 'foo'),
ignored_keys=['share_type',
'share_type_id',
'export_locations'])
def test_share_export_locations_update_valid_order(self):
share = api.share_create(self.ctxt, {'host': 'foobar'})
class ShareExportLocationsDatabaseAPITestCase(test.TestCase):
def setUp(self):
"""Run before each test."""
super(ShareExportLocationsDatabaseAPITestCase, self).setUp()
self.ctxt = context.get_admin_context()
def test_update_valid_order(self):
share = db_utils.create_share()
initial_locations = ['fake1/1/', 'fake2/2', 'fake3/3']
update_locations = ['fake4/4', 'fake2/2', 'fake3/3']
# add initial locations
api.share_export_locations_update(self.ctxt, share['id'],
initial_locations, False)
db_api.share_export_locations_update(self.ctxt, share['id'],
initial_locations, False)
# update locations
api.share_export_locations_update(self.ctxt, share['id'],
update_locations, True)
actual_result = api.share_export_locations_get(self.ctxt, share['id'])
db_api.share_export_locations_update(self.ctxt, share['id'],
update_locations, True)
actual_result = db_api.share_export_locations_get(self.ctxt,
share['id'])
# actual result should contain locations in exact same order
self.assertTrue(actual_result == update_locations)
def test_share_export_locations_update_string(self):
share = api.share_create(self.ctxt, {'host': 'foobar'})
def test_update_string(self):
share = db_utils.create_share()
initial_location = 'fake1/1/'
api.share_export_locations_update(self.ctxt, share['id'],
initial_location, False)
actual_result = api.share_export_locations_get(self.ctxt, share['id'])
db_api.share_export_locations_update(self.ctxt, share['id'],
initial_location, False)
actual_result = db_api.share_export_locations_get(self.ctxt,
share['id'])
self.assertTrue(actual_result == [initial_location])
@ddt.ddt
class DriverPrivateDataDatabaseAPITestCase(test.TestCase):
def setUp(self):
"""Run before each test."""
super(DriverPrivateDataDatabaseAPITestCase, self).setUp()
self.ctxt = context.get_admin_context()
def _get_driver_test_data(self):
return ("fake@host", uuidutils.generate_uuid())
@ -106,98 +166,756 @@ class SQLAlchemyAPIShareTestCase(test.TestCase):
{"details": {"foo": "bar", "tee": ["test"]},
"valid": {"foo": "bar", "tee": six.text_type(["test"])}})
@ddt.unpack
def test_driver_private_data_update(self, details, valid):
def test_update(self, details, valid):
test_host, test_id = self._get_driver_test_data()
initial_data = api.driver_private_data_get(
initial_data = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
actual_data = api.driver_private_data_get(
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
actual_data = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
self.assertEqual({}, initial_data)
self.assertEqual(valid, actual_data)
def test_driver_private_data_update_with_duplicate(self):
def test_update_with_duplicate(self):
test_host, test_id = self._get_driver_test_data()
details = {"tee": "too"}
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
actual_result = api.driver_private_data_get(
actual_result = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
self.assertEqual(details, actual_result)
def test_driver_private_data_update_with_delete_existing(self):
def test_update_with_delete_existing(self):
test_host, test_id = self._get_driver_test_data()
details = {"key1": "val1", "key2": "val2", "key3": "val3"}
details_update = {"key1": "val1_upd", "key4": "new_val"}
# Create new details
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
api.driver_private_data_update(self.ctxt, test_host, test_id,
details_update, delete_existing=True)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details_update, delete_existing=True)
actual_result = api.driver_private_data_get(
actual_result = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
self.assertEqual(details_update, actual_result)
def test_driver_private_data_get(self):
def test_get(self):
test_host, test_id = self._get_driver_test_data()
test_key = "foo"
test_keys = [test_key, "tee"]
details = {test_keys[0]: "val", test_keys[1]: "val", "mee": "foo"}
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
actual_result_all = api.driver_private_data_get(
actual_result_all = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
actual_result_single_key = api.driver_private_data_get(
actual_result_single_key = db_api.driver_private_data_get(
self.ctxt, test_host, test_id, test_key)
actual_result_list = api.driver_private_data_get(
actual_result_list = db_api.driver_private_data_get(
self.ctxt, test_host, test_id, test_keys)
self.assertEqual(details, actual_result_all)
self.assertEqual(details[test_key], actual_result_single_key)
self.assertEqual(dict.fromkeys(test_keys, "val"), actual_result_list)
def test_driver_private_data_delete_single(self):
def test_delete_single(self):
test_host, test_id = self._get_driver_test_data()
test_key = "foo"
details = {test_key: "bar", "tee": "too"}
valid_result = {"tee": "too"}
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
api.driver_private_data_delete(self.ctxt, test_host, test_id, test_key)
db_api.driver_private_data_delete(self.ctxt, test_host, test_id,
test_key)
actual_result = api.driver_private_data_get(
actual_result = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
self.assertEqual(valid_result, actual_result)
def test_driver_private_data_delete_all(self):
def test_delete_all(self):
test_host, test_id = self._get_driver_test_data()
details = {"foo": "bar", "tee": "too"}
api.driver_private_data_update(self.ctxt, test_host, test_id, details)
db_api.driver_private_data_update(self.ctxt, test_host, test_id,
details)
api.driver_private_data_delete(self.ctxt, test_host, test_id)
db_api.driver_private_data_delete(self.ctxt, test_host, test_id)
actual_result = api.driver_private_data_get(
actual_result = db_api.driver_private_data_get(
self.ctxt, test_host, test_id)
self.assertEqual({}, actual_result)
def test_custom_query(self):
share = api.share_create(self.ctxt, {'host': 'foobar'})
test_access_values = {
'share_id': share['id'],
'access_type': 'ip',
'access_to': 'fake',
}
share_access = api.share_access_create(self.ctxt, test_access_values)
api.share_access_delete(self.ctxt, share_access.id)
self.assertRaises(exception.NotFound, api.share_access_get,
self.ctxt, share_access.id)
@ddt.ddt
class ShareNetworkDatabaseAPITestCase(BaseDatabaseAPITestCase):
def __init__(self, *args, **kwargs):
super(ShareNetworkDatabaseAPITestCase, self).__init__(*args, **kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
def setUp(self):
super(ShareNetworkDatabaseAPITestCase, self).setUp()
self.share_nw_dict = {'id': 'fake network id',
'neutron_net_id': 'fake net id',
'neutron_subnet_id': 'fake subnet id',
'project_id': self.fake_context.project_id,
'user_id': 'fake_user_id',
'network_type': 'vlan',
'segmentation_id': 1000,
'cidr': '10.0.0.0/24',
'ip_version': 4,
'name': 'whatever',
'description': 'fake description'}
def test_create_one_network(self):
result = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
def test_create_two_networks_in_different_tenants(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] = None
share_nw_dict2['project_id'] = 'fake project 2'
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
self._check_fields(expected=share_nw_dict2, actual=result2)
def test_create_two_networks_in_one_tenant(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] += "suffix"
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
self._check_fields(expected=share_nw_dict2, actual=result2)
def test_create_with_duplicated_id(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(db_exception.DBDuplicateEntry,
db_api.share_network_create,
self.fake_context,
self.share_nw_dict)
def test_get(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
@ddt.data([{'id': 'fake share id1'}],
[{'id': 'fake share id1'}, {'id': 'fake share id2'}],)
def test_get_with_shares(self, shares):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
for share in shares:
share.update({'share_network_id': self.share_nw_dict['id']})
db_api.share_create(self.fake_context, share)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), len(shares))
for index, share in enumerate(shares):
self._check_fields(expected=share, actual=result['shares'][index])
@ddt.data([{'id': 'fake security service id1', 'type': 'fake type'}],
[{'id': 'fake security service id1', 'type': 'fake type'},
{'id': 'fake security service id2', 'type': 'fake type'}])
def test_get_with_security_services(self, security_services):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
for service in security_services:
service.update({'project_id': self.fake_context.project_id})
db_api.security_service_create(self.fake_context, service)
db_api.share_network_add_security_service(
self.fake_context, self.share_nw_dict['id'], service['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']),
len(security_services))
for index, service in enumerate(security_services):
self._check_fields(expected=service,
actual=result['security_services'][index])
def test_get_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
'fake id')
def test_delete(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_delete(self.fake_context,
self.share_nw_dict['id'])
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
self.share_nw_dict['id'])
def test_delete_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_delete,
self.fake_context,
'fake id')
def test_update(self):
new_name = 'fake_new_name'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result_update = db_api.share_network_update(self.fake_context,
self.share_nw_dict['id'],
{'name': new_name})
result_get = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(result_update['name'], new_name)
self._check_fields(expected=dict(six.iteritems(result_update)),
actual=dict(six.iteritems(result_get)))
def test_update_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_update,
self.fake_context,
'fake id',
{})
@ddt.data(1, 2)
def test_get_all_one_record(self, records_count):
index = 0
share_networks = []
while index < records_count:
share_network_dict = dict(self.share_nw_dict)
fake_id = 'fake_id%s' % index
share_network_dict.update({'id': fake_id,
'neutron_subnet_id': fake_id})
share_networks.append(share_network_dict)
db_api.share_network_create(self.fake_context, share_network_dict)
index += 1
result = db_api.share_network_get_all(self.fake_context)
self.assertEqual(len(result), len(share_networks))
for index, net in enumerate(share_networks):
self._check_fields(expected=net, actual=result[index])
def test_get_all_by_project(self):
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake share nw id2'
share_nw_dict2['project_id'] = 'fake project 2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
share_nw_dict2['project_id'])
self.assertEqual(len(result), 1)
self._check_fields(expected=share_nw_dict2, actual=result[0])
def test_add_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = db_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is not None)
def test_add_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_add_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_add_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_add_security_service_association_error_already_associated(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
self.assertRaises(
exception.ShareNetworkSecurityServiceAssociationError,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_remove_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
db_api.share_network_remove_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = db_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is None)
share_nw_ref = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(share_nw_ref['security_services']), 0)
def test_remove_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_remove_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_remove_security_service_dissociation_error(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(
exception.ShareNetworkSecurityServiceDissociationError,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_security_services_relation(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 0)
def test_shares_relation(self):
share_dict = {'id': 'fake share id1'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 0)
@ddt.ddt
class SecurityServiceDatabaseAPITestCase(BaseDatabaseAPITestCase):
def __init__(self, *args, **kwargs):
super(SecurityServiceDatabaseAPITestCase, self).__init__(*args,
**kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
def _check_expected_fields(self, result, expected):
for key in expected:
self.assertEqual(result[key], expected[key])
def test_create(self):
result = db_api.security_service_create(self.fake_context,
security_service_dict)
self._check_expected_fields(result, security_service_dict)
def test_create_with_duplicated_id(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
self.assertRaises(db_exception.DBDuplicateEntry,
db_api.security_service_create,
self.fake_context,
security_service_dict)
def test_get(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_get(self.fake_context,
security_service_dict['id'])
self._check_expected_fields(result, security_service_dict)
def test_get_not_found(self):
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_get,
self.fake_context,
'wrong id')
def test_delete(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
db_api.security_service_delete(self.fake_context,
security_service_dict['id'])
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_get,
self.fake_context,
security_service_dict['id'])
def test_update(self):
update_dict = {
'dns_ip': 'new dns',
'server': 'new ldap server',
'domain': 'new ldap domain',
'user': 'new user',
'password': 'new password',
'name': 'new whatever',
'description': 'new nevermind',
}
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_update(self.fake_context,
security_service_dict['id'],
update_dict)
self._check_expected_fields(result, update_dict)
def test_update_no_updates(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_update(self.fake_context,
security_service_dict['id'],
{})
self._check_expected_fields(result, security_service_dict)
def test_update_not_found(self):
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_update,
self.fake_context,
'wrong id',
{})
def test_get_all_no_records(self):
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), 0)
@ddt.data(1, 2)
def test_get_all(self, records_count):
index = 0
services = []
while index < records_count:
service_dict = dict(security_service_dict)
service_dict.update({'id': 'fake_id%s' % index})
services.append(service_dict)
db_api.security_service_create(self.fake_context, service_dict)
index += 1
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), len(services))
for index, service in enumerate(services):
self._check_fields(expected=service, actual=result[index])
def test_get_all_two_records(self):
dict1 = security_service_dict
dict2 = security_service_dict.copy()
dict2['id'] = 'fake id 2'
db_api.security_service_create(self.fake_context,
dict1)
db_api.security_service_create(self.fake_context,
dict2)
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), 2)
def test_get_all_by_project(self):
dict1 = security_service_dict
dict2 = security_service_dict.copy()
dict2['id'] = 'fake id 2'
dict2['project_id'] = 'fake project 2'
db_api.security_service_create(self.fake_context,
dict1)
db_api.security_service_create(self.fake_context,
dict2)
result1 = db_api.security_service_get_all_by_project(
self.fake_context,
dict1['project_id'])
self.assertEqual(len(result1), 1)
self._check_expected_fields(result1[0], dict1)
result2 = db_api.security_service_get_all_by_project(
self.fake_context,
dict2['project_id'])
self.assertEqual(len(result2), 1)
self._check_expected_fields(result2[0], dict2)
class ShareServerDatabaseAPITestCase(test.TestCase):
def setUp(self):
super(ShareServerDatabaseAPITestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id',
is_admin=True)
def test_share_server_get(self):
expected = db_utils.create_share_server()
server = db_api.share_server_get(self.ctxt, expected['id'])
self.assertEqual(expected['id'], server['id'])
self.assertEqual(server.share_network_id, expected.share_network_id)
self.assertEqual(server.host, expected.host)
self.assertEqual(server.status, expected.status)
def test_get_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db_api.share_server_get, self.ctxt, fake_id)
def test_create(self):
server = db_utils.create_share_server()
self.assertTrue(server['id'])
self.assertEqual(server.share_network_id, server['share_network_id'])
self.assertEqual(server.host, server['host'])
self.assertEqual(server.status, server['status'])
def test_delete(self):
server = db_utils.create_share_server()
num_records = len(db_api.share_server_get_all(self.ctxt))
db_api.share_server_delete(self.ctxt, server['id'])
self.assertEqual(len(db_api.share_server_get_all(self.ctxt)),
num_records - 1)
def test_delete_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db_api.share_server_delete,
self.ctxt, fake_id)
def test_update(self):
update = {
'share_network_id': 'update_net',
'host': 'update_host',
'status': constants.STATUS_ACTIVE,
}
server = db_utils.create_share_server()
updated_server = db_api.share_server_update(self.ctxt, server['id'],
update)
self.assertEqual(server['id'], updated_server['id'])
self.assertEqual(updated_server.share_network_id,
update['share_network_id'])
self.assertEqual(updated_server.host, update['host'])
self.assertEqual(updated_server.status, update['status'])
def test_update_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db_api.share_server_update,
self.ctxt, fake_id, {})
def test_get_all_by_host_and_share_net_valid(self):
valid = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ACTIVE,
}
invalid = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ERROR,
}
other = {
'share_network_id': '2',
'host': 'host2',
'status': constants.STATUS_ACTIVE,
}
valid = db_utils.create_share_server(**valid)
db_utils.create_share_server(**invalid)
db_utils.create_share_server(**other)
servers = db_api.share_server_get_all_by_host_and_share_net_valid(
self.ctxt,
host='host1',
share_net_id='1')
self.assertEqual(servers[0]['id'], valid['id'])
def test_get_all_by_host_and_share_net_not_found(self):
self.assertRaises(
exception.ShareServerNotFound,
db_api.share_server_get_all_by_host_and_share_net_valid,
self.ctxt, host='fake', share_net_id='fake'
)
def test_get_all(self):
srv1 = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ACTIVE,
}
srv2 = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ERROR,
}
srv3 = {
'share_network_id': '2',
'host': 'host2',
'status': constants.STATUS_ACTIVE,
}
servers = db_api.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 0)
to_delete = db_utils.create_share_server(**srv1)
db_utils.create_share_server(**srv2)
db_utils.create_share_server(**srv3)
servers = db_api.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 3)
db_api.share_server_delete(self.ctxt, to_delete['id'])
servers = db_api.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 2)
def test_backend_details_set(self):
details = {
'value1': '1',
'value2': '2',
}
server = db_utils.create_share_server()
db_api.share_server_backend_details_set(self.ctxt, server['id'],
details)
self.assertDictMatch(
details,
db_api.share_server_get(self.ctxt, server['id'])['backend_details']
)
def test_backend_details_set_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db_api.share_server_backend_details_set,
self.ctxt, fake_id, {})
def test_get_with_details(self):
values = {
'share_network_id': 'fake-share-net-id',
'host': 'hostname',
'status': constants.STATUS_ACTIVE,
}
details = {
'value1': '1',
'value2': '2',
}
srv_id = db_utils.create_share_server(**values)['id']
db_api.share_server_backend_details_set(self.ctxt, srv_id, details)
server = db_api.share_server_get(self.ctxt, srv_id)
self.assertEqual(srv_id, server['id'])
self.assertEqual(server.share_network_id, values['share_network_id'])
self.assertEqual(server.host, values['host'])
self.assertEqual(server.status, values['status'])
self.assertDictMatch(details, server['backend_details'])
self.assertTrue('backend_details' in server.to_dict())
def test_delete_with_details(self):
server = db_utils.create_share_server(backend_details={
'value1': '1',
'value2': '2',
})
num_records = len(db_api.share_server_get_all(self.ctxt))
db_api.share_server_delete(self.ctxt, server['id'])
self.assertEqual(len(db_api.share_server_get_all(self.ctxt)),
num_records - 1)

120
manila/tests/db_utils.py Normal file
View File

@ -0,0 +1,120 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from manila.common import constants
from manila import context
from manila import db
def _create_db_row(method, default_values, custom_values):
override_defaults = custom_values.pop('override_defaults', None)
if override_defaults:
default_values = custom_values
else:
default_values.update(copy.deepcopy(custom_values))
return method(context.get_admin_context(), default_values)
def create_share(**kwargs):
"""Create a share object."""
share = {
'share_proto': "NFS",
'size': 0,
'snapshot_id': None,
'share_network_id': None,
'share_server_id': None,
'user_id': 'fake',
'project_id': 'fake',
'metadata': {'fake_key': 'fake_value'},
'availability_zone': 'fake_availability_zone',
'status': "creating",
'host': 'fake_host'
}
return _create_db_row(db.share_create, share, kwargs)
def create_snapshot(**kwargs):
"""Create a snapshot object."""
snapshot = {
'share_proto': "NFS",
'size': 0,
'share_id': None,
'user_id': 'fake',
'project_id': 'fake',
'status': 'creating'
}
return _create_db_row(db.share_snapshot_create, snapshot, kwargs)
def create_access(**kwargs):
"""Create a access rule object."""
access = {
'access_type': 'fake_type',
'access_to': 'fake_IP',
'share_id': None,
'state': 'new',
}
return _create_db_row(db.share_access_create, access, kwargs)
def create_share_server(**kwargs):
"""Create a share server object."""
backend_details = kwargs.pop('backend_details', {})
srv = {
'host': 'host1',
'share_network_id': 'fake_srv_id',
'status': constants.STATUS_ACTIVE
}
share_srv = _create_db_row(db.share_server_create, srv, kwargs)
if backend_details:
db.share_server_backend_details_set(
context.get_admin_context(), share_srv['id'], backend_details)
return db.share_server_get(context.get_admin_context(),
share_srv['id'])
def create_share_network(**kwargs):
"""Create a share network object."""
net = {
'user_id': 'fake',
'project_id': 'fake',
'neutron_net_id': 'fake-neutron-net',
'neutron_subnet_id': 'fake-neutron-subnet',
'status': 'new',
'network_type': 'vlan',
'segmentation_id': 1000,
'cidr': '10.0.0.0/24',
'ip_version': 4,
'name': 'whatever',
'description': 'fake description',
}
return _create_db_row(db.share_network_create, net, kwargs)
def create_security_service(**kwargs):
share_network_id = kwargs.pop('share_network_id', None)
service = {
'type': "FAKE",
'project_id': 'fake-project-id',
}
service_ref = _create_db_row(db.security_service_create, service, kwargs)
if share_network_id:
db.share_network_add_security_service(context.get_admin_context(),
share_network_id,
service_ref['id'])
return service_ref

View File

@ -1,179 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import exception as db_exception
from manila import context
from manila.db import api as db_api
from manila import exception
from manila import test
security_service_dict = {
'id': 'fake id',
'project_id': 'fake project',
'type': 'ldap',
'dns_ip': 'fake dns',
'server': 'fake ldap server',
'domain': 'fake ldap domain',
'user': 'fake user',
'password': 'fake password',
'name': 'whatever',
'description': 'nevermind',
}
class SecurityServiceDBTest(test.TestCase):
def __init__(self, *args, **kwargs):
super(SecurityServiceDBTest, self).__init__(*args, **kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
def _check_expected_fields(self, result, expected):
for key in expected:
self.assertEqual(result[key], expected[key])
def test_create(self):
result = db_api.security_service_create(self.fake_context,
security_service_dict)
self._check_expected_fields(result, security_service_dict)
def test_create_with_duplicated_id(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
self.assertRaises(db_exception.DBDuplicateEntry,
db_api.security_service_create,
self.fake_context,
security_service_dict)
def test_get(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_get(self.fake_context,
security_service_dict['id'])
self._check_expected_fields(result, security_service_dict)
def test_get_not_found(self):
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_get,
self.fake_context,
'wrong id')
def test_delete(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
db_api.security_service_delete(self.fake_context,
security_service_dict['id'])
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_get,
self.fake_context,
security_service_dict['id'])
def test_update(self):
update_dict = {
'dns_ip': 'new dns',
'server': 'new ldap server',
'domain': 'new ldap domain',
'user': 'new user',
'password': 'new password',
'name': 'new whatever',
'description': 'new nevermind',
}
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_update(self.fake_context,
security_service_dict['id'],
update_dict)
self._check_expected_fields(result, update_dict)
def test_update_no_updates(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_update(self.fake_context,
security_service_dict['id'],
{})
self._check_expected_fields(result, security_service_dict)
def test_update_not_found(self):
self.assertRaises(exception.SecurityServiceNotFound,
db_api.security_service_update,
self.fake_context,
'wrong id',
{})
def test_get_all_no_records(self):
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), 0)
def test_get_all_one_record(self):
db_api.security_service_create(self.fake_context,
security_service_dict)
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), 1)
self._check_expected_fields(result[0], security_service_dict)
def test_get_all_two_records(self):
dict1 = security_service_dict
dict2 = security_service_dict.copy()
dict2['id'] = 'fake id 2'
db_api.security_service_create(self.fake_context,
dict1)
db_api.security_service_create(self.fake_context,
dict2)
result = db_api.security_service_get_all(self.fake_context)
self.assertEqual(len(result), 2)
def test_get_all_by_project(self):
dict1 = security_service_dict
dict2 = security_service_dict.copy()
dict2['id'] = 'fake id 2'
dict2['project_id'] = 'fake project 2'
db_api.security_service_create(self.fake_context,
dict1)
db_api.security_service_create(self.fake_context,
dict2)
result1 = db_api.security_service_get_all_by_project(
self.fake_context,
dict1['project_id'])
self.assertEqual(len(result1), 1)
self._check_expected_fields(result1[0], dict1)
result2 = db_api.security_service_get_all_by_project(
self.fake_context,
dict2['project_id'])
self.assertEqual(len(result2), 1)
self._check_expected_fields(result2[0], dict2)

View File

@ -1,390 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_db import exception as db_exception
import six
from manila import context
from manila.db import api as db_api
from manila.db.sqlalchemy import api as sqlalchemy_api
from manila.db.sqlalchemy import models
from manila import exception
from manila import test
class ShareNetworkDBTest(test.TestCase):
def __init__(self, *args, **kwargs):
super(ShareNetworkDBTest, self).__init__(*args, **kwargs)
self.fake_context = context.RequestContext(user_id='fake user',
project_id='fake project',
is_admin=False)
def _check_fields(self, expected, actual):
for key in expected:
self.assertEqual(actual[key], expected[key])
def setUp(self):
super(ShareNetworkDBTest, self).setUp()
self.share_nw_dict = {'id': 'fake network id',
'neutron_net_id': 'fake net id',
'neutron_subnet_id': 'fake subnet id',
'project_id': self.fake_context.project_id,
'user_id': 'fake_user_id',
'network_type': 'vlan',
'segmentation_id': 1000,
'cidr': '10.0.0.0/24',
'ip_version': 4,
'name': 'whatever',
'description': 'fake description'}
def test_create_one_network(self):
result = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
def test_create_two_networks_in_different_tenants(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] = None
share_nw_dict2['project_id'] = 'fake project 2'
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
self._check_fields(expected=share_nw_dict2, actual=result2)
def test_create_two_networks_in_one_tenant(self):
share_nw_dict2 = self.share_nw_dict.copy()
share_nw_dict2['id'] = share_nw_dict2['id'] + "suffix"
result1 = db_api.share_network_create(self.fake_context,
self.share_nw_dict)
result2 = db_api.share_network_create(self.fake_context,
share_nw_dict2)
self._check_fields(expected=self.share_nw_dict, actual=result1)
self._check_fields(expected=share_nw_dict2, actual=result2)
def test_create_with_duplicated_id(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(db_exception.DBDuplicateEntry,
db_api.share_network_create,
self.fake_context,
self.share_nw_dict)
def test_get(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self._check_fields(expected=self.share_nw_dict, actual=result)
self.assertEqual(len(result['shares']), 0)
self.assertEqual(len(result['security_services']), 0)
def test_get_with_one_share(self):
share_dict1 = {'id': 'fake share id1',
'share_network_id': self.share_nw_dict['id']}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict1)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 1)
self._check_fields(expected=share_dict1,
actual=result['shares'][0])
def test_get_with_two_shares(self):
share_dict1 = {'id': 'fake share id1',
'share_network_id': self.share_nw_dict['id']}
share_dict2 = {'id': 'fake share id2',
'share_network_id': self.share_nw_dict['id']}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict1)
db_api.share_create(self.fake_context, share_dict2)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 2)
def test_get_with_one_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 1)
self._check_fields(expected=security_dict1,
actual=result['security_services'][0])
def test_get_with_two_security_services(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
security_dict2 = {'id': 'fake security service id2',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.security_service_create(self.fake_context, security_dict2)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict2['id'])
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 2)
def test_get_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
'fake id')
def test_delete(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_delete(self.fake_context,
self.share_nw_dict['id'])
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_get,
self.fake_context,
self.share_nw_dict['id'])
def test_delete_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_delete,
self.fake_context,
'fake id')
def test_update(self):
new_name = 'fake_new_name'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result_update = db_api.share_network_update(self.fake_context,
self.share_nw_dict['id'],
{'name': new_name})
result_get = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(result_update['name'], new_name)
self._check_fields(expected=dict(six.iteritems(result_update)),
actual=dict(six.iteritems(result_get)))
def test_update_not_found(self):
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_update,
self.fake_context,
'fake id',
{})
def test_get_all_one_record(self):
db_api.share_network_create(self.fake_context, self.share_nw_dict)
result = db_api.share_network_get_all(self.fake_context)
self.assertEqual(len(result), 1)
self._check_fields(expected=self.share_nw_dict, actual=result[0])
def test_get_all_two_records(self):
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake subnet id2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all(self.fake_context)
self.assertEqual(len(result), 2)
def test_get_all_by_project(self):
share_nw_dict2 = dict(self.share_nw_dict)
share_nw_dict2['id'] = 'fake share nw id2'
share_nw_dict2['project_id'] = 'fake project 2'
share_nw_dict2['neutron_subnet_id'] = 'fake subnet id2'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_network_create(self.fake_context, share_nw_dict2)
result = db_api.share_network_get_all_by_project(
self.fake_context,
share_nw_dict2['project_id'])
self.assertEqual(len(result), 1)
self._check_fields(expected=share_nw_dict2, actual=result[0])
def test_add_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is not None)
def test_add_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_add_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_add_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_add_security_service_association_error_already_associated(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
self.assertRaises(
exception.ShareNetworkSecurityServiceAssociationError,
db_api.share_network_add_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_remove_security_service(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
db_api.share_network_add_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
db_api.share_network_remove_security_service(self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
result = sqlalchemy_api.model_query(
self.fake_context,
models.ShareNetworkSecurityServiceAssociation).\
filter_by(security_service_id=security_dict1['id']).\
filter_by(share_network_id=self.share_nw_dict['id']).first()
self.assertTrue(result is None)
share_nw_ref = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(share_nw_ref['security_services']), 0)
def test_remove_security_service_not_found_01(self):
security_service_id = 'unknown security service'
db_api.share_network_create(self.fake_context, self.share_nw_dict)
self.assertRaises(exception.SecurityServiceNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_service_id)
def test_remove_security_service_not_found_02(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
share_nw_id = 'unknown share network'
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(exception.ShareNetworkNotFound,
db_api.share_network_remove_security_service,
self.fake_context,
share_nw_id,
security_dict1['id'])
def test_remove_security_service_dissociation_error(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
self.assertRaises(
exception.ShareNetworkSecurityServiceDissociationError,
db_api.share_network_remove_security_service,
self.fake_context,
self.share_nw_dict['id'],
security_dict1['id'])
def test_security_services_relation(self):
security_dict1 = {'id': 'fake security service id1',
'project_id': self.fake_context.project_id,
'type': 'fake type'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.security_service_create(self.fake_context, security_dict1)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['security_services']), 0)
def test_shares_relation(self):
share_dict = {'id': 'fake share id1'}
db_api.share_network_create(self.fake_context, self.share_nw_dict)
db_api.share_create(self.fake_context, share_dict)
result = db_api.share_network_get(self.fake_context,
self.share_nw_dict['id'])
self.assertEqual(len(result['shares']), 0)

View File

@ -32,6 +32,7 @@ from manila import quota
from manila.share import drivers_private_data
from manila.share import manager
from manila import test
from manila.tests import db_utils
from manila.tests import utils as test_utils
from manila import utils
@ -65,84 +66,6 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, 'check_for_setup_error')
self.context = context.get_admin_context()
@staticmethod
def _create_share(status="creating", size=0, snapshot_id=None,
share_network_id=None, share_server_id=None):
"""Create a share object."""
share = {}
share['share_proto'] = "NFS"
share['size'] = size
share['snapshot_id'] = snapshot_id
share['share_network_id'] = share_network_id
share['share_server_id'] = share_server_id
share['user_id'] = 'fake'
share['project_id'] = 'fake'
share['metadata'] = {'fake_key': 'fake_value'}
share['availability_zone'] = 'fake_availability_zone'
share['status'] = status
share['host'] = 'fake_host'
return db.share_create(context.get_admin_context(), share)
@staticmethod
def _create_snapshot(status="creating", size=0, share_id=None):
"""Create a snapshot object."""
snapshot = {}
snapshot['share_proto'] = "NFS"
snapshot['size'] = size
snapshot['share_id'] = share_id
snapshot['user_id'] = 'fake'
snapshot['project_id'] = 'fake'
snapshot['status'] = status
return db.share_snapshot_create(context.get_admin_context(), snapshot)
@staticmethod
def _create_access(state='new', share_id=None):
"""Create a access rule object."""
access = {}
access['access_type'] = 'fake_type'
access['access_to'] = 'fake_IP'
access['share_id'] = share_id
access['state'] = state
return db.share_access_create(context.get_admin_context(), access)
@staticmethod
def _create_share_server(state=None, share_network_id=None, host=None,
backend_details=None):
"""Create a share server object."""
srv = {}
srv['host'] = host
srv['share_network_id'] = share_network_id
srv['status'] = state or constants.STATUS_ACTIVE
share_srv = db.share_server_create(context.get_admin_context(), srv)
if backend_details:
db.share_server_backend_details_set(
context.get_admin_context(), share_srv['id'], backend_details)
return db.share_server_get(context.get_admin_context(),
share_srv['id'])
@staticmethod
def _create_share_network(state='new'):
"""Create a share network object."""
srv = {}
srv['user_id'] = 'fake'
srv['project_id'] = 'fake'
srv['neutron_net_id'] = 'fake-neutron-net'
srv['neutron_subnet_id'] = 'fake-neutron-subnet'
srv['status'] = state
return db.share_network_create(context.get_admin_context(), srv)
@staticmethod
def _create_security_service(share_network_id=None):
service = {}
service['type'] = "FAKE"
service['project_id'] = 'fake-project-id'
service_ref = db.security_service_create(
context.get_admin_context(), service)
db.share_network_add_security_service(context.get_admin_context(),
share_network_id,
service_ref['id'])
return service_ref
def test_share_manager_instance(self):
fake_service_name = "fake_service"
import_mock = mock.Mock()
@ -381,15 +304,15 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_from_snapshot_with_server(self):
"""Test share can be created from snapshot if server exists."""
network = self._create_share_network()
server = self._create_share_server(
network = db_utils.create_share_network()
server = db_utils.create_share_server(
share_network_id=network['id'], host='fake_host',
backend_details=dict(fake='fake'))
parent_share = self._create_share(share_network_id='net-id',
share_server_id=server['id'])
share = self._create_share()
parent_share = db_utils.create_share(share_network_id='net-id',
share_server_id=server['id'])
share = db_utils.create_share()
share_id = share['id']
snapshot = self._create_snapshot(share_id=parent_share['id'])
snapshot = db_utils.create_snapshot(share_id=parent_share['id'])
snapshot_id = snapshot['id']
self.share_manager.create_share(self.context, share_id,
@ -403,11 +326,11 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_from_snapshot_with_server_not_found(self):
"""Test creation from snapshot fails if server not found."""
parent_share = self._create_share(share_network_id='net-id',
share_server_id='fake-id')
share = self._create_share()
parent_share = db_utils.create_share(share_network_id='net-id',
share_server_id='fake-id')
share = db_utils.create_share()
share_id = share['id']
snapshot = self._create_snapshot(share_id=parent_share['id'])
snapshot = db_utils.create_snapshot(share_id=parent_share['id'])
snapshot_id = snapshot['id']
self.assertRaises(exception.ShareServerNotFound,
@ -422,9 +345,9 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_from_snapshot(self):
"""Test share can be created from snapshot."""
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
snapshot = self._create_snapshot(share_id=share_id)
snapshot = db_utils.create_snapshot(share_id=share_id)
snapshot_id = snapshot['id']
self.share_manager.create_share(self.context, share_id,
@ -447,9 +370,9 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, "create_snapshot",
_fake_create_snapshot)
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
snapshot = self._create_snapshot(share_id=share_id)
snapshot = db_utils.create_snapshot(share_id=share_id)
snapshot_id = snapshot['id']
self.share_manager.create_snapshot(self.context, share_id,
@ -478,9 +401,9 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, "delete_snapshot",
mock.Mock(side_effect=_raise_not_found))
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
snapshot = self._create_snapshot(share_id=share_id)
snapshot = db_utils.create_snapshot(share_id=share_id)
snapshot_id = snapshot['id']
self.assertRaises(exception.NotFound,
@ -513,8 +436,8 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, "delete_snapshot",
mock.Mock(side_effect=_raise_share_snapshot_is_busy))
share = self._create_share(status=constants.STATUS_ACTIVE)
snapshot = self._create_snapshot(share_id=share['id'])
share = db_utils.create_share(status=constants.STATUS_ACTIVE)
snapshot = db_utils.create_snapshot(share_id=share['id'])
snapshot_id = snapshot['id']
self.share_manager.delete_snapshot(self.context, snapshot_id)
@ -535,7 +458,7 @@ class ShareManagerTestCase(test.TestCase):
share_network_id = 'fake_sn'
self.mock_object(
self.share_manager.db, 'share_get',
mock.Mock(return_value=self._create_share(
mock.Mock(return_value=db_utils.create_share(
share_network_id=share_network_id)))
self.mock_object(self.share_manager.db, 'share_update')
@ -552,13 +475,13 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_with_share_network_server_not_exists(self):
"""Test share can be created without share server."""
share_net = self._create_share_network()
share = self._create_share(share_network_id=share_net['id'])
share_net = db_utils.create_share_network()
share = db_utils.create_share(share_network_id=share_net['id'])
share_id = share['id']
def fake_setup_server(context, share_network, *args, **kwargs):
return self._create_share_server(
return db_utils.create_share_server(
share_network_id=share_network['id'],
host='fake_host')
@ -627,7 +550,7 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_with_share_network_not_found(self):
"""Test creation fails if share network not found."""
share = self._create_share(share_network_id='fake-net-id')
share = db_utils.create_share(share_network_id='fake-net-id')
share_id = share['id']
self.assertRaises(
exception.ShareNetworkNotFound,
@ -640,9 +563,9 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_with_share_network_server_exists(self):
"""Test share can be created with existing share server."""
share_net = self._create_share_network()
share = self._create_share(share_network_id=share_net['id'])
share_srv = self._create_share_server(
share_net = db_utils.create_share_network()
share = db_utils.create_share(share_network_id=share_net['id'])
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'], host=self.share_manager.host)
share_id = share['id']
@ -667,7 +590,7 @@ class ShareManagerTestCase(test.TestCase):
@ddt.data('export_location', 'export_locations')
def test_create_share_with_error_in_driver(self, details_key):
"""Test db updates if share creation fails in driver."""
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
some_data = 'fake_location'
self.share_manager.driver = mock.Mock()
@ -685,11 +608,11 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_with_server_created(self):
"""Test share can be created and share server is created."""
share_net = self._create_share_network()
share = self._create_share(share_network_id=share_net['id'])
self._create_share_server(
share_net = db_utils.create_share_network()
share = db_utils.create_share(share_network_id=share_net['id'])
db_utils.create_share_server(
share_network_id=share_net['id'], host=self.share_manager.host,
state=constants.STATUS_ERROR)
status=constants.STATUS_ERROR)
share_id = share['id']
fake_server = {
'id': 'fake_srv_id',
@ -723,7 +646,7 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, "delete_share",
mock.Mock(side_effect=_raise_not_found))
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
self.assertRaises(exception.NotFound,
self.share_manager.create_share,
@ -751,7 +674,7 @@ class ShareManagerTestCase(test.TestCase):
def test_provide_share_server_for_share_incompatible_servers(self):
fake_exception = exception.ManilaException("fake")
fake_share_server = {'id': 'fake'}
share = self._create_share()
share = db_utils.create_share()
self.mock_object(db,
'share_server_get_all_by_host_and_share_net_valid',
@ -780,7 +703,7 @@ class ShareManagerTestCase(test.TestCase):
def test_provide_share_server_for_share_parent_ss_not_found(self):
fake_parent_id = "fake_server_id"
fake_exception = exception.ShareServerNotFound("fake")
share = self._create_share()
share = db_utils.create_share()
fake_snapshot = {'share': {'share_server_id': fake_parent_id}}
self.mock_object(db, 'share_server_get',
mock.Mock(side_effect=fake_exception))
@ -795,7 +718,7 @@ class ShareManagerTestCase(test.TestCase):
def test_provide_share_server_for_share_parent_ss_invalid(self):
fake_parent_id = "fake_server_id"
share = self._create_share()
share = db_utils.create_share()
fake_snapshot = {'share': {'share_server_id': fake_parent_id}}
fake_parent_share_server = {'status': 'fake'}
self.mock_object(db, 'share_server_get',
@ -813,7 +736,7 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager, 'driver', mock.Mock())
self.share_manager.driver.driver_handles_share_servers = True
self.mock_object(self.share_manager.db, 'share_update', mock.Mock())
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
self.assertRaises(
@ -832,7 +755,7 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.driver,
"manage_existing", mock.Mock(side_effect=CustomException))
self.mock_object(self.share_manager.db, 'share_update', mock.Mock())
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
driver_options = {'fake': 'fake'}
@ -855,7 +778,7 @@ class ShareManagerTestCase(test.TestCase):
"manage_existing",
mock.Mock(return_value=None))
self.mock_object(self.share_manager.db, 'share_update', mock.Mock())
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
driver_options = {'fake': 'fake'}
@ -879,7 +802,7 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager, '_update_quota_usages',
mock.Mock(side_effect=exception.QuotaError))
self.mock_object(self.share_manager.db, 'share_update', mock.Mock())
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
driver_options = {'fake': 'fake'}
@ -916,7 +839,7 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver,
"manage_existing",
mock.Mock(return_value=driver_data))
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
driver_options = {'fake': 'fake'}
@ -990,11 +913,11 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.driver.driver_handles_share_servers = (
driver_handles_share_servers
)
share_net = self._create_share_network()
share_srv = self._create_share_server(share_network_id=share_net['id'],
host=self.share_manager.host)
share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share_net = db_utils.create_share_network()
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'], host=self.share_manager.host)
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
self.share_manager.unmanage_share(self.context, share['id'])
@ -1004,7 +927,7 @@ class ShareManagerTestCase(test.TestCase):
def test_unmanage_share_invalid_share(self):
unmanage = mock.Mock(side_effect=exception.InvalidShare(reason="fake"))
self._setup_unmanage_mocks(mock_driver=False, mock_unmanage=unmanage)
share = self._create_share()
share = db_utils.create_share()
self.share_manager.unmanage_share(self.context, share['id'])
@ -1015,7 +938,7 @@ class ShareManagerTestCase(test.TestCase):
manager.CONF.set_default('driver_handles_share_servers', False)
self._setup_unmanage_mocks(mock_driver=False,
mock_unmanage=mock.Mock())
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
self.share_manager.unmanage_share(self.context, share_id)
@ -1032,7 +955,7 @@ class ShareManagerTestCase(test.TestCase):
mock_unmanage=mock.Mock())
self.mock_object(quota.QUOTAS, 'reserve',
mock.Mock(side_effect=Exception()))
share = self._create_share()
share = db_utils.create_share()
self.share_manager.unmanage_share(self.context, share['id'])
@ -1050,7 +973,7 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager, '_remove_share_access_rules',
mock.Mock(side_effect=Exception()))
self.mock_object(quota.QUOTAS, 'reserve', mock.Mock(return_value=[]))
share = self._create_share()
share = db_utils.create_share()
self.share_manager.unmanage_share(self.context, share['id'])
@ -1064,7 +987,7 @@ class ShareManagerTestCase(test.TestCase):
mock_unmanage=mock.Mock())
self.mock_object(self.share_manager, '_remove_share_access_rules')
self.mock_object(quota.QUOTAS, 'reserve', mock.Mock(return_value=[]))
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
self.share_manager.unmanage_share(self.context, share_id)
@ -1094,9 +1017,9 @@ class ShareManagerTestCase(test.TestCase):
self.assertEqual(2, self.share_manager._deny_access.call_count)
def test_delete_share_share_server_not_found(self):
share_net = self._create_share_network()
share = self._create_share(share_network_id=share_net['id'],
share_server_id='fake-id')
share_net = db_utils.create_share_network()
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id='fake-id')
share_id = share['id']
self.assertRaises(
@ -1108,23 +1031,24 @@ class ShareManagerTestCase(test.TestCase):
@ddt.data(True, False)
def test_delete_share_last_on_server_with_sec_services(self, with_details):
share_net = self._create_share_network()
sec_service = self._create_security_service(share_net['id'])
share_net = db_utils.create_share_network()
sec_service = db_utils.create_security_service(
share_network_id=share_net['id'])
backend_details = dict(
security_service_ldap=jsonutils.dumps(sec_service))
if with_details:
share_srv = self._create_share_server(
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host,
backend_details=backend_details)
else:
share_srv = self._create_share_server(
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host)
db.share_server_backend_details_set(
context.get_admin_context(), share_srv['id'], backend_details)
share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share_id = share['id']
self.share_manager.driver = mock.Mock()
manager.CONF.delete_share_server_with_last_share = True
@ -1137,13 +1061,13 @@ class ShareManagerTestCase(test.TestCase):
backend_details['security_service_ldap'])])
def test_delete_share_last_on_server(self):
share_net = self._create_share_network()
share_srv = self._create_share_server(
share_net = db_utils.create_share_network()
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host
)
share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share_id = share['id']
@ -1155,13 +1079,13 @@ class ShareManagerTestCase(test.TestCase):
security_services=[])
def test_delete_share_last_on_server_deletion_disabled(self):
share_net = self._create_share_network()
share_srv = self._create_share_server(
share_net = db_utils.create_share_network()
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host
)
share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share_id = share['id']
manager.CONF.delete_share_server_with_last_share = False
@ -1170,15 +1094,15 @@ class ShareManagerTestCase(test.TestCase):
self.assertFalse(self.share_manager.driver.teardown_network.called)
def test_delete_share_not_last_on_server(self):
share_net = self._create_share_network()
share_srv = self._create_share_server(
share_net = db_utils.create_share_network()
share_srv = db_utils.create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host
)
share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share = db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
db_utils.create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id'])
share_id = share['id']
manager.CONF.delete_share_server_with_last_share = True
@ -1188,9 +1112,9 @@ class ShareManagerTestCase(test.TestCase):
def test_allow_deny_access(self):
"""Test access rules to share can be created and deleted."""
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
access = self._create_access(share_id=share_id)
access = db_utils.create_access(share_id=share_id)
access_id = access['id']
self.share_manager.allow_access(self.context, access_id)
self.assertEqual('active', db.share_access_get(self.context,
@ -1216,9 +1140,9 @@ class ShareManagerTestCase(test.TestCase):
self.mock_object(self.share_manager.driver, "deny_access",
_fake_deny_access)
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
access = self._create_access(share_id=share_id)
access = db_utils.create_access(share_id=share_id)
access_id = access['id']
self.assertRaises(exception.NotFound,
@ -1626,7 +1550,7 @@ class ShareManagerTestCase(test.TestCase):
timeutils.utcnow.assert_called_once_with()
def test_extend_share_invalid(self):
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
self.mock_object(self.share_manager, 'driver')
@ -1640,7 +1564,7 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.extend_share, self.context, share_id, 123, {})
def test_extend_share(self):
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
new_size = 123
shr_update = {
@ -1677,7 +1601,7 @@ class ShareManagerTestCase(test.TestCase):
def test_shrink_share_quota_error(self):
size = 5
new_size = 1
share = self._create_share(size=size)
share = db_utils.create_share(size=size)
share_id = share['id']
self.mock_object(self.share_manager.db, 'share_update')
@ -1701,7 +1625,7 @@ class ShareManagerTestCase(test.TestCase):
'status': constants.STATUS_SHRINKING_POSSIBLE_DATA_LOSS_ERROR})
@ddt.unpack
def test_shrink_share_invalid(self, exc, status):
share = self._create_share()
share = db_utils.create_share()
new_size = 1
share_id = share['id']
@ -1728,7 +1652,7 @@ class ShareManagerTestCase(test.TestCase):
self.assertTrue(self.share_manager.db.share_get.called)
def test_shrink_share(self):
share = self._create_share()
share = db_utils.create_share()
share_id = share['id']
new_size = 123
shr_update = {

View File

@ -24,9 +24,9 @@ import six
from manila.common import constants
from manila import context
from manila import db
from manila.share import rpcapi as share_rpcapi
from manila import test
from manila.tests import db_utils
CONF = cfg.CONF
@ -36,21 +36,14 @@ class ShareRpcAPITestCase(test.TestCase):
def setUp(self):
super(ShareRpcAPITestCase, self).setUp()
self.context = context.get_admin_context()
shr = {}
shr['host'] = 'fake_host'
shr['availability_zone'] = CONF.storage_availability_zone
shr['status'] = constants.STATUS_AVAILABLE
share = db.share_create(self.context, shr)
acs = {}
acs['access_type'] = "ip"
acs['access_to'] = "123.123.123.123"
acs['share_id'] = share['id']
access = db.share_access_create(self.context, acs)
snap = {}
snap['share_id'] = share['id']
snapshot = db.share_snapshot_create(self.context, snap)
server = {'id': 'fake_share_server_id', 'host': 'fake_host'}
share_server = db.share_server_create(self.context, server)
share = db_utils.create_share(
availability_zone=CONF.storage_availability_zone,
status=constants.STATUS_AVAILABLE
)
access = db_utils.create_access(share_id=share['id'])
snapshot = db_utils.create_snapshot(share_id=share['id'])
share_server = db_utils.create_share_server()
self.fake_share = jsonutils.to_primitive(share)
self.fake_access = jsonutils.to_primitive(access)
self.fake_snapshot = jsonutils.to_primitive(snapshot)

View File

@ -1,218 +0,0 @@
# Copyright (c) 2014 NetApp, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for the ShareServer and ShareServerBackendDetails tables."""
from oslo_utils import uuidutils
from manila.common import constants
from manila import context
from manila import db
from manila import exception
from manila import test
class ShareServerTableTestCase(test.TestCase):
def setUp(self):
super(ShareServerTableTestCase, self).setUp()
self.ctxt = context.RequestContext(user_id='user_id',
project_id='project_id',
is_admin=True)
def _create_share_server(self, values=None):
if not values:
values = {
'share_network_id': uuidutils.generate_uuid(),
'host': 'host1',
'status': constants.STATUS_ACTIVE,
}
return db.share_server_create(self.ctxt, values)
def test_share_server_get(self):
values = {
'share_network_id': 'fake-share-net-id',
'host': 'hostname',
'status': constants.STATUS_ACTIVE
}
expected = self._create_share_server(values)
server = db.share_server_get(self.ctxt, expected['id'])
self.assertEqual(expected['id'], server['id'])
self.assertEqual(server.share_network_id, expected.share_network_id)
self.assertEqual(server.host, expected.host)
self.assertEqual(server.status, expected.status)
def test_share_get_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound, db.share_server_get,
self.ctxt, fake_id)
def test_share_server_create(self):
values = {
'share_network_id': 'fake-share-net-id',
'host': 'hostname',
'status': constants.STATUS_ACTIVE,
}
server = self._create_share_server(values)
self.assertTrue(server['id'])
self.assertEqual(server.share_network_id, values['share_network_id'])
self.assertEqual(server.host, values['host'])
self.assertEqual(server.status, values['status'])
def test_share_server_delete(self):
server = self._create_share_server()
num_records = len(db.share_server_get_all(self.ctxt))
db.share_server_delete(self.ctxt, server['id'])
self.assertEqual(len(db.share_server_get_all(self.ctxt)),
num_records - 1)
def test_share_server_delete_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db.share_server_delete,
self.ctxt, fake_id)
def test_share_server_update(self):
update = {
'share_network_id': 'update_net',
'host': 'update_host',
'status': constants.STATUS_ACTIVE,
}
server = self._create_share_server()
updated_server = db.share_server_update(self.ctxt, server['id'],
update)
self.assertEqual(server['id'], updated_server['id'])
self.assertEqual(updated_server.share_network_id,
update['share_network_id'])
self.assertEqual(updated_server.host, update['host'])
self.assertEqual(updated_server.status, update['status'])
def test_share_server_update_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db.share_server_update,
self.ctxt, fake_id, {})
def test_share_server_get_all_by_host_and_share_net_valid(self):
valid = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ACTIVE,
}
invalid = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ERROR,
}
other = {
'share_network_id': '2',
'host': 'host2',
'status': constants.STATUS_ACTIVE,
}
valid = self._create_share_server(valid)
self._create_share_server(invalid)
self._create_share_server(other)
servers = db.share_server_get_all_by_host_and_share_net_valid(
self.ctxt,
host='host1',
share_net_id='1')
self.assertEqual(servers[0]['id'], valid['id'])
def test_share_server_get_all_by_host_and_share_net_not_found(self):
self.assertRaises(exception.ShareServerNotFound,
db.share_server_get_all_by_host_and_share_net_valid,
self.ctxt, host='fake', share_net_id='fake')
def test_share_server_get_all(self):
srv1 = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ACTIVE,
}
srv2 = {
'share_network_id': '1',
'host': 'host1',
'status': constants.STATUS_ERROR,
}
srv3 = {
'share_network_id': '2',
'host': 'host2',
'status': constants.STATUS_ACTIVE,
}
servers = db.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 0)
to_delete = self._create_share_server(srv1)
self._create_share_server(srv2)
self._create_share_server(srv3)
servers = db.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 3)
db.share_server_delete(self.ctxt, to_delete['id'])
servers = db.share_server_get_all(self.ctxt)
self.assertEqual(len(servers), 2)
def test_share_server_backend_details_set(self):
details = {
'value1': '1',
'value2': '2',
}
server = self._create_share_server()
db.share_server_backend_details_set(self.ctxt, server['id'], details)
self.assertDictMatch(
details,
db.share_server_get(self.ctxt, server['id'])['backend_details']
)
def test_share_server_backend_details_set_not_found(self):
fake_id = 'FAKE_UUID'
self.assertRaises(exception.ShareServerNotFound,
db.share_server_backend_details_set,
self.ctxt, fake_id, {})
def test_share_server_get_with_details(self):
values = {
'share_network_id': 'fake-share-net-id',
'host': 'hostname',
'status': constants.STATUS_ACTIVE,
}
details = {
'value1': '1',
'value2': '2',
}
srv_id = self._create_share_server(values)['id']
db.share_server_backend_details_set(self.ctxt, srv_id, details)
server = db.share_server_get(self.ctxt, srv_id)
self.assertEqual(srv_id, server['id'])
self.assertEqual(server.share_network_id, values['share_network_id'])
self.assertEqual(server.host, values['host'])
self.assertEqual(server.status, values['status'])
self.assertDictMatch(details, server['backend_details'])
self.assertTrue('backend_details' in server.to_dict())
def test_share_server_delete_with_details(self):
server = self._create_share_server()
details = {
'value1': '1',
'value2': '2',
}
db.share_server_backend_details_set(self.ctxt, server['id'], details)
num_records = len(db.share_server_get_all(self.ctxt))
db.share_server_delete(self.ctxt, server['id'])
self.assertEqual(len(db.share_server_get_all(self.ctxt)),
num_records - 1)

View File

@ -30,6 +30,7 @@ from manila import exception
from manila import quota
from manila import share
from manila import test
from manila.tests import db_utils
CONF = cfg.CONF
@ -42,35 +43,21 @@ class QuotaIntegrationTestCase(test.TestCase):
self.user_id = 'admin'
self.project_id = 'admin'
self.create_share = lambda size=10: (
db_utils.create_share(user_id=self.user_id,
project_id=self.project_id,
size=size,
status=constants.STATUS_AVAILABLE)
)
self.context = context.RequestContext(self.user_id,
self.project_id,
is_admin=True)
def _create_share(self, size=10):
"""Create a test share."""
share = {}
share['user_id'] = self.user_id
share['project_id'] = self.project_id
share['size'] = size
share['status'] = constants.STATUS_AVAILABLE
share['host'] = 'fake_host'
return db.share_create(self.context, share)
def _create_snapshot(self, share):
snapshot = {}
snapshot['user_id'] = self.user_id
snapshot['project_id'] = self.project_id
snapshot['share_id'] = share['id']
snapshot['share_size'] = share['size']
snapshot['host'] = share['host']
snapshot['status'] = constants.STATUS_AVAILABLE
return db.share_snapshot_create(self.context, snapshot)
@testtools.skip("SQLAlchemy sqlite insert bug")
def test_too_many_shares(self):
share_ids = []
for i in range(CONF.quota_shares):
share_ref = self._create_share()
share_ref = self.create_share()
share_ids.append(share_ref['id'])
self.assertRaises(exception.QuotaError,
share.API().create,
@ -81,7 +68,7 @@ class QuotaIntegrationTestCase(test.TestCase):
@testtools.skip("SQLAlchemy sqlite insert bug")
def test_too_many_gigabytes(self):
share_ids = []
share_ref = self._create_share(size=20)
share_ref = self.create_share(size=20)
share_ids.append(share_ref['id'])
self.assertRaises(exception.QuotaError,
share.API().create,