Merge "Parallelize periodic power sync calls"
This commit is contained in:
commit
c9aa36b78c
|
@ -1663,10 +1663,40 @@ class ConductorManager(base_manager.BaseConductorManager):
|
|||
@periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
|
||||
enabled=CONF.conductor.sync_power_state_interval > 0)
|
||||
def _sync_power_states(self, context):
|
||||
"""Periodic task to sync power states for the nodes.
|
||||
"""Periodic task to sync power states for the nodes."""
|
||||
filters = {'maintenance': False}
|
||||
nodes = queue.Queue()
|
||||
for node_info in self.iter_nodes(fields=['id'], filters=filters):
|
||||
nodes.put(node_info)
|
||||
|
||||
Attempt to grab a lock and sync only if the following
|
||||
conditions are met:
|
||||
number_of_threads = min(CONF.conductor.sync_power_state_workers,
|
||||
CONF.conductor.periodic_max_workers,
|
||||
nodes.qsize())
|
||||
futures = []
|
||||
|
||||
for thread_number in range(max(0, number_of_threads - 1)):
|
||||
try:
|
||||
futures.append(
|
||||
self._spawn_worker(self._sync_power_state_nodes_task,
|
||||
context, nodes))
|
||||
except exception.NoFreeConductorWorker:
|
||||
LOG.warning("There are no more conductor workers for "
|
||||
"power sync task. %(workers)d workers have "
|
||||
"been already spawned.",
|
||||
{'workers': thread_number})
|
||||
break
|
||||
|
||||
try:
|
||||
self._sync_power_state_nodes_task(context, nodes)
|
||||
|
||||
finally:
|
||||
waiters.wait_for_all(futures)
|
||||
|
||||
def _sync_power_state_nodes_task(self, context, nodes):
|
||||
"""Invokes power state sync on nodes from synchronized queue.
|
||||
|
||||
Attempt to grab a lock and sync only if the following conditions
|
||||
are met:
|
||||
|
||||
1) Node is mapped to this conductor.
|
||||
2) Node is not in maintenance mode.
|
||||
|
@ -1692,9 +1722,13 @@ class ConductorManager(base_manager.BaseConductorManager):
|
|||
# (through to its DB API call) so that we can eliminate our call
|
||||
# and first set of checks below.
|
||||
|
||||
filters = {'maintenance': False}
|
||||
node_iter = self.iter_nodes(fields=['id'], filters=filters)
|
||||
for (node_uuid, driver, conductor_group, node_id) in node_iter:
|
||||
while not self._shutdown:
|
||||
try:
|
||||
(node_uuid, driver, conductor_group,
|
||||
node_id) = nodes.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
try:
|
||||
# NOTE(dtantsur): start with a shared lock, upgrade if needed
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
|
|
|
@ -24,7 +24,9 @@ opts = [
|
|||
default=100, min=3,
|
||||
help=_('The size of the workers greenthread pool. '
|
||||
'Note that 2 threads will be reserved by the conductor '
|
||||
'itself for handling heart beats and periodic tasks.')),
|
||||
'itself for handling heart beats and periodic tasks. '
|
||||
'On top of that, `sync_power_state_workers` will take '
|
||||
'up to 7 green threads with the default value of 8.')),
|
||||
cfg.IntOpt('heartbeat_interval',
|
||||
default=10,
|
||||
help=_('Seconds between conductor heart beats.')),
|
||||
|
@ -77,6 +79,11 @@ opts = [
|
|||
'number of times Ironic should try syncing the '
|
||||
'hardware node power state with the node power state '
|
||||
'in DB')),
|
||||
cfg.IntOpt('sync_power_state_workers',
|
||||
default=8, min=1,
|
||||
help=_('The maximum number of workers that can be started '
|
||||
'simultaneously to sync nodes power state from the '
|
||||
'periodic task.')),
|
||||
cfg.IntOpt('periodic_max_workers',
|
||||
default=8,
|
||||
help=_('Maximum number of worker threads that can be started '
|
||||
|
|
|
@ -22,6 +22,7 @@ from collections import namedtuple
|
|||
import datetime
|
||||
|
||||
import eventlet
|
||||
from futurist import waiters
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exception
|
||||
|
@ -6416,6 +6417,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
|
|||
self.task.upgrade_lock.assert_called_once_with()
|
||||
|
||||
|
||||
@mock.patch.object(waiters, 'wait_for_all',
|
||||
new=mock.MagicMock(return_value=(0, 0)))
|
||||
@mock.patch.object(manager.ConductorManager, '_spawn_worker',
|
||||
new=lambda self, fun, *args: fun(*args))
|
||||
@mock.patch.object(manager, 'do_sync_power_state')
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
|
@ -7140,6 +7145,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
|
|||
self._check_hardware_type_properties('manual-management', expected)
|
||||
|
||||
|
||||
@mock.patch.object(waiters, 'wait_for_all')
|
||||
@mock.patch.object(manager.ConductorManager, '_spawn_worker')
|
||||
@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
|
||||
class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn):
|
||||
|
||||
def setUp(self):
|
||||
super(ParallelPowerSyncTestCase, self).setUp()
|
||||
self.service = manager.ConductorManager('hostname', 'test-topic')
|
||||
|
||||
def test__sync_power_states_9_nodes_8_workers(
|
||||
self, sync_mock, spawn_mock, waiter_mock):
|
||||
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 9)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
self.assertEqual(7, spawn_mock.call_count)
|
||||
self.assertEqual(1, sync_mock.call_count)
|
||||
self.assertEqual(1, waiter_mock.call_count)
|
||||
|
||||
def test__sync_power_states_6_nodes_8_workers(
|
||||
self, sync_mock, spawn_mock, waiter_mock):
|
||||
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 6)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
self.assertEqual(5, spawn_mock.call_count)
|
||||
self.assertEqual(1, sync_mock.call_count)
|
||||
self.assertEqual(1, waiter_mock.call_count)
|
||||
|
||||
def test__sync_power_states_1_nodes_8_workers(
|
||||
self, sync_mock, spawn_mock, waiter_mock):
|
||||
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None])):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
self.assertEqual(0, spawn_mock.call_count)
|
||||
self.assertEqual(1, sync_mock.call_count)
|
||||
self.assertEqual(1, waiter_mock.call_count)
|
||||
|
||||
def test__sync_power_states_9_nodes_1_worker(
|
||||
self, sync_mock, spawn_mock, waiter_mock):
|
||||
|
||||
CONF.set_override('sync_power_state_workers', 1, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 9)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
self.assertEqual(0, spawn_mock.call_count)
|
||||
self.assertEqual(9, sync_mock.call_count)
|
||||
self.assertEqual(1, waiter_mock.call_count)
|
||||
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Parallelizes periodic power sync calls by running up to
|
||||
``sync_power_state_workers`` simultenously. The default is to run
|
||||
up to ``8`` workers.
|
||||
This change should let larger-scale setups to run power syncs more
|
||||
frequently and make the whole power sync procedure more resilient to slow
|
||||
or dead BMCs.
|
Loading…
Reference in New Issue