Allocation API: resume allocations on conductor restart
This change allows allocations that were not finished because of conductor restarting or crashing to be finished after start up. Change-Id: I016e08dcb59613b59ae753ef7d3bc9ac4a4a950a Story: #2004341 Task: #29544
This commit is contained in:
parent
d1b572091b
commit
b1cfcb7c98
|
@ -33,6 +33,7 @@ from ironic.common.i18n import _
|
|||
from ironic.common import release_mappings as versions
|
||||
from ironic.common import rpc
|
||||
from ironic.common import states
|
||||
from ironic.conductor import allocations
|
||||
from ironic.conductor import notification_utils as notify_utils
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conf import CONF
|
||||
|
@ -204,6 +205,13 @@ class BaseConductorManager(object):
|
|||
LOG.critical('Failed to start keepalive')
|
||||
self.del_host()
|
||||
|
||||
# Resume allocations that started before the restart.
|
||||
try:
|
||||
self._spawn_worker(self._resume_allocations,
|
||||
ironic_context.get_admin_context())
|
||||
except exception.NoFreeConductorWorker:
|
||||
LOG.warning('Failed to start worker for resuming allocations.')
|
||||
|
||||
self._started = True
|
||||
|
||||
def _use_groups(self):
|
||||
|
@ -550,3 +558,11 @@ class BaseConductorManager(object):
|
|||
finally:
|
||||
# Yield on every iteration
|
||||
eventlet.sleep(0)
|
||||
|
||||
def _resume_allocations(self, context):
|
||||
"""Resume unfinished allocations on restart."""
|
||||
filters = {'state': states.ALLOCATING,
|
||||
'conductor_affinity': self.conductor.id}
|
||||
for allocation in objects.Allocation.list(context, filters=filters):
|
||||
LOG.debug('Resuming unfinished allocation %s', allocation.uuid)
|
||||
allocations.do_allocate(context, allocation)
|
||||
|
|
|
@ -199,6 +199,17 @@ def add_allocation_filter_by_node(query, value):
|
|||
return query.filter(models.Node.uuid == value)
|
||||
|
||||
|
||||
def add_allocation_filter_by_conductor(query, value):
|
||||
if strutils.is_int_like(value):
|
||||
return query.filter_by(conductor_affinity=value)
|
||||
else:
|
||||
# Assume hostname and join with the conductor table
|
||||
query = query.join(
|
||||
models.Conductor,
|
||||
models.Allocation.conductor_affinity == models.Conductor.id)
|
||||
return query.filter(models.Conductor.hostname == value)
|
||||
|
||||
|
||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
sort_dir=None, query=None):
|
||||
if not query:
|
||||
|
@ -339,7 +350,8 @@ class Connection(api.Connection):
|
|||
def _add_allocations_filters(self, query, filters):
|
||||
if filters is None:
|
||||
filters = dict()
|
||||
supported_filters = {'state', 'resource_class', 'node_uuid'}
|
||||
supported_filters = {'state', 'resource_class', 'node_uuid',
|
||||
'conductor_affinity'}
|
||||
unsupported_filters = set(filters).difference(supported_filters)
|
||||
if unsupported_filters:
|
||||
msg = _("SqlAlchemy API does not support "
|
||||
|
@ -353,6 +365,13 @@ class Connection(api.Connection):
|
|||
else:
|
||||
query = add_allocation_filter_by_node(query, node_uuid)
|
||||
|
||||
try:
|
||||
conductor = filters.pop('conductor_affinity')
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
query = add_allocation_filter_by_conductor(query, conductor)
|
||||
|
||||
if filters:
|
||||
query = query.filter_by(**filters)
|
||||
return query
|
||||
|
|
|
@ -37,6 +37,10 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
|||
allocation = obj_utils.get_test_allocation(self.context,
|
||||
extra={'test': 'one'})
|
||||
self._start_service()
|
||||
|
||||
mock_spawn.assert_any_call(self.service,
|
||||
self.service._resume_allocations,
|
||||
mock.ANY)
|
||||
mock_spawn.reset_mock()
|
||||
|
||||
res = self.service.create_allocation(self.context, allocation)
|
||||
|
@ -137,6 +141,33 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
|
|||
self.assertIsNone(node['instance_uuid'])
|
||||
self.assertIsNone(node['allocation_id'])
|
||||
|
||||
@mock.patch.object(allocations, 'do_allocate', autospec=True)
|
||||
def test_resume_allocations(self, mock_allocate):
|
||||
another_conductor = obj_utils.create_test_conductor(
|
||||
self.context, id=42, hostname='another-host')
|
||||
|
||||
self._start_service()
|
||||
|
||||
obj_utils.create_test_allocation(
|
||||
self.context,
|
||||
state='active',
|
||||
conductor_affinity=self.service.conductor.id)
|
||||
obj_utils.create_test_allocation(
|
||||
self.context,
|
||||
state='allocating',
|
||||
conductor_affinity=another_conductor.id)
|
||||
allocation = obj_utils.create_test_allocation(
|
||||
self.context,
|
||||
state='allocating',
|
||||
conductor_affinity=self.service.conductor.id)
|
||||
|
||||
self.service._resume_allocations(self.context)
|
||||
|
||||
mock_allocate.assert_called_once_with(self.context, mock.ANY)
|
||||
actual = mock_allocate.call_args[0][1]
|
||||
self.assertEqual(allocation.uuid, actual.uuid)
|
||||
self.assertIsInstance(allocation, objects.Allocation)
|
||||
|
||||
|
||||
@mock.patch('time.sleep', lambda _: None)
|
||||
class DoAllocateTestCase(db_base.DbTestCase):
|
||||
|
|
|
@ -33,7 +33,7 @@ class AllocationsTestCase(base.DbTestCase):
|
|||
self.assertIsNotNone(allocation.uuid)
|
||||
self.assertEqual('allocating', allocation.state)
|
||||
|
||||
def _create_test_allocation_range(self, count, **kw):
|
||||
def _create_test_allocation_range(self, count, start_idx=0, **kw):
|
||||
"""Create the specified number of test allocation entries in DB
|
||||
|
||||
It uses create_test_allocation method. And returns List of Allocation
|
||||
|
@ -46,7 +46,7 @@ class AllocationsTestCase(base.DbTestCase):
|
|||
return [db_utils.create_test_allocation(uuid=uuidutils.generate_uuid(),
|
||||
name='allocation' + str(i),
|
||||
**kw).uuid
|
||||
for i in range(count)]
|
||||
for i in range(start_idx, count + start_idx)]
|
||||
|
||||
def test_get_allocation_by_id(self):
|
||||
res = self.dbapi.get_allocation_by_id(self.allocation.id)
|
||||
|
@ -117,6 +117,21 @@ class AllocationsTestCase(base.DbTestCase):
|
|||
filters={'resource_class': 'very-large'})
|
||||
self.assertEqual([self.allocation.uuid], [r.uuid for r in res])
|
||||
|
||||
def test_get_allocation_list_filter_by_conductor_affinity(self):
|
||||
db_utils.create_test_conductor(id=1, hostname='host1')
|
||||
db_utils.create_test_conductor(id=2, hostname='host2')
|
||||
in_host1 = self._create_test_allocation_range(2, conductor_affinity=1)
|
||||
in_host2 = self._create_test_allocation_range(2, conductor_affinity=2,
|
||||
start_idx=2)
|
||||
|
||||
res = self.dbapi.get_allocation_list(
|
||||
filters={'conductor_affinity': 1})
|
||||
self.assertEqual(set(in_host1), {r.uuid for r in res})
|
||||
|
||||
res = self.dbapi.get_allocation_list(
|
||||
filters={'conductor_affinity': 'host2'})
|
||||
self.assertEqual(set(in_host2), {r.uuid for r in res})
|
||||
|
||||
def test_get_allocation_list_invalid_fields(self):
|
||||
self.assertRaises(exception.InvalidParameterValue,
|
||||
self.dbapi.get_allocation_list, sort_key='foo')
|
||||
|
|
Loading…
Reference in New Issue