Adds boot configuration plugins

BootStatusPolicyPlugin: sets the boot status policy via a configurable
option.

BCDConfigPlugin: ensures that the boot device has a unique disk id
and sets the BCD auto recovery mode. Both actions have corresponding
configurable options.

Change-Id: I9d41d27c8570f9376d0d3de2f676323b031ddf77
Co-Authored-By: Stefan Caraiman <scaraiman@cloudbasesolutions.com>
Implements: blueprint bootconfig-plugin
This commit is contained in:
Alessandro Pilotti 2017-02-13 17:24:23 +02:00 committed by Alexandru Coman
parent 6337a12d2c
commit 0ca5b4d830
No known key found for this signature in database
GPG Key ID: A7B6A9021F704507
8 changed files with 526 additions and 6 deletions

View File

@ -192,6 +192,18 @@ class GlobalOptions(conf_base.Options):
cfg.BoolOpt(
'rdp_set_keepalive', default=True,
help='Sets the RDP KeepAlive policy'),
cfg.StrOpt(
'bcd_boot_status_policy',
default=None,
choices=[constant.POLICY_IGNORE_ALL_FAILURES],
help='Sets the Windows BCD boot status policy'),
cfg.BoolOpt(
'bcd_enable_auto_recovery', default=False,
help='Enables or disables the BCD auto recovery'),
cfg.BoolOpt(
'set_unique_boot_disk_id', default=True,
help='Sets a new random unique id on the boot disk to avoid '
'collisions'),
]
self._cli_options = [

View File

@ -29,6 +29,7 @@ CD_LOCATIONS = {
"partition",
}
POLICY_IGNORE_ALL_FAILURES = "ignoreallfailures"
CLEAR_TEXT_INJECTED_ONLY = 'clear_text_injected_only'
ALWAYS_CHANGE = 'always'
NEVER_CHANGE = 'no'

View File

@ -23,6 +23,10 @@ class ItemNotFoundException(CloudbaseInitException):
pass
class InvalidStateException(CloudbaseInitException):
pass
class ServiceException(Exception):
"""Base exception for all the metadata services related errors."""

View File

@ -0,0 +1,63 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.plugins.common import base
from cloudbaseinit.utils.windows import bootconfig
from cloudbaseinit.utils.windows import disk
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
class BootStatusPolicyPlugin(base.BasePlugin):
def execute(self, service, shared_data):
if CONF.bcd_boot_status_policy:
LOG.info("Configuring boot status policy: %s",
CONF.bcd_boot_status_policy)
bootconfig.set_boot_status_policy(CONF.bcd_boot_status_policy)
return base.PLUGIN_EXECUTION_DONE, False
def get_os_requirements(self):
return 'win32', (6, 0)
class BCDConfigPlugin(base.BasePlugin):
@staticmethod
def _set_unique_disk_id(phys_disk_path):
# A unique disk ID is needed to avoid disk signature collisions
# https://blogs.technet.microsoft.com/markrussinovich/2011/11/06/fixing-disk-signature-collisions/
LOG.info("Setting unique id on disk: %s", phys_disk_path)
with disk.Disk(phys_disk_path, allow_write=True) as d:
d.set_unique_id()
def execute(self, service, shared_data):
if CONF.set_unique_boot_disk_id:
if len(bootconfig.get_boot_system_devices()) == 1:
LOG.info("Configuring boot device")
bootconfig.set_current_bcd_device_to_boot_partition()
# TODO(alexpilotti): get disk number from volume
self._set_unique_disk_id(u"\\\\.\\PHYSICALDRIVE0")
bootconfig.enable_auto_recovery(CONF.bcd_enable_auto_recovery)
return base.PLUGIN_EXECUTION_DONE, False
def get_os_requirements(self):
return 'win32', (6, 0)

View File

@ -0,0 +1,116 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit.plugins.common import base
from cloudbaseinit.tests import testutils
CONF = cloudbaseinit_conf.CONF
MODPATH = "cloudbaseinit.plugins.windows.bootconfig"
class BootConfigPluginTest(unittest.TestCase):
def setUp(self):
self.mock_wmi = mock.MagicMock()
self._moves_mock = mock.MagicMock()
patcher = mock.patch.dict(
"sys.modules",
{
"wmi": self.mock_wmi,
"six.moves": self._moves_mock,
'ctypes': mock.MagicMock(),
'ctypes.windll': mock.MagicMock(),
'ctypes.wintypes': mock.MagicMock(),
'winioctlcon': mock.MagicMock()
}
)
patcher.start()
self.addCleanup(patcher.stop)
bootconfig = importlib.import_module(MODPATH)
self.boot_policy_plugin = bootconfig.BootStatusPolicyPlugin()
self.bcd_config = bootconfig.BCDConfigPlugin()
self.snatcher = testutils.LogSnatcher(MODPATH)
@testutils.ConfPatcher("bcd_boot_status_policy", True)
@mock.patch("cloudbaseinit.utils.windows.bootconfig."
"set_boot_status_policy")
def _test_execute_policy_plugin(self, mock_set_boot_status_policy,
mock_service=None, mock_shared_data=None):
expected_res = (base.PLUGIN_EXECUTION_DONE, False)
expected_logs = [
"Configuring boot status policy: %s" % CONF.bcd_boot_status_policy]
with self.snatcher:
res = self.boot_policy_plugin.execute(mock_service,
mock_shared_data)
self.assertEqual(res, expected_res)
self.assertEqual(self.snatcher.output, expected_logs)
mock_set_boot_status_policy.assert_called_once_with(
CONF.bcd_boot_status_policy)
def test_execute_set_bootstatus_policy(self):
self._test_execute_policy_plugin()
@mock.patch("cloudbaseinit.utils.windows.disk.Disk")
def test_set_unique_disk_id(self, mock_disk):
fake_disk_path = mock.sentinel.path
mock_physical_disk = mock.MagicMock()
expected_logs = ["Setting unique id on disk: %s" % fake_disk_path]
mock_disk.__enter__.return_value = mock_physical_disk
with self.snatcher:
self.bcd_config._set_unique_disk_id(fake_disk_path)
self.assertEqual(self.snatcher.output, expected_logs)
mock_disk.assert_called_once_with(fake_disk_path, allow_write=True)
@testutils.ConfPatcher("set_unique_boot_disk_id", True)
@mock.patch(MODPATH + ".BCDConfigPlugin._set_unique_disk_id")
@mock.patch("cloudbaseinit.utils.windows.bootconfig."
"enable_auto_recovery")
@mock.patch("cloudbaseinit.utils.windows.bootconfig."
"set_current_bcd_device_to_boot_partition")
@mock.patch("cloudbaseinit.utils.windows.bootconfig."
"get_boot_system_devices")
def test_execute_bcd_config(self, mock_get_boot,
mock_set_current_bcd,
mock_enable_auto_recovery,
mock_set_unique_disk_id):
mock_service = mock.Mock()
mock_shared_data = mock.Mock()
expected_res = (base.PLUGIN_EXECUTION_DONE, False)
expected_logs = ["Configuring boot device"]
mock_get_boot.return_value = "1"
with self.snatcher:
res_execute = self.bcd_config.execute(mock_service,
mock_shared_data)
self.assertEqual(self.snatcher.output, expected_logs)
self.assertEqual(res_execute, expected_res)
mock_get_boot.assert_called_once_with()
mock_set_current_bcd.assert_called_once_with()
mock_set_unique_disk_id.assert_called_once_with(
u"\\\\.\\PHYSICALDRIVE0")
def test_get_os_requirements(self):
expected_res = ('win32', (6, 0))
res_plugin = self.boot_policy_plugin.get_os_requirements()
res_config = self.bcd_config.get_os_requirements()
for res in (res_plugin, res_config):
self.assertEqual(res, expected_res)

View File

@ -0,0 +1,174 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
MODPATH = "cloudbaseinit.utils.windows.bootconfig"
class BootConfigTest(unittest.TestCase):
def setUp(self):
self._wmi_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules', {
'wmi': self._wmi_mock})
self.snatcher = testutils.LogSnatcher(MODPATH)
self._module_patcher.start()
self.bootconfig = importlib.import_module(MODPATH)
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_run_bcdedit(self, mock_get_os_utils, ret_val=0):
mock_osutils = mock.Mock()
mock_get_os_utils.return_value = mock_osutils
mock_args = [mock.sentinel.args]
expected_call = ["bcdedit.exe"] + mock_args
mock_osutils.execute_system32_process.return_value = (
mock.sentinel.out_val, mock.sentinel.err, ret_val)
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self.bootconfig._run_bcdedit, mock_args)
else:
self.bootconfig._run_bcdedit(mock_args)
mock_osutils.execute_system32_process.assert_called_once_with(
expected_call)
def test_run_bcdedit(self):
self._test_run_bcdedit()
def test_run_bcdedit_fail(self):
self._test_run_bcdedit(ret_val=1)
@mock.patch(MODPATH + "._run_bcdedit")
def test_set_boot_status_policy(self, mock_run_bcdedit):
fake_policy = mock.sentinel.policy
expected_logs = ["Setting boot status policy: %s" % fake_policy]
with self.snatcher:
self.bootconfig.set_boot_status_policy(fake_policy)
mock_run_bcdedit.assert_called_once_with(
["/set", "{current}", "bootstatuspolicy", fake_policy])
self.assertEqual(expected_logs, self.snatcher.output)
def test_get_boot_system_devices(self):
mock_vol = mock.Mock()
mock_win32volume = mock.MagicMock()
mock_id = mock.sentinel.id
mock_vol.DeviceID = mock_id
conn = self._wmi_mock.WMI
conn.return_value = mock_win32volume
mock_win32volume.Win32_Volume.return_value = [mock_vol]
expected_call_args = {"BootVolume": True, "SystemVolume": True}
res = self.bootconfig.get_boot_system_devices()
mock_win32volume.Win32_Volume.assert_called_once_with(
**expected_call_args)
self.assertEqual(res, [mock_id])
def _test_get_current_bcd_store(self, mock_success=True, mock_store=None):
conn = self._wmi_mock.WMI
store = self._wmi_mock._wmi_object
mock_store = mock.Mock()
mock_bcdstore = mock.MagicMock()
conn.return_value = mock_bcdstore
store.return_value = mock_store
mock_bcdstore.BcdStore.OpenStore.return_value = (mock_success,
mock_store)
if not mock_success:
self.assertRaises(
exception.CloudbaseInitException,
self.bootconfig._get_current_bcd_store)
else:
mock_store.OpenObject.return_value = [None, mock_success]
res_store = self.bootconfig._get_current_bcd_store()
self.assertEqual(res_store, mock_store)
def test_get_current_bcd_store(self):
self._test_get_current_bcd_store()
def test_get_current_bcd_store_fail(self):
self._test_get_current_bcd_store(mock_success=False)
@mock.patch(MODPATH + "._get_current_bcd_store")
def _test_set_current_bcd_device_to_boot_partition(
self, mock_get_current_bcd_store, side_effects=True,
success_set_os=True, success_set_app=True):
mock_store = mock.Mock()
mock_get_current_bcd_store.return_value = mock_store
mock_store.SetDeviceElement.side_effect = ([success_set_os],
[success_set_app])
if not success_set_os:
self.assertRaises(
exception.CloudbaseInitException,
self.bootconfig.set_current_bcd_device_to_boot_partition)
self.assertEqual(mock_store.SetDeviceElement.call_count, 1)
elif success_set_os and not success_set_app:
self.assertRaises(
exception.CloudbaseInitException,
self.bootconfig.set_current_bcd_device_to_boot_partition)
self.assertEqual(mock_store.SetDeviceElement.call_count, 2)
elif success_set_os and success_set_app:
self.bootconfig.set_current_bcd_device_to_boot_partition()
self.assertEqual(mock_store.SetDeviceElement.call_count, 2)
mock_get_current_bcd_store.assert_called_once_with()
def test_set_current_bcd_device_to_boot_partition_success(self):
self._test_set_current_bcd_device_to_boot_partition()
def test_set_current_bcd_device_to_boot_partition_fail_os(self):
self._test_set_current_bcd_device_to_boot_partition(
success_set_os=False)
def test_set_current_bcd_device_to_boot_partition_fail_app(self):
self._test_set_current_bcd_device_to_boot_partition(
success_set_app=False)
@mock.patch(MODPATH + "._get_current_bcd_store")
def _test_enable_auto_recovery(self, mock_get_current_bcd_store,
mock_success=True, mock_enable=True):
mock_store = mock.Mock()
mock_get_current_bcd_store.return_value = mock_store
mock_store.SetBooleanElement.side_effect = ((mock_success,),)
expected_call = (
self.bootconfig.BCDLIBRARY_BOOLEAN_AUTO_RECOVERY_ENABLED,
mock_enable)
if not mock_success:
self.assertRaises(exception.CloudbaseInitException,
self.bootconfig.enable_auto_recovery,
mock_enable)
else:
self.bootconfig.enable_auto_recovery(enable=mock_enable)
mock_store.SetBooleanElement.assert_called_once_with(
*expected_call)
def test_enable_auto_recovery(self):
self._test_enable_auto_recovery()
def test_enable_auto_recovery_failed(self):
self._test_enable_auto_recovery(mock_success=False)

