Always allow delete share-network when no shares exist

Currently tenants are unable to delete share_networks if there are existing
share_servers. This doesn't make sense, because the share_server exists only
for the share_network, and if the tenant is done with the share_network,
then the share_server can go away safely.

Change-Id: I8925f3deba64da6b011516163e5554b8134c8c0a
Closes-Bug: #1429123
This commit is contained in:
Valeriy Ponomaryov 2015-03-06 15:02:46 +02:00 committed by vponomaryov
parent 57fce07062
commit 6bdeb63c3b
8 changed files with 250 additions and 76 deletions

View File

@ -212,7 +212,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
self.assertTrue(isinstance(v, six.string_types)) self.assertTrue(isinstance(v, six.string_types))
@test.attr(type=["gate", "smoke", ]) @test.attr(type=["gate", "smoke", ])
def test_delete_share_server(self): def _delete_share_server(self, delete_share_network):
# Get network and subnet from existing share_network and reuse it # Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends. # to be able to delete share_server after test ends.
# TODO(vponomaryov): attach security-services too. If any exist from # TODO(vponomaryov): attach security-services too. If any exist from
@ -258,9 +258,27 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), self.HTTP_SUCCESS) self.assertIn(int(resp["status"]), self.HTTP_SUCCESS)
self.assertEqual(len(empty), 0) self.assertEqual(len(empty), 0)
# Delete share server if delete_share_network:
resp, __ = self.shares_client.delete_share_server(serv["id"]) # Delete share network, it should trigger share server deletion
resp, __ = self.shares_client.delete_share_network(
new_sn["id"])
else:
# Delete share server
resp, __ = self.shares_client.delete_share_server(serv["id"])
self.assertIn(int(resp["status"]), self.HTTP_SUCCESS) self.assertIn(int(resp["status"]), self.HTTP_SUCCESS)
# Wait for share server deletion # Wait for share server deletion
self.shares_client.wait_for_resource_deletion(server_id=serv["id"]) self.shares_client.wait_for_resource_deletion(server_id=serv["id"])
if delete_share_network:
self.shares_client.wait_for_resource_deletion(
sn_id=new_sn["id"])
@test.attr(type=["gate", "smoke", ])
def test_delete_share_server(self):
self._delete_share_server(False)
@test.attr(type=["gate", "smoke", ])
def test_delete_share_server_by_deletion_of_share_network(self):
self._delete_share_server(True)

View File

@ -110,3 +110,24 @@ class ShareNetworksNegativeTest(base.BaseSharesTest):
exceptions.BadRequest, exceptions.BadRequest,
self.shares_client.list_share_networks_with_detail, self.shares_client.list_share_networks_with_detail,
params={'created_before': '2014-10-23T08:31:58.000000'}) params={'created_before': '2014-10-23T08:31:58.000000'})
@test.attr(type=["gate", "smoke", "negative"])
@testtools.skipIf(not CONF.share.multitenancy_enabled,
'Can run only with drivers that do handle share servers '
'creation. Skipping.')
def test_try_delete_share_network_with_existing_shares(self):
# Get valid network data for successful share creation
__, share_network = self.shares_client.get_share_network(
self.shares_client.share_network_id)
__, new_sn = self.create_share_network(
neutron_net_id=share_network['neutron_net_id'],
neutron_subnet_id=share_network['neutron_subnet_id'],
nova_net_id=share_network['nova_net_id'])
# Create share with share network
__, share = self.create_share(share_network_id=new_sn['id'])
# Try delete share network
self.assertRaises(
lib_exc.Conflict,
self.shares_client.delete_share_network, new_sn['id'])

View File

@ -109,10 +109,15 @@ class ShareNetworkController(wsgi.Controller):
share_network = db_api.share_network_get(context, id) share_network = db_api.share_network_get(context, id)
except exception.ShareNetworkNotFound as e: except exception.ShareNetworkNotFound as e:
raise exc.HTTPNotFound(explanation=six.text_type(e)) raise exc.HTTPNotFound(explanation=six.text_type(e))
if share_network['share_servers']:
msg = _("Cannot delete share network %s. " shares = db_api.share_get_all_by_share_network(context, id)
"There are share servers using it") % id if shares:
raise exc.HTTPForbidden(explanation=msg) msg = _("Can not delete share network %(id)s, it has "
"%(len)s share(s).") % {'id': id, 'len': len(shares)}
LOG.error(msg)
raise exc.HTTPConflict(explanation=msg)
for share_server in share_network['share_servers']:
self.share_rpcapi.delete_share_server(context, share_server)
db_api.share_network_delete(context, id) db_api.share_network_delete(context, id)
try: try:

