Remove startswith for share_proto check

We have explicit definition for supported share protocols.
Using "startswith" to search for substring is not a right way.

Change-Id: I8f482aea74249efca3bd490d971cf3e944919bb8
This commit is contained in:
chen-li 2015-01-23 16:26:42 +08:00 committed by li,chen
parent 7623504203
commit 7b050b8932
11 changed files with 154 additions and 25 deletions

View File

@ -57,11 +57,11 @@ class VNXStorageConnection(driver.StorageConnection):
vdm = self.share_server_validation(share_server)
self.allocate_container(share_name, size, vdm['id'])
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
location = self._create_nfs_share(share_name, vdm['name'],
share_server)
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
location = self._create_cifs_share(share_name, vdm)
else:
raise exception.InvalidShare(
@ -156,12 +156,12 @@ class VNXStorageConnection(driver.StorageConnection):
vdm_ref = self.share_server_validation(share_server)
self.allocate_container_from_snapshot(share, snapshot, vdm_ref)
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self._create_nfs_share(share_name, vdm_ref['name'], share_server)
location = ('%(nfs_if)s:/%(share_name)s'
% {'nfs_if': share_server['backend_details']['nfs_if'],
'share_name': share_name})
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
location = self._create_cifs_share(share_name, vdm_ref)
else:
raise exception.InvalidShare(
@ -197,9 +197,9 @@ class VNXStorageConnection(driver.StorageConnection):
"Return directly because there is nothing to clean"))
return
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self._delete_nfs_share(share, share_server)
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
self._delete_cifs_share(share, share_server)
else:
raise exception.InvalidShare(
@ -345,9 +345,9 @@ class VNXStorageConnection(driver.StorageConnection):
def allow_access(self, emc_share_driver, context, share, access,
share_server=None):
"""Allow access to the share."""
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self._nfs_allow_access(context, share, access, share_server)
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
self._cifs_allow_access(context, share, access, share_server)
else:
raise exception.InvalidShare(
@ -400,9 +400,9 @@ class VNXStorageConnection(driver.StorageConnection):
def deny_access(self, emc_share_driver, context, share, access,
share_server=None):
"""Deny access to the share."""
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self._nfs_deny_access(share, access, share_server)
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
self._cifs_deny_access(context, share, access, share_server)
else:
raise exception.InvalidShare(

View File

@ -570,9 +570,9 @@ class GenericShareDriver(driver.ExecuteMixin, driver.ShareDriver):
access['access_to'])
def _get_helper(self, share):
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
return self._helpers['NFS']
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
return self._helpers['CIFS']
else:
raise exception.InvalidShare(reason='Wrong share type')

View File

@ -512,7 +512,7 @@ class GPFSShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
super(GPFSShareDriver, self)._update_share_stats(data)
def _get_helper(self, share):
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
return self._helpers[self.configuration.gpfs_nfs_server_type]
else:
msg = (_('Share protocol %s not supported by GPFS driver.'),

View File

@ -366,7 +366,7 @@ class NetAppClusteredShareDriver(driver.ShareDriver):
raise exception.NetAppException(msg)
for proto in self._helpers.keys():
if share_proto.upper().startswith(proto):
if share_proto == proto:
return self._helpers[proto]
err_msg = _("Invalid NAS protocol supplied: %s. ") % share_proto

View File

@ -194,7 +194,7 @@ class ZFSSAShareDriver(driver.ShareDriver):
arg.update(self.default_args)
arg.update({'name': share['id']})
if share['share_proto'].startswith('CIFS'):
if share['share_proto'] == 'CIFS':
arg.update({'sharesmb': 'on'})
LOG.debug("ZFSSAShareDriver.create_share: id=%(name)s, size=%(quota)s",
{'name': arg['name'],
@ -240,7 +240,7 @@ class ZFSSAShareDriver(driver.ShareDriver):
}
arg.update(details)
if share['share_proto'].startswith('CIFS'):
if share['share_proto'] == 'CIFS':
arg.update({'sharesmb': 'on'})
self.zfssa.clone_snapshot(lcfg.zfssa_pool,
lcfg.zfssa_project,
@ -281,7 +281,7 @@ class ZFSSAShareDriver(driver.ShareDriver):
"""Allows access to an NFS share for the specified IP."""
LOG.debug("ZFSSAShareDriver.allow_access: share=%s", share['id'])
lcfg = self.configuration
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self.zfssa.allow_access_nfs(lcfg.zfssa_pool,
lcfg.zfssa_project,
share['id'],
@ -291,12 +291,12 @@ class ZFSSAShareDriver(driver.ShareDriver):
"""Deny access to an NFS share for the specified IP."""
LOG.debug("ZFSSAShareDriver.deny_access: share=%s", share['id'])
lcfg = self.configuration
if share['share_proto'].startswith('NFS'):
if share['share_proto'] == 'NFS':
self.zfssa.deny_access_nfs(lcfg.zfssa_pool,
lcfg.zfssa_project,
share['id'],
access)
elif share['share_proto'].startswith('CIFS'):
elif share['share_proto'] == 'CIFS':
return
def _update_share_stats(self):

View File

@ -25,7 +25,7 @@ class FakeModel(object):
self.values = values
def __getattr__(self, name):
return self.values[name]
return self.values.get(name)
def __getitem__(self, key):
if key in self.values:

View File

@ -22,7 +22,7 @@ def fake_share(**kwargs):
'id': 'fakeid',
'name': 'fakename',
'size': 1,
'share_proto': 'NFS',
'share_proto': 'fake_proto',
'share_network_id': 'fake share network id',
'share_server_id': 'fake share server id',
'export_location': 'fake_location:/fake_share',
@ -38,7 +38,7 @@ def fake_snapshot(**kwargs):
'share_id': 'fakeid',
'name': 'fakesnapshotname',
'share_size': 1,
'share_proto': 'NFS',
'share_proto': 'fake_proto',
'export_location': 'fake_location:/fake_share',
}
snapshot.update(kwargs)

View File

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_utils import units
@ -1126,6 +1127,7 @@ class SSHSideEffect(object):
return item[1]
@ddt.ddt
class EMCShareDriverVNXTestCase(test.TestCase):
def setUp(self):
super(EMCShareDriverVNXTestCase, self).setUp()
@ -1237,6 +1239,22 @@ class EMCShareDriverVNXTestCase(test.TestCase):
helper.XMLAPIConnector.request.assert_has_calls(expected_calls)
helper.SSHConnector.run_ssh.assert_has_calls(ssh_calls)
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_create_share_with_wrong_proto(self, share):
share_server = TD.fake_share_server()
hook = RequestSideEffect()
hook.append(TD.resp_get_vdm())
hook.append(TD.resp_task_succeed())
hook.append(TD.resp_task_succeed())
helper.XMLAPIConnector.request = mock.Mock(side_effect=hook)
sshHook = SSHSideEffect()
sshHook.append(TD.CREATE_NFS_EXPORT_OUT, TD.FAKE_ERROR)
helper.SSHConnector.run_ssh = mock.Mock(side_effect=sshHook)
self.assertRaises(exception.InvalidShare,
self.driver.create_share, None, share, share_server)
def test_create_nfs_share_default(self):
share = TD.fake_share(share_proto='NFS')
share_server = TD.fake_share_server()
@ -1260,6 +1278,28 @@ class EMCShareDriverVNXTestCase(test.TestCase):
self.assertEqual(location, '192.168.1.1:/%s' % share['name'],
"NFS export path is incorrect")
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_delete_share_with_wrong_proto(self, share):
share_server = TD.fake_share_server()
mover_name = share_server['backend_details']['share_server_name']
path = '/' + share['name']
sshHook = SSHSideEffect()
sshHook.append(TD.resp_get_nfs_share_by_path(mover_name,
path))
sshHook.append(TD.resp_delete_nfs_share_success(mover_name))
helper.SSHConnector.run_ssh = mock.Mock(side_effect=sshHook)
hook = RequestSideEffect()
hook.append(TD.resp_task_succeed())
hook.append(TD.resp_get_filesystem(share['name']))
hook.append(TD.resp_task_succeed())
helper.XMLAPIConnector.request = mock.Mock(side_effect=hook)
self.assertRaises(exception.InvalidShare,
self.driver.delete_share,
None, share, share_server)
def test_delete_nfs_share_default(self):
share = TD.fake_share(share_proto='NFS')
share_server = TD.fake_share_server()
@ -1438,6 +1478,23 @@ class EMCShareDriverVNXTestCase(test.TestCase):
expected_calls = [mock.call(TD.req_query_snapshot('fakesnapshotname'))]
helper.XMLAPIConnector.request.assert_has_calls(expected_calls)
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_allow_access_with_wrong_proto(self, share):
access = fake_share.fake_access()
share_server = TD.fake_share_server()
mover_name = share_server['backend_details']['share_server_name']
path = '/' + share['name']
sshHook = SSHSideEffect()
sshHook.append(TD.resp_get_nfs_share_by_path(mover_name,
path))
sshHook.append(TD.resp_change_nfs_share_success(mover_name))
helper.SSHConnector.run_ssh = mock.Mock(side_effect=sshHook)
self.assertRaises(exception.InvalidShare,
self.driver.allow_access,
None, share, access, share_server)
def test_nfs_allow_access(self):
share = TD.fake_share(share_proto='NFS')
access = TD.fake_access()
@ -1501,6 +1558,24 @@ class EMCShareDriverVNXTestCase(test.TestCase):
]
helper.SSHConnector.run_ssh.assert_has_calls(expected_calls)
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_deny_access_with_wrong_proto(self, share):
access = fake_share.fake_access()
share_server = TD.fake_share_server()
mover_name = share_server['backend_details']['share_server_name']
path = '/' + share['name']
sshHook = SSHSideEffect()
sshHook.append(TD.resp_get_nfs_share_by_path(mover_name,
path,
[access['access_to']]))
sshHook.append(TD.resp_change_nfs_share_success(mover_name))
helper.SSHConnector.run_ssh = mock.Mock(side_effect=sshHook)
self.assertRaises(exception.InvalidShare,
self.driver.deny_access,
None, share, access, share_server)
def test_nfs_deny_access(self):
share = TD.fake_share(share_proto='NFS')
access = TD.fake_access()
@ -1556,6 +1631,32 @@ class EMCShareDriverVNXTestCase(test.TestCase):
manila.db.share_network_get.assert_called_once_with(
context, share['share_network_id'])
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_create_share_from_snapshot_with_wrong_proto(self, share):
snap = fake_share.fake_snapshot()
share_server = TD.fake_share_server()
fake_ckpt = "fake_ckpt"
hook = RequestSideEffect()
hook.append(TD.resp_get_filesystem(share['name']))
helper.XMLAPIConnector.request = mock.Mock(side_effect=hook)
sshHook = SSHSideEffect()
sshHook.append(TD.GET_INTERCONNECT_ID_OUT, TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.COPY_CKPT_OUTPUT)
sshHook.append(TD.nas_info_out(snap['name'], fake_ckpt), TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.FAKE_ERROR)
sshHook.append(TD.FAKE_OUTPUT, TD.FAKE_ERROR)
sshHook.append(TD.CREATE_NFS_EXPORT_OUT, TD.FAKE_ERROR)
helper.SSHConnector.run_ssh = mock.Mock(side_effect=sshHook)
self.assertRaises(exception.InvalidShare,
self.driver.create_share_from_snapshot,
None, share, snap, share_server)
def test_create_share_from_snapshot(self):
share = TD.fake_share(share_proto='NFS')
snap = TD.fake_snapshot()

View File

@ -17,6 +17,7 @@
import re
import socket
import ddt
import mock
from oslo_config import cfg
@ -33,6 +34,7 @@ from manila import utils
CONF = cfg.CONF
@ddt.ddt
class GPFSShareDriverTestCase(test.TestCase):
"""Tests GPFSShareDriver."""
@ -60,7 +62,7 @@ class GPFSShareDriverTestCase(test.TestCase):
self._driver._helpers = {
'KNFS': self._helper_fake
}
self.share = fake_share.fake_share()
self.share = fake_share.fake_share(share_proto='NFS')
self.server = {
'backend_details': {
'ip': '1.2.3.4',
@ -131,6 +133,12 @@ class GPFSShareDriverTestCase(test.TestCase):
)
self.assertEqual(len(self._driver._helpers), 1)
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'))
def test__get_helper_with_wrong_proto(self, share):
self.assertRaises(exception.InvalidShare,
self._driver._get_helper, share)
def test_create_share(self):
self._helper_fake.create_export.return_value = 'fakelocation'
methods = ('_create_share', '_get_share_path')

View File

@ -17,6 +17,7 @@
import copy
import hashlib
import ddt
import mock
from manila import context
@ -30,6 +31,7 @@ from manila.tests import fake_share
from manila import utils
@ddt.ddt
class NetAppClusteredDrvTestCase(test.TestCase):
"""Test suite for NetApp Cluster Mode driver."""
@ -49,7 +51,7 @@ class NetAppClusteredDrvTestCase(test.TestCase):
self._vserver_client = mock.Mock()
self._vserver_client.send_request = mock.Mock()
driver.NetAppApiClient = mock.Mock(return_value=self._vserver_client)
self.share = fake_share.fake_share()
self.share = fake_share.fake_share(share_proto='NFS')
self.snapshot = fake_share.fake_snapshot()
self.security_service = {'id': 'fake_id',
'domain': 'FAKE',
@ -461,6 +463,15 @@ class NetAppClusteredDrvTestCase(test.TestCase):
mock.call('net-interface-create', interface_args),
])
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test_get_helper_with_wrong_proto(self, share):
self.stubs.Set(self.driver, '_check_licenses',
mock.Mock(return_value=share['share_proto']))
self.assertRaises(exception.NetAppException,
self.driver._get_helper, share)
def test_enable_nfs(self):
self.driver._enable_nfs(self._vserver_client)
export_args = {

View File

@ -17,6 +17,7 @@
import os
import ddt
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
@ -39,6 +40,7 @@ from manila import volume
CONF = cfg.CONF
@ddt.ddt
class GenericShareDriverTestCase(test.TestCase):
"""Tests GenericShareDriver."""
@ -87,7 +89,7 @@ class GenericShareDriverTestCase(test.TestCase):
'CIFS': self._helper_cifs,
'NFS': self._helper_nfs,
}
self.share = fake_share.fake_share()
self.share = fake_share.fake_share(share_proto='NFS')
self.server = {
'instance_id': 'fake_instance_id',
'ip': 'fake_ip',
@ -778,6 +780,13 @@ class GenericShareDriverTestCase(test.TestCase):
access['access_type'],
access['access_to'])
@ddt.data(fake_share.fake_share(),
fake_share.fake_share(share_proto='NFSBOGUS'),
fake_share.fake_share(share_proto='CIFSBOGUS'))
def test__get_helper_with_wrong_proto(self, share):
self.assertRaises(exception.InvalidShare,
self._driver._get_helper, share)
def test_setup_network(self):
sim = self._driver.instance_manager
net_info = {'server_id': 'fake',