Adds ADSI based Windows user management

Replaces the Windows user creation and password management
implementation from "NET USER" execution to ADSI COM usage.

Change-Id: I4f95b094c95abb11762943afdb4be26541493159
Closes-Bug: #1404533
This commit is contained in:
Alessandro Pilotti 2014-12-21 22:50:21 +01:00
parent 2195a97213
commit 414fd1116c
4 changed files with 55 additions and 25 deletions

View File

@ -18,6 +18,7 @@ import os
import re
import time
import pywintypes
import six
from six.moves import winreg
from win32com import client
@ -325,24 +326,37 @@ class WindowsUtils(base.BaseOSUtils):
def user_exists(self, username):
return self._get_user_wmi_object(username) is not None
def _get_adsi_object(self, hostname='.', object_name=None,
object_type='computer'):
adsi = client.Dispatch("ADsNameSpaces")
winnt = adsi.GetObject("", "WinNT:")
query = "WinNT://%s" % hostname
if object_name:
object_name_san = self.sanitize_shell_input(object_name)
query += "/%s" % object_name_san
query += ",%s" % object_type
return winnt.OpenDSObject(query, "", "", 0)
def _create_or_change_user(self, username, password, create,
password_expires):
username_san = self.sanitize_shell_input(username)
password_san = self.sanitize_shell_input(password)
try:
if create:
host = self._get_adsi_object()
user = host.Create('user', username)
else:
user = self._get_adsi_object(object_name=username,
object_type='user')
args = ['NET', 'USER', username_san, password_san]
if create:
args.append('/ADD')
user.setpassword(password)
user.SetInfo()
(out, err, ret_val) = self.execute_process(args)
if not ret_val:
self._set_user_password_expiration(username, password_expires)
else:
except pywintypes.com_error as ex:
if create:
msg = "Create user failed: %s"
else:
msg = "Set user password failed: %s"
raise exception.CloudbaseInitException(msg % err)
raise exception.CloudbaseInitException(msg % ex.excepinfo[2])
def _sanitize_wmi_input(self, value):
return value.replace('\'', '\'\'')

View File

@ -0,0 +1,19 @@
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FakeComError(Exception):
def __init__(self):
super(FakeComError, self).__init__()
self.excepinfo = [None, None, None, None, None, -2144108544]

View File

@ -22,7 +22,7 @@ from oslo.config import cfg
import six
from cloudbaseinit import exception
from cloudbaseinit.tests import fake
CONF = cfg.CONF
@ -40,6 +40,7 @@ class WindowsUtilsTest(unittest.TestCase):
def setUp(self):
self._pywintypes_mock = mock.MagicMock()
self._pywintypes_mock.com_error = fake.FakeComError
self._win32com_mock = mock.MagicMock()
self._win32process_mock = mock.MagicMock()
self._win32security_mock = mock.MagicMock()
@ -177,15 +178,10 @@ class WindowsUtilsTest(unittest.TestCase):
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._set_user_password_expiration')
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.execute_process')
def _test_create_or_change_user(self, mock_execute_process,
'._get_adsi_object')
def _test_create_or_change_user(self, mock_get_adsi_object,
mock_set_user_password_expiration,
create, password_expires, ret_value=0):
args = ['NET', 'USER', self._USERNAME, self._PASSWORD]
if create:
args.append('/ADD')
mock_execute_process.return_value = (None, None, ret_value)
if not ret_value:
self._winutils._create_or_change_user(self._USERNAME,
@ -194,12 +190,18 @@ class WindowsUtilsTest(unittest.TestCase):
mock_set_user_password_expiration.assert_called_with(
self._USERNAME, password_expires)
else:
mock_get_adsi_object.side_effect = [
self._pywintypes_mock.com_error]
self.assertRaises(
exception.CloudbaseInitException,
self._winutils._create_or_change_user,
self._USERNAME, self._PASSWORD, create, password_expires)
mock_execute_process.assert_called_with(args)
if create:
mock_get_adsi_object.assert_called_with()
else:
mock_get_adsi_object.assert_called_with(
object_name=self._USERNAME, object_type='user')
def test_create_user_and_add_password_expire_true(self):
self._test_create_or_change_user(create=True, password_expires=True)

View File

@ -19,18 +19,13 @@ import mock
import unittest
from cloudbaseinit import exception
class FakeComError(Exception):
def __init__(self):
super(FakeComError, self).__init__()
self.excepinfo = [None, None, None, None, None, -2144108544]
from cloudbaseinit.tests import fake
class WinRMConfigTests(unittest.TestCase):
def setUp(self):
self._pywintypes_mock = mock.MagicMock()
self._pywintypes_mock.com_error = FakeComError
self._pywintypes_mock.com_error = fake.FakeComError
self._win32com_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',