Adds plugin execution stages

Some plugins need to be executed before metadata discovery starts,
e.g. for MTU or NTP settings.

This commit adds support for plugin execution stages.

Co-Authored-By: Cosmin Poieana <cpoieana@cloudbasesolutions.com>
Change-Id: Ia2d7dfc812d04f04269ea3e1cee3e9ed61037e17
Implements: blueprint plugin-stages
This commit is contained in:
Alessandro Pilotti 2015-09-25 14:01:33 +02:00 committed by Cosmin Poieana
parent aa0c4eaecd
commit a855b7fdc4
7 changed files with 182 additions and 47 deletions

View File

@ -62,7 +62,9 @@ class InitManager(object):
def _exec_plugin(self, osutils, service, plugin, instance_id, shared_data):
plugin_name = plugin.get_name()
status = self._get_plugin_status(osutils, instance_id, plugin_name)
status = None
if instance_id is not None:
status = self._get_plugin_status(osutils, instance_id, plugin_name)
if status == plugins_base.PLUGIN_EXECUTION_DONE:
LOG.debug('Plugin \'%s\' execution already done, skipping',
plugin_name)
@ -71,8 +73,9 @@ class InitManager(object):
try:
(status, reboot_required) = plugin.execute(service,
shared_data)
self._set_plugin_status(osutils, instance_id, plugin_name,
status)
if instance_id is not None:
self._set_plugin_status(osutils, instance_id, plugin_name,
status)
return reboot_required
except Exception as ex:
LOG.error('plugin \'%(plugin_name)s\' failed with error '
@ -106,35 +109,54 @@ class InitManager(object):
LOG.info, 'Found new version of cloudbase-init %s')
version.check_latest_version(log_version)
def _handle_plugins_stage(self, osutils, service, instance_id, stage):
plugins_shared_data = {}
reboot_required = False
plugins = plugins_factory.load_plugins(stage)
LOG.info('Executing plugins for stage %r:', stage)
for plugin in plugins:
if self._check_plugin_os_requirements(osutils, plugin):
if self._exec_plugin(osutils, service, plugin,
instance_id, plugins_shared_data):
reboot_required = True
if CONF.allow_reboot:
break
return reboot_required
def configure_host(self):
LOG.info('Cloudbase-Init version: %s', version.get_version())
osutils = osutils_factory.get_os_utils()
osutils.wait_for_boot_completion()
service = metadata_factory.get_metadata_service()
LOG.info('Metadata service loaded: \'%s\'' %
service.get_name())
reboot_required = self._handle_plugins_stage(
osutils, None, None,
plugins_base.PLUGIN_STAGE_PRE_NETWORKING)
self._check_latest_version()
instance_id = service.get_instance_id()
LOG.debug('Instance id: %s', instance_id)
if not (reboot_required and CONF.allow_reboot):
reboot_required = self._handle_plugins_stage(
osutils, None, None,
plugins_base.PLUGIN_STAGE_PRE_METADATA_DISCOVERY)
plugins = plugins_factory.load_plugins()
plugins_shared_data = {}
if not (reboot_required and CONF.allow_reboot):
service = metadata_factory.get_metadata_service()
LOG.info('Metadata service loaded: \'%s\'' %
service.get_name())
reboot_required = False
try:
for plugin in plugins:
if self._check_plugin_os_requirements(osutils, plugin):
if self._exec_plugin(osutils, service, plugin,
instance_id, plugins_shared_data):
reboot_required = True
if CONF.allow_reboot:
break
finally:
service.cleanup()
instance_id = service.get_instance_id()
LOG.debug('Instance id: %s', instance_id)
try:
reboot_required = self._handle_plugins_stage(
osutils, service, instance_id,
plugins_base.PLUGIN_STAGE_MAIN)
finally:
service.cleanup()
if reboot_required and CONF.allow_reboot:
try:

View File

@ -15,8 +15,13 @@
PLUGIN_EXECUTION_DONE = 1
PLUGIN_EXECUTE_ON_NEXT_BOOT = 2
PLUGIN_STAGE_PRE_NETWORKING = "PRE_NETWORKING"
PLUGIN_STAGE_PRE_METADATA_DISCOVERY = "PRE_METADATA_DISCOVERY"
PLUGIN_STAGE_MAIN = "MAIN"
class BasePlugin(object):
execution_stage = PLUGIN_STAGE_MAIN
def get_name(self):
return self.__class__.__name__

View File

@ -80,7 +80,7 @@ OLD_PLUGINS = {
}
def load_plugins():
def load_plugins(stage):
plugins = []
cl = classloader.ClassLoader()
for class_path in CONF.plugins:
@ -93,10 +93,10 @@ def load_plugins():
try:
plugin_cls = cl.load_class(class_path)
if not stage or plugin_cls.execution_stage == stage:
plugin = plugin_cls()
plugins.append(plugin)
except ImportError:
LOG.error("Could not import plugin module %r", class_path)
continue
plugin = plugin_cls()
plugins.append(plugin)
return plugins

View File

