Fix race condition when changing node states

To fix race condition we're adding a mechanism of background task
execution in conductor. The conductor will get synchrozed AMQP call,
reserve lock, start the background task and return empty response to
the API. In case when lock cannot be acquired or background tasks pool
is full, the exception is sent back to the API and the task is not
started.

Also the patch adds an ability to control resource locks manually.
This feauture used to release lock in the end of background task.

Change-Id: I4095de2d82058ea5e052531698e67a0947424435
Closes-Bug: #1259910
This commit is contained in:
Max Lobur 2014-01-22 10:13:39 -05:00
parent cb3246848f
commit c4f2f26edf
10 changed files with 441 additions and 180 deletions

View File

@ -120,21 +120,14 @@ class NodeStatesController(rest.RestController):
rpc_node = objects.Node.get_by_uuid(pecan.request.context, node_uuid)
topic = pecan.request.rpcapi.get_topic_for(rpc_node)
if rpc_node.target_power_state is not None:
raise wsme.exc.ClientSideError(_("Power operation for node %s is "
"already in progress.") %
rpc_node['uuid'],
status_code=409)
# Note that there is a race condition. The node state(s) could change
# by the time the RPC call is made and the TaskManager manager gets a
# lock.
if target in [ir_states.POWER_ON,
ir_states.POWER_OFF,
ir_states.REBOOT]:
pecan.request.rpcapi.change_node_power_state(
pecan.request.context, node_uuid, target, topic)
else:
if target not in [ir_states.POWER_ON,
ir_states.POWER_OFF,
ir_states.REBOOT]:
raise exception.InvalidStateRequested(state=target, node=node_uuid)
pecan.request.rpcapi.change_node_power_state(pecan.request.context,
node_uuid, target, topic)
# FIXME(lucasagomes): Currently WSME doesn't support returning
# the Location header. Once it's implemented we should use the
# Location to point to the /states subresource of the node so

View File

@ -322,3 +322,9 @@ class HTTPNotFound(NotFound):
class ConfigNotFound(IronicException):
message = _("Could not find config at %(path)s")
class NoFreeConductorWorker(IronicException):
message = _('Requested action cannot be performed due to lack of free '
'conductor workers.')
code = 503 # Service Unavailable (temporary).

View File

@ -43,6 +43,8 @@ building or tearing down the TFTP environment for a node, notifying Neutron of
a change, etc.
"""
from eventlet import greenpool
from oslo.config import cfg
from ironic.common import driver_factory
@ -55,10 +57,12 @@ from ironic.conductor import utils
from ironic.db import api as dbapi
from ironic.objects import base as objects_base
from ironic.openstack.common import excutils
from ironic.openstack.common import lockutils
from ironic.openstack.common import log
from ironic.openstack.common import periodic_task
MANAGER_TOPIC = 'ironic.conductor_manager'
WORKER_SPAWN_lOCK = "conductor_worker_spawn"
LOG = log.getLogger(__name__)
@ -115,6 +119,9 @@ class ConductorManager(service.PeriodicService):
self.driver_rings = self._get_current_driver_rings()
"""Consistent hash ring which maps drivers to conductors."""
self._worker_pool = greenpool.GreenPool(size=CONF.rpc_thread_pool_size)
"""GreenPool of background workers for performing tasks async."""
# TODO(deva): add stop() to call unregister_conductor
def initialize_service_hook(self, service):
@ -181,25 +188,35 @@ class ConductorManager(service.PeriodicService):
def change_node_power_state(self, context, node_id, new_state):
"""RPC method to encapsulate changes to a node's state.
Perform actions such as power on, power off. It waits for the power
action to finish, then if successful, it updates the power_state for
the node with the new power state.
Perform actions such as power on, power off. The validation and power
action are performed in background (async). Once the power action is
finished and successful, it updates the power_state for the node with
the new power state.
:param context: an admin context.
:param node_id: the id or uuid of a node.
:param new_state: the desired power state of the node.
:raises: InvalidParameterValue when the wrong state is specified
or the wrong driver info is specified.
:raises: other exceptions by the node's power driver if something
wrong occurred during the power action.
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
LOG.debug(_("RPC change_node_power_state called for node %(node)s. "
"The desired new state is %(state)s.")
% {'node': node_id, 'state': new_state})
with task_manager.acquire(context, node_id, shared=False) as task:
utils.node_power_action(task, task.node, new_state)
task = task_manager.TaskManager(context)
task.acquire_resources(node_id, shared=False)
try:
# Start requested action in the background.
thread = self._spawn_worker(utils.node_power_action,
task, task.node, new_state)
# Release node lock at the end.
thread.link(lambda t: task.release_resources())
except Exception:
with excutils.save_and_reraise_exception():
# Release node lock if error occurred.
task.release_resources()
# NOTE(deva): There is a race condition in the RPC API for vendor_passthru.
# Between the validate_vendor_action and do_vendor_action calls, it's
@ -481,3 +498,20 @@ class ConductorManager(service.PeriodicService):
reason=msg)
return node
@lockutils.synchronized(WORKER_SPAWN_lOCK, 'ironic-')
def _spawn_worker(self, func, *args, **kwargs):
"""Create a greenthread to run func(*args, **kwargs).
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
:returns: GreenThread object.
:raises: NoFreeConductorWorker if worker pool is currently full.
"""
if self._worker_pool.free():
return self._worker_pool.spawn(func, *args, **kwargs)
else:
raise exception.NoFreeConductorWorker()

