Use tempfile for powervm config drive

There are potential security issues with using predictable temp
directories or files, so use python's tempfile module to do this
safely.

Change-Id: I5e23933af71180da1d55950fcf49e39b0b800ef5
Closes-Bug: #1771538
This commit is contained in:
Matthew Edmonds 2018-10-25 10:45:47 -04:00
parent 00a9526314
commit 54e501481d
4 changed files with 82 additions and 85 deletions

View File

@ -328,7 +328,8 @@ class TestPowerVMDriver(test.NoDBTestCase):
inst_on_disk()
@mock.patch('pypowervm.tasks.power_opts.PowerOnOpts')
@mock.patch('pypowervm.util.sanitize_file_name_for_api')
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'get_cfg_drv_name')
@mock.patch('nova_powervm.virt.powervm.tasks.storage.'
'CreateAndConnectCfgDrive.execute')
@mock.patch('nova_powervm.virt.powervm.tasks.storage.ConnectVolume'
@ -345,7 +346,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
def test_spawn_ops(self, mock_vdi, mock_pwron, mock_cfg_drv,
mock_plug_vifs, mock_plug_mgmt_vif, mock_boot_from_vol,
mock_crt_disk_img, mock_conn_vol, mock_crt_cfg_drv,
mock_sanitize, mock_pwron_opts):
mock_cfg_name, mock_pwron_opts):
"""Validates the 'typical' spawn flow of the spawn of an instance.
Uses a basic disk image, attaching networks and powering on.
@ -371,7 +372,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
slot_mgr=self.slot_mgr)
mock_pwron.assert_called_once_with(self.apt, self.inst,
opts='fake-opts')
mock_sanitize.assert_not_called()
mock_cfg_name.assert_not_called()
# Assert that tasks that are not supposed to be called are not called
self.assertFalse(mock_conn_vol.called)
self.assertFalse(mock_crt_cfg_drv.called)
@ -379,20 +380,21 @@ class TestPowerVMDriver(test.NoDBTestCase):
lpars_exist=True)
@mock.patch('pypowervm.tasks.power_opts.PowerOnOpts')
@mock.patch('pypowervm.util.sanitize_file_name_for_api')
@mock.patch('nova_powervm.virt.powervm.tasks.network.PlugMgmtVif.execute')
@mock.patch('nova_powervm.virt.powervm.tasks.network.PlugVifs.execute')
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'get_cfg_drv_name')
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'create_cfg_drv_vopt')
@mock.patch('nova.virt.configdrive.required_by')
@mock.patch('nova_powervm.virt.powervm.vm.power_on')
def test_spawn_with_cfg(self, mock_pwron, mock_cfg_drv, mock_cfg_vopt,
mock_plug_vifs, mock_plug_mgmt_vif, mock_sanitize,
mock_pwron_opts):
mock_cfg_name, mock_plug_vifs,
mock_plug_mgmt_vif, mock_pwron_opts):
"""Validates the PowerVM spawn w/ config drive operations."""
# Set up the mocks to the tasks.
mock_cfg_drv.return_value = True
mock_sanitize.return_value = 'fake-name'
mock_cfg_name.return_value = 'fake-name'
self.flags(remove_vopt_media_on_boot=True, group='powervm')
mock_opts = mock.MagicMock()
mock_pwron_opts.return_value = mock_opts
@ -412,8 +414,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
# Power on was called
mock_pwron.assert_called_once_with(self.apt, self.inst, opts=mock_opts)
mock_opts.remove_optical.assert_called_with('fake-name', time=60)
mock_sanitize.assert_called_with(
self.inst.name, prefix='cfg_', suffix='.iso', max_len=37)
mock_cfg_name.assert_called_with(self.inst)
self.scrub_stg.assert_called_with(mock.ANY, self.stg_ftsk,
lpars_exist=True)

View File

@ -18,8 +18,9 @@ from __future__ import absolute_import
import fixtures
import mock
from nova import test
from oslo_utils.fixture import uuidsentinel
from pypowervm import const as pvm_const
from pypowervm.tests import test_fixtures as pvm_fx
from pypowervm.wrappers import storage as pvm_stg
from pypowervm.wrappers import virtual_io_server as pvm_vios
@ -46,62 +47,71 @@ class TestConfigDrivePowerVM(test.NoDBTestCase):
def test_crt_cfg_dr_iso(self, mock_pvm_uuid, mock_mkdrv, mock_meta):
"""Validates that the image creation method works."""
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
self.assertTrue(self.validate_vopt.called)
mock_instance = mock.MagicMock()
mock_instance.name = 'fake-instance'
mock_instance.uuid = '1e46bbfd-73b6-3c2a-aeab-a1d3f065e92f'
mock_files = mock.MagicMock()
mock_net = mock.MagicMock()
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance.iso', iso_path)
iso_path = '/tmp/cfgdrv.iso'
cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
iso_path)
self.assertTrue(mock_pvm_uuid.called)
# Make sure the length is limited properly
mock_instance.name = 'fake-instance-with-name-that-is-too-long'
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_with_name_that_.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_with_name_that_.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
# Test retry vopt create
self.assertEqual(mock_mkdrv.call_count, 1)
# Test retry iso create
mock_mkdrv.reset_mock()
mock_mkdrv.side_effect = [OSError, mock_mkdrv]
mock_instance.name = 'fake-instance-2'
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_2.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_2.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
iso_path)
self.assertEqual(mock_mkdrv.call_count, 2)
def test_get_cfg_drv_name(self):
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
mock_instance = mock.MagicMock()
mock_instance.uuid = uuidsentinel.inst_id
# calculate expected file name
expected_file_name = 'cfg_' + mock_instance.uuid.replace('-', '')
allowed_len = pvm_const.MaxLen.VOPT_NAME - 4 # '.iso' is 4 chars
expected_file_name = expected_file_name[:allowed_len] + '.iso'
name = cfg_dr_builder.get_cfg_drv_name(mock_instance)
self.assertEqual(name, expected_file_name)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_create_cfg_dr_iso', autospec=True)
@mock.patch('os.path.getsize', autospec=True)
@mock.patch('os.remove', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_upload_vopt', autospec=True)
'get_cfg_drv_name')
@mock.patch('tempfile.NamedTemporaryFile', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_attach_vopt')
def test_crt_cfg_drv_vopt(self, mock_attach, mock_upld, mock_rm, mock_size,
mock_cfg_iso):
@mock.patch('os.path.getsize', autospec=True)
@mock.patch('pypowervm.tasks.storage.upload_vopt', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_create_cfg_dr_iso', autospec=True)
def test_crt_cfg_drv_vopt(self, mock_ccdi, mock_upl, mock_getsize,
mock_attach, mock_ntf, mock_name):
# Mock Returns
mock_cfg_iso.return_value = '/tmp/cfgdrv/fake.iso', 'fake.iso'
mock_size.return_value = 10000
mock_upld.return_value = (mock.Mock(), None)
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
cfg_dr_builder.vios_uuid = 'vios_uuid'
mock_instance = mock.MagicMock()
mock_instance.uuid = uuidsentinel.inst_id
mock_upl.return_value = 'vopt', 'f_uuid'
fh = mock_ntf.return_value.__enter__.return_value
fh.name = 'iso_path'
mock_name.return_value = 'fake-name'
# Run
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
cfg_dr_builder.create_cfg_drv_vopt(mock.MagicMock(), mock.MagicMock(),
mock.MagicMock(), 'fake_lpar')
self.assertTrue(mock_upld.called)
self.assertTrue(mock_attach.called)
mock_attach.assert_called_with(mock.ANY, 'fake_lpar', mock.ANY, None)
self.assertTrue(self.validate_vopt.called)
cfg_dr_builder.create_cfg_drv_vopt(mock_instance, 'files', 'netinfo',
'fake_lpar', admin_pass='pass')
mock_ntf.assert_called_once_with(mode='rb')
mock_ccdi.assert_called_once_with(cfg_dr_builder, mock_instance,
'files', 'netinfo', 'iso_path',
admin_pass='pass')
mock_getsize.assert_called_once_with('iso_path')
mock_upl.assert_called_once_with(self.apt, 'vios_uuid', fh,
'fake-name',
mock_getsize.return_value)
mock_attach.assert_called_once_with(mock_instance, 'fake_lpar',
'vopt', None)
@mock.patch('pypowervm.tasks.scsi_mapper.add_map', autospec=True)
@mock.patch('pypowervm.tasks.scsi_mapper.build_vscsi_mapping',

View File

@ -463,11 +463,7 @@ class PowerVMDriver(driver.ComputeDriver):
pwr_opts = pvm_popts.PowerOnOpts()
if CONF.powervm.remove_vopt_media_on_boot:
# Get the cfgdrive media name for the vopt removal task in PowerOn.
media_name = pvm_util.sanitize_file_name_for_api(
instance.name, prefix=media.CFG_DRV_PREFIX,
suffix=media.CFG_DRV_SUFFIX,
max_len=pvm_const.MaxLen.VOPT_NAME)
media_name = media.ConfigDrivePowerVM.get_cfg_drv_name(instance)
pwr_opts.remove_optical(
media_name, time=CONF.powervm.remove_vopt_media_time)

View File

@ -21,6 +21,7 @@ from nova.virt import configdrive
import os
import retrying
from taskflow import task
import tempfile
from oslo_log import log as logging
from oslo_utils import excutils
@ -44,9 +45,6 @@ CONF = cfg.CONF
_LLA_SUBNET = "fe80::/64"
CFG_DRV_PREFIX = "cfg_"
CFG_DRV_SUFFIX = ".iso"
class ConfigDrivePowerVM(object):
@ -83,7 +81,7 @@ class ConfigDrivePowerVM(object):
return network_info
def _create_cfg_dr_iso(self, instance, injected_files, network_info,
admin_pass=None):
iso_path, admin_pass=None):
"""Creates an ISO file that contains the injected files.
Used for config drive.
@ -92,9 +90,8 @@ class ConfigDrivePowerVM(object):
:param injected_files: A list of file paths that will be injected into
the ISO.
:param network_info: The network_info from the nova spawn method.
:param iso_path: The absolute file path for the new ISO
:param admin_pass: Optional password to inject for the VM.
:return iso_path: The path to the ISO
:return file_name: The file name for the ISO
"""
LOG.info("Creating config drive.", instance=instance)
extra_md = {}
@ -112,15 +109,6 @@ class ConfigDrivePowerVM(object):
# fix instance uuid to match uuid assigned to VM
inst_md.uuid = vm.get_pvm_uuid(instance).lower()
# Make sure the path exists.
im_path = CONF.powervm.image_meta_local_path
if not os.path.exists(im_path):
os.mkdir(im_path)
file_name = pvm_util.sanitize_file_name_for_api(
instance.name, prefix=CFG_DRV_PREFIX, suffix=CFG_DRV_SUFFIX,
max_len=pvm_const.MaxLen.VOPT_NAME)
iso_path = os.path.join(im_path, file_name)
with configdrive.ConfigDriveBuilder(instance_md=inst_md) as cdb:
LOG.info("Config drive ISO building to path %(iso_path)s.",
{'iso_path': iso_path}, instance=instance)
@ -134,9 +122,9 @@ class ConfigDrivePowerVM(object):
stop_max_attempt_number=2)
def _make_cfg_drive(iso_path):
cdb.make_drive(iso_path)
try:
_make_cfg_drive(iso_path)
return iso_path, file_name
except OSError:
with excutils.save_and_reraise_exception(logger=LOG):
# If we get here, that means there's an exception during
@ -145,6 +133,12 @@ class ConfigDrivePowerVM(object):
LOG.exception("Config drive ISO could not be built.",
instance=instance)
@staticmethod
def get_cfg_drv_name(instance):
return pvm_util.sanitize_file_name_for_api(
instance.uuid.replace('-', ''), prefix='cfg_', suffix='.iso',
max_len=pvm_const.MaxLen.VOPT_NAME)
def create_cfg_drv_vopt(self, instance, injected_files, network_info,
lpar_uuid, admin_pass=None, mgmt_cna=None,
stg_ftsk=None):
@ -169,15 +163,16 @@ class ConfigDrivePowerVM(object):
network_info = copy.deepcopy(network_info)
network_info.append(self._mgmt_cna_to_vif(mgmt_cna))
iso_path, file_name = self._create_cfg_dr_iso(instance, injected_files,
network_info, admin_pass)
# Pick a file name for when we upload the media to VIOS
file_name = self.get_cfg_drv_name(instance)
# Upload the media
file_size = os.path.getsize(iso_path)
vopt, f_uuid = self._upload_vopt(iso_path, file_name, file_size)
# Delete the media
os.remove(iso_path)
# Create and upload the media
with tempfile.NamedTemporaryFile(mode='rb') as fh:
self._create_cfg_dr_iso(instance, injected_files, network_info,
fh.name, admin_pass=admin_pass)
vopt, f_uuid = tsk_stg.upload_vopt(
self.adapter, self.vios_uuid, fh, file_name,
os.path.getsize(fh.name))
# Run the attach of the virtual optical
self._attach_vopt(instance, lpar_uuid, vopt, stg_ftsk)
@ -255,11 +250,6 @@ class ConfigDrivePowerVM(object):
for x in range(0, len(splits), 2)])
return ':'.join(ll)
def _upload_vopt(self, iso_path, file_name, file_size):
with open(iso_path, 'rb') as d_stream:
return tsk_stg.upload_vopt(self.adapter, self.vios_uuid, d_stream,
file_name, file_size)
def dlt_vopt(self, lpar_uuid, stg_ftsk=None, remove_mappings=True):
"""Deletes the virtual optical and scsi mappings for a VM.