Add support for VFAT ConfigDrive

In order to provide support for VFAT ConfigDrive, this patch uses a couple
of tools from the mtools package, mdir for listing VFAT filesystems, respectively
mcopy, for copying files from such filesystems. The first step when retrieving the
metadata from a ConfigDrive is now the lookup for VFAT filesystems. If any such filesystem
is found, the metadata will be taken from there.

Change-Id: I2fa9c96ce22936ca3d8d8fb9f55ca1ab29ce1d1d
This commit is contained in:
Claudiu Popa 2015-02-23 13:11:57 +02:00
parent 8c28f2f12b
commit 2026ee8e0f
7 changed files with 283 additions and 15 deletions

View File

@ -29,6 +29,8 @@ opts = [
help='Look for an ISO config drive in raw HDDs'),
cfg.BoolOpt('config_drive_cdrom', default=True,
help='Look for a config drive in the attached cdrom drives'),
cfg.BoolOpt('config_drive_vfat', default=True,
help='Look for a config drive in VFAT filesystems.'),
]
CONF = cfg.CONF
@ -49,9 +51,11 @@ class ConfigDriveService(baseopenstackservice.BaseOpenStackService):
target_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
mgr = factory.get_config_drive_manager()
found = mgr.get_config_drive_files(target_path,
CONF.config_drive_raw_hhd,
CONF.config_drive_cdrom)
found = mgr.get_config_drive_files(
target_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
if found:
self._metadata_path = target_path
LOG.debug('Metadata copied to folder: \'%s\'' %

View File

@ -19,5 +19,5 @@ class BaseConfigDriveManager(object):
@abc.abstractmethod
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True):
check_cdrom=True, check_vfat=True):
pass

View File

@ -26,6 +26,7 @@ from cloudbaseinit.metadata.services.osconfigdrive import base
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils.windows import physical_disk
from cloudbaseinit.utils.windows import vfat
opts = [
cfg.StrOpt('bsdtar_path', default='bsdtar.exe',
@ -144,10 +145,24 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
phys_disk.close()
return iso_disk_found
def _get_conf_drive_from_vfat(self, target_path):
osutils = osutils_factory.get_os_utils()
for drive_path in osutils.get_physical_disks():
if vfat.is_vfat_drive(osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path)
os.makedirs(target_path)
vfat.copy_from_vfat_drive(osutils, drive_path, target_path)
return True
def get_config_drive_files(self, target_path, check_raw_hhd=True,
check_cdrom=True):
check_cdrom=True, check_vfat=True):
config_drive_found = False
if check_raw_hhd:
if check_vfat:
LOG.debug('Looking for Config Drive in VFAT filesystems')
config_drive_found = self._get_conf_drive_from_vfat(target_path)
if not config_drive_found and check_raw_hhd:
LOG.debug('Looking for Config Drive in raw HDDs')
config_drive_found = self._get_conf_drive_from_raw_hdd(
target_path)

View File

@ -23,6 +23,8 @@ except ImportError:
from oslo.config import cfg
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
CONF = cfg.CONF
@ -265,22 +267,46 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive')
def test_get_config_drive_files(self,
mock_get_conf_drive_from_cdrom_drive,
mock_get_conf_drive_from_raw_hdd):
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_vfat')
def _test_get_config_drive_files(self,
mock_get_conf_drive_from_vfat,
mock_get_conf_drive_from_cdrom_drive,
mock_get_conf_drive_from_raw_hdd,
raw_hdd_found=False,
cdrom_drive_found=False,
vfat_found=False):
fake_path = os.path.join('fake', 'path')
mock_get_conf_drive_from_raw_hdd.return_value = False
mock_get_conf_drive_from_cdrom_drive.return_value = True
mock_get_conf_drive_from_raw_hdd.return_value = raw_hdd_found
mock_get_conf_drive_from_cdrom_drive.return_value = cdrom_drive_found
mock_get_conf_drive_from_vfat.return_value = vfat_found
response = self._config_manager.get_config_drive_files(
target_path=fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(fake_path)
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
fake_path)
if vfat_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
self.assertFalse(mock_get_conf_drive_from_raw_hdd.called)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
elif cdrom_drive_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
elif raw_hdd_found:
mock_get_conf_drive_from_vfat.assert_called_once_with(fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(
fake_path)
self.assertFalse(mock_get_conf_drive_from_cdrom_drive.called)
self.assertTrue(response)
def test_get_config_drive_files(self):
self._test_get_config_drive_files(raw_hdd_found=True)
self._test_get_config_drive_files(cdrom_drive_found=True)
self._test_get_config_drive_files(vfat_found=True)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_cdrom_mount_point')
@ -354,3 +380,43 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
def test_get_conf_drive_from_raw_hdd_no_drive_found(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=False)
@mock.patch('os.makedirs')
@mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive')
@mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def test_get_conf_drive_from_vfat(self, mock_get_os_utils,
mock_is_vfat_drive,
mock_copy_from_vfat_drive,
mock_os_makedirs):
mock_osutils = mock_get_os_utils.return_value
mock_osutils.get_physical_disks.return_value = (
mock.sentinel.drive1,
mock.sentinel.drive2,
)
mock_is_vfat_drive.side_effect = (None, True)
with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
'osconfigdrive.windows') as snatcher:
response = self._config_manager._get_conf_drive_from_vfat(
mock.sentinel.target_path)
self.assertTrue(response)
mock_osutils.get_physical_disks.assert_called_once_with()
expected_is_vfat_calls = [
mock.call(mock_osutils, mock.sentinel.drive1),
mock.call(mock_osutils, mock.sentinel.drive2),
]
self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls)
mock_copy_from_vfat_drive.assert_called_once_with(
mock_osutils,
mock.sentinel.drive2,
mock.sentinel.target_path)
expected_logging = [
'Config Drive found on disk %r' % mock.sentinel.drive2,
]
self.assertEqual(expected_logging, snatcher.output)
mock_os_makedirs.assert_called_once_with(mock.sentinel.target_path)

