Merge "Add functional tests for access rules"

This commit is contained in:
Jenkins 2015-08-03 18:10:47 +00:00 committed by Gerrit Code Review
commit a5b9af63ef
4 changed files with 256 additions and 0 deletions

View File

@ -91,6 +91,22 @@ share_opts = [
default=500,
help="Timeout in seconds to wait for a share to become "
"available."),
cfg.DictOpt('access_types_mapping',
default={'nfs': 'ip', 'cifs': 'ip'},
help="Dict contains access types mapping to share "
"protocol. It will be used to create access rules "
"for shares. Format: '<protocol>: <type1> <type2>',..."
"Allowed share protocols: nfs, cifs, glusterfs, hdfs. "),
cfg.DictOpt('access_levels_mapping',
default={'nfs': 'rw ro', 'cifs': 'rw'},
help="Dict contains access levels mapping to share "
"protocol. It will be used to create access rules for "
"shares. Format: '<protocol>: <level1> <level2>',... "
"Allowed share protocols: nfs, cifs, glusterfs, hdfs. "),
cfg.StrOpt("username_for_user_rules",
default="TESTDOMAIN\\Administrator",
help="Username, that will be used in share access tests for "
"user type of access."),
]
# 2. Generate config

View File

@ -620,3 +620,80 @@ class ManilaCLIClient(base.CLIClient):
metadata_raw = self.manila('metadata-show %s' % share)
metadata = output_parser.details(metadata_raw)
return metadata
@not_found_wrapper
def list_access(self, share_id):
access_list_raw = self.manila('access-list %s' % share_id)
return output_parser.listing(access_list_raw)
@not_found_wrapper
def get_access(self, share_id, access_id):
for access in self.list_access(share_id):
if access['id'] == access_id:
return access
raise tempest_lib_exc.NotFound()
@not_found_wrapper
def access_allow(self, share_id, access_type, access_to, access_level):
raw_access = self.manila(
'access-allow --access-level %(level)s %(id)s %(type)s '
'%(access_to)s' % {
'level': access_level,
'id': share_id,
'type': access_type,
'access_to': access_to,
})
return output_parser.details(raw_access)
@not_found_wrapper
def access_deny(self, share_id, access_id):
self.manila('access-deny %(share_id)s %(access_id)s' % {
'share_id': share_id,
'access_id': access_id,
})
def wait_for_access_rule_status(self, share_id, access_id, state='active'):
access = self.get_access(share_id, access_id)
start = int(time.time())
while access['state'] != state:
time.sleep(self.build_interval)
access = self.get_access(share_id, access_id)
if access['state'] == state:
return
elif access['state'] == 'error':
raise exceptions.AccessRuleCreateErrorException(
access=access_id)
if int(time.time()) - start >= self.build_timeout:
message = (
"Access rule %(access)s failed to reach %(state)s state "
"within the required time (%(build_timeout)s s)." % {
"access": access_id, "state": state,
"build_timeout": self.build_timeout})
raise tempest_lib_exc.TimeoutException(message)
def wait_for_access_rule_deletion(self, share_id, access_id):
try:
access = self.get_access(share_id, access_id)
except tempest_lib_exc.NotFound:
return
start = int(time.time())
while True:
time.sleep(self.build_interval)
try:
access = self.get_access(share_id, access_id)
except tempest_lib_exc.NotFound:
return
if access['state'] == 'error':
raise exceptions.AccessRuleDeleteErrorException(
access=access_id)
if int(time.time()) - start >= self.build_timeout:
message = (
"Access rule %(access)s failed to reach deleted state "
"within the required time (%s s)." % self.build_timeout)
raise tempest_lib_exc.TimeoutException(message)

View File

@ -42,3 +42,11 @@ class InvalidConfiguration(exceptions.TempestException):
class ShareBuildErrorException(exceptions.TempestException):
message = "Share %(share)s failed to build and is in ERROR status"
class AccessRuleCreateErrorException(exceptions.TempestException):
message = "Access rule %(access)s failed to create and is in ERROR state."
class AccessRuleDeleteErrorException(exceptions.TempestException):
message = "Access rule %(access)s failed to delete and is in ERROR state."

