[OSC] Implement security services commands

In this patch we add the following openstack share commands:
share security service create
share security service delete
share security service list
share security service show
share security service set
share security service unset

Partially-implements: bp openstack-client-support
Change-Id: I24d27a7789f7be4f250b6b4d14a7c03acd2474bb
This commit is contained in:
Maari Tamm 2021-11-02 19:56:10 +00:00
parent 429f26c8d9
commit 8873dc20e9
5 changed files with 1123 additions and 0 deletions

View File

@ -151,6 +151,13 @@ share services
.. autoprogram-cliff:: openstack.share.v2
:command: share service *
=======================
share security services
=======================
.. autoprogram-cliff:: openstack.share.v2
:command: share security service *
===========
share pools
===========

View File

@ -0,0 +1,500 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient import api_versions
from manilaclient.common._i18n import _
LOG = logging.getLogger(__name__)
class CreateShareSecurityService(command.ShowOne):
"""Create security service used by project."""
_description = _("Create security service used by project.")
def get_parser(self, prog_name):
parser = super(CreateShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'type',
metavar='<type>',
default=None,
choices=['ldap', 'kerberos', 'active_directory'],
help=_("Security service type. Possible options are:"
"'ldap', 'kerberos', 'active_directory'.")
)
parser.add_argument(
'--dns-ip',
metavar='<dns-ip>',
default=None,
help=_("DNS IP address used inside project's network.")
)
parser.add_argument(
'--ou',
metavar='<ou>',
default=None,
help=_("Security service OU (Organizational Unit). "
"Available only for microversion >= 2.44.")
)
parser.add_argument(
'--server',
metavar='<server>',
default=None,
help=_("Security service IP address or hostname.")
)
parser.add_argument(
'--domain',
metavar='<domain>',
default=None,
help=_("Security service domain.")
)
parser.add_argument(
'--user',
metavar='<user',
default=None,
help=_("Security service user or group used by project.")
)
parser.add_argument(
'--password',
metavar='<password>',
default=None,
help=_("Password used by user.")
)
parser.add_argument(
'--name',
metavar='<name>',
default=None,
help=_("Security service name.")
)
parser.add_argument(
'--description',
metavar='<description>',
default=None,
help=_("Security service description.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
kwargs = {
'dns_ip': parsed_args.dns_ip,
'server': parsed_args.server,
'domain': parsed_args.domain,
'user': parsed_args.user,
'password': parsed_args.password,
'name': parsed_args.name,
'description': parsed_args.description,
}
if share_client.api_version >= api_versions.APIVersion("2.44"):
kwargs['ou'] = parsed_args.ou
elif parsed_args.ou:
raise exceptions.CommandError(
"Defining a security service Organizational Unit is "
"available only for microversion >= 2.44")
security_service = share_client.security_services.create(
parsed_args.type, **kwargs)
return self.dict2columns(security_service._info)
class DeleteShareSecurityService(command.Command):
"""Delete one or more security services."""
_description = _("Delete one or more security services.")
def get_parser(self, prog_name):
parser = super(DeleteShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'security_service',
metavar='<security-service>',
nargs="+",
help=_("Name or ID of the security service(s) to delete.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
result = 0
for security_service in parsed_args.security_service:
try:
security_service_obj = oscutils.find_resource(
share_client.security_services,
security_service)
share_client.security_services.delete(
security_service_obj)
except Exception as e:
result += 1
LOG.error(f"Failed to delete security service with "
f"name or ID {security_service}: {e}")
if result > 0:
total = len(parsed_args.security_service)
msg = (f"{result} of {total} security services failed "
f"to be deleted.")
raise exceptions.CommandError(msg)
class ShowShareSecurityService(command.ShowOne):
"""Show security service."""
_description = _("Show security service.")
def get_parser(self, prog_name):
parser = super(ShowShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'security_service',
metavar='<security-service>',
help=_("Security service name or ID to show.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.security_service)
data = security_service._info
if parsed_args.formatter == 'table':
if 'share_networks' in data.keys():
data['share_networks'] = "\n".join(
data['share_networks'])
return self.dict2columns(data)
class SetShareSecurityService(command.Command):
"""Set security service."""
_description = _("Set security service.")
def get_parser(self, prog_name):
parser = super(SetShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'security_service',
metavar='<security-service>',
help=_("Security service name or ID.")
)
parser.add_argument(
'--dns-ip',
metavar='<dns-ip>',
default=None,
help=_("Set DNS IP address used inside project's network.")
)
parser.add_argument(
'--ou',
metavar='<ou>',
default=None,
help=_("Set security service OU (Organizational Unit). "
"Available only for microversion >= 2.44.")
)
parser.add_argument(
'--server',
metavar='<server>',
default=None,
help=_("Set security service IP address or hostname.")
)
parser.add_argument(
'--domain',
metavar='<domain>',
default=None,
help=_("Set security service domain.")
)
parser.add_argument(
'--user',
metavar='<user',
default=None,
help=_("Set security service user or group used by project.")
)
parser.add_argument(
'--password',
metavar='<password>',
default=None,
help=_("Set password used by user.")
)
parser.add_argument(
'--name',
metavar='<name>',
default=None,
help=_("Set security service name.")
)
parser.add_argument(
'--description',
metavar='<description>',
default=None,
help=_("Set security service description.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.security_service)
kwargs = {
'dns_ip': parsed_args.dns_ip,
'server': parsed_args.server,
'domain': parsed_args.domain,
'user': parsed_args.user,
'password': parsed_args.password,
'name': parsed_args.name,
'description': parsed_args.description,
}
if share_client.api_version >= api_versions.APIVersion("2.44"):
kwargs['ou'] = parsed_args.ou
elif parsed_args.ou:
raise exceptions.CommandError(_(
"Setting a security service Organizational Unit is "
"available only for microversion >= 2.44"))
try:
security_service.update(**kwargs)
except Exception as e:
raise exceptions.CommandError(
f"One or more set operations failed: {e}")
class UnsetShareSecurityService(command.Command):
"""Unset security service."""
_description = _("Unset security service.")
def get_parser(self, prog_name):
parser = super(UnsetShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'security_service',
metavar='<security-service>',
help=_("Security service name or ID.")
)
parser.add_argument(
'--dns-ip',
action='store_true',
help=_("Unset DNS IP address used inside project's network.")
)
parser.add_argument(
'--ou',
action='store_true',
help=_("Unset security service OU (Organizational Unit). "
"Available only for microversion >= 2.44.")
)
parser.add_argument(
'--server',
action='store_true',
help=_("Unset security service IP address or hostname.")
)
parser.add_argument(
'--domain',
action='store_true',
help=_("Unset security service domain.")
)
parser.add_argument(
'--user',
action='store_true',
help=_("Unset security service user or group used by project.")
)
parser.add_argument(
'--password',
action='store_true',
help=_("Unset password used by user.")
)
parser.add_argument(
'--name',
action='store_true',
help=_("Unset security service name.")
)
parser.add_argument(
'--description',
action='store_true',
help=_("Unset security service description.")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
security_service = oscutils.find_resource(
share_client.security_services,
parsed_args.security_service)
kwargs = {}
args = ['dns_ip', 'server', 'domain', 'user', 'password',
'name', 'description']
for arg in args:
if getattr(parsed_args, arg):
# the SDK unsets a value if it is an empty string
kwargs[arg] = ''
if (parsed_args.ou and
share_client.api_version >= api_versions.APIVersion("2.44")):
# the SDK unsets a value if it is an empty string
kwargs['ou'] = ''
elif parsed_args.ou:
raise exceptions.CommandError(_(
"Unsetting a security service Organizational Unit is "
"available only for microversion >= 2.44"))
try:
security_service.update(**kwargs)
except Exception as e:
raise exceptions.CommandError(
f"One or more unset operations failed: {e}")
class ListShareSecurityService(command.Lister):
"""List security services."""
_description = _("List security services.")
def get_parser(self, prog_name):
parser = super(ListShareSecurityService, self).get_parser(prog_name)
parser.add_argument(
'--all-projects',
action='store_true',
help=_("Display information from all projects (Admin only).")
)
parser.add_argument(
'--share-network',
metavar='<share-network>',
default=None,
help=_("Filter results by share network name or ID.")
)
parser.add_argument(
'--status',
metavar='<status>',
default=None,
help=_("Filter results by status.")
)
parser.add_argument(
'--name',
metavar='<name>',
default=None,
help=_("Filter results by security service name.")
)
parser.add_argument(
'--type',
metavar='<type>',
default=None,
help=_("Filter results by security service type.")
)
parser.add_argument(
'--user',
metavar='<user',
default=None,
help=_("Filter results by security service user or group "
"used by project.")
)
parser.add_argument(
'--dns-ip',
metavar='<dns-ip>',
default=None,
help=_("Filter results by DNS IP address used inside "
"project's network.")
)
parser.add_argument(
'--ou',
metavar='<ou>',
default=None,
help=_("Filter results by security service OU "
"(Organizational Unit). "
"Available only for microversion >= 2.44.")
)
parser.add_argument(
'--server',
metavar='<server>',
default=None,
help=_("Filter results by security service IP "
"address or hostname.")
)
parser.add_argument(
'--domain',
metavar='<domain>',
default=None,
help=_("Filter results by security service domain.")
)
parser.add_argument(
'--detail',
action='store_true',
help=_("Show detailed information about filtered "
"security services.")
)
parser.add_argument(
"--limit",
metavar="<num-security-services>",
type=int,
default=None,
action=parseractions.NonNegativeAction,
help=_("Limit the number of security services returned")
)
parser.add_argument(
"--marker",
metavar="<security-service>",
help=_("The last security service ID of the previous page")
)
return parser
def take_action(self, parsed_args):
share_client = self.app.client_manager.share
columns = ['ID', 'Name', 'Status', 'Type']
if parsed_args.all_projects:
columns.append('Project ID')
if parsed_args.detail:
columns.append('Share Networks')
search_opts = {
'all_tenants': parsed_args.all_projects,
'status': parsed_args.status,
'name': parsed_args.name,
'type': parsed_args.type,
'user': parsed_args.user,
'dns_ip': parsed_args.dns_ip,
'server': parsed_args.server,
'domain': parsed_args.domain,
'offset': parsed_args.marker,
'limit': parsed_args.limit,
}
if (parsed_args.ou and
share_client.api_version >= api_versions.APIVersion("2.44")):
search_opts['ou'] = parsed_args.ou
elif parsed_args.ou:
raise exceptions.CommandError(_(
"Filtering results by security service Organizational Unit is "
"available only for microversion >= 2.44"))
if parsed_args.share_network:
search_opts['share_network_id'] = oscutils.find_resource(
share_client.share_networks,
parsed_args.share_network).id
data = share_client.security_services.list(
search_opts=search_opts,
detailed=parsed_args.detail
)
return (
columns,
(oscutils.get_item_properties(s, columns) for s in data)
)

View File

@ -893,6 +893,65 @@ class FakeShareService(object):
return services
class FakeShareSecurityService(object):
"""Fake one or more share security service"""
@staticmethod
def create_fake_security_service(attrs=None, methods=None):
"""Create a fake share security service
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object, with project_id, resource and so on
"""
attrs = attrs or {}
methods = methods or {}
share_security_service_info = {
"created_at": datetime.datetime.now().isoformat(),
"description": 'description',
"dns_ip": '0.0.0.0',
"domain": 'fake.domain',
"id": uuid.uuid4().hex,
"name": 'name-' + uuid.uuid4().hex,
"ou": 'fake_OU',
"password": 'password',
"project_id": uuid.uuid4().hex,
"server": 'fake_hostname',
"status": 'new',
"type": 'ldap',
"updated_at": datetime.datetime.now().isoformat(),
"user": 'fake_user',
}
share_security_service_info.update(attrs)
share_security_service = osc_fakes.FakeResource(info=copy.deepcopy(
share_security_service_info),
methods=methods,
loaded=True)
return share_security_service
@staticmethod
def create_fake_security_services(attrs=None, count=2):
"""Create multiple fake security services.
:param Dictionary attrs:
A dictionary with all attributes
:param Integer count:
The number of share security services to be faked
:return:
A list of FakeResource objects
"""
security_services = []
for n in range(count):
security_services.append(
FakeShareSecurityService.create_fake_security_service(attrs))
return security_services
class FakeSharePools(object):
"""Fake one or more share pool"""

View File

@ -0,0 +1,551 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from osc_lib import exceptions
from osc_lib import utils as oscutils
from manilaclient import api_versions
from manilaclient.osc.v2 import security_services as osc_security_services
from manilaclient.tests.unit.osc import osc_utils
from manilaclient.tests.unit.osc.v2 import fakes as manila_fakes
class TestShareSecurityService(manila_fakes.TestShare):
def setUp(self):
super(TestShareSecurityService, self).setUp()
self.security_services_mock = (
self.app.client_manager.share.security_services)
self.security_services_mock.reset_mock()
self.share_networks_mock = self.app.client_manager.share.share_networks
self.share_networks_mock.reset_mock()
self.app.client_manager.share.api_version = api_versions.APIVersion(
api_versions.MAX_VERSION
)
class TestShareSecurityServiceCreate(TestShareSecurityService):
def setUp(self):
super(TestShareSecurityServiceCreate, self).setUp()
self.security_service = manila_fakes.FakeShareSecurityService \
.create_fake_security_service()
self.security_services_mock.create.return_value = self.security_service
self.cmd = osc_security_services.CreateShareSecurityService(
self.app, None)
self.data = self.security_service._info.values()
self.columns = self.security_service._info.keys()
def test_share_security_service_create_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_security_service_create(self):
arglist = [
self.security_service.type,
'--dns-ip', self.security_service.dns_ip,
'--ou', self.security_service.ou,
'--server', self.security_service.server,
'--domain', self.security_service.domain,
'--user', self.security_service.user,
'--password', self.security_service.password,
'--name', self.security_service.name,
'--description', self.security_service.description
]
verifylist = [
('type', self.security_service.type),
('dns_ip', self.security_service.dns_ip),
('ou', self.security_service.ou),
('server', self.security_service.server),
('domain', self.security_service.domain),
('user', self.security_service.user),
('password', self.security_service.password),
('name', self.security_service.name),
('description', self.security_service.description)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.security_services_mock.create.assert_called_with(
self.security_service.type,
dns_ip=self.security_service.dns_ip,
server=self.security_service.server,
domain=self.security_service.domain,
user=self.security_service.user,
password=self.security_service.password,
name=self.security_service.name,
description=self.security_service.description,
ou=self.security_service.ou
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
def test_share_security_service_create_api_version_exception(self):
self.app.client_manager.share.api_version = api_versions.APIVersion(
'2.43'
)
arglist = [
self.security_service.type,
'--ou', self.security_service.ou,
]
verifylist = [
('type', self.security_service.type),
('ou', self.security_service.ou),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestShareSecurityServiceDelete(TestShareSecurityService):
def setUp(self):
super(TestShareSecurityServiceDelete, self).setUp()
self.security_service = manila_fakes.FakeShareSecurityService \
.create_fake_security_service()
self.security_services_mock.get.return_value = self.security_service
self.security_services = manila_fakes.FakeShareSecurityService \
.create_fake_security_services()
self.cmd = osc_security_services.DeleteShareSecurityService(
self.app, None)
def test_share_security_service_delete_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_security_service_delete(self):
arglist = [
self.security_services[0].id,
self.security_services[1].id,
]
verifylist = [
('security_service', [self.security_services[0].id,
self.security_services[1].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertEqual(self.security_services_mock.delete.call_count,
len(self.security_services))
self.assertIsNone(result)
def test_share_security_service_delete_exception(self):
arglist = [
self.security_services[0].id,
]
verifylist = [
('security_service', [self.security_services[0].id]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.security_services_mock.delete.side_effect = \
exceptions.CommandError()
self.assertRaises(exceptions.CommandError,
self.cmd.take_action,
parsed_args)
class TestShareSecurityServiceShow(TestShareSecurityService):
def setUp(self):
super(TestShareSecurityServiceShow, self).setUp()
self.security_service = manila_fakes.FakeShareSecurityService \
.create_fake_security_service()
self.security_services_mock.get.return_value = self.security_service
self.cmd = osc_security_services.ShowShareSecurityService(
self.app, None)
self.data = self.security_service._info.values()
self.columns = self.security_service._info.keys()
def test_share_security_service_show_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_security_service_show(self):
arglist = [
self.security_service.id
]
verifylist = [
('security_service', self.security_service.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.security_services_mock.get.assert_called_with(
self.security_service.id
)
self.assertCountEqual(self.columns, columns)
self.assertCountEqual(self.data, data)
class TestShareSecurityServiceSet(TestShareSecurityService):
def setUp(self):
super(TestShareSecurityServiceSet, self).setUp()
self.security_service = manila_fakes.FakeShareSecurityService \
.create_fake_security_service(methods={'update': None})
self.security_services_mock.get.return_value = self.security_service
self.cmd = osc_security_services.SetShareSecurityService(
self.app, None)
def test_share_security_service_set_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_security_service_set(self):
arglist = [
self.security_service.id,
'--dns-ip', self.security_service.dns_ip,
'--ou', self.security_service.ou,
'--server', self.security_service.server,
'--domain', self.security_service.domain,
'--user', self.security_service.user,
'--password', self.security_service.password,
'--name', self.security_service.name,
'--description', self.security_service.description
]
verifylist = [
('security_service', self.security_service.id),
('dns_ip', self.security_service.dns_ip),
('ou', self.security_service.ou),
('server', self.security_service.server),
('domain', self.security_service.domain),
('user', self.security_service.user),
('password', self.security_service.password),
('name', self.security_service.name),
('description', self.security_service.description)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.security_service.update.assert_called_with(
dns_ip=self.security_service.dns_ip,
server=self.security_service.server,
domain=self.security_service.domain,
user=self.security_service.user,
password=self.security_service.password,
name=self.security_service.name,
description=self.security_service.description,
ou=self.security_service.ou
)
self.assertIsNone(result)
def test_share_security_service_set_exception(self):
arglist = [
self.security_service.id,
'--name', self.security_service.name,
]
verifylist = [
('security_service', self.security_service.id),
('name', self.security_service.name),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.security_service.update.side_effect = \
exceptions.CommandError()
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
def test_share_security_service_set_api_version_exception(self):
self.app.client_manager.share.api_version = api_versions.APIVersion(
'2.43'
)
arglist = [
self.security_service.id,
'--ou', self.security_service.ou,
]
verifylist = [
('security_service', self.security_service.id),
('ou', self.security_service.ou),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestShareSecurityServiceUnset(TestShareSecurityService):
def setUp(self):
super(TestShareSecurityServiceUnset, self).setUp()
self.security_service = manila_fakes.FakeShareSecurityService \
.create_fake_security_service(methods={'update': None})
self.security_services_mock.get.return_value = self.security_service
self.cmd = osc_security_services.UnsetShareSecurityService(
self.app, None)
def test_share_security_service_unset_missing_args(self):
arglist = []
verifylist = []
self.assertRaises(osc_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_share_security_service_unset(self):
arglist = [
self.security_service.id,
'--dns-ip',
'--ou',
'--server',
'--domain',
'--user',
'--password',
'--name',
'--description',
]
verifylist = [
('security_service', self.security_service.id),
('dns_ip', True),
('ou', True),
('server', True),
('domain', True),
('user', True),
('password', True),
('name', True),
('description', True)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.security_service.update.assert_called_with(
dns_ip='',
server='',
domain='',
user='',
password='',
name='',
description='',
ou=''
)
self.assertIsNone(result)
def test_share_security_service_unset_exception(self):
arglist = [
self.security_service.id,
'--name',
]
verifylist = [
('security_service', self.security_service.id),
('name', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.security_service.update.side_effect = \
exceptions.CommandError()
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
def test_share_security_service_unset_api_version_exception(self):
self.app.client_manager.share.api_version = api_versions.APIVersion(
'2.43'
)
arglist = [
self.security_service.id,
'--ou',
]
verifylist = [
('security_service', self.security_service.id),
('ou', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestShareSecurityServiceList(TestShareSecurityService):
columns = [
'ID',
'Name',
'Status',
'Type',
]
def setUp(self):
super(TestShareSecurityServiceList, self).setUp()
self.share_network = (
manila_fakes.FakeShareNetwork.create_one_share_network())
self.share_networks_mock.get.return_value = self.share_network
self.services_list = manila_fakes.FakeShareSecurityService \
.create_fake_security_services()
self.security_services_mock.list.return_value = self.services_list
self.values = (oscutils.get_dict_properties(
i._info, self.columns) for i in self.services_list)
self.cmd = osc_security_services.ListShareSecurityService(
self.app, None)
def test_share_security_service_list_no_args(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.security_services_mock.list.assert_called_with(
search_opts={
'all_tenants': False,
'status': None,
'name': None,
'type': None,
'user': None,
'dns_ip': None,
'server': None,
'domain': None,
'offset': None,
'limit': None,
},
detailed=False)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.values), list(data))
def test_share_security_service_list(self):
arglist = [
'--share-network', self.share_network.id,
'--status', self.services_list[0].status,
'--name', self.services_list[0].name,
'--type', self.services_list[0].type,
'--user', self.services_list[0].user,
'--dns-ip', self.services_list[0].dns_ip,
'--ou', self.services_list[0].ou,
'--server', self.services_list[0].server,
'--domain', self.services_list[0].domain,
'--limit', '1',
]
verifylist = [
('share_network', self.share_network.id),
('status', self.services_list[0].status),
('name', self.services_list[0].name),
('type', self.services_list[0].type),
('user', self.services_list[0].user),
('dns_ip', self.services_list[0].dns_ip),
('ou', self.services_list[0].ou),
('server', self.services_list[0].server),
('domain', self.services_list[0].domain),
('limit', 1),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.security_services_mock.list.assert_called_with(
search_opts={
'all_tenants': False,
'status': self.services_list[0].status,
'name': self.services_list[0].name,
'type': self.services_list[0].type,
'user': self.services_list[0].user,
'dns_ip': self.services_list[0].dns_ip,
'server': self.services_list[0].server,
'domain': self.services_list[0].domain,
'offset': None,
'limit': 1,
'ou': self.services_list[0].ou,
'share_network_id': self.share_network.id,
},
detailed=False)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.values), list(data))
def test_share_security_service_list_ou_api_version_exception(self):
self.app.client_manager.share.api_version = api_versions.APIVersion(
'2.43'
)
arglist = [
'--ou', self.services_list[0].ou,
]
verifylist = [
('ou', self.services_list[0].ou),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
def test_share_security_service_list_detail_all_projects(self):
arglist = [
'--all-projects',
'--detail'
]
verifylist = [
('all_projects', True),
('detail', True),
]
columns_detail = self.columns.copy()
columns_detail.append('Project ID')
columns_detail.append('Share Networks')
values_detail = (oscutils.get_dict_properties(
i._info, columns_detail) for i in self.services_list)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.security_services_mock.list.assert_called_with(
search_opts={
'all_tenants': True,
'status': None,
'name': None,
'type': None,
'user': None,
'dns_ip': None,
'server': None,
'domain': None,
'offset': None,
'limit': None,
},
detailed=True)
self.assertEqual(columns_detail, columns)
self.assertEqual(list(values_detail), list(data))

View File

@ -102,6 +102,12 @@ openstack.share.v2 =
share_availability_zone_list = manilaclient.osc.v2.availability_zones:ShareAvailabilityZoneList
share_service_set = manilaclient.osc.v2.services:SetShareService
share_service_list = manilaclient.osc.v2.services:ListShareService
share_security_service_create = manilaclient.osc.v2.security_services:CreateShareSecurityService
share_security_service_delete = manilaclient.osc.v2.security_services:DeleteShareSecurityService
share_security_service_show = manilaclient.osc.v2.security_services:ShowShareSecurityService
share_security_service_set = manilaclient.osc.v2.security_services:SetShareSecurityService
share_security_service_unset = manilaclient.osc.v2.security_services:UnsetShareSecurityService
share_security_service_list = manilaclient.osc.v2.security_services:ListShareSecurityService
share_pool_list = manilaclient.osc.v2.share_pools:ListSharePools
share_instance_delete = manilaclient.osc.v2.share_instances:ShareInstanceDelete
share_instance_list = manilaclient.osc.v2.share_instances:ShareInstanceList