identity: Migrate 'user' commands to SDK

Change-Id: I06f3848812bce60c65909f1311f36b70eba427d4
This commit is contained in:
ArtofBugs 2024-02-14 09:27:13 -08:00
parent c86b9d8cc7
commit 680e3e3011
3 changed files with 429 additions and 396 deletions

View File

@ -18,7 +18,7 @@
import copy
import logging
from keystoneauth1 import exceptions as ks_exc
from openstack import exceptions as sdk_exc
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
@ -30,6 +30,33 @@ from openstackclient.identity import common
LOG = logging.getLogger(__name__)
def _format_user(user):
columns = (
'default_project_id',
'domain_id',
'email',
'is_enabled',
'id',
'name',
'description',
'password_expires_at',
)
column_headers = (
'default_project_id',
'domain_id',
'email',
'enabled',
'id',
'name',
'description',
'password_expires_at',
)
return (
column_headers,
utils.get_item_properties(user, columns),
)
def _get_options_for_user(identity_client, parsed_args):
options = {}
if parsed_args.ignore_lockout_failure_attempts:
@ -220,25 +247,26 @@ class CreateUser(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
project_id = None
if parsed_args.project:
project_id = common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
).id
identity_client = self.app.client_manager.sdk_connection.identity
domain_id = None
if parsed_args.domain:
domain_id = common.find_domain(
identity_client, parsed_args.domain
domain_id = identity_client.find_domain(
name_or_id=parsed_args.domain,
ignore_missing=False,
).id
enabled = True
project_id = None
if parsed_args.project:
project_id = identity_client.find_project(
name_or_id=parsed_args.project,
ignore_missing=False,
domain_id=domain_id,
).id
is_enabled = True
if parsed_args.disable:
enabled = False
is_enabled = False
if parsed_args.password_prompt:
parsed_args.password = utils.get_password(self.app.stdin)
@ -252,29 +280,28 @@ class CreateUser(command.ShowOne):
options = _get_options_for_user(identity_client, parsed_args)
try:
user = identity_client.users.create(
name=parsed_args.name,
domain=domain_id,
default_project=project_id,
password=parsed_args.password,
email=parsed_args.email,
user = identity_client.create_user(
default_project_id=project_id,
description=parsed_args.description,
enabled=enabled,
domain_id=domain_id,
email=parsed_args.email,
is_enabled=is_enabled,
name=parsed_args.name,
password=parsed_args.password,
options=options,
)
except ks_exc.Conflict:
except sdk_exc.ConflictException:
if parsed_args.or_show:
user = utils.find_resource(
identity_client.users,
parsed_args.name,
user = identity_client.find_user(
name_or_id=parsed_args.name,
domain_id=domain_id,
ignore_missing=False,
)
LOG.info(_('Returning existing user %s'), user.name)
else:
raise
user._info.pop('links')
return zip(*sorted(user._info.items()))
return _format_user(user)
class DeleteUser(command.Command):
@ -296,21 +323,28 @@ class DeleteUser(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
domain = None
if parsed_args.domain:
domain = common.find_domain(identity_client, parsed_args.domain)
domain = identity_client.find_domain(
name_or_id=parsed_args.domain,
ignore_missing=True,
)
errors = 0
for user in parsed_args.users:
try:
if domain is not None:
user_obj = utils.find_resource(
identity_client.users, user, domain_id=domain.id
user_obj = identity_client.find_user(
name_or_id=user,
domain_id=domain.id,
ignore_missing=False,
)
else:
user_obj = utils.find_resource(identity_client.users, user)
identity_client.users.delete(user_obj.id)
user_obj = identity_client.find_user(
name_or_id=user, ignore_missing=False
)
identity_client.delete_user(user_obj.id, ignore_missing=False)
except Exception as e:
errors += 1
LOG.error(
@ -360,32 +394,36 @@ class ListUser(command.Lister):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
domain = None
if parsed_args.domain:
domain = common.find_domain(identity_client, parsed_args.domain).id
domain = identity_client.find_domain(
name_or_id=parsed_args.domain,
).id
group = None
if parsed_args.group:
group = common.find_group(
identity_client, parsed_args.group, parsed_args.domain
group = identity_client.find_group(
name_or_id=parsed_args.group,
domain_id=parsed_args.domain,
ignore_missing=False,
).id
if parsed_args.project:
if domain is not None:
project = utils.find_resource(
identity_client.projects,
parsed_args.project,
project = identity_client.find_project(
name_or_id=parsed_args.project,
domain_id=domain,
ignore_missing=False,
).id
else:
project = utils.find_resource(
identity_client.projects,
parsed_args.project,
project = identity_client.find_project(
name_or_id=parsed_args.project,
ignore_missing=False,
).id
assignments = identity_client.role_assignments.list(
assignments = identity_client.role_assignments_filter(
project=project
)
@ -394,19 +432,19 @@ class ListUser(command.Lister):
# are looking for any role, let's just track unique user IDs.
user_ids = set()
for assignment in assignments:
if hasattr(assignment, 'user'):
if assignment.user:
user_ids.add(assignment.user['id'])
# NOTE(stevemar): Call find_resource once we have unique IDs, so
# it's fewer trips to the Identity API, then collect the data.
data = []
for user_id in user_ids:
user = utils.find_resource(identity_client.users, user_id)
user = identity_client.find_user(user_id, ignore_missing=False)
data.append(user)
else:
data = identity_client.users.list(
domain=domain,
data = identity_client.users(
domain_id=domain,
group=group,
)
@ -419,11 +457,12 @@ class ListUser(command.Lister):
'Domain Id',
'Description',
'Email',
'Enabled',
'Is Enabled',
]
column_headers = copy.deepcopy(columns)
column_headers[2] = 'Project'
column_headers[3] = 'Domain'
column_headers[6] = 'Enabled'
else:
columns = ['ID', 'Name']
column_headers = columns
@ -507,7 +546,7 @@ class SetUser(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
if parsed_args.password_prompt:
parsed_args.password = utils.get_password(self.app.stdin)
@ -524,14 +563,19 @@ class SetUser(command.Command):
identity_client, 'user', parsed_args.user, parsed_args.domain
)
if parsed_args.domain:
domain = common.find_domain(identity_client, parsed_args.domain)
user = utils.find_resource(
identity_client.users, user_str, domain_id=domain.id
domain = identity_client.find_domain(
name_or_id=parsed_args.domain,
ignore_missing=False,
)
user = identity_client.find_user(
name_or_id=user_str,
domain_id=domain.id,
ignore_missing=False,
)
else:
user = utils.find_resource(
identity_client.users,
parsed_args.user,
user = identity_client.find_user(
name_or_id=parsed_args.user,
ignore_missing=False,
)
kwargs = {}
@ -544,23 +588,27 @@ class SetUser(command.Command):
if parsed_args.description:
kwargs['description'] = parsed_args.description
if parsed_args.project:
project_id = common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
project_domain_id = identity_client.find_domain(
name_or_id=parsed_args.project_domain,
ignore_missing=False,
).id
kwargs['default_project'] = project_id
kwargs['enabled'] = user.enabled
project_id = identity_client.find_project(
name_or_id=parsed_args.project,
ignore_missing=False,
domain_id=project_domain_id,
).id
kwargs['default_project_id'] = project_id
kwargs['is_enabled'] = user.is_enabled
if parsed_args.enable:
kwargs['enabled'] = True
kwargs['is_enabled'] = True
if parsed_args.disable:
kwargs['enabled'] = False
kwargs['is_enabled'] = False
options = _get_options_for_user(identity_client, parsed_args)
if options:
kwargs['options'] = options
identity_client.users.update(user.id, **kwargs)
identity_client.update_user(user=user, **kwargs)
class SetPasswordUser(command.Command):
@ -583,7 +631,7 @@ class SetPasswordUser(command.Command):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
# FIXME(gyee): there are two scenarios:
#
@ -625,7 +673,9 @@ class SetPasswordUser(command.Command):
)
)
identity_client.users.update_password(current_password, password)
identity_client.update_user(
current_password=current_password, password=password
)
class ShowUser(command.ShowOne):
@ -646,18 +696,28 @@ class ShowUser(command.ShowOne):
return parser
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
identity_client = self.app.client_manager.sdk_connection.identity
user_str = common._get_token_resource(
identity_client, 'user', parsed_args.user, parsed_args.domain
)
domain = None
if parsed_args.domain:
domain = common.find_domain(identity_client, parsed_args.domain)
user = utils.find_resource(
identity_client.users, user_str, domain_id=domain.id
domain = identity_client.find_domain(
name_or_id=parsed_args.domain,
ignore_missing=True,
)
if domain:
user = identity_client.find_user(
name_or_id=user_str,
domain_id=domain.id,
ignore_missing=False,
)
else:
user = utils.find_resource(identity_client.users, user_str)
user = identity_client.find_user(
name_or_id=user_str,
ignore_missing=False,
)
user._info.pop('links')
return zip(*sorted(user._info.items()))
return _format_user(user)

View File

@ -33,7 +33,6 @@ class IdentityTests(base.TestCase):
'enabled',
'id',
'name',
'name',
'domain_id',
'default_project_id',
'description',

File diff suppressed because it is too large Load Diff