Merge "Add service user password rotation actions"

This commit is contained in:
Zuul 2023-02-28 18:40:15 +00:00 committed by Gerrit Code Review
commit 0f974bb539
7 changed files with 377 additions and 2 deletions

View File

@ -58,6 +58,10 @@ deployed then see file `actions.yaml`.
* `pause`
* `resume`
* `security-checklist`
* `get-admin-password`
* `rotate-admin-password`
* `rotate-service-user-password`
* `list-service-usernames`
## High availability

View File

@ -17,6 +17,24 @@ rotate-admin-password:
Rotate the admin user's password.
The current password is replaced with a randomly generated password. The
new password is stored in the leader's admin_passwd bucket.
rotate-service-user-password:
description: |
Rotate the specified service user's password. The current password is
replaced with a randomly generated password. The password is changed on the
relation to the service user's units. This may result in a control plane
outage for the duration of the password changing process.
params:
service-user:
type: string
description: |
The name of the service as specified by the service user charm. e.g.
'cinder' for the cinder charm, 'glance' for the glance charm.
list-service-usernames:
description: |
List the usernames of the services as known by keystone. This may differ
from those expected due to either prefixes or a service providing multiple
endpoints. The service username passed to 'rotate-service-user-password'
needs to be one of this list.
openstack-upgrade:
description: |
Perform openstack upgrades. Config option action-managed-upgrade must be

View File

@ -0,0 +1 @@
rotate_service_user_password.py

View File

@ -0,0 +1 @@
rotate_service_user_password.py

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
#
# Copyright 2023 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
sys.path.append('.')
sys.path.append('./hooks')
from charmhelpers.core.hookenv import (
action_fail,
action_get,
action_set,
)
import keystone_utils
def rotate_service_user_password_action(args):
"""Rotate the service user's password.
The parameter must be passed in the service-user parameter.
:raises: Exception if keystone client cannot update the password
"""
service_user = action_get("service-user")
try:
keystone_utils.rotate_service_user_passwd(service_user)
except keystone_utils.NotLeaderError as e:
action_fail(str(e))
except keystone_utils.InvalidService as e:
action_fail(str(e))
def list_service_usernames(args):
"""List the service usernames known in this model that can be rotated."""
usernames = keystone_utils.get_service_usernames()
action_set({'usernames': ','.join(usernames)})
ACTIONS = {
"rotate-service-user-password": rotate_service_user_password_action,
"list-service-usernames": list_service_usernames,
}
def main(args):
action_name = os.path.basename(args[0])
try:
action = ACTIONS[action_name]
except KeyError:
return "Action {} undefined".format(action_name)
else:
try:
action(args)
except Exception as e:
action_fail("Action {} failed: {}".format(action_name, str(e)))
if __name__ == "__main__":
sys.exit(main(sys.argv))

View File

