Use tempfile for powervm config drive

There are potential security issues with using predictable temp
directories or files, so use python's tempfile module to do this
safely.

Change-Id: I5e23933af71180da1d55950fcf49e39b0b800ef5
Closes-Bug: #1771538
This commit is contained in:
Matthew Edmonds 2018-10-25 10:45:47 -04:00
parent 00a9526314
commit 54e501481d
4 changed files with 82 additions and 85 deletions

View File

@ -328,7 +328,8 @@ class TestPowerVMDriver(test.NoDBTestCase):
inst_on_disk() inst_on_disk()
@mock.patch('pypowervm.tasks.power_opts.PowerOnOpts') @mock.patch('pypowervm.tasks.power_opts.PowerOnOpts')
@mock.patch('pypowervm.util.sanitize_file_name_for_api') @mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'get_cfg_drv_name')
@mock.patch('nova_powervm.virt.powervm.tasks.storage.' @mock.patch('nova_powervm.virt.powervm.tasks.storage.'
'CreateAndConnectCfgDrive.execute') 'CreateAndConnectCfgDrive.execute')
@mock.patch('nova_powervm.virt.powervm.tasks.storage.ConnectVolume' @mock.patch('nova_powervm.virt.powervm.tasks.storage.ConnectVolume'
@ -345,7 +346,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
def test_spawn_ops(self, mock_vdi, mock_pwron, mock_cfg_drv, def test_spawn_ops(self, mock_vdi, mock_pwron, mock_cfg_drv,
mock_plug_vifs, mock_plug_mgmt_vif, mock_boot_from_vol, mock_plug_vifs, mock_plug_mgmt_vif, mock_boot_from_vol,
mock_crt_disk_img, mock_conn_vol, mock_crt_cfg_drv, mock_crt_disk_img, mock_conn_vol, mock_crt_cfg_drv,
mock_sanitize, mock_pwron_opts): mock_cfg_name, mock_pwron_opts):
"""Validates the 'typical' spawn flow of the spawn of an instance. """Validates the 'typical' spawn flow of the spawn of an instance.
Uses a basic disk image, attaching networks and powering on. Uses a basic disk image, attaching networks and powering on.
@ -371,7 +372,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
slot_mgr=self.slot_mgr) slot_mgr=self.slot_mgr)
mock_pwron.assert_called_once_with(self.apt, self.inst, mock_pwron.assert_called_once_with(self.apt, self.inst,
opts='fake-opts') opts='fake-opts')
mock_sanitize.assert_not_called() mock_cfg_name.assert_not_called()
# Assert that tasks that are not supposed to be called are not called # Assert that tasks that are not supposed to be called are not called
self.assertFalse(mock_conn_vol.called) self.assertFalse(mock_conn_vol.called)
self.assertFalse(mock_crt_cfg_drv.called) self.assertFalse(mock_crt_cfg_drv.called)
@ -379,20 +380,21 @@ class TestPowerVMDriver(test.NoDBTestCase):
lpars_exist=True) lpars_exist=True)
@mock.patch('pypowervm.tasks.power_opts.PowerOnOpts') @mock.patch('pypowervm.tasks.power_opts.PowerOnOpts')
@mock.patch('pypowervm.util.sanitize_file_name_for_api')
@mock.patch('nova_powervm.virt.powervm.tasks.network.PlugMgmtVif.execute') @mock.patch('nova_powervm.virt.powervm.tasks.network.PlugMgmtVif.execute')
@mock.patch('nova_powervm.virt.powervm.tasks.network.PlugVifs.execute') @mock.patch('nova_powervm.virt.powervm.tasks.network.PlugVifs.execute')
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'get_cfg_drv_name')
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.' @mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'create_cfg_drv_vopt') 'create_cfg_drv_vopt')
@mock.patch('nova.virt.configdrive.required_by') @mock.patch('nova.virt.configdrive.required_by')
@mock.patch('nova_powervm.virt.powervm.vm.power_on') @mock.patch('nova_powervm.virt.powervm.vm.power_on')
def test_spawn_with_cfg(self, mock_pwron, mock_cfg_drv, mock_cfg_vopt, def test_spawn_with_cfg(self, mock_pwron, mock_cfg_drv, mock_cfg_vopt,
mock_plug_vifs, mock_plug_mgmt_vif, mock_sanitize, mock_cfg_name, mock_plug_vifs,
mock_pwron_opts): mock_plug_mgmt_vif, mock_pwron_opts):
"""Validates the PowerVM spawn w/ config drive operations.""" """Validates the PowerVM spawn w/ config drive operations."""
# Set up the mocks to the tasks. # Set up the mocks to the tasks.
mock_cfg_drv.return_value = True mock_cfg_drv.return_value = True
mock_sanitize.return_value = 'fake-name' mock_cfg_name.return_value = 'fake-name'
self.flags(remove_vopt_media_on_boot=True, group='powervm') self.flags(remove_vopt_media_on_boot=True, group='powervm')
mock_opts = mock.MagicMock() mock_opts = mock.MagicMock()
mock_pwron_opts.return_value = mock_opts mock_pwron_opts.return_value = mock_opts
@ -412,8 +414,7 @@ class TestPowerVMDriver(test.NoDBTestCase):
# Power on was called # Power on was called
mock_pwron.assert_called_once_with(self.apt, self.inst, opts=mock_opts) mock_pwron.assert_called_once_with(self.apt, self.inst, opts=mock_opts)
mock_opts.remove_optical.assert_called_with('fake-name', time=60) mock_opts.remove_optical.assert_called_with('fake-name', time=60)
mock_sanitize.assert_called_with( mock_cfg_name.assert_called_with(self.inst)
self.inst.name, prefix='cfg_', suffix='.iso', max_len=37)
self.scrub_stg.assert_called_with(mock.ANY, self.stg_ftsk, self.scrub_stg.assert_called_with(mock.ANY, self.stg_ftsk,
lpars_exist=True) lpars_exist=True)

