Rework scheduling filters

Remove ValidationFilter, make it a part of reservation to avoid calling
it too many times. Fix AttributeError on failing the custom predicate.
Remove double validation in the reserver, we do another validation later
on anyway.

BREAKING: changes the exception classes.

Change-Id: Ibc3815989917ab777298a05810fd8f3e64debc8f
Story: #2003584
Task: #26178
This commit is contained in:
Dmitry Tantsur 2018-09-07 15:31:58 +02:00
parent d20424fb0e
commit 51a006e307
5 changed files with 118 additions and 110 deletions

View File

@ -95,29 +95,24 @@ class Provisioner(object):
if candidates:
nodes = [self._api.get_node(node) for node in candidates]
if resource_class:
nodes = [node for node in nodes
if node.resource_class == resource_class]
if conductor_group is not None:
nodes = [node for node in nodes
if node.conductor_group == conductor_group]
filters = [
_scheduler.NodeTypeFilter(resource_class, conductor_group),
]
else:
nodes = self._api.list_nodes(resource_class=resource_class,
conductor_group=conductor_group)
if not nodes:
raise exceptions.NodesNotFound(resource_class, conductor_group)
# Ensure parallel executions don't try nodes in the same sequence
random.shuffle(nodes)
# No need to filter by resource_class and conductor_group any more
filters = []
if not nodes:
raise exceptions.NodesNotFound(resource_class, conductor_group)
LOG.debug('Candidate nodes: %s', nodes)
LOG.debug('Ironic nodes: %s', nodes)
filters = [_scheduler.CapabilitiesFilter(capabilities),
_scheduler.ValidationFilter(self._api)]
filters.append(_scheduler.CapabilitiesFilter(capabilities))
if predicate is not None:
# NOTE(dtantsur): run the provided predicate before the validation,
# since validation requires network interactions.
filters.insert(-1, predicate)
filters.append(_scheduler.CustomPredicateFilter(predicate))
reserver = _scheduler.IronicReserver(self._api)
node = _scheduler.schedule_node(nodes, filters, reserver,

View File

@ -114,6 +114,26 @@ def schedule_node(nodes, filters, reserver, dry_run=False):
assert False, "BUG: %s.fail did not raise" % reserver.__class__.__name__
class NodeTypeFilter(Filter):
"""Filter that checks resource class and conductor group."""
def __init__(self, resource_class=None, conductor_group=None):
self.resource_class = resource_class
self.conductor_group = conductor_group
def __call__(self, node):
return (
(self.resource_class is None or
node.resource_class == self.resource_class) and
(self.conductor_group is None or
node.conductor_group == self.conductor_group)
)
def fail(self):
raise exceptions.NodesNotFound(self.resource_class,
self.conductor_group)
class CapabilitiesFilter(Filter):
"""Filter that checks capabilities."""
@ -161,31 +181,22 @@ class CapabilitiesFilter(Filter):
raise exceptions.CapabilitiesNotFound(message, self._capabilities)
class ValidationFilter(Filter):
"""Filter that runs validation on nodes."""
class CustomPredicateFilter(Filter):
def __init__(self, api):
self._api = api
self._messages = []
def __init__(self, predicate):
self.predicate = predicate
self._failed_nodes = []
def __call__(self, node):
try:
self._api.validate_node(node.uuid)
except RuntimeError as exc:
message = ('Node %(node)s failed validation: %(err)s' %
{'node': _utils.log_node(node), 'err': exc})
LOG.warning(message)
self._messages.append(message)
if not self.predicate(node):
self._failed_nodes.append(node)
return False
return True
def fail(self):
errors = ", ".join(self._messages)
message = "All available nodes have failed validation: %s" % errors
raise exceptions.ValidationFailed(message, self._failed_nodes)
message = 'No nodes satisfied the custom predicate %s' % self.predicate
raise exceptions.CustomPredicateFailed(message, self._failed_nodes)
class IronicReserver(Reserver):
@ -194,26 +205,22 @@ class IronicReserver(Reserver):
self._api = api
self._failed_nodes = []
def validate(self, node):
try:
self._api.validate_node(node)
except RuntimeError as exc:
message = ('Node %(node)s failed validation: %(err)s' %
{'node': _utils.log_node(node), 'err': exc})
LOG.warning(message)
raise exceptions.ValidationFailed(message)
def __call__(self, node):
try:
result = self._api.reserve_node(node, instance_uuid=node.uuid)
# Try validation again to be sure nothing has changed
validator = ValidationFilter(self._api)
if not validator(result):
LOG.warning('Validation of node %s failed after reservation',
_utils.log_node(node))
try:
self._api.release_node(node)
except Exception:
LOG.exception('Failed to release the reserved node %s',
_utils.log_node(node))
validator.fail()
return result
self.validate(node)
return self._api.reserve_node(node, instance_uuid=node.uuid)
except Exception:
self._failed_nodes.append(node)
raise
def fail(self):
raise exceptions.AllNodesReserved(self._failed_nodes)
raise exceptions.NoNodesReserved(self._failed_nodes)

View File

@ -45,6 +45,17 @@ class NodesNotFound(ReservationFailed):
super(NodesNotFound, self).__init__(message)
class CustomPredicateFailed(ReservationFailed):
"""Custom predicate yielded no nodes.
:ivar nodes: List of nodes that were checked.
"""
def __init__(self, message, nodes):
self.nodes = nodes
super(CustomPredicateFailed, self).__init__(message)
class CapabilitiesNotFound(ReservationFailed):
"""Requested capabilities do not match any nodes.
@ -57,26 +68,20 @@ class CapabilitiesNotFound(ReservationFailed):
class ValidationFailed(ReservationFailed):
"""Validation failed for all requested nodes.
:ivar nodes: List of nodes that were checked.
"""
def __init__(self, message, nodes):
self.nodes = nodes
super(ValidationFailed, self).__init__(message)
"""Validation failed for all requested nodes."""
class AllNodesReserved(ReservationFailed):
"""All nodes are already reserved.
class NoNodesReserved(ReservationFailed):
"""All nodes are already reserved or failed validation.
:ivar nodes: List of nodes that were checked.
"""
def __init__(self, nodes):
self.nodes = nodes
message = 'All the candidate nodes are already reserved'
super(AllNodesReserved, self).__init__(message)
message = ('All the candidate nodes are already reserved '
'or failed validation')
super(NoNodesReserved, self).__init__(message)
class InvalidImage(Error):

View File

@ -156,6 +156,25 @@ class TestReserveNode(Base):
self.assertEqual(node, nodes[1])
self.assertFalse(self.api.update_node.called)
def test_custom_predicate_false(self):
nodes = [
mock.Mock(spec=['uuid', 'name', 'properties'],
properties={'local_gb': 100}),
mock.Mock(spec=['uuid', 'name', 'properties'],
properties={'local_gb': 150}),
mock.Mock(spec=['uuid', 'name', 'properties'],
properties={'local_gb': 200}),
]
self.api.list_nodes.return_value = nodes[:]
self.assertRaisesRegex(exceptions.CustomPredicateFailed,
'custom predicate',
self.pr.reserve_node,
predicate=lambda node: False)
self.assertFalse(self.api.update_node.called)
self.assertFalse(self.api.reserve_node.called)
def test_provided_node(self):
nodes = [
mock.Mock(spec=['uuid', 'name', 'properties'],
@ -226,6 +245,26 @@ class TestReserveNode(Base):
self.api.update_node.assert_called_once_with(
node, {'/instance_info/capabilities': {'cat': 'meow'}})
def test_provided_nodes_no_match(self):
nodes = [
mock.Mock(spec=['uuid', 'name', 'properties', 'resource_class',
'conductor_group'],
properties={'local_gb': 100}, resource_class='compute',
conductor_group='loc1'),
mock.Mock(spec=['uuid', 'name', 'properties', 'resource_class',
'conductor_group'],
properties={'local_gb': 100}, resource_class='control',
conductor_group='loc2'),
]
self.assertRaises(exceptions.NodesNotFound,
self.pr.reserve_node, candidates=nodes,
resource_class='control', conductor_group='loc1')
self.assertFalse(self.api.list_nodes.called)
self.assertFalse(self.api.reserve_node.called)
self.assertFalse(self.api.update_node.called)
CLEAN_UP = {
'/extra/metalsmith_created_ports': _os_api.REMOVE,

View File

@ -164,77 +164,39 @@ class TestCapabilitiesFilter(testtools.TestCase):
fltr.fail)
class TestValidationFilter(testtools.TestCase):
def setUp(self):
super(TestValidationFilter, self).setUp()
self.api = mock.Mock(spec=['validate_node'])
self.fltr = _scheduler.ValidationFilter(self.api)
def test_pass(self):
node = mock.Mock(spec=['uuid', 'name'])
self.assertTrue(self.fltr(node))
def test_fail_validation(self):
node = mock.Mock(spec=['uuid', 'name'])
self.api.validate_node.side_effect = RuntimeError('boom')
self.assertFalse(self.fltr(node))
self.assertRaisesRegex(exceptions.ValidationFailed,
'All available nodes have failed validation: '
'Node .* failed validation: boom',
self.fltr.fail)
@mock.patch.object(_scheduler, 'ValidationFilter', autospec=True)
class TestIronicReserver(testtools.TestCase):
def setUp(self):
super(TestIronicReserver, self).setUp()
self.node = mock.Mock(spec=['uuid', 'name'])
self.api = mock.Mock(spec=['reserve_node', 'release_node'])
self.api = mock.Mock(spec=['reserve_node', 'release_node',
'validate_node'])
self.api.reserve_node.side_effect = lambda node, instance_uuid: node
self.reserver = _scheduler.IronicReserver(self.api)
def test_fail(self, mock_validation):
self.assertRaisesRegex(exceptions.AllNodesReserved,
def test_fail(self):
self.assertRaisesRegex(exceptions.NoNodesReserved,
'All the candidate nodes are already reserved',
self.reserver.fail)
def test_ok(self, mock_validation):
def test_ok(self):
self.assertEqual(self.node, self.reserver(self.node))
self.api.validate_node.assert_called_with(self.node)
self.api.reserve_node.assert_called_once_with(
self.node, instance_uuid=self.node.uuid)
mock_validation.return_value.assert_called_once_with(self.node)
def test_reservation_failed(self, mock_validation):
def test_reservation_failed(self):
self.api.reserve_node.side_effect = RuntimeError('conflict')
self.assertRaisesRegex(RuntimeError, 'conflict',
self.reserver, self.node)
self.api.validate_node.assert_called_with(self.node)
self.api.reserve_node.assert_called_once_with(
self.node, instance_uuid=self.node.uuid)
self.assertFalse(mock_validation.return_value.called)
def test_validation_failed(self, mock_validation):
mock_validation.return_value.return_value = False
mock_validation.return_value.fail.side_effect = RuntimeError('fail')
self.assertRaisesRegex(RuntimeError, 'fail',
def test_validation_failed(self):
self.api.validate_node.side_effect = RuntimeError('fail')
self.assertRaisesRegex(exceptions.ValidationFailed, 'fail',
self.reserver, self.node)
self.api.reserve_node.assert_called_once_with(
self.node, instance_uuid=self.node.uuid)
mock_validation.return_value.assert_called_once_with(self.node)
self.api.release_node.assert_called_once_with(self.node)
@mock.patch.object(_scheduler.LOG, 'exception', autospec=True)
def test_validation_and_release_failed(self, mock_log_exc,
mock_validation):
mock_validation.return_value.return_value = False
mock_validation.return_value.fail.side_effect = RuntimeError('fail')
self.api.release_node.side_effect = Exception()
self.assertRaisesRegex(RuntimeError, 'fail',
self.reserver, self.node)
self.api.reserve_node.assert_called_once_with(
self.node, instance_uuid=self.node.uuid)
mock_validation.return_value.assert_called_once_with(self.node)
self.api.release_node.assert_called_once_with(self.node)
self.assertTrue(mock_log_exc.called)
self.api.validate_node.assert_called_once_with(self.node)
self.assertFalse(self.api.reserve_node.called)
self.assertFalse(self.api.release_node.called)