glusterfs: Implement update_access() method

Implement update_access for
  - glusterfs_native driver
  - glusterfs driver with volume layout
  - glusterfs driver with directory layout

Co-Authored-By: Csaba Henk <chenk@redhat.com>
Implements bp glusterfs-update-access

Change-Id: I553bc8e23442102252163a29740967d4b64af460
This commit is contained in:
Ramana Raja 2016-02-24 22:32:43 +05:30 committed by Csaba Henk
parent bde7105a5b
commit a74d4e43f2
6 changed files with 332 additions and 580 deletions

View File

@ -30,6 +30,7 @@ import sys
from oslo_config import cfg
from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.share import driver
@ -138,20 +139,28 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
helper.init_helper()
return helper
def _allow_access_via_manager(self, gluster_mgr, context, share, access,
share_server=None):
"""Allow access to the share."""
self._get_helper(gluster_mgr).allow_access('/', share, access)
@property
def supported_access_types(self):
return self.nfs_helper.supported_access_types
def _deny_access_via_manager(self, gluster_mgr, context, share, access,
share_server=None):
"""Allow access to the share."""
self._get_helper(gluster_mgr).deny_access('/', share, access)
@property
def supported_access_levels(self):
return self.nfs_helper.supported_access_levels
def _update_access_via_manager(self, gluster_mgr, context, share,
add_rules, delete_rules, recovery=False,
share_server=None):
"""Update access to the share."""
self._get_helper(gluster_mgr).update_access(
'/', share, add_rules, delete_rules, recovery=recovery)
class GlusterNFSHelper(ganesha.NASHelperBase):
"""Manage shares with Gluster-NFS server."""
supported_access_types = ('ip', )
supported_access_levels = (constants.ACCESS_LEVEL_RW, )
def __init__(self, execute, config_object, **kwargs):
self.gluster_manager = kwargs.pop('gluster_manager')
super(GlusterNFSHelper, self).__init__(execute, config_object,
@ -179,73 +188,41 @@ class GlusterNFSHelper(ganesha.NASHelperBase):
edh[d] = e.split('|')
return edh
def _manage_access(self, share_name, access_type, access_to, cbk):
"""Manage share access with cbk.
Adjust the exports of the Gluster-NFS server using cbk.
:param share_name: name of the share
:type share_name: string
:param access_type: type of access allowed in Manila
:type access_type: string
:param access_to: ip of the guest whose share access is managed
:type access_to: string
:param cbk: callback to adjust the exports of NFS server
Following is the description of cbk(ddict, edir, host).
:param ddict: association of shares with ips that have access to them
:type ddict: dict
:param edir: name of share i.e. export directory
:type edir: string
:param host: ip address derived from the access object
:type host: string
:returns: bool (cbk leaves ddict intact) or None (cbk modifies ddict)
"""
if access_type != 'ip':
raise exception.InvalidShareAccess('only ip access type allowed')
export_dir_dict = self._get_export_dir_dict()
if cbk(export_dir_dict, share_name, access_to):
return
if export_dir_dict:
export_dir_new = (",".join("/%s(%s)" % (d, "|".join(v))
for d, v in sorted(export_dir_dict.items())))
else:
export_dir_new = None
self.gluster_manager.set_vol_option(NFS_EXPORT_DIR, export_dir_new)
def allow_access(self, base, share, access):
"""Allow access to a share."""
def cbk(ddict, edir, host):
if edir not in ddict:
ddict[edir] = []
if host in ddict[edir]:
return True
ddict[edir].append(host)
path = self.gluster_manager.path
self._manage_access(path[1:], access['access_type'],
access['access_to'], cbk)
def deny_access(self, base, share, access):
"""Deny access to a share."""
def cbk(ddict, edir, host):
if edir not in ddict or host not in ddict[edir]:
return True
ddict[edir].remove(host)
if not ddict[edir]:
ddict.pop(edir)
path = self.gluster_manager.path
self._manage_access(path[1:], access['access_type'],
access['access_to'], cbk)
def update_access(self, base_path, share, add_rules, delete_rules,
recovery=False):
"""Update access rules of share."""
"""Update access rules."""
# Stub needed to meet parent's implementation enforcement.
raise NotImplementedError
existing_rules_set = set()
# The name of the directory, which is exported as the share.
export_dir = self.gluster_manager.path[1:]
# Fetch the existing export entries as an export dictionary with the
# exported directories and the list of client IP addresses authorized
# to access them as key-value pairs.
export_dir_dict = self._get_export_dir_dict()
if export_dir in export_dir_dict:
existing_rules_set = set(export_dir_dict[export_dir])
add_rules_set = {rule['access_to'] for rule in add_rules}
delete_rules_set = {rule['access_to'] for rule in delete_rules}
new_rules_set = (
(existing_rules_set | add_rules_set) - delete_rules_set)
if new_rules_set:
export_dir_dict[export_dir] = new_rules_set
elif export_dir not in export_dir_dict:
return
else:
export_dir_dict.pop(export_dir)
# Reconstruct the export entries.
if export_dir_dict:
export_dirs_new = (",".join("/%s(%s)" % (d, "|".join(sorted(v)))
for d, v in sorted(export_dir_dict.items())))
else:
export_dirs_new = None
self.gluster_manager.set_vol_option(NFS_EXPORT_DIR, export_dirs_new)
class GlusterNFSVolHelper(GlusterNFSHelper):
@ -256,57 +233,26 @@ class GlusterNFSVolHelper(GlusterNFSHelper):
NFS_RPC_AUTH_ALLOW)
return export_vol.split(',') if export_vol else []
def _manage_access(self, access_type, access_to, cbk):
"""Manage share access with cbk.
def update_access(self, base_path, share, add_rules, delete_rules,
recovery=False):
"""Update access rules."""
Adjust the exports of the Gluster-NFS server using cbk.
existing_rules_set = set(self._get_vol_exports())
add_rules_set = {rule['access_to'] for rule in add_rules}
delete_rules_set = {rule['access_to'] for rule in delete_rules}
new_rules_set = (
(existing_rules_set | add_rules_set) - delete_rules_set)
:param access_type: type of access allowed in Manila
:type access_type: string
:param access_to: ip of the guest whose share access is managed
:type access_to: string
:param cbk: callback to adjust the exports of NFS server
Following is the description of cbk(explist, host).
:param explist: list of hosts that have access to the share
:type explist: list
:param host: ip address derived from the access object
:type host: string
:returns: bool (cbk leaves ddict intact) or None (cbk modifies ddict)
"""
if access_type != 'ip':
raise exception.InvalidShareAccess('only ip access type allowed')
export_vol_list = self._get_vol_exports()
if cbk(export_vol_list, access_to):
return
if export_vol_list:
argseq = ((NFS_RPC_AUTH_ALLOW, ','.join(export_vol_list)),
if new_rules_set:
argseq = ((NFS_RPC_AUTH_ALLOW, ','.join(sorted(new_rules_set))),
(NFS_RPC_AUTH_REJECT, None))
else:
argseq = ((NFS_RPC_AUTH_ALLOW, None),
(NFS_RPC_AUTH_REJECT, '*'))
for args in argseq:
self.gluster_manager.set_vol_option(*args)
def allow_access(self, base, share, access):
"""Allow access to a share."""
def cbk(explist, host):
if host in explist:
return True
explist.append(host)
self._manage_access(access['access_type'], access['access_to'], cbk)
def deny_access(self, base, share, access):
"""Deny access to a share."""
def cbk(explist, host):
if host not in explist:
return True
explist.remove(host)
self._manage_access(access['access_type'], access['access_to'], cbk)
class GaneshaNFSHelper(ganesha.GaneshaNASHelper):
@ -360,11 +306,3 @@ class GaneshaNFSHelper(ganesha.GaneshaNASHelper):
return {"Hostname": self.gluster_manager.host,
"Volume": self.gluster_manager.volume,
"Volpath": self.gluster_manager.path}
# TODO(csaba): remove the following when the driver moves to update_access
def allow_access(self, *a, **kw):
self._allow_access(*a, **kw)
def deny_access(self, *a, **kw):
self._deny_access(*a, **kw)

View File

@ -29,9 +29,9 @@ import re
from oslo_log import log
from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LW
from manila.share import driver
from manila.share.drivers.glusterfs import common
from manila.share.drivers.glusterfs import layout
@ -63,6 +63,8 @@ class GlusterfsNativeShareDriver(driver.ExecuteMixin,
GLUSTERFS_VERSION_MIN = (3, 6)
_supported_access_levels = (constants.ACCESS_LEVEL_RW, )
_supported_access_types = (ACCESS_TYPE_CERT, )
supported_layouts = ('layout_volume.GlusterfsVolumeMappedLayout',)
supported_protocols = ('GLUSTERFS',)
@ -145,67 +147,45 @@ class GlusterfsNativeShareDriver(driver.ExecuteMixin,
return gluster_mgr.export
@utils.synchronized("glusterfs_native_access", external=False)
def _allow_access_via_manager(self, gluster_mgr, context, share, access,
share_server=None):
"""Allow access to a share using certs.
def _update_access_via_manager(self, gluster_mgr, context, share,
add_rules, delete_rules,
recovery=False, share_server=None):
"""Update access rules, authorize SSL CNs (Common Names)."""
Add the SSL CN (Common Name) that's allowed to access the server.
"""
if access['access_type'] != ACCESS_TYPE_CERT:
raise exception.InvalidShareAccess(_("Only 'cert' access type "
"allowed"))
ssl_allow_opt = gluster_mgr.get_vol_option(AUTH_SSL_ALLOW)
# Fetch existing authorized CNs, the value of Gluster option
# 'auth.ssl-allow' that is available as a comma seperated string.
# wrt. GlusterFS' parsing of auth.ssl-allow, please see code from
# https://github.com/gluster/glusterfs/blob/v3.6.2/
# xlators/protocol/auth/login/src/login.c#L80
# until end of gf_auth() function
ssl_allow = re.split('[ ,]', ssl_allow_opt)
access_to = access['access_to']
if access_to in ssl_allow:
LOG.warning(_LW("Access to %(share)s at %(export)s is already "
"granted for %(access_to)s. GlusterFS volume "
"options might have been changed externally."),
{'share': share['id'], 'export': gluster_mgr.qualified,
'access_to': access_to})
return
ssl_allow.append(access_to)
ssl_allow_opt = ','.join(ssl_allow)
gluster_mgr.set_vol_option(AUTH_SSL_ALLOW, ssl_allow_opt)
@utils.synchronized("glusterfs_native_access", external=False)
def _deny_access_via_manager(self, gluster_mgr, context, share, access,
share_server=None):
"""Deny access to a share that's using cert based auth.
Remove the SSL CN (Common Name) that's allowed to access the server.
"""
if access['access_type'] != ACCESS_TYPE_CERT:
raise exception.InvalidShareAccess(_("Only 'cert' access type "
"allowed for access "
"removal."))
ssl_allow_opt = gluster_mgr.get_vol_option(AUTH_SSL_ALLOW)
ssl_allow = re.split('[ ,]', ssl_allow_opt)
access_to = access['access_to']
if access_to not in ssl_allow:
LOG.warning(_LW("Access to %(share)s at %(export)s is already "
"denied for %(access_to)s. GlusterFS volume "
"options might have been changed externally."),
{'share': share['id'], 'export': gluster_mgr.qualified,
'access_to': access_to})
return
ssl_allow.remove(access_to)
ssl_allow_opt = ','.join(ssl_allow)
existing_rules_set = set(re.split('[ ,]', ssl_allow_opt))
add_rules_set = {rule['access_to'] for rule in add_rules}
for rule in add_rules_set:
if re.search('[ ,]', rule):
raise exception.GlusterfsException(
_("Invalid 'access_to' '%s': common names used for "
"GlusterFS authentication should not contain comma "
"or whitespace.") % rule)
delete_rules_set = {rule['access_to'] for rule in delete_rules}
new_rules_set = (
(existing_rules_set | add_rules_set) - delete_rules_set)
# Addition or removal of CNs in the authorized list through the
# Gluster CLI, used by 'GlusterManager' objects, can only be done by
# replacing the existing list with the newly modified list.
ssl_allow_opt = ','.join(sorted(new_rules_set))
gluster_mgr.set_vol_option(AUTH_SSL_ALLOW, ssl_allow_opt)
dynauth = gluster_mgr.get_vol_option(DYNAMIC_AUTH, boolean=True)
if not dynauth:
common._restart_gluster_vol(gluster_mgr)
# When the Gluster option, DYNAMIC_AUTH is not enabled for the gluster
# volume/manila share, the removal of CN of a client does not affect
# the client's existing connection to the volume until the volume is
# restarted.
if delete_rules:
dynauth = gluster_mgr.get_vol_option(DYNAMIC_AUTH, boolean=True)
if not dynauth:
common._restart_gluster_vol(gluster_mgr)
def _update_share_stats(self):
"""Send stats info for the GlusterFS volume."""

View File

@ -29,6 +29,7 @@ import six
from manila import exception
from manila.i18n import _
from manila.share import driver
from manila.share.drivers.ganesha import utils as ganesha_utils
glusterfs_share_layout_opts = [
cfg.StrOpt(
@ -48,6 +49,8 @@ class GlusterfsShareDriverBase(driver.ShareDriver):
supported_layouts = ()
supported_protocols = ()
_supported_access_types = ()
_supported_access_levels = ()
GLUSTERFS_VERSION_MIN = (0, 0)
@ -95,22 +98,58 @@ class GlusterfsShareDriverBase(driver.ShareDriver):
:returns: export location for share_mgr['share'].
"""
def allow_access(self, context, share, access, share_server=None):
@property
def supported_access_levels(self):
return self._supported_access_levels
@property
def supported_access_types(self):
return self._supported_access_types
def _access_rule_validator(self, abort):
def validator(rule):
return ganesha_utils.validate_access_rule(
self.supported_access_types, self.supported_access_levels,
rule, abort)
return validator
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
"""Update access rules for given share.
Driver supports 2 different cases in this method:
1. Recovery after error - 'access_rules' contains all access_rules,
'add_rules' and 'delete_rules' are []. Driver should clear any
existent access rules and apply all access rules for given share.
This recovery is made at driver start up.
2. Adding/Deleting of several access rules - 'access_rules' contains
all access_rules, 'add_rules' and 'delete_rules' contain rules which
should be added/deleted. Driver can ignore rules in 'access_rules' and
apply only rules from 'add_rules' and 'delete_rules'.
"""
gluster_mgr = self.layout._share_manager(share)
return self._allow_access_via_manager(gluster_mgr, context, share,
access, share_server)
def deny_access(self, context, share, access, share_server=None):
gluster_mgr = self.layout._share_manager(share)
return self._deny_access_via_manager(gluster_mgr, context, share,
access, share_server)
access_rules, add_rules, delete_rules = (
list(filter(self._access_rule_validator(abort), rules)) for (
rules, abort) in ((access_rules, True),
(add_rules, True),
(delete_rules, False)))
def _allow_access_via_manager(self, gluster_mgr, context, share, access,
share_server):
raise NotImplementedError()
# Recovery mode.
if not (add_rules or delete_rules):
ruleop, recovery = (access_rules, []), True
else:
ruleop, recovery = (add_rules, delete_rules), False
def _deny_access_via_manager(self, gluster_mgr, context, share, access,
share_server):
self._update_access_via_manager(gluster_mgr, context, share,
*ruleop, recovery=recovery)
def _update_access_via_manager(self, gluster_mgr, context, share,
add_rules, delete_rules, recovery=False,
share_server=None):
raise NotImplementedError()
def do_setup(self, *a, **kw):

View File

@ -184,133 +184,62 @@ class GlusterfsNativeShareDriverTestCase(test.TestCase):
def test_snapshots_are_supported(self):
self.assertTrue(self._driver.snapshots_are_supported)
def test_allow_access_via_manager(self):
access = {'access_type': 'cert', 'access_to': 'client.example.com'}
gmgr1 = common.GlusterManager(self.glusterfs_target1, self._execute,
None, None)
self.mock_object(gmgr1, 'get_vol_option',
mock.Mock(return_value='some.common.name'))
self.mock_object(gmgr1, 'set_vol_option')
test_args = ('auth.ssl-allow',
'some.common.name,' + access['access_to'])
self._driver.layout.gluster_used_vols = set([self.glusterfs_target1])
self._driver._allow_access_via_manager(gmgr1, self._context,
self.share1, access)
gmgr1.get_vol_option.assert_called_once_with('auth.ssl-allow')
gmgr1.set_vol_option.assert_called_once_with(*test_args)
def test_allow_access_via_manager_with_share_having_access(self):
access = {'access_type': 'cert', 'access_to': 'client.example.com'}
gmgr1 = common.GlusterManager(self.glusterfs_target1, self._execute,
None, None)
@ddt.data({'delta': (["oldCN"], []), 'expected': "glusterCN,oldCN"},
{'delta': (["newCN"], []), 'expected': "glusterCN,newCN,oldCN"},
{'delta': ([], ["newCN"]), 'expected': "glusterCN,oldCN"},
{'delta': ([], ["oldCN"]), 'expected': "glusterCN"})
@ddt.unpack
def test_update_access_via_manager(self, delta, expected):
gluster_mgr = common.GlusterManager(self.glusterfs_target1,
self._execute, None, None)
self.mock_object(
gmgr1, 'get_vol_option',
mock.Mock(return_value='some.common.name,' + access['access_to']))
self.mock_object(gmgr1, 'set_vol_option')
gluster_mgr, 'get_vol_option', mock.Mock(
side_effect=lambda a, *x, **kw: {
'auth.ssl-allow': "glusterCN,oldCN",
'server.dynamic-auth': True}[a]))
self.mock_object(gluster_mgr, 'set_vol_option')
add_rules, delete_rules = (
map(lambda a: {'access_to': a}, r) for r in delta)
self._driver.layout.gluster_used_vols = set([self.glusterfs_target1])
self._driver._update_access_via_manager(
gluster_mgr, self._context, self.share1, add_rules, delete_rules)
self._driver._allow_access_via_manager(gmgr1, self._context,
self.share1, access)
gmgr1.get_vol_option.assert_called_once_with('auth.ssl-allow')
self.assertFalse(gmgr1.set_vol_option.called)
def test_allow_access_via_manager_invalid_access_type(self):
access = {'access_type': 'invalid', 'access_to': 'client.example.com'}
expected_exec = []
self.assertRaises(exception.InvalidShareAccess,
self._driver._allow_access_via_manager,
self.gmgr1, self._context, self.share1, access)
self.assertEqual(expected_exec, fake_utils.fake_execute_get_log())
def test_deny_access_via_manager(self):
self.mock_object(common, '_restart_gluster_vol', mock.Mock())
access = {'access_type': 'cert', 'access_to': 'client.example.com'}
gmgr1 = common.GlusterManager(self.glusterfs_target1, self._execute,
None, None)
def _get_vol_option(opt, **kw):
if opt == 'auth.ssl-allow':
return('some.common.name,' + access['access_to'])
elif opt == 'server.dynamic-auth':
return True
argseq = [('auth.ssl-allow', {})]
if delete_rules:
argseq.append(('server.dynamic-auth', {'boolean': True}))
self.assertEqual([mock.call(a[0], **a[1]) for a in argseq],
gluster_mgr.get_vol_option.call_args_list)
gluster_mgr.set_vol_option.assert_called_once_with('auth.ssl-allow',
expected)
def test_update_access_via_manager_restart(self):
gluster_mgr = common.GlusterManager(self.glusterfs_target1,
self._execute, None, None)
self.mock_object(
gmgr1, 'get_vol_option',
mock.Mock(side_effect=_get_vol_option))
self.mock_object(gmgr1, 'set_vol_option')
self._driver.layout.gluster_used_vols = set([self.glusterfs_target1])
gluster_mgr, 'get_vol_option', mock.Mock(
side_effect=lambda a, *x, **kw: {
'auth.ssl-allow': "glusterCN,oldCN",
'server.dynamic-auth': False}[a]))
self.mock_object(gluster_mgr, 'set_vol_option')
self.mock_object(common, '_restart_gluster_vol')
self._driver._deny_access_via_manager(gmgr1, self._context,
self.share1, access)
self._driver._update_access_via_manager(
gluster_mgr, self._context, self.share1, [],
[{'access_to': "oldCN"}])
gmgr1.get_vol_option.assert_has_calls(
[mock.call(a, **kw) for a, kw in (
('auth.ssl-allow', {}),
('server.dynamic-auth', {'boolean': True}))])
test_args = ('auth.ssl-allow', 'some.common.name')
gmgr1.set_vol_option.assert_called_once_with(*test_args)
self.assertFalse(common._restart_gluster_vol.called)
common._restart_gluster_vol.assert_called_once_with(gluster_mgr)
def test_deny_access_via_manager_no_dyn_auth(self):
self.mock_object(common, '_restart_gluster_vol', mock.Mock())
access = {'access_type': 'cert', 'access_to': 'client.example.com'}
gmgr1 = common.GlusterManager(self.glusterfs_target1, self._execute,
None, None)
@ddt.data('common name with space', 'comma,nama')
def test_update_access_via_manager_badcn(self, common_name):
gluster_mgr = common.GlusterManager(self.glusterfs_target1,
self._execute, None, None)
self.mock_object(gluster_mgr, 'get_vol_option', mock.Mock(
return_value="glusterCN,oldCN"))
def _get_vol_option(opt, **kw):
if opt == 'auth.ssl-allow':
return('some.common.name,' + access['access_to'])
elif opt == 'server.dynamic-auth':
return False
self.mock_object(
gmgr1, 'get_vol_option',
mock.Mock(side_effect=_get_vol_option))
self.mock_object(gmgr1, 'set_vol_option')
self._driver.layout.gluster_used_vols = set([self.glusterfs_target1])
self._driver._deny_access_via_manager(gmgr1, self._context,
self.share1, access)
gmgr1.get_vol_option.assert_has_calls(
[mock.call(a, **kw) for a, kw in (
('auth.ssl-allow', {}),
('server.dynamic-auth', {'boolean': True}))])
test_args = ('auth.ssl-allow', 'some.common.name')
gmgr1.set_vol_option.assert_called_once_with(*test_args)
common._restart_gluster_vol.assert_called_once_with(gmgr1)
def test_deny_access_via_manager_with_share_having_no_access(self):
self.mock_object(common, '_restart_gluster_vol', mock.Mock())
access = {'access_type': 'cert', 'access_to': 'client.example.com'}
gmgr1 = common.GlusterManager(self.glusterfs_target1, self._execute,
None, None)
self.mock_object(gmgr1, 'get_vol_option',
mock.Mock(return_value='some.common.name'))
self.mock_object(gmgr1, 'set_vol_option')
self._driver.layout.gluster_used_vols = set([self.glusterfs_target1])
self._driver._deny_access_via_manager(gmgr1, self._context,
self.share1, access)
gmgr1.get_vol_option.assert_called_once_with('auth.ssl-allow')
self.assertFalse(gmgr1.set_vol_option.called)
self.assertFalse(common._restart_gluster_vol.called)
def test_deny_access_via_manager_invalid_access_type(self):
self.mock_object(common, '_restart_gluster_vol', mock.Mock())
access = {'access_type': 'invalid', 'access_to': 'NotApplicable'}
self.assertRaises(exception.InvalidShareAccess,
self._driver._deny_access_via_manager, self.gmgr1,
self._context, self.share1, access)
self.assertFalse(common._restart_gluster_vol.called)
self.assertRaises(exception.GlusterfsException,
self._driver._update_access_via_manager,
gluster_mgr, self._context, self.share1,
[{'access_to': common_name}], [])
def test_update_share_stats(self):
self._driver._update_share_stats()

View File

@ -26,6 +26,7 @@ from manila.share import configuration as config
from manila.share import driver
from manila.share.drivers.glusterfs import layout
from manila import test
from manila.tests import fake_share
from manila.tests import fake_utils
@ -38,11 +39,19 @@ fake_path_to_private_key = '/fakepath/to/privatekey'
fake_remote_server_password = 'fakepassword'
def fake_access(kwargs):
fake_access_rule = fake_share.fake_access(**kwargs)
fake_access_rule.to_dict = lambda: fake_access_rule.values
return fake_access_rule
class GlusterfsFakeShareDriver(layout.GlusterfsShareDriverBase):
supported_layouts = ('layout_fake.FakeLayout',
'layout_something.SomeLayout')
supported_protocols = ('NFS,')
_supported_access_types = ('ip',)
_supported_access_levels = ('rw',)
@ddt.ddt
@ -54,9 +63,9 @@ class GlusterfsShareDriverBaseTestCase(test.TestCase):
CONF.set_default('driver_handles_share_servers', False)
fake_conf, __ = self._setup()
self._driver = GlusterfsFakeShareDriver(False, configuration=fake_conf)
self.fake_share = mock.Mock()
self.fake_context = mock.Mock()
self.fake_access = mock.Mock()
self.fake_share = mock.Mock(name='fake_share')
self.fake_context = mock.Mock(name='fake_context')
self.fake_access = mock.Mock(name='fake_access')
def _setup(self):
fake_conf = config.Configuration(None)
@ -104,31 +113,64 @@ class GlusterfsShareDriverBaseTestCase(test.TestCase):
def test_setup_via_manager(self):
self.assertIsNone(self._driver._setup_via_manager(mock.Mock()))
@ddt.data('allow', 'deny')
def test_allow_deny_access(self, op):
def test_supported_access_types(self):
self.assertEqual(('ip',), self._driver.supported_access_types)
def test_supported_access_levels(self):
self.assertEqual(('rw',), self._driver.supported_access_levels)
def test_access_rule_validator(self):
rule = mock.Mock()
abort = mock.Mock()
valid = mock.Mock()
self.mock_object(layout.ganesha_utils, 'validate_access_rule',
mock.Mock(return_value=valid))
ret = self._driver._access_rule_validator(abort)(rule)
self.assertEqual(valid, ret)
layout.ganesha_utils.validate_access_rule.assert_called_once_with(
('ip',), ('rw',), rule, abort)
@ddt.data({'inset': ([], ['ADD'], []), 'outset': (['ADD'], []),
'recovery': False},
{'inset': ([], [], ['DELETE']), 'outset': ([], ['DELETE']),
'recovery': False},
{'inset': (['EXISTING'], ['ADD'], ['DELETE']),
'outset': (['ADD'], ['DELETE']), 'recovery': False},
{'inset': (['EXISTING'], [], []), 'outset': (['EXISTING'], []),
'recovery': True})
@ddt.unpack
def test_update_access(self, inset, outset, recovery):
conf, _layout = self._setup()
gmgr = mock.Mock()
gluster_mgr = mock.Mock(name='gluster_mgr')
self.mock_object(_layout, '_share_manager',
mock.Mock(return_value=gmgr))
mock.Mock(return_value=gluster_mgr))
_driver = GlusterfsFakeShareDriver(False, configuration=conf)
self.mock_object(_driver, "_%s_access_via_manager" % op, mock.Mock())
self.mock_object(_driver, '_update_access_via_manager', mock.Mock())
rulemap = {t: fake_access({'access_type': "ip",
'access_level': "rw",
'access_to': t}) for t in (
'EXISTING', 'ADD', 'DELETE')}
in_rules, out_rules = (
[
[
rulemap[t] for t in r
] for r in rs
] for rs in (inset, outset))
getattr(_driver, "%s_access" % op)(self.fake_context, self.fake_share,
self.fake_access)
_driver.update_access(self.fake_context, self.fake_share, *in_rules)
_layout._share_manager.assert_called_once_with(self.fake_share)
getattr(_driver,
"_%s_access_via_manager" % op).assert_called_once_with(
gmgr, self.fake_context, self.fake_share, self.fake_access, None)
_driver._update_access_via_manager.assert_called_once_with(
gluster_mgr, self.fake_context, self.fake_share,
*out_rules, recovery=recovery)
@ddt.data('allow', 'deny')
def test_allow_deny_access_via_manager(self, op):
def test_update_access_via_manager(self):
self.assertRaises(NotImplementedError,
getattr(self._driver,
"_%s_access_via_manager" % op),
self._driver._update_access_via_manager,
mock.Mock(), self.fake_context, self.fake_share,
self.fake_access, None)
[self.fake_access], [self.fake_access])
@ddt.data('NFS', 'PROTATO')
def test_check_proto_baseclass(self, proto):

View File

@ -175,25 +175,31 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertIsInstance(ret, helper)
@ddt.data({'op': 'allow', 'kwargs': {}},
{'op': 'allow', 'kwargs': {'share_server': None}},
{'op': 'deny', 'kwargs': {}},
{'op': 'deny', 'kwargs': {'share_server': None}})
@ddt.unpack
def test_allow_deny_access_via_manager(self, op, kwargs):
@ddt.data('type', 'level')
def test_supported_access_features(self, feature):
nfs_helper = mock.Mock()
supported_access_feature = mock.Mock()
setattr(nfs_helper, 'supported_access_%ss' % feature,
supported_access_feature)
self.mock_object(self._driver, 'nfs_helper', nfs_helper)
ret = getattr(self._driver, 'supported_access_%ss' % feature)
self.assertEqual(supported_access_feature, ret)
def test_update_access_via_manager(self):
self.mock_object(self._driver, '_get_helper')
gmgr = mock.Mock()
add_rules = mock.Mock()
delete_rules = mock.Mock()
ret = getattr(self._driver, "_%s_access_via_manager" % op
)(gmgr, self._context, self.share,
fake_share.fake_access, **kwargs)
self._driver._update_access_via_manager(
gmgr, self._context, self.share,
add_rules, delete_rules, recovery=True)
self._driver._get_helper.assert_called_once_with(gmgr)
getattr(
self._driver._get_helper(),
"%s_access" % op).assert_called_once_with(
'/', self.share, fake_share.fake_access)
self.assertIsNone(ret)
self._driver._get_helper().update_access.assert_called_once_with(
'/', self.share, add_rules, delete_rules, recovery=True)
@ddt.ddt
@ -230,132 +236,51 @@ class GlusterNFSHelperTestCase(test.TestCase):
(self._helper.gluster_manager.get_vol_option.
assert_called_once_with(NFS_EXPORT_DIR))
def test_manage_access_bad_access_type(self):
cbk = None
access = {'access_type': 'bad', 'access_to': None}
self.assertRaises(exception.InvalidShareAccess,
self._helper._manage_access, fake_share_name,
access['access_type'], access['access_to'], cbk)
def test_manage_access_noop(self):
cbk = mock.Mock(return_value=True)
access = fake_share.fake_access()
export_dir_dict = mock.Mock()
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
ret = self._helper._manage_access(fake_share_name,
access['access_type'],
access['access_to'], cbk)
self._helper._get_export_dir_dict.assert_called_once_with()
cbk.assert_called_once_with(export_dir_dict, fake_share_name,
access['access_to'])
self.assertIsNone(ret)
def test_manage_access_adding_entry(self):
def cbk(d, key, value):
d[key].append(value)
access = fake_share.fake_access()
export_dir_dict = {
'example.com': ['10.0.0.1'],
'fakename': ['10.0.0.2'],
}
export_str = '/example.com(10.0.0.1),/fakename(10.0.0.2|10.0.0.1)'
args = (NFS_EXPORT_DIR, export_str)
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
ret = self._helper._manage_access(fake_share_name,
access['access_type'],
access['access_to'], cbk)
self.assertIsNone(ret)
self._helper._get_export_dir_dict.assert_called_once_with()
self._helper.gluster_manager.set_vol_option.assert_called_once_with(
*args)
def test_manage_access_removing_last_entry(self):
def cbk(d, key, value):
d.pop(key)
access = fake_share.fake_access()
args = (NFS_EXPORT_DIR, None)
@ddt.data({'delta': (['10.0.0.2'], []), 'extra_exports': {},
'new_exports': '/fakename(10.0.0.1|10.0.0.2)'},
{'delta': (['10.0.0.1'], []), 'extra_exports': {},
'new_exports': '/fakename(10.0.0.1)'},
{'delta': ([], ['10.0.0.2']), 'extra_exports': {},
'new_exports': '/fakename(10.0.0.1)'},
{'delta': ([], ['10.0.0.1']), 'extra_exports': {},
'new_exports': None},
{'delta': ([], ['10.0.0.1']),
'extra_exports': {'elsewhere': ['10.0.1.3']},
'new_exports': '/elsewhere(10.0.1.3)'})
@ddt.unpack
def test_update_access(self, delta, extra_exports, new_exports):
gluster_manager_attrs = {'path': '/fakename'}
gluster_manager_attrs.update(fake_gluster_manager_attrs)
gluster_mgr = mock.Mock(**gluster_manager_attrs)
helper = glusterfs.GlusterNFSHelper(
self._execute, self.fake_conf, gluster_manager=gluster_mgr)
export_dir_dict = {'fakename': ['10.0.0.1']}
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
export_dir_dict.update(extra_exports)
helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
_share = mock.Mock()
ret = self._helper._manage_access(fake_share_name,
access['access_type'],
access['access_to'], cbk)
add_rules, delete_rules = (
map(lambda a: {'access_to': a}, r) for r in delta)
helper.update_access('/', _share, add_rules, delete_rules)
self.assertIsNone(ret)
self._helper._get_export_dir_dict.assert_called_once_with()
self._helper.gluster_manager.set_vol_option.assert_called_once_with(
*args)
helper._get_export_dir_dict.assert_called_once_with()
gluster_mgr.set_vol_option.assert_called_once_with(NFS_EXPORT_DIR,
new_exports)
def test_allow_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {'example.com': ['10.0.0.1']}
export_str = '/example.com(10.0.0.1),/fakename(10.0.0.1)'
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = '/fakename'
@ddt.data({}, {'elsewhere': '10.0.1.3'})
def test_update_access_disjoint(self, export_dir_dict):
gluster_manager_attrs = {'path': '/fakename'}
gluster_manager_attrs.update(fake_gluster_manager_attrs)
gluster_mgr = mock.Mock(**gluster_manager_attrs)
helper = glusterfs.GlusterNFSHelper(
self._execute, self.fake_conf, gluster_manager=gluster_mgr)
helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
_share = mock.Mock()
self._helper.allow_access(None, share, access)
helper.update_access('/', _share, [], [{'access_to': '10.0.0.2'}])
self._helper._get_export_dir_dict.assert_called_once_with()
self._helper.gluster_manager.set_vol_option.assert_called_once_with(
NFS_EXPORT_DIR, export_str)
def test_allow_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {'fakename': ['10.0.0.1']}
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = '/fakename'
self._helper.allow_access(None, share, access)
self._helper._get_export_dir_dict.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.set_vol_option.called)
def test_deny_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {}
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = '/fakename'
self._helper.deny_access(None, share, access)
self._helper._get_export_dir_dict.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.set_vol_option.called)
def test_deny_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {
'example.com': ['10.0.0.1'],
'fakename': ['10.0.0.1'],
}
export_str = '/example.com(10.0.0.1)'
args = (NFS_EXPORT_DIR, export_str)
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = '/fakename'
self._helper.deny_access(None, share, access)
self._helper._get_export_dir_dict.assert_called_once_with()
self._helper.gluster_manager.set_vol_option.assert_called_once_with(
*args)
helper._get_export_dir_dict.assert_called_once_with()
self.assertFalse(gluster_mgr.set_vol_option.called)
@ddt.ddt
@ -386,124 +311,37 @@ class GlusterNFSVolHelperTestCase(test.TestCase):
(self._helper.gluster_manager.get_vol_option.
assert_called_once_with(NFS_RPC_AUTH_ALLOW))
def test_manage_access_bad_access_type(self):
cbk = None
access = {'access_type': 'bad', 'access_to': None}
self.assertRaises(exception.InvalidShareAccess,
self._helper._manage_access,
access['access_type'], access['access_to'], cbk)
@ddt.data({'delta': (["10.0.0.1"], []), 'expected': "10.0.0.1,10.0.0.3"},
{'delta': (["10.0.0.2"], []),
'expected': "10.0.0.1,10.0.0.2,10.0.0.3"},
{'delta': ([], ["10.0.0.1"]), 'expected': "10.0.0.3"},
{'delta': ([], ["10.0.0.2"]), 'expected': "10.0.0.1,10.0.0.3"})
@ddt.unpack
def test_update_access(self, delta, expected):
self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
return_value=["10.0.0.1", "10.0.0.3"]))
_share = mock.Mock()
def test_manage_access_noop(self):
cbk = mock.Mock(return_value=True)
access = fake_share.fake_access()
export_list = mock.Mock()
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
add_rules, delete_rules = (
map(lambda a: {'access_to': a}, r) for r in delta)
self._helper.update_access("/", _share, add_rules, delete_rules)
self._helper._get_vol_exports.assert_called_once_with()
cbk.assert_called_once_with(export_list, access['access_to'])
self.assertIsNone(ret)
def test_manage_access_adding_entry(self):
def cbk(li, v):
li.append(v)
access = fake_share.fake_access()
export_list = ['10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
self.assertIsNone(ret)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2,10.0.0.1'
argseq = ((NFS_RPC_AUTH_ALLOW, export_str),
(NFS_RPC_AUTH_REJECT, None))
argseq = [(NFS_RPC_AUTH_ALLOW, expected), (NFS_RPC_AUTH_REJECT, None)]
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.set_vol_option.call_args_list)
def test_manage_access_removing_last_entry(self):
def test_update_access_empty(self):
self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
return_value=["10.0.0.1"]))
_share = mock.Mock()
def cbk(li, v):
li.remove(v)
access = fake_share.fake_access()
export_list = ['10.0.0.1']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
self.assertIsNone(ret)
self._helper._get_vol_exports.assert_called_once_with()
argseq = ((NFS_RPC_AUTH_ALLOW, None),
(NFS_RPC_AUTH_REJECT, '*'))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.set_vol_option.call_args_list)
def test_allow_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.allow_access(None, share, access)
self._helper.update_access("/", _share, [],
[{'access_to': "10.0.0.1"}])
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2,10.0.0.1'
argseq = ((NFS_RPC_AUTH_ALLOW, export_str),
(NFS_RPC_AUTH_REJECT, None))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.set_vol_option.call_args_list)
def test_allow_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.1']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.allow_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.set_vol_option.called)
def test_deny_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = []
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.deny_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.set_vol_option.called)
def test_deny_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.1', '10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.deny_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2'
argseq = ((NFS_RPC_AUTH_ALLOW, export_str),
(NFS_RPC_AUTH_REJECT, None))
argseq = [(NFS_RPC_AUTH_ALLOW, None), (NFS_RPC_AUTH_REJECT, "*")]
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.set_vol_option.call_args_list)
@ -633,17 +471,3 @@ class GaneshaNFSHelperTestCase(test.TestCase):
ret = self._helper._fsal_hook('/fakepath', self.share, self.access)
self.assertEqual(output, ret)
def test_allow_access(self):
self.mock_object(self._helper, '_allow_access')
self._helper.allow_access("foo")
self._helper._allow_access.assert_called_once_with("foo")
def test_deny_access(self):
self.mock_object(self._helper, '_deny_access')
self._helper.deny_access("foo")
self._helper._deny_access.assert_called_once_with("foo")