Merge "Prioritize sloppy nodes for power sync"

This commit is contained in:
Zuul 2019-02-14 07:02:48 +00:00 committed by Gerrit Code Review
commit e34941c327
2 changed files with 38 additions and 10 deletions

View File

@ -1665,20 +1665,28 @@ class ConductorManager(base_manager.BaseConductorManager):
def _sync_power_states(self, context):
"""Periodic task to sync power states for the nodes."""
filters = {'maintenance': False}
nodes = queue.Queue()
for node_info in self.iter_nodes(fields=['id'], filters=filters):
nodes.put(node_info)
# NOTE(etingof): prioritize non-responding nodes to fail them fast
nodes = sorted(
self.iter_nodes(fields=['id'], filters=filters),
key=lambda n: -self.power_state_sync_count.get(n[0], 0)
)
nodes_queue = queue.Queue()
for node_info in nodes:
nodes_queue.put(node_info)
number_of_workers = min(CONF.conductor.sync_power_state_workers,
CONF.conductor.periodic_max_workers,
nodes.qsize())
nodes_queue.qsize())
futures = []
for worker_number in range(max(0, number_of_workers - 1)):
try:
futures.append(
self._spawn_worker(self._sync_power_state_nodes_task,
context, nodes))
context, nodes_queue))
except exception.NoFreeConductorWorker:
LOG.warning("There are no more conductor workers for "
"power sync task. %(workers)d workers have "
@ -1687,7 +1695,7 @@ class ConductorManager(base_manager.BaseConductorManager):
break
try:
self._sync_power_state_nodes_task(context, nodes)
self._sync_power_state_nodes_task(context, nodes_queue)
finally:
waiters.wait_for_all(futures)

View File

@ -7160,7 +7160,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 9)):
new=mock.MagicMock(return_value=[[0]] * 9)):
self.service._sync_power_states(self.context)
@ -7174,7 +7174,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 6)):
new=mock.MagicMock(return_value=[[0]] * 6)):
self.service._sync_power_states(self.context)
@ -7188,7 +7188,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None])):
new=mock.MagicMock(return_value=[[0]])):
self.service._sync_power_states(self.context)
@ -7202,7 +7202,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
CONF.set_override('sync_power_state_workers', 1, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 9)):
new=mock.MagicMock(return_value=[[0]] * 9)):
self.service._sync_power_states(self.context)
@ -7210,6 +7210,26 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
self.assertEqual(1, sync_mock.call_count)
self.assertEqual(1, waiter_mock.call_count)
@mock.patch.object(queue, 'Queue', autospec=True)
def test__sync_power_states_node_prioritization(
self, queue_mock, sync_mock, spawn_mock, waiter_mock):
CONF.set_override('sync_power_state_workers', 1, group='conductor')
with mock.patch.object(
self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[[0], [1], [2]])
), mock.patch.dict(
self.service.power_state_sync_count,
{0: 1, 1: 0, 2: 2}, clear=True):
queue_mock.return_value.qsize.return_value = 0
self.service._sync_power_states(self.context)
expected_calls = [mock.call([2]), mock.call([0]), mock.call([1])]
queue_mock.return_value.put.assert_has_calls(expected_calls)
@mock.patch.object(task_manager, 'acquire')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')