Add a new cloud-config plugin for setting the timezone

cloud-config supports a new plugin, called 'set-timezone', which
can be used to change the timezone on the underlying instance.
The patch adds a new method in the osutils abstraction, called `set_timezone`,
which should be implemented  by each separated OS. The abstraction calls
into cloudbaseinit.utils.windows.timezone, another layer of abstraction
over two API methods, SetTimeZoneInformation for Windows 2003 and older
and SetDynamicTimeZoneInformation, for newer versions of Windows,
which also handles Daylight Saving Time.
The plugin supports standard IANA timezone names, which are then translated
to the Windows-specific timezone names, using tzlocal library.

Change-Id: I18674e1ae078fc69f3fb938065ba01a4de5464a1
This commit is contained in:
Claudiu Popa 2015-03-19 17:16:58 +02:00
parent 0073c7b86b
commit b10917f9b0
11 changed files with 717 additions and 42 deletions

View File

@ -110,3 +110,7 @@ class BaseOSUtils(object):
def get_maximum_password_length(self):
"""Obtain the maximum password length tailored for each OS."""
raise NotImplementedError()
def set_timezone(self, timezone):
"""Set the timezone for this instance."""
raise NotImplementedError()

View File

@ -23,6 +23,7 @@ import time
import pywintypes
import six
from six.moves import winreg
from tzlocal import windows_tz
from win32com import client
import win32process
import win32security
@ -33,6 +34,8 @@ from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import base
from cloudbaseinit.utils import encoding
from cloudbaseinit.utils.windows import network
from cloudbaseinit.utils.windows import privilege
from cloudbaseinit.utils.windows import timezone
LOG = logging.getLogger(__name__)
@ -296,24 +299,14 @@ class WindowsUtils(base.BaseOSUtils):
_FW_SCOPE_ALL = 0
_FW_SCOPE_LOCAL_SUBNET = 1
def _enable_shutdown_privilege(self):
process = win32process.GetCurrentProcess()
token = win32security.OpenProcessToken(
process,
win32security.TOKEN_ADJUST_PRIVILEGES |
win32security.TOKEN_QUERY)
priv_luid = win32security.LookupPrivilegeValue(
None, win32security.SE_SHUTDOWN_NAME)
privilege = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)]
win32security.AdjustTokenPrivileges(token, False, privilege)
def reboot(self):
self._enable_shutdown_privilege()
ret_val = advapi32.InitiateSystemShutdownW(0, "Cloudbase-Init reboot",
0, True, True)
if not ret_val:
raise exception.WindowsCloudbaseInitException("Reboot failed: %r")
with privilege.acquire_privilege(win32security.SE_SHUTDOWN_NAME):
ret_val = advapi32.InitiateSystemShutdownW(
0, "Cloudbase-Init reboot",
0, True, True)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Reboot failed: %r")
def _get_user_wmi_object(self, username):
conn = wmi.WMI(moniker='//./root/cimv2')
@ -1047,3 +1040,10 @@ class WindowsUtils(base.BaseOSUtils):
def get_maximum_password_length(self):
return 20
def set_timezone(self, timezone_name):
windows_name = windows_tz.tz_win.get(timezone_name)
if not windows_name:
raise exception.CloudbaseInitException(
"The given timezone name is unrecognised: %r" % timezone_name)
timezone.Timezone(windows_name).set(self)

View File

@ -20,6 +20,8 @@ from cloudbaseinit.utils import classloader
PLUGINS = {
'write_files': 'cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.write_files.WriteFilesPlugin',
'set_timezone': 'cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.SetTimezonePlugin',
}

View File

@ -0,0 +1,42 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import factory
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
base
)
LOG = logging.getLogger(__name__)
class SetTimezonePlugin(base.BaseCloudConfigPlugin):
"""Change the timezone for the underlying platform.
This uses IANA timezone names (which are mapped to the Windows
time zone names, as seen in the following link:
https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx).
For instance, to change the timezone to 'America/Montevideo', use
this syntax::
set_timezone: America/Montevideo
"""
def process(self, data):
LOG.info("Changing timezone to %r", data)
osutils = factory.get_os_utils()
osutils.set_timezone(data)

View File

