Adds userdata write file option

Adds an option to write the userdata to file and
an option to disable processing the userdata content.

Change-Id: I8d2bb0f7bd1ed8b6d5b62a41736fa8cda52b05f4
Implements: blueprint userdata-write-file-option
Co-Authored-By: Stefan Caraiman <scaraiman@cloudbasesolutions.com>
This commit is contained in:
Alessandro Pilotti 2017-02-14 14:34:39 +02:00
parent f4fce4f37c
commit 65f4b143a5
6 changed files with 178 additions and 2 deletions

View File

@ -232,6 +232,16 @@ class GlobalOptions(conf_base.Options):
'trim_enabled', default=False,
help='Enables or disables TRIM delete notifications for '
'the underlying storage device.'),
cfg.BoolOpt(
'process_userdata', default=True,
help='Processes the userdata content based on the type, e.g. '
'executing a PowerShell script'),
cfg.StrOpt(
'userdata_save_path',
default=None,
help='Copies the userdata to the given file path. The path '
'can include environment variables that will be expanded,'
' e.g. "%%SYSTEMDRIVE%%\\CloudbaseInit\\UserData.bin"'),
]
self._cli_options = [

View File

@ -152,3 +152,9 @@ class BaseOSUtils(object):
def enable_trim(self, enable):
"""Enables or disables TRIM delete notifications."""
raise NotImplementedError()
def set_path_admin_acls(self, path):
raise NotImplementedError()
def take_path_ownership(self, path, username=None):
raise NotImplementedError()

View File

@ -1459,3 +1459,27 @@ class WindowsUtils(base.BaseOSUtils):
raise exception.CloudbaseInitException(
'TRIM configurating failed.\nOutput: %(out)s\nError:'
' %(err)s' % {'out': out, 'err': err})
def set_path_admin_acls(self, path):
LOG.debug("Assigning admin ACLs on path: %s", path)
# Sets ACLs for "NT AUTHORITY\SYSTEM" and "BUILTIN\Administrators"
# TODO(alexpilotti): replace with SetNamedSecurityInfo
(out, err, ret_val) = self.execute_system32_process([
"icacls.exe", path, "/inheritance:r", "/grant:r",
"*S-1-5-18:(OI)(CI)F", "*S-1-5-32-544:(OI)(CI)F"])
if ret_val:
raise exception.CloudbaseInitException(
'Failed to set path ACLs.\nOutput: %(out)s\nError:'
' %(err)s' % {'out': out, 'err': err})
def take_path_ownership(self, path, username=None):
if username:
raise NotImplementedError()
LOG.debug("Taking ownership of path: %s", path)
# TODO(alexpilotti): replace with SetNamedSecurityInfo
(out, err, ret_val) = self.execute_system32_process([
"takeown.exe", "/F", path])
if ret_val:
raise exception.CloudbaseInitException(
'Failed to take path ownership.\nOutput: %(out)s\nError:'
' %(err)s' % {'out': out, 'err': err})

View File

@ -13,10 +13,14 @@
# under the License.
import email
import os
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base as metadata_services_base
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.common import execcmd
from cloudbaseinit.plugins.common.userdataplugins import factory
@ -25,6 +29,7 @@ from cloudbaseinit.utils import encoding
from cloudbaseinit.utils import x509constants
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
@ -42,7 +47,35 @@ class UserDataPlugin(base.BasePlugin):
return base.PLUGIN_EXECUTION_DONE, False
LOG.debug('User data content length: %d' % len(user_data))
return self._process_user_data(user_data)
if CONF.userdata_save_path:
user_data_path = os.path.abspath(
os.path.expandvars(CONF.userdata_save_path))
self._write_userdata(user_data, user_data_path)
if CONF.process_userdata:
return self._process_user_data(user_data)
return base.PLUGIN_EXECUTION_DONE, False
@staticmethod
def _write_userdata(user_data, user_data_path):
dir_path = os.path.dirname(user_data_path)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif not os.path.isdir(dir_path):
raise exception.CloudbaseInitException(
'Path "%s" exists but it is not a directory' % dir_path)
osutils = osutils_factory.get_os_utils()
osutils.set_path_admin_acls(dir_path)
if os.path.exists(user_data_path):
osutils.take_path_ownership(user_data_path)
os.unlink(user_data_path)
LOG.debug("Writing userdata to: %s", user_data_path)
with open(user_data_path, 'wb') as file:
file.write(user_data)
@staticmethod
def _parse_mime(user_data):
@ -53,7 +86,7 @@ class UserDataPlugin(base.BasePlugin):
def _process_user_data(self, user_data):
plugin_status = base.PLUGIN_EXECUTION_DONE
reboot = False
LOG.debug("Processing userdata")
if user_data.startswith(b'Content-Type: multipart'):
user_data_plugins = factory.load_plugins()
user_handlers = {}

View File

@ -2450,3 +2450,69 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def test_is_builtin_admin(self):
self._test_is_builtin_admin(sid_exists=True,
sid_startswith=True, sid_endswith=True)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.'
'execute_system32_process')
def _test_set_path_admin_acls(self, mock_execute_system32_process,
ret_val=None):
mock_path = mock.sentinel.path
expected_logging = ["Assigning admin ACLs on path: %s" % mock_path]
expected_call = [
"icacls.exe", mock_path, "/inheritance:r", "/grant:r",
"*S-1-5-18:(OI)(CI)F", "*S-1-5-32-544:(OI)(CI)F"]
mock_execute_system32_process.return_value = (
mock.sentinel.out,
mock.sentinel.err,
ret_val)
with self.snatcher:
if ret_val:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.set_path_admin_acls,
mock_path)
else:
self._winutils.set_path_admin_acls(mock_path)
self.assertEqual(self.snatcher.output, expected_logging)
mock_execute_system32_process.assert_called_once_with(expected_call)
def test_test_set_path_admin_acls(self):
self._test_set_path_admin_acls()
def test_test_set_path_admin_acls_fail(self):
self._test_set_path_admin_acls(ret_val=1)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils.'
'execute_system32_process')
def _test_take_path_ownership(self, mock_execute_system32_process,
ret_val=None, username=None):
mock_path = mock.sentinel.path
expected_logging = ["Taking ownership of path: %s" % mock_path]
expected_call = ["takeown.exe", "/F", mock_path]
mock_execute_system32_process.return_value = (
mock.sentinel.out,
mock.sentinel.err,
ret_val)
if username:
self.assertRaises(
NotImplementedError, self._winutils.take_path_ownership,
mock_path, username)
return
with self.snatcher:
if ret_val:
self.assertRaises(
exception.CloudbaseInitException,
self._winutils.take_path_ownership,
mock_path, username)
else:
self._winutils.take_path_ownership(mock_path, username)
self.assertEqual(self.snatcher.output, expected_logging)
mock_execute_system32_process.assert_called_once_with(expected_call)
def test_take_path_ownership_username(self):
self._test_take_path_ownership(username="fake")
def test_take_path_ownership_fail(self):
self._test_take_path_ownership(ret_val=1)
def test_take_path_ownership(self):
self._test_take_path_ownership()

