From b1cfcb7c9867ea55a458b6f0ef67e916b2de5e9d Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur Date: Tue, 19 Feb 2019 13:14:36 +0100 Subject: [PATCH] Allocation API: resume allocations on conductor restart This change allows allocations that were not finished because of conductor restarting or crashing to be finished after start up. Change-Id: I016e08dcb59613b59ae753ef7d3bc9ac4a4a950a Story: #2004341 Task: #29544 --- ironic/conductor/base_manager.py | 16 ++++++++++ ironic/db/sqlalchemy/api.py | 21 ++++++++++++- .../tests/unit/conductor/test_allocations.py | 31 +++++++++++++++++++ ironic/tests/unit/db/test_allocations.py | 19 ++++++++++-- 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/ironic/conductor/base_manager.py b/ironic/conductor/base_manager.py index 5156cbc941..3960688b1f 100644 --- a/ironic/conductor/base_manager.py +++ b/ironic/conductor/base_manager.py @@ -33,6 +33,7 @@ from ironic.common.i18n import _ from ironic.common import release_mappings as versions from ironic.common import rpc from ironic.common import states +from ironic.conductor import allocations from ironic.conductor import notification_utils as notify_utils from ironic.conductor import task_manager from ironic.conf import CONF @@ -204,6 +205,13 @@ class BaseConductorManager(object): LOG.critical('Failed to start keepalive') self.del_host() + # Resume allocations that started before the restart. 
+ try: + self._spawn_worker(self._resume_allocations, + ironic_context.get_admin_context()) + except exception.NoFreeConductorWorker: + LOG.warning('Failed to start worker for resuming allocations.') + self._started = True def _use_groups(self): @@ -550,3 +558,11 @@ class BaseConductorManager(object): finally: # Yield on every iteration eventlet.sleep(0) + + def _resume_allocations(self, context): + """Resume unfinished allocations on restart.""" + filters = {'state': states.ALLOCATING, + 'conductor_affinity': self.conductor.id} + for allocation in objects.Allocation.list(context, filters=filters): + LOG.debug('Resuming unfinished allocation %s', allocation.uuid) + allocations.do_allocate(context, allocation) diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py index ec7e9dd673..468d01c93b 100644 --- a/ironic/db/sqlalchemy/api.py +++ b/ironic/db/sqlalchemy/api.py @@ -199,6 +199,17 @@ def add_allocation_filter_by_node(query, value): return query.filter(models.Node.uuid == value) +def add_allocation_filter_by_conductor(query, value): + if strutils.is_int_like(value): + return query.filter_by(conductor_affinity=value) + else: + # Assume hostname and join with the conductor table + query = query.join( + models.Conductor, + models.Allocation.conductor_affinity == models.Conductor.id) + return query.filter(models.Conductor.hostname == value) + + def _paginate_query(model, limit=None, marker=None, sort_key=None, sort_dir=None, query=None): if not query: @@ -339,7 +350,8 @@ class Connection(api.Connection): def _add_allocations_filters(self, query, filters): if filters is None: filters = dict() - supported_filters = {'state', 'resource_class', 'node_uuid'} + supported_filters = {'state', 'resource_class', 'node_uuid', + 'conductor_affinity'} unsupported_filters = set(filters).difference(supported_filters) if unsupported_filters: msg = _("SqlAlchemy API does not support " @@ -353,6 +365,13 @@ class Connection(api.Connection): else: query = 
add_allocation_filter_by_node(query, node_uuid) + try: + conductor = filters.pop('conductor_affinity') + except KeyError: + pass + else: + query = add_allocation_filter_by_conductor(query, conductor) + if filters: query = query.filter_by(**filters) return query diff --git a/ironic/tests/unit/conductor/test_allocations.py b/ironic/tests/unit/conductor/test_allocations.py index 817c175b5c..e888fd476c 100644 --- a/ironic/tests/unit/conductor/test_allocations.py +++ b/ironic/tests/unit/conductor/test_allocations.py @@ -37,6 +37,10 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): allocation = obj_utils.get_test_allocation(self.context, extra={'test': 'one'}) self._start_service() + + mock_spawn.assert_any_call(self.service, + self.service._resume_allocations, + mock.ANY) mock_spawn.reset_mock() res = self.service.create_allocation(self.context, allocation) @@ -137,6 +141,33 @@ class AllocationTestCase(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): self.assertIsNone(node['instance_uuid']) self.assertIsNone(node['allocation_id']) + @mock.patch.object(allocations, 'do_allocate', autospec=True) + def test_resume_allocations(self, mock_allocate): + another_conductor = obj_utils.create_test_conductor( + self.context, id=42, hostname='another-host') + + self._start_service() + + obj_utils.create_test_allocation( + self.context, + state='active', + conductor_affinity=self.service.conductor.id) + obj_utils.create_test_allocation( + self.context, + state='allocating', + conductor_affinity=another_conductor.id) + allocation = obj_utils.create_test_allocation( + self.context, + state='allocating', + conductor_affinity=self.service.conductor.id) + + self.service._resume_allocations(self.context) + + mock_allocate.assert_called_once_with(self.context, mock.ANY) + actual = mock_allocate.call_args[0][1] + self.assertEqual(allocation.uuid, actual.uuid) + self.assertIsInstance(actual, objects.Allocation) + @mock.patch('time.sleep', lambda _: None) 
class DoAllocateTestCase(db_base.DbTestCase): diff --git a/ironic/tests/unit/db/test_allocations.py b/ironic/tests/unit/db/test_allocations.py index 425c80909f..37a1ed5190 100644 --- a/ironic/tests/unit/db/test_allocations.py +++ b/ironic/tests/unit/db/test_allocations.py @@ -33,7 +33,7 @@ class AllocationsTestCase(base.DbTestCase): self.assertIsNotNone(allocation.uuid) self.assertEqual('allocating', allocation.state) - def _create_test_allocation_range(self, count, **kw): + def _create_test_allocation_range(self, count, start_idx=0, **kw): """Create the specified number of test allocation entries in DB It uses create_test_allocation method. And returns List of Allocation @@ -46,7 +46,7 @@ class AllocationsTestCase(base.DbTestCase): return [db_utils.create_test_allocation(uuid=uuidutils.generate_uuid(), name='allocation' + str(i), **kw).uuid - for i in range(count)] + for i in range(start_idx, count + start_idx)] def test_get_allocation_by_id(self): res = self.dbapi.get_allocation_by_id(self.allocation.id) @@ -117,6 +117,21 @@ class AllocationsTestCase(base.DbTestCase): filters={'resource_class': 'very-large'}) self.assertEqual([self.allocation.uuid], [r.uuid for r in res]) + def test_get_allocation_list_filter_by_conductor_affinity(self): + db_utils.create_test_conductor(id=1, hostname='host1') + db_utils.create_test_conductor(id=2, hostname='host2') + in_host1 = self._create_test_allocation_range(2, conductor_affinity=1) + in_host2 = self._create_test_allocation_range(2, conductor_affinity=2, + start_idx=2) + + res = self.dbapi.get_allocation_list( + filters={'conductor_affinity': 1}) + self.assertEqual(set(in_host1), {r.uuid for r in res}) + + res = self.dbapi.get_allocation_list( + filters={'conductor_affinity': 'host2'}) + self.assertEqual(set(in_host2), {r.uuid for r in res}) + def test_get_allocation_list_invalid_fields(self): self.assertRaises(exception.InvalidParameterValue, self.dbapi.get_allocation_list, sort_key='foo')