Hyper-V: Shielded VMs

Shielded VMs in Windows Server 2016 protect virtual
machines from Hyper-V administrators with the help
of encryption technologies. Attaching vTPM devices
to the Hyper-V VMs offers users the posibility to
enhance their security and system integrity. This
blueprint implements this feature.

Depends-On: I28c54b1cab1ec98c60c7dba257291b1656429b80
Implements: blueprint hyper-v-shielded-vms
Change-Id: Ibfb2dfd254cebdc203c2beb01ec8ecf31cd010e7
This commit is contained in:
Simona Iuliana Toader 2016-02-23 01:41:52 -08:00
parent 820f5f8ff4
commit f37ce8b6bb
8 changed files with 497 additions and 10 deletions

View File

@ -103,6 +103,9 @@ REQUIRED = "required"
DISABLED = "disabled"
OPTIONAL = "optional"
IMAGE_PROP_VTPM = "os_vtpm"
IMAGE_PROP_VTPM_SHIELDED = "os_shielded_vm"
BOOT_DEVICE_FLOPPY = 0
BOOT_DEVICE_CDROM = 1
BOOT_DEVICE_HARDDISK = 2

View File

@ -197,7 +197,7 @@ class MigrationOps(object):
ephemerals = block_device_info['ephemerals']
self._check_ephemeral_disks(instance, ephemerals)
self._vmops.create_instance(instance, network_info,
self._vmops.create_instance(context, instance, network_info,
root_device, block_device_info, vm_gen,
image_meta)
@ -310,8 +310,9 @@ class MigrationOps(object):
ephemerals = block_device_info['ephemerals']
self._check_ephemeral_disks(instance, ephemerals, resize_instance)
self._vmops.create_instance(instance, network_info, root_device,
block_device_info, vm_gen, image_meta)
self._vmops.create_instance(context, instance, network_info,
root_device, block_device_info,
vm_gen, image_meta)
self._check_and_attach_config_drive(instance, vm_gen)
self._vmops.set_boot_order(vm_gen, block_device_info, instance_name)

79
hyperv/nova/pdk.py Normal file
View File

@ -0,0 +1,79 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from barbicanclient import client as barbican_client
from keystoneclient import session
from nova import exception
from os_win._i18n import _
class PDK(object):
def create_pdk(self, context, instance, image_meta, pdk_filepath):
"""Generates a pdk file using the barbican container referenced by
the image metadata or instance metadata. A pdk file is a shielding
data file which contains a RDP certificate, unattended file,
volume signature catalogs and guardian metadata.
"""
with open(pdk_filepath, 'wb') as pdk_file_handle:
pdk_reference = self._get_pdk_reference(instance, image_meta)
pdk_container = self._get_pdk_container(context, instance,
pdk_reference)
pdk_data = self._get_pdk_data(pdk_container)
pdk_file_handle.write(pdk_data)
def _get_pdk_reference(self, instance, image_meta):
image_pdk_ref = image_meta['properties'].get('img_pdk_reference')
boot_metadata_pdk_ref = instance.metadata.get('img_pdk_reference')
if not (image_pdk_ref or boot_metadata_pdk_ref):
reason = _('A reference to a barbican container containing the '
'pdk file must be passed as an image property. This '
'is required in order to enable VTPM')
raise exception.MissingParameter(instance_id=instance.uuid,
reason=reason)
return boot_metadata_pdk_ref or image_pdk_ref
def _get_pdk_container(self, context, instance, pdk_reference):
"""Retrieves the barbican container containing the pdk file.
"""
auth = context.get_auth_plugin()
sess = session.Session(auth=auth)
brb_client = barbican_client.Client(session=sess)
try:
pdk_container = brb_client.containers.get(pdk_reference)
except Exception as e:
err_msg = _("Retrieving barbican container with reference "
"%(pdk_reference)s failed with error: %(error)s") % {
'pdk_reference': pdk_reference,
'error': e}
raise exception.InvalidMetadata(instance_id=instance.uuid,
reason=err_msg)
return pdk_container
def _get_pdk_data(self, pdk_container):
"""Return the data from all barbican container's secrets.
"""
no_of_secrets = len(pdk_container.secrets)
data = bytes()
for index in range(no_of_secrets):
current_secret = pdk_container.secrets[str(index + 1)]
retrived_secret_data = current_secret.payload
data += retrived_secret_data
return data

View File

@ -49,6 +49,7 @@ from hyperv.nova import block_device_manager
from hyperv.nova import constants
from hyperv.nova import imagecache
from hyperv.nova import pathutils
from hyperv.nova import pdk
from hyperv.nova import serialconsoleops
from hyperv.nova import vif as vif_utils
from hyperv.nova import volumeops
@ -108,6 +109,7 @@ class VMOps(object):
self._vif_driver_cache = {}
self._block_device_manager = (
block_device_manager.BlockDeviceInfoManager())
self._pdk = pdk.PDK()
def list_instance_uuids(self):
instance_uuids = []
@ -273,7 +275,7 @@ class VMOps(object):
try:
with self.wait_vif_plug_events(instance, network_info):
self.create_instance(instance, network_info,
self.create_instance(context, instance, network_info,
root_device, block_device_info,
vm_gen, image_meta)
@ -359,7 +361,7 @@ class VMOps(object):
reason=reason)
return requires_secure_boot
def create_instance(self, instance, network_info, root_device,
def create_instance(self, context, instance, network_info, root_device,
block_device_info, vm_gen, image_meta):
instance_name = instance.name
instance_path = os.path.join(CONF.instances_path, instance_name)
@ -427,6 +429,8 @@ class VMOps(object):
image_meta)
self._vmutils.enable_secure_boot(instance.name,
certificate_required)
self._configure_secure_vm(context, instance, image_meta,
secure_boot_enabled)
def _attach_root_device(self, instance_name, root_dev_info):
if root_dev_info['type'] == constants.VOLUME:
@ -1024,3 +1028,106 @@ class VMOps(object):
if scope == 'storage_qos':
storage_qos_specs[key] = value
return self._volumeops.parse_disk_qos_specs(storage_qos_specs)
def _configure_secure_vm(self, context, instance, image_meta,
secure_boot_enabled):
"""Adds and enables a vTPM, encrypting the disks.
Shielding option implies encryption option enabled.
"""
requires_encryption = False
requires_shielded = self._feature_requested(
instance,
image_meta,
constants.IMAGE_PROP_VTPM_SHIELDED)
if not requires_shielded:
requires_encryption = self._feature_requested(
instance,
image_meta,
constants.IMAGE_PROP_VTPM)
if not (requires_shielded or requires_encryption):
return
self._check_vtpm_requirements(instance, image_meta,
secure_boot_enabled)
with self._pathutils.temporary_file('.fsk') as fsk_filepath, \
self._pathutils.temporary_file('.pdk') as pdk_filepath:
self._create_fsk(instance, fsk_filepath)
self._pdk.create_pdk(context, instance, image_meta, pdk_filepath)
self._vmutils.add_vtpm(instance.name, pdk_filepath,
shielded=requires_shielded)
LOG.info(_LI("VTPM was added."), instance=instance)
self._vmutils.provision_vm(instance.name, fsk_filepath,
pdk_filepath)
def _feature_requested(self, instance, image_meta, image_prop):
image_props = image_meta['properties']
image_prop_option = image_props.get(image_prop)
feature_requested = image_prop_option == constants.REQUIRED
return feature_requested
def _check_vtpm_requirements(self, instance, image_meta,
secure_boot_enabled):
if not secure_boot_enabled:
reason = _("Adding a vtpm requires secure boot to be enabled.")
raise exception.InstanceUnacceptable(
instance_id=instance.uuid, reason=reason)
os_type = image_meta.get('properties', {}).get('os_type')
if os_type not in os_win_const.VTPM_SUPPORTED_OS:
reason = _('vTPM is not supported for this OS type: %(os_type)s. '
' Supported OS types: %(supported_os_types)s') % {
'os_type': os_type,
'supported_os_types':
','.join(os for os in os_win_const.VTPM_SUPPORTED_OS)}
raise exception.InstanceUnacceptable(instance_id=instance.uuid,
reason=reason)
if not self._hostutils.is_host_guarded():
reason = _('This host in not guarded.')
raise exception.InstanceUnacceptable(instance_id=instance.uuid,
reason=reason)
def _create_fsk(self, instance, fsk_filepath):
"""Writes in the fsk file all the substitution strings and their
values which will populate the unattended file used when
creating the pdk.
"""
fsk_pairs = self._get_fsk_data(instance)
self._vmutils.populate_fsk(fsk_filepath, fsk_pairs)
def _get_fsk_data(self, instance):
"""The unattended file may contain substitution strings. Those with
their coresponding values are passed as metadata and will be added
to a fsk file.
"""
fsk_pairs = {'@@%s@@' % key.split('fsk:')[1]: value
for key, value in instance.metadata.items()
if key.startswith('fsk:')}
fsk_computername_key = '@@%s@@' % os_win_const.FSK_COMPUTERNAME
fsk_computer_name = fsk_pairs.get(fsk_computername_key)
if instance.hostname != fsk_computer_name and fsk_computer_name:
err_msg = _("The FSK mappings contain ComputerName "
"%(fsk_computer_name)s, which does not match the "
"instance name %(instance_name)s.") % {
'fsk_computer_name': fsk_computer_name,
'instance_name': instance.hostname}
raise exception.InstanceUnacceptable(instance_id=instance.uuid,
reason=err_msg)
# In case of not specifying the computer name as a FSK metadata value,
# it will be added by default in order to avoid a reboot when
# configuring the instance hostname
if not fsk_computer_name:
fsk_pairs[fsk_computername_key] = instance.hostname
return fsk_pairs

View File

@ -269,8 +269,8 @@ class MigrationOpsTestCase(test_base.HyperVBaseTestCase):
get_image_vm_gen.assert_called_once_with(
mock_instance.uuid, image_meta)
self._migrationops._vmops.create_instance.assert_called_once_with(
mock_instance, mock.sentinel.network_info, root_device,
block_device_info, get_image_vm_gen.return_value,
self.context, mock_instance, mock.sentinel.network_info,
root_device, block_device_info, get_image_vm_gen.return_value,
image_meta)
mock_check_attach_config_drive.assert_called_once_with(
mock_instance, get_image_vm_gen.return_value)
@ -446,8 +446,8 @@ class MigrationOpsTestCase(test_base.HyperVBaseTestCase):
get_image_vm_gen.assert_called_once_with(mock_instance.uuid,
mock.sentinel.image_meta)
self._migrationops._vmops.create_instance.assert_called_once_with(
mock_instance, mock.sentinel.network_info, root_device,
block_device_info, get_image_vm_gen.return_value,
self.context, mock_instance, mock.sentinel.network_info,
root_device, block_device_info, get_image_vm_gen.return_value,
mock.sentinel.image_meta)
mock_check_attach_config_drive.assert_called_once_with(
mock_instance, get_image_vm_gen.return_value)

View File

@ -0,0 +1,130 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova import exception
from hyperv.nova import pdk
from hyperv.tests.unit import test_base
from six.moves import builtins
class PDKTestCase(test_base.HyperVBaseTestCase):
_FAKE_PDK_FILE_PATH = 'C:\\path\\to\\fakepdk.pdk'
def setUp(self):
super(PDKTestCase, self).setUp()
self._pdk = pdk.PDK()
@mock.patch.object(builtins, 'open')
@mock.patch.object(pdk.PDK, '_get_pdk_data')
@mock.patch.object(pdk.PDK, '_get_pdk_container')
@mock.patch.object(pdk.PDK, '_get_pdk_reference')
def test_create_pdk(self, mock_get_pdk_reference, mock_get_pdk_container,
mock_get_pdk_data, mock_open):
mock_instance = mock.MagicMock()
pdk_file_handle = mock_open.return_value.__enter__.return_value
pdk_reference = mock_get_pdk_reference.return_value
pdk_container = mock_get_pdk_container.return_value
self._pdk.create_pdk(mock.sentinel.context,
mock_instance,
mock.sentinel.image_meta,
self._FAKE_PDK_FILE_PATH)
mock_get_pdk_reference.assert_called_once_with(
mock_instance, mock.sentinel.image_meta)
mock_get_pdk_container.assert_called_once_with(mock.sentinel.context,
mock_instance,
pdk_reference)
mock_get_pdk_data.assert_called_once_with(pdk_container)
pdk_file_handle.write.assert_called_once_with(
mock_get_pdk_data.return_value)
def _test_get_pdk_reference(self, pdk_reference=None,
image_meta_pdk_ref=None):
mock_instance = mock.MagicMock(
metadata={'img_pdk_reference': image_meta_pdk_ref})
image_meta = {
'properties': {'img_pdk_reference': pdk_reference}}
expected_result = image_meta_pdk_ref or pdk_reference
result = self._pdk._get_pdk_reference(mock_instance,
image_meta)
self.assertEqual(expected_result, result)
def test_get_pdk_boot_reference(self):
self._test_get_pdk_reference(
image_meta_pdk_ref=mock.sentinel.image_meta_pdk_ref)
def test_get_pdk_image_reference(self):
self._test_get_pdk_reference(pdk_reference=mock.sentinel.pdk_reference)
def test_get_pdk_no_reference(self):
image_meta = {'properties': {}}
mock_instance = mock.MagicMock(metadata={})
self.assertRaises(exception.MissingParameter,
self._pdk._get_pdk_reference,
mock_instance, image_meta)
@mock.patch('barbicanclient.client.Client')
@mock.patch('keystoneclient.session.Session')
def test_get_pdk_container(self, mock_session, mock_barbican_client):
instance = mock.MagicMock()
context = mock.MagicMock()
auth = context.get_auth_plugin.return_value
sess = mock_session.return_value
barbican_client = mock_barbican_client.return_value
barbican_client.containers.get.return_value = (
mock.sentinel.pdk_container)
result = self._pdk._get_pdk_container(context, instance,
mock.sentinel.pdk_reference)
self.assertEqual(mock.sentinel.pdk_container, result)
mock_session.assert_called_once_with(auth=auth)
mock_barbican_client.assert_called_once_with(session=sess)
@mock.patch('barbicanclient.client.Client')
@mock.patch('keystoneclient.session.Session')
def test_get_pdk_container_exception(self, mock_session,
mock_barbican_client):
instance = mock.MagicMock()
context = mock.MagicMock()
auth = context.get_auth_plugin.return_value
sess = mock_session.return_value
barbican_client = mock_barbican_client.return_value
barbican_client.containers.get.side_effect = [
exception.InvalidMetadata]
self.assertRaises(exception.InvalidMetadata,
self._pdk._get_pdk_container,
context,
instance,
mock.sentinel.pdk_reference)
mock_session.assert_called_once_with(auth=auth)
mock_barbican_client.assert_called_once_with(session=sess)
def test_get_pdk_data(self):
pdk_container = mock.MagicMock()
pdk_container.secrets = {'1': mock.MagicMock(payload=b'fake_secret1'),
'2': mock.MagicMock(payload=b'fake_secret2')}
response = self._pdk._get_pdk_data(pdk_container)
expected_result = b'fake_secret1fake_secret2'
self.assertEqual(expected_result, response)

View File

@ -32,6 +32,7 @@ import six
from hyperv.nova import block_device_manager
from hyperv.nova import constants
from hyperv.nova import pdk
from hyperv.nova import vmops
from hyperv.nova import volumeops
from hyperv.tests import fake_instance
@ -53,6 +54,8 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
FAKE_LOG = 'fake_log'
_WIN_VERSION_6_3 = '6.3.0'
_WIN_VERSION_6_4 = '6.4.0'
_FAKE_PDK_FILE_PATH = 'C:\\path\\to\\fakepdk.pdk'
_FAKE_FSK_FILE_PATH = 'C:\\path\\to\\fakefsk.fsk'
ISO9660 = 'iso9660'
_FAKE_CONFIGDRIVE_PATH = 'C:/fake_instance_dir/configdrive.vhd'
@ -67,6 +70,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
self._vmops._vhdutils = mock.MagicMock()
self._vmops._pathutils = mock.MagicMock()
self._vmops._hostutils = mock.MagicMock()
self._vmops._pdk = mock.MagicMock()
self._vmops._serial_console_ops = mock.MagicMock()
def test_get_vif_driver_cached(self):
@ -432,7 +436,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
block_device_info['ephemerals'])
mock_get_image_vm_gen.assert_called_once_with(
mock_instance.uuid, mock_image_meta)
mock_create_instance.assert_called_once_with(
mock_create_instance.assert_called_once_with(self.context,
mock_instance, [fake_network_info], root_device_info,
block_device_info, fake_vm_gen, mock_image_meta)
mock_configdrive_required.assert_called_once_with(mock_instance)
@ -522,6 +526,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
self.assertEqual([('network-vif-plugged', mock.sentinel.vif_id2)],
events)
@mock.patch.object(vmops.VMOps, '_configure_secure_vm')
@mock.patch.object(vmops.VMOps, '_requires_secure_boot')
@mock.patch.object(vmops.VMOps, '_requires_certificate')
@mock.patch('hyperv.nova.vif.get_vif_driver')
@ -540,6 +545,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
mock_set_qos_specs, mock_get_vif_driver,
mock_requires_certificate,
mock_requires_secure_boot,
mock_configure_secure_vm,
enable_instance_metrics,
vm_gen=constants.VM_GEN_1, vnuma_enabled=False,
requires_sec_boot=True, remotefx=False):
@ -574,6 +580,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
if remotefx is True and vm_gen == constants.VM_GEN_2:
self.assertRaises(os_win_exc.HyperVException,
self._vmops.create_instance,
context=self.context,
instance=mock_instance,
network_info=[fake_network_info],
block_device_info=block_device_info,
@ -582,6 +589,7 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
image_meta=mock.sentinel.image_meta)
else:
self._vmops.create_instance(
context=self.context,
instance=mock_instance,
network_info=[fake_network_info],
block_device_info=block_device_info,
@ -634,6 +642,8 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
enable_secure_boot = self._vmops._vmutils.enable_secure_boot
enable_secure_boot.assert_called_once_with(
mock_instance.name, mock_requires_certificate.return_value)
mock_configure_secure_vm.assert_called_once_with(self.context,
mock_instance, mock.sentinel.image_meta, requires_sec_boot)
def test_create_instance(self):
self._test_create_instance(enable_instance_metrics=True)
@ -1826,3 +1836,159 @@ class VMOpsTestCase(test_base.HyperVBaseTestCase):
def test_requires_certificate_os_type_none(self):
self._test_requires_certificate(os_type=None)
@mock.patch.object(vmops.VMOps, '_check_vtpm_requirements')
@mock.patch.object(vmops.VMOps, '_feature_requested')
@mock.patch.object(vmops.VMOps, '_create_fsk')
@mock.patch.object(pdk.PDK, 'create_pdk')
def _test_configure_secure_vm(self, mock_create_pdk, mock_create_fsk,
mock_feature_requested,
mock_check_vtpm_requirements,
requires_shielded, requires_encryption):
instance = mock.MagicMock()
mock_tmp_file = self._vmops._pathutils.temporary_file
mock_tmp_file.return_value.__enter__.side_effect = [
self._FAKE_FSK_FILE_PATH, self._FAKE_PDK_FILE_PATH]
mock_feature_requested.side_effect = [requires_shielded,
requires_encryption]
self._vmops._configure_secure_vm(mock.sentinel.context, instance,
mock.sentinel.image_meta,
mock.sentinel.secure_boot_enabled)
expected_calls = [mock.call(instance,
mock.sentinel.image_meta,
constants.IMAGE_PROP_VTPM_SHIELDED)]
if not requires_shielded:
expected_calls.append(mock.call(instance,
mock.sentinel.image_meta,
constants.IMAGE_PROP_VTPM))
mock_feature_requested.has_calls(expected_calls)
mock_check_vtpm_requirements.assert_called_with(instance,
mock.sentinel.image_meta, mock.sentinel.secure_boot_enabled)
self._vmops._vmutils.add_vtpm.assert_called_once_with(
instance.name, self._FAKE_PDK_FILE_PATH,
shielded=requires_shielded)
self._vmops._vmutils.provision_vm.assert_called_once_with(
instance.name, self._FAKE_FSK_FILE_PATH, self._FAKE_PDK_FILE_PATH)
def test_configure_secure_vm_shielded(self):
self._test_configure_secure_vm(requires_shielded=True,
requires_encryption=True)
def test_configure_secure_vm_encryption(self):
self._test_configure_secure_vm(requires_shielded=False,
requires_encryption=True)
@mock.patch.object(vmops.VMOps, '_check_vtpm_requirements')
@mock.patch.object(vmops.VMOps, '_feature_requested')
def test_configure_regular_vm(self, mock_feature_requested,
mock_check_vtpm_requirements):
mock_feature_requested.side_effect = [False, False]
self._vmops._configure_secure_vm(mock.sentinel.context,
mock.MagicMock(),
mock.sentinel.image_meta,
mock.sentinel.secure_boot_enabled)
self.assertFalse(mock_check_vtpm_requirements.called)
def _test_feature_requested(self, image_prop, image_prop_required):
mock_instance = mock.MagicMock()
mock_image_meta = {'properties': {image_prop: image_prop_required}}
feature_requested = image_prop_required == constants.REQUIRED
result = self._vmops._feature_requested(mock_instance,
mock_image_meta,
image_prop)
self.assertEqual(feature_requested, result)
def test_vtpm_image_required(self):
self._test_feature_requested(
image_prop=constants.IMAGE_PROP_VTPM_SHIELDED,
image_prop_required=constants.REQUIRED)
def test_vtpm_image_disabled(self):
self._test_feature_requested(
image_prop=constants.IMAGE_PROP_VTPM_SHIELDED,
image_prop_required=constants.DISABLED)
def _test_check_vtpm_requirements(self, os_type='windows',
secure_boot_enabled=True,
guarded_host=True):
mock_instance = mock.MagicMock()
mock_image_meta = {'properties': {'os_type': os_type}}
guarded_host = self._vmops._hostutils.is_host_guarded.return_value
if (not secure_boot_enabled or not guarded_host or
os_type not in os_win_const.VTPM_SUPPORTED_OS):
self.assertRaises(exception.InstanceUnacceptable,
self._vmops._check_vtpm_requirements,
mock_instance,
mock_image_meta,
secure_boot_enabled)
else:
self._vmops._check_vtpm_requirements(mock_instance,
mock_image_meta,
secure_boot_enabled)
def test_vtpm_requirements_all_satisfied(self):
self._test_check_vtpm_requirements()
def test_vtpm_requirement_no_secureboot(self):
self._test_check_vtpm_requirements(secure_boot_enabled=False)
def test_vtpm_requirement_not_supported_os(self):
self._test_check_vtpm_requirements(
os_type=mock.sentinel.unsupported_os)
def test_vtpm_requirement_host_not_guarded(self):
self._test_check_vtpm_requirements(guarded_host=False)
@mock.patch.object(vmops.VMOps, '_get_fsk_data')
def test_create_fsk(self, mock_get_fsk_data):
mock_instance = mock.MagicMock()
fsk_pairs = mock_get_fsk_data.return_value
self._vmops._create_fsk(mock_instance, mock.sentinel.fsk_filename)
mock_get_fsk_data.assert_called_once_with(mock_instance)
self._vmops._vmutils.populate_fsk.assert_called_once_with(
mock.sentinel.fsk_filename, fsk_pairs)
def _test_get_fsk_data(self, metadata, instance_name,
expected_fsk_pairs=None):
mock_instance = mock.MagicMock()
mock_instance.metadata = metadata
mock_instance.hostname = instance_name
result = self._vmops._get_fsk_data(mock_instance)
self.assertEqual(expected_fsk_pairs, result)
def test_get_fsk_data_no_computername(self):
metadata = {'TimeZone': mock.sentinel.timezone}
expected_fsk_pairs = {'@@ComputerName@@': mock.sentinel.instance_name}
self._test_get_fsk_data(metadata,
mock.sentinel.instance_name,
expected_fsk_pairs)
def test_get_fsk_data_with_computername(self):
metadata = {'fsk:ComputerName': mock.sentinel.instance_name,
'fsk:TimeZone': mock.sentinel.timezone}
expected_fsk_pairs = {'@@ComputerName@@': mock.sentinel.instance_name,
'@@TimeZone@@': mock.sentinel.timezone}
self._test_get_fsk_data(metadata,
mock.sentinel.instance_name,
expected_fsk_pairs)
def test_get_fsk_data_computername_exception(self):
mock_instance = mock.MagicMock()
mock_instance.metadata = {
'fsk:ComputerName': mock.sentinel.computer_name,
'fsk:TimeZone': mock.sentinel.timezone}
mock_instance.hostname = mock.sentinel.instance_name
self.assertRaises(exception.InstanceUnacceptable,
self._vmops._get_fsk_data,
mock_instance)

View File

@ -14,3 +14,4 @@ oslo.utils>=3.5.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
eventlet!=0.18.3,>=0.18.2 # MIT
python-barbicanclient>=4.0.0 # Apache-2.0