@ -51,6 +51,7 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self._moves_mock = mock.MagicMock()
self._xmlrpc_client_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
self._tzlocal_mock = mock.Mock()
self._module_patcher = mock.patch.dict(
'sys.modules',
@ -61,7 +62,8 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
'six.moves': self._moves_mock,
'six.moves.xmlrpc_client': self._xmlrpc_client_mock,
'ctypes': self._ctypes_mock,
'pywintypes': self._pywintypes_mock})
'pywintypes': self._pywintypes_mock,
'tzlocal': self._tzlocal_mock})
self._module_patcher.start()
self.windows_utils = importlib.import_module(
@ -78,31 +80,10 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def tearDown(self):
self._module_patcher.stop()
def test_enable_shutdown_privilege(self):
fake_process = mock.MagicMock()
fake_token = True
LUID = 'fakeid'
self._win32process_mock.GetCurrentProcess.return_value = fake_process
self._win32security_mock.OpenProcessToken.return_value = fake_token
self._win32security_mock.LookupPrivilegeValue.return_value = LUID
self._winutils._enable_shutdown_privilege()
privilege = [(LUID,
self._win32security_mock.SE_PRIVILEGE_ENABLED)]
self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token,
False,
privilege)
self._win32security_mock.OpenProcessToken.assert_called_with(
fake_process, self._win32security_mock.TOKEN_ADJUST_PRIVILEGES |
self._win32security_mock.TOKEN_QUERY)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._enable_shutdown_privilege')
def _test_reboot(self, mock_enable_shutdown_privilege, ret_value,
@mock.patch('cloudbaseinit.osutils.windows.privilege')
def _test_reboot(self, mock_privilege_module, ret_value,
expected_ret_value=None):
mock_privilege_module.acquire_privilege = mock.MagicMock()
advapi32 = self._windll_mock.advapi32
advapi32.InitiateSystemShutdownW = mock.MagicMock(
return_value=ret_value)
@ -118,6 +99,8 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
0,
"Cloudbase-Init reboot",
0, True, True)
mock_privilege_module.acquire_privilege.assert_called_once_with(
self._win32security_mock.SE_SHUTDOWN_NAME)
def test_reboot(self):
self._test_reboot(ret_value=True)
@ -1477,3 +1460,32 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def test_get_password_maximum_length(self):
self.assertEqual(20, self._winutils.get_maximum_password_length())
@mock.patch('cloudbaseinit.osutils.windows.windows_tz')
def test_set_timezone_fails(self, mock_windows_tz):
mock_windows_tz.tz_win.get.return_value = None
with self.assertRaises(exception.CloudbaseInitException) as cm:
self._winutils.set_timezone(mock.sentinel.timezone)
expected = (
"The given timezone name is unrecognised: %r"
% mock.sentinel.timezone
)
self.assertEqual(expected, str(cm.exception))
mock_windows_tz.tz_win.get.assert_called_once_with(
mock.sentinel.timezone)
@mock.patch('cloudbaseinit.osutils.windows.timezone')
@mock.patch('cloudbaseinit.osutils.windows.windows_tz')
def test_set_timezone(self, mock_windows_tz, mock_timezone):
mock_windows_tz.tz_win.get.return_value = (
mock.sentinel.windows_timezone)
self._winutils.set_timezone(mock.sentinel.timezone)
mock_windows_tz.tz_win.get.assert_called_once_with(
mock.sentinel.timezone)
mock_timezone.Timezone.assert_called_once_with(
mock.sentinel.windows_timezone)
mock_timezone.Timezone.return_value.set.assert_called_once_with(
self._winutils)

View File

@ -0,0 +1,54 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit.plugins.common.userdataplugins import cloudconfig
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
set_timezone
)
from cloudbaseinit.tests import testutils
class TestSetTimezone(unittest.TestCase):
@mock.patch('cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.factory')
def test_process(self, mock_osutils_factory):
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'userdataplugins.cloudconfigplugins.'
'set_timezone') as snatcher:
set_timezone.SetTimezonePlugin().process(mock.sentinel.timezone)
expected_logging = [
'Changing timezone to %r' % mock.sentinel.timezone
]
mock_osutils_factory.get_os_utils.assert_called_once_with()
mock_osutils = mock_osutils_factory.get_os_utils.return_value
mock_osutils.set_timezone.assert_called_once_with(
mock.sentinel.timezone)
self.assertEqual(expected_logging, snatcher.output)
@mock.patch('cloudbaseinit.plugins.common.userdataplugins.'
'cloudconfigplugins.set_timezone.SetTimezonePlugin.process')
def test_timezone_dispatch(self, mock_process_plugin):
plugin = cloudconfig.CloudConfigPlugin()
plugin.process_non_multipart("set_timezone: America Standard Time")
mock_process_plugin.assert_called_once_with("America Standard Time")

