Use tempfile for powervm config drive

There are potential security issues with using predictable temp
directories or files, so use python's tempfile module to do this
safely.

Change-Id: Ia067236785882ad3acca23a425ea1333b247d8c6
Closes-Bug: #1771538
This commit is contained in:
Matthew Edmonds 2018-10-12 17:26:29 -04:00
parent f8727c4112
commit 4afe8ea5a1
2 changed files with 45 additions and 68 deletions

View File

@ -19,17 +19,13 @@ from __future__ import absolute_import
import fixtures
import mock
from oslo_utils.fixture import uuidsentinel
from pypowervm import const as pvm_const
from pypowervm.tasks import scsi_mapper as tsk_map
from pypowervm.tests import test_fixtures as pvm_fx
from pypowervm.utils import transaction as pvm_tx
from pypowervm.wrappers import network as pvm_net
from pypowervm.wrappers import storage as pvm_stg
from pypowervm.wrappers import virtual_io_server as pvm_vios
import six
if six.PY2:
import __builtin__ as builtins
elif six.PY3:
import builtins
from nova import test
from nova.virt.powervm import media as m
@ -53,55 +49,40 @@ class TestConfigDrivePowerVM(test.NoDBTestCase):
def test_crt_cfg_dr_iso(self, mock_mkdrv, mock_meta):
"""Validates that the image creation method works."""
cfg_dr_builder = m.ConfigDrivePowerVM(self.apt)
self.assertTrue(self.validate_vopt.called)
mock_instance = mock.MagicMock()
mock_instance.name = 'fake-instance'
mock_instance.uuid = uuidsentinel.inst_id
mock_files = mock.MagicMock()
mock_net = mock.MagicMock()
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance.iso', iso_path)
# Make sure the length is limited properly
mock_instance.name = 'fake-instance-with-name-that-is-too-long'
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_with_name_that_.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_with_name_that_.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
iso_path = '/tmp/cfgdrv.iso'
cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
iso_path)
self.assertEqual(mock_mkdrv.call_count, 1)
# Test retry iso create
mock_mkdrv.reset_mock()
# Test retry vopt create
mock_mkdrv.side_effect = [OSError, mock_mkdrv]
mock_instance.name = 'fake-instance-2'
iso_path, file_name = cfg_dr_builder._create_cfg_dr_iso(mock_instance,
mock_files,
mock_net)
self.assertEqual('cfg_fake_instance_2.iso', file_name)
self.assertEqual('/tmp/cfgdrv/cfg_fake_instance_2.iso',
iso_path)
self.assertTrue(self.validate_vopt.called)
cfg_dr_builder._create_cfg_dr_iso(mock_instance, mock_files, mock_net,
iso_path)
self.assertEqual(mock_mkdrv.call_count, 2)
@mock.patch('tempfile.NamedTemporaryFile')
@mock.patch('nova.virt.powervm.vm.get_pvm_uuid')
@mock.patch('pypowervm.tasks.scsi_mapper.build_vscsi_mapping')
@mock.patch('pypowervm.tasks.scsi_mapper.add_map')
@mock.patch('os.remove')
@mock.patch('os.path.getsize')
@mock.patch('pypowervm.tasks.storage.upload_vopt')
@mock.patch.object(builtins, 'open')
@mock.patch('nova.virt.powervm.media.ConfigDrivePowerVM.'
'_create_cfg_dr_iso')
def test_create_cfg_drv_vopt(self, mock_ccdi, mock_open, mock_upl,
mock_getsize, mock_rm, mock_addmap,
mock_bldmap, mock_vm_id):
def test_create_cfg_drv_vopt(self, mock_ccdi, mock_upl, mock_getsize,
mock_addmap, mock_bldmap, mock_vm_id,
mock_ntf):
cfg_dr = m.ConfigDrivePowerVM(self.apt)
mock_ccdi.return_value = 'iso_path', 'file_name'
mock_instance = mock.MagicMock()
mock_instance.uuid = uuidsentinel.inst_id
mock_upl.return_value = 'vopt', 'f_uuid'
fh = mock_ntf.return_value.__enter__.return_value
fh.name = 'iso_path'
wtsk = mock.create_autospec(pvm_tx.WrapperTask, instance=True)
ftsk = mock.create_autospec(pvm_tx.FeedTask, instance=True)
ftsk.configure_mock(wrapper_tasks={'vios_uuid': wtsk})
@ -110,24 +91,27 @@ class TestConfigDrivePowerVM(test.NoDBTestCase):
# Validate the internal add_func
vio = mock.create_autospec(pvm_vios.VIOS)
self.assertEqual(mock_addmap.return_value, add_func(vio))
mock_vm_id.assert_called_once_with('inst')
mock_vm_id.assert_called_once_with(mock_instance)
mock_bldmap.assert_called_once_with(
None, vio, mock_vm_id.return_value, 'vopt')
mock_addmap.assert_called_once_with(vio, mock_bldmap.return_value)
wtsk.add_functor_subtask.side_effect = test_afs
cfg_dr.create_cfg_drv_vopt(
'inst', 'files', 'netinfo', ftsk, admin_pass='pass')
# calculate expected file name
expected_file_name = 'cfg_' + mock_instance.uuid.replace('-', '')
allowed_len = pvm_const.MaxLen.VOPT_NAME - 4 # '.iso' is 4 chars
expected_file_name = expected_file_name[:allowed_len] + '.iso'
mock_ccdi.assert_called_once_with('inst', 'files', 'netinfo',
admin_pass='pass')
mock_open.assert_called_once_with('iso_path', 'rb')
cfg_dr.create_cfg_drv_vopt(
mock_instance, 'files', 'netinfo', ftsk, admin_pass='pass')
mock_ntf.assert_called_once_with(mode='rb')
mock_ccdi.assert_called_once_with(mock_instance, 'files', 'netinfo',
'iso_path', admin_pass='pass')
mock_getsize.assert_called_once_with('iso_path')
mock_upl.assert_called_once_with(
self.apt, 'vios_uuid',
mock_open.return_value.__enter__.return_value, 'file_name',
mock_getsize.return_value)
mock_rm.assert_called_once_with('iso_path')
mock_upl.assert_called_once_with(self.apt, 'vios_uuid', fh,
expected_file_name,
mock_getsize.return_value)
wtsk.add_functor_subtask.assert_called_once()
def test_sanitize_network_info(self):