View File

@ -0,0 +1,155 @@
# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import exceptions as tempest_lib_exc
from manilaclient import config
from manilaclient.tests.functional import base
CONF = config.CONF
class ShareAccessReadWriteBase(base.BaseTestCase):
protocol = None
access_level = None
@classmethod
def setUpClass(cls):
super(ShareAccessReadWriteBase, cls).setUpClass()
if cls.protocol not in CONF.enable_protocols:
message = "%s tests are disabled." % cls.protocol
raise cls.skipException(message)
if cls.access_level not in CONF.access_levels_mapping.get(
cls.protocol, '').split(' '):
raise cls.skipException("%(level)s tests for %(protocol)s share "
"access are disabled." % {
'level': cls.access_level,
'protocol': cls.protocol
})
cls.access_types = CONF.access_types_mapping.get(
cls.protocol, '').split(' ')
if not cls.access_types:
raise cls.skipException("No access levels were provided for %s "
"share access tests." % cls.protoco)
cls.share = cls.create_share(share_protocol=cls.protocol,
public=True,
cleanup_in_class=True)
cls.access_to = {
'ip': '10.0.0.1',
'user': CONF.username_for_user_rules,
'cert': 'tenant.example.com',
}
def test_list_access_rule_for_share(self):
access_to = {
'ip': '10.0.0.1',
'user': CONF.username_for_user_rules,
'cert': 'tenant.example.com',
}
access_type = self.access_types[0]
access = self.user_client.access_allow(self.share['id'], access_type,
access_to[access_type],
self.access_level)
access_list = self.user_client.list_access(self.share['id'])
self.assertTrue(any(
[item for item in access_list if access['id'] == item['id']]))
def _create_delete_access_rule(self, share_id, access_type, access_to):
access = self.user_client.access_allow(share_id, access_type,
access_to, self.access_level)
self.assertEqual(share_id, access.get('share_id'))
self.assertEqual(access_type, access.get('access_type'))
self.assertEqual(access_to.replace('\\\\', '\\'),
access.get('access_to'))
self.assertEqual(self.access_level, access.get('access_level'))
self.user_client.wait_for_access_rule_status(share_id, access['id'])
self.user_client.access_deny(share_id, access['id'])
self.user_client.wait_for_access_rule_deletion(share_id, access['id'])
self.assertRaises(tempest_lib_exc.NotFound,
self.user_client.get_access, share_id, access['id'])
def test_create_delete_ip_access_rule(self):
if 'ip' not in self.access_types:
raise self.skipException("IP access rule is disabled for protocol "
"%s." % self.protocol)
self._create_delete_access_rule(self.share['id'], 'ip', '10.0.0.1')
def test_create_delete_user_access_rule(self):
if 'user' not in self.access_types:
raise self.skipException("User access rule is disabled for "
"protocol %s." % self.protocol)
self._create_delete_access_rule(self.share['id'], 'user',
CONF.username_for_user_rules)
def test_create_delete_cert_access_rule(self):
if 'cert' not in self.access_types:
raise self.skipException("Cert access rule is disabled for "
"protocol %s." % self.protocol)
self._create_delete_access_rule(self.share['id'], 'cert',
'tenant.example.com')
class NFSShareRWAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'nfs'
access_level = 'rw'
class NFSShareROAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'nfs'
access_level = 'ro'
class CIFSShareRWAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'cifs'
access_level = 'rw'
class CIFSShareROAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'cifs'
access_level = 'ro'
class GlusterFSShareRWAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'glusterfs'
access_level = 'rw'
class GlusterFSShareROAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'glusterfs'
access_level = 'ro'
class HDFSShareRWAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'hdfs'
access_level = 'rw'
class HDFSShareROAccessReadWriteTest(ShareAccessReadWriteBase):
protocol = 'hdfs'
access_level = 'ro'
def load_tests(loader, tests, _):
result = []
for test_case in tests:
if type(test_case._tests[0]) is ShareAccessReadWriteBase:
continue
result.append(test_case)
return loader.suiteClass(result)