View File

@ -0,0 +1,63 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
class TestPrivilege(unittest.TestCase):
def setUp(self):
self._win32process_mock = mock.MagicMock()
self._win32security_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'win32process': self._win32process_mock,
'win32security': self._win32security_mock})
self._module_patcher.start()
self.privilege_module = importlib.import_module(
"cloudbaseinit.utils.windows.privilege")
def tearDown(self):
self._module_patcher.stop()
def test_privilege_context_manager(self):
fake_process = mock.MagicMock()
fake_token = True
LUID = 'fakeid'
self._win32process_mock.GetCurrentProcess.return_value = fake_process
self._win32security_mock.OpenProcessToken.return_value = fake_token
self._win32security_mock.LookupPrivilegeValue.return_value = LUID
privilege_enabled = [(LUID,
self._win32security_mock.SE_PRIVILEGE_ENABLED)]
privilege_removed = [(LUID,
self._win32security_mock.SE_PRIVILEGE_REMOVED)]
with self.privilege_module.acquire_privilege(mock.sentinel.privilege):
self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token, False, privilege_enabled)
self._win32security_mock.OpenProcessToken.assert_called_with(
fake_process,
self._win32security_mock.TOKEN_ADJUST_PRIVILEGES |
self._win32security_mock.TOKEN_QUERY)
self._win32security_mock.AdjustTokenPrivileges.assert_called_with(
fake_token, False, privilege_removed)

View File

