# Copyright (c) 2017 Cloudbase Solutions Srl # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import datetime import importlib import os import unittest try: import unittest.mock as mock except ImportError: import mock from cloudbaseinit import conf as cloudbaseinit_conf from cloudbaseinit import exception from cloudbaseinit.plugins.common import base from cloudbaseinit.tests import testutils CONF = cloudbaseinit_conf.CONF MODPATH = "cloudbaseinit.plugins.windows.azureguestagent" class AzureGuestAgentPluginTest(unittest.TestCase): def setUp(self): self.mock_wmi = mock.MagicMock() self._moves_mock = mock.MagicMock() patcher = mock.patch.dict( "sys.modules", { "wmi": self.mock_wmi, "six.moves": self._moves_mock } ) patcher.start() self.addCleanup(patcher.stop) self._winreg_mock = self._moves_mock.winreg self._azureguestagent = importlib.import_module(MODPATH) self._azureagentplugin = self._azureguestagent.AzureGuestAgentPlugin() self.snatcher = testutils.LogSnatcher(MODPATH) def test_check_delete_service(self): mock_osutils = mock.Mock() mock_service_name = mock.sentinel.name self._azureagentplugin._check_delete_service(mock_osutils, mock_service_name) mock_osutils.check_service_exists.assert_called_once_with( mock_service_name) mock_osutils.get_service_status.assert_called_once_with( mock_service_name) mock_osutils.stop_service.assert_called_once_with(mock_service_name, wait=True) mock_osutils.delete_service.assert_called_once_with(mock_service_name) @mock.patch(MODPATH + ".AzureGuestAgentPlugin._check_delete_service") def test_remove_agent_services(self, mock_check_delete_service): mock_osutils = mock.Mock() expected_logs = ["Stopping and removing any existing Azure guest " "agent services"] with self.snatcher: self._azureagentplugin._remove_agent_services(mock_osutils) self.assertEqual(self.snatcher.output, expected_logs) self.assertEqual(mock_check_delete_service.call_count, 3) @mock.patch("shutil.rmtree") @mock.patch("os.path.exists") @mock.patch("os.getenv") def test_remove_azure_dirs(self, mock_os_getenv, mock_exists, mock_rmtree): mock_rmtree.side_effect = (None, Exception) mock_exists.return_value = True with self.snatcher: self._azureagentplugin._remove_azure_dirs() mock_os_getenv.assert_called_with("SystemDrive") self.assertEqual(mock_os_getenv.call_count, 2) self.assertEqual(mock_exists.call_count, 2) self.assertEqual(mock_rmtree.call_count, 2) def test_set_registry_vm_type(self): vm_type = mock.sentinel.vm key_name = "SOFTWARE\\Microsoft\\Windows Azure" self._azureagentplugin._set_registry_vm_type(vm_type) key = self._winreg_mock.CreateKey.return_value.__enter__.return_value self._winreg_mock.CreateKey.assert_called_with( self._winreg_mock.HKEY_LOCAL_MACHINE, key_name) self._winreg_mock.SetValueEx.assert_called_once_with( key, "VMType", 0, self._winreg_mock.REG_SZ, vm_type) def test_set_registry_ga_params(self): fake_version = (1, 2, 3, 4) fake_install_timestamp = datetime.datetime.now() key_name = "SOFTWARE\\Microsoft\\GuestAgent" self._azureagentplugin._set_registry_ga_params(fake_version, fake_install_timestamp) self._winreg_mock.CreateKey.assert_called_with( self._winreg_mock.HKEY_LOCAL_MACHINE, key_name) self.assertEqual(self._winreg_mock.SetValueEx.call_count, 2) @mock.patch(MODPATH + ".AzureGuestAgentPlugin._set_registry_ga_params") @mock.patch(MODPATH + ".AzureGuestAgentPlugin._set_registry_vm_type") def test_configure_rd_agent(self, mock_set_registry_vm_type, mock_set_registry_ga_params): mock_osutils = mock.Mock() fake_ga_path = "C:\\" expected_rd_path = os.path.join(fake_ga_path, self._azureguestagent.RDAGENT_FILENAME) expected_path = os.path.join(fake_ga_path, "TransparentInstaller.dll") self._azureagentplugin._configure_rd_agent(mock_osutils, fake_ga_path) mock_osutils.create_service.assert_called_once_with( self._azureguestagent.SERVICE_NAME_RDAGENT, self._azureguestagent.SERVICE_NAME_RDAGENT, expected_rd_path, mock_osutils.SERVICE_START_MODE_MANUAL) mock_osutils.get_file_version.assert_called_once_with(expected_path) mock_set_registry_vm_type.assert_called_once_with() @mock.patch(MODPATH + ".AzureGuestAgentPlugin._run_logman") def test_stop_event_trace(self, mock_run_logman): mock_osutils = mock.Mock() fake_name = mock.sentinel.event_name res = self._azureagentplugin._stop_event_trace(mock_osutils, fake_name) mock_run_logman.assert_called_once_with(mock_osutils, "stop", fake_name, False) self.assertIsNotNone(res) @mock.patch(MODPATH + ".AzureGuestAgentPlugin._run_logman") def test_delete_event_trace(self, mock_run_logman): mock_osutils = mock.Mock() fake_name = mock.sentinel.event_name res = self._azureagentplugin._delete_event_trace(mock_osutils, fake_name) mock_run_logman.assert_called_once_with(mock_osutils, "delete", fake_name) self.assertIsNotNone(res) def test_run_logman(self): mock_osutils = mock.Mock() fake_action = mock.sentinel.action fake_name = mock.sentinel.cmd_name expected_args = ["logman.exe", "-ets", fake_action, fake_name] mock_osutils.execute_system32_process.return_value = (0, 0, -1) self._azureagentplugin._run_logman(mock_osutils, fake_action, fake_name, True) mock_osutils.execute_system32_process.assert_called_once_with( expected_args) @mock.patch(MODPATH + ".AzureGuestAgentPlugin._stop_event_trace") def test_stop_ga_event_traces(self, mock_stop_event_trace): mock_osutils = mock.Mock() expected_logs = ["Stopping Azure guest agent event traces"] with self.snatcher: self._azureagentplugin._stop_ga_event_traces(mock_osutils) self.assertEqual(mock_stop_event_trace.call_count, 4) self.assertEqual(self.snatcher.output, expected_logs) @mock.patch(MODPATH + ".AzureGuestAgentPlugin._delete_event_trace") def test_delete_ga_event_traces(self, mock_delete_event_trace): mock_osutils = mock.Mock() expected_logs = ["Deleting Azure guest agent event traces"] with self.snatcher: self._azureagentplugin._delete_ga_event_traces(mock_osutils) self.assertEqual(mock_delete_event_trace.call_count, 2) self.assertEqual(self.snatcher.output, expected_logs) @mock.patch("os.path.exists") def _test_get_guest_agent_source_path(self, mock_exists, drives=None, exists=False): mock_osutils = mock.Mock() mock_exists.return_value = exists mock_osutils.get_logical_drives.return_value = drives if not exists: self.assertRaises( exception.CloudbaseInitException, self._azureagentplugin._get_guest_agent_source_path, mock_osutils) return res = self._azureagentplugin._get_guest_agent_source_path(mock_osutils) self.assertIsNotNone(res) def test_get_guest_agent_source_path_no_agent(self): self._test_get_guest_agent_source_path(drives=[]) def test_get_guest_agent_source_path(self): mock_drive = "C:" self._test_get_guest_agent_source_path(drives=[mock_drive], exists=True) def _test_execute(self, provisioning_data=None, expected_logs=None): mock_service = mock.Mock() mock_sharedata = mock.Mock() expected_res = (base.PLUGIN_EXECUTION_DONE, False) (mock_service.get_vm_agent_package_provisioning_data. return_value) = provisioning_data if not provisioning_data or not provisioning_data.get("provision"): with self.snatcher: res = self._azureagentplugin.execute(mock_service, mock_sharedata) (mock_service.get_vm_agent_package_provisioning_data. assert_called_once_with()) self.assertEqual(res, expected_res) self.assertEqual(self.snatcher.output, expected_logs) return def test_execute_no_data(self): expected_logs = ["Azure guest agent provisioning data not present"] self._test_execute(expected_logs=expected_logs) def test_execute_no_provision(self): mock_data = {"provision": None} expected_logs = ["Skipping Azure guest agent provisioning " "as by metadata request"] self._test_execute(provisioning_data=mock_data, expected_logs=expected_logs) def test_get_os_requirements(self): expected_res = ('win32', (6, 1)) res = self._azureagentplugin.get_os_requirements() self.assertEqual(res, expected_res)