View File

@ -23,6 +23,7 @@ try:
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.metadata.services import base as metadata_services_base
from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.common import userdata
@ -51,6 +52,42 @@ class UserDataPluginTest(unittest.TestCase):
self.fake_data = fake_json_response.get_fake_metadata_json(
'2013-04-04')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('os.unlink')
@mock.patch('os.path.isdir')
@mock.patch('os.makedirs')
@mock.patch('os.path.dirname')
@mock.patch('os.path.exists')
def _test_write_userdata(self, mock_exists, mock_dirname, mock_makedirs,
mock_is_dir, mock_unlink, mock_get_os_utils,
os_exists_effects=None, is_dir=True):
mock_userdata = str(mock.sentinel.user_data)
mock_user_data_path = str(mock.sentinel.user_data_path)
mock_osutils = mock.Mock()
mock_get_os_utils.return_value = mock_osutils
mock_exists.side_effect = os_exists_effects
mock_is_dir.return_value = is_dir
expected_logs = ["Writing userdata to: %s" % mock_user_data_path]
if not is_dir:
self.assertRaises(
exception.CloudbaseInitException,
self._userdata._write_userdata,
mock_userdata, mock_user_data_path)
return
with mock.patch('cloudbaseinit.plugins.common.userdata'
'.open', create=True):
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'userdata') as snatcher:
self._userdata._write_userdata(mock_userdata,
mock_user_data_path)
self.assertEqual(snatcher.output, expected_logs)
def test_write_userdata_fail(self):
self._test_write_userdata(is_dir=False)
def test_write_userdata(self):
self._test_write_userdata(os_exists_effects=(False, True))
@mock.patch('cloudbaseinit.plugins.common.userdata.UserDataPlugin'
'._process_user_data')
def _test_execute(self, mock_process_user_data, ret_val):