View File

@ -68,7 +68,10 @@ class ConfigDriveServiceTest(unittest.TestCase):
mock_gettempdir.assert_called_once_with()
mock_get_config_drive_manager.assert_called_once_with()
mock_manager.get_config_drive_files.assert_called_once_with(
fake_path, CONF.config_drive_raw_hhd, CONF.config_drive_cdrom)
fake_path,
check_raw_hhd=CONF.config_drive_raw_hhd,
check_cdrom=CONF.config_drive_cdrom,
check_vfat=CONF.config_drive_vfat)
self.assertTrue(response)
self.assertEqual(fake_path, self._config_drive._metadata_path)

View File

@ -0,0 +1,112 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
from cloudbaseinit.utils.windows import vfat
CONF = vfat.CONF
class TestVfat(unittest.TestCase):
def _test_is_vfat_drive(self, execute_process_value,
expected_logging,
expected_response):
mock_osutils = mock.Mock()
mock_osutils.execute_process.return_value = execute_process_value
with testutils.LogSnatcher('cloudbaseinit.utils.windows.'
'vfat') as snatcher:
with testutils.ConfPatcher('mtools_path', 'mtools_path'):
response = vfat.is_vfat_drive(mock_osutils,
mock.sentinel.drive)
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
mock_osutils.execute_process.assert_called_once_with(
[mdir, "-/", "-b", "-i", mock.sentinel.drive, "/"],
shell=False)
self.assertEqual(expected_logging, snatcher.output)
self.assertEqual(expected_response, response)
def test_is_vfat_drive_fails(self):
expected_logging = [
"%r is not a VFAT location." % mock.sentinel.drive,
]
execute_process_value = (None, None, 1)
expected_response = None
self._test_is_vfat_drive(execute_process_value=execute_process_value,
expected_logging=expected_logging,
expected_response=expected_response)
def test_is_vfat_drive_works(self):
mock_out = mock.Mock()
expected_logging = []
execute_process_value = (mock_out, None, 0)
expected_response = True
self._test_is_vfat_drive(execute_process_value=execute_process_value,
expected_logging=expected_logging,
expected_response=expected_response)
@testutils.ConfPatcher('mtools_path', 'mtools_path')
@mock.patch('os.chdir')
def test_copy(self, mock_os_chdir):
cwd = os.getcwd()
mock_osutils = mock.Mock()
vfat.copy_from_vfat_drive(mock_osutils,
mock.sentinel.drive,
mock.sentinel.target_path)
mock_os_chdir_calls = [
mock.call(mock.sentinel.target_path),
mock.call(cwd),
]
self.assertEqual(mock_os_chdir_calls, mock_os_chdir.mock_calls)
self.assertEqual(os.getcwd(), cwd)
mcopy = os.path.join(CONF.mtools_path, "mcopy.exe")
mock_osutils.execute_process.assert_called_once_with(
[mcopy, "-s", "-n", "-i", mock.sentinel.drive, "::/", "."],
shell=False)
def test_is_vfat_drive_mtools_not_given(self):
with self.assertRaises(exception.CloudbaseInitException) as cm:
vfat.is_vfat_drive(mock.sentinel.osutils,
mock.sentinel.target_path)
expected_message = ('"mtools_path" needs to be provided in order '
'to access VFAT drives')
self.assertEqual(expected_message, str(cm.exception.args[0]))
def test_copy_from_vfat_drive_mtools_not_given(self):
with self.assertRaises(exception.CloudbaseInitException) as cm:
vfat.copy_from_vfat_drive(mock.sentinel.osutils,
mock.sentinel.drive_path,
mock.sentinel.target_path)
expected_message = ('"mtools_path" needs to be provided in order '
'to access VFAT drives')
self.assertEqual(expected_message, str(cm.exception.args[0]))

View File

@ -0,0 +1,68 @@
# Copyright 2015 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from cloudbaseinit import exception
from cloudbaseinit.openstack.common import log as logging
from oslo.config import cfg
opts = [
cfg.StrOpt('mtools_path', default=None,
help='Path to "mtools" program suite, used for interacting '
'with VFAT filesystems'),
]
CONF = cfg.CONF
CONF.register_opts(opts)
LOG = logging.getLogger(__name__)
def _check_mtools_path():
if not CONF.mtools_path:
raise exception.CloudbaseInitException(
'"mtools_path" needs to be provided in order '
'to access VFAT drives')
def is_vfat_drive(osutils, drive_path):
"""Check if the given drive contains a VFAT filesystem."""
_check_mtools_path()
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
args = [mdir, "-/", "-b", "-i", drive_path, "/"]
_, _, exit_code = osutils.execute_process(args, shell=False)
if exit_code:
LOG.warning("%r is not a VFAT location.", drive_path)
return
return True
def copy_from_vfat_drive(osutils, drive_path, target_path):
"""Copy everything from the given VFAT drive into the given target."""
_check_mtools_path()
cwd = os.getcwd()
try:
os.chdir(target_path)
# A mcopy call looks like this:
#
# mcopy -n -i \\.\PHYSICALDRIVEx ::/file/path destination/path
mcopy = os.path.join(CONF.mtools_path, "mcopy.exe")
args = [mcopy, "-s", "-n", "-i", drive_path, "::/", "."]
osutils.execute_process(args, shell=False)
finally:
os.chdir(cwd)