Adds support for renaming the builtin account administrator

Add new parameter which renames the builtin administrator user instead of
creating a new one. The new account name is provided by the metadata
service or specified in the conf file using the "username" option.
- rename_admin_user: type bool, default value False

Change-Id: I5f8c98c4854db21f7bb8eeddb81e9cf98c309a01
Implements: blueprint add-rename-administrator-option
Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>
This commit is contained in:
Alessandro Pilotti 2017-02-07 11:19:11 +02:00
parent 278468d2e2
commit f4fce4f37c
7 changed files with 191 additions and 8 deletions

View File

@ -95,6 +95,10 @@ class GlobalOptions(conf_base.Options):
'groups', default=['Administrators'],
help='List of local groups to which the user specified in '
'"username" will be added'),
cfg.BoolOpt(
'rename_admin_user', default=False,
help='Renames the builtin admin user instead of creating a '
'new user'),
cfg.StrOpt(
'heat_config_dir', default='C:\\cfn',
help='The directory where the Heat configuration files must '

View File

@ -137,6 +137,9 @@ class BaseMetadataService(object):
in the namedtuple defined above.
"""
def get_admin_username(self):
pass
def get_admin_password(self):
pass

View File

@ -54,6 +54,15 @@ class BaseOSUtils(object):
def create_user(self, username, password, password_expires=False):
raise NotImplementedError()
def rename_user(self, username, new_username):
raise NotImplementedError()
def enum_users(self):
raise NotImplementedError()
def is_builtin_admin(self, username):
raise NotImplementedError()
def set_user_password(self, username, password, password_expires=False):
raise NotImplementedError()

View File

@ -407,6 +407,39 @@ class WindowsUtils(base.BaseOSUtils):
raise exception.CloudbaseInitException(
"Create user failed: %s" % ex.args[2])
def rename_user(self, username, new_username):
user_info = {
"name": new_username,
}
try:
win32net.NetUserSetInfo(None, username, 0, user_info)
except win32net.error as ex:
if ex.args[0] == self.NERR_UserNotFound:
raise exception.ItemNotFoundException(
"User not found: %s" % username)
else:
raise exception.CloudbaseInitException(
"Renaming user failed: %s" % ex.args[2])
def enum_users(self):
usernames = []
resume_handle = 0
while True:
try:
users_info, total, resume_handle = win32net.NetUserEnum(
None, 0, win32netcon.FILTER_NORMAL_ACCOUNT, resume_handle)
except win32net.error as ex:
raise exception.CloudbaseInitException(
"Enumerating users failed: %s" % ex.args[2])
usernames += [u["name"] for u in users_info]
if not resume_handle:
return usernames
def is_builtin_admin(self, username):
sid = self.get_user_sid(username)
return sid and sid.startswith(u"S-1-5-") and sid.endswith(u"-500")
def _get_user_info(self, username, level):
try:
return win32net.NetUserGetInfo(None, username, level)

View File

@ -54,13 +54,27 @@ class BaseCreateUserPlugin(base.BasePlugin):
return osutils.generate_random_password(maximum_length)
def execute(self, service, shared_data):
user_name = CONF.username
user_name = service.get_admin_username() or CONF.username
shared_data[constants.SHARED_DATA_USERNAME] = user_name
osutils = osutils_factory.get_os_utils()
password = self._get_password(osutils)
if osutils.user_exists(user_name):
if CONF.rename_admin_user:
admin_user_name = [u for u in osutils.enum_users()
if osutils.is_builtin_admin(u)][0]
if admin_user_name.lower() != user_name.lower():
LOG.info('Renaming builtin admin user "%(admin_user_name)s" '
'to %(new_user_name)s and setting password',
{'admin_user_name': admin_user_name,
'new_user_name': user_name})
osutils.rename_user(admin_user_name, user_name)
osutils.set_user_password(user_name, password)
else:
LOG.info('"%s" is already the name of the builtin admin '
'user, skipping renaming', user_name)
elif osutils.user_exists(user_name):
LOG.info('Setting password for existing user "%s"', user_name)
osutils.set_user_password(user_name, password)
else:

View File

@ -2361,3 +2361,92 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def test_trim_exception(self):
self._test_trim(err=True)
def _test_rename_user(self, exc=None):
new_username = "new username"
user_info = {
"name": new_username,
}
userset_mock = self._win32net_mock.NetUserSetInfo
if exc:
userset_mock.side_effect = [exc]
error_class = (
exception.ItemNotFoundException if
exc.args[0] == self._winutils.NERR_UserNotFound else
exception.CloudbaseInitException)
with self.assertRaises(error_class):
self._winutils.rename_user(self._USERNAME, new_username)
return
self._winutils.rename_user(self._USERNAME, new_username)
userset_mock.assert_called_once_with(
None, self._USERNAME, 0, user_info)
def test_rename_user(self):
self._test_rename_user()
def test_rename_user_item_not_found(self):
exc = self._win32net_mock.error(self._winutils.NERR_UserNotFound,
*([mock.Mock()] * 2))
self._test_rename_user(exc=exc)
def test_rename_user_failed(self):
exc = self._win32net_mock.error(*([mock.Mock()] * 3))
self._test_rename_user(exc=exc)
def _test_enum_users(self, resume_handle=False, exc=None):
userenum_mock = self._win32net_mock.NetUserEnum
if exc is not None:
userenum_mock.side_effect = [exc]
with self.assertRaises(exception.CloudbaseInitException):
self._winutils.enum_users()
return
else:
userenum_mock.side_effect = (
[([{"name": "fake name"}], mock.sentinel, False)] * 3 +
[([{"name": "fake name"}], mock.sentinel, resume_handle)])
self._winutils.enum_users()
result = self._winutils.enum_users()
if resume_handle:
self.assertEqual(result, ["fake name"] * 3)
def test_enum_users_exception(self):
exc = self._win32net_mock.error(self._win32net_mock.error,
*([mock.Mock()] * 2))
self._test_enum_users(exc=exc)
def test_enum_users(self):
self._test_enum_users(resume_handle=False)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.get_user_sid')
def _test_is_builtin_admin(self, mock_get_user_sid, sid_exists,
sid_startswith, sid_endswith):
mock_sid = mock.Mock()
mock_sid.startswith.return_value = sid_startswith
mock_sid.endswith.return_value = sid_endswith
if sid_exists:
mock_get_user_sid.return_value = mock_sid
else:
mock_get_user_sid.return_value = False
expected_result = sid_exists and sid_startswith and sid_endswith
result = self._winutils.is_builtin_admin(mock.sentinel)
self.assertEqual(result, expected_result)
def test_is_builtin_admin_no_sid(self):
self._test_is_builtin_admin(sid_exists=False,
sid_startswith=True, sid_endswith=True)
def test_is_builtin_admin_sid_no_startswith(self):
self._test_is_builtin_admin(sid_exists=True,
sid_startswith=False, sid_endswith=True)
def test_is_builtin_admin_sid_no_endswith(self):
self._test_is_builtin_admin(sid_exists=True,
sid_startswith=True, sid_endswith=False)
def test_is_builtin_admin(self):
self._test_is_builtin_admin(sid_exists=True,
sid_startswith=True, sid_endswith=True)

View File

@ -58,29 +58,51 @@ class CreateUserPluginTests(unittest.TestCase):
@mock.patch.object(CreateUserPlugin, 'post_create_user')
def _test_execute(self, mock_post_create_user, mock_create_user,
mock_get_password, mock_get_os_utils,
user_exists=True,
group_adding_works=True):
user_exists=False, group_adding_works=True,
rename_admin_user=False, rename_admin_taken=False):
shared_data = {}
mock_osutils = mock.MagicMock()
mock_service = mock.MagicMock()
mock_service.get_admin_username.return_value = CONF.username
mock_get_password.return_value = 'password'
mock_get_os_utils.return_value = mock_osutils
mock_osutils.user_exists.return_value = user_exists
if rename_admin_user:
mock_osutils.is_builtin_admin.return_value = True
if rename_admin_taken:
mock_osutils.enum_users.return_value = [CONF.username]
else:
mock_osutils.enum_users.return_value = ["fake user name"]
if not group_adding_works:
mock_osutils.add_user_to_local_group.side_effect = Exception
with testutils.LogSnatcher("cloudbaseinit.plugins.common."
"createuser") as snatcher:
response = self._create_user.execute(mock_service, shared_data)
with testutils.ConfPatcher('rename_admin_user', rename_admin_user):
with testutils.LogSnatcher("cloudbaseinit.plugins.common."
"createuser") as snatcher:
response = self._create_user.execute(mock_service, shared_data)
mock_get_os_utils.assert_called_once_with()
mock_get_password.assert_called_once_with(mock_osutils)
mock_osutils.user_exists.assert_called_once_with(CONF.username)
if user_exists:
mock_osutils.user_exists.assert_called_once_with(CONF.username)
mock_osutils.set_user_password.assert_called_once_with(
CONF.username, 'password')
expected_logging = ["Setting password for existing user \"%s\""
% CONF.username]
elif rename_admin_user:
if rename_admin_taken:
expected_logging = [
'"%s" is already the name of the builtin admin '
'user, skipping renaming' % CONF.username
]
else:
expected_logging = [
'Renaming builtin admin user "{admin_user_name}" '
'to {new_user_name} and setting password'.format(
admin_user_name="fake user name",
new_user_name=CONF.username)
]
else:
mock_create_user.assert_called_once_with(
CONF.username, 'password',
@ -110,3 +132,12 @@ class CreateUserPluginTests(unittest.TestCase):
def test_execute_add_to_group_fails(self):
self._test_execute(group_adding_works=False)
def test_execute_rename_admin(self):
self._test_execute(
user_exists=False,
rename_admin_user=True)
def test_execute_rename_admin_taken(self):
self._test_execute(rename_admin_user=True,
rename_admin_taken=True)