View File

@ -18,8 +18,9 @@ from __future__ import absolute_import
import fixtures import fixtures
import mock import mock
from nova import test from nova import test
from oslo_utils.fixture import uuidsentinel
from pypowervm import const as pvm_const
from pypowervm.tests import test_fixtures as pvm_fx from pypowervm.tests import test_fixtures as pvm_fx
from pypowervm.wrappers import storage as pvm_stg from pypowervm.wrappers import storage as pvm_stg
from pypowervm.wrappers import virtual_io_server as pvm_vios from pypowervm.wrappers import virtual_io_server as pvm_vios
@ -46,62 +47,71 @@ class TestConfigDrivePowerVM(test.NoDBTestCase):
def test_crt_cfg_dr_iso(self, mock_pvm_uuid, mock_mkdrv, mock_meta): def test_crt_cfg_dr_iso(self, mock_pvm_uuid, mock_mkdrv, mock_meta):
"""Validates that the image creation method works.""" """Validates that the image creation method works."""
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt) cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
self.assertTrue(self.validate_vopt.called)
mock_instance = mock.MagicMock() mock_instance = mock.MagicMock()
mock_instance.name = 'fake-instance'
mock_instance.uuid = '1e46bbfd-73b6-3c2a-aeab-a1d3f065e92f' mock_instance.uuid = '1e46bbfd-73b6-3c2a-aeab-a1d3f065e92f'
mock_files = mock.MagicMock() mock_files = mock.MagicMock()
mock_net = mock.MagicMock() mock_net = mock.MagicMock()
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance, iso_path = '/tmp/cfgdrv.iso'
mock_files, cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
mock_net) iso_path)
self.assertEqual('cfg_fake_instance.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance.iso', iso_path)
self.assertTrue(mock_pvm_uuid.called) self.assertTrue(mock_pvm_uuid.called)
# Make sure the length is limited properly self.assertEqual(mock_mkdrv.call_count, 1)
mock_instance.name = 'fake-instance-with-name-that-is-too-long'
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance, # Test retry iso create
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_with_name_that_.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_with_name_that_.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
# Test retry vopt create
mock_mkdrv.reset_mock() mock_mkdrv.reset_mock()
mock_mkdrv.side_effect = [OSError, mock_mkdrv] mock_mkdrv.side_effect = [OSError, mock_mkdrv]
mock_instance.name = 'fake-instance-2' cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance, iso_path)
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_2.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_2.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
self.assertEqual(mock_mkdrv.call_count, 2) self.assertEqual(mock_mkdrv.call_count, 2)
def test_get_cfg_drv_name(self):
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
mock_instance = mock.MagicMock()
mock_instance.uuid = uuidsentinel.inst_id
# calculate expected file name
expected_file_name = 'cfg_' + mock_instance.uuid.replace('-', '')
allowed_len = pvm_const.MaxLen.VOPT_NAME - 4 # '.iso' is 4 chars
expected_file_name = expected_file_name[:allowed_len] + '.iso'
name = cfg_dr_builder.get_cfg_drv_name(mock_instance)
self.assertEqual(name, expected_file_name)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.' @mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_create_cfg_dr_iso', autospec=True) 'get_cfg_drv_name')
@mock.patch('os.path.getsize', autospec=True) @mock.patch('tempfile.NamedTemporaryFile', autospec=True)
@mock.patch('os.remove', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_upload_vopt', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.' @mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_attach_vopt') '_attach_vopt')
def test_crt_cfg_drv_vopt(self, mock_attach, mock_upld, mock_rm, mock_size, @mock.patch('os.path.getsize', autospec=True)
mock_cfg_iso): @mock.patch('pypowervm.tasks.storage.upload_vopt', autospec=True)
@mock.patch('nova_powervm.virt.powervm.media.ConfigDrivePowerVM.'
'_create_cfg_dr_iso', autospec=True)
def test_crt_cfg_drv_vopt(self, mock_ccdi, mock_upl, mock_getsize,
mock_attach, mock_ntf, mock_name):
# Mock Returns # Mock Returns
mock_cfg_iso.return_value = '/tmp/cfgdrv/fake.iso', 'fake.iso' cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
mock_size.return_value = 10000 cfg_dr_builder.vios_uuid = 'vios_uuid'
mock_upld.return_value = (mock.Mock(), None) mock_instance = mock.MagicMock()
mock_instance.uuid = uuidsentinel.inst_id
mock_upl.return_value = 'vopt', 'f_uuid'
fh = mock_ntf.return_value.__enter__.return_value
fh.name = 'iso_path'
mock_name.return_value = 'fake-name'
# Run # Run
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt) cfg_dr_builder.create_cfg_drv_vopt(mock_instance, 'files', 'netinfo',
cfg_dr_builder.create_cfg_drv_vopt(mock.MagicMock(), mock.MagicMock(), 'fake_lpar', admin_pass='pass')
mock.MagicMock(), 'fake_lpar') mock_ntf.assert_called_once_with(mode='rb')
self.assertTrue(mock_upld.called) mock_ccdi.assert_called_once_with(cfg_dr_builder, mock_instance,
self.assertTrue(mock_attach.called) 'files', 'netinfo', 'iso_path',
mock_attach.assert_called_with(mock.ANY, 'fake_lpar', mock.ANY, None) admin_pass='pass')
self.assertTrue(self.validate_vopt.called) mock_getsize.assert_called_once_with('iso_path')
mock_upl.assert_called_once_with(self.apt, 'vios_uuid', fh,
'fake-name',
mock_getsize.return_value)
mock_attach.assert_called_once_with(mock_instance, 'fake_lpar',
'vopt', None)
@mock.patch('pypowervm.tasks.scsi_mapper.add_map', autospec=True) @mock.patch('pypowervm.tasks.scsi_mapper.add_map', autospec=True)
@mock.patch('pypowervm.tasks.scsi_mapper.build_vscsi_mapping', @mock.patch('pypowervm.tasks.scsi_mapper.build_vscsi_mapping',

View File

@ -463,11 +463,7 @@ class PowerVMDriver(driver.ComputeDriver):
pwr_opts = pvm_popts.PowerOnOpts() pwr_opts = pvm_popts.PowerOnOpts()
if CONF.powervm.remove_vopt_media_on_boot: if CONF.powervm.remove_vopt_media_on_boot:
# Get the cfgdrive media name for the vopt removal task in PowerOn. media_name = media.ConfigDrivePowerVM.get_cfg_drv_name(instance)
media_name = pvm_util.sanitize_file_name_for_api(
instance.name, prefix=media.CFG_DRV_PREFIX,
suffix=media.CFG_DRV_SUFFIX,
max_len=pvm_const.MaxLen.VOPT_NAME)
pwr_opts.remove_optical( pwr_opts.remove_optical(
media_name, time=CONF.powervm.remove_vopt_media_time) media_name, time=CONF.powervm.remove_vopt_media_time)

View File

@ -21,6 +21,7 @@ from nova.virt import configdrive
import os import os
import retrying import retrying
from taskflow import task from taskflow import task
import tempfile
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import excutils from oslo_utils import excutils
@ -44,9 +45,6 @@ CONF = cfg.CONF
_LLA_SUBNET = "fe80::/64" _LLA_SUBNET = "fe80::/64"
CFG_DRV_PREFIX = "cfg_"
CFG_DRV_SUFFIX = ".iso"
class ConfigDrivePowerVM(object): class ConfigDrivePowerVM(object):
@ -83,7 +81,7 @@ class ConfigDrivePowerVM(object):
return network_info return network_info
def _create_cfg_dr_iso(self, instance, injected_files, network_info, def _create_cfg_dr_iso(self, instance, injected_files, network_info,
admin_pass=None): iso_path, admin_pass=None):
"""Creates an ISO file that contains the injected files. """Creates an ISO file that contains the injected files.
Used for config drive. Used for config drive.
@ -92,9 +90,8 @@ class ConfigDrivePowerVM(object):
:param injected_files: A list of file paths that will be injected into :param injected_files: A list of file paths that will be injected into
the ISO. the ISO.
:param network_info: The network_info from the nova spawn method. :param network_info: The network_info from the nova spawn method.
:param iso_path: The absolute file path for the new ISO
:param admin_pass: Optional password to inject for the VM. :param admin_pass: Optional password to inject for the VM.
:return iso_path: The path to the ISO
:return file_name: The file name for the ISO
""" """
LOG.info("Creating config drive.", instance=instance) LOG.info("Creating config drive.", instance=instance)
extra_md = {} extra_md = {}
@ -112,15 +109,6 @@ class ConfigDrivePowerVM(object):
# fix instance uuid to match uuid assigned to VM # fix instance uuid to match uuid assigned to VM
inst_md.uuid = vm.get_pvm_uuid(instance).lower() inst_md.uuid = vm.get_pvm_uuid(instance).lower()
# Make sure the path exists.
im_path = CONF.powervm.image_meta_local_path
if not os.path.exists(im_path):
os.mkdir(im_path)
file_name = pvm_util.sanitize_file_name_for_api(
instance.name, prefix=CFG_DRV_PREFIX, suffix=CFG_DRV_SUFFIX,
max_len=pvm_const.MaxLen.VOPT_NAME)
iso_path = os.path.join(im_path, file_name)
with configdrive.ConfigDriveBuilder(instance_md=inst_md) as cdb: with configdrive.ConfigDriveBuilder(instance_md=inst_md) as cdb:
LOG.info("Config drive ISO building to path %(iso_path)s.", LOG.info("Config drive ISO building to path %(iso_path)s.",
{'iso_path': iso_path}, instance=instance) {'iso_path': iso_path}, instance=instance)
@ -134,9 +122,9 @@ class ConfigDrivePowerVM(object):
stop_max_attempt_number=2) stop_max_attempt_number=2)
def _make_cfg_drive(iso_path): def _make_cfg_drive(iso_path):
cdb.make_drive(iso_path) cdb.make_drive(iso_path)
try: try:
_make_cfg_drive(iso_path) _make_cfg_drive(iso_path)
return iso_path, file_name
except OSError: except OSError:
with excutils.save_and_reraise_exception(logger=LOG): with excutils.save_and_reraise_exception(logger=LOG):
# If we get here, that means there's an exception during # If we get here, that means there's an exception during
@ -145,6 +133,12 @@ class ConfigDrivePowerVM(object):
LOG.exception("Config drive ISO could not be built.", LOG.exception("Config drive ISO could not be built.",
instance=instance) instance=instance)
@staticmethod
def get_cfg_drv_name(instance):
return pvm_util.sanitize_file_name_for_api(
instance.uuid.replace('-', ''), prefix='cfg_', suffix='.iso',
max_len=pvm_const.MaxLen.VOPT_NAME)
def create_cfg_drv_vopt(self, instance, injected_files, network_info, def create_cfg_drv_vopt(self, instance, injected_files, network_info,
lpar_uuid, admin_pass=None, mgmt_cna=None, lpar_uuid, admin_pass=None, mgmt_cna=None,
stg_ftsk=None): stg_ftsk=None):
@ -169,15 +163,16 @@ class ConfigDrivePowerVM(object):
network_info = copy.deepcopy(network_info) network_info = copy.deepcopy(network_info)
network_info.append(self._mgmt_cna_to_vif(mgmt_cna)) network_info.append(self._mgmt_cna_to_vif(mgmt_cna))
iso_path, file_name = self._create_cfg_dr_iso(instance, injected_files, # Pick a file name for when we upload the media to VIOS
network_info, admin_pass) file_name = self.get_cfg_drv_name(instance)
# Upload the media # Create and upload the media
file_size = os.path.getsize(iso_path) with tempfile.NamedTemporaryFile(mode='rb') as fh:
vopt, f_uuid = self._upload_vopt(iso_path, file_name, file_size) self._create_cfg_dr_iso(instance, injected_files, network_info,
fh.name, admin_pass=admin_pass)
# Delete the media vopt, f_uuid = tsk_stg.upload_vopt(
os.remove(iso_path) self.adapter, self.vios_uuid, fh, file_name,
os.path.getsize(fh.name))
# Run the attach of the virtual optical # Run the attach of the virtual optical
self._attach_vopt(instance, lpar_uuid, vopt, stg_ftsk) self._attach_vopt(instance, lpar_uuid, vopt, stg_ftsk)
@ -255,11 +250,6 @@ class ConfigDrivePowerVM(object):
for x in range(0, len(splits), 2)]) for x in range(0, len(splits), 2)])
return ':'.join(ll) return ':'.join(ll)
def _upload_vopt(self, iso_path, file_name, file_size):
with open(iso_path, 'rb') as d_stream:
return tsk_stg.upload_vopt(self.adapter, self.vios_uuid, d_stream,
file_name, file_size)
def dlt_vopt(self, lpar_uuid, stg_ftsk=None, remove_mappings=True): def dlt_vopt(self, lpar_uuid, stg_ftsk=None, remove_mappings=True):
"""Deletes the virtual optical and scsi mappings for a VM. """Deletes the virtual optical and scsi mappings for a VM.