[scenario] Add test case to check for RO access

Test if a compute instance with RO access granted to it via floating
IP address fails to write on the share. Also, add the capability to
pass the access level to the methods that allow access to shares.

Change-Id: I1aef8f1ed1b93c0847a6aa982f62c8e539d04337
Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
Rishabh Dave 2018-05-03 17:30:26 +05:30
parent 7f35fc0f5e
commit 42329e2a9f
2 changed files with 58 additions and 14 deletions

View File

@ -136,18 +136,20 @@ class ShareScenarioTest(manager.NetworkScenarioTest):
sn['id'])
return sn
def _allow_access(self, share_id, client=None,
access_type="ip", access_to="0.0.0.0", cleanup=True):
def _allow_access(self, share_id, client=None, access_type="ip",
access_level="rw", access_to="0.0.0.0", cleanup=True):
"""Allow share access
:param share_id: id of the share
:param client: client object
:param access_type: "ip", "user" or "cert"
:param access_level: "rw" or "ro"
:param access_to
:returns: access object
"""
client = client or self.shares_client
access = client.create_access_rule(share_id, access_type, access_to)
access = client.create_access_rule(share_id, access_type, access_to,
access_level)
# NOTE(u_glide): Ignore provided client, because we always need v2
# client to make this call
@ -158,6 +160,17 @@ class ShareScenarioTest(manager.NetworkScenarioTest):
self.addCleanup(client.delete_access_rule, share_id, access['id'])
return access
def _deny_access(self, share_id, rule_id, client=None):
"""Deny share access
:param share_id: id of the share
:param rule_id: id of the rule that will be deleted
"""
client = client or self.shares_client
client.delete_access_rule(share_id, rule_id)
self.shares_v2_client.wait_for_share_status(
share_id, "active", status_attr='access_rules_status')
def _allow_access_snapshot(self, snapshot_id, access_type="ip",
access_to="0.0.0.0/0", cleanup=True):
"""Allow snapshot access

View File

@ -206,8 +206,8 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
self.share = self._create_share(**kwargs)
return self.share
def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
snapshot=None):
def allow_access_ip(self, share_id, ip=None, instance=None,
access_level="rw", cleanup=True, snapshot=None):
if instance and not ip:
try:
net_addresses = instance['addresses']
@ -225,16 +225,21 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
self._allow_access_snapshot(snapshot['id'], access_type='ip',
access_to=ip, cleanup=cleanup)
else:
self._allow_access(share_id, access_type='ip', access_to=ip,
cleanup=cleanup, client=self.shares_v2_client)
return self._allow_access(share_id, access_type='ip',
access_level=access_level, access_to=ip,
cleanup=cleanup,
client=self.shares_v2_client)
def deny_access(self, share_id, access_rule_id):
self._deny_access(share_id, access_rule_id)
def provide_access_to_auxiliary_instance(self, instance, share=None,
snapshot=None):
snapshot=None, access_level='rw'):
share = share or self.share
if self.protocol.lower() == 'cifs':
self.allow_access_ip(
return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
snapshot=snapshot)
snapshot=snapshot, access_level=access_level)
elif not CONF.share.multitenancy_enabled:
if self.use_ipv6:
server_ip = self._get_ipv6_server_ip(instance)
@ -242,14 +247,15 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
server_ip = (CONF.share.override_ip_for_nfs_access or
self.floatings[instance['id']]['ip'])
self.assertIsNotNone(server_ip)
self.allow_access_ip(
return self.allow_access_ip(
share['id'], ip=server_ip,
instance=instance, cleanup=False, snapshot=snapshot)
instance=instance, cleanup=False, snapshot=snapshot,
access_level=access_level)
elif (CONF.share.multitenancy_enabled and
self.protocol.lower() == 'nfs'):
self.allow_access_ip(
return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
snapshot=snapshot)
snapshot=snapshot, access_level=access_level)
def wait_for_active_instance(self, instance_id):
waiters.wait_for_server_status(
@ -340,6 +346,31 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
return locations
@tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
def test_write_with_ro_access(self):
'''Test if an instance with ro access can write on the share.'''
test_data = "Some test data to write"
instance = self.boot_instance(wait_until="BUILD")
self.create_share()
location = self._get_user_export_locations(self.share)[0]
instance = self.wait_for_active_instance(instance["id"])
ssh_client_inst = self.init_ssh(instance)
# First, check if write works RW access.
acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
self.mount_share(location, ssh_client_inst)
self.write_data(test_data, ssh_client_inst)
self.deny_access(self.share['id'], acc_rule_id)
self.provide_access_to_auxiliary_instance(instance, access_level='ro')
self.addCleanup(self.umount_share, ssh_client_inst)
# Test if write with RO access fails.
self.assertRaises(exceptions.SSHExecCommandFailed,
self.write_data, test_data, ssh_client_inst)
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms(self):
"""Boots two vms and writes/reads data on it."""