View File

@ -0,0 +1,96 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as oslo_logging
import wmi
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.osutils import factory as osutils_factory
LOG = oslo_logging.getLogger(__name__)
STORE_CURRENT = "{fa926493-6f1c-4193-a414-58f0b2456d1e}"
BCDOSLOADER_DEVICE_OSDEVICE = 0x21000001
BCDLIBRARY_DEVICE_APPLICATION_DEVICE = 0x11000001
BCDLIBRARY_BOOLEAN_AUTO_RECOVERY_ENABLED = 0x16000009
BOOT_DEVICE = 1
def _run_bcdedit(bcdedit_args):
args = ["bcdedit.exe"] + bcdedit_args
osutils = osutils_factory.get_os_utils()
(out, err, ret_val) = osutils.execute_system32_process(args)
if ret_val:
raise exception.CloudbaseInitException(
'bcdedit failed.\nOutput: %(out)s\nError:'
' %(err)s' % {'out': out, 'err': err})
def set_boot_status_policy(policy=constant.POLICY_IGNORE_ALL_FAILURES):
LOG.debug("Setting boot status policy: %s", policy)
_run_bcdedit(["/set", "{current}", "bootstatuspolicy", policy])
def get_boot_system_devices():
conn = wmi.WMI(moniker='//./root/cimv2')
return [v.DeviceID for v in conn.Win32_Volume(
BootVolume=True, SystemVolume=True)]
def _get_current_bcd_store():
conn = wmi.WMI(moniker='//./root/wmi')
success, store = conn.BcdStore.OpenStore(File="")
if not success:
raise exception.CloudbaseInitException("Cannot open BCD store")
store = wmi._wmi_object(store)
current_store, success = store.OpenObject(Id=STORE_CURRENT)
current_store = wmi._wmi_object(current_store)
if not success:
raise exception.CloudbaseInitException("Cannot open BCD current store")
return current_store
def set_current_bcd_device_to_boot_partition():
current_store = _get_current_bcd_store()
success, = current_store.SetDeviceElement(
Type=BCDOSLOADER_DEVICE_OSDEVICE, DeviceType=BOOT_DEVICE,
AdditionalOptions="")
if not success:
raise exception.CloudbaseInitException(
"Cannot set device element: %s" % BCDOSLOADER_DEVICE_OSDEVICE)
success, = current_store.SetDeviceElement(
Type=BCDLIBRARY_DEVICE_APPLICATION_DEVICE, DeviceType=BOOT_DEVICE,
AdditionalOptions="")
if not success:
raise exception.CloudbaseInitException(
"Cannot set device element: %s" %
BCDLIBRARY_DEVICE_APPLICATION_DEVICE)
def enable_auto_recovery(enable):
current_store = _get_current_bcd_store()
success, = current_store.SetBooleanElement(
BCDLIBRARY_BOOLEAN_AUTO_RECOVERY_ENABLED, enable)
if not success:
raise exception.CloudbaseInitException(
"Cannot set boolean element: %s" %
BCDLIBRARY_BOOLEAN_AUTO_RECOVERY_ENABLED)