@ -0,0 +1,271 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import os
import struct
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
class FakeWindowsError(Exception):
pass
class TestTimezone(unittest.TestCase):
def setUp(self):
self._mock_moves = mock.MagicMock()
self._mock_winreg = mock.Mock()
self._mock_ctypes = mock.Mock()
self._mock_win32security = mock.Mock()
self._mock_win32process = mock.Mock()
self._mock_wintypes = mock.MagicMock()
self._mock_ctypes.wintypes = self._mock_wintypes
self._module_patcher = mock.patch.dict(
'sys.modules',
{'ctypes': self._mock_ctypes,
'six.moves': self._mock_moves,
'win32process': self._mock_win32process,
'win32security': self._mock_win32security})
self._module_patcher.start()
self._mock_moves.winreg = self._mock_winreg
self._timezone_module = importlib.import_module(
'cloudbaseinit.utils.windows.timezone')
self._timezone_module.WindowsError = FakeWindowsError
self._fixture_timezone_info = [
0, 'StandardName', list(range(8)),
3, "DaylightName", list(reversed(range(8))), 6,
]
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.utils.windows.timezone.SYSTEMTIME')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_info', new=mock.MagicMock())
def test__create_system_time(self, mock_systemtime):
values = list(range(8))
timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone)
result = timezoneobj._create_system_time(values)
mock_systemtime.assert_called_once_with()
self.assertEqual(tuple(range(8)),
(result.wYear, result.wMonth, result.wDayOfWeek,
result.wDay, result.wHour, result.wMinute,
result.wSecond, result.wMilliseconds))
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_create_system_time')
@mock.patch('cloudbaseinit.utils.windows.timezone.TIME_ZONE_INFORMATION')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_info')
def test__get_timezone_struct(self, mock_get_timezone_info,
mock_time_zone_information,
mock_create_system_time):
mock_get_timezone_info.return_value = self._fixture_timezone_info
timezoneobj = self._timezone_module.Timezone(mock.sentinel.timezone)
result = timezoneobj._get_timezone_struct()
mock_time_zone_information.assert_called_once_with()
self.assertEqual(0, result.Bias)
self.assertEqual('StandardName', result.StandardName)
self.assertEqual(result.StandardDate,
mock_create_system_time.return_value)
self.assertEqual(result.DaylightDate,
mock_create_system_time.return_value)
self.assertEqual(3, result.StandardBias)
self.assertEqual("DaylightName", result.DaylightName)
self.assertEqual(6, result.DaylightBias)
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_create_system_time')
@mock.patch('cloudbaseinit.utils.windows.timezone.'
'DYNAMIC_TIME_ZONE_INFORMATION')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_info')
def test__get_dynamic_timezone_struct(self, mock_get_timezone_info,
mock_dynamic_time_zone_information,
mock_create_system_time):
mock_get_timezone_info.return_value = self._fixture_timezone_info
timezoneobj = self._timezone_module.Timezone("timezone name")
result = timezoneobj._get_dynamic_timezone_struct()
mock_dynamic_time_zone_information.assert_called_once_with()
self.assertEqual(0, result.Bias)
self.assertEqual('StandardName', result.StandardName)
self.assertEqual(3, result.StandardBias)
self.assertEqual("DaylightName", result.DaylightName)
self.assertEqual(6, result.DaylightBias)
self.assertFalse(result.DynamicDaylightTimeDisabled)
self.assertEqual("timezone name", result.TimeZoneKeyName)
self.assertEqual(result.StandardDate,
mock_create_system_time.return_value)
self.assertEqual(result.DaylightDate,
mock_create_system_time.return_value)
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_unpack_timezone_info')
def test__get_timezone_info(self, mock_unpack_timezone_info):
mock_unpack_timezone_info.return_value = range(7)
registry_key = mock.MagicMock()
self._mock_winreg.OpenKey.return_value = registry_key
self._timezone_module.Timezone("timezone test")
self._mock_winreg.OpenKey.assert_called_once_with(
self._mock_winreg.HKEY_LOCAL_MACHINE,
os.path.join(self._timezone_module.REG_TIME_ZONES,
"timezone test"))
mock_unpack_timezone_info.assert_called_once_with(
registry_key.__enter__.return_value)
def test__get_time_zone_info_reraise_cloudbaseinit_exception(self):
error = FakeWindowsError()
error.errno = self._timezone_module.NOT_FOUND
self._mock_winreg.OpenKey.side_effect = error
with self.assertRaises(exception.CloudbaseInitException) as cm:
self._timezone_module.Timezone("timezone test")
self.assertEqual("Timezone 'timezone test' not found",
str(cm.exception))
def test__get_time_zone_info_reraise_exception(self):
error = FakeWindowsError()
error.errno = 404
self._mock_winreg.OpenKey.side_effect = error
with self.assertRaises(FakeWindowsError) as cm:
self._timezone_module.Timezone("timezone test")
self.assertIsInstance(cm.exception, FakeWindowsError)
self.assertEqual(404, cm.exception.errno)
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_query_tz_key')
def test__get_time_zone_info_real_data(self, mock_query_tz_key):
orig_unpack = struct.unpack
def unpacker(format, blob):
if format == "l":
format = "i"
return orig_unpack(format, blob)
mock_query_tz_key.return_value = (
b'\xf0\x00\x00\x00\x00\x00\x00\x00\xc4\xff\xff\xff\x00\x00'
b'\x0b\x00\x00\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x03\x00\x00\x00\x02\x00\x02\x00\x00\x00\x00\x00'
b'\x00\x00',
"Atlantic Standard Time",
"Atlantic Daylight Time",
)
registry_key = mock.MagicMock()
self._mock_winreg.OpenKey.return_value = registry_key
with mock.patch('struct.unpack', side_effect=unpacker):
timezoneobj = self._timezone_module.Timezone("timezone test")
mock_query_tz_key.assert_called_once_with(registry_key.__enter__())
self.assertEqual(240, timezoneobj.bias)
self.assertEqual(-60, timezoneobj.daylight_bias)
self.assertEqual((0, 3, 0, 2, 2, 0, 0, 0),
timezoneobj.daylight_date)
self.assertEqual('Atlantic Daylight Time', timezoneobj.daylight_name)
self.assertEqual(0, timezoneobj.standard_bias)
self.assertEqual((0, 11, 0, 1, 2, 0, 0, 0),
timezoneobj.standard_date)
self.assertEqual('Atlantic Standard Time', timezoneobj.standard_name)
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_info')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_set_dynamic_time_zone_information')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_set_time_zone_information')
def _test_set_time_zone_information(
self, mock__set_time_zone_information,
mock__set_dynamic_time_zone_information,
mock_get_timezone_info, windows_60=True):
mock_osutils = mock.Mock()
mock_osutils.check_os_version.return_value = windows_60
mock_get_timezone_info.return_value = self._fixture_timezone_info
timezoneobj = self._timezone_module.Timezone("fake")
timezoneobj.set(mock_osutils)
if windows_60:
mock__set_dynamic_time_zone_information.assert_called_once_with()
else:
mock__set_time_zone_information.assert_called_once_with()
def test_set_daylight_not_supported(self):
self._test_set_time_zone_information(windows_60=False)
def test_set_daylight_supported(self):
self._test_set_time_zone_information(windows_60=True)
@mock.patch('cloudbaseinit.utils.windows.privilege.acquire_privilege')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_info')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_timezone_struct')
@mock.patch('cloudbaseinit.utils.windows.timezone.Timezone.'
'_get_dynamic_timezone_struct')
def _test__set_time_zone_information(
self, mock__get_dynamic_timezone_struct,
mock__get_timezone_struct,
mock_get_timezone_info,
mock_acquire_privilege,
windows_60=True,
privilege=None):
mock_get_timezone_info.return_value = self._fixture_timezone_info
mock__get_timezone_struct.return_value = (
mock.sentinel.timezone_struct,
)
mock__get_dynamic_timezone_struct.return_value = (
mock.sentinel.timezone_struct,
)
timezoneobj = self._timezone_module.Timezone("fake")
if windows_60:
timezoneobj._set_dynamic_time_zone_information()
mock__get_dynamic_timezone_struct.assert_called_once_with()
else:
timezoneobj._set_time_zone_information()
mock__get_timezone_struct.assert_called_once_with()
mock_acquire_privilege.assert_called_once_with(privilege)
if windows_60:
self._mock_ctypes.windll.kernel32.SetDynamicTimeZoneInformation(
self._mock_ctypes.byref(mock.sentinel.timezone_struct))
else:
self._mock_ctypes.windll.kernel32.SetTimeZoneInformation(
self._mock_ctypes.byref(mock.sentinel.timezone_struct))
def test__set_time_zone_information(self):
self._test__set_time_zone_information(
windows_60=False,
privilege=self._mock_win32security.SE_SYSTEMTIME_NAME)
def test__set_dynamic_time_zone_information(self):
self._test__set_time_zone_information(
windows_60=True,
privilege=self._mock_win32security.SE_TIME_ZONE_NAME)

