Check for the label of the volume from the VFAT filesystem

Currently, in order to see if a drive contains a VFAT filesystem, we
are using mdir to list the files, but this doesn't make sure that it's
actually contains a Config Drive. Using a check for the label is much
better and safe.

Change-Id: If148692820887ad488378d889a8cba61dacfba01
This commit is contained in:
Claudiu Popa 2015-03-05 19:49:21 +02:00
parent aead094c86
commit 0ed49b14cc
2 changed files with 29 additions and 13 deletions

View File

@ -43,9 +43,9 @@ class TestVfat(unittest.TestCase):
response = vfat.is_vfat_drive(mock_osutils,
mock.sentinel.drive)
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
mdir = os.path.join(CONF.mtools_path, "mlabel.exe")
mock_osutils.execute_process.assert_called_once_with(
[mdir, "-/", "-b", "-i", mock.sentinel.drive, "/"],
[mdir, "-i", mock.sentinel.drive, "-s"],
shell=False)
self.assertEqual(expected_logging, snatcher.output)
@ -53,19 +53,28 @@ class TestVfat(unittest.TestCase):
def test_is_vfat_drive_fails(self):
expected_logging = [
"%r is not a VFAT location." % mock.sentinel.drive,
"Could not retrieve label for VFAT drive path %r"
% mock.sentinel.drive,
]
execute_process_value = (None, None, 1)
expected_response = None
expected_response = False
self._test_is_vfat_drive(execute_process_value=execute_process_value,
expected_logging=expected_logging,
expected_response=expected_response)
def test_is_vfat_drive_different_label(self):
expected_logging = []
execute_process_value = (b"Volume label is config", None, 0)
expected_response = False
self._test_is_vfat_drive(execute_process_value=execute_process_value,
expected_logging=expected_logging,
expected_response=expected_response)
def test_is_vfat_drive_works(self):
mock_out = mock.Mock()
expected_logging = []
execute_process_value = (mock_out, None, 0)
execute_process_value = (b"Volume label is config-2 \r\n", None, 0)
expected_response = True
self._test_is_vfat_drive(execute_process_value=execute_process_value,

View File

@ -13,6 +13,7 @@
# under the License.
import os
import re
from cloudbaseinit import exception
from cloudbaseinit.openstack.common import log as logging
@ -28,7 +29,9 @@ opts = [
CONF = cfg.CONF
CONF.register_opts(opts)
CONFIG_DRIVE_LABEL = 'config-2'
LOG = logging.getLogger(__name__)
VOLUME_LABEL_REGEX = re.compile("Volume label is (.*?)$")
def _check_mtools_path():
@ -41,14 +44,18 @@ def _check_mtools_path():
def is_vfat_drive(osutils, drive_path):
"""Check if the given drive contains a VFAT filesystem."""
_check_mtools_path()
mdir = os.path.join(CONF.mtools_path, "mdir.exe")
args = [mdir, "-/", "-b", "-i", drive_path, "/"]
_, _, exit_code = osutils.execute_process(args, shell=False)
if exit_code:
LOG.warning("%r is not a VFAT location.", drive_path)
return
mlabel = os.path.join(CONF.mtools_path, "mlabel.exe")
args = [mlabel, "-i", drive_path, "-s"]
return True
out, _, exit_code = osutils.execute_process(args, shell=False)
if exit_code:
LOG.warning("Could not retrieve label for VFAT drive path %r",
drive_path)
return False
out = out.decode().strip()
match = VOLUME_LABEL_REGEX.search(out)
return match.group(1) == CONFIG_DRIVE_LABEL
def copy_from_vfat_drive(osutils, drive_path, target_path):