Merge "Parallelize periodic power sync calls"
commit c9aa36b78c
@@ -1663,10 +1663,40 @@ class ConductorManager(base_manager.BaseConductorManager):
     @periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
                         enabled=CONF.conductor.sync_power_state_interval > 0)
     def _sync_power_states(self, context):
-        """Periodic task to sync power states for the nodes.
-
-        Attempt to grab a lock and sync only if the following
-        conditions are met:
+        """Periodic task to sync power states for the nodes."""
+        filters = {'maintenance': False}
+        nodes = queue.Queue()
+        for node_info in self.iter_nodes(fields=['id'], filters=filters):
+            nodes.put(node_info)
+
+        number_of_threads = min(CONF.conductor.sync_power_state_workers,
+                                CONF.conductor.periodic_max_workers,
+                                nodes.qsize())
+        futures = []
+
+        for thread_number in range(max(0, number_of_threads - 1)):
+            try:
+                futures.append(
+                    self._spawn_worker(self._sync_power_state_nodes_task,
+                                       context, nodes))
+            except exception.NoFreeConductorWorker:
+                LOG.warning("There are no more conductor workers for "
+                            "power sync task. %(workers)d workers have "
+                            "been already spawned.",
+                            {'workers': thread_number})
+                break
+
+        try:
+            self._sync_power_state_nodes_task(context, nodes)
+        finally:
+            waiters.wait_for_all(futures)
+
+    def _sync_power_state_nodes_task(self, context, nodes):
+        """Invokes power state sync on nodes from synchronized queue.
+
+        Attempt to grab a lock and sync only if the following conditions
+        are met:
 
         1) Node is mapped to this conductor.
         2) Node is not in maintenance mode.
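The new _sync_power_states above fills a thread-safe queue, spawns up to N - 1 helper workers, then drains the queue itself before joining the helpers. A minimal, self-contained sketch of the same fan-out pattern follows; it uses stdlib concurrent.futures instead of the conductor's _spawn_worker/futurist machinery, and sync_one() is an illustrative stand-in for the real per-node power sync:

    import queue
    from concurrent.futures import ThreadPoolExecutor, wait

    def sync_one(node):
        # Illustrative stand-in for the real per-node power sync.
        print('syncing %s' % node)

    def drain(nodes):
        # Each worker pulls from the shared queue until it is empty.
        while True:
            try:
                node = nodes.get_nowait()
            except queue.Empty:
                break
            sync_one(node)

    def sync_power_states(node_list, max_workers=8):
        nodes = queue.Queue()
        for node in node_list:
            nodes.put(node)
        # Never start more workers than there are queued nodes.
        n_threads = min(max_workers, nodes.qsize())
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Spawn n_threads - 1 helpers; the caller acts as the last worker.
            futures = [pool.submit(drain, nodes)
                       for _ in range(max(0, n_threads - 1))]
            try:
                drain(nodes)
            finally:
                wait(futures)  # mirrors waiters.wait_for_all(futures)

    sync_power_states(['node-%d' % i for i in range(9)])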
@@ -1692,9 +1722,13 @@ class ConductorManager(base_manager.BaseConductorManager):
         # (through to its DB API call) so that we can eliminate our call
         # and first set of checks below.
 
-        filters = {'maintenance': False}
-        node_iter = self.iter_nodes(fields=['id'], filters=filters)
-        for (node_uuid, driver, conductor_group, node_id) in node_iter:
+        while not self._shutdown:
+            try:
+                (node_uuid, driver, conductor_group,
+                 node_id) = nodes.get_nowait()
+            except queue.Empty:
+                break
+
             try:
                 # NOTE(dtantsur): start with a shared lock, upgrade if needed
                 with task_manager.acquire(context, node_uuid,
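The worker loop above has two exit paths: get_nowait() raises queue.Empty once the queue is drained, so workers finish without blocking, and the self._shutdown check lets a stopping conductor abandon the backlog early. A toy consumer illustrating both (is_shutdown is a stand-in for the conductor's _shutdown flag, not part of the commit):

    import queue

    def drain(nodes, is_shutdown=lambda: False):
        # Exit path 1: the service is shutting down mid-backlog.
        while not is_shutdown():
            try:
                node = nodes.get_nowait()
            except queue.Empty:
                # Exit path 2: the queue is fully drained.
                break
            print('would sync %s' % node)

    q = queue.Queue()
    q.put('node-1')
    drain(q)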
@@ -24,7 +24,9 @@ opts = [
                default=100, min=3,
                help=_('The size of the workers greenthread pool. '
                       'Note that 2 threads will be reserved by the conductor '
-                      'itself for handling heart beats and periodic tasks.')),
+                      'itself for handling heart beats and periodic tasks. '
+                      'On top of that, `sync_power_state_workers` will take '
+                      'up to 7 green threads with the default value of 8.')),
     cfg.IntOpt('heartbeat_interval',
                default=10,
                help=_('Seconds between conductor heart beats.')),
@@ -77,6 +79,11 @@ opts = [
                       'number of times Ironic should try syncing the '
                       'hardware node power state with the node power state '
                       'in DB')),
+    cfg.IntOpt('sync_power_state_workers',
+               default=8, min=1,
+               help=_('The maximum number of workers that can be started '
+                      'simultaneously to sync nodes power state from the '
+                      'periodic task.')),
     cfg.IntOpt('periodic_max_workers',
                default=8,
                help=_('Maximum number of worker threads that can be started '
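The "up to 7 green threads" figure in the help text above follows from the worker-count formula in the manager change: the budget is the minimum of sync_power_state_workers, periodic_max_workers, and the number of queued nodes, and one worker runs in the periodic task's own thread. A worked example with the new option's defaults (matching the spawn counts asserted in the tests below):

    def number_of_threads(sync_workers, periodic_max, queued_nodes):
        # Same min() as in _sync_power_states above.
        return min(sync_workers, periodic_max, queued_nodes)

    for queued in (9, 6, 1):
        n = number_of_threads(8, 8, queued)
        # The periodic task itself acts as one worker, so only n - 1
        # extra green threads are spawned.
        print('%d nodes -> %d workers, %d spawned'
              % (queued, n, max(0, n - 1)))
    # 9 nodes -> 8 workers, 7 spawned
    # 6 nodes -> 6 workers, 5 spawned
    # 1 nodes -> 1 workers, 0 spawned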
@@ -22,6 +22,7 @@ from collections import namedtuple
 import datetime
 
 import eventlet
+from futurist import waiters
 import mock
 from oslo_config import cfg
 from oslo_db import exception as db_exception
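The new import pulls in futurist's waiters module, whose wait_for_all() blocks until every supplied future has finished, which is what the conductor uses to join its power-sync workers. A minimal sketch of that call, assuming only the futurist library:

    import futurist
    from futurist import waiters

    pool = futurist.ThreadPoolExecutor(max_workers=2)
    futures = [pool.submit(pow, 2, n) for n in (8, 16)]
    # wait_for_all() returns a (done, not_done) named tuple,
    # like concurrent.futures.wait().
    done, not_done = waiters.wait_for_all(futures)
    print(len(done), len(not_done))  # 2 0
    pool.shutdown()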
@@ -6416,6 +6417,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
         self.task.upgrade_lock.assert_called_once_with()
 
 
+@mock.patch.object(waiters, 'wait_for_all',
+                   new=mock.MagicMock(return_value=(0, 0)))
+@mock.patch.object(manager.ConductorManager, '_spawn_worker',
+                   new=lambda self, fun, *args: fun(*args))
 @mock.patch.object(manager, 'do_sync_power_state')
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
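The mock argument order here, and in the new test class below, relies on mock's rule that stacked patch decorators are applied bottom-up, so the bottom-most patch arrives as the first mock argument. A quick self-contained reminder:

    import mock  # Python 3: 'from unittest import mock' works as well

    class Thing(object):
        def ping(self):
            pass
        def pong(self):
            pass

    @mock.patch.object(Thing, 'pong')  # top decorator: injected last
    @mock.patch.object(Thing, 'ping')  # bottom decorator: injected first
    def check(ping_mock, pong_mock):
        Thing().ping()
        assert ping_mock.called
        assert not pong_mock.called

    check()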
@@ -7140,6 +7145,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
         self._check_hardware_type_properties('manual-management', expected)
 
 
+@mock.patch.object(waiters, 'wait_for_all')
+@mock.patch.object(manager.ConductorManager, '_spawn_worker')
+@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
+class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn):
+
+    def setUp(self):
+        super(ParallelPowerSyncTestCase, self).setUp()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+    def test__sync_power_states_9_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(7, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_6_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 6)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(5, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_1_nodes_8_workers(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 8, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None])):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(1, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+    def test__sync_power_states_9_nodes_1_worker(
+            self, sync_mock, spawn_mock, waiter_mock):
+
+        CONF.set_override('sync_power_state_workers', 1, group='conductor')
+
+        with mock.patch.object(self.service, 'iter_nodes',
+                               new=mock.MagicMock(return_value=[None] * 9)):
+
+            self.service._sync_power_states(self.context)
+
+            self.assertEqual(0, spawn_mock.call_count)
+            self.assertEqual(9, sync_mock.call_count)
+            self.assertEqual(1, waiter_mock.call_count)
+
+
 @mock.patch.object(task_manager, 'acquire')
 @mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
 @mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Parallelizes periodic power sync calls by running up to
+    ``sync_power_state_workers`` workers simultaneously. The default is
+    to run up to ``8`` workers.
+    This change should let larger-scale setups run power syncs more
+    frequently and make the whole power sync procedure more resilient to
+    slow or dead BMCs.