View File

@ -127,19 +127,22 @@ class ConductorAPI(ironic.openstack.common.rpc.proxy.RpcProxy):
topic=topic or self.topic)
def change_node_power_state(self, context, node_id, new_state, topic=None):
"""Asynchronously change power state of a node.
"""Synchronously, acquire lock and start the conductor background task
to change power state of a node.
:param context: request context.
:param node_id: node id or uuid.
:param new_state: one of ironic.common.states power state values
:param topic: RPC topic. Defaults to self.topic.
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
self.cast(context,
self.make_msg('change_node_power_state',
node_id=node_id,
new_state=new_state),
topic=topic or self.topic)
return self.call(context,
self.make_msg('change_node_power_state',
node_id=node_id,
new_state=new_state),
topic=topic or self.topic)
def vendor_passthru(self, context, node_id, driver_method, info,
topic=None):

View File

@ -52,6 +52,31 @@ a shorthand to access it. For example::
driver = task.node.driver
driver.power.power_on(task.node)
If you need to execute task-requiring code in the background thread the
TaskManager provides the interface to manage resource locks manually. Common
approach is to use manager._spawn_worker method and release resources using
link method of the returned thread object.
For example (somewhere inside conductor manager)::
task = task_manager.TaskManager(context)
task.acquire_resources(node_id, shared=False)
try:
# Start requested action in the background.
thread = self._spawn_worker(utils.node_power_action,
task, task.node, new_state)
# Release node lock at the end.
thread.link(lambda t: task.release_resources())
except Exception:
with excutils.save_and_reraise_exception():
# Release node lock if error occurred.
task.release_resources()
link callback will be called whenever:
- background task finished with no errors.
- background task has crashed with exception.
- callback was added after the background task has finished or crashed.
Eventually, driver functionality may be wrapped by tasks to facilitate
multi-node tasks more easily. Once implemented, it might look like this::
@ -67,8 +92,6 @@ multi-node tasks more easily. Once implemented, it might look like this::
states = task.get_power_state()
"""
import contextlib
from oslo.config import cfg
from ironic.common import exception
@ -94,14 +117,8 @@ def require_exclusive_lock(f):
return wrapper
@contextlib.contextmanager
def acquire(context, node_ids, shared=False, driver_name=None):
"""Context manager for acquiring a lock on one or more Nodes.
Acquire a lock atomically on a non-empty set of nodes. The lock
can be either shared or exclusive. Shared locks may be used for
read-only or non-disruptive actions only, and must be considerate
to what other threads may be doing on the nodes at the same time.
"""Shortcut for acquiring a lock on one or more Nodes.
:param context: Request context.
:param node_ids: A list of ids or uuids of nodes to lock.
@ -111,36 +128,66 @@ def acquire(context, node_ids, shared=False, driver_name=None):
:returns: An instance of :class:`TaskManager`.
"""
t = TaskManager(context, shared)
# instead of generating an exception, DTRT and convert to a list
if not isinstance(node_ids, list):
node_ids = [node_ids]
try:
if not shared:
t.dbapi.reserve_nodes(CONF.host, node_ids)
for id in node_ids:
t.resources.append(resource_manager.NodeManager.acquire(
id, t, driver_name))
yield t
finally:
for id in [r.id for r in t.resources]:
resource_manager.NodeManager.release(id, t)
if not shared:
t.dbapi.release_nodes(CONF.host, node_ids)
mgr = TaskManager(context)
mgr.acquire_resources(node_ids, shared, driver_name)
return mgr
class TaskManager(object):
"""Context manager for tasks."""
def __init__(self, context, shared):
def __init__(self, context):
self.context = context
self.shared = shared
self.resources = []
self.shared = False
self.dbapi = dbapi.get_instance()
def acquire_resources(self, node_ids, shared=False, driver_name=None):
"""Acquire a lock on one or more Nodes.
Acquire a lock atomically on a non-empty set of nodes. The lock
can be either shared or exclusive. Shared locks may be used for
read-only or non-disruptive actions only, and must be considerate
to what other threads may be doing on the nodes at the same time.
:param node_ids: A list of ids or uuids of nodes to lock.
:param shared: Boolean indicating whether to take a shared or exclusive
lock. Default: False.
:param driver_name: Name of Driver. Default: None.
"""
# Do not allow multiple acquire calls.
if self.resources:
raise exception.IronicException(
_("Task manager already has resources."))
self.shared = shared
# instead of generating an exception, DTRT and convert to a list
if not isinstance(node_ids, list):
node_ids = [node_ids]
if not self.shared:
self.dbapi.reserve_nodes(CONF.host, node_ids)
for node_id in node_ids:
node_mgr = resource_manager.NodeManager.acquire(node_id, self,
driver_name)
self.resources.append(node_mgr)
def release_resources(self):
"""Release all the resources acquired for this TaskManager."""
# Do not allow multiple release calls.
if not self.resources:
raise exception.IronicException(
_("Task manager doesn't have resources to release."))
node_ids = [r.id for r in self.resources]
for node_id in node_ids:
resource_manager.NodeManager.release(node_id, self)
if not self.shared:
self.dbapi.release_nodes(CONF.host, node_ids)
self.resources = []
@property
def node(self):
"""Special accessor for single-node tasks."""
@ -167,3 +214,9 @@ class TaskManager(object):
else:
raise AttributeError(_("Multi-node TaskManager "
"can't select single node manager from the list"))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release_resources()

