Allocation API: resume allocations on conductor restart

This change allows allocations that were not finished because of
conductor restarting or crashing to be finished after start up.

Change-Id: I016e08dcb59613b59ae753ef7d3bc9ac4a4a950a
Story: #2004341
Task: #29544
This commit is contained in:
Dmitry Tantsur 2019-02-19 13:14:36 +01:00
parent d1b572091b
commit b1cfcb7c98
4 changed files with 84 additions and 3 deletions

View File

@ -33,6 +33,7 @@ from ironic.common.i18n import _
from ironic.common import release_mappings as versions
from ironic.common import rpc
from ironic.common import states
from ironic.conductor import allocations
from ironic.conductor import notification_utils as notify_utils
from ironic.conductor import task_manager
from ironic.conf import CONF
@ -204,6 +205,13 @@ class BaseConductorManager(object):
LOG.critical('Failed to start keepalive')
self.del_host()
# Resume allocations that started before the restart.
try:
self._spawn_worker(self._resume_allocations,
ironic_context.get_admin_context())
except exception.NoFreeConductorWorker:
LOG.warning('Failed to start worker for resuming allocations.')
self._started = True
def _use_groups(self):
@ -550,3 +558,11 @@ class BaseConductorManager(object):
finally:
# Yield on every iteration
eventlet.sleep(0)
def _resume_allocations(self, context):
"""Resume unfinished allocations on restart."""
filters = {'state': states.ALLOCATING,
'conductor_affinity': self.conductor.id}
for allocation in objects.Allocation.list(context, filters=filters):
LOG.debug('Resuming unfinished allocation %s', allocation.uuid)
allocations.do_allocate(context, allocation)

View File

@ -199,6 +199,17 @@ def add_allocation_filter_by_node(query, value):
return query.filter(models.Node.uuid == value)
def add_allocation_filter_by_conductor(query, value):
if strutils.is_int_like(value):
return query.filter_by(conductor_affinity=value)
else:
# Assume hostname and join with the conductor table
query = query.join(
models.Conductor,
models.Allocation.conductor_affinity == models.Conductor.id)
return query.filter(models.Conductor.hostname == value)
def _paginate_query(model, limit=None, marker=None, sort_key=None,
sort_dir=None, query=None):
if not query:
@ -339,7 +350,8 @@ class Connection(api.Connection):
def _add_allocations_filters(self, query, filters):
if filters is None:
filters = dict()
supported_filters = {'state', 'resource_class', 'node_uuid'}
supported_filters = {'state', 'resource_class', 'node_uuid',
'conductor_affinity'}
unsupported_filters = set(filters).difference(supported_filters)
if unsupported_filters:
msg = _("SqlAlchemy API does not support "
@ -353,6 +365,13 @@ class Connection(api.Connection):
else:
query = add_allocation_filter_by_node(query, node_uuid)
try:
conductor = filters.pop('conductor_affinity')
except KeyError:
pass
else:
query = add_allocation_filter_by_conductor(query, conductor)
if filters:
query = query.filter_by(**filters)
return query

View File

@ -37,6 +37,10 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
allocation = obj_utils.get_test_allocation(self.context,
extra={'test': 'one'})
self._start_service()
mock_spawn.assert_any_call(self.service,
self.service._resume_allocations,
mock.ANY)
mock_spawn.reset_mock()
res = self.service.create_allocation(self.context, allocation)
@ -137,6 +141,33 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertIsNone(node['instance_uuid'])
self.assertIsNone(node['allocation_id'])
@mock.patch.object(allocations, 'do_allocate', autospec=True)
def test_resume_allocations(self, mock_allocate):
another_conductor = obj_utils.create_test_conductor(
self.context, id=42, hostname='another-host')
self._start_service()
obj_utils.create_test_allocation(
self.context,
state='active',
conductor_affinity=self.service.conductor.id)
obj_utils.create_test_allocation(
self.context,
state='allocating',
conductor_affinity=another_conductor.id)
allocation = obj_utils.create_test_allocation(
self.context,
state='allocating',
conductor_affinity=self.service.conductor.id)
self.service._resume_allocations(self.context)
mock_allocate.assert_called_once_with(self.context, mock.ANY)
actual = mock_allocate.call_args[0][1]
self.assertEqual(allocation.uuid, actual.uuid)
self.assertIsInstance(allocation, objects.Allocation)
@mock.patch('time.sleep', lambda _: None)
class DoAllocateTestCase(db_base.DbTestCase):

View File

@ -33,7 +33,7 @@ class AllocationsTestCase(base.DbTestCase):
self.assertIsNotNone(allocation.uuid)
self.assertEqual('allocating', allocation.state)
def _create_test_allocation_range(self, count, **kw):
def _create_test_allocation_range(self, count, start_idx=0, **kw):
"""Create the specified number of test allocation entries in DB
It uses create_test_allocation method. And returns List of Allocation
@ -46,7 +46,7 @@ class AllocationsTestCase(base.DbTestCase):
return [db_utils.create_test_allocation(uuid=uuidutils.generate_uuid(),
name='allocation' + str(i),
**kw).uuid
for i in range(count)]
for i in range(start_idx, count + start_idx)]
def test_get_allocation_by_id(self):
res = self.dbapi.get_allocation_by_id(self.allocation.id)
@ -117,6 +117,21 @@ class AllocationsTestCase(base.DbTestCase):
filters={'resource_class': 'very-large'})
self.assertEqual([self.allocation.uuid], [r.uuid for r in res])
def test_get_allocation_list_filter_by_conductor_affinity(self):
db_utils.create_test_conductor(id=1, hostname='host1')
db_utils.create_test_conductor(id=2, hostname='host2')
in_host1 = self._create_test_allocation_range(2, conductor_affinity=1)
in_host2 = self._create_test_allocation_range(2, conductor_affinity=2,
start_idx=2)
res = self.dbapi.get_allocation_list(
filters={'conductor_affinity': 1})
self.assertEqual(set(in_host1), {r.uuid for r in res})
res = self.dbapi.get_allocation_list(
filters={'conductor_affinity': 'host2'})
self.assertEqual(set(in_host2), {r.uuid for r in res})
def test_get_allocation_list_invalid_fields(self):
self.assertRaises(exception.InvalidParameterValue,
self.dbapi.get_allocation_list, sort_key='foo')