Allocation API: taking over allocations of offline conductors

This change allows conductors to periodically check for and take over
allocations that were still being processed by conductors that went offline.

Change-Id: Ia7b9b5bc485a66215def4a76c6682c47342b86d9
Story: #2004341
Task: #28474
This commit is contained in:
Dmitry Tantsur 2019-02-19 16:52:21 +01:00
parent b1cfcb7c98
commit 6885c674cb
7 changed files with 150 additions and 5 deletions

View File

@ -3527,6 +3527,37 @@ class ConductorManager(base_manager.BaseConductorManager):
LOG.info('Successfully deleted allocation %(uuid)s',
allocation.uuid)
@METRICS.timer('ConductorManager._check_orphan_allocations')
@periodics.periodic(
    spacing=CONF.conductor.check_allocations_interval,
    enabled=CONF.conductor.check_allocations_interval > 0)
def _check_orphan_allocations(self, context):
    """Take over allocations left behind by offline conductors.

    Runs every ``[conductor]check_allocations_interval`` seconds and is
    disabled when that option is 0. For every offline conductor, each
    allocation still in the ``ALLOCATING`` state with that conductor as
    its ``conductor_affinity`` is claimed for this conductor and then
    processed with ``allocations.do_allocate``.

    :param context: request context.
    """
    for dead_id in self.dbapi.get_offline_conductors(field='id'):
        orphans = objects.Allocation.list(
            context,
            filters={'state': states.ALLOCATING,
                     'conductor_affinity': dead_id})
        for orphan in orphans:
            try:
                claimed = self.dbapi.take_over_allocation(
                    orphan.id, dead_id, self.conductor.id)
                # A falsy result means another conductor has already
                # claimed this allocation, so there is nothing to do.
                if claimed:
                    LOG.debug('Taking over allocation %s', orphan.uuid)
                    allocations.do_allocate(context, orphan)
            except Exception:
                # Log and carry on so that one failing allocation does
                # not prevent the remaining ones from being handled.
                LOG.exception('Unexpected exception when taking over '
                              'allocation %s', orphan.uuid)
@METRICS.timer('get_vendor_passthru_metadata')
def get_vendor_passthru_metadata(route_dict):

View File

