Refactor config drive metadata service

Refactor the existing ConfigDrive Metadata service so that another
metadata format implemenation like NoCloud can be easily added.

Now, the drive label and the metadata file can be set in the
constructor, making it easy to add another config drive metadata service
with different label or path.

Change-Id: I8dd8160dfbe9f529bb8f30ab85181f264c18833e
This commit is contained in:
Adrian Vladu 2020-02-07 19:47:25 +02:00
parent 32103bd557
commit 3e5b5f37ff
7 changed files with 329 additions and 181 deletions

View File

@ -0,0 +1,93 @@
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services.osconfigdrive import factory
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
CD_TYPES = constant.CD_TYPES
CD_LOCATIONS = constant.CD_LOCATIONS
class BaseConfigDriveService(base.BaseMetadataService):
def __init__(self, drive_label, metadata_file):
super(BaseConfigDriveService, self).__init__()
self._drive_label = drive_label
self._metadata_file = metadata_file
self._metadata_path = None
self._searched_types = set()
self._searched_locations = set()
def _preprocess_options(self):
self._searched_types = set(CONF.config_drive.types)
self._searched_locations = set(CONF.config_drive.locations)
# Deprecation backward compatibility.
if CONF.config_drive.raw_hdd:
self._searched_types.add("iso")
self._searched_locations.add("hdd")
if CONF.config_drive.cdrom:
self._searched_types.add("iso")
self._searched_locations.add("cdrom")
if CONF.config_drive.vfat:
self._searched_types.add("vfat")
self._searched_locations.add("hdd")
# Check for invalid option values.
if self._searched_types | CD_TYPES != CD_TYPES:
raise exception.CloudbaseInitException(
"Invalid Config Drive types %s", self._searched_types)
if self._searched_locations | CD_LOCATIONS != CD_LOCATIONS:
raise exception.CloudbaseInitException(
"Invalid Config Drive locations %s", self._searched_locations)
def load(self):
super(BaseConfigDriveService, self).load()
self._preprocess_options()
self._mgr = factory.get_config_drive_manager()
found = self._mgr.get_config_drive_files(
drive_label=self._drive_label,
metadata_file=self._metadata_file,
searched_types=self._searched_types,
searched_locations=self._searched_locations)
if found:
self._metadata_path = self._mgr.target_path
LOG.debug('Metadata copied to folder: %r', self._metadata_path)
return found
def _get_data(self, path):
norm_path = os.path.normpath(os.path.join(self._metadata_path, path))
try:
with open(norm_path, 'rb') as stream:
return stream.read()
except IOError:
raise base.NotExistingMetadataException()
def cleanup(self):
LOG.debug('Deleting metadata folder: %r', self._mgr.target_path)
shutil.rmtree(self._mgr.target_path, ignore_errors=True)
self._metadata_path = None

View File

