Implemented support for graph metadata

Added support for node filters and node_transitions.
If a status is not specified for one of the node_transitions,
the default is used, where the defaults are:
- on success: switch node to status ready
- on error: switch node to status error
- on stop: switch node to status stopped

Change-Id: I8b4d49dc1bada2479017697bf5858e85958579f2
Blueprint: graph-concept-extension
This commit is contained in:
Bulat Gaifullin 2016-08-19 19:47:57 +03:00
parent 590b10285b
commit 06d1396059
6 changed files with 266 additions and 72 deletions

View File

@ -1137,6 +1137,7 @@ class Cluster(NailgunObject):
dict_update(graph_metadata, plugins_deployment_graph)
dict_update(graph_metadata, cluster_deployment_graph)
graph_metadata['tasks'] = tasks
graph_metadata['type'] = graph_type
return graph_metadata
@classmethod

View File

@ -53,6 +53,13 @@ class TestGraphExecutorHandler(base.BaseIntegrationTest):
instance=self.cluster,
graph_type='test_graph'
)
self.expected_metadata = {
'fault_tolerance_groups': [],
'node_statuses_transitions': {
'successful': {'status': consts.NODE_STATUSES.ready},
'failed': {'status': consts.NODE_STATUSES.error},
'stopped': {'status': consts.NODE_STATUSES.stopped}}
}
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute(self, rpc_mock):
@ -73,7 +80,7 @@ class TestGraphExecutorHandler(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': sub_task.uuid,
'tasks_graph': {
None: [],

View File

@ -53,8 +53,15 @@ class TestTransactionManager(base.BaseIntegrationTest):
graph_type='test_graph')
self.manager = manager.TransactionsManager(self.cluster.id)
self.receiver = receiver.NailgunReceiver
self.expected_metadata = {
'fault_tolerance_groups': [],
'node_statuses_transitions': {
'successful': {'status': consts.NODE_STATUSES.ready},
'failed': {'status': consts.NODE_STATUSES.error},
'stopped': {'status': consts.NODE_STATUSES.stopped}}
}
def _sucess(self, transaction_uuid):
def _success(self, transaction_uuid):
self.receiver.transaction_resp(
task_uuid=transaction_uuid,
nodes=[
@ -82,7 +89,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -103,7 +110,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
@ -138,7 +145,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -160,14 +167,14 @@ class TestTransactionManager(base.BaseIntegrationTest):
}])
# Consider we've got success from Astute.
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
# It's time to send the second graph to execution.
rpc_mock.cast.assert_called_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[1].uuid,
'tasks_graph': {
None: [],
@ -189,7 +196,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
}])
# Consider we've got success from Astute.
self._sucess(task.subtasks[1].uuid)
self._success(task.subtasks[1].uuid)
# Ensure the top-level transaction is ready.
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@ -226,7 +233,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -271,7 +278,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -300,7 +307,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
}
])
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
@ -315,7 +322,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -335,7 +342,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
@ -347,7 +354,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -368,7 +375,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
@ -386,7 +393,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'tasks_metadata': self.expected_metadata,
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
@ -408,5 +415,58 @@ class TestTransactionManager(base.BaseIntegrationTest):
}]
)
self._sucess(task.subtasks[0].uuid)
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_with_node_filter(self, rpc_mock):
    """Run a graph whose ``node_filter`` selects nodes pending deletion.

    A node flagged with ``pending_deletion`` is created, a graph with the
    filter ``$.pending_deletion`` is registered for the cluster, and the
    message cast over RPC is expected to carry the graph task for that
    node.
    """
    doomed_node = self.env.create_node(
        cluster_id=self.cluster.id, pending_deletion=True,
        roles=["compute"]
    )
    graph_definition = {
        'tasks': [
            {
                'id': 'delete_node',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'roles': ['/.*/']
            },
        ],
        'name': 'deletion_graph',
        'node_filter': '$.pending_deletion'
    }
    objects.DeploymentGraph.create_for_model(
        graph_definition,
        instance=self.cluster,
        graph_type='deletion_graph',
    )

    task = self.manager.execute(graphs=[{"type": "deletion_graph"}])
    self.assertNotEqual(consts.TASK_STATUSES.error, task.status)

    sub_transaction = task.subtasks[0]
    serialized_task = {
        'id': 'delete_node',
        'type': 'puppet',
        'fail_on_error': True,
        'parameters': {'cwd': '/'}
    }
    expected_message = {
        'args': {
            'tasks_metadata': self.expected_metadata,
            'task_uuid': sub_transaction.uuid,
            'tasks_graph': {
                None: [],
                doomed_node.uid: [serialized_task]
            },
            'tasks_directory': {},
            'dry_run': False,
        },
        'respond_to': 'transaction_resp',
        'method': 'task_deploy',
        'api_version': '1'
    }
    rpc_mock.cast.assert_called_once_with('naily', [expected_message])

    # Report success for the sub-transaction and check the parent status.
    self._success(sub_transaction.uuid)
    self.assertEqual(task.status, consts.TASK_STATUSES.ready)

