glusterfs: create share of specific size

Currently, glusterfs driver creates shares, subdirectories within
GlusterFS volume, ignoring the share size argument. Use GlusterFS's
ability to set hard-limit quota on subdirectories (shares) to enable
creation of shares of specific size.

Change-Id: I28b4787e0ada4f6505d929f8d417218a0dc4d0b4
Closes-Bug: #1401000
This commit is contained in:
Ramana Raja 2014-09-22 06:20:49 +00:00
parent 11e48a1cce
commit e6f0f88999
2 changed files with 335 additions and 149 deletions

View File

@ -29,6 +29,9 @@ import pipes
import re
import xml.etree.cElementTree as etree
from oslo.config import cfg
import six
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
@ -36,7 +39,6 @@ from manila.i18n import _LW
from manila.openstack.common import log as logging
from manila.share import driver
from oslo.config import cfg
LOG = logging.getLogger(__name__)
@ -120,6 +122,32 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.ShareDriver):
self._ensure_gluster_vol_mounted()
self._setup_gluster_vol()
def _get_gluster_vol_option(self, option):
try:
args, kw = self.gluster_address.make_gluster_args(
'--xml',
'volume',
'info',
self.gluster_address.volume
)
out, err = self._execute(*args, **kw)
except exception.ProcessExecutionError as exc:
LOG.error(_LE("Error retrieving volume info: %s"), exc.stderr)
raise exception.GlusterfsException(exc)
if not out:
raise exception.GlusterfsException(
'Empty answer from gluster command'
)
vix = etree.fromstring(out)
if int(vix.find('./volInfo/volumes/count').text) != 1:
raise exception.InvalidShare('Volume name ambiguity')
for e in vix.findall(".//option"):
o, v = (e.find(a).text for a in ('name', 'value'))
if o == option:
return v
def _setup_gluster_vol(self):
# exporting the whole volume must be prohibited
# to not to defeat access control
@ -129,8 +157,20 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.ShareDriver):
try:
self._execute(*args, **kw)
except exception.ProcessExecutionError as exc:
LOG.error(_LE("Error in gluster volume set: %s"), exc.stderr)
LOG.error(_LE("Error in disabling access to the entire GlusterFS "
"volume: %s"), exc.stderr)
raise
# enable quota options of a GlusteFS volume to allow
# creation of shares of specific size
args, kw = self.gluster_address.make_gluster_args(
'volume', 'quota', self.gluster_address.volume, 'enable')
try:
self._execute(*args, **kw)
except exception.ProcessExecutionError as exc:
if self._get_gluster_vol_option('features.quota') != 'on':
LOG.error(_LE("Error in enabling creation of shares of "
"specific size: %s"), exc.stderr)
raise exception.GlusterfsException(exc)
def check_for_setup_error(self):
pass
@ -166,33 +206,7 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def _get_export_dir_dict(self):
"""Get the export entries of shares in the GlusterFS volume."""
try:
args, kw = self.gluster_address.make_gluster_args(
'--xml',
'volume',
'info',
self.gluster_address.volume
)
out, err = self._execute(*args, **kw)
except exception.ProcessExecutionError as exc:
LOG.error(_LE("Error retrieving volume info: %s"), exc.stderr)
raise
if not out:
raise exception.GlusterfsException(
'Empty answer from gluster command'
)
vix = etree.fromstring(out)
if int(vix.find('./volInfo/volumes/count').text) != 1:
raise exception.InvalidShare('Volume name ambiguity')
export_dir = None
for o, v in \
((e.find(a).text for a in ('name', 'value'))
for e in vix.findall(".//option")):
if o == NFS_EXPORT_DIR:
export_dir = v
break
export_dir = self._get_gluster_vol_option(NFS_EXPORT_DIR)
edh = {}
if export_dir:
# see
@ -267,18 +281,38 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.ShareDriver):
def create_share(self, ctx, share, share_server=None):
"""Create a sub-directory/share in the GlusterFS volume."""
sizestr = six.text_type(share['size']) + 'GB'
share_dir = '/' + share['name']
local_share_path = self._get_local_share_path(share)
cmd = ['mkdir', local_share_path]
# set hard limit quota on the sub-directory/share
args, kw = self.gluster_address.make_gluster_args(
'volume', 'quota', self.gluster_address.volume, 'limit-usage',
share_dir, sizestr)
try:
self._execute(*cmd, run_as_root=True)
except exception.ProcessExecutionError:
self._execute(*args, **kw)
except exception.ProcessExecutionError as exc:
self._cleanup_create_share(local_share_path, share['name'])
LOG.error(_LE('Unable to create share %s'), share['name'])
raise
raise exception.GlusterfsException(exc)
export_location = os.path.join(self.gluster_address.qualified,
share['name'])
return export_location
def _cleanup_create_share(self, share_path, share_name):
"""Cleanup share that errored out during its creation."""
if os.path.exists(share_path):
cmd = ['rm', '-rf', share_path]
try:
self._execute(*cmd, run_as_root=True)
except exception.ProcessExecutionError as exc:
LOG.error(_LE('Cannot cleanup share, %s, that errored out '
'during its creation, but exists in GlusterFS '
'volume.'), share_name)
raise exception.GlusterfsException(exc)
def delete_share(self, context, share, share_server=None):
"""Remove a sub-directory/share from the GlusterFS volume."""
local_share_path = self._get_local_share_path(share)

