diff --git a/cloudbaseinit/exception.py b/cloudbaseinit/exception.py index 147611b9..b87b0e48 100644 --- a/cloudbaseinit/exception.py +++ b/cloudbaseinit/exception.py @@ -12,6 +12,21 @@ # License for the specific language governing permissions and limitations # under the License. +import ctypes + class CloudbaseInitException(Exception): pass + + +class WindowsCloudbaseInitException(CloudbaseInitException): + + def __init__(self, msg="%r", error_code=None): + if error_code is None: + error_code = ctypes.GetLastError() + description = ctypes.FormatError(error_code) + try: + formatted_msg = msg % description + except TypeError: + formatted_msg = msg + super(WindowsCloudbaseInitException, self).__init__(formatted_msg) diff --git a/cloudbaseinit/osutils/windows.py b/cloudbaseinit/osutils/windows.py index 49d37f69..8d7fce1b 100644 --- a/cloudbaseinit/osutils/windows.py +++ b/cloudbaseinit/osutils/windows.py @@ -313,7 +313,7 @@ class WindowsUtils(base.BaseOSUtils): ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot", 0, True, True) if not ret_val: - raise exception.CloudbaseInitException("Reboot failed") + raise exception.WindowsCloudbaseInitException("Reboot failed: %r") def _get_user_wmi_object(self, username): conn = wmi.WMI(moniker='//./root/cimv2') @@ -390,7 +390,8 @@ class WindowsUtils(base.BaseOSUtils): 0, six.text_type(username), sid, ctypes.byref(cbSid), domainName, ctypes.byref(cchReferencedDomainName), ctypes.byref(sidNameUse)) if not ret_val: - raise exception.CloudbaseInitException("Cannot get user SID") + raise exception.WindowsCloudbaseInitException( + "Cannot get user SID: %r") return (sid, domainName.value) @@ -430,7 +431,8 @@ class WindowsUtils(base.BaseOSUtils): six.text_type(password), 2, 0, ctypes.byref(token)) if not ret_val: - raise exception.CloudbaseInitException("User logon failed") + raise exception.WindowsCloudbaseInitException( + "User logon failed: %r") if load_profile: pi = Win32_PROFILEINFO() @@ -439,8 +441,8 @@ class WindowsUtils(base.BaseOSUtils): ret_val = userenv.LoadUserProfileW(token, ctypes.byref(pi)) if not ret_val: kernel32.CloseHandle(token) - raise exception.CloudbaseInitException( - "Cannot load user profile") + raise exception.WindowsCloudbaseInitException( + "Cannot load user profile: %r") return token @@ -465,7 +467,8 @@ class WindowsUtils(base.BaseOSUtils): self.ComputerNamePhysicalDnsHostname, six.text_type(new_host_name)) if not ret_val: - raise exception.CloudbaseInitException("Cannot set host name") + raise exception.WindowsCloudbaseInitException( + "Cannot set host name: %r") return True def get_network_adapters(self): @@ -823,8 +826,8 @@ class WindowsUtils(base.BaseOSUtils): buf = ctypes.create_unicode_buffer(buf_size + 1) buf_len = kernel32.GetLogicalDriveStringsW(buf_size, buf) if not buf_len: - raise exception.CloudbaseInitException( - "GetLogicalDriveStringsW failed") + raise exception.WindowsCloudbaseInitException( + "GetLogicalDriveStringsW failed: %r") return self._split_str_buf_list(buf, buf_len) @@ -865,8 +868,8 @@ class WindowsUtils(base.BaseOSUtils): ctypes.byref(required_size), None): if (kernel32.GetLastError() != self.ERROR_INSUFFICIENT_BUFFER): - raise exception.CloudbaseInitException( - "SetupDiGetDeviceInterfaceDetailW failed") + raise exception.WindowsCloudbaseInitException( + "SetupDiGetDeviceInterfaceDetailW failed: %r") pdidd = ctypes.cast( msvcrt.malloc(ctypes.c_size_t(required_size.value)), @@ -884,8 +887,8 @@ class WindowsUtils(base.BaseOSUtils): if not setupapi.SetupDiGetDeviceInterfaceDetailW( handle_disks, ctypes.byref(did), pdidd, required_size, None, None): - raise exception.CloudbaseInitException( - "SetupDiGetDeviceInterfaceDetailW failed") + raise exception.WindowsCloudbaseInitException( + "SetupDiGetDeviceInterfaceDetailW failed: %r") device_path = ctypes.cast( pdidd.contents.DevicePath, wintypes.LPWSTR).value @@ -904,8 +907,8 @@ class WindowsUtils(base.BaseOSUtils): handle_disk, self.IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0, ctypes.byref(sdn), ctypes.sizeof(sdn), ctypes.byref(b), None): - raise exception.CloudbaseInitException( - 'DeviceIoControl failed') + raise exception.WindowsCloudbaseInitException( + 'DeviceIoControl failed: %r') physical_disks.append( r"\\.\PHYSICALDRIVE%d" % sdn.DeviceNumber) diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index e91ba68c..382f7410 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -15,7 +15,6 @@ import importlib import os -import unittest try: import unittest.mock as mock @@ -26,11 +25,12 @@ import six from cloudbaseinit import exception from cloudbaseinit.tests import fake +from cloudbaseinit.tests import testutils CONF = cfg.CONF -class TestWindowsUtils(unittest.TestCase): +class TestWindowsUtils(testutils.CloudbaseInitTestBase): '''Tests for the windows utils class.''' _CONFIG_NAME = 'FakeConfig' @@ -101,14 +101,16 @@ class TestWindowsUtils(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._enable_shutdown_privilege') - def _test_reboot(self, mock_enable_shutdown_privilege, ret_value): + def _test_reboot(self, mock_enable_shutdown_privilege, ret_value, + expected_ret_value=None): advapi32 = self._windll_mock.advapi32 advapi32.InitiateSystemShutdownW = mock.MagicMock( return_value=ret_value) if not ret_value: - self.assertRaises(exception.CloudbaseInitException, - self._winutils.reboot) + with self.assert_raises_windows_message( + "Reboot failed: %r", expected_ret_value): + self._winutils.reboot() else: self._winutils.reboot() @@ -121,7 +123,7 @@ class TestWindowsUtils(unittest.TestCase): self._test_reboot(ret_value=True) def test_reboot_failed(self): - self._test_reboot(ret_value=None) + self._test_reboot(ret_value=None, expected_ret_value=100) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '._sanitize_wmi_input') @@ -256,7 +258,7 @@ class TestWindowsUtils(unittest.TestCase): def test_set_password_expiration_no_object(self): self._test_set_user_password_expiration(fake_obj=None) - def _test_get_user_sid_and_domain(self, ret_val): + def _test_get_user_sid_and_domain(self, ret_val, last_error=None): cbSid = mock.Mock() sid = mock.Mock() size = 1024 @@ -272,10 +274,11 @@ class TestWindowsUtils(unittest.TestCase): advapi32.LookupAccountNameW.return_value = ret_val if ret_val is None: - self.assertRaises( - exception.CloudbaseInitException, - self._winutils._get_user_sid_and_domain, - self._USERNAME) + with self.assert_raises_windows_message( + "Cannot get user SID: %r", + last_error): + self._winutils._get_user_sid_and_domain( + self._USERNAME) else: response = self._winutils._get_user_sid_and_domain(self._USERNAME) @@ -291,7 +294,7 @@ class TestWindowsUtils(unittest.TestCase): self._test_get_user_sid_and_domain(ret_val=fake_obj) def test_get_user_sid_and_domain_no_return_value(self): - self._test_get_user_sid_and_domain(ret_val=None) + self._test_get_user_sid_and_domain(ret_val=None, last_error=100) @mock.patch('cloudbaseinit.osutils.windows' '.Win32_LOCALGROUP_MEMBERS_INFO_3') @@ -372,8 +375,8 @@ class TestWindowsUtils(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.Win32_PROFILEINFO') def _test_create_user_logon_session(self, mock_Win32_PROFILEINFO, logon, - loaduser, - load_profile=True): + loaduser, load_profile=True, + last_error=None): self._wintypes_mock.HANDLE = mock.MagicMock() pi = self.windows_utils.Win32_PROFILEINFO() advapi32 = self._windll_mock.advapi32 @@ -383,20 +386,20 @@ class TestWindowsUtils(unittest.TestCase): advapi32.LogonUserW.return_value = logon if not logon: - self.assertRaises( - exception.CloudbaseInitException, - self._winutils.create_user_logon_session, - self._USERNAME, self._PASSWORD, domain='.', - load_profile=load_profile) + with self.assert_raises_windows_message( + "User logon failed: %r", last_error): + self._winutils.create_user_logon_session( + self._USERNAME, self._PASSWORD, domain='.', + load_profile=load_profile) elif load_profile and not loaduser: userenv.LoadUserProfileW.return_value = None kernel32.CloseHandle.return_value = None - - self.assertRaises(exception.CloudbaseInitException, - self._winutils.create_user_logon_session, - self._USERNAME, self._PASSWORD, domain='.', - load_profile=load_profile) + with self.assert_raises_windows_message( + "Cannot load user profile: %r", last_error): + self._winutils.create_user_logon_session( + self._USERNAME, self._PASSWORD, domain='.', + load_profile=load_profile) userenv.LoadUserProfileW.assert_called_with( self._wintypes_mock.HANDLE.return_value, @@ -427,28 +430,38 @@ class TestWindowsUtils(unittest.TestCase): self.assertTrue(response is not None) def test_create_user_logon_session_fail_load_false(self): - self._test_create_user_logon_session(0, 0, True) + self._test_create_user_logon_session(logon=0, loaduser=0, + load_profile=True, + last_error=100) def test_create_user_logon_session_fail_load_true(self): - self._test_create_user_logon_session(0, 0, False) + self._test_create_user_logon_session(logon=0, loaduser=0, + load_profile=False, + last_error=100) def test_create_user_logon_session_load_true(self): m = mock.Mock() n = mock.Mock() - self._test_create_user_logon_session(m, n, True) + self._test_create_user_logon_session(logon=m, loaduser=n, + load_profile=True) def test_create_user_logon_session_load_false(self): m = mock.Mock() n = mock.Mock() - self._test_create_user_logon_session(m, n, False) + self._test_create_user_logon_session(logon=m, loaduser=n, + load_profile=False) def test_create_user_logon_session_no_load_true(self): m = mock.Mock() - self._test_create_user_logon_session(m, None, True) + self._test_create_user_logon_session(logon=m, loaduser=None, + load_profile=True, + last_error=100) def test_create_user_logon_session_no_load_false(self): m = mock.Mock() - self._test_create_user_logon_session(m, None, False) + self._test_create_user_logon_session(logon=m, loaduser=None, + load_profile=False, + last_error=100) def test_close_user_logon_session(self): token = mock.Mock() @@ -459,12 +472,14 @@ class TestWindowsUtils(unittest.TestCase): self._windll_mock.kernel32.CloseHandle.assert_called_with(token) @mock.patch('ctypes.windll.kernel32.SetComputerNameExW') - def _test_set_host_name(self, mock_SetComputerNameExW, ret_value): + def _test_set_host_name(self, mock_SetComputerNameExW, ret_value, + last_error=None): mock_SetComputerNameExW.return_value = ret_value if not ret_value: - self.assertRaises(exception.CloudbaseInitException, - self._winutils.set_host_name, 'fake name') + with self.assert_raises_windows_message( + "Cannot set host name: %r", last_error): + self._winutils.set_host_name('fake name') else: self.assertTrue(self._winutils.set_host_name('fake name')) @@ -476,7 +491,7 @@ class TestWindowsUtils(unittest.TestCase): self._test_set_host_name(ret_value='fake response') def test_set_host_exception(self): - self._test_set_host_name(ret_value=None) + self._test_set_host_name(ret_value=None, last_error=100) @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils' '.get_user_sid') @@ -1013,7 +1028,7 @@ class TestWindowsUtils(unittest.TestCase): mock_generate_random_password.assert_called_once_with(length) self.assertEqual('Passw0rd', response) - def _test_get_logical_drives(self, buf_length): + def _test_get_logical_drives(self, buf_length, last_error=None): mock_buf = mock.MagicMock() mock_buf.__getitem__.side_effect = ['1', '\x00'] mock_get_drives = self._windll_mock.kernel32.GetLogicalDriveStringsW @@ -1022,9 +1037,9 @@ class TestWindowsUtils(unittest.TestCase): mock_get_drives.return_value = buf_length if buf_length is None: - self.assertRaises(exception.CloudbaseInitException, - self._winutils._get_logical_drives) - + with self.assert_raises_windows_message( + "GetLogicalDriveStringsW failed: %r", last_error): + self._winutils._get_logical_drives() else: response = self._winutils._get_logical_drives() @@ -1033,7 +1048,7 @@ class TestWindowsUtils(unittest.TestCase): self.assertEqual(['1'], response) def test_get_logical_drives_exception(self): - self._test_get_logical_drives(buf_length=None) + self._test_get_logical_drives(buf_length=None, last_error=100) def test_get_logical_drives(self): self._test_get_logical_drives(buf_length=2) @@ -1056,7 +1071,8 @@ class TestWindowsUtils(unittest.TestCase): @mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER') def _test_get_physical_disks(self, mock_sdn, mock_setupapi, mock_kernel32, mock_msvcrt, handle_disks, last_error, - interface_detail, disk_handle, io_control): + interface_detail, disk_handle, io_control, + last_error_code=None): sizeof_calls = [ mock.call( @@ -1095,9 +1111,18 @@ class TestWindowsUtils(unittest.TestCase): interface_detail) or ( disk_handle == self._winutils.INVALID_HANDLE_VALUE) or ( not io_control): - - self.assertRaises(exception.CloudbaseInitException, - self._winutils.get_physical_disks) + if not io_control: + with self.assert_raises_windows_message( + "DeviceIoControl failed: %r", last_error_code): + self._winutils.get_physical_disks() + elif not interface_detail: + with self.assert_raises_windows_message( + "SetupDiGetDeviceInterfaceDetailW failed: %r", + last_error_code): + self._winutils.get_physical_disks() + else: + self.assertRaises(exception.CloudbaseInitException, + self._winutils.get_physical_disks) else: response = self._winutils.get_physical_disks() @@ -1163,6 +1188,7 @@ class TestWindowsUtils(unittest.TestCase): self._test_get_physical_disks( handle_disks=mock_handle_disks, last_error='other', interface_detail=None, + last_error_code=100, disk_handle=mock_disk_handle, io_control=True) def test_get_physical_disks_invalid_disk_handle(self): @@ -1180,6 +1206,7 @@ class TestWindowsUtils(unittest.TestCase): handle_disks=mock_handle_disks, last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, interface_detail='fake interface detail', + last_error_code=100, disk_handle=mock_disk_handle, io_control=False) def test_get_physical_disks_handle_disks_invalid(self): diff --git a/cloudbaseinit/tests/test_exception.py b/cloudbaseinit/tests/test_exception.py new file mode 100644 index 00000000..f5943f7b --- /dev/null +++ b/cloudbaseinit/tests/test_exception.py @@ -0,0 +1,35 @@ +# Copyright 2015 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import unittest + +from cloudbaseinit import exception +from cloudbaseinit.tests import testutils + + +WINDOWS = os.name == "nt" + + +@unittest.skipUnless(WINDOWS, "This requires the Windows platform.") +class TestException(testutils.CloudbaseInitTestBase): + + def test_windows_exception_no_error_code_given(self): + with self.assert_raises_windows_message("Test %r", error_code=100): + raise exception.WindowsCloudbaseInitException("Test %r") + + def test_windows_exception_error_code_given(self): + with self.assert_raises_windows_message("Test %r", error_code=100): + raise exception.WindowsCloudbaseInitException("Test %r", + error_code=100) diff --git a/cloudbaseinit/tests/testutils.py b/cloudbaseinit/tests/testutils.py index 3165d59f..2969dab6 100644 --- a/cloudbaseinit/tests/testutils.py +++ b/cloudbaseinit/tests/testutils.py @@ -18,9 +18,16 @@ import logging as base_logging import os import shutil import tempfile +import unittest + +try: + import unittest.mock as mock +except ImportError: + import mock from oslo.config import cfg +from cloudbaseinit import exception from cloudbaseinit.openstack.common import log as logging @@ -30,6 +37,7 @@ __all__ = ( 'create_tempfile', 'create_tempdir', 'LogSnatcher', + 'CloudbaseInitTestBase', 'ConfPatcher', ) @@ -145,3 +153,48 @@ class ConfPatcher(object): def __exit__(self, exc_type, exc_val, exc_tb): self._conf.set_override(self._key, self._original_value) + + +class CloudbaseInitTestBase(unittest.TestCase): + """A test base class, which provides a couple of useful methods.""" + + @contextlib.contextmanager + def assert_raises_windows_message( + self, expected_msg, error_code, + exc=exception.WindowsCloudbaseInitException): + """Helper method for testing raised error messages + + This assert method is similar to :meth:`~assertRaises`, but + it can only be used as a context manager. It will check that the + block of the with statement raises an exception of type :class:`exc`, + having as message the result of the interpolation between + `expected_msg` and a formatted string, obtained through + `ctypes.FormatError(error_code)`. + """ + # Can't use the decorator form, since it will not be properly set + # after the function passes control with the `yield` (so the + # with statement block will have the original value, not the + # mocked one). + + with self.assertRaises(exc) as cm: + with mock.patch('cloudbaseinit.exception.' + 'ctypes.FormatError', + create=True) as mock_format_error: + with mock.patch('cloudbaseinit.exception.ctypes.' + 'GetLastError', + create=True) as mock_get_last_error: + mock_format_error.return_value = "description" + yield + + if mock_get_last_error.called: + # This can be called when the error code is not given, + # but we don't have control over that, so test that + # it's actually called only once. + mock_get_last_error.assert_called_once_with() + mock_format_error.assert_called_once_with( + mock_get_last_error.return_value) + else: + mock_format_error.assert_called_once_with(error_code) + + expected_msg = expected_msg % mock_format_error.return_value + self.assertEqual(expected_msg, cm.exception.args[0]) diff --git a/cloudbaseinit/tests/utils/windows/test_physical_disk.py b/cloudbaseinit/tests/utils/windows/test_physical_disk.py index 7ceb4714..ca94ea9f 100644 --- a/cloudbaseinit/tests/utils/windows/test_physical_disk.py +++ b/cloudbaseinit/tests/utils/windows/test_physical_disk.py @@ -13,7 +13,6 @@ # under the License. import importlib -import unittest try: import unittest.mock as mock @@ -21,9 +20,10 @@ except ImportError: import mock from cloudbaseinit import exception as cbinit_exception +from cloudbaseinit.tests import testutils -class WindowsPhysicalDiskUtilsTests(unittest.TestCase): +class WindowsPhysicalDiskUtilsTests(testutils.CloudbaseInitTestBase): def setUp(self): self._ctypes_mock = mock.MagicMock() @@ -100,7 +100,8 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.physical_disk' '.Win32_DiskGeometry') - def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val): + def _test_get_geometry(self, mock_Win32_DiskGeometry, _geom, ret_val, + last_error=None): mock_DeviceIoControl = self.physical_disk.kernel32.DeviceIoControl expect_byref = [mock.call(mock_Win32_DiskGeometry.return_value), mock.call( @@ -110,8 +111,9 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase): self.physical_disk.kernel32.DeviceIoControl.return_value = ret_val if not ret_val: - self.assertRaises(cbinit_exception.CloudbaseInitException, - self._phys_disk_class.get_geometry) + with self.assert_raises_windows_message( + "Cannot get disk geometry: %r", last_error): + self._phys_disk_class.get_geometry() elif _geom: response = self._phys_disk_class.get_geometry() self.assertEqual(_geom, response) @@ -142,10 +144,12 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase): def test_get_geometry_no_geom(self): self._test_get_geometry(_geom=None, - ret_val=mock.sentinel.ret_val) + ret_val=mock.sentinel.ret_val, + last_error=100) def test_get_geometry_no_geom_exception(self): - self._test_get_geometry(_geom=None, ret_val=None) + self._test_get_geometry(_geom=None, ret_val=None, + last_error=100) def _test_seek(self, exception): expect_DWORD = [mock.call(0), mock.call(1)] @@ -174,13 +178,14 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase): def test_seek_exception(self): self._test_seek(exception=True) - def _test_read(self, ret_val): + def _test_read(self, ret_val, last_error=None): bytes_to_read = mock.sentinel.bytes_to_read self.physical_disk.kernel32.ReadFile.return_value = ret_val if not ret_val: - self.assertRaises(cbinit_exception.CloudbaseInitException, - self._phys_disk_class.read, bytes_to_read) + with self.assert_raises_windows_message( + "Read exception: %r", last_error): + self._phys_disk_class.read(bytes_to_read) else: response = self._phys_disk_class.read(bytes_to_read) @@ -204,4 +209,4 @@ class WindowsPhysicalDiskUtilsTests(unittest.TestCase): self._test_read(ret_val=mock.sentinel.ret_val) def test_read_exception(self): - self._test_read(ret_val=None) + self._test_read(ret_val=None, last_error=100) diff --git a/cloudbaseinit/tests/utils/windows/test_virtual_disk.py b/cloudbaseinit/tests/utils/windows/test_virtual_disk.py index 4a86f319..16644584 100644 --- a/cloudbaseinit/tests/utils/windows/test_virtual_disk.py +++ b/cloudbaseinit/tests/utils/windows/test_virtual_disk.py @@ -13,17 +13,16 @@ # under the License. import importlib -import unittest try: import unittest.mock as mock except ImportError: import mock -from cloudbaseinit import exception +from cloudbaseinit.tests import testutils -class WindowsVirtualDiskUtilsTests(unittest.TestCase): +class WindowsVirtualDiskUtilsTests(testutils.CloudbaseInitTestBase): def setUp(self): self._ctypes_mock = mock.MagicMock() @@ -73,9 +72,10 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): self._vdisk_class._handle = handle if ret_val: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.open) - + with self.assert_raises_windows_message( + "Cannot open virtual disk: %r", + ret_val): + self._vdisk_class.open() else: self._vdisk_class.open() if handle: @@ -95,7 +95,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): self._test_open(handle=None, ret_val=None) def test_open_exception(self): - self._test_open(handle=None, ret_val=mock.sentinel.error_value) + self._test_open(handle=None, ret_val=100) def test_open_handle_exists(self): self._test_open(handle=None, ret_val=None) @@ -106,8 +106,10 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): virtdisk.AttachVirtualDisk.return_value = ret_val if ret_val: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.attach) + with self.assert_raises_windows_message( + "Cannot attach virtual disk: %r", + ret_val): + self._vdisk_class.attach() else: self._vdisk_class.attach() @@ -119,7 +121,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): self._test_attach(ret_val=None) def test_attach_exception(self): - self._test_attach(ret_val=mock.sentinel.error_value) + self._test_attach(ret_val=100) def _test_detach(self, ret_val): virtdisk = self._ctypes_mock.windll.virtdisk @@ -127,8 +129,9 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): virtdisk.DetachVirtualDisk.return_value = ret_val if ret_val: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.detach) + with self.assert_raises_windows_message( + "Cannot detach virtual disk: %r", ret_val): + self._vdisk_class.detach() else: self._vdisk_class.detach() @@ -140,7 +143,7 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): self._test_detach(ret_val=None) def test_detach_exception(self): - self._test_detach(ret_val=mock.sentinel.error_value) + self._test_detach(ret_val=100) def _test_get_physical_path(self, ret_val): virtdisk = self._ctypes_mock.windll.virtdisk @@ -150,8 +153,9 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): buf = self._ctypes_mock.create_unicode_buffer.return_value if ret_val: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.get_physical_path) + with self.assert_raises_windows_message( + "Cannot get virtual disk physical path: %r", ret_val): + self._vdisk_class.get_physical_path() else: response = self._vdisk_class.get_physical_path() self.assertEqual(buf.value, response) @@ -163,14 +167,23 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): buf) virtdisk.GetVirtualDiskPhysicalPath.assert_called_once_with( - self._vdisk_class._handle, self._ctypes_mock.byref.return_value) + self._vdisk_class._handle, + self._ctypes_mock.byref.return_value, + self._ctypes_mock.create_unicode_buffer.return_value) self._ctypes_mock.byref.assert_called_once_with( self._ctypes_mock.wintypes.DWORD.return_value) + self._ctypes_mock.create_unicode_buffer.assert_called_once_with(1024) + + def test_get_physical_path(self): + self._test_get_physical_path(ret_val=None) + + def test_get_physical_path_fails(self): + self._test_get_physical_path(ret_val=100) @mock.patch('cloudbaseinit.utils.windows.virtual_disk' '.VirtualDisk.get_physical_path') def _test_get_cdrom_drive_mount_point(self, mock_get_physical_path, - buf_len, ret_val): + buf_len, ret_val, last_error=None): buf = self._ctypes_mock.create_unicode_buffer.return_value kernel32 = self.virtual_disk.kernel32 kernel32.GetLogicalDriveStringsW.return_value = buf_len @@ -186,11 +199,13 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): expected_create_unicode_buffer = [mock.call(2048)] if not buf_len: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.get_cdrom_drive_mount_point) + with self.assert_raises_windows_message( + "Cannot enumerate logical devices: %r", last_error): + self._vdisk_class.get_cdrom_drive_mount_point() elif not ret_val: - self.assertRaises(exception.CloudbaseInitException, - self._vdisk_class.get_cdrom_drive_mount_point) + with self.assert_raises_windows_message( + "Cannot query NT device: %r", last_error): + self._vdisk_class.get_cdrom_drive_mount_point() expected_create_unicode_buffer.append(mock.call(2048)) expected_sizeof.append(mock.call(self._ctypes_mock.wintypes.WCHAR)) @@ -230,10 +245,12 @@ class WindowsVirtualDiskUtilsTests(unittest.TestCase): kernel32.GetLogicalDriveStringsW.assert_called_once_with(1, buf) def test_get_cdrom_drive_mount_point_exception_buf_len(self): - self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1) + self._test_get_cdrom_drive_mount_point(buf_len=0, ret_val=1, + last_error=100) def test_get_cdrom_drive_mount_point_exception_query(self): - self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0) + self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=0, + last_error=100) def test_get_cdrom_drive_mount_point(self): self._test_get_cdrom_drive_mount_point(buf_len=1, ret_val=1) diff --git a/cloudbaseinit/utils/windows/network.py b/cloudbaseinit/utils/windows/network.py index df790c65..2f338cc6 100644 --- a/cloudbaseinit/utils/windows/network.py +++ b/cloudbaseinit/utils/windows/network.py @@ -99,7 +99,7 @@ def get_adapter_addresses(): if ret_val: raise exception.CloudbaseInitException( - "GetAdaptersAddresses failed") + "GetAdaptersAddresses failed: %r" % ret_val) p_curr_addr = p_addr while p_curr_addr: diff --git a/cloudbaseinit/utils/windows/physical_disk.py b/cloudbaseinit/utils/windows/physical_disk.py index cd52e075..fc363875 100644 --- a/cloudbaseinit/utils/windows/physical_disk.py +++ b/cloudbaseinit/utils/windows/physical_disk.py @@ -84,8 +84,8 @@ class PhysicalDisk(object): ctypes.byref(bytes_returned), 0) if not ret_val: - raise exception.CloudbaseInitException( - "Cannot get disk geometry") + raise exception.WindowsCloudbaseInitException( + "Cannot get disk geometry: %r") self._geom = geom return self._geom @@ -105,5 +105,6 @@ class PhysicalDisk(object): ret_val = kernel32.ReadFile(self._handle, buf, bytes_to_read, ctypes.byref(bytes_read), 0) if not ret_val: - raise exception.CloudbaseInitException("Read exception") + raise exception.WindowsCloudbaseInitException( + "Read exception: %r") return (buf, bytes_read.value) diff --git a/cloudbaseinit/utils/windows/virtual_disk.py b/cloudbaseinit/utils/windows/virtual_disk.py index db266330..df64333d 100644 --- a/cloudbaseinit/utils/windows/virtual_disk.py +++ b/cloudbaseinit/utils/windows/virtual_disk.py @@ -84,22 +84,23 @@ class VirtualDisk(object): self.OPEN_VIRTUAL_DISK_FLAG_NONE, 0, ctypes.byref(handle)) if ret_val: - raise exception.CloudbaseInitException("Cannot open virtual disk") + raise exception.WindowsCloudbaseInitException( + "Cannot open virtual disk: %r", ret_val) self._handle = handle def attach(self): ret_val = virtdisk.AttachVirtualDisk( self._handle, 0, self.ATTACH_VIRTUAL_DISK_FLAG_READ_ONLY, 0, 0, 0) if ret_val: - raise exception.CloudbaseInitException( - "Cannot attach virtual disk") + raise exception.WindowsCloudbaseInitException( + "Cannot attach virtual disk: %r", ret_val) def detach(self): ret_val = virtdisk.DetachVirtualDisk( self._handle, self.DETACH_VIRTUAL_DISK_FLAG_NONE, 0) if ret_val: - raise exception.CloudbaseInitException( - "Cannot detach virtual disk") + raise exception.WindowsCloudbaseInitException( + "Cannot detach virtual disk: %r", ret_val) def get_physical_path(self): buf = ctypes.create_unicode_buffer(1024) @@ -108,8 +109,8 @@ class VirtualDisk(object): ctypes.byref(bufLen), buf) if ret_val: - raise exception.CloudbaseInitException( - "Cannot get virtual disk physical path") + raise exception.WindowsCloudbaseInitException( + "Cannot get virtual disk physical path: %r", ret_val) return buf.value def get_cdrom_drive_mount_point(self): @@ -120,8 +121,8 @@ class VirtualDisk(object): buf_len = kernel32.GetLogicalDriveStringsW( ctypes.sizeof(buf) / ctypes.sizeof(wintypes.WCHAR), buf) if not buf_len: - raise exception.CloudbaseInitException( - "Cannot enumerate logical devices") + raise exception.WindowsCloudbaseInitException( + "Cannot enumerate logical devices: %r") cdrom_dev = self.get_physical_path().rsplit('\\')[-1].upper() @@ -135,8 +136,8 @@ class VirtualDisk(object): ctypes.sizeof(dev) / ctypes.sizeof(wintypes.WCHAR)) if not ret_val: - raise exception.CloudbaseInitException( - "Cannot query NT device") + raise exception.WindowsCloudbaseInitException( + "Cannot query NT device: %r") if dev.value.rsplit('\\')[-1].upper() == cdrom_dev: mount_point = curr_drive