View File

@ -25,11 +25,19 @@ LOG = log.getLogger(__name__)
def node_power_action(task, node, state):
"""Change power state or reset for a node.
Validate whether the given power transition is possible and perform
power action.
:param task: a TaskManager instance.
:param node: the Node object to act upon.
:param state: Any power state from ironic.common.states. If the
state is 'REBOOT' then a reboot will be attempted, otherwise
the node power state is directly set to 'state'.
:raises: InvalidParameterValue when the wrong state is specified
or the wrong driver info is specified.
:raises: other exceptions by the node's power driver if something
wrong occurred during the power action.
"""
context = task.context
new_state = states.POWER_ON if state == states.REBOOT else state

View File

@ -26,7 +26,6 @@ from ironic.common import exception
from ironic.common import states
from ironic.common import utils
from ironic.conductor import rpcapi
from ironic import objects
from ironic.openstack.common import timeutils
from ironic.tests.api import base
from ironic.tests.db import utils as dbutils
@ -732,31 +731,6 @@ class TestPut(base.FunctionalTest):
states.POWER_ON,
'test-topic')
def test_power_state_in_progress(self):
manager = mock.MagicMock()
with mock.patch.object(objects.Node, 'get_by_uuid') as mock_get_node:
mock_get_node.return_value = self.node
manager.attach_mock(mock_get_node, 'get_by_uuid')
manager.attach_mock(self.mock_cnps, 'change_node_power_state')
expected = [mock.call.get_by_uuid(mock.ANY, self.node['uuid']),
mock.call.change_node_power_state(mock.ANY,
self.node['uuid'],
states.POWER_ON,
'test-topic')]
self.put_json('/nodes/%s/states/power' % self.node['uuid'],
{'target': states.POWER_ON})
self.assertEqual(expected, manager.mock_calls)
self.dbapi.update_node(self.node['uuid'],
{'target_power_state': 'fake'})
response = self.put_json('/nodes/%s/states/power' % self.node['uuid'],
{'target': states.POWER_ON},
expect_errors=True)
self.assertEqual('application/json', response.content_type)
self.assertEqual(409, response.status_code)
self.assertTrue(response.json['error_message'])
def test_power_invalid_state_request(self):
ret = self.put_json('/nodes/%s/states/power' % self.node.uuid,
{'target': 'not-supported'}, expect_errors=True)