View File

@ -24,22 +24,32 @@ from nailgun.test.base import BaseUnitTest
class TestMakeAstuteMessage(BaseUnitTest):
maxDiff = None
@mock.patch('nailgun.transactions.manager.objects')
@mock.patch('nailgun.transactions.manager.lcm')
def test_make_astute_message(self, lcm_mock, obj_mock):
resolver = mock.MagicMock()
context = mock.MagicMock()
tx = mock.MagicMock(dry_run=False)
tasks = mock.MagicMock()
graph = {
'tasks': mock.MagicMock(),
'on_success': {'node_attributes': {}},
'on_error': {},
}
tasks_directory = mock.MagicMock()
tasks_graph = mock.MagicMock()
tasks_metadata = mock.MagicMock()
tasks_metadata = {
'node_statuses_transitions': {
'successful': {},
'failed': {'status': consts.NODE_STATUSES.error},
'stopped': {'status': consts.NODE_STATUSES.stopped},
}
}
lcm_mock.TransactionSerializer.serialize.return_value = (
tasks_directory, tasks_graph, tasks_metadata
tasks_directory, tasks_graph, {}
)
result = manager.make_astute_message(tx, context, tasks, resolver)
result = manager.make_astute_message(tx, context, graph, resolver)
self.assertEqual(
{
'api_version': manager.settings.VERSION['api'],
@ -56,7 +66,7 @@ class TestMakeAstuteMessage(BaseUnitTest):
result
)
lcm_mock.TransactionSerializer.serialize.assert_called_once_with(
context, tasks, resolver
context, graph['tasks'], resolver
)
obj_mock.DeploymentHistoryCollection.create.assert_called_once_with(
tx, tasks_graph
@ -121,62 +131,57 @@ class TestNodeForRedeploy(BaseUnitTest):
)
class TestGetTasksToRun(BaseUnitTest):
class TestAdjustTasksToRun(BaseUnitTest):
@mock.patch('nailgun.transactions.manager.objects')
def test_get_tasks_if_no_legacy(self, objects_mock):
def test_adjust_tasks_if_no_legacy(self, objects_mock):
cluster_obj = objects_mock.Cluster
tasks = [
{'id': 'tasks1', 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
{'id': 'tasks3', 'type': consts.ORCHESTRATOR_TASK_TYPES.shell},
{'id': 'tasks4', 'type': consts.ORCHESTRATOR_TASK_TYPES.skipped}
{'id': 'task1', 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'task2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
{'id': 'task3', 'type': consts.ORCHESTRATOR_TASK_TYPES.shell},
{'id': 'task4', 'type': consts.ORCHESTRATOR_TASK_TYPES.skipped}
]
cluster_obj.get_deployment_tasks.return_value = tasks
graph = {'tasks': tasks[:]}
cluster_obj.is_propagate_task_deploy_enabled.return_value = False
cluster = mock.MagicMock()
result = manager._get_tasks_to_run(cluster, 'test', None, None)
self.assertEqual(tasks, result)
cluster_obj.get_deployment_tasks.assert_called_once_with(
cluster, 'test'
)
manager._adjust_graph_tasks(graph, cluster, None, None)
self.assertEqual(tasks, graph['tasks'])
cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with(
cluster
)
filtered_result = manager._get_tasks_to_run(
cluster, 'test', None, ['task2']
)
# filter result
manager._adjust_graph_tasks(graph, cluster, None, ['task1'])
tasks[2]['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
self.assertEqual(tasks, filtered_result)
self.assertEqual(tasks, graph['tasks'])
@mock.patch('nailgun.transactions.manager.objects')
@mock.patch('nailgun.transactions.manager.legacy_tasks_adapter')
def test_get_tasks_with_legacy(self, adapter_mock, objects_mock):
def test_adjust_tasks_with_legacy(self, adapter_mock, objects_mock):
cluster_obj = objects_mock.Cluster
tasks = [
{'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
]
cluster_obj.get_deployment_tasks.return_value = tasks
graph = {'tasks': tasks[:], 'type': 'provision'}
cluster_obj.is_propagate_task_deploy_enabled.return_value = True
adapter_mock.adapt_legacy_tasks.return_value = tasks
cluster = mock.MagicMock()
resolver = mock.MagicMock()
result = manager._get_tasks_to_run(cluster, 'test', resolver, None)
self.assertEqual(tasks, result)
manager._adjust_graph_tasks(graph, cluster, resolver, None)
self.assertEqual(tasks, graph['tasks'])
cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with(
cluster
)
adapter_mock.adapt_legacy_tasks.assert_called_once_with(
tasks, None, resolver
graph['tasks'], None, resolver
)
result2 = manager._get_tasks_to_run(
cluster, consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, resolver, None,
)
self.assertEqual(tasks, result2)
graph2 = {
'tasks': tasks[:], 'type': consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE
}
manager._adjust_graph_tasks(graph2, cluster, resolver, None)
self.assertEqual(tasks, graph2['tasks'])
cluster_obj.get_legacy_plugin_tasks.assert_called_once_with(cluster)
adapter_mock.adapt_legacy_tasks.assert_called_with(
@ -269,3 +274,63 @@ class TestUpdateNodes(BaseUnitTest):
node_params = {'1': {'status': consts.NODE_STATUSES.ready}}
manager._update_nodes(transaction, nodes, node_params)
self.assertEqual(consts.NODE_STATUSES.discover, nodes[0].status)
class TestGetNodesToRun(BaseUnitTest):
    """Tests for ``manager._get_nodes_to_run`` with ``objects`` mocked out."""

    @mock.patch('nailgun.transactions.manager.objects')
    def test_get_nodes_by_ids(self, objects_mock):
        """Explicit ids are applied on top of the online-nodes query."""
        collection = objects_mock.NodeCollection
        cluster = mock.MagicMock()
        requested_ids = [1, 2]

        selected = manager._get_nodes_to_run(cluster, None, requested_ids)

        collection.filter_by.assert_called_once_with(
            None, cluster_id=cluster.id, online=True
        )
        collection.filter_by_list.assert_called_once_with(
            mock.ANY, 'id', requested_ids
        )
        collection.order_by.assert_called_once_with(mock.ANY, 'id')
        expected = collection.lock_for_update().all()
        self.assertEqual(selected, expected)

    @mock.patch('nailgun.transactions.manager.objects')
    def test_get_by_node_filter(self, obj_mock):
        """Without ids, the node filter decides which ids are selected."""
        collection = obj_mock.NodeCollection
        cluster = mock.MagicMock()
        collection.to_list.return_value = [
            {'id': 1, 'pending_deletion': False},
            {'id': 2, 'pending_deletion': True}
        ]

        manager._get_nodes_to_run(cluster, '$.pending_deletion')

        collection.filter_by_list.assert_called_once_with(
            mock.ANY, 'id', [2]
        )

    @mock.patch('nailgun.transactions.manager.objects')
    @mock.patch('nailgun.transactions.manager.yaql_ext')
    def test_ids_has_high_priority_then_node_filter(self, yaql_mock, obj_mock):
        """When ids are given, the node filter is not evaluated."""
        collection = obj_mock.NodeCollection
        cluster = mock.MagicMock()
        requested_ids = [1, 2]

        manager._get_nodes_to_run(
            cluster, '$.pending_deletion', requested_ids
        )

        collection.filter_by_list.assert_called_once_with(
            mock.ANY, 'id', requested_ids
        )
        self.assertEqual(0, yaql_mock.create_context.call_count)

    @mock.patch('nailgun.transactions.manager.objects')
    @mock.patch('nailgun.transactions.manager.yaql_ext')
    def test_get_all_nodes_with_empty_ids(self, yaql_mock, obj_mock):
        """An empty ids list applies neither the id list nor the filter."""
        collection = obj_mock.NodeCollection
        cluster = mock.MagicMock()

        manager._get_nodes_to_run(cluster, '$.pending_deletion', [])

        self.assertEqual(0, collection.filter_by_list.call_count)
        self.assertEqual(0, yaql_mock.create_context.call_count)

View File

@ -31,15 +31,38 @@ from nailgun.settings import settings
from nailgun.task import helpers
from nailgun.task import legacy_tasks_adapter
from nailgun.utils import dict_update
from nailgun.utils import get_in
from nailgun.utils import mule
from nailgun.utils import role_resolver
from nailgun import yaql_ext
def make_astute_message(transaction, context, tasks, node_resolver):
directory, graph, metadata = lcm.TransactionSerializer.serialize(
context, tasks, node_resolver
# Fallback node attributes per transition kind; _get_node_attributes
# returns the matching entry when the graph has no
# `<kind>.node_attributes` value of its own.
_DEFAULT_NODE_ATTRIBUTES = {
'on_success': {'status': consts.NODE_STATUSES.ready},
'on_error': {'status': consts.NODE_STATUSES.error},
'on_stop': {'status': consts.NODE_STATUSES.stopped},
}
def _get_node_attributes(graph, kind):
    """Return the node attributes configured in the graph for ``kind``.

    :param graph: graph metadata, looked up as
        ``graph[kind]['node_attributes']`` via ``get_in``
    :param kind: a key of ``_DEFAULT_NODE_ATTRIBUTES``
        (``on_success``, ``on_error`` or ``on_stop``)
    :returns: the graph's own value, or the default for ``kind`` when
        the lookup yields ``None``
    """
    attributes = get_in(graph, kind, 'node_attributes')
    # Only None triggers the fallback; an explicit empty dict is kept.
    return _DEFAULT_NODE_ATTRIBUTES[kind] if attributes is None else attributes
def make_astute_message(transaction, context, graph, node_resolver):
directory, tasks, metadata = lcm.TransactionSerializer.serialize(
context, graph['tasks'], node_resolver
)
objects.DeploymentHistoryCollection.create(transaction, graph)
metadata['node_statuses_transitions'] = {
'successful': _get_node_attributes(graph, 'on_success'),
'failed': _get_node_attributes(graph, 'on_error'),
'stopped': _get_node_attributes(graph, 'on_stop')
}
objects.DeploymentHistoryCollection.create(transaction, tasks)
return {
'api_version': settings.VERSION['api'],
'method': 'task_deploy',
@ -47,7 +70,7 @@ def make_astute_message(transaction, context, tasks, node_resolver):
'args': {
'task_uuid': transaction.uuid,
'tasks_directory': directory,
'tasks_graph': graph,
'tasks_graph': tasks,
'tasks_metadata': metadata,
'dry_run': transaction.dry_run,
}
@ -264,8 +287,14 @@ class TransactionsManager(object):
return False
cluster = sub_transaction.cluster
nodes = _get_nodes_to_run(cluster, sub_transaction.cache.get('nodes'))
graph = objects.Cluster.get_deployment_graph(
cluster, sub_transaction.graph_type
)
nodes = _get_nodes_to_run(
cluster,
graph.get('node_filter'),
sub_transaction.cache.get('nodes')
)
for node in nodes:
node.roles = list(set(node.roles + node.pending_roles))
node.pending_roles = []
@ -273,26 +302,27 @@ class TransactionsManager(object):
node.progress = 1
resolver = role_resolver.RoleResolver(nodes)
tasks = _get_tasks_to_run(
_adjust_graph_tasks(
graph,
cluster,
sub_transaction.graph_type,
resolver,
sub_transaction.cache.get('tasks'))
context = lcm.TransactionContext(
_get_expected_state(cluster, nodes),
_get_current_state(
cluster, nodes, tasks, sub_transaction.cache.get('force')
cluster, nodes, graph['tasks'],
sub_transaction.cache.get('force')
))
# Attach desired state to the sub transaction, so when we continue
# our top-level transaction, the new state will be calculated on
# top of this.
_dump_expected_state(sub_transaction, context.new, tasks)
_dump_expected_state(sub_transaction, context.new, graph['tasks'])
with try_transaction(sub_transaction):
message = make_astute_message(
sub_transaction, context, tasks, resolver)
sub_transaction, context, graph, resolver)
# Once rpc.cast() is called, the message is sent to Astute. By
# that moment all transaction instanced must exist in database,
@ -335,13 +365,46 @@ def _remove_obsolete_tasks(cluster):
db().flush()
def _get_nodes_to_run(cluster, ids=None):
def _get_nodes_to_run(cluster, node_filter, ids=None):
# Trying to run tasks on offline nodes will lead to error, since most
# probably MCollective is unreachable. In order to avoid that, we need
# to select only online nodes.
nodes = objects.NodeCollection.filter_by(
None, cluster_id=cluster.id, online=True)
if ids is None and node_filter:
# TODO(bgaifullin) Need to implement adapter for YAQL
# to direct query data from DB instead of query all data from DB
yaql_exp = yaql_ext.get_default_engine()(
'$.where({0}).select($.id)'.format(node_filter)
)
ids = yaql_exp.evaluate(
data=objects.NodeCollection.to_list(
nodes,
# TODO(bgaifullin) remove the hard-coded list of fields.
# The network_data field makes the subsequent cluster
# serialization fail, because it modifies attributes of the
# node and this update will be stored in the DB.
fields=(
'id',
'name',
'status',
'pending_deletion',
'pending_addition',
'error_type',
'roles',
'pending_roles',
'attributes',
'meta',
'hostname',
'labels'
)
),
context=yaql_ext.create_context(
add_extensions=True, yaqlized=False
)
)
if ids:
nodes = objects.NodeCollection.filter_by_list(nodes, 'id', ids)
@ -350,23 +413,23 @@ def _get_nodes_to_run(cluster, ids=None):
).all()
def _get_tasks_to_run(cluster, graph_type, node_resolver, names=None):
tasks = objects.Cluster.get_deployment_tasks(cluster, graph_type)
def _adjust_graph_tasks(graph, cluster, node_resolver, names=None):
if objects.Cluster.is_propagate_task_deploy_enabled(cluster):
# TODO(bgaifullin) move this code into Cluster.get_deployment_tasks
# after dependency from role_resolver will be removed
if graph_type == consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE:
if graph['type'] == consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE:
plugin_tasks = objects.Cluster.get_legacy_plugin_tasks(cluster)
else:
plugin_tasks = None
tasks = list(legacy_tasks_adapter.adapt_legacy_tasks(
tasks, plugin_tasks, node_resolver
graph['tasks'] = list(legacy_tasks_adapter.adapt_legacy_tasks(
graph['tasks'], plugin_tasks, node_resolver
))
if names:
# filter task by names, mark all other task as skipped
task_ids = set(names)
tasks = graph['tasks']
for idx, task in enumerate(tasks):
if (task['id'] not in task_ids and
task['type'] not in consts.INTERNAL_TASKS):
@ -375,8 +438,6 @@ def _get_tasks_to_run(cluster, graph_type, node_resolver, names=None):
task['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
tasks[idx] = task
return tasks
def _is_node_for_redeploy(node):
if node is None:

View File

@ -23,7 +23,7 @@ _global_engine = None
def create_context(add_serializers=False, add_datadiff=False,
add_extensions=False, **kwargs):
add_extensions=False, **kwargs):
context = yaql.create_context(**kwargs)
if add_serializers:
serializers.register(context)