View File

@ -17,6 +17,7 @@ import abc
import ctypes
from ctypes import windll
from ctypes import wintypes
import random
import re
import six
@ -26,6 +27,7 @@ from cloudbaseinit import exception
kernel32 = windll.kernel32
rpcrt4 = windll.rpcrt4
class Win32_DiskGeometry(ctypes.Structure):
@ -57,8 +59,9 @@ class GUID(ctypes.Structure):
("data4", wintypes.BYTE * 8)
]
def __init__(self, l, w1, w2, b1, b2, b3, b4, b5, b6, b7, b8):
self.data1 = l
def __init__(self, dw=0, w1=0, w2=0, b1=0, b2=0, b3=0, b4=0, b5=0, b6=0,
b7=0, b8=0):
self.data1 = dw
self.data2 = w1
self.data3 = w2
self.data4[0] = b1
@ -153,19 +156,22 @@ class BaseDevice(object):
"""
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
FILE_SHARE_READ = 1
FILE_SHARE_WRITE = 2
OPEN_EXISTING = 3
FILE_ATTRIBUTE_READONLY = 1
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
FILE_BEGIN = 0
INVALID_SET_FILE_POINTER = 0xFFFFFFFF
def __init__(self, path):
def __init__(self, path, allow_write=False):
self._path = path
self._handle = None
self._sector_size = None
self._disk_size = None
self._allow_write = allow_write
self.fixed = None
def __repr__(self):
@ -224,13 +230,22 @@ class BaseDevice(object):
return buff.raw[:bytes_read.value] # all bytes without the null byte
def open(self):
access = self.GENERIC_READ
share_mode = self.FILE_SHARE_READ
if self._allow_write:
access |= self.GENERIC_WRITE
share_mode |= self.FILE_SHARE_WRITE
attributes = 0
else:
attributes = self.FILE_ATTRIBUTE_READONLY
handle = kernel32.CreateFileW(
ctypes.c_wchar_p(self._path),
self.GENERIC_READ,
self.FILE_SHARE_READ,
access,
share_mode,
0,
self.OPEN_EXISTING,
self.FILE_ATTRIBUTE_READONLY,
attributes,
0)
if handle == self.INVALID_HANDLE_VALUE:
raise exception.WindowsCloudbaseInitException(
@ -299,6 +314,45 @@ class Disk(BaseDevice):
"Cannot get disk layout: %r")
return layout
@staticmethod
def _create_guid():
guid = GUID()
ret_val = rpcrt4.UuidCreate(ctypes.byref(guid))
if ret_val:
raise exception.CloudbaseInitException(
"UuidCreate failed: %r" % ret_val)
return guid
def set_unique_id(self, unique_id=None):
layout = self._get_layout()
if layout.PartitionStyle == self.PARTITION_STYLE_MBR:
if not unique_id:
unique_id = random.randint(-2147483648, 2147483647)
layout.Mbr.Signature = unique_id
elif layout.PartitionStyle == self.PARTITION_STYLE_GPT:
if not unique_id:
unique_id = self._create_guid()
layout.Gpt.DiskId = unique_id
else:
raise exception.InvalidStateException(
"A unique id can be set on MBR or GPT partitions only")
bytes_returned = wintypes.DWORD()
ret_val = kernel32.DeviceIoControl(
self._handle, winioctlcon.IOCTL_DISK_SET_DRIVE_LAYOUT_EX,
ctypes.byref(layout), ctypes.sizeof(layout), 0, 0,
ctypes.byref(bytes_returned), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot set disk layout: %r")
ret_val = kernel32.DeviceIoControl(
self._handle, winioctlcon.IOCTL_DISK_UPDATE_PROPERTIES, 0, 0, 0, 0,
ctypes.byref(bytes_returned), 0)
if not ret_val:
raise exception.WindowsCloudbaseInitException(
"Cannot update cached disk properties: %r")
def _get_partition_indexes(self, layout):
partition_style = layout.PartitionStyle
if partition_style not in (self.PARTITION_STYLE_MBR,