View File

@ -40,6 +40,8 @@ gluster_address_attrs = {
'volume': 'testvol',
}
fake_local_share_path = '/mnt/nfs/testvol/fakename'
def fake_share(**kwargs):
share = {
@ -52,6 +54,8 @@ def fake_share(**kwargs):
share.update(kwargs)
return db_fakes.FakeModel(share)
NFS_EXPORT_DIR = 'nfs.export-dir'
class GlusterAddressTestCase(test.TestCase):
"""Tests GlusterAddress."""
@ -118,7 +122,10 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self._driver = glusterfs.GlusterfsShareDriver(
self._db, execute=self._execute,
configuration=self.fake_conf)
self._driver.gluster_address = mock.Mock(**gluster_address_attrs)
self._driver.gluster_address = (
mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})),
**gluster_address_attrs))
self.share = fake_share()
def tearDown(self):
@ -127,13 +134,19 @@ class GlusterfsShareDriverTestCase(test.TestCase):
fake_utils.fake_execute_clear_log()
def test_do_setup(self):
self._driver._ensure_gluster_vol_mounted = mock.Mock()
exec_cmd1 = 'mount.glusterfs'
exec_cmd2 = 'gluster volume set testvol nfs.export-volumes off'
expected_exec = [exec_cmd1, exec_cmd2]
methods = ('_ensure_gluster_vol_mounted', '_setup_gluster_vol')
for method in methods:
self.stubs.Set(self._driver, method, mock.Mock())
self.stubs.Set(glusterfs, 'GlusterAddress',
mock.Mock(return_value=True))
expected_exec = ['mount.glusterfs']
self._driver.do_setup(self._context)
self.assertEqual(True, self._driver.gluster_address)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
glusterfs.GlusterAddress.assert_called_once_with(
self._driver.configuration.glusterfs_target)
self._driver._ensure_gluster_vol_mounted.assert_called_once_with()
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self._driver._setup_gluster_vol.assert_called_once_with()
def test_do_setup_glusterfs_target_not_set(self):
self._driver.configuration.glusterfs_target = None
@ -151,24 +164,175 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertRaises(exception.GlusterfsException, self._driver.do_setup,
self._context)
def test_do_setup_mount_glusterfs_error_gluster_vol_set(self):
self._driver._ensure_gluster_vol_mounted = mock.Mock()
glusterfs.LOG.error = mock.Mock()
def test_get_gluster_vol_option_empty_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
expected_exec = ['true']
self.assertRaises(exception.GlusterfsException,
self._driver._get_gluster_vol_option, NFS_EXPORT_DIR)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
def test_get_gluster_vol_option_failing_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
raise exception.ProcessExecutionError(stderr='testvol')
expected_exec = ['gluster volume set testvol nfs.export-volumes off']
raise RuntimeError('fake error')
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
self.assertRaises(RuntimeError, self._driver._get_gluster_vol_option,
NFS_EXPORT_DIR)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
try:
self._driver.do_setup(self._context)
except exception.ProcessExecutionError:
pass
else:
self.fail('Expecting exception.ProcessExecutionError')
def test_get_gluster_vol_option_ambiguous_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
glusterfs.LOG.error.assert_called_with(
"Error in gluster volume set: %s", "testvol")
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<count>0</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
self.assertRaises(exception.InvalidShare,
self._driver._get_gluster_vol_option, NFS_EXPORT_DIR)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
def test_get_gluster_vol_option_trivial_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<volume>
</volume>
<count>1</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
ret = self._driver._get_gluster_vol_option(NFS_EXPORT_DIR)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
self.assertEqual(None, ret)
def test_get_gluster_vol_option(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<volume>
<options>
<option>
<name>nfs.export-dir</name>
<value>/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)</value>
</option>
</options>
</volume>
<count>1</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
ret = self._driver._get_gluster_vol_option(NFS_EXPORT_DIR)
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertEqual(
'/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)', ret)
def test_setup_gluster_vol(self):
args1 = ('volume', 'set', 'testvol', 'nfs.export-volumes', 'off')
args2 = ('volume', 'quota', 'testvol', 'enable')
cmd_join = lambda args: 'gluster ' + ' '.join(arg for arg in args)
expected_exec = [cmd_join(args1), cmd_join(args2)]
self.stubs.Set(
self._driver.gluster_address, 'make_gluster_args',
mock.Mock(side_effect=[(('gluster',) + args1, {}),
(('gluster',) + args2, {})]))
self._driver._setup_gluster_vol()
self._driver.gluster_address.make_gluster_args.has_calls(
mock.call(args1), mock.call(args2))
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
def test_setup_gluster_vol_error_disabling_access(self):
def exec_runner(*ignore_args, **ignore_kwargs):
raise exception.ProcessExecutionError()
args1 = ('volume', 'set', 'testvol', 'nfs.export-volumes', 'off')
cmd_join = lambda args: 'gluster ' + ' '.join(arg for arg in args)
expected_exec = [cmd_join(args1)]
self.stubs.Set(
self._driver.gluster_address, 'make_gluster_args',
mock.Mock(side_effect=[(('gluster',) + args1, {})]))
self.stubs.Set(glusterfs.LOG, 'error', mock.Mock())
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
self.assertRaises(
exception.ProcessExecutionError, self._driver._setup_gluster_vol)
self._driver.gluster_address.make_gluster_args.has_calls(
mock.call(args1))
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
glusterfs.LOG.error.assert_called_once_with(mock.ANY, mock.ANY)
def test_setup_gluster_vol_error_enabling_creation_share_specific_size(
self):
def exec_runner(*ignore_args, **ignore_kwargs):
raise exception.ProcessExecutionError(stderr='fake error')
args1 = ('volume', 'set', 'testvol', 'nfs.export-volumes', 'off')
args2 = ('volume', 'quota', 'testvol', 'enable')
cmd_join = lambda args: 'gluster ' + ' '.join(arg for arg in args)
expected_exec = [cmd_join(args1), cmd_join(args2)]
self.stubs.Set(
self._driver.gluster_address, 'make_gluster_args',
mock.Mock(side_effect=[(('gluster',) + args1, {}),
(('gluster',) + args2, {})]))
self.stubs.Set(glusterfs.LOG, 'error', mock.Mock())
self.stubs.Set(self._driver, '_get_gluster_vol_option',
mock.Mock(return_value='off'))
fake_utils.fake_execute_set_repliers([(expected_exec[1], exec_runner)])
self.assertRaises(
exception.GlusterfsException, self._driver._setup_gluster_vol)
self._driver.gluster_address.make_gluster_args.has_calls(
mock.call(args1), mock.call(args2))
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
self._driver._get_gluster_vol_option.assert_called_once_with(
'features.quota')
glusterfs.LOG.error.assert_called_once_with(mock.ANY, mock.ANY)
def test_setup_gluster_vol_already_enabled_creation_share_specific_size(
self):
def exec_runner(*ignore_args, **ignore_kwargs):
raise exception.ProcessExecutionError(stderr='already enabled')
args1 = ('volume', 'set', 'testvol', 'nfs.export-volumes', 'off')
args2 = ('volume', 'quota', 'testvol', 'enable')
cmd_join = lambda args: 'gluster ' + ' '.join(arg for arg in args)
expected_exec = [cmd_join(args1), cmd_join(args2)]
self.stubs.Set(
self._driver.gluster_address, 'make_gluster_args',
mock.Mock(side_effect=[(('gluster',) + args1, {}),
(('gluster',) + args2, {})]))
self.stubs.Set(self._driver, '_get_gluster_vol_option',
mock.Mock(return_value='on'))
fake_utils.fake_execute_set_repliers([(expected_exec[1], exec_runner)])
self._driver._setup_gluster_vol()
self._driver.gluster_address.make_gluster_args.has_calls(
mock.call(args1), mock.call(args2))
self._driver._get_gluster_vol_option.assert_called_once_with(
'features.quota')
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
def test_do_mount(self):
expected_exec = ['true']
@ -237,93 +401,24 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertRaises(exception.GlusterfsException,
self._driver._ensure_gluster_vol_mounted)
def test_get_export_dir_dict_empty_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
expected_exec = ['true']
self.assertRaises(exception.GlusterfsException,
self._driver._get_export_dir_dict)
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
def test_get_export_dir_dict_failing_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
raise RuntimeError('fake error')
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
self.assertRaises(RuntimeError, self._driver._get_export_dir_dict)
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
def test_get_export_dir_dict_ambiguous_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<count>0</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
self.assertRaises(exception.InvalidShare,
self._driver._get_export_dir_dict)
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
def test_get_export_dir_dict_trivial_volinfo(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<volume>
</volume>
<count>1</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
ret = self._driver._get_export_dir_dict()
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertEqual(ret, {})
def test_get_export_dir_dict(self):
self._driver.gluster_address = mock.Mock(
make_gluster_args=mock.Mock(return_value=(('true',), {})))
def exec_runner(*ignore_args, **ignore_kwargs):
return """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cliOutput>
<volInfo>
<volumes>
<volume>
<options>
<option>
<name>nfs.export-dir</name>
<value>/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)</value>
</option>
</options>
</volume>
<count>1</count>
</volumes>
</volInfo>
</cliOutput>""", ''
expected_exec = ['true']
fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
get_gluster_vol_option_output = (
'/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)')
self.stubs.Set(self._driver, '_get_gluster_vol_option',
mock.Mock(return_value=get_gluster_vol_option_output))
ret = self._driver._get_export_dir_dict()
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertEqual(ret,
{'foo': ['10.0.0.1', '10.0.0.2'], 'bar': ['10.0.0.1']}
)
self._driver._get_gluster_vol_option.assert_called_once_with(
NFS_EXPORT_DIR)
self.assertEqual(
{'foo': ['10.0.0.1', '10.0.0.2'], 'bar': ['10.0.0.1']}, ret)
def test_get_export_dir_dict_export_dir_none(self):
self.stubs.Set(self._driver, '_get_gluster_vol_option',
mock.Mock(return_value=None))
ret = self._driver._get_export_dir_dict()
self._driver._get_gluster_vol_option.assert_called_once_with(
NFS_EXPORT_DIR)
self.assertEqual({}, ret)
def test_get_local_share_path(self):
with mock.patch.object(os, 'access', return_value=True):
@ -383,27 +478,49 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self._driver._update_share_stats)
def test_create_share(self):
self._driver._get_local_share_path =\
mock.Mock(return_value='/mnt/nfs/testvol/fakename')
expected_exec = ['mkdir /mnt/nfs/testvol/fakename', ]
fake_sizestr = '1GB'
fake_share_dir = '/fakename'
fake_args = ('executable', 'arg1',)
exec_cmd1 = 'mkdir %s' % fake_local_share_path
exec_cmd2 = ' '.join(arg for arg in fake_args)
expected_exec = [exec_cmd1, exec_cmd2]
expected_ret = 'testuser@127.0.0.1:/testvol/fakename'
self.stubs.Set(
self._driver, '_get_local_share_path',
mock.Mock(return_value=fake_local_share_path))
self.stubs.Set(
self._driver.gluster_address, 'make_gluster_args',
mock.Mock(return_value=(fake_args, {})))
ret = self._driver.create_share(self._context, self.share)
self.assertEqual(fake_utils.fake_execute_get_log(), expected_exec)
self.assertEqual(ret, expected_ret)
def test_cannot_create_share(self):
self._driver._get_local_share_path =\
mock.Mock(return_value='/mnt/nfs/testvol/fakename')
self._driver._get_local_share_path.called_once_with(self.share)
self._driver.gluster_address.make_gluster_args.called_once_with(
'volume', 'quota', 'testvol', 'limit-usage', fake_share_dir,
fake_sizestr)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
self.assertEqual(expected_ret, ret)
def test_create_share_unable_to_create_share(self):
def exec_runner(*ignore_args, **ignore_kw):
raise exception.ProcessExecutionError
expected_exec = ['mkdir %s' % (self._driver._get_local_share_path())]
self.stubs.Set(
self._driver, '_get_local_share_path',
mock.Mock(return_value=fake_local_share_path))
self.stubs.Set(
self._driver, '_cleanup_create_share', mock.Mock())
self.stubs.Set(
glusterfs.LOG, 'error', mock.Mock())
expected_exec = ['mkdir %s' % fake_local_share_path]
fake_utils.fake_execute_set_repliers([(expected_exec[0],
exec_runner)])
self.assertRaises(exception.ProcessExecutionError,
self._driver.create_share, self._context, self.share)
self.assertRaises(
exception.GlusterfsException, self._driver.create_share,
self._context, self.share)
self._driver._get_local_share_path.called_once_with(self.share)
self._driver._cleanup_create_share.assert_called_once_with(
fake_local_share_path, self.share['name'])
glusterfs.LOG.error.assert_called_once_with(
mock.ANY, mock.ANY)
def test_create_share_can_be_called_with_extra_arg_share_server(self):
share_server = None
@ -411,12 +528,47 @@ class GlusterfsShareDriverTestCase(test.TestCase):
with mock.patch.object(os.path, 'join', return_value=None):
ret = self._driver.create_share(self._context, self.share,
share_server)
self.assertEqual(ret, None)
self.assertEqual(None, ret)
self._driver._get_local_share_path.assert_called_once_with(
self.share)
os.path.join.assert_called_once_with(
self._driver.gluster_address.qualified, self.share['name'])
def test_cleanup_create_share_local_share_path_exists(self):
expected_exec = ['rm -rf %s' % fake_local_share_path]
self.stubs.Set(
os.path, 'exists', mock.Mock(return_value=True))
ret = self._driver._cleanup_create_share(fake_local_share_path,
self.share['name'])
os.path.exists.assert_called_once_with(fake_local_share_path)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
self.assertEqual(None, ret)
def test_cleanup_create_share_cannot_cleanup_unusable_share(self):
def exec_runner(*ignore_args, **ignore_kw):
raise exception.ProcessExecutionError
expected_exec = ['rm -rf %s' % fake_local_share_path]
fake_utils.fake_execute_set_repliers([(expected_exec[0],
exec_runner)])
self.stubs.Set(
glusterfs.LOG, 'error', mock.Mock())
self.stubs.Set(
os.path, 'exists', mock.Mock(return_value=True))
self.assertRaises(exception.GlusterfsException,
self._driver._cleanup_create_share,
fake_local_share_path, self.share['name'])
os.path.exists.assert_called_once_with(fake_local_share_path)
glusterfs.LOG.error.assert_called_once_with(mock.ANY, mock.ANY)
def test_cleanup_create_share_local_share_path_does_not_exist(self):
self.stubs.Set(
os.path, 'exists', mock.Mock(return_value=False))
ret = self._driver._cleanup_create_share(fake_local_share_path,
self.share['name'])
os.path.exists.assert_called_once_with(fake_local_share_path)
self.assertEqual(None, ret)
def test_delete_share(self):
self._driver._get_local_share_path =\
mock.Mock(return_value='/mnt/nfs/testvol/fakename')