View File

@ -0,0 +1,36 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import win32process
import win32security
@contextlib.contextmanager
def acquire_privilege(privilege):
process = win32process.GetCurrentProcess()
token = win32security.OpenProcessToken(
process,
win32security.TOKEN_ADJUST_PRIVILEGES |
win32security.TOKEN_QUERY)
priv_luid = win32security.LookupPrivilegeValue(None, privilege)
privilege_enable = [(priv_luid, win32security.SE_PRIVILEGE_ENABLED)]
privilege_disable = [(priv_luid, win32security.SE_PRIVILEGE_REMOVED)]
win32security.AdjustTokenPrivileges(token, False, privilege_enable)
try:
yield
finally:
win32security.AdjustTokenPrivileges(token, False, privilege_disable)

View File

@ -0,0 +1,190 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
from ctypes import wintypes
import os
import struct
from six.moves import winreg
import win32security
from cloudbaseinit import exception
from cloudbaseinit.utils.windows import privilege
REG_TIME_ZONES = "Software\\Microsoft\\Windows NT\\CurrentVersion\\Time Zones"
NOT_FOUND = 2
kernel32 = ctypes.windll.kernel32
class SYSTEMTIME(ctypes.Structure):
_fields_ = [
('wYear', wintypes.WORD),
('wMonth', wintypes.WORD),
('wDayOfWeek', wintypes.WORD),
('wDay', wintypes.WORD),
('wHour', wintypes.WORD),
('wMinute', wintypes.WORD),
('wMilliseconds', wintypes.WORD),
]
class TIME_ZONE_INFORMATION(ctypes.Structure):
_fields_ = [
('Bias', wintypes.LONG),
('StandardName', wintypes.WCHAR * 32),
('StandardDate', SYSTEMTIME),
('StandardBias', wintypes.LONG),
('DaylightName', wintypes.WCHAR * 32),
('DaylightDate', SYSTEMTIME),
('DaylightBias', wintypes.LONG),
]
class DYNAMIC_TIME_ZONE_INFORMATION(ctypes.Structure):
_fields_ = [
('Bias', wintypes.LONG),
('StandardName', wintypes.WCHAR * 32),
('StandardDate', SYSTEMTIME),
('StandardBias', wintypes.LONG),
('DaylightName', wintypes.WCHAR * 32),
('DaylightDate', SYSTEMTIME),
('DaylightBias', wintypes.LONG),
('TimeZoneKeyName', wintypes.WCHAR * 128),
('DynamicDaylightTimeDisabled', wintypes.BOOLEAN),
]
class Timezone(object):
"""Class which holds details about a particular timezone.
It also can be used to change the current timezone,
by calling the :meth:`~set`. The supported time zone names
are the ones found here:
https://technet.microsoft.com/en-us/library/cc749073%28v=ws.10%29.aspx
"""
def __init__(self, name):
self._name = name
self._timezone_info = self._get_timezone_info()
# Public API.
self.bias = self._timezone_info[0]
self.standard_name = self._timezone_info[1]
self.standard_date = self._timezone_info[2]
self.standard_bias = self._timezone_info[3]
self.daylight_name = self._timezone_info[4]
self.daylight_date = self._timezone_info[5]
self.daylight_bias = self._timezone_info[6]
@staticmethod
def _create_system_time(values):
mtime = SYSTEMTIME()
mtime.wYear = values[0]
mtime.wMonth = values[1]
mtime.wDayOfWeek = values[2]
mtime.wDay = values[3]
mtime.wHour = values[4]
mtime.wMinute = values[5]
mtime.wSecond = values[6]
mtime.wMilliseconds = values[7]
return mtime
def _get_timezone_struct(self):
info = TIME_ZONE_INFORMATION()
info.Bias = self.bias
info.StandardName = self.standard_name
info.StandardDate = self._create_system_time(self.standard_date)
info.StandardBias = self.standard_bias
info.DaylightName = self.daylight_name
info.DaylightBias = self.daylight_bias
info.DaylightDate = self._create_system_time(self.daylight_date)
return info
def _get_dynamic_timezone_struct(self):
info = DYNAMIC_TIME_ZONE_INFORMATION()
info.Bias = self.bias
info.StandardName = self.standard_name
info.StandardDate = self._create_system_time(self.standard_date)
info.StandardBias = self.standard_bias
info.DaylightName = self.daylight_name
info.DaylightBias = self.daylight_bias
info.DaylightDate = self._create_system_time(self.daylight_date)
# TODO(cpopa): should this flag be controllable?
info.DynamicDaylightTimeDisabled = False
info.TimeZoneKeyName = self._name
return info
def _get_timezone_info(self):
keyname = os.path.join(REG_TIME_ZONES, self._name)
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, keyname) as key:
return self._unpack_timezone_info(key)
except WindowsError as exc:
if exc.errno == NOT_FOUND:
raise exception.CloudbaseInitException(
"Timezone %r not found" % self._name)
else:
raise
@staticmethod
def _unpack_system_time(tzi, offset):
# Unpack the values of a TIME_ZONE_INFORMATION structure
# from the given blob, starting at the given offset.
return [struct.unpack("H", tzi[index: index + 2])[0]
for index in range(offset, offset + 16, 2)]
@staticmethod
def _query_tz_key(key):
tzi = winreg.QueryValueEx(key, "TZI")[0]
daylight_name = winreg.QueryValueEx(key, "Dlt")[0]
standard_name = winreg.QueryValueEx(key, "Std")[0]
return tzi, standard_name, daylight_name
def _unpack_timezone_info(self, key):
# Get information about the current timezone from the given
# registry key.
tzi, standard_name, daylight_name = self._query_tz_key(key)
bias, = struct.unpack("l", tzi[:4])
standard_bias, = struct.unpack("l", tzi[4:8])
daylight_bias, = struct.unpack("l", tzi[8:12])
standard_date = self._unpack_system_time(tzi, 12)
daylight_date = self._unpack_system_time(tzi, 12 + 16)
return (bias, standard_name, tuple(standard_date),
standard_bias, daylight_name,
tuple(daylight_date), daylight_bias)
def _set_time_zone_information(self):
info = self._get_timezone_struct()
with privilege.acquire_privilege(win32security.SE_SYSTEMTIME_NAME):
kernel32.SetTimeZoneInformation(ctypes.byref(info))
def _set_dynamic_time_zone_information(self):
info = self._get_dynamic_timezone_struct()
with privilege.acquire_privilege(win32security.SE_TIME_ZONE_NAME):
kernel32.SetDynamicTimeZoneInformation(ctypes.byref(info))
def set(self, osutils):
"""Change the underlying timezone with this one.
This will use SetDynamicTimeZoneInformation on Windows Vista+ and
for Windows 2003 it will fallback to SetTimeZoneInformation, which
doesn't handle Daylight Saving Time.
"""
if osutils.check_os_version(6, 0):
self._set_dynamic_time_zone_information()
else:
self._set_time_zone_information()

View File

@ -1,3 +1,4 @@
pywin32
comtypes
wmi
tzlocal