View File

@ -318,6 +318,14 @@ def share_get_all_by_project(context, project_id, filters=None, sort_key=None,
) )
def share_get_all_by_share_network(context, share_network_id, filters=None,
sort_key=None, sort_dir=None):
"""Returns list of shares that belong to given share network."""
return IMPL.share_get_all_by_share_network(
context, share_network_id, filters=filters, sort_key=sort_key,
sort_dir=sort_dir)
def share_get_all_by_share_server(context, share_server_id, filters=None, def share_get_all_by_share_server(context, share_server_id, filters=None,
sort_key=None, sort_dir=None): sort_key=None, sort_dir=None):
"""Returns all shares with given share server ID.""" """Returns all shares with given share server ID."""

View File

@ -1170,13 +1170,14 @@ def share_get(context, share_id, session=None):
@require_context @require_context
def _share_get_all_with_filters(context, project_id=None, share_server_id=None, def _share_get_all_with_filters(context, project_id=None, share_server_id=None,
host=None, filters=None, share_network_id=None, host=None, filters=None,
sort_key=None, sort_dir=None): sort_key=None, sort_dir=None):
"""Returns sorted list of shares that satisfies filters. """Returns sorted list of shares that satisfies filters.
:param context: context to query under :param context: context to query under
:param project_id: project id that owns shares :param project_id: project id that owns shares
:param share_server_id: share server that hosts shares :param share_server_id: share server that hosts shares
:param share_network_id: share network that was used for shares
:param host: host name where shares [and share servers] are located :param host: host name where shares [and share servers] are located
:param filters: dict of filters to specify share selection :param filters: dict of filters to specify share selection
:param sort_key: key of models.Share to be used for sorting :param sort_key: key of models.Share to be used for sorting
@ -1193,6 +1194,8 @@ def _share_get_all_with_filters(context, project_id=None, share_server_id=None,
query = query.filter_by(project_id=project_id) query = query.filter_by(project_id=project_id)
if share_server_id: if share_server_id:
query = query.filter_by(share_server_id=share_server_id) query = query.filter_by(share_server_id=share_server_id)
if share_network_id:
query = query.filter_by(share_network_id=share_network_id)
if host and isinstance(host, six.string_types): if host and isinstance(host, six.string_types):
session = get_session() session = get_session()
with session.begin(): with session.begin():
@ -1268,6 +1271,16 @@ def share_get_all_by_project(context, project_id, filters=None,
return query return query
@require_context
def share_get_all_by_share_network(context, share_network_id, filters=None,
sort_key=None, sort_dir=None):
"""Returns list of shares that belong to given share network."""
query = _share_get_all_with_filters(
context, share_network_id=share_network_id, filters=filters,
sort_key=sort_key, sort_dir=sort_dir)
return query
@require_context @require_context
def share_get_all_by_share_server(context, share_server_id, filters=None, def share_get_all_by_share_server(context, share_server_id, filters=None,
sort_key=None, sort_dir=None): sort_key=None, sort_dir=None):
@ -1948,6 +1961,8 @@ def share_server_backend_details_delete(context, share_server_id,
@require_context @require_context
def share_server_backend_details_get(context, share_server_id, def share_server_backend_details_get(context, share_server_id,
session=None): session=None):
if not session:
session = get_session()
query = model_query(context, models.ShareServerBackendDetails, query = model_query(context, models.ShareServerBackendDetails,
session=session)\ session=session)\
.filter_by(share_server_id=share_server_id).all() .filter_by(share_server_id=share_server_id).all()

View File

@ -21,6 +21,7 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils from oslo_utils import excutils
from oslo_utils import importutils from oslo_utils import importutils
from oslo_utils import timeutils from oslo_utils import timeutils
@ -484,11 +485,31 @@ class ShareManager(manager.SchedulerDependentManager):
# Get share_network again in case it was updated. # Get share_network again in case it was updated.
share_network = self.db.share_network_get( share_network = self.db.share_network_get(
context, share_server['share_network_id']) context, share_server['share_network_id'])
network_info = self._form_server_setup_info( network_info = self._form_server_setup_info(
context, share_server, share_network) context, share_server, share_network)
# NOTE(vponomaryov): Save security services data to share server
# details table to remove dependency from share network after
# creation operation. It will allow us to delete share server and
# share network separately without dependency on each other.
for security_service in network_info['security_services']:
ss_type = security_service['type']
data = {
'name': security_service['name'],
'domain': security_service['domain'],
'server': security_service['server'],
'dns_ip': security_service['dns_ip'],
'user': security_service['user'],
'type': ss_type,
'password': security_service['password'],
}
self.db.share_server_backend_details_set(
context, share_server['id'],
{'security_service_' + ss_type: jsonutils.dumps(data)})
server_info = self.driver.setup_server( server_info = self.driver.setup_server(
network_info, metadata=metadata) network_info, metadata=metadata)
if server_info and isinstance(server_info, dict): if server_info and isinstance(server_info, dict):
self.db.share_server_backend_details_set( self.db.share_server_backend_details_set(
context, share_server['id'], server_info) context, share_server['id'], server_info)
@ -528,31 +549,43 @@ class ShareManager(manager.SchedulerDependentManager):
# of share_server_id field for share. If so, after lock realese # of share_server_id field for share. If so, after lock realese
# this method starts executing when amount of dependent shares # this method starts executing when amount of dependent shares
# has been changed. # has been changed.
shares = self.db.share_get_all_by_share_server( server_id = share_server['id']
context, share_server['id']) shares = self.db.share_get_all_by_share_server(context, server_id)
if shares:
raise exception.ShareServerInUse(
share_server_id=share_server['id'])
self.db.share_server_update(context, share_server['id'], if shares:
raise exception.ShareServerInUse(share_server_id=server_id)
if 'backend_details' not in share_server:
server_details = self.db.share_server_backend_details_get(
context, server_id)
else:
server_details = share_server['backend_details']
self.db.share_server_update(context, server_id,
{'status': constants.STATUS_DELETING}) {'status': constants.STATUS_DELETING})
sec_services = self.db.share_network_get(
context,
share_server['share_network_id'])['security_services']
try: try:
LOG.debug("Deleting share server") LOG.debug("Deleting share server '%s'", server_id)
self.driver.teardown_server(share_server['backend_details'], security_services = []
security_services=sec_services) for ss_name in constants.SECURITY_SERVICES_ALLOWED_TYPES:
ss = server_details.get('security_service_' + ss_name)
if ss:
security_services.append(jsonutils.loads(ss))
self.driver.teardown_server(
server_details=server_details,
security_services=security_services)
except Exception: except Exception:
with excutils.save_and_reraise_exception(): with excutils.save_and_reraise_exception():
LOG.error(_LE("Share server %s failed on deletion."), LOG.error(
share_server['id']) _LE("Share server '%s' failed on deletion."),
server_id)
self.db.share_server_update( self.db.share_server_update(
context, share_server['id'], context, server_id, {'status': constants.STATUS_ERROR})
{'status': constants.STATUS_ERROR})
else: else:
self.db.share_server_delete(context, share_server['id']) self.db.share_server_delete(context, share_server['id'])
_teardown_server() _teardown_server()
LOG.info(_LI("Share server deleted successfully.")) LOG.info(
_LI("Share server '%s' has been deleted successfully."),
share_server['id'])
self.driver.deallocate_network(context, share_server['id']) self.driver.deallocate_network(context, share_server['id'])

View File

@ -201,42 +201,83 @@ class ShareNetworkAPITest(test.TestCase):
self.req, self.req,
body) body)
@mock.patch.object(db_api, 'share_network_get', mock.Mock())
def test_delete_nominal(self): def test_delete_nominal(self):
share_nw = fake_share_network.copy() share_nw = fake_share_network.copy()
share_nw['share_servers'] = [] share_nw['share_servers'] = ['foo', 'bar']
db_api.share_network_get.return_value = share_nw self.mock_object(db_api, 'share_network_get',
mock.Mock(return_value=share_nw))
self.mock_object(db_api, 'share_get_all_by_share_network',
mock.Mock(return_value=[]))
self.mock_object(self.controller.share_rpcapi, 'delete_share_server')
self.mock_object(db_api, 'share_network_delete')
with mock.patch.object(db_api, 'share_network_delete'): self.controller.delete(self.req, share_nw['id'])
self.controller.delete(self.req, share_nw)
db_api.share_network_delete.assert_called_once_with( db_api.share_network_get.assert_called_once_with(
self.req.environ['manila.context'], self.req.environ['manila.context'], share_nw['id'])
share_nw) db_api.share_get_all_by_share_network.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
self.controller.share_rpcapi.delete_share_server.assert_has_calls([
mock.call(self.req.environ['manila.context'], 'foo'),
mock.call(self.req.environ['manila.context'], 'bar')])
db_api.share_network_delete.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
@mock.patch.object(db_api, 'share_network_get', mock.Mock())
def test_delete_not_found(self): def test_delete_not_found(self):
share_nw = 'fake network id' share_nw = 'fake network id'
db_api.share_network_get.side_effect = exception.ShareNetworkNotFound( self.mock_object(db_api, 'share_network_get',
share_network_id=share_nw) mock.Mock(side_effect=exception.ShareNetworkNotFound(
share_network_id=share_nw)))
self.assertRaises(webob_exc.HTTPNotFound, self.assertRaises(webob_exc.HTTPNotFound,
self.controller.delete, self.controller.delete,
self.req, self.req,
share_nw) share_nw)
@mock.patch.object(db_api, 'share_network_get', mock.Mock()) def test_quota_delete_reservation_failed(self):
share_nw = fake_share_network.copy()
share_nw['share_servers'] = ['foo', 'bar']
self.mock_object(db_api, 'share_network_get',
mock.Mock(return_value=share_nw))
self.mock_object(db_api, 'share_get_all_by_share_network',
mock.Mock(return_value=[]))
self.mock_object(self.controller.share_rpcapi, 'delete_share_server')
self.mock_object(db_api, 'share_network_delete')
self.mock_object(share_networks.QUOTAS, 'reserve',
mock.Mock(side_effect=Exception))
self.mock_object(share_networks.QUOTAS, 'commit')
self.controller.delete(self.req, share_nw['id'])
db_api.share_network_get.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
db_api.share_get_all_by_share_network.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
self.controller.share_rpcapi.delete_share_server.assert_has_calls([
mock.call(self.req.environ['manila.context'], 'foo'),
mock.call(self.req.environ['manila.context'], 'bar')])
db_api.share_network_delete.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
self.assertTrue(share_networks.QUOTAS.reserve.called)
self.assertFalse(share_networks.QUOTAS.commit.called)
def test_delete_in_use(self): def test_delete_in_use(self):
share_nw = fake_share_network.copy() share_nw = fake_share_network.copy()
share_servers = [{'id': 1}] self.mock_object(db_api, 'share_network_get',
share_nw['share_servers'] = share_servers mock.Mock(return_value=share_nw))
self.mock_object(db_api, 'share_get_all_by_share_network',
mock.Mock(return_value=['foo', 'bar']))
db_api.share_network_get.return_value = share_nw self.assertRaises(webob_exc.HTTPConflict,
self.assertRaises(webob_exc.HTTPForbidden,
self.controller.delete, self.controller.delete,
self.req, self.req,
share_nw['id']) share_nw['id'])
db_api.share_network_get.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
db_api.share_get_all_by_share_network.assert_called_once_with(
self.req.environ['manila.context'], share_nw['id'])
def test_show_nominal(self): def test_show_nominal(self):
share_nw = 'fake network id' share_nw = 'fake network id'
with mock.patch.object(db_api, with mock.patch.object(db_api,

View File

@ -15,7 +15,9 @@
"""Test of Share Manager for Manila.""" """Test of Share Manager for Manila."""
import ddt
import mock import mock
from oslo_serialization import jsonutils
from oslo_utils import importutils from oslo_utils import importutils
from manila.common import constants from manila.common import constants
@ -43,6 +45,7 @@ class FakeAccessRule(object):
return getattr(self, item) return getattr(self, item)
@ddt.ddt
class ShareManagerTestCase(test.TestCase): class ShareManagerTestCase(test.TestCase):
def setUp(self): def setUp(self):
@ -97,17 +100,17 @@ class ShareManagerTestCase(test.TestCase):
return db.share_access_create(context.get_admin_context(), access) return db.share_access_create(context.get_admin_context(), access)
@staticmethod @staticmethod
def _create_share_server(state='ACTIVE', share_network_id=None, host=None): def _create_share_server(state='ACTIVE', share_network_id=None, host=None,
backend_details=None):
"""Create a share server object.""" """Create a share server object."""
srv = {} srv = {}
srv['host'] = host srv['host'] = host
srv['share_network_id'] = share_network_id srv['share_network_id'] = share_network_id
srv['status'] = state srv['status'] = state
share_srv = db.share_server_create(context.get_admin_context(), srv) share_srv = db.share_server_create(context.get_admin_context(), srv)
backend_details = {'fake': 'fake'} if backend_details:
db.share_server_backend_details_set(context.get_admin_context(), db.share_server_backend_details_set(
share_srv['id'], context.get_admin_context(), share_srv['id'], backend_details)
backend_details)
return db.share_server_get(context.get_admin_context(), return db.share_server_get(context.get_admin_context(),
share_srv['id']) share_srv['id'])
@ -334,8 +337,9 @@ class ShareManagerTestCase(test.TestCase):
def test_create_share_from_snapshot_with_server(self): def test_create_share_from_snapshot_with_server(self):
"""Test share can be created from snapshot if server exists.""" """Test share can be created from snapshot if server exists."""
network = self._create_share_network() network = self._create_share_network()
server = self._create_share_server(share_network_id=network['id'], server = self._create_share_server(
host='fake_host') share_network_id=network['id'], host='fake_host',
backend_details=dict(fake='fake'))
parent_share = self._create_share(share_network_id='net-id', parent_share = self._create_share(share_network_id='net-id',
share_server_id=server['id']) share_server_id=server['id'])
share = self._create_share() share = self._create_share()
@ -696,33 +700,35 @@ class ShareManagerTestCase(test.TestCase):
share_id share_id
) )
def test_delete_share_last_on_server_with_sec_services(self): @ddt.data(True, False)
def test_delete_share_last_on_server_with_sec_services(self, with_details):
share_net = self._create_share_network() share_net = self._create_share_network()
sec_service = self._create_security_service(share_net['id']) sec_service = self._create_security_service(share_net['id'])
share_srv = self._create_share_server( backend_details = dict(
share_network_id=share_net['id'], security_service_ldap=jsonutils.dumps(sec_service))
host=self.share_manager.host if with_details:
) share_srv = self._create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host,
backend_details=backend_details)
else:
share_srv = self._create_share_server(
share_network_id=share_net['id'],
host=self.share_manager.host)
db.share_server_backend_details_set(
context.get_admin_context(), share_srv['id'], backend_details)
share = self._create_share(share_network_id=share_net['id'], share = self._create_share(share_network_id=share_net['id'],
share_server_id=share_srv['id']) share_server_id=share_srv['id'])
share_id = share['id'] share_id = share['id']
self.share_manager.driver = mock.Mock() self.share_manager.driver = mock.Mock()
manager.CONF.delete_share_server_with_last_share = True manager.CONF.delete_share_server_with_last_share = True
self.share_manager.delete_share(self.context, share_id)
self.assertTrue(self.share_manager.driver.teardown_server.called)
call_args = self.share_manager.driver.teardown_server.call_args[0]
call_kwargs = self.share_manager.driver.teardown_server.call_args[1]
self.assertEqual(
call_args[0],
share_srv.get('backend_details'))
self.assertEqual( self.share_manager.delete_share(self.context, share_id)
len(call_kwargs['security_services']), 1)
self.assertTrue( self.share_manager.driver.teardown_server.assert_called_once_with(
call_kwargs['security_services'][0]['id'], server_details=backend_details,
sec_service['id']) security_services=[jsonutils.loads(
backend_details['security_service_ldap'])])
def test_delete_share_last_on_server(self): def test_delete_share_last_on_server(self):
share_net = self._create_share_network() share_net = self._create_share_network()
@ -739,8 +745,8 @@ class ShareManagerTestCase(test.TestCase):
manager.CONF.delete_share_server_with_last_share = True manager.CONF.delete_share_server_with_last_share = True
self.share_manager.delete_share(self.context, share_id) self.share_manager.delete_share(self.context, share_id)
self.share_manager.driver.teardown_server.assert_called_once_with( self.share_manager.driver.teardown_server.assert_called_once_with(
share_srv.get('backend_details'), security_services=[] server_details=share_srv.get('backend_details'),
) security_services=[])
def test_delete_share_last_on_server_deletion_disabled(self): def test_delete_share_last_on_server_deletion_disabled(self):
share_net = self._create_share_network() share_net = self._create_share_network()
@ -833,7 +839,18 @@ class ShareManagerTestCase(test.TestCase):
} }
metadata = {'fake_metadata_key': 'fake_metadata_value'} metadata = {'fake_metadata_key': 'fake_metadata_value'}
share_network = {'id': 'fake_sn_id'} share_network = {'id': 'fake_sn_id'}
network_info = {'fake_network_info_key': 'fake_network_info_value'} network_info = {'security_services': []}
for ss_type in constants.SECURITY_SERVICES_ALLOWED_TYPES:
network_info['security_services'].append({
'name': 'fake_name' + ss_type,
'domain': 'fake_domain' + ss_type,
'server': 'fake_server' + ss_type,
'dns_ip': 'fake_dns_ip' + ss_type,
'user': 'fake_user' + ss_type,
'type': ss_type,
'password': 'fake_password' + ss_type,
})
sec_services = network_info['security_services']
server_info = {'fake_server_info_key': 'fake_server_info_value'} server_info = {'fake_server_info_key': 'fake_server_info_value'}
# mock required stuff # mock required stuff
@ -866,8 +883,18 @@ class ShareManagerTestCase(test.TestCase):
self.share_manager.driver.setup_server.assert_called_once_with( self.share_manager.driver.setup_server.assert_called_once_with(
network_info, metadata=metadata) network_info, metadata=metadata)
self.share_manager.db.share_server_backend_details_set.\ self.share_manager.db.share_server_backend_details_set.\
assert_called_once_with( assert_has_calls([
self.context, share_server['id'], server_info) mock.call(self.context, share_server['id'],
{'security_service_' + sec_services[0]['type']:
jsonutils.dumps(sec_services[0])}),
mock.call(self.context, share_server['id'],
{'security_service_' + sec_services[1]['type']:
jsonutils.dumps(sec_services[1])}),
mock.call(self.context, share_server['id'],
{'security_service_' + sec_services[2]['type']:
jsonutils.dumps(sec_services[2])}),
mock.call(self.context, share_server['id'], server_info),
])
self.share_manager.db.share_server_update.assert_called_once_with( self.share_manager.db.share_server_update.assert_called_once_with(
self.context, share_server['id'], self.context, share_server['id'],
{'status': constants.STATUS_ACTIVE}) {'status': constants.STATUS_ACTIVE})
@ -880,7 +907,10 @@ class ShareManagerTestCase(test.TestCase):
} }
metadata = {'fake_metadata_key': 'fake_metadata_value'} metadata = {'fake_metadata_key': 'fake_metadata_value'}
share_network = {'id': 'fake_sn_id'} share_network = {'id': 'fake_sn_id'}
network_info = {'fake_network_info_key': 'fake_network_info_value'} network_info = {
'fake_network_info_key': 'fake_network_info_value',
'security_services': [],
}
server_info = {} server_info = {}
# mock required stuff # mock required stuff
@ -921,7 +951,10 @@ class ShareManagerTestCase(test.TestCase):
} }
server_info = {'details_key': 'value'} server_info = {'details_key': 'value'}
share_network = {'id': 'fake_sn_id'} share_network = {'id': 'fake_sn_id'}
network_info = {'fake_network_info_key': 'fake_network_info_value'} network_info = {
'fake_network_info_key': 'fake_network_info_value',
'security_services': [],
}
if detail_data_proper: if detail_data_proper:
detail_data = {'server_details': server_info} detail_data = {'server_details': server_info}
self.mock_object(self.share_manager.db, self.mock_object(self.share_manager.db,