@ -63,6 +63,11 @@ opts = [
min=1,
help=_('Interval (seconds) between checks of rescue '
'timeouts.')),
cfg.IntOpt('check_allocations_interval',
default=60,
min=0,
help=_('Interval between checks of orphaned allocations, '
'in seconds. Set to 0 to disable checks.')),
cfg.IntOpt('deploy_callback_timeout',
default=1800,
help=_('Timeout (seconds) to wait for a callback from '

View File

@ -549,10 +549,11 @@ class Connection(object):
"""
@abc.abstractmethod
def get_offline_conductors(self, field='hostname'):
    """Get a list of conductors that are offline (dead).

    :param field: Name of the conductor field to return for each offline
                  conductor, ``hostname`` by default.
    :returns: A list with the requested field of each offline conductor.
    """
@abc.abstractmethod
@ -1158,6 +1159,23 @@ class Connection(object):
:raises: NodeAssociated
"""
@abc.abstractmethod
def take_over_allocation(self,
                         allocation_id,
                         old_conductor_id,
                         new_conductor_id):
    """Move an allocation from one conductor to another.

    The ``conductor_affinity`` of the allocation is changed only when
    its current value equals ``old_conductor_id``; this protects against
    several conductors racing for the same allocation.

    :param allocation_id: Allocation ID
    :param old_conductor_id: The conductor ID expected to be the current
                             ``conductor_affinity`` of the allocation.
    :param new_conductor_id: The conductor ID to store as the new
                             ``conductor_affinity``.
    :returns: True if the take over was successful, False otherwise.
    :raises: AllocationNotFound
    """
@abc.abstractmethod
def destroy_allocation(self, allocation_id):
"""Destroy an allocation.

View File

@ -993,10 +993,11 @@ class Connection(api.Connection):
d2c[key].add(cdr_row['hostname'])
return d2c
def get_offline_conductors(self, field='hostname'):
    """Get a list of conductors that are offline (dead).

    A conductor is treated as offline when its ``updated_at`` timestamp
    is more than ``[conductor]heartbeat_timeout`` seconds in the past.

    :param field: Name of the ``models.Conductor`` attribute to return
                  for each offline conductor, ``hostname`` by default.
    :returns: A list with the requested field of each offline conductor.
    :raises: AttributeError if ``field`` is not an attribute of
             ``models.Conductor``.
    """
    # Resolve the column under a separate name instead of rebinding the
    # ``field`` argument.
    column = getattr(models.Conductor, field)
    interval = CONF.conductor.heartbeat_timeout
    limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
    result = (model_query(column)
              .filter(models.Conductor.updated_at < limit))
    # Each row holds only the single requested column.
    return [row[0] for row in result]
@ -1750,6 +1751,39 @@ class Connection(api.Connection):
raise
return ref
@oslo_db_api.retry_on_deadlock
def take_over_allocation(self, allocation_id, old_conductor_id,
                         new_conductor_id):
    """Move an allocation from one conductor to another.

    The ``conductor_affinity`` of the allocation is changed only when
    its current value equals ``old_conductor_id``; this protects against
    several conductors racing for the same allocation.

    :param allocation_id: Allocation ID
    :param old_conductor_id: The conductor ID expected to be the current
                             ``conductor_affinity`` of the allocation.
    :param new_conductor_id: The conductor ID to store as the new
                             ``conductor_affinity``.
    :returns: True if the take over was successful, False otherwise.
    :raises: AllocationNotFound
    """
    with _session_for_write() as session:
        try:
            lookup = add_identity_filter(
                model_query(models.Allocation, session=session),
                allocation_id)
            # SELECT ... FOR UPDATE: the row is read with a lock so the
            # comparison and the update below act on the same value.
            row = lookup.with_for_update().one()
            if row.conductor_affinity == old_conductor_id:
                row.update({'conductor_affinity': new_conductor_id})
                session.flush()
                return True
            # The affinity is not the expected one: somebody else has
            # changed it in the meantime, leave the allocation alone.
            return False
        except NoResultFound:
            raise exception.AllocationNotFound(allocation=allocation_id)
@oslo_db_api.retry_on_deadlock
def destroy_allocation(self, allocation_id):
"""Destroy an allocation.

View File

@ -168,6 +168,37 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
self.assertEqual(allocation.uuid, actual.uuid)
self.assertIsInstance(allocation, objects.Allocation)
@mock.patch.object(allocations, 'do_allocate', autospec=True)
def test_check_orphaned_allocations(self, mock_allocate):
    """Only the allocation of the offline conductor is taken over."""
    alive_conductor = obj_utils.create_test_conductor(
        self.context, id=42, hostname='alive')
    dead_conductor = obj_utils.create_test_conductor(
        self.context, id=43, hostname='dead')
    # Belongs to a conductor that is not reported offline, so it must
    # not be passed to do_allocate.
    obj_utils.create_test_allocation(
        self.context,
        state='allocating',
        conductor_affinity=alive_conductor.id)
    allocation = obj_utils.create_test_allocation(
        self.context,
        state='allocating',
        conductor_affinity=dead_conductor.id)

    self._start_service()
    with mock.patch.object(self.dbapi, 'get_offline_conductors',
                           autospec=True) as mock_conds:
        mock_conds.return_value = [dead_conductor.id]
        self.service._check_orphan_allocations(self.context)

    mock_allocate.assert_called_once_with(self.context, mock.ANY)
    actual = mock_allocate.call_args[0][1]
    self.assertEqual(allocation.uuid, actual.uuid)
    # Check the object that was handed to do_allocate, not the fixture
    # created above (which is trivially an Allocation).
    self.assertIsInstance(actual, objects.Allocation)

    # The affinity stored in the database now points to this conductor.
    allocation = self.dbapi.get_allocation_by_id(allocation.id)
    self.assertEqual(self.service.conductor.id,
                     allocation.conductor_affinity)
@mock.patch('time.sleep', lambda _: None)
class DoAllocateTestCase(db_base.DbTestCase):

View File

@ -244,6 +244,31 @@ class AllocationsTestCase(base.DbTestCase):
self.assertIsNone(node.instance_uuid)
self.assertNotIn('traits', node.instance_info)
def test_take_over_success(self):
    """The affinity is updated when the expected old conductor matches."""
    for conductor_id in range(2):
        db_utils.create_test_conductor(
            id=conductor_id, hostname='host-%d' % conductor_id)
    created = db_utils.create_test_allocation(conductor_affinity=0)

    taken = self.dbapi.take_over_allocation(
        created.id, old_conductor_id=0, new_conductor_id=1)
    self.assertTrue(taken)

    stored = self.dbapi.get_allocation_by_id(created.id)
    self.assertEqual(1, stored.conductor_affinity)
def test_take_over_conflict(self):
    """Nothing changes when the expected old conductor does not match."""
    for conductor_id in range(3):
        db_utils.create_test_conductor(
            id=conductor_id, hostname='host-%d' % conductor_id)
    created = db_utils.create_test_allocation(conductor_affinity=2)

    taken = self.dbapi.take_over_allocation(
        created.id, old_conductor_id=0, new_conductor_id=1)
    self.assertFalse(taken)

    stored = self.dbapi.get_allocation_by_id(created.id)
    # Conductor 2 still owns the allocation
    self.assertEqual(2, stored.conductor_affinity)
def test_take_over_allocation_not_found(self):
    """Taking over an allocation that does not exist raises an error."""
    missing_allocation_id = 999
    self.assertRaises(
        exception.AllocationNotFound,
        self.dbapi.take_over_allocation,
        missing_allocation_id, 0, 1)
def test_create_allocation_duplicated_name(self):
self.assertRaises(exception.AllocationDuplicateName,
db_utils.create_test_allocation,

View File

@ -334,6 +334,7 @@ class DbConductorTestCase(base.DbTestCase):
# 61 seconds passed since last heartbeat, it's dead
mock_utcnow.return_value = time_ + datetime.timedelta(seconds=61)
self.assertEqual([c.hostname], self.dbapi.get_offline_conductors())
self.assertEqual([c.id], self.dbapi.get_offline_conductors(field='id'))
@mock.patch.object(timeutils, 'utcnow', autospec=True)
def test_get_online_conductors(self, mock_utcnow):