@ -1,4 +1,4 @@
# Copyright 2012 Cloudbase Solutions Srl # Copyright 2020 Cloudbase Solutions Srl
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -12,77 +12,19 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import os
import shutil
from oslo_log import log as oslo_logging from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant from cloudbaseinit.metadata.services import baseconfigdrive
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import baseopenstackservice from cloudbaseinit.metadata.services import baseopenstackservice
from cloudbaseinit.metadata.services.osconfigdrive import factory
CONF = cloudbaseinit_conf.CONF CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__) LOG = oslo_logging.getLogger(__name__)
CD_TYPES = constant.CD_TYPES
CD_LOCATIONS = constant.CD_LOCATIONS
class ConfigDriveService(baseconfigdrive.BaseConfigDriveService,
class ConfigDriveService(baseopenstackservice.BaseOpenStackService): baseopenstackservice.BaseOpenStackService):
def __init__(self): def __init__(self):
super(ConfigDriveService, self).__init__() super(ConfigDriveService, self).__init__(
self._metadata_path = None 'config-2', 'openstack\\latest\\meta_data.json')
def _preprocess_options(self):
self._searched_types = set(CONF.config_drive.types)
self._searched_locations = set(CONF.config_drive.locations)
# Deprecation backward compatibility.
if CONF.config_drive.raw_hdd:
self._searched_types.add("iso")
self._searched_locations.add("hdd")
if CONF.config_drive.cdrom:
self._searched_types.add("iso")
self._searched_locations.add("cdrom")
if CONF.config_drive.vfat:
self._searched_types.add("vfat")
self._searched_locations.add("hdd")
# Check for invalid option values.
if self._searched_types | CD_TYPES != CD_TYPES:
raise exception.CloudbaseInitException(
"Invalid Config Drive types %s", self._searched_types)
if self._searched_locations | CD_LOCATIONS != CD_LOCATIONS:
raise exception.CloudbaseInitException(
"Invalid Config Drive locations %s", self._searched_locations)
def load(self):
super(ConfigDriveService, self).load()
self._preprocess_options()
self._mgr = factory.get_config_drive_manager()
found = self._mgr.get_config_drive_files(
searched_types=self._searched_types,
searched_locations=self._searched_locations)
if found:
self._metadata_path = self._mgr.target_path
LOG.debug('Metadata copied to folder: %r', self._metadata_path)
return found
def _get_data(self, path):
norm_path = os.path.normpath(os.path.join(self._metadata_path, path))
try:
with open(norm_path, 'rb') as stream:
return stream.read()
except IOError:
raise base.NotExistingMetadataException()
def cleanup(self):
LOG.debug('Deleting metadata folder: %r', self._mgr.target_path)
shutil.rmtree(self._mgr.target_path, ignore_errors=True)
self._metadata_path = None

View File

@ -25,5 +25,7 @@ class BaseConfigDriveManager(object):
self.target_path = tempfile.mkdtemp() self.target_path = tempfile.mkdtemp()
@abc.abstractmethod @abc.abstractmethod
def get_config_drive_files(self, check_types=None, check_locations=None): def get_config_drive_files(self, drive_label, metadata_file,
check_types=None, check_locations=None,
):
pass pass

View File

