Rework scheduling filters
Remove ValidationFilter, make it a part of reservation to avoid calling it too many times. Fix AttributeError on failing the custom predicate. Remove double validation in the reserver, we do another validation later on anyway. BREAKING: changes the exception classes. Change-Id: Ibc3815989917ab777298a05810fd8f3e64debc8f Story: #2003584 Task: #26178
This commit is contained in:
parent
d20424fb0e
commit
51a006e307
|
@ -95,29 +95,24 @@ class Provisioner(object):
|
|||
|
||||
if candidates:
|
||||
nodes = [self._api.get_node(node) for node in candidates]
|
||||
if resource_class:
|
||||
nodes = [node for node in nodes
|
||||
if node.resource_class == resource_class]
|
||||
if conductor_group is not None:
|
||||
nodes = [node for node in nodes
|
||||
if node.conductor_group == conductor_group]
|
||||
filters = [
|
||||
_scheduler.NodeTypeFilter(resource_class, conductor_group),
|
||||
]
|
||||
else:
|
||||
nodes = self._api.list_nodes(resource_class=resource_class,
|
||||
conductor_group=conductor_group)
|
||||
if not nodes:
|
||||
raise exceptions.NodesNotFound(resource_class, conductor_group)
|
||||
# Ensure parallel executions don't try nodes in the same sequence
|
||||
random.shuffle(nodes)
|
||||
# No need to filter by resource_class and conductor_group any more
|
||||
filters = []
|
||||
|
||||
if not nodes:
|
||||
raise exceptions.NodesNotFound(resource_class, conductor_group)
|
||||
LOG.debug('Candidate nodes: %s', nodes)
|
||||
|
||||
LOG.debug('Ironic nodes: %s', nodes)
|
||||
|
||||
filters = [_scheduler.CapabilitiesFilter(capabilities),
|
||||
_scheduler.ValidationFilter(self._api)]
|
||||
filters.append(_scheduler.CapabilitiesFilter(capabilities))
|
||||
if predicate is not None:
|
||||
# NOTE(dtantsur): run the provided predicate before the validation,
|
||||
# since validation requires network interactions.
|
||||
filters.insert(-1, predicate)
|
||||
filters.append(_scheduler.CustomPredicateFilter(predicate))
|
||||
|
||||
reserver = _scheduler.IronicReserver(self._api)
|
||||
node = _scheduler.schedule_node(nodes, filters, reserver,
|
||||
|
|
|
@ -114,6 +114,26 @@ def schedule_node(nodes, filters, reserver, dry_run=False):
|
|||
assert False, "BUG: %s.fail did not raise" % reserver.__class__.__name__
|
||||
|
||||
|
||||
class NodeTypeFilter(Filter):
|
||||
"""Filter that checks resource class and conductor group."""
|
||||
|
||||
def __init__(self, resource_class=None, conductor_group=None):
|
||||
self.resource_class = resource_class
|
||||
self.conductor_group = conductor_group
|
||||
|
||||
def __call__(self, node):
|
||||
return (
|
||||
(self.resource_class is None or
|
||||
node.resource_class == self.resource_class) and
|
||||
(self.conductor_group is None or
|
||||
node.conductor_group == self.conductor_group)
|
||||
)
|
||||
|
||||
def fail(self):
|
||||
raise exceptions.NodesNotFound(self.resource_class,
|
||||
self.conductor_group)
|
||||
|
||||
|
||||
class CapabilitiesFilter(Filter):
|
||||
"""Filter that checks capabilities."""
|
||||
|
||||
|
@ -161,31 +181,22 @@ class CapabilitiesFilter(Filter):
|
|||
raise exceptions.CapabilitiesNotFound(message, self._capabilities)
|
||||
|
||||
|
||||
class ValidationFilter(Filter):
|
||||
"""Filter that runs validation on nodes."""
|
||||
class CustomPredicateFilter(Filter):
|
||||
|
||||
def __init__(self, api):
|
||||
self._api = api
|
||||
self._messages = []
|
||||
def __init__(self, predicate):
|
||||
self.predicate = predicate
|
||||
self._failed_nodes = []
|
||||
|
||||
def __call__(self, node):
|
||||
try:
|
||||
self._api.validate_node(node.uuid)
|
||||
except RuntimeError as exc:
|
||||
message = ('Node %(node)s failed validation: %(err)s' %
|
||||
{'node': _utils.log_node(node), 'err': exc})
|
||||
LOG.warning(message)
|
||||
self._messages.append(message)
|
||||
if not self.predicate(node):
|
||||
self._failed_nodes.append(node)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def fail(self):
|
||||
errors = ", ".join(self._messages)
|
||||
message = "All available nodes have failed validation: %s" % errors
|
||||
raise exceptions.ValidationFailed(message, self._failed_nodes)
|
||||
message = 'No nodes satisfied the custom predicate %s' % self.predicate
|
||||
raise exceptions.CustomPredicateFailed(message, self._failed_nodes)
|
||||
|
||||
|
||||
class IronicReserver(Reserver):
|
||||
|
@ -194,26 +205,22 @@ class IronicReserver(Reserver):
|
|||
self._api = api
|
||||
self._failed_nodes = []
|
||||
|
||||
def validate(self, node):
|
||||
try:
|
||||
self._api.validate_node(node)
|
||||
except RuntimeError as exc:
|
||||
message = ('Node %(node)s failed validation: %(err)s' %
|
||||
{'node': _utils.log_node(node), 'err': exc})
|
||||
LOG.warning(message)
|
||||
raise exceptions.ValidationFailed(message)
|
||||
|
||||
def __call__(self, node):
|
||||
try:
|
||||
result = self._api.reserve_node(node, instance_uuid=node.uuid)
|
||||
|
||||
# Try validation again to be sure nothing has changed
|
||||
validator = ValidationFilter(self._api)
|
||||
if not validator(result):
|
||||
LOG.warning('Validation of node %s failed after reservation',
|
||||
_utils.log_node(node))
|
||||
try:
|
||||
self._api.release_node(node)
|
||||
except Exception:
|
||||
LOG.exception('Failed to release the reserved node %s',
|
||||
_utils.log_node(node))
|
||||
validator.fail()
|
||||
|
||||
return result
|
||||
self.validate(node)
|
||||
return self._api.reserve_node(node, instance_uuid=node.uuid)
|
||||
except Exception:
|
||||
self._failed_nodes.append(node)
|
||||
raise
|
||||
|
||||
def fail(self):
|
||||
raise exceptions.AllNodesReserved(self._failed_nodes)
|
||||
raise exceptions.NoNodesReserved(self._failed_nodes)
|
||||
|
|
|
@ -45,6 +45,17 @@ class NodesNotFound(ReservationFailed):
|
|||
super(NodesNotFound, self).__init__(message)
|
||||
|
||||
|
||||
class CustomPredicateFailed(ReservationFailed):
|
||||
"""Custom predicate yielded no nodes.
|
||||
|
||||
:ivar nodes: List of nodes that were checked.
|
||||
"""
|
||||
|
||||
def __init__(self, message, nodes):
|
||||
self.nodes = nodes
|
||||
super(CustomPredicateFailed, self).__init__(message)
|
||||
|
||||
|
||||
class CapabilitiesNotFound(ReservationFailed):
|
||||
"""Requested capabilities do not match any nodes.
|
||||
|
||||
|
@ -57,26 +68,20 @@ class CapabilitiesNotFound(ReservationFailed):
|
|||
|
||||
|
||||
class ValidationFailed(ReservationFailed):
|
||||
"""Validation failed for all requested nodes.
|
||||
|
||||
:ivar nodes: List of nodes that were checked.
|
||||
"""
|
||||
|
||||
def __init__(self, message, nodes):
|
||||
self.nodes = nodes
|
||||
super(ValidationFailed, self).__init__(message)
|
||||
"""Validation failed for all requested nodes."""
|
||||
|
||||
|
||||
class AllNodesReserved(ReservationFailed):
|
||||
"""All nodes are already reserved.
|
||||
class NoNodesReserved(ReservationFailed):
|
||||
"""All nodes are already reserved or failed validation.
|
||||
|
||||
:ivar nodes: List of nodes that were checked.
|
||||
"""
|
||||
|
||||
def __init__(self, nodes):
|
||||
self.nodes = nodes
|
||||
message = 'All the candidate nodes are already reserved'
|
||||
super(AllNodesReserved, self).__init__(message)
|
||||
message = ('All the candidate nodes are already reserved '
|
||||
'or failed validation')
|
||||
super(NoNodesReserved, self).__init__(message)
|
||||
|
||||
|
||||
class InvalidImage(Error):
|
||||
|
|
|
@ -156,6 +156,25 @@ class TestReserveNode(Base):
|
|||
self.assertEqual(node, nodes[1])
|
||||
self.assertFalse(self.api.update_node.called)
|
||||
|
||||
def test_custom_predicate_false(self):
|
||||
nodes = [
|
||||
mock.Mock(spec=['uuid', 'name', 'properties'],
|
||||
properties={'local_gb': 100}),
|
||||
mock.Mock(spec=['uuid', 'name', 'properties'],
|
||||
properties={'local_gb': 150}),
|
||||
mock.Mock(spec=['uuid', 'name', 'properties'],
|
||||
properties={'local_gb': 200}),
|
||||
]
|
||||
self.api.list_nodes.return_value = nodes[:]
|
||||
|
||||
self.assertRaisesRegex(exceptions.CustomPredicateFailed,
|
||||
'custom predicate',
|
||||
self.pr.reserve_node,
|
||||
predicate=lambda node: False)
|
||||
|
||||
self.assertFalse(self.api.update_node.called)
|
||||
self.assertFalse(self.api.reserve_node.called)
|
||||
|
||||
def test_provided_node(self):
|
||||
nodes = [
|
||||
mock.Mock(spec=['uuid', 'name', 'properties'],
|
||||
|
@ -226,6 +245,26 @@ class TestReserveNode(Base):
|
|||
self.api.update_node.assert_called_once_with(
|
||||
node, {'/instance_info/capabilities': {'cat': 'meow'}})
|
||||
|
||||
def test_provided_nodes_no_match(self):
|
||||
nodes = [
|
||||
mock.Mock(spec=['uuid', 'name', 'properties', 'resource_class',
|
||||
'conductor_group'],
|
||||
properties={'local_gb': 100}, resource_class='compute',
|
||||
conductor_group='loc1'),
|
||||
mock.Mock(spec=['uuid', 'name', 'properties', 'resource_class',
|
||||
'conductor_group'],
|
||||
properties={'local_gb': 100}, resource_class='control',
|
||||
conductor_group='loc2'),
|
||||
]
|
||||
|
||||
self.assertRaises(exceptions.NodesNotFound,
|
||||
self.pr.reserve_node, candidates=nodes,
|
||||
resource_class='control', conductor_group='loc1')
|
||||
|
||||
self.assertFalse(self.api.list_nodes.called)
|
||||
self.assertFalse(self.api.reserve_node.called)
|
||||
self.assertFalse(self.api.update_node.called)
|
||||
|
||||
|
||||
CLEAN_UP = {
|
||||
'/extra/metalsmith_created_ports': _os_api.REMOVE,
|
||||
|
|
|
@ -164,77 +164,39 @@ class TestCapabilitiesFilter(testtools.TestCase):
|
|||
fltr.fail)
|
||||
|
||||
|
||||
class TestValidationFilter(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestValidationFilter, self).setUp()
|
||||
self.api = mock.Mock(spec=['validate_node'])
|
||||
self.fltr = _scheduler.ValidationFilter(self.api)
|
||||
|
||||
def test_pass(self):
|
||||
node = mock.Mock(spec=['uuid', 'name'])
|
||||
self.assertTrue(self.fltr(node))
|
||||
|
||||
def test_fail_validation(self):
|
||||
node = mock.Mock(spec=['uuid', 'name'])
|
||||
self.api.validate_node.side_effect = RuntimeError('boom')
|
||||
self.assertFalse(self.fltr(node))
|
||||
|
||||
self.assertRaisesRegex(exceptions.ValidationFailed,
|
||||
'All available nodes have failed validation: '
|
||||
'Node .* failed validation: boom',
|
||||
self.fltr.fail)
|
||||
|
||||
|
||||
@mock.patch.object(_scheduler, 'ValidationFilter', autospec=True)
|
||||
class TestIronicReserver(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestIronicReserver, self).setUp()
|
||||
self.node = mock.Mock(spec=['uuid', 'name'])
|
||||
self.api = mock.Mock(spec=['reserve_node', 'release_node'])
|
||||
self.api = mock.Mock(spec=['reserve_node', 'release_node',
|
||||
'validate_node'])
|
||||
self.api.reserve_node.side_effect = lambda node, instance_uuid: node
|
||||
self.reserver = _scheduler.IronicReserver(self.api)
|
||||
|
||||
def test_fail(self, mock_validation):
|
||||
self.assertRaisesRegex(exceptions.AllNodesReserved,
|
||||
def test_fail(self):
|
||||
self.assertRaisesRegex(exceptions.NoNodesReserved,
|
||||
'All the candidate nodes are already reserved',
|
||||
self.reserver.fail)
|
||||
|
||||
def test_ok(self, mock_validation):
|
||||
def test_ok(self):
|
||||
self.assertEqual(self.node, self.reserver(self.node))
|
||||
self.api.validate_node.assert_called_with(self.node)
|
||||
self.api.reserve_node.assert_called_once_with(
|
||||
self.node, instance_uuid=self.node.uuid)
|
||||
mock_validation.return_value.assert_called_once_with(self.node)
|
||||
|
||||
def test_reservation_failed(self, mock_validation):
|
||||
def test_reservation_failed(self):
|
||||
self.api.reserve_node.side_effect = RuntimeError('conflict')
|
||||
self.assertRaisesRegex(RuntimeError, 'conflict',
|
||||
self.reserver, self.node)
|
||||
self.api.validate_node.assert_called_with(self.node)
|
||||
self.api.reserve_node.assert_called_once_with(
|
||||
self.node, instance_uuid=self.node.uuid)
|
||||
self.assertFalse(mock_validation.return_value.called)
|
||||
|
||||
def test_validation_failed(self, mock_validation):
|
||||
mock_validation.return_value.return_value = False
|
||||
mock_validation.return_value.fail.side_effect = RuntimeError('fail')
|
||||
self.assertRaisesRegex(RuntimeError, 'fail',
|
||||
def test_validation_failed(self):
|
||||
self.api.validate_node.side_effect = RuntimeError('fail')
|
||||
self.assertRaisesRegex(exceptions.ValidationFailed, 'fail',
|
||||
self.reserver, self.node)
|
||||
self.api.reserve_node.assert_called_once_with(
|
||||
self.node, instance_uuid=self.node.uuid)
|
||||
mock_validation.return_value.assert_called_once_with(self.node)
|
||||
self.api.release_node.assert_called_once_with(self.node)
|
||||
|
||||
@mock.patch.object(_scheduler.LOG, 'exception', autospec=True)
|
||||
def test_validation_and_release_failed(self, mock_log_exc,
|
||||
mock_validation):
|
||||
mock_validation.return_value.return_value = False
|
||||
mock_validation.return_value.fail.side_effect = RuntimeError('fail')
|
||||
self.api.release_node.side_effect = Exception()
|
||||
self.assertRaisesRegex(RuntimeError, 'fail',
|
||||
self.reserver, self.node)
|
||||
self.api.reserve_node.assert_called_once_with(
|
||||
self.node, instance_uuid=self.node.uuid)
|
||||
mock_validation.return_value.assert_called_once_with(self.node)
|
||||
self.api.release_node.assert_called_once_with(self.node)
|
||||
self.assertTrue(mock_log_exc.called)
|
||||
self.api.validate_node.assert_called_once_with(self.node)
|
||||
self.assertFalse(self.api.reserve_node.called)
|
||||
self.assertFalse(self.api.release_node.called)
|
||||
|
|
Loading…
Reference in New Issue