From c4f2f26edf5cd1f5880f86853fcc611281605e63 Mon Sep 17 00:00:00 2001 From: Max Lobur Date: Wed, 22 Jan 2014 10:13:39 -0500 Subject: [PATCH] Fix race condition when changing node states To fix race condition we're adding a mechanism of background task execution in conductor. The conductor will get synchrozed AMQP call, reserve lock, start the background task and return empty response to the API. In case when lock cannot be acquired or background tasks pool is full, the exception is sent back to the API and the task is not started. Also the patch adds an ability to control resource locks manually. This feauture used to release lock in the end of background task. Change-Id: I4095de2d82058ea5e052531698e67a0947424435 Closes-Bug: #1259910 --- ironic/api/controllers/v1/node.py | 21 +- ironic/common/exception.py | 6 + ironic/conductor/manager.py | 52 ++++- ironic/conductor/rpcapi.py | 15 +- ironic/conductor/task_manager.py | 113 ++++++++--- ironic/conductor/utils.py | 8 + ironic/tests/api/test_nodes.py | 26 --- .../tests/conductor/test_conductor_utils.py | 189 +++++++++--------- ironic/tests/conductor/test_manager.py | 189 ++++++++++++++++++ ironic/tests/conductor/test_rpcapi.py | 2 +- 10 files changed, 441 insertions(+), 180 deletions(-) diff --git a/ironic/api/controllers/v1/node.py b/ironic/api/controllers/v1/node.py index 6240660abf..50f6654510 100644 --- a/ironic/api/controllers/v1/node.py +++ b/ironic/api/controllers/v1/node.py @@ -120,21 +120,14 @@ class NodeStatesController(rest.RestController): rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid) topic = pecan.request.rpcapi.get_topic_for(rpc_node) - if rpc_node.target_power_state is not None: - raise wsme.exc.ClientSideError(_("Power operation for node %s is " - "already in progress.") % - rpc_node['uuid'], - status_code=409) - # Note that there is a race condition. The node state(s) could change - # by the time the RPC call is made and the TaskManager manager gets a - # lock. - if target in [ir_states.POWER_ON, - ir_states.POWER_OFF, - ir_states.REBOOT]: - pecan.request.rpcapi.change_node_power_state( - pecan.request.context, node_uuid, target, topic) - else: + if target not in [ir_states.POWER_ON, + ir_states.POWER_OFF, + ir_states.REBOOT]: raise exception.InvalidStateRequested(state=target, node=node_uuid) + + pecan.request.rpcapi.change_node_power_state(pecan.request.context, + node_uuid, target, topic) + # FIXME(lucasagomes): Currently WSME doesn't support returning # the Location header. Once it's implemented we should use the # Location to point to the /states subresource of the node so diff --git a/ironic/common/exception.py b/ironic/common/exception.py index 26792bc0c2..64413c10b1 100644 --- a/ironic/common/exception.py +++ b/ironic/common/exception.py @@ -322,3 +322,9 @@ class HTTPNotFound(NotFound): class ConfigNotFound(IronicException): message = _("Could not find config at %(path)s") + + +class NoFreeConductorWorker(IronicException): + message = _('Requested action cannot be performed due to lack of free ' + 'conductor workers.') + code = 503 # Service Unavailable (temporary). diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index 8a7a1b03a8..6ebdd85d67 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -43,6 +43,8 @@ building or tearing down the TFTP environment for a node, notifying Neutron of a change, etc. """ +from eventlet import greenpool + from oslo.config import cfg from ironic.common import driver_factory @@ -55,10 +57,12 @@ from ironic.conductor import utils from ironic.db import api as dbapi from ironic.objects import base as objects_base from ironic.openstack.common import excutils +from ironic.openstack.common import lockutils from ironic.openstack.common import log from ironic.openstack.common import periodic_task MANAGER_TOPIC = 'ironic.conductor_manager' +WORKER_SPAWN_lOCK = "conductor_worker_spawn" LOG = log.getLogger(__name__) @@ -115,6 +119,9 @@ class ConductorManager(service.PeriodicService): self.driver_rings = self._get_current_driver_rings() """Consistent hash ring which maps drivers to conductors.""" + self._worker_pool = greenpool.GreenPool(size=CONF.rpc_thread_pool_size) + """GreenPool of background workers for performing tasks async.""" + # TODO(deva): add stop() to call unregister_conductor def initialize_service_hook(self, service): @@ -181,25 +188,35 @@ class ConductorManager(service.PeriodicService): def change_node_power_state(self, context, node_id, new_state): """RPC method to encapsulate changes to a node's state. - Perform actions such as power on, power off. It waits for the power - action to finish, then if successful, it updates the power_state for - the node with the new power state. + Perform actions such as power on, power off. The validation and power + action are performed in background (async). Once the power action is + finished and successful, it updates the power_state for the node with + the new power state. :param context: an admin context. :param node_id: the id or uuid of a node. :param new_state: the desired power state of the node. - :raises: InvalidParameterValue when the wrong state is specified - or the wrong driver info is specified. - :raises: other exceptions by the node's power driver if something - wrong occurred during the power action. + :raises: NoFreeConductorWorker when there is no free worker to start + async task. """ LOG.debug(_("RPC change_node_power_state called for node %(node)s. " "The desired new state is %(state)s.") % {'node': node_id, 'state': new_state}) - with task_manager.acquire(context, node_id, shared=False) as task: - utils.node_power_action(task, task.node, new_state) + task = task_manager.TaskManager(context) + task.acquire_resources(node_id, shared=False) + + try: + # Start requested action in the background. + thread = self._spawn_worker(utils.node_power_action, + task, task.node, new_state) + # Release node lock at the end. + thread.link(lambda t: task.release_resources()) + except Exception: + with excutils.save_and_reraise_exception(): + # Release node lock if error occurred. + task.release_resources() # NOTE(deva): There is a race condition in the RPC API for vendor_passthru. # Between the validate_vendor_action and do_vendor_action calls, it's @@ -481,3 +498,20 @@ class ConductorManager(service.PeriodicService): reason=msg) return node + + @lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-') + def _spawn_worker(self, func, *args, **kwargs): + + """Create a greenthread to run func(*args, **kwargs). + + Spawns a greenthread if there are free slots in pool, otherwise raises + exception. Execution control returns immediately to the caller. + + :returns: GreenThread object. + :raises: NoFreeConductorWorker if worker pool is currently full. + + """ + if self._worker_pool.free(): + return self._worker_pool.spawn(func, *args, **kwargs) + else: + raise exception.NoFreeConductorWorker() diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py index 2b5a59807d..e375fc25c3 100644 --- a/ironic/conductor/rpcapi.py +++ b/ironic/conductor/rpcapi.py @@ -127,19 +127,22 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy): topic=topic or self.topic) def change_node_power_state(self, context, node_id, new_state, topic=None): - """Asynchronously change power state of a node. + """Synchronously, acquire lock and start the conductor background task + to change power state of a node. :param context: request context. :param node_id: node id or uuid. :param new_state: one of ironic.common.states power state values :param topic: RPC topic. Defaults to self.topic. + :raises: NoFreeConductorWorker when there is no free worker to start + async task. """ - self.cast(context, - self.make_msg('change_node_power_state', - node_id=node_id, - new_state=new_state), - topic=topic or self.topic) + return self.call(context, + self.make_msg('change_node_power_state', + node_id=node_id, + new_state=new_state), + topic=topic or self.topic) def vendor_passthru(self, context, node_id, driver_method, info, topic=None): diff --git a/ironic/conductor/task_manager.py b/ironic/conductor/task_manager.py index e583e9a224..259c16c6ab 100644 --- a/ironic/conductor/task_manager.py +++ b/ironic/conductor/task_manager.py @@ -52,6 +52,31 @@ a shorthand to access it. For example:: driver = task.node.driver driver.power.power_on(task.node) +If you need to execute task-requiring code in the background thread the +TaskManager provides the interface to manage resource locks manually. Common +approach is to use manager._spawn_worker method and release resources using +link method of the returned thread object. +For example (somewhere inside conductor manager):: + + task = task_manager.TaskManager(context) + task.acquire_resources(node_id, shared=False) + + try: + # Start requested action in the background. + thread = self._spawn_worker(utils.node_power_action, + task, task.node, new_state) + # Release node lock at the end. + thread.link(lambda t: task.release_resources()) + except Exception: + with excutils.save_and_reraise_exception(): + # Release node lock if error occurred. + task.release_resources() + +link callback will be called whenever: + - background task finished with no errors. + - background task has crashed with exception. + - callback was added after the background task has finished or crashed. + Eventually, driver functionality may be wrapped by tasks to facilitate multi-node tasks more easily. Once implemented, it might look like this:: @@ -67,8 +92,6 @@ multi-node tasks more easily. Once implemented, it might look like this:: states = task.get_power_state() """ - -import contextlib from oslo.config import cfg from ironic.common import exception @@ -94,14 +117,8 @@ def require_exclusive_lock(f): return wrapper -@contextlib.contextmanager def acquire(context, node_ids, shared=False, driver_name=None): - """Context manager for acquiring a lock on one or more Nodes. - - Acquire a lock atomically on a non-empty set of nodes. The lock - can be either shared or exclusive. Shared locks may be used for - read-only or non-disruptive actions only, and must be considerate - to what other threads may be doing on the nodes at the same time. + """Shortcut for acquiring a lock on one or more Nodes. :param context: Request context. :param node_ids: A list of ids or uuids of nodes to lock. @@ -111,36 +128,66 @@ def acquire(context, node_ids, shared=False, driver_name=None): :returns: An instance of :class:`TaskManager`. """ - - t = TaskManager(context, shared) - - # instead of generating an exception, DTRT and convert to a list - if not isinstance(node_ids, list): - node_ids = [node_ids] - - try: - if not shared: - t.dbapi.reserve_nodes(CONF.host, node_ids) - for id in node_ids: - t.resources.append(resource_manager.NodeManager.acquire( - id, t, driver_name)) - yield t - finally: - for id in [r.id for r in t.resources]: - resource_manager.NodeManager.release(id, t) - if not shared: - t.dbapi.release_nodes(CONF.host, node_ids) + mgr = TaskManager(context) + mgr.acquire_resources(node_ids, shared, driver_name) + return mgr class TaskManager(object): """Context manager for tasks.""" - def __init__(self, context, shared): + def __init__(self, context): self.context = context - self.shared = shared self.resources = [] + self.shared = False self.dbapi = dbapi.get_instance() + def acquire_resources(self, node_ids, shared=False, driver_name=None): + """Acquire a lock on one or more Nodes. + + Acquire a lock atomically on a non-empty set of nodes. The lock + can be either shared or exclusive. Shared locks may be used for + read-only or non-disruptive actions only, and must be considerate + to what other threads may be doing on the nodes at the same time. + + :param node_ids: A list of ids or uuids of nodes to lock. + :param shared: Boolean indicating whether to take a shared or exclusive + lock. Default: False. + :param driver_name: Name of Driver. Default: None. + + """ + # Do not allow multiple acquire calls. + if self.resources: + raise exception.IronicException( + _("Task manager already has resources.")) + + self.shared = shared + + # instead of generating an exception, DTRT and convert to a list + if not isinstance(node_ids, list): + node_ids = [node_ids] + + if not self.shared: + self.dbapi.reserve_nodes(CONF.host, node_ids) + for node_id in node_ids: + node_mgr = resource_manager.NodeManager.acquire(node_id, self, + driver_name) + self.resources.append(node_mgr) + + def release_resources(self): + """Release all the resources acquired for this TaskManager.""" + # Do not allow multiple release calls. + if not self.resources: + raise exception.IronicException( + _("Task manager doesn't have resources to release.")) + + node_ids = [r.id for r in self.resources] + for node_id in node_ids: + resource_manager.NodeManager.release(node_id, self) + if not self.shared: + self.dbapi.release_nodes(CONF.host, node_ids) + self.resources = [] + @property def node(self): """Special accessor for single-node tasks.""" @@ -167,3 +214,9 @@ class TaskManager(object): else: raise AttributeError(_("Multi-node TaskManager " "can't select single node manager from the list")) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release_resources() diff --git a/ironic/conductor/utils.py b/ironic/conductor/utils.py index 83fd7e9088..1f91400875 100644 --- a/ironic/conductor/utils.py +++ b/ironic/conductor/utils.py @@ -25,11 +25,19 @@ LOG = log.getLogger(__name__) def node_power_action(task, node, state): """Change power state or reset for a node. + Validate whether the given power transition is possible and perform + power action. + :param task: a TaskManager instance. :param node: the Node object to act upon. :param state: Any power state from ironic.common.states. If the state is 'REBOOT' then a reboot will be attempted, otherwise the node power state is directly set to 'state'. + :raises: InvalidParameterValue when the wrong state is specified + or the wrong driver info is specified. + :raises: other exceptions by the node's power driver if something + wrong occurred during the power action. + """ context = task.context new_state = states.POWER_ON if state == states.REBOOT else state diff --git a/ironic/tests/api/test_nodes.py b/ironic/tests/api/test_nodes.py index 4d2c78f375..718b3ea9a9 100644 --- a/ironic/tests/api/test_nodes.py +++ b/ironic/tests/api/test_nodes.py @@ -26,7 +26,6 @@ from ironic.common import exception from ironic.common import states from ironic.common import utils from ironic.conductor import rpcapi -from ironic import objects from ironic.openstack.common import timeutils from ironic.tests.api import base from ironic.tests.db import utils as dbutils @@ -732,31 +731,6 @@ class TestPut(base.FunctionalTest): states.POWER_ON, 'test-topic') - def test_power_state_in_progress(self): - manager = mock.MagicMock() - with mock.patch.object(objects.Node, 'get_by_uuid') as mock_get_node: - mock_get_node.return_value = self.node - manager.attach_mock(mock_get_node, 'get_by_uuid') - manager.attach_mock(self.mock_cnps, 'change_node_power_state') - expected = [mock.call.get_by_uuid(mock.ANY, self.node['uuid']), - mock.call.change_node_power_state(mock.ANY, - self.node['uuid'], - states.POWER_ON, - 'test-topic')] - - self.put_json('/nodes/%s/states/power' % self.node['uuid'], - {'target': states.POWER_ON}) - self.assertEqual(expected, manager.mock_calls) - - self.dbapi.update_node(self.node['uuid'], - {'target_power_state': 'fake'}) - response = self.put_json('/nodes/%s/states/power' % self.node['uuid'], - {'target': states.POWER_ON}, - expect_errors=True) - self.assertEqual('application/json', response.content_type) - self.assertEqual(409, response.status_code) - self.assertTrue(response.json['error_message']) - def test_power_invalid_state_request(self): ret = self.put_json('/nodes/%s/states/power' % self.node.uuid, {'target': 'not-supported'}, expect_errors=True) diff --git a/ironic/tests/conductor/test_conductor_utils.py b/ironic/tests/conductor/test_conductor_utils.py index ee891c8531..e5fc2d3e56 100644 --- a/ironic/tests/conductor/test_conductor_utils.py +++ b/ironic/tests/conductor/test_conductor_utils.py @@ -16,8 +16,9 @@ import mock from ironic.common import exception from ironic.common import states -from ironic.conductor import manager +from ironic.common import utils as cmn_utils from ironic.conductor import task_manager +from ironic.conductor import utils as conductor_utils from ironic.db import api as dbapi from ironic.openstack.common import context from ironic.tests.conductor import utils as mgr_utils @@ -25,197 +26,195 @@ from ironic.tests.db import base from ironic.tests.db import utils -class PowerActionTestCase(base.DbTestCase): +class NodePowerActionTestCase(base.DbTestCase): def setUp(self): - super(PowerActionTestCase, self).setUp() - self.service = manager.ConductorManager('test-host', 'test-topic') + super(NodePowerActionTestCase, self).setUp() self.context = context.get_admin_context() self.dbapi = dbapi.get_instance() self.driver = mgr_utils.get_mocked_node_manager() + self.task = task_manager.TaskManager(self.context) - def test_change_node_power_state_power_on(self): - """Test if change_node_power_state to turn node power on - is successful or not. - """ - ndict = utils.get_test_node(driver='fake', + def test_node_power_action_power_on(self): + """Test node_power_action to turn node power on.""" + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_OFF) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'get_power_state') \ as get_power_mock: get_power_mock.return_value = states.POWER_OFF - self.service.change_node_power_state(self.context, - node['uuid'], states.POWER_ON) + conductor_utils.node_power_action(self.task, self.task.node, + states.POWER_ON) + node.refresh(self.context) get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_power_off(self): - """Test if change_node_power_state to turn node power off - is successful or not. - """ - ndict = utils.get_test_node(driver='fake', + def test_node_power_action_power_off(self): + """Test node_power_action to turn node power off.""" + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_ON) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'get_power_state') \ as get_power_mock: get_power_mock.return_value = states.POWER_ON - self.service.change_node_power_state(self.context, node['uuid'], - states.POWER_OFF) + conductor_utils.node_power_action(self.task, self.task.node, + states.POWER_OFF) + node.refresh(self.context) get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) self.assertEqual(node['power_state'], states.POWER_OFF) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_reboot(self): + def test_node_power_action_power_reboot(self): """Test for reboot a node.""" - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_ON) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'reboot') as reboot_mock: - self.service.change_node_power_state(self.context, node['uuid'], - states.REBOOT) + conductor_utils.node_power_action(self.task, self.task.node, + states.REBOOT) + node.refresh(self.context) reboot_mock.assert_called_once() self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_invalid_state(self): + def test_node_power_action_invalid_state(self): """Test if an exception is thrown when changing to an invalid power state. """ - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_ON) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'get_power_state') \ as get_power_mock: get_power_mock.return_value = states.POWER_ON self.assertRaises(exception.InvalidParameterValue, - self.service.change_node_power_state, - self.context, - node['uuid'], - "POWER") + conductor_utils.node_power_action, + self.task, + self.task.node, + "INVALID_POWER_STATE") + node.refresh(self.context) get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertNotEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNotNone(node['last_error']) # last_error is cleared when a new transaction happens - self.service.change_node_power_state(self.context, node['uuid'], - states.POWER_OFF) + conductor_utils.node_power_action(self.task, self.task.node, + states.POWER_OFF) node.refresh(self.context) self.assertEqual(node['power_state'], states.POWER_OFF) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_already_locked(self): - """Test if an exception is thrown when applying an exclusive - lock to the node failed. - """ - ndict = utils.get_test_node(driver='fake', - power_state=states.POWER_ON) - node = self.dbapi.create_node(ndict) - - # check if the node is locked - with task_manager.acquire(self.context, node['id'], shared=False): - self.assertRaises(exception.NodeLocked, - self.service.change_node_power_state, - self.context, - node['uuid'], - states.POWER_ON) - node.refresh(self.context) - self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) - - def test_change_node_power_state_already_being_processed(self): + def test_node_power_action_already_being_processed(self): """The target_power_state is expected to be None so it isn't checked in the code. This is what happens if it is not None. (Eg, if a conductor had died during a previous power-off attempt and left the target_power_state set to states.POWER_OFF, and the user is attempting to power-off again.) """ - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_ON, target_power_state=states.POWER_OFF) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) + + conductor_utils.node_power_action(self.task, self.task.node, + states.POWER_OFF) - self.service.change_node_power_state(self.context, node['uuid'], - states.POWER_OFF) node.refresh(self.context) self.assertEqual(node['power_state'], states.POWER_OFF) self.assertEqual(node['target_power_state'], states.NOSTATE) - self.assertEqual(node['last_error'], None) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_in_same_state(self): + def test_node_power_action_in_same_state(self): """Test that we don't try to set the power state if the requested state is the same as the current state. """ - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', last_error='anything but None', power_state=states.POWER_ON) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'get_power_state') \ as get_power_mock: get_power_mock.return_value = states.POWER_ON + with mock.patch.object(self.driver.power, 'set_power_state') \ as set_power_mock: - set_power_mock.side_effect = exception.IronicException() + conductor_utils.node_power_action(self.task, self.task.node, + states.POWER_ON) - self.service.change_node_power_state(self.context, - node['uuid'], - states.POWER_ON) - node.refresh(self.context) - get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) - self.assertFalse(set_power_mock.called) - self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertEqual(node['last_error'], None) + node.refresh(self.context) + get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) + self.assertFalse(set_power_mock.called, + "set_power_state unexpectedly called") + self.assertEqual(node['power_state'], states.POWER_ON) + self.assertIsNone(node['target_power_state']) + self.assertIsNone(node['last_error']) - def test_change_node_power_state_invalid_driver_info(self): + def test_node_power_action_invalid_driver_info(self): """Test if an exception is thrown when the driver validation fails. """ - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_ON) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'validate') \ as validate_mock: validate_mock.side_effect = exception.InvalidParameterValue( - 'wrong power driver info') + 'wrong power driver info') self.assertRaises(exception.InvalidParameterValue, - self.service.change_node_power_state, - self.context, - node['uuid'], + conductor_utils.node_power_action, + self.task, + self.task.node, states.POWER_ON) + node.refresh(self.context) validate_mock.assert_called_once_with(mock.ANY) self.assertEqual(node['power_state'], states.POWER_ON) - self.assertEqual(node['target_power_state'], None) - self.assertNotEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNotNone(node['last_error']) - def test_change_node_power_state_set_power_failure(self): + def test_node_power_action_set_power_failure(self): """Test if an exception is thrown when the set_power call fails. """ - ndict = utils.get_test_node(driver='fake', + ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(), + driver='fake', power_state=states.POWER_OFF) node = self.dbapi.create_node(ndict) + self.task.acquire_resources(node.uuid) with mock.patch.object(self.driver.power, 'get_power_state') \ as get_power_mock: @@ -224,15 +223,17 @@ class PowerActionTestCase(base.DbTestCase): get_power_mock.return_value = states.POWER_OFF set_power_mock.side_effect = exception.IronicException() - self.assertRaises(exception.IronicException, - self.service.change_node_power_state, - self.context, - node['uuid'], - states.POWER_ON) + self.assertRaises( + exception.IronicException, + conductor_utils.node_power_action, + self.task, + self.task.node, + states.POWER_ON) + node.refresh(self.context) get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) set_power_mock.assert_called_once_with(mock.ANY, mock.ANY, states.POWER_ON) self.assertEqual(node['power_state'], states.POWER_OFF) - self.assertEqual(node['target_power_state'], None) - self.assertNotEqual(node['last_error'], None) + self.assertIsNone(node['target_power_state']) + self.assertIsNotNone(node['last_error']) diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py index 9dac36ffd9..ed16c943eb 100644 --- a/ironic/tests/conductor/test_manager.py +++ b/ironic/tests/conductor/test_manager.py @@ -19,6 +19,8 @@ """Test class for Ironic ManagerService.""" +import time + import mock from oslo.config import cfg from testtools.matchers import HasLength @@ -29,6 +31,7 @@ from ironic.common import states from ironic.common import utils as ironic_utils from ironic.conductor import manager from ironic.conductor import task_manager +from ironic.conductor import utils as conductor_utils from ironic.db import api as dbapi from ironic import objects from ironic.openstack.common import context @@ -207,6 +210,124 @@ class ManagerTestCase(base.DbTestCase): self.assertEqual(state, states.POWER_ON) self.assertEqual(get_power_mock.call_args_list, expected) + def test_change_node_power_state_power_on(self): + # Test change_node_power_state including integration with + # conductor.utils.node_power_action and lower. + n = utils.get_test_node(driver='fake', + power_state=states.POWER_OFF) + db_node = self.dbapi.create_node(n) + self.service.start() + + with mock.patch.object(self.driver.power, 'get_power_state') \ + as get_power_mock: + get_power_mock.return_value = states.POWER_OFF + + self.service.change_node_power_state(self.context, + db_node.uuid, + states.POWER_ON) + self.service._worker_pool.waitall() + + get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) + db_node.refresh(self.context) + self.assertEqual(states.POWER_ON, db_node.power_state) + self.assertIsNone(db_node.target_power_state) + self.assertIsNone(db_node.last_error) + # Verify the reservation has been cleared by + # background task's link callback. + self.assertIsNone(db_node.reservation) + + @mock.patch.object(conductor_utils, 'node_power_action') + def test_change_node_power_state_node_already_locked(self, + pwr_act_mock): + # Test change_node_power_state with mocked + # conductor.utils.node_power_action. + fake_reservation = 'fake-reserv' + pwr_state = states.POWER_ON + n = utils.get_test_node(driver='fake', + power_state=pwr_state, + reservation=fake_reservation) + db_node = self.dbapi.create_node(n) + self.service.start() + + self.assertRaises(exception.NodeLocked, + self.service.change_node_power_state, + self.context, + db_node.uuid, + states.POWER_ON) + # In this test worker should not be spawned, but waiting to make sure + # the below perform_mock assertion is valid. + self.service._worker_pool.waitall() + self.assertFalse(pwr_act_mock.called, 'node_power_action has been ' + 'unexpectedly called.') + # Verify existing reservation wasn't broken. + db_node.refresh(self.context) + self.assertEqual(fake_reservation, db_node.reservation) + + def test_change_node_power_state_worker_pool_full(self): + # Test change_node_power_state including integration with + # conductor.utils.node_power_action and lower. + initial_state = states.POWER_OFF + n = utils.get_test_node(driver='fake', + power_state=initial_state) + db_node = self.dbapi.create_node(n) + self.service.start() + + with mock.patch.object(self.service, '_spawn_worker') \ + as spawn_mock: + spawn_mock.side_effect = exception.NoFreeConductorWorker() + + self.assertRaises(exception.NoFreeConductorWorker, + self.service.change_node_power_state, + self.context, + db_node.uuid, + states.POWER_ON) + + spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, + mock.ANY, mock.ANY) + db_node.refresh(self.context) + self.assertEqual(initial_state, db_node.power_state) + self.assertIsNone(db_node.target_power_state) + self.assertIsNone(db_node.last_error) + # Verify the picked reservation has been cleared due to full pool. + self.assertIsNone(db_node.reservation) + + def test_change_node_power_state_exception_in_background_task( + self): + # Test change_node_power_state including integration with + # conductor.utils.node_power_action and lower. + initial_state = states.POWER_OFF + n = utils.get_test_node(driver='fake', + power_state=initial_state) + db_node = self.dbapi.create_node(n) + self.service.start() + + with mock.patch.object(self.driver.power, 'get_power_state') \ + as get_power_mock: + get_power_mock.return_value = states.POWER_OFF + + with mock.patch.object(self.driver.power, 'set_power_state') \ + as set_power_mock: + new_state = states.POWER_ON + set_power_mock.side_effect = exception.PowerStateFailure( + pstate=new_state + ) + + self.service.change_node_power_state(self.context, + db_node.uuid, + new_state) + self.service._worker_pool.waitall() + + get_power_mock.assert_called_once_with(mock.ANY, mock.ANY) + set_power_mock.assert_called_once_with(mock.ANY, mock.ANY, + new_state) + db_node.refresh(self.context) + self.assertEqual(initial_state, db_node.power_state) + self.assertIsNone(db_node.target_power_state) + self.assertIsNotNone(db_node.last_error) + # Verify the reservation has been cleared by background task's + # link callback despite exception in background task. + self.assertIsNone(db_node.reservation) + def test_update_node(self): ndict = utils.get_test_node(driver='fake', extra={'test': 'one'}) node = self.dbapi.create_node(ndict) @@ -517,3 +638,71 @@ class ManagerTestCase(base.DbTestCase): self.context, node.uuid, False) node.refresh(self.context) self.assertFalse(node.maintenance) + + def test__spawn_worker(self): + func_mock = mock.Mock() + args = (1, 2, "test") + kwargs = dict(kw1='test1', kw2='test2') + self.service.start() + + thread = self.service._spawn_worker(func_mock, *args, **kwargs) + self.service._worker_pool.waitall() + + self.assertIsNotNone(thread) + func_mock.assert_called_once_with(*args, **kwargs) + + # The tests below related to greenthread. We have they to assert our + # assumptions about greenthread behavior. + + def test__spawn_link_callback_added_during_execution(self): + def func(): + time.sleep(1) + link_callback = mock.Mock() + self.service.start() + + thread = self.service._spawn_worker(func) + # func_mock executing at this moment + thread.link(link_callback) + self.service._worker_pool.waitall() + + link_callback.assert_called_once_with(thread) + + def test__spawn_link_callback_added_after_execution(self): + def func(): + pass + link_callback = mock.Mock() + self.service.start() + + thread = self.service._spawn_worker(func) + self.service._worker_pool.waitall() + # func_mock finished at this moment + thread.link(link_callback) + + link_callback.assert_called_once_with(thread) + + def test__spawn_link_callback_exception_inside_thread(self): + def func(): + time.sleep(1) + raise Exception() + link_callback = mock.Mock() + self.service.start() + + thread = self.service._spawn_worker(func) + # func_mock executing at this moment + thread.link(link_callback) + self.service._worker_pool.waitall() + + link_callback.assert_called_once_with(thread) + + def test__spawn_link_callback_added_after_exception_inside_thread(self): + def func(): + raise Exception() + link_callback = mock.Mock() + self.service.start() + + thread = self.service._spawn_worker(func) + self.service._worker_pool.waitall() + # func_mock finished at this moment + thread.link(link_callback) + + link_callback.assert_called_once_with(thread) diff --git a/ironic/tests/conductor/test_rpcapi.py b/ironic/tests/conductor/test_rpcapi.py index 86e0c366da..67d5eadb8b 100644 --- a/ironic/tests/conductor/test_rpcapi.py +++ b/ironic/tests/conductor/test_rpcapi.py @@ -117,7 +117,7 @@ class RPCAPITestCase(base.DbTestCase): def test_change_node_power_state(self): self._test_rpcapi('change_node_power_state', - 'cast', + 'call', node_id=self.fake_node['uuid'], new_state=states.POWER_ON)