View File

@ -16,6 +16,7 @@
import copy
import os
import tempfile
from oslo_log import log as logging
from oslo_utils import excutils
@ -41,7 +42,6 @@ _LLA_SUBNET = "fe80::/64"
# TODO(efried): CONF these (maybe)
_VOPT_VG = 'rootvg'
_VOPT_SIZE_GB = 1
_VOPT_TMPDIR = '/tmp/cfgdrv/'
class ConfigDrivePowerVM(object):
@ -84,7 +84,7 @@ class ConfigDrivePowerVM(object):
return network_info
def _create_cfg_dr_iso(self, instance, injected_files, network_info,
admin_pass=None):
iso_path, admin_pass=None):
"""Creates an ISO file that contains the injected files.
Used for config drive.
@ -93,9 +93,8 @@ class ConfigDrivePowerVM(object):
:param injected_files: A list of file paths that will be injected into
the ISO.
:param network_info: The network_info from the nova spawn method.
:param iso_path: The absolute file path for the new ISO
:param admin_pass: Optional password to inject for the VM.
:return iso_path: The path to the ISO
:return file_name: The file name for the ISO
"""
LOG.info("Creating config drive.", instance=instance)
extra_md = {}
@ -110,13 +109,6 @@ class ConfigDrivePowerVM(object):
extra_md=extra_md,
network_info=network_info)
if not os.path.exists(_VOPT_TMPDIR):
os.mkdir(_VOPT_TMPDIR)
file_name = pvm_util.sanitize_file_name_for_api(
instance.name, prefix='cfg_', suffix='.iso',
max_len=pvm_const.MaxLen.VOPT_NAME)
iso_path = os.path.join(_VOPT_TMPDIR, file_name)
with configdrive.ConfigDriveBuilder(instance_md=inst_md) as cdb:
LOG.info("Config drive ISO being built in %s.", iso_path,
instance=instance)
@ -127,9 +119,9 @@ class ConfigDrivePowerVM(object):
exc, OSError), stop_max_attempt_number=2)
def _make_cfg_drive(iso_path):
cdb.make_drive(iso_path)
try:
_make_cfg_drive(iso_path)
return iso_path, file_name
except OSError:
with excutils.save_and_reraise_exception(logger=LOG):
LOG.exception("Config drive ISO could not be built",
@ -153,17 +145,18 @@ class ConfigDrivePowerVM(object):
network_info = copy.deepcopy(network_info)
network_info.append(self._mgmt_cna_to_vif(mgmt_cna))
iso_path, file_name = self._create_cfg_dr_iso(
instance, injected_files, network_info, admin_pass=admin_pass)
# Pick a file name for when we upload the media to VIOS
file_name = pvm_util.sanitize_file_name_for_api(
instance.uuid.replace('-', ''), prefix='cfg_', suffix='.iso',
max_len=pvm_const.MaxLen.VOPT_NAME)
# Upload the media
with open(iso_path, 'rb') as d_stream:
# Create and upload the media
with tempfile.NamedTemporaryFile(mode='rb') as fh:
self._create_cfg_dr_iso(instance, injected_files, network_info,
fh.name, admin_pass=admin_pass)
vopt, f_uuid = tsk_stg.upload_vopt(
self.adapter, self.vios_uuid, d_stream, file_name,
os.path.getsize(iso_path))
# The media can be removed now that the upload is complete
os.remove(iso_path)
self.adapter, self.vios_uuid, fh, file_name,
os.path.getsize(fh.name))
# Define the function to build and add the mapping
def add_func(vios_w):