Merge "Parallelize periodic power sync calls"

This commit is contained in:
Zuul 2019-02-02 11:57:17 +00:00 committed by Gerrit Code Review
commit c9aa36b78c
4 changed files with 128 additions and 7 deletions

View File

@ -1663,10 +1663,40 @@ class ConductorManager(base_manager.BaseConductorManager):
@periodics.periodic(spacing=CONF.conductor.sync_power_state_interval,
enabled=CONF.conductor.sync_power_state_interval > 0)
def _sync_power_states(self, context):
"""Periodic task to sync power states for the nodes.
"""Periodic task to sync power states for the nodes."""
filters = {'maintenance': False}
nodes = queue.Queue()
for node_info in self.iter_nodes(fields=['id'], filters=filters):
nodes.put(node_info)
Attempt to grab a lock and sync only if the following
conditions are met:
number_of_threads = min(CONF.conductor.sync_power_state_workers,
CONF.conductor.periodic_max_workers,
nodes.qsize())
futures = []
for thread_number in range(max(0, number_of_threads - 1)):
try:
futures.append(
self._spawn_worker(self._sync_power_state_nodes_task,
context, nodes))
except exception.NoFreeConductorWorker:
LOG.warning("There are no more conductor workers for "
"power sync task. %(workers)d workers have "
"been already spawned.",
{'workers': thread_number})
break
try:
self._sync_power_state_nodes_task(context, nodes)
finally:
waiters.wait_for_all(futures)
def _sync_power_state_nodes_task(self, context, nodes):
"""Invokes power state sync on nodes from synchronized queue.
Attempt to grab a lock and sync only if the following conditions
are met:
1) Node is mapped to this conductor.
2) Node is not in maintenance mode.
@ -1692,9 +1722,13 @@ class ConductorManager(base_manager.BaseConductorManager):
# (through to its DB API call) so that we can eliminate our call
# and first set of checks below.
filters = {'maintenance': False}
node_iter = self.iter_nodes(fields=['id'], filters=filters)
for (node_uuid, driver, conductor_group, node_id) in node_iter:
while not self._shutdown:
try:
(node_uuid, driver, conductor_group,
node_id) = nodes.get_nowait()
except queue.Empty:
break
try:
# NOTE(dtantsur): start with a shared lock, upgrade if needed
with task_manager.acquire(context, node_uuid,

View File

@ -24,7 +24,9 @@ opts = [
default=100, min=3,
help=_('The size of the workers greenthread pool. '
'Note that 2 threads will be reserved by the conductor '
'itself for handling heart beats and periodic tasks.')),
'itself for handling heart beats and periodic tasks. '
'On top of that, `sync_power_state_workers` will take '
'up to 7 green threads with the default value of 8.')),
cfg.IntOpt('heartbeat_interval',
default=10,
help=_('Seconds between conductor heart beats.')),
@ -77,6 +79,11 @@ opts = [
'number of times Ironic should try syncing the '
'hardware node power state with the node power state '
'in DB')),
cfg.IntOpt('sync_power_state_workers',
default=8, min=1,
help=_('The maximum number of workers that can be started '
'simultaneously to sync nodes power state from the '
'periodic task.')),
cfg.IntOpt('periodic_max_workers',
default=8,
help=_('Maximum number of worker threads that can be started '

View File

@ -22,6 +22,7 @@ from collections import namedtuple
import datetime
import eventlet
from futurist import waiters
import mock
from oslo_config import cfg
from oslo_db import exception as db_exception
@ -6416,6 +6417,10 @@ class ManagerDoSyncPowerStateTestCase(db_base.DbTestCase):
self.task.upgrade_lock.assert_called_once_with()
@mock.patch.object(waiters, 'wait_for_all',
new=mock.MagicMock(return_value=(0, 0)))
@mock.patch.object(manager.ConductorManager, '_spawn_worker',
new=lambda self, fun, *args: fun(*args))
@mock.patch.object(manager, 'do_sync_power_state')
@mock.patch.object(task_manager, 'acquire')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@ -7140,6 +7145,72 @@ class ManagerTestHardwareTypeProperties(mgr_utils.ServiceSetUpMixin,
self._check_hardware_type_properties('manual-management', expected)
@mock.patch.object(waiters, 'wait_for_all')
@mock.patch.object(manager.ConductorManager, '_spawn_worker')
@mock.patch.object(manager.ConductorManager, '_sync_power_state_nodes_task')
class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn):
def setUp(self):
super(ParallelPowerSyncTestCase, self).setUp()
self.service = manager.ConductorManager('hostname', 'test-topic')
def test__sync_power_states_9_nodes_8_workers(
self, sync_mock, spawn_mock, waiter_mock):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 9)):
self.service._sync_power_states(self.context)
self.assertEqual(7, spawn_mock.call_count)
self.assertEqual(1, sync_mock.call_count)
self.assertEqual(1, waiter_mock.call_count)
def test__sync_power_states_6_nodes_8_workers(
self, sync_mock, spawn_mock, waiter_mock):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 6)):
self.service._sync_power_states(self.context)
self.assertEqual(5, spawn_mock.call_count)
self.assertEqual(1, sync_mock.call_count)
self.assertEqual(1, waiter_mock.call_count)
def test__sync_power_states_1_nodes_8_workers(
self, sync_mock, spawn_mock, waiter_mock):
CONF.set_override('sync_power_state_workers', 8, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None])):
self.service._sync_power_states(self.context)
self.assertEqual(0, spawn_mock.call_count)
self.assertEqual(1, sync_mock.call_count)
self.assertEqual(1, waiter_mock.call_count)
def test__sync_power_states_9_nodes_1_worker(
self, sync_mock, spawn_mock, waiter_mock):
CONF.set_override('sync_power_state_workers', 1, group='conductor')
with mock.patch.object(self.service, 'iter_nodes',
new=mock.MagicMock(return_value=[None] * 9)):
self.service._sync_power_states(self.context)
self.assertEqual(0, spawn_mock.call_count)
self.assertEqual(9, sync_mock.call_count)
self.assertEqual(1, waiter_mock.call_count)
@mock.patch.object(task_manager, 'acquire')
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')

View File

@ -0,0 +1,9 @@
---
features:
- |
Parallelizes periodic power sync calls by running up to
``sync_power_state_workers`` simultenously. The default is to run
up to ``8`` workers.
This change should let larger-scale setups to run power syncs more
frequently and make the whole power sync procedure more resilient to slow
or dead BMCs.