@ -35,6 +35,7 @@ LOG = oslo_logging.getLogger(__name__)
class MTUPlugin(base.BasePlugin):
execution_stage = base.PLUGIN_STAGE_PRE_METADATA_DISCOVERY
def execute(self, service, shared_data):
if CONF.mtu_use_dhcp_config:

View File

@ -34,6 +34,7 @@ LOG = oslo_logging.getLogger(__name__)
class NTPClientPlugin(base.BasePlugin):
execution_stage = base.PLUGIN_STAGE_PRE_NETWORKING
def verify_time_service(self, osutils):
"""Verify that the time service is up.

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import unittest
try:
@ -20,28 +21,83 @@ except ImportError:
import mock
from oslo_config import cfg
from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.common import factory
from cloudbaseinit.tests import testutils
CONF = cfg.CONF
STAGE = {
base.PLUGIN_STAGE_PRE_NETWORKING: [
'cloudbaseinit.plugins.windows.ntpclient.NTPClientPlugin'
],
base.PLUGIN_STAGE_PRE_METADATA_DISCOVERY: [
'cloudbaseinit.plugins.common.mtu.MTUPlugin'
],
base.PLUGIN_STAGE_MAIN: [
'cloudbaseinit.plugins.common.sethostname.SetHostNamePlugin',
'cloudbaseinit.plugins.windows.createuser.CreateUserPlugin',
'cloudbaseinit.plugins.common.networkconfig.NetworkConfigPlugin',
'cloudbaseinit.plugins.windows.licensing.WindowsLicensingPlugin',
'cloudbaseinit.plugins.common.sshpublickeys.'
'SetUserSSHPublicKeysPlugin',
'cloudbaseinit.plugins.windows.extendvolumes.ExtendVolumesPlugin',
'cloudbaseinit.plugins.common.userdata.UserDataPlugin',
'cloudbaseinit.plugins.common.setuserpassword.'
'SetUserPasswordPlugin',
'cloudbaseinit.plugins.windows.winrmlistener.'
'ConfigWinRMListenerPlugin',
'cloudbaseinit.plugins.windows.winrmcertificateauth.'
'ConfigWinRMCertificateAuthPlugin',
'cloudbaseinit.plugins.common.localscripts.LocalScriptsPlugin',
]
}
class PluginFactoryTests(unittest.TestCase):
class TestPluginFactory(unittest.TestCase):
@mock.patch('cloudbaseinit.utils.classloader.ClassLoader.load_class')
def test_load_plugins(self, mock_load_class):
expected = []
for path in CONF.plugins:
expected.append(mock.call(path))
response = factory.load_plugins()
self.assertEqual(expected, mock_load_class.call_args_list)
self.assertTrue(response is not None)
def _test_load_plugins(self, mock_load_class, stage=None):
if stage:
expected_plugins = STAGE.get(stage, [])
else:
expected_plugins = list(itertools.chain(*STAGE.values()))
expected_load = [mock.call(path) for path in CONF.plugins]
side_effect = []
for path in expected_plugins:
plugin = mock.Mock()
plugin.execution_stage = (stage if stage in STAGE.keys() else
None)
plugin.return_value = path
side_effect.append(plugin)
mock_load_class.side_effect = (
side_effect + [mock.Mock() for _ in range(len(expected_load) -
len(side_effect))])
response = factory.load_plugins(stage)
self.assertEqual(expected_load, mock_load_class.call_args_list)
self.assertEqual(sorted(expected_plugins), sorted(response))
def test_load_plugins(self):
self._test_load_plugins()
def test_load_plugins_main(self):
self._test_load_plugins(stage=base.PLUGIN_STAGE_MAIN)
def test_load_plugins_networking(self):
self._test_load_plugins(stage=base.PLUGIN_STAGE_PRE_NETWORKING)
def test_load_plugins_metadata(self):
self._test_load_plugins(stage=base.PLUGIN_STAGE_PRE_METADATA_DISCOVERY)
def test_load_plugins_empty(self):
self._test_load_plugins(stage=mock.Mock())
@testutils.ConfPatcher('plugins', ['missing.plugin'])
def test_load_plugins_plugin_failed(self):
with testutils.LogSnatcher('cloudbaseinit.plugins.'
'common.factory') as snatcher:
plugins = factory.load_plugins()
plugins = factory.load_plugins(None)
self.assertEqual([], plugins)
self.assertEqual(["Could not import plugin module 'missing.plugin'"],
@ -53,7 +109,7 @@ class PluginFactoryTests(unittest.TestCase):
def test_old_plugin_mapping(self, mock_load_class):
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'factory') as snatcher:
factory.load_plugins()
factory.load_plugins(None)
expected = [
"Old plugin module 'cloudbaseinit.plugins.windows."

View File

@ -28,7 +28,7 @@ from cloudbaseinit.tests import testutils
CONF = cfg.CONF
class InitManagerTest(unittest.TestCase):
class TestInitManager(unittest.TestCase):
def setUp(self):
self._win32com_mock = mock.MagicMock()
@ -136,19 +136,61 @@ class InitManagerTest(unittest.TestCase):
def test_check_plugin_os_requirements_other_requirenments(self):
self._test_check_plugin_os_requirements(('linux', (5, 2)))
@mock.patch('cloudbaseinit.init.InitManager.'
'_exec_plugin')
@mock.patch('cloudbaseinit.init.InitManager.'
'_check_plugin_os_requirements')
@mock.patch('cloudbaseinit.plugins.common.factory.load_plugins')
def _test_handle_plugins_stage(self, mock_load_plugins,
mock_check_plugin_os_requirements,
mock_exec_plugin,
reboot=True, fast_reboot=True):
stage = "fake stage"
service, instance_id = mock.Mock(), mock.Mock()
plugins = [mock.Mock() for _ in range(3)]
mock_check_plugin_os_requirements.return_value = True
mock_exec_plugin.return_value = reboot
mock_load_plugins.return_value = plugins
requirements_calls = [mock.call(self.osutils, plugin)
for plugin in plugins]
exec_plugin_calls = [mock.call(self.osutils, service, plugin,
instance_id, {})
for plugin in plugins]
with testutils.LogSnatcher('cloudbaseinit.init') as snatcher:
response = self._init._handle_plugins_stage(
self.osutils, service, instance_id, stage)
self.assertEqual(
["Executing plugins for stage '{}':".format(stage)],
snatcher.output)
mock_load_plugins.assert_called_once_with(stage)
idx = 1 if (reboot and fast_reboot) else len(plugins)
mock_check_plugin_os_requirements.assert_has_calls(
requirements_calls[:idx])
mock_exec_plugin.assert_has_calls(exec_plugin_calls[:idx])
self.assertEqual(reboot, response)
def test_handle_plugins_stage(self):
self._test_handle_plugins_stage()
def test_handle_plugins_stage_no_reboot(self):
self._test_handle_plugins_stage(reboot=False, fast_reboot=False)
@testutils.ConfPatcher('allow_reboot', False)
def test_handle_plugins_stage_no_fast_reboot(self):
self._test_handle_plugins_stage(fast_reboot=False)
@mock.patch('cloudbaseinit.init.InitManager'
'._handle_plugins_stage')
@mock.patch('cloudbaseinit.init.InitManager._check_latest_version')
@mock.patch('cloudbaseinit.version.get_version')
@mock.patch('cloudbaseinit.init.InitManager'
'._check_plugin_os_requirements')
@mock.patch('cloudbaseinit.init.InitManager._exec_plugin')
@mock.patch('cloudbaseinit.plugins.common.factory.load_plugins')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('cloudbaseinit.metadata.factory.get_metadata_service')
def _test_configure_host(self, mock_get_metadata_service,
mock_get_os_utils, mock_load_plugins,
mock_exec_plugin,
mock_check_os_requirements,
mock_get_version, mock_check_latest_version,
mock_handle_plugins_stage,
expected_logging,
version, name, instance_id, reboot=True):
@ -160,24 +202,31 @@ class InitManagerTest(unittest.TestCase):
mock_get_metadata_service.return_value = fake_service
fake_service.get_name.return_value = name
fake_service.get_instance_id.return_value = instance_id
mock_handle_plugins_stage.side_effect = [False, False, True]
stages = [
base.PLUGIN_STAGE_PRE_NETWORKING,
base.PLUGIN_STAGE_PRE_METADATA_DISCOVERY,
base.PLUGIN_STAGE_MAIN]
stage_calls_list = [[self.osutils, None, None, stage]
for stage in stages]
stage_calls_list[2][1] = fake_service
stage_calls_list[2][2] = instance_id
stage_calls = [mock.call(*args) for args in stage_calls_list]
with testutils.LogSnatcher('cloudbaseinit.init') as snatcher:
self._init.configure_host()
self.assertEqual(expected_logging, snatcher.output)
mock_check_latest_version.assert_called_once_with()
self.osutils.wait_for_boot_completion.assert_called_once_with()
mock_get_metadata_service.assert_called_once_with()
fake_service.get_name.assert_called_once_with()
mock_check_os_requirements.assert_called_once_with(self.osutils,
fake_plugin)
mock_exec_plugin.assert_called_once_with(self.osutils, fake_service,
fake_plugin, instance_id, {})
fake_service.get_instance_id.assert_called_once_with()
fake_service.cleanup.assert_called_once_with()
mock_handle_plugins_stage.assert_has_calls(stage_calls)
if reboot:
self.osutils.reboot.assert_called_once_with()
else:
self.assertFalse(self.osutils.reboot.called)
mock_check_latest_version.assert_called_once_with()
def _test_configure_host_with_logging(self, extra_logging, reboot=True):
instance_id = 'fake id'
@ -207,6 +256,7 @@ class InitManagerTest(unittest.TestCase):
reboot=False,
extra_logging=['Plugins execution done',
'Stopping Cloudbase-Init service'])
self.osutils.terminate.assert_called_once_with()
@testutils.ConfPatcher('allow_reboot', True)
def test_configure_host_reboot(self):