@ -223,6 +223,7 @@ FERNET_KEY_ROTATE_SYNC_CRON_FILE = '/etc/cron.d/keystone-fernet-rotate-sync'
WSGI_KEYSTONE_API_CONF = '/etc/apache2/sites-enabled/wsgi-openstack-api.conf'
UNUSED_APACHE_SITE_FILES = ['/etc/apache2/sites-enabled/keystone.conf',
'/etc/apache2/sites-enabled/wsgi-keystone.conf']
SERVICE_PASSWD_LENGTH = 64
BASE_RESOURCE_MAP = OrderedDict([
(KEYSTONE_CONF, {
@ -1521,6 +1522,99 @@ def rotate_admin_passwd():
leader_set({'admin_passwd': new_passwd})
class NotLeaderError(Exception):
"""Raised if not the leader."""
pass
class InvalidService(Exception):
"""Raised if not the leader."""
pass
def get_service_usernames():
"""Return the known service usernames that can be password rotated.
:returns: the list of known service usernames
:rtype: List[str]
"""
usernames = [k[:-len('_passwd')] for k in leader_get()
if k.endswith('_passwd') and k != 'admin_passwd']
# now match against the service list.
valid_service_names = valid_services.keys()
known_service_usernames = []
for username in usernames:
# if a username has '_' in it, then it is a compound name.
parts = username.split('_')
if 'keystone' in parts:
continue
if not all(p in valid_service_names for p in parts):
continue
known_service_usernames.append(username)
return known_service_usernames
def rotate_service_user_passwd(service):
"""Rotate the password for the specified service user.
Note that the function checks that the charm is the leader.
:param service: the service to rotate the password for. This needs to be
in the form that it is stored in leader settings.
:type service: str
:raises: RuntimeError if the unit is not the leader.
:raises: ValueError if the service_user doesn't exist.
:raises: RuntimeError if the password can't be changed.
"""
if not is_leader():
msg = (
"This unit is not the leader and therefore can't rotate the "
"password for user {}.".format(service))
log(msg, level=ERROR)
raise NotLeaderError(msg)
# validate that the service is actually known about.
known_usernames = get_service_usernames()
if service not in known_usernames:
msg = ("Invalid service requested: '{}' not one of {} services."
.format(service, ', '.join(known_usernames)))
log(msg, level=ERROR)
raise InvalidService(msg)
# Note that the service is already prefixed as that is how it is stored in
# the leader-settings
# check whether the service has been related/saved
if not is_service_password_saved(service):
msg = ("Service requested: '{}' is not known in this model."
.format(service))
log(msg, level=ERROR)
raise InvalidService(msg)
# Rotate the password - note that rotate_service_password is for leader
# storage, and update_user_password uses the manager to get keystone to
# update the password in MySQL
passwd = rotate_service_password(service)
update_user_password(service, passwd, SERVICE_DOMAIN)
# Update just the password for the the relation data for the service.
relation_data = {
"service_password": passwd,
}
# workout what the relation id is?
id_svc_rel_ids = relation_ids('identity-service')
_relation_ids = []
for rid in id_svc_rel_ids:
rel_service_username = relation_get(unit=local_unit(),
rid=rid,
attribute='service_username')
if rel_service_username == service:
_relation_ids.append(rid)
if not _relation_ids:
msg = ("Service '{}' not found in relations, so not updating relation "
"data." .format(service))
log(msg, level=INFO)
return
for rid in _relation_ids:
peer_store_and_set(relation_id=rid, **relation_data)
relation_set(relation_id=rid, **relation_data)
def get_api_version():
api_version = config('preferred-api-version')
cmp_release = CompareOpenStackReleases(
@ -1696,8 +1790,7 @@ def get_service_password(service_username):
_migrate_service_passwords()
passwd = leader_get("{}_passwd".format(service_username))
if passwd is None:
passwd = pwgen(length=64)
passwd = pwgen(length=SERVICE_PASSWD_LENGTH)
return passwd
@ -1705,6 +1798,33 @@ def set_service_password(passwd, user):
_leader_set_secret({"{}_passwd".format(user): passwd})
def rotate_service_password(service_username):
"""Create a new service password for a service user and save it.
It is saved via `set_service_password`.
:param service_username: The service username to set the password for.
:type service_username: str
:returns: the generated password.
:rtype: str
"""
passwd = pwgen(length=SERVICE_PASSWD_LENGTH)
set_service_password(passwd, service_username)
return passwd
def is_service_password_saved(service_username):
"""Return true if the service_username has been saved.
:param service_username: The service username to check.
:type service_username: str
:returns: True if set
:rtype: bool
"""
passwd = leader_get("{}_passwd".format(service_username))
return passwd is not None
def is_password_changed(username, passwd):
_passwd = leader_get("{}_passwd".format(username))
return (_passwd is None or passwd != _passwd)

View File

@ -2188,3 +2188,160 @@ class TestKeystoneUtils(CharmTestCase):
pwgen.assert_not_called()
update_user_password.assert_not_called()
leader_set.assert_not_called()
@patch.object(utils, 'leader_get')
def test_get_service_usernames(self, leader_get):
leader_get.return_value = {
'ignore-me': 'ignored-value',
'glance_passwd': 'glance-pass',
'admin_passed': 'ignored-value',
'head_passwd': 'ignored-value',
'cinderv2_cinderv3_passwd': 'cinder-pass',
'cinderv2_other_passwd': 'ignored-value',
'keystone_passwd': 'ignored-value',
}
self.assertEqual(sorted(utils.get_service_usernames()),
sorted(['glance', 'cinderv2_cinderv3']))
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'rotate_service_password')
@patch.object(utils, 'is_service_password_saved')
@patch.object(utils, 'is_leader')
def test_rotate_service_user_passwd__not_leader(
self,
is_leader,
is_service_password_saved,
rotate_service_password,
update_user_password,
):
is_leader.return_value = False
with self.assertRaises(utils.NotLeaderError):
utils.rotate_service_user_passwd('glance')
def _assert_regex_in_log(self, regex):
calls = self.log.call_args_list
self.assertEqual(len(calls), 1)
args = calls[0][0]
msg = args[0]
self.assertRegex(msg, regex)
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'rotate_service_password')
@patch.object(utils, 'is_service_password_saved')
@patch.object(utils, 'is_leader')
@patch.object(utils, 'get_service_usernames')
def test_rotate_service_user_passwd__not_valid_service(
self,
get_service_usernames,
is_leader,
is_service_password_saved,
rotate_service_password,
update_user_password,
):
is_leader.return_value = True
get_service_usernames.return_value = ['glance', 'heat']
with self.assertRaises(utils.InvalidService):
utils.rotate_service_user_passwd('unknown-service')
self._assert_regex_in_log(r"^Invalid service.*unknown-service")
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'rotate_service_password')
@patch.object(utils, 'is_service_password_saved')
@patch.object(utils, 'is_leader')
@patch.object(utils, 'get_service_usernames')
def test_rotate_service_user_passwd__service_not_in_model(
self,
get_service_usernames,
is_leader,
is_service_password_saved,
rotate_service_password,
update_user_password,
):
is_leader.return_value = True
is_service_password_saved.return_value = False
get_service_usernames.return_value = ['glance', 'heat']
with self.assertRaises(utils.InvalidService):
utils.rotate_service_user_passwd('glance')
self._assert_regex_in_log(r"^Service requested.*this model")
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'rotate_service_password')
@patch.object(utils, 'is_service_password_saved')
@patch.object(utils, 'is_leader')
@patch.object(utils, 'get_service_usernames')
def test_rotate_service_user_passwd__service_not_in_relations(
self,
get_service_usernames,
is_leader,
is_service_password_saved,
rotate_service_password,
update_user_password,
):
is_leader.return_value = True
is_service_password_saved.return_value = True
get_service_usernames.return_value = ['glance', 'heat']
self.relation_ids.return_value = []
rotate_service_password.return_value = 'the-password'
utils.rotate_service_user_passwd('glance')
self._assert_regex_in_log(
r"^Service .*'glance'.*not found.*relations")
rotate_service_password.assert_called_once_with('glance')
update_user_password.assert_called_once_with(
'glance', 'the-password', utils.SERVICE_DOMAIN)
self.relation_ids.assert_called_once_with('identity-service')
@patch.object(utils, 'update_user_password')
@patch.object(utils, 'rotate_service_password')
@patch.object(utils, 'is_service_password_saved')
@patch.object(utils, 'is_leader')
@patch.object(utils, 'get_service_usernames')
def test_rotate_service_user_passwd__complete(
self,
get_service_usernames,
is_leader,
is_service_password_saved,
rotate_service_password,
update_user_password,
):
is_leader.return_value = True
is_service_password_saved.return_value = True
get_service_usernames.return_value = ['glance', 'heat']
self.relation_ids.return_value = ['relation:4']
self.relation_get.return_value = 'glance'
rotate_service_password.return_value = 'the-password'
self.local_unit.return_value = 'keystone/0'
utils.rotate_service_user_passwd('glance')
self.log.assert_not_called()
rotate_service_password.assert_called_once_with('glance')
update_user_password.assert_called_once_with(
'glance', 'the-password', utils.SERVICE_DOMAIN)
self.relation_ids.assert_called_once_with('identity-service')
self.relation_get.assert_called_once_with(
unit='keystone/0', rid='relation:4',
attribute='service_username')
self.peer_store_and_set.assert_called_once_with(
relation_id='relation:4', service_password='the-password')
self.relation_set.assert_called_once_with(
relation_id='relation:4', service_password='the-password')
@patch.object(utils, 'set_service_password')
@patch.object(utils, 'pwgen')
def test_rotate_service_password(self, pwgen, set_service_password):
pwgen.return_value = 'the-password'
self.assertEqual(utils.rotate_service_password('glance'),
'the-password')
pwgen.assert_called_once_with(length=utils.SERVICE_PASSWD_LENGTH)
set_service_password.assert_called_once_with('the-password', 'glance')
@patch.object(utils, 'leader_get')
def test_is_service_password_saved__true(self, _leader_get):
_leader_get.return_value = 'a-thing'
self.assertTrue(utils.is_service_password_saved('glance'))
_leader_get.assert_called_once_with('glance_passwd')
@patch.object(utils, 'leader_get')
def test_is_service_password_saved__false(self, _leader_get):
_leader_get.return_value = None
self.assertFalse(utils.is_service_password_saved('glance'))
_leader_get.assert_called_once_with('glance_passwd')