View File

@ -16,8 +16,9 @@ import mock
from ironic.common import exception
from ironic.common import states
from ironic.conductor import manager
from ironic.common import utils as cmn_utils
from ironic.conductor import task_manager
from ironic.conductor import utils as conductor_utils
from ironic.db import api as dbapi
from ironic.openstack.common import context
from ironic.tests.conductor import utils as mgr_utils
@ -25,197 +26,195 @@ from ironic.tests.db import base
from ironic.tests.db import utils
class PowerActionTestCase(base.DbTestCase):
class NodePowerActionTestCase(base.DbTestCase):
def setUp(self):
super(PowerActionTestCase, self).setUp()
self.service = manager.ConductorManager('test-host', 'test-topic')
super(NodePowerActionTestCase, self).setUp()
self.context = context.get_admin_context()
self.dbapi = dbapi.get_instance()
self.driver = mgr_utils.get_mocked_node_manager()
self.task = task_manager.TaskManager(self.context)
def test_change_node_power_state_power_on(self):
"""Test if change_node_power_state to turn node power on
is successful or not.
"""
ndict = utils.get_test_node(driver='fake',
def test_node_power_action_power_on(self):
"""Test node_power_action to turn node power on."""
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_OFF
self.service.change_node_power_state(self.context,
node['uuid'], states.POWER_ON)
conductor_utils.node_power_action(self.task, self.task.node,
states.POWER_ON)
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNone(node['last_error'])
def test_change_node_power_state_power_off(self):
"""Test if change_node_power_state to turn node power off
is successful or not.
"""
ndict = utils.get_test_node(driver='fake',
def test_node_power_action_power_off(self):
"""Test node_power_action to turn node power off."""
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
self.service.change_node_power_state(self.context, node['uuid'],
states.POWER_OFF)
conductor_utils.node_power_action(self.task, self.task.node,
states.POWER_OFF)
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_OFF)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNone(node['last_error'])
def test_change_node_power_state_reboot(self):
def test_node_power_action_power_reboot(self):
"""Test for reboot a node."""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'reboot') as reboot_mock:
self.service.change_node_power_state(self.context, node['uuid'],
states.REBOOT)
conductor_utils.node_power_action(self.task, self.task.node,
states.REBOOT)
node.refresh(self.context)
reboot_mock.assert_called_once()
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNone(node['last_error'])
def test_change_node_power_state_invalid_state(self):
def test_node_power_action_invalid_state(self):
"""Test if an exception is thrown when changing to an invalid
power state.
"""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
self.assertRaises(exception.InvalidParameterValue,
self.service.change_node_power_state,
self.context,
node['uuid'],
"POWER")
conductor_utils.node_power_action,
self.task,
self.task.node,
"INVALID_POWER_STATE")
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertNotEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNotNone(node['last_error'])
# last_error is cleared when a new transaction happens
self.service.change_node_power_state(self.context, node['uuid'],
states.POWER_OFF)
conductor_utils.node_power_action(self.task, self.task.node,
states.POWER_OFF)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_OFF)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNone(node['last_error'])
def test_change_node_power_state_already_locked(self):
"""Test if an exception is thrown when applying an exclusive
lock to the node failed.
"""
ndict = utils.get_test_node(driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
# check if the node is locked
with task_manager.acquire(self.context, node['id'], shared=False):
self.assertRaises(exception.NodeLocked,
self.service.change_node_power_state,
self.context,
node['uuid'],
states.POWER_ON)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
def test_change_node_power_state_already_being_processed(self):
def test_node_power_action_already_being_processed(self):
"""The target_power_state is expected to be None so it isn't
checked in the code. This is what happens if it is not None.
(Eg, if a conductor had died during a previous power-off
attempt and left the target_power_state set to states.POWER_OFF,
and the user is attempting to power-off again.)
"""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_ON,
target_power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
conductor_utils.node_power_action(self.task, self.task.node,
states.POWER_OFF)
self.service.change_node_power_state(self.context, node['uuid'],
states.POWER_OFF)
node.refresh(self.context)
self.assertEqual(node['power_state'], states.POWER_OFF)
self.assertEqual(node['target_power_state'], states.NOSTATE)
self.assertEqual(node['last_error'], None)
self.assertIsNone(node['last_error'])
def test_change_node_power_state_in_same_state(self):
def test_node_power_action_in_same_state(self):
"""Test that we don't try to set the power state if the requested
state is the same as the current state.
"""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
last_error='anything but None',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_ON
with mock.patch.object(self.driver.power, 'set_power_state') \
as set_power_mock:
set_power_mock.side_effect = exception.IronicException()
conductor_utils.node_power_action(self.task, self.task.node,
states.POWER_ON)
self.service.change_node_power_state(self.context,
node['uuid'],
states.POWER_ON)
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertFalse(set_power_mock.called)
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertEqual(node['last_error'], None)
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
self.assertFalse(set_power_mock.called,
"set_power_state unexpectedly called")
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertIsNone(node['target_power_state'])
self.assertIsNone(node['last_error'])
def test_change_node_power_state_invalid_driver_info(self):
def test_node_power_action_invalid_driver_info(self):
"""Test if an exception is thrown when the driver validation
fails.
"""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_ON)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'validate') \
as validate_mock:
validate_mock.side_effect = exception.InvalidParameterValue(
'wrong power driver info')
'wrong power driver info')
self.assertRaises(exception.InvalidParameterValue,
self.service.change_node_power_state,
self.context,
node['uuid'],
conductor_utils.node_power_action,
self.task,
self.task.node,
states.POWER_ON)
node.refresh(self.context)
validate_mock.assert_called_once_with(mock.ANY)
self.assertEqual(node['power_state'], states.POWER_ON)
self.assertEqual(node['target_power_state'], None)
self.assertNotEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNotNone(node['last_error'])
def test_change_node_power_state_set_power_failure(self):
def test_node_power_action_set_power_failure(self):
"""Test if an exception is thrown when the set_power call
fails.
"""
ndict = utils.get_test_node(driver='fake',
ndict = utils.get_test_node(uuid=cmn_utils.generate_uuid(),
driver='fake',
power_state=states.POWER_OFF)
node = self.dbapi.create_node(ndict)
self.task.acquire_resources(node.uuid)
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
@ -224,15 +223,17 @@ class PowerActionTestCase(base.DbTestCase):
get_power_mock.return_value = states.POWER_OFF
set_power_mock.side_effect = exception.IronicException()
self.assertRaises(exception.IronicException,
self.service.change_node_power_state,
self.context,
node['uuid'],
states.POWER_ON)
self.assertRaises(
exception.IronicException,
conductor_utils.node_power_action,
self.task,
self.task.node,
states.POWER_ON)
node.refresh(self.context)
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
set_power_mock.assert_called_once_with(mock.ANY, mock.ANY,
states.POWER_ON)
self.assertEqual(node['power_state'], states.POWER_OFF)
self.assertEqual(node['target_power_state'], None)
self.assertNotEqual(node['last_error'], None)
self.assertIsNone(node['target_power_state'])
self.assertIsNotNone(node['last_error'])

View File

@ -19,6 +19,8 @@
"""Test class for Ironic ManagerService."""
import time
import mock
from oslo.config import cfg
from testtools.matchers import HasLength
@ -29,6 +31,7 @@ from ironic.common import states
from ironic.common import utils as ironic_utils
from ironic.conductor import manager
from ironic.conductor import task_manager
from ironic.conductor import utils as conductor_utils
from ironic.db import api as dbapi
from ironic import objects
from ironic.openstack.common import context
@ -207,6 +210,124 @@ class ManagerTestCase(base.DbTestCase):
self.assertEqual(state, states.POWER_ON)
self.assertEqual(get_power_mock.call_args_list, expected)
def test_change_node_power_state_power_on(self):
# Test change_node_power_state including integration with
# conductor.utils.node_power_action and lower.
n = utils.get_test_node(driver='fake',
power_state=states.POWER_OFF)
db_node = self.dbapi.create_node(n)
self.service.start()
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_OFF
self.service.change_node_power_state(self.context,
db_node.uuid,
states.POWER_ON)
self.service._worker_pool.waitall()
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
db_node.refresh(self.context)
self.assertEqual(states.POWER_ON, db_node.power_state)
self.assertIsNone(db_node.target_power_state)
self.assertIsNone(db_node.last_error)
# Verify the reservation has been cleared by
# background task's link callback.
self.assertIsNone(db_node.reservation)
@mock.patch.object(conductor_utils, 'node_power_action')
def test_change_node_power_state_node_already_locked(self,
pwr_act_mock):
# Test change_node_power_state with mocked
# conductor.utils.node_power_action.
fake_reservation = 'fake-reserv'
pwr_state = states.POWER_ON
n = utils.get_test_node(driver='fake',
power_state=pwr_state,
reservation=fake_reservation)
db_node = self.dbapi.create_node(n)
self.service.start()
self.assertRaises(exception.NodeLocked,
self.service.change_node_power_state,
self.context,
db_node.uuid,
states.POWER_ON)
# In this test worker should not be spawned, but waiting to make sure
# the below perform_mock assertion is valid.
self.service._worker_pool.waitall()
self.assertFalse(pwr_act_mock.called, 'node_power_action has been '
'unexpectedly called.')
# Verify existing reservation wasn't broken.
db_node.refresh(self.context)
self.assertEqual(fake_reservation, db_node.reservation)
def test_change_node_power_state_worker_pool_full(self):
# Test change_node_power_state including integration with
# conductor.utils.node_power_action and lower.
initial_state = states.POWER_OFF
n = utils.get_test_node(driver='fake',
power_state=initial_state)
db_node = self.dbapi.create_node(n)
self.service.start()
with mock.patch.object(self.service, '_spawn_worker') \
as spawn_mock:
spawn_mock.side_effect = exception.NoFreeConductorWorker()
self.assertRaises(exception.NoFreeConductorWorker,
self.service.change_node_power_state,
self.context,
db_node.uuid,
states.POWER_ON)
spawn_mock.assert_called_once_with(mock.ANY, mock.ANY,
mock.ANY, mock.ANY)
db_node.refresh(self.context)
self.assertEqual(initial_state, db_node.power_state)
self.assertIsNone(db_node.target_power_state)
self.assertIsNone(db_node.last_error)
# Verify the picked reservation has been cleared due to full pool.
self.assertIsNone(db_node.reservation)
def test_change_node_power_state_exception_in_background_task(
self):
# Test change_node_power_state including integration with
# conductor.utils.node_power_action and lower.
initial_state = states.POWER_OFF
n = utils.get_test_node(driver='fake',
power_state=initial_state)
db_node = self.dbapi.create_node(n)
self.service.start()
with mock.patch.object(self.driver.power, 'get_power_state') \
as get_power_mock:
get_power_mock.return_value = states.POWER_OFF
with mock.patch.object(self.driver.power, 'set_power_state') \
as set_power_mock:
new_state = states.POWER_ON
set_power_mock.side_effect = exception.PowerStateFailure(
pstate=new_state
)
self.service.change_node_power_state(self.context,
db_node.uuid,
new_state)
self.service._worker_pool.waitall()
get_power_mock.assert_called_once_with(mock.ANY, mock.ANY)
set_power_mock.assert_called_once_with(mock.ANY, mock.ANY,
new_state)
db_node.refresh(self.context)
self.assertEqual(initial_state, db_node.power_state)
self.assertIsNone(db_node.target_power_state)
self.assertIsNotNone(db_node.last_error)
# Verify the reservation has been cleared by background task's
# link callback despite exception in background task.
self.assertIsNone(db_node.reservation)
def test_update_node(self):
ndict = utils.get_test_node(driver='fake', extra={'test': 'one'})
node = self.dbapi.create_node(ndict)
@ -517,3 +638,71 @@ class ManagerTestCase(base.DbTestCase):
self.context, node.uuid, False)
node.refresh(self.context)
self.assertFalse(node.maintenance)
def test__spawn_worker(self):
func_mock = mock.Mock()
args = (1, 2, "test")
kwargs = dict(kw1='test1', kw2='test2')
self.service.start()
thread = self.service._spawn_worker(func_mock, *args, **kwargs)
self.service._worker_pool.waitall()
self.assertIsNotNone(thread)
func_mock.assert_called_once_with(*args, **kwargs)
# The tests below related to greenthread. We have they to assert our
# assumptions about greenthread behavior.
def test__spawn_link_callback_added_during_execution(self):
def func():
time.sleep(1)
link_callback = mock.Mock()
self.service.start()
thread = self.service._spawn_worker(func)
# func_mock executing at this moment
thread.link(link_callback)
self.service._worker_pool.waitall()
link_callback.assert_called_once_with(thread)
def test__spawn_link_callback_added_after_execution(self):
def func():
pass
link_callback = mock.Mock()
self.service.start()
thread = self.service._spawn_worker(func)
self.service._worker_pool.waitall()
# func_mock finished at this moment
thread.link(link_callback)
link_callback.assert_called_once_with(thread)
def test__spawn_link_callback_exception_inside_thread(self):
def func():
time.sleep(1)
raise Exception()
link_callback = mock.Mock()
self.service.start()
thread = self.service._spawn_worker(func)
# func_mock executing at this moment
thread.link(link_callback)
self.service._worker_pool.waitall()
link_callback.assert_called_once_with(thread)
def test__spawn_link_callback_added_after_exception_inside_thread(self):
def func():
raise Exception()
link_callback = mock.Mock()
self.service.start()
thread = self.service._spawn_worker(func)
self.service._worker_pool.waitall()
# func_mock finished at this moment
thread.link(link_callback)
link_callback.assert_called_once_with(thread)

View File

@ -117,7 +117,7 @@ class RPCAPITestCase(base.DbTestCase):
def test_change_node_power_state(self):
self._test_rpcapi('change_node_power_state',
'cast',
'call',
node_id=self.fake_node['uuid'],
new_state=states.POWER_ON)