Add ephemeral disk plugin

This plugin is implemented for Azure, and its purpose it's to copy a given
file on the specified partition, if the following options are configured:
- ephemeral_disk_volume_label: Ephemeral disk volume label
- ephemeral_disk_volume_mount_point: Ephemeral disk volume mount point
- ephemeral_disk_data_loss_warning_path: Ephemeral disk data loss warning path,
  relative to the ephemeral disk volume path

Change-Id: Ieadf5699f71deb22cb791324e3d32310be1e8bd5
Implements: add-ephemeral-disk-plugin
Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>
This commit is contained in:
Alessandro Pilotti 2017-03-06 12:05:41 +02:00 committed by Paula Madalina Crismaru
parent 834f19058c
commit 4d46b6d081
4 changed files with 375 additions and 0 deletions

View File

@ -279,6 +279,18 @@ class GlobalOptions(conf_base.Options):
'metadata_report_provisioning_completed', default=False,
help='Reports to the metadata service that provisioning '
'completed successfully or failed'),
cfg.StrOpt(
'ephemeral_disk_volume_label', default=None,
help='Ephemeral disk volume label, e.g.: "Temporary Storage"'),
cfg.StrOpt(
'ephemeral_disk_volume_mount_point', default=None,
help='Ephemeral disk volume mount point, e.g.:'
'"\\\\?\\GLOBALROOT\\device\\Harddisk1\\Partition1\\"'),
cfg.StrOpt(
'ephemeral_disk_data_loss_warning_path', default=None,
help='Ephemeral disk data loss warning path, relative to the '
'ephemeral disk volume path. E.g.: '
'DATALOSS_WARNING_README.txt'),
]
self._cli_options = [

View File

@ -215,6 +215,9 @@ class BaseMetadataService(object):
"""Check if the metadata provider enforces automatic updates."""
pass
def get_ephemeral_disk_data_loss_warning(self):
raise NotExistingMetadataException()
class BaseHTTPMetadataService(BaseMetadataService):

View File

@ -0,0 +1,94 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base as metadata_services_base
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.plugins.common import base
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
class EphemeralDiskPlugin(base.BasePlugin):
@staticmethod
def _get_ephemeral_disk_volume_by_mount_point(osutils):
if CONF.ephemeral_disk_volume_mount_point:
try:
paths = osutils.get_volume_path_names_by_mount_point(
CONF.ephemeral_disk_volume_mount_point)
if paths:
return paths[0]
except exception.ItemNotFoundException:
LOG.debug("Ephemeral disk mount point not found: %s",
CONF.ephemeral_disk_volume_mount_point)
@staticmethod
def _get_ephemeral_disk_volume_by_label(osutils):
if CONF.ephemeral_disk_volume_label:
logical_drives = osutils.get_logical_drives()
for logical_drive in logical_drives:
label = osutils.get_volume_label(logical_drive)
if not label:
continue
if label.upper() == CONF.ephemeral_disk_volume_label.upper():
return logical_drive
def _get_ephemeral_disk_volume_path(self, osutils):
return (self._get_ephemeral_disk_volume_by_mount_point(osutils) or
self._get_ephemeral_disk_volume_by_label(osutils))
def _set_ephemeral_disk_data_loss_warning(self, service,
disk_warning_path):
LOG.debug("Setting ephemeral disk data loss warning: %s",
disk_warning_path)
data_loss_warning = b''
try:
data_loss_warning = service.get_ephemeral_disk_data_loss_warning()
except metadata_services_base.NotExistingMetadataException:
LOG.debug("Metadata service does not provide an ephemeral "
"disk data loss warning content")
with open(disk_warning_path, 'wb') as f:
f.write(data_loss_warning)
def execute(self, service, shared_data):
try:
service.get_ephemeral_disk_data_loss_warning()
except metadata_services_base.NotExistingMetadataException:
return base.PLUGIN_EXECUTION_DONE, False
osutils = osutils_factory.get_os_utils()
ephemeral_disk_volume_path = self._get_ephemeral_disk_volume_path(
osutils)
if not ephemeral_disk_volume_path:
LOG.info("Ephemeral disk volume not found")
else:
if CONF.ephemeral_disk_data_loss_warning_path:
disk_warning_path = os.path.join(
ephemeral_disk_volume_path,
CONF.ephemeral_disk_data_loss_warning_path)
self._set_ephemeral_disk_data_loss_warning(
service, disk_warning_path)
return base.PLUGIN_EXECUTION_DONE, False
def get_os_requirements(self):
return 'win32', (5, 2)

View File

@ -0,0 +1,266 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base as metadata_services_base
from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.common import ephemeraldisk
from cloudbaseinit.tests import testutils
CONF = cloudbaseinit_conf.CONF
MODULE_PATH = 'cloudbaseinit.plugins.common.ephemeraldisk'
class TestEphemeralDiskPlugin(unittest.TestCase):
def setUp(self):
self._disk = ephemeraldisk.EphemeralDiskPlugin()
def _test_get_ephemeral_disk_volume_by_mount_point(self, mount_point,
paths, exception_raise):
mock_osutils = mock.MagicMock()
eclass = exception.ItemNotFoundException
if exception_raise:
mock_osutils.get_volume_path_names_by_mount_point.side_effect = \
eclass()
else:
mock_osutils.get_volume_path_names_by_mount_point.return_value = \
paths
with testutils.ConfPatcher('ephemeral_disk_volume_mount_point',
mount_point):
if exception_raise:
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
self._disk._get_ephemeral_disk_volume_by_mount_point(
mock_osutils)
expected_logging = [
"Ephemeral disk mount point not found: %s" % mount_point
]
else:
result = None
result = self._disk._get_ephemeral_disk_volume_by_mount_point(
mock_osutils)
if mount_point:
(mock_osutils.get_volume_path_names_by_mount_point.
assert_called_once_with(str(mount_point)))
if not exception_raise:
if paths:
self.assertEqual(result, paths[0])
else:
self.assertEqual(result, None)
else:
self.assertEqual(snatcher.output, expected_logging)
def test_get_ephemeral_disk_volume_by_mount_point_no_mount_point(self):
self._test_get_ephemeral_disk_volume_by_mount_point(
mount_point=None, paths=None, exception_raise=False)
def test_get_ephemeral_disk_volume_by_mount_point_no_paths(self):
self._test_get_ephemeral_disk_volume_by_mount_point(
mount_point=True, paths=None, exception_raise=False)
def test_get_ephemeral_disk_volume_by_mount_point_exception(self):
self._test_get_ephemeral_disk_volume_by_mount_point(
mount_point=True, paths=None, exception_raise=True)
def test_get_ephemeral_disk_volume_by_mount_point(self):
self._test_get_ephemeral_disk_volume_by_mount_point(
mount_point=True, paths=[mock.sentinel.paths],
exception_raise=False)
def _test_get_ephemeral_disk_volume_by_label(self, label,
ephemeral_disk_volume_label):
expected_result = None
mock_osutils = mock.MagicMock()
if ephemeral_disk_volume_label:
labels = [None, str(mock.sentinel.label)] * 2
labels += [label] + [None, str(mock.sentinel.label)]
mock_osutils.get_logical_drives.return_value = range(len(labels))
mock_osutils.get_volume_label.side_effect = labels
if label.upper() == ephemeral_disk_volume_label.upper():
expected_result = labels.index(label)
with testutils.ConfPatcher('ephemeral_disk_volume_label',
ephemeral_disk_volume_label):
result = self._disk._get_ephemeral_disk_volume_by_label(
mock_osutils)
self.assertEqual(result, expected_result)
if ephemeral_disk_volume_label:
mock_osutils.get_logical_drives.assert_called_once_with()
if expected_result is not None:
self.assertEqual(mock_osutils.get_volume_label.call_count,
expected_result + 1)
else:
self.assertEqual(mock_osutils.get_volume_label.call_count,
len(labels))
def test_get_ephemeral_disk_volume_by_label_no_disk_volume_label(self):
self._test_get_ephemeral_disk_volume_by_label(
label=None, ephemeral_disk_volume_label=None)
def test_get_ephemeral_disk_volume_by_label_no_label(self):
self._test_get_ephemeral_disk_volume_by_label(
label=str(mock.sentinel.label),
ephemeral_disk_volume_label=str(mock.sentinel.disk_volume_label))
def test_get_ephemeral_disk_volume_by_label(self):
self._test_get_ephemeral_disk_volume_by_label(
label=str(mock.sentinel.same_label),
ephemeral_disk_volume_label=str(mock.sentinel.same_label))
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
'_get_ephemeral_disk_volume_by_label')
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
'_get_ephemeral_disk_volume_by_mount_point')
def _test_get_ephemeral_disk_volume_path(self, mock_by_mount_point,
mock_by_label,
by_mount_point, by_label):
mock_osutils = mock.MagicMock()
mock_by_mount_point.return_value = by_mount_point
mock_by_label.return_value = by_label
result = self._disk._get_ephemeral_disk_volume_path(mock_osutils)
if by_mount_point:
expected_result = mock_by_mount_point.return_value
elif by_label:
expected_result = mock_by_label.return_value
else:
expected_result = None
self.assertEqual(result, expected_result)
mock_by_mount_point.assert_called_once_with(mock_osutils)
if not by_mount_point:
mock_by_label.assert_called_once_with(mock_osutils)
def test_get_ephemeral_disk_volume_path_by_mount_point(self):
self._test_get_ephemeral_disk_volume_path(
by_mount_point=True, by_label=True)
def test_get_ephemeral_disk_volume_path_by_label(self):
self._test_get_ephemeral_disk_volume_path(
by_mount_point=None, by_label=True)
def test_get_ephemeral_disk_volume_path_None(self):
self._test_get_ephemeral_disk_volume_path(
by_mount_point=None, by_label=None)
def _test_set_ephemeral_disk_data_loss_warning(self, exception_raised):
mock_service = mock.MagicMock()
disk_warning_path = str(mock.sentinel.disk_warning_path)
expected_logging = [
"Setting ephemeral disk data loss warning: %s" % disk_warning_path
]
if exception_raised:
eclass = metadata_services_base.NotExistingMetadataException
mock_service.get_ephemeral_disk_data_loss_warning.side_effect = \
eclass
expected_logging += [
"Metadata service does not provide an ephemeral "
"disk data loss warning content"
]
else:
mock_service.get_ephemeral_disk_data_loss_warning.return_value = \
str(mock.sentinel.data_loss_warning)
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
with mock.patch(MODULE_PATH + '.open', mock.mock_open(),
create=True) as mock_open:
self._disk._set_ephemeral_disk_data_loss_warning(
mock_service, disk_warning_path)
self.assertEqual(snatcher.output, expected_logging)
mock_open.assert_called_once_with(disk_warning_path, 'wb')
def test_set_ephemeral_disk_data_loss_warning(self):
self._test_set_ephemeral_disk_data_loss_warning(exception_raised=False)
def test_set_ephemeral_disk_data_loss_warning_exception(self):
self._test_set_ephemeral_disk_data_loss_warning(exception_raised=True)
@mock.patch('os.path.join')
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
'_set_ephemeral_disk_data_loss_warning')
@mock.patch.object(ephemeraldisk.EphemeralDiskPlugin,
'_get_ephemeral_disk_volume_path')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('cloudbaseinit.plugins.common.base.PLUGIN_EXECUTION_DONE',
mock.sentinel)
def _test_execute(self, mock_get_osutils, mock_get_volume_path,
mock_set_volume_path, mock_join,
not_existing_exception, disk_volume_path,
disk_warning_path):
mock_service = mock.MagicMock()
shared_data = mock.sentinel.shared_data
eclass = metadata_services_base.NotExistingMetadataException
expected_result = base.PLUGIN_EXECUTION_DONE, False
expected_logging = []
if not_existing_exception:
mock_service.get_ephemeral_disk_data_loss_warning.side_effect = \
eclass()
else:
mock_osutils = mock.MagicMock()
mock_get_osutils.return_value = mock_osutils
mock_get_volume_path.return_value = disk_volume_path
if not disk_volume_path:
expected_logging += [
"Ephemeral disk volume not found"
]
else:
mock_join.return_value = disk_warning_path
with testutils.ConfPatcher('ephemeral_disk_data_loss_warning_path',
disk_warning_path):
with testutils.LogSnatcher(MODULE_PATH) as snatcher:
result = self._disk.execute(mock_service, shared_data)
self.assertEqual(result, expected_result)
self.assertEqual(snatcher.output, expected_logging)
(mock_service.get_ephemeral_disk_data_loss_warning.
assert_called_once_with())
if not not_existing_exception:
mock_get_osutils.assert_called_once_with()
mock_get_volume_path.assert_called_once_with(mock_osutils)
if disk_volume_path and disk_warning_path:
mock_join.assert_called_once_with(
disk_volume_path, disk_warning_path)
mock_set_volume_path.assert_called_once_with(
mock_service, disk_warning_path)
def test_execute_no_metadata(self):
self._test_execute(not_existing_exception=True, disk_volume_path=None,
disk_warning_path=None)
def test_execute_no_ephemeral_disk_volume_path(self):
self._test_execute(not_existing_exception=None, disk_volume_path=None,
disk_warning_path=None)
def test_execute_no_ephemeral_disk_data_loss_warning_path(self):
self._test_execute(
not_existing_exception=None,
disk_volume_path=str(mock.sentinel.disk_volume_path),
disk_warning_path=None)
def test_execute(self):
self._test_execute(
not_existing_exception=None,
disk_volume_path=str(mock.sentinel.disk_volume_path),
disk_warning_path=str(mock.sentinel.path))
def test_get_os_requirements(self):
result = self._disk.get_os_requirements()
self.assertEqual(result, ('win32', (5, 2)))