@ -32,7 +32,6 @@ from cloudbaseinit.utils.windows import vfat
CONF = cloudbaseinit_conf.CONF CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__) LOG = oslo_logging.getLogger(__name__)
CONFIG_DRIVE_LABEL = 'config-2'
MAX_SECTOR_SIZE = 4096 MAX_SECTOR_SIZE = 4096
# Absolute offset values and the ISO magic string. # Absolute offset values and the ISO magic string.
OFFSET_BOOT_RECORD = 0x8000 OFFSET_BOOT_RECORD = 0x8000
@ -50,18 +49,25 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
super(WindowsConfigDriveManager, self).__init__() super(WindowsConfigDriveManager, self).__init__()
self._osutils = osutils_factory.get_os_utils() self._osutils = osutils_factory.get_os_utils()
def _check_for_config_drive(self, drive): def _meta_data_file_exists(self, drive, metadata_file):
metadata_file = os.path.join(drive, metadata_file)
if os.path.exists(metadata_file):
return True
LOG.debug('%s not found', metadata_file)
return False
def _check_for_config_drive(self, drive, required_drive_label,
metadata_file):
label = self._osutils.get_volume_label(drive) label = self._osutils.get_volume_label(drive)
if label and label.lower() == CONFIG_DRIVE_LABEL: if label and label.lower() == required_drive_label and \
if os.path.exists(os.path.join(drive, self._meta_data_file_exists(drive, metadata_file):
'openstack\\latest\\meta_data.json')): LOG.info('Config Drive found on %s', drive)
LOG.info('Config Drive found on %s', drive) return True
return True
LOG.debug('%s\openstack\latest\meta_data.json not '
'found', drive)
LOG.debug("Looking for a Config Drive with label '%s' on '%s'. " LOG.debug("Looking for a Config Drive with label '%s' on '%s'. "
"Found mismatching label '%s'.", "Found mismatching label '%s'.",
CONFIG_DRIVE_LABEL, drive, label) required_drive_label, drive, label)
return False return False
def _get_iso_file_size(self, device): def _get_iso_file_size(self, device):
@ -137,20 +143,21 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
os.remove(iso_file_path) os.remove(iso_file_path)
return extracted return extracted
def _get_config_drive_from_cdrom_drive(self): def _get_config_drive_from_cdrom_drive(self, drive_label, metadata_file):
for drive_letter in self._osutils.get_cdrom_drives(): for drive_letter in self._osutils.get_cdrom_drives():
if self._check_for_config_drive(drive_letter): if self._check_for_config_drive(drive_letter, drive_label,
metadata_file):
os.rmdir(self.target_path) os.rmdir(self.target_path)
shutil.copytree(drive_letter, self.target_path) shutil.copytree(drive_letter, self.target_path)
return True return True
return False return False
def _get_config_drive_from_raw_hdd(self): def _get_config_drive_from_raw_hdd(self, drive_label, metadata_file):
disks = map(disk.Disk, self._osutils.get_physical_disks()) disks = map(disk.Disk, self._osutils.get_physical_disks())
return self._extract_iso_from_devices(disks) return self._extract_iso_from_devices(disks)
def _get_config_drive_from_vfat(self): def _get_config_drive_from_vfat(self, drive_label, metadata_file):
for drive_path in self._osutils.get_physical_disks(): for drive_path in self._osutils.get_physical_disks():
if vfat.is_vfat_drive(self._osutils, drive_path): if vfat.is_vfat_drive(self._osutils, drive_path):
LOG.info('Config Drive found on disk %r', drive_path) LOG.info('Config Drive found on disk %r', drive_path)
@ -159,7 +166,7 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
return True return True
return False return False
def _get_config_drive_from_partition(self): def _get_config_drive_from_partition(self, drive_label, metadata_file):
for disk_path in self._osutils.get_physical_disks(): for disk_path in self._osutils.get_physical_disks():
physical_drive = disk.Disk(disk_path) physical_drive = disk.Disk(disk_path)
with physical_drive: with physical_drive:
@ -169,22 +176,24 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
return True return True
return False return False
def _get_config_drive_from_volume(self): def _get_config_drive_from_volume(self, drive_label, metadata_file):
"""Look through all the volumes for config drive.""" """Look through all the volumes for config drive."""
volumes = self._osutils.get_volumes() volumes = self._osutils.get_volumes()
for volume in volumes: for volume in volumes:
if self._check_for_config_drive(volume): if self._check_for_config_drive(volume, drive_label,
metadata_file):
os.rmdir(self.target_path) os.rmdir(self.target_path)
shutil.copytree(volume, self.target_path) shutil.copytree(volume, self.target_path)
return True return True
return False return False
def _get_config_drive_files(self, cd_type, cd_location): def _get_config_drive_files(self, drive_label, metadata_file,
cd_type, cd_location):
try: try:
get_config_drive = self.config_drive_type_location.get( get_config_drive = self.config_drive_type_location.get(
"{}_{}".format(cd_location, cd_type)) "{}_{}".format(cd_location, cd_type))
if get_config_drive: if get_config_drive:
return get_config_drive() return get_config_drive(drive_label, metadata_file)
else: else:
LOG.debug("Irrelevant type %(type)s in %(location)s " LOG.debug("Irrelevant type %(type)s in %(location)s "
"location; skip", "location; skip",
@ -196,16 +205,19 @@ class WindowsConfigDriveManager(base.BaseConfigDriveManager):
return False return False
def get_config_drive_files(self, searched_types=None, def get_config_drive_files(self, drive_label, metadata_file,
searched_locations=None): searched_types=None, searched_locations=None):
searched_types = searched_types or [] searched_types = searched_types or []
searched_locations = searched_locations or [] searched_locations = searched_locations or []
for cd_type, cd_location in itertools.product(searched_types, for cd_type, cd_location in itertools.product(searched_types,
searched_locations): searched_locations):
LOG.debug('Looking for Config Drive %(type)s in %(location)s', LOG.debug('Looking for Config Drive %(type)s in %(location)s '
{"type": cd_type, "location": cd_location}) 'with expected label %(drive_label)s',
if self._get_config_drive_files(cd_type, cd_location): {"type": cd_type, "location": cd_location,
"drive_label": drive_label})
if self._get_config_drive_files(drive_label, metadata_file,
cd_type, cd_location):
return True return True
return False return False

View File

@ -61,6 +61,8 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
self.osutils = mock.Mock() self.osutils = mock.Mock()
self._config_manager._osutils = self.osutils self._config_manager._osutils = self.osutils
self.snatcher = testutils.LogSnatcher(module_path) self.snatcher = testutils.LogSnatcher(module_path)
self._fake_label = 'config-2'
self._fake_metadata_file = 'fake_metadata_file'
@mock.patch('os.path.exists') @mock.patch('os.path.exists')
def _test_check_for_config_drive(self, mock_exists, exists=True, def _test_check_for_config_drive(self, mock_exists, exists=True,
@ -70,7 +72,8 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
mock_exists.return_value = exists mock_exists.return_value = exists
with self.snatcher: with self.snatcher:
response = self._config_manager._check_for_config_drive(drive) response = self._config_manager._check_for_config_drive(
drive, self._fake_label, self._fake_metadata_file)
self.osutils.get_volume_label.assert_called_once_with(drive) self.osutils.get_volume_label.assert_called_once_with(drive)
if exists and not fail: if exists and not fail:
@ -256,11 +259,15 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
checks[2] = False checks[2] = False
mock_check_for_config_drive.side_effect = checks mock_check_for_config_drive.side_effect = checks
response = self._config_manager._get_config_drive_from_cdrom_drive() response = self._config_manager._get_config_drive_from_cdrom_drive(
self._fake_label, self._fake_metadata_file
)
self.osutils.get_cdrom_drives.assert_called_once_with() self.osutils.get_cdrom_drives.assert_called_once_with()
idx = 3 if found else 4 idx = 3 if found else 4
check_calls = [mock.call(drive) for drive in drives[:idx]] check_calls = [
mock.call(drive, self._fake_label,
self._fake_metadata_file) for drive in drives[:idx]]
mock_check_for_config_drive.assert_has_calls(check_calls) mock_check_for_config_drive.assert_has_calls(check_calls)
if found: if found:
mock_os_rmdir.assert_called_once_with( mock_os_rmdir.assert_called_once_with(
@ -287,7 +294,9 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
self.osutils.get_physical_disks.return_value = paths self.osutils.get_physical_disks.return_value = paths
mock_extract_iso_from_devices.return_value = True mock_extract_iso_from_devices.return_value = True
response = self._config_manager._get_config_drive_from_raw_hdd() response = self._config_manager._get_config_drive_from_raw_hdd(
self._fake_label, self._fake_metadata_file)
mock_map.assert_called_once_with(Disk, paths) mock_map.assert_called_once_with(Disk, paths)
self.osutils.get_physical_disks.assert_called_once_with() self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_called_once_with( mock_extract_iso_from_devices.assert_called_once_with(
@ -306,7 +315,8 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
with testutils.LogSnatcher('cloudbaseinit.metadata.services.' with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
'osconfigdrive.windows') as snatcher: 'osconfigdrive.windows') as snatcher:
response = self._config_manager._get_config_drive_from_vfat() response = self._config_manager._get_config_drive_from_vfat(
self._fake_label, self._fake_metadata_file)
self.assertTrue(response) self.assertTrue(response)
self.osutils.get_physical_disks.assert_called_once_with() self.osutils.get_physical_disks.assert_called_once_with()
@ -340,7 +350,9 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
extract_calls = [mock.call(disk.partitions()) extract_calls = [mock.call(disk.partitions())
for disk in disks[:idx]] for disk in disks[:idx]]
response = self._config_manager._get_config_drive_from_partition() response = self._config_manager._get_config_drive_from_partition(
self._fake_label, self._fake_metadata_file
)
self.osutils.get_physical_disks.assert_called_once_with() self.osutils.get_physical_disks.assert_called_once_with()
mock_extract_iso_from_devices.assert_has_calls(extract_calls) mock_extract_iso_from_devices.assert_has_calls(extract_calls)
self.assertEqual(found, response) self.assertEqual(found, response)
@ -364,9 +376,13 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
checks = [False, found, found] checks = [False, found, found]
mock_check_for_config_drive.side_effect = checks mock_check_for_config_drive.side_effect = checks
idx = 3 - int(found) idx = 3 - int(found)
check_calls = [mock.call(volume) for volume in volumes[:idx]] check_calls = [
mock.call(volume, self._fake_label,
self._fake_metadata_file) for volume in volumes[:idx]]
response = self._config_manager._get_config_drive_from_volume() response = self._config_manager._get_config_drive_from_volume(
self._fake_label, self._fake_metadata_file
)
self.osutils.get_volumes.assert_called_once_with() self.osutils.get_volumes.assert_called_once_with()
mock_check_for_config_drive.assert_has_calls(check_calls) mock_check_for_config_drive.assert_has_calls(check_calls)
if found: if found:
@ -384,11 +400,13 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
def _test__get_config_drive_files(self, cd_type, cd_location, def _test__get_config_drive_files(self, cd_type, cd_location,
func, found=True): func, found=True):
response = self._config_manager._get_config_drive_files(cd_type, response = self._config_manager._get_config_drive_files(
cd_location) self._fake_label, self._fake_metadata_file,
cd_type, cd_location)
if found: if found:
if func: if func:
func.assert_called_once_with() func.assert_called_once_with(self._fake_label,
self._fake_metadata_file)
self.assertEqual(func.return_value, response) self.assertEqual(func.return_value, response)
else: else:
self.assertFalse(response) self.assertFalse(response)
@ -450,18 +468,24 @@ class TestWindowsConfigDriveManager(unittest.TestCase):
found=True): found=True):
check_types = ["iso", "vfat"] if found else [] check_types = ["iso", "vfat"] if found else []
check_locations = ["cdrom", "hdd", "partition"] check_locations = ["cdrom", "hdd", "partition"]
product = list(itertools.product(check_types, check_locations)) product = list(itertools.product(check_types, check_locations,
product_calls = [mock.call(cd_type, cd_location) [self._fake_label]))
for cd_type, cd_location in product] product_calls = [mock.call(self._fake_label, self._fake_metadata_file,
cd_type, cd_location)
for cd_type, cd_location, _ in product]
mock_get_config_drive_files.side_effect = \ mock_get_config_drive_files.side_effect = \
[False] * (len(product_calls) - 1) + [True] [False] * (len(product_calls) - 1) + [True]
expected_log = ["Looking for Config Drive %(type)s in %(location)s" % expected_log = [("Looking for Config Drive %(type)s in %(location)s"
{"type": cd_type, "location": cd_location} " with expected label %(label)s") %
for cd_type, cd_location in product] {"type": cd_type, "location": cd_location,
"label": self._fake_label}
for cd_type, cd_location, config_type in product]
with self.snatcher: with self.snatcher:
response = self._config_manager.get_config_drive_files( response = self._config_manager.get_config_drive_files(
self._fake_label, self._fake_metadata_file,
check_types, check_locations) check_types, check_locations)
mock_get_config_drive_files.assert_has_calls(product_calls) mock_get_config_drive_files.assert_has_calls(product_calls)
self.assertEqual(expected_log, self.snatcher.output) self.assertEqual(expected_log, self.snatcher.output)
self.assertEqual(found, response) self.assertEqual(found, response)

View File

@ -0,0 +1,138 @@
# Copyright 2013 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import os
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import baseconfigdrive
from cloudbaseinit.tests import testutils
MODULE_PATH = "cloudbaseinit.metadata.services.baseconfigdrive"
class TestBaseConfigDriveService(unittest.TestCase):
def setUp(self):
self._win32com_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock()
self._ctypes_util_mock = mock.MagicMock()
self._win32com_client_mock = mock.MagicMock()
self._pywintypes_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'win32com': self._win32com_mock,
'ctypes': self._ctypes_mock,
'ctypes.util': self._ctypes_util_mock,
'win32com.client': self._win32com_client_mock,
'pywintypes': self._pywintypes_mock})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
self.baseconfigdrive_module = importlib.import_module(MODULE_PATH)
self._drive_label = 'fake_drive_label'
self._metadata_file = 'fake_metadata_file'
self._config_drive = (
self.baseconfigdrive_module.BaseConfigDriveService(
self._drive_label, self._metadata_file
))
self.snatcher = testutils.LogSnatcher(MODULE_PATH)
def _test_preprocess_options(self, fail=False):
if fail:
with testutils.ConfPatcher("types", ["vfat", "ntfs"],
group="config_drive"):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
with testutils.ConfPatcher("locations", ["device"],
group="config_drive"):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
return
options = {
"raw_hdd": False,
"cdrom": False,
"vfat": True,
# Deprecated options above.
"types": ["vfat", "iso"],
"locations": ["partition"]
}
contexts = [testutils.ConfPatcher(key, value, group="config_drive")
for key, value in options.items()]
with contexts[0], contexts[1], contexts[2], \
contexts[3], contexts[4]:
self._config_drive._preprocess_options()
self.assertEqual({"vfat", "iso"},
self._config_drive._searched_types)
self.assertEqual({"hdd", "partition"},
self._config_drive._searched_locations)
def test_preprocess_options_fail(self):
self._test_preprocess_options(fail=True)
def test_preprocess_options(self):
self._test_preprocess_options()
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.factory.'
'get_config_drive_manager')
def test_load(self, mock_get_config_drive_manager):
mock_manager = mock.MagicMock()
mock_manager.get_config_drive_files.return_value = True
fake_path = "fake\\fake_id"
mock_manager.target_path = fake_path
mock_get_config_drive_manager.return_value = mock_manager
response = self._config_drive.load()
mock_get_config_drive_manager.assert_called_once_with()
mock_manager.get_config_drive_files.assert_called_once_with(
drive_label=self._drive_label,
metadata_file=self._metadata_file,
searched_types=baseconfigdrive.CD_TYPES,
searched_locations=baseconfigdrive.CD_LOCATIONS)
self.assertTrue(response)
self.assertEqual(fake_path, self._config_drive._metadata_path)
@mock.patch('os.path.normpath')
@mock.patch('os.path.join')
def test_get_data(self, mock_join, mock_normpath):
fake_path = os.path.join('fake', 'path')
with mock.patch('six.moves.builtins.open',
mock.mock_open(read_data='fake data'), create=True):
response = self._config_drive._get_data(fake_path)
self.assertEqual('fake data', response)
mock_join.assert_called_with(
self._config_drive._metadata_path, fake_path)
mock_normpath.assert_called_once_with(mock_join.return_value)
@mock.patch('shutil.rmtree')
def test_cleanup(self, mock_rmtree):
fake_path = os.path.join('fake', 'path')
self._config_drive._metadata_path = fake_path
mock_mgr = mock.Mock()
self._config_drive._mgr = mock_mgr
mock_mgr.target_path = fake_path
self._config_drive.cleanup()
mock_rmtree.assert_called_once_with(fake_path,
ignore_errors=True)
self.assertEqual(None, self._config_drive._metadata_path)

View File

@ -14,7 +14,6 @@
import importlib import importlib
import os
import unittest import unittest
try: try:
@ -22,14 +21,15 @@ try:
except ImportError: except ImportError:
import mock import mock
from cloudbaseinit import exception from cloudbaseinit.metadata.services import baseconfigdrive
from cloudbaseinit.tests import testutils from cloudbaseinit.tests import testutils
MODULE_PATH = "cloudbaseinit.metadata.services.configdrive"
class TestConfigDriveService(unittest.TestCase): class TestConfigDriveService(unittest.TestCase):
def setUp(self): def setUp(self):
module_path = "cloudbaseinit.metadata.services.configdrive"
self._win32com_mock = mock.MagicMock() self._win32com_mock = mock.MagicMock()
self._ctypes_mock = mock.MagicMock() self._ctypes_mock = mock.MagicMock()
self._ctypes_util_mock = mock.MagicMock() self._ctypes_util_mock = mock.MagicMock()
@ -46,45 +46,11 @@ class TestConfigDriveService(unittest.TestCase):
self._module_patcher.start() self._module_patcher.start()
self.addCleanup(self._module_patcher.stop) self.addCleanup(self._module_patcher.stop)
self.configdrive_module = importlib.import_module(module_path) self.configdrive_module = importlib.import_module(MODULE_PATH)
self._drive_label = 'config-2'
self._metadata_file = 'openstack\\latest\\meta_data.json'
self._config_drive = self.configdrive_module.ConfigDriveService() self._config_drive = self.configdrive_module.ConfigDriveService()
self.snatcher = testutils.LogSnatcher(module_path) self.snatcher = testutils.LogSnatcher(MODULE_PATH)
def _test_preprocess_options(self, fail=False):
if fail:
with testutils.ConfPatcher("types", ["vfat", "ntfs"],
group="config_drive"):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
with testutils.ConfPatcher("locations", ["device"],
group="config_drive"):
with self.assertRaises(exception.CloudbaseInitException):
self._config_drive._preprocess_options()
return
options = {
"raw_hdd": False,
"cdrom": False,
"vfat": True,
# Deprecated options above.
"types": ["vfat", "iso"],
"locations": ["partition"]
}
contexts = [testutils.ConfPatcher(key, value, group="config_drive")
for key, value in options.items()]
with contexts[0], contexts[1], contexts[2], \
contexts[3], contexts[4]:
self._config_drive._preprocess_options()
self.assertEqual({"vfat", "iso"},
self._config_drive._searched_types)
self.assertEqual({"hdd", "partition"},
self._config_drive._searched_locations)
def test_preprocess_options_fail(self):
self._test_preprocess_options(fail=True)
def test_preprocess_options(self):
self._test_preprocess_options()
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.factory.' @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.factory.'
'get_config_drive_manager') 'get_config_drive_manager')
@ -94,43 +60,14 @@ class TestConfigDriveService(unittest.TestCase):
fake_path = "fake\\fake_id" fake_path = "fake\\fake_id"
mock_manager.target_path = fake_path mock_manager.target_path = fake_path
mock_get_config_drive_manager.return_value = mock_manager mock_get_config_drive_manager.return_value = mock_manager
expected_log = [
"Metadata copied to folder: %r" % fake_path]
with self.snatcher: response = self._config_drive.load()
response = self._config_drive.load()
mock_get_config_drive_manager.assert_called_once_with() mock_get_config_drive_manager.assert_called_once_with()
mock_manager.get_config_drive_files.assert_called_once_with( mock_manager.get_config_drive_files.assert_called_once_with(
searched_types=self.configdrive_module.CD_TYPES, drive_label=self._drive_label,
searched_locations=self.configdrive_module.CD_LOCATIONS) metadata_file=self._metadata_file,
self.assertEqual(expected_log, self.snatcher.output) searched_types=baseconfigdrive.CD_TYPES,
searched_locations=baseconfigdrive.CD_LOCATIONS)
self.assertTrue(response) self.assertTrue(response)
self.assertEqual(fake_path, self._config_drive._metadata_path) self.assertEqual(fake_path, self._config_drive._metadata_path)
@mock.patch('os.path.normpath')
@mock.patch('os.path.join')
def test_get_data(self, mock_join, mock_normpath):
fake_path = os.path.join('fake', 'path')
with mock.patch('six.moves.builtins.open',
mock.mock_open(read_data='fake data'), create=True):
response = self._config_drive._get_data(fake_path)
self.assertEqual('fake data', response)
mock_join.assert_called_with(
self._config_drive._metadata_path, fake_path)
mock_normpath.assert_called_once_with(mock_join.return_value)
@mock.patch('shutil.rmtree')
def test_cleanup(self, mock_rmtree):
fake_path = os.path.join('fake', 'path')
self._config_drive._metadata_path = fake_path
mock_mgr = mock.Mock()
self._config_drive._mgr = mock_mgr
mock_mgr.target_path = fake_path
with self.snatcher:
self._config_drive.cleanup()
self.assertEqual(["Deleting metadata folder: %r" % fake_path],
self.snatcher.output)
mock_rmtree.assert_called_once_with(fake_path,
ignore_errors=True)
self.assertEqual(None, self._config_drive._metadata_path)