diff --git a/nailgun/nailgun/db/sqlalchemy/models/task.py b/nailgun/nailgun/db/sqlalchemy/models/task.py index 2c329855c0..10d3cf6f98 100644 --- a/nailgun/nailgun/db/sqlalchemy/models/task.py +++ b/nailgun/nailgun/db/sqlalchemy/models/task.py @@ -66,7 +66,8 @@ class Task(Base): subtasks = relationship( "Task", backref=backref('parent', remote_side=[id]), - cascade="all,delete" + cascade="all,delete", + order_by='Task.id' ) notifications = relationship( "Notification", diff --git a/nailgun/nailgun/objects/__init__.py b/nailgun/nailgun/objects/__init__.py index 47af1e9e28..d3284f17b2 100644 --- a/nailgun/nailgun/objects/__init__.py +++ b/nailgun/nailgun/objects/__init__.py @@ -26,6 +26,7 @@ from nailgun.objects.oswl import OpenStackWorkloadStatsCollection from nailgun.objects.deployment_graph import DeploymentGraph from nailgun.objects.deployment_graph import DeploymentGraphCollection +from nailgun.objects.deployment_graph import DeploymentGraphTask from nailgun.objects.release import Release from nailgun.objects.release import ReleaseCollection diff --git a/nailgun/nailgun/objects/task.py b/nailgun/nailgun/objects/task.py index 9a76d2a3f1..675dc11e91 100644 --- a/nailgun/nailgun/objects/task.py +++ b/nailgun/nailgun/objects/task.py @@ -184,11 +184,10 @@ class Task(NailgunObject): n.error_type = error_type @classmethod - def __update_cluster_status(cls, cluster, status, expected_node_status): + def _update_cluster_status(cls, cluster, status, expected_node_status): logger.debug( "Updating cluster (%s) status: from %s to %s", cluster.full_name, cluster.status, status) - if expected_node_status is not None: remaining = Cluster.get_nodes_count_unmet_status( cluster, expected_node_status @@ -222,7 +221,7 @@ class Task(NailgunObject): n.status = consts.NODE_STATUSES.ready n.progress = 100 - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.operational, consts.NODE_STATUSES.ready @@ -231,7 +230,7 @@ class Task(NailgunObject): 
Cluster.clear_pending_changes(cluster) elif instance.status == consts.TASK_STATUSES.error: - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.error, None ) q_nodes_to_error = TaskHelper.get_nodes_to_deployment_error( @@ -245,7 +244,7 @@ class Task(NailgunObject): Cluster.set_vms_created_state(cluster) elif (instance.status == consts.TASK_STATUSES.error and not TaskHelper.before_deployment_error(instance)): - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.error, None ) elif instance.name == consts.TASK_NAMES.deploy and \ @@ -255,17 +254,17 @@ class Task(NailgunObject): # error because we don't want to lock # settings if cluster wasn't deployed - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.error, None ) elif instance.name == consts.TASK_NAMES.provision: if instance.status == consts.TASK_STATUSES.ready: - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.partially_deployed, None ) elif instance.status == consts.TASK_STATUSES.error: - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.error, None ) q_nodes_to_error = \ @@ -275,11 +274,11 @@ class Task(NailgunObject): q_nodes_to_error, error_type=consts.NODE_ERRORS.provision) elif instance.name == consts.TASK_NAMES.stop_deployment: if instance.status == consts.TASK_STATUSES.error: - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.error, None ) else: - cls.__update_cluster_status( + cls._update_cluster_status( cluster, consts.CLUSTER_STATUSES.stopped, None ) @@ -290,6 +289,60 @@ class Task(NailgunObject): result.pop('status', None) return result + @classmethod + def update_recursively(cls, instance, data): + logger.debug("Updating task: %s", instance.uuid) + clean_data = cls._clean_data(data) + super(Task, cls).update(instance, clean_data) + if instance.parent: + 
parent = instance.parent + siblings = parent.subtasks + status = clean_data.get('status') + if status == consts.TASK_STATUSES.ready: + clean_data['progress'] = 100 + instance.progress = 100 + ready_siblings_count = sum( + x.status == consts.TASK_STATUSES.ready for x in siblings + ) + if ready_siblings_count == len(siblings): + parent.status = consts.TASK_STATUSES.ready + elif status == consts.TASK_STATUSES.error: + parent.status = consts.TASK_STATUSES.error + for s in siblings: + if s.status != consts.TASK_STATUSES.ready: + s.status = consts.TASK_STATUSES.error + s.progress = 100 + s.message = "Task aborted" + clean_data['progress'] = 100 + instance.progress = 100 + TaskHelper.update_action_log(parent) + elif status == consts.TASK_STATUSES.running: + parent.status = consts.TASK_STATUSES.running + + if 'progress' in clean_data: + total_progress = sum(x.progress for x in siblings) + parent.progress = total_progress // len(siblings) + + task_status = parent.status + else: + task_status = instance.status + + if not instance.dry_run: + if task_status == consts.TASK_STATUSES.ready: + cls._update_cluster_status( + instance.cluster, + consts.CLUSTER_STATUSES.operational, + consts.NODE_STATUSES.ready + ) + elif task_status == consts.TASK_STATUSES.error: + cls._update_cluster_status( + instance.cluster, + consts.CLUSTER_STATUSES.error, + None + ) + + db().flush() + @classmethod def update(cls, instance, data): logger.debug("Updating task: %s", instance.uuid) diff --git a/nailgun/nailgun/objects/transaction.py b/nailgun/nailgun/objects/transaction.py index ed7cda86c1..31a0dde85b 100644 --- a/nailgun/nailgun/objects/transaction.py +++ b/nailgun/nailgun/objects/transaction.py @@ -124,7 +124,7 @@ class TransactionCollection(NailgunCollection): # TODO(bgaifullin) remove hardcoded name of task return cls.filter_by( None, cluster_id=cluster.id, name=consts.TASK_NAMES.deployment, - status=consts.TASK_STATUSES.ready + status=consts.TASK_STATUSES.ready, dry_run=False, 
).order_by('-id').limit(1).first() @classmethod @@ -148,6 +148,7 @@ class TransactionCollection(NailgunCollection): ).join(history).filter( model.cluster_id == cluster_id, model.name == consts.TASK_NAMES.deployment, + model.dry_run.is_(False), history.status == consts.HISTORY_TASK_STATUSES.ready, ) diff --git a/nailgun/nailgun/rpc/receiver.py b/nailgun/nailgun/rpc/receiver.py index c2a2bc8dea..dea8418d81 100644 --- a/nailgun/nailgun/rpc/receiver.py +++ b/nailgun/nailgun/rpc/receiver.py @@ -30,6 +30,7 @@ from nailgun import errors as nailgun_errors from nailgun import notifier from nailgun import objects from nailgun.settings import settings +from nailgun import transactions from nailgun.consts import TASK_STATUSES from nailgun.db import db @@ -229,6 +230,21 @@ class NailgunReceiver(object): objects.Task.update(task, {'status': status}) + @classmethod + def transaction_resp(cls, **kwargs): + logger.info( + "RPC method transaction_resp received: %s", jsonutils.dumps(kwargs) + ) + + transaction = objects.Task.get_by_uuid( + kwargs.pop('task_uuid', None), + fail_if_not_found=True, + lock_for_update=True, + ) + + manager = transactions.TransactionsManager(transaction.cluster.id) + manager.process(transaction, kwargs) + @classmethod def deploy_resp(cls, **kwargs): logger.info( @@ -463,6 +479,25 @@ class NailgunReceiver(object): task_uuid=task_uuid ) + @classmethod + def _assemble_task_update(cls, task, status, progress, message, nodes): + """Assemble arguments to update task. 
+ + :param task: objects.Task object + :param status: consts.TASK_STATUSES value + :param progress: progress number value + :param message: message text + :param nodes: the modified nodes list + """ + + if status == consts.TASK_STATUSES.error: + data = cls._error_action(task, status, progress, message) + elif status == consts.TASK_STATUSES.ready: + data = cls._success_action(task, status, progress, nodes) + else: + data = {'status': status, 'progress': progress, 'message': message} + return data + @classmethod def _update_task_status(cls, task, status, progress, message, nodes): """Do update task status actions. @@ -473,14 +508,10 @@ class NailgunReceiver(object): :param message: message text :param nodes: the modified nodes list """ - # Let's check the whole task status - if status == consts.TASK_STATUSES.error: - cls._error_action(task, status, progress, message) - elif status == consts.TASK_STATUSES.ready: - cls._success_action(task, status, progress, nodes) - else: - data = {'status': status, 'progress': progress, 'message': message} - objects.Task.update(task, data) + objects.Task.update( + task, + cls._assemble_task_update(task, status, progress, message, nodes) + ) @classmethod def _update_action_log_entry(cls, task_status, task_name, task_uuid, @@ -562,15 +593,14 @@ class NailgunReceiver(object): notify_message = message if error_message is not None else None cls._notify(task, consts.NOTIFICATION_TOPICS.error, notify_message) - data = {'status': status, 'progress': progress, 'message': message} - objects.Task.update(task, data) + return {'status': status, 'progress': progress, 'message': message} @classmethod def _success_action(cls, task, status, progress, nodes): - # check if all nodes are ready + # we shouldn't report success if there's at least one node in + # error state if any(n.status == consts.NODE_STATUSES.error for n in nodes): - cls._error_action(task, 'error', 100) - return + return cls._error_action(task, 'error', 100) task_name = 
task.name.title() if nodes: @@ -596,8 +626,7 @@ class NailgunReceiver(object): message = '{0}\n\n{1}'.format(message, plugins_msg) cls._notify(task, consts.NOTIFICATION_TOPICS.done, message) - data = {'status': status, 'progress': progress, 'message': message} - objects.Task.update(task, data) + return {'status': status, 'progress': progress, 'message': message} @classmethod def _make_plugins_success_message(cls, plugins): diff --git a/nailgun/nailgun/test/integration/test_transactions_manager.py b/nailgun/nailgun/test/integration/test_transactions_manager.py new file mode 100644 index 0000000000..aed8d6f1fb --- /dev/null +++ b/nailgun/nailgun/test/integration/test_transactions_manager.py @@ -0,0 +1,412 @@ +# -*- coding: utf-8 -*- + +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import mock + +from nailgun import consts +from nailgun import objects +from nailgun.rpc import receiver +from nailgun.transactions import manager + +from nailgun.test import base + + +class TestTransactionManager(base.BaseIntegrationTest): + + def setUp(self): + super(TestTransactionManager, self).setUp() + self.cluster = self.env.create( + cluster_kwargs={}, + nodes_kwargs=[ + {"status": consts.NODE_STATUSES.discover}, + {"status": consts.NODE_STATUSES.discover, "online": False}, + ], + release_kwargs={ + 'version': 'mitaka-9.0', + 'operating_system': consts.RELEASE_OS.ubuntu + }) + self.graph = objects.DeploymentGraph.create_for_model( + { + 'tasks': [ + { + 'id': 'test_task', + 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet, + 'roles': ['/.*/'] + }, + ], + 'name': 'test_graph', + }, + instance=self.cluster, + graph_type='test_graph') + self.manager = manager.TransactionsManager(self.cluster.id) + self.receiver = receiver.NailgunReceiver + + def _sucess(self, transaction_uuid): + self.receiver.transaction_resp( + task_uuid=transaction_uuid, + nodes=[ + {'uid': n.uid, 'status': consts.NODE_STATUSES.ready} + for n in self.cluster.nodes + ], + progress=100, + status=consts.TASK_STATUSES.ready) + + def _fail(self, transaction_uuid): + self.receiver.transaction_resp( + task_uuid=transaction_uuid, + nodes=[ + {'uid': n.uid, 'status': consts.NODE_STATUSES.error} + for n in self.cluster.nodes + ], + progress=100, + status=consts.TASK_STATUSES.error) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_graph(self, rpc_mock): + task = self.manager.execute(graphs=[{"type": "test_graph"}]) + + rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 
'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + self._sucess(task.subtasks[0].uuid) + self.assertEqual(task.status, consts.TASK_STATUSES.ready) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_few_graphs(self, rpc_mock): + objects.DeploymentGraph.create_for_model( + { + 'tasks': [ + { + 'id': 'super-mega-other-task', + 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet, + 'roles': ['/.*/'] + }, + ], + 'name': 'test_graph_2', + }, + instance=self.cluster, + graph_type='test_graph_2') + + task = self.manager.execute(graphs=[ + {"type": "test_graph"}, + {"type": "test_graph_2"}, + ]) + + self.assertItemsEqual( + ["test_graph", "test_graph_2"], + [sub.graph_type for sub in task.subtasks]) + + # Only a message for the first graph should be sent, because + # the second graph should be sent by RPC receiver once first + # one is completed. + rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + # Consider we've got success from Astute. + self._sucess(task.subtasks[0].uuid) + + # It's time to send the second graph to execution. 
+ rpc_mock.cast.assert_called_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[1].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'super-mega-other-task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + # Consider we've got success from Astute. + self._sucess(task.subtasks[1].uuid) + + # Ensure the top level transaction is ready. + self.assertEqual(task.status, consts.TASK_STATUSES.ready) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_few_graphs_first_fail(self, rpc_mock): + objects.DeploymentGraph.create_for_model( + { + 'tasks': [ + { + 'id': 'super-mega-other-task', + 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet, + 'roles': ['/.*/'] + }, + ], + 'name': 'test_graph_2', + }, + instance=self.cluster, + graph_type='test_graph_2') + + task = self.manager.execute(graphs=[ + {"type": "test_graph"}, + {"type": "test_graph_2"}, + ]) + + self.assertItemsEqual( + ["test_graph", "test_graph_2"], + [sub.graph_type for sub in task.subtasks]) + + # Only a message for the first graph should be sent, because + # the second graph should be sent by RPC receiver once first + # one is completed. 
+ rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + self._fail(task.subtasks[0].uuid) + + self.assertEqual(rpc_mock.cast.call_count, 1) + self.assertEqual(task.status, consts.TASK_STATUSES.error) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_w_task(self, rpc_mock): + self.graph.tasks.append(objects.DeploymentGraphTask.create( + { + 'id': 'test_task_2', + 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet, + 'roles': ['/.*/'] + })) + + task = self.manager.execute(graphs=[ + { + "type": "test_graph", + "tasks": ["test_task"], + }]) + + rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: mock.ANY, + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + tasks_graph = rpc_mock.cast.call_args[0][1][0]['args']['tasks_graph'] + self.assertItemsEqual(tasks_graph[self.cluster.nodes[0].uid], [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + { + 'id': 'test_task_2', + 'type': 'skipped', + 'fail_on_error': False, + } + ]) + + self._sucess(task.subtasks[0].uuid) + self.assertEqual(task.status, consts.TASK_STATUSES.ready) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_w_non_existing_task(self, rpc_mock): + task = self.manager.execute(graphs=[ + { + "type": "test_graph", + "tasks": ["non_exist"], + }]) + + 
rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'test_task', + 'type': 'skipped', + 'fail_on_error': False, + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + self._sucess(task.subtasks[0].uuid) + self.assertEqual(task.status, consts.TASK_STATUSES.ready) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_dry_run(self, rpc_mock): + task = self.manager.execute( + graphs=[{"type": "test_graph"}], dry_run=True) + + rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + self.cluster.nodes[0].uid: [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': True, + }, + 'respond_to': 'transaction_resp', + 'method': 'task_deploy', + 'api_version': '1' + }]) + + self._sucess(task.subtasks[0].uuid) + self.assertEqual(task.status, consts.TASK_STATUSES.ready) + + @mock.patch('nailgun.transactions.manager.rpc') + def test_execute_on_one_node(self, rpc_mock): + node = self.env.create_node( + cluster_id=self.cluster.id, pending_roles=["compute"]) + + task = self.manager.execute(graphs=[ + { + "type": "test_graph", + "nodes": [node.id], + }]) + + rpc_mock.cast.assert_called_once_with( + 'naily', + [{ + 'args': { + 'tasks_metadata': {'fault_tolerance_groups': []}, + 'task_uuid': task.subtasks[0].uuid, + 'tasks_graph': { + None: [], + node.uid: [ + { + 'id': 'test_task', + 'type': 'puppet', + 'fail_on_error': True, + 'parameters': {'cwd': '/'} + }, + ] + }, + 'tasks_directory': {}, + 'dry_run': False, + }, + 'respond_to': 'transaction_resp', + 'method': 
'task_deploy', + 'api_version': '1' + }] + ) + + self._sucess(task.subtasks[0].uuid) + self.assertEqual(task.status, consts.TASK_STATUSES.ready) diff --git a/nailgun/nailgun/test/unit/test_objects.py b/nailgun/nailgun/test/unit/test_objects.py index e363555fce..c516abe52b 100644 --- a/nailgun/nailgun/test/unit/test_objects.py +++ b/nailgun/nailgun/test/unit/test_objects.py @@ -2197,3 +2197,123 @@ class TestOpenstackConfigCollection(BaseTestCase): self.assertEqual(config.node_id, node_id) self.assertEqual(configs[0].config_type, consts.OPENSTACK_CONFIG_TYPES.node) + + def test_task_update_recursively(self): + parent = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.pending, + cluster_id=self.cluster.id + ) + child1 = parent.create_subtask( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.pending + ) + child2 = parent.create_subtask( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.pending + ) + # update progress for child1 + objects.Task.update_recursively( + child1, {'status': consts.TASK_STATUSES.running, 'progress': 50} + ) + self.assertEqual(50, child1.progress) + self.assertEqual(consts.TASK_STATUSES.running, child1.status) + self.assertEqual(25, parent.progress) + self.assertEqual(consts.TASK_STATUSES.running, parent.status) + # finish child 1 + objects.Task.update_recursively( + child1, {'status': consts.TASK_STATUSES.ready} + ) + self.assertEqual(100, child1.progress) + self.assertEqual(consts.TASK_STATUSES.ready, child1.status) + self.assertEqual(50, parent.progress) + self.assertEqual(consts.TASK_STATUSES.running, parent.status) + # finish child 2 + objects.Task.update_recursively( + child2, {'status': consts.TASK_STATUSES.ready} + ) + self.assertEqual(100, parent.progress) + self.assertEqual(consts.TASK_STATUSES.ready, parent.status) + # fail child 2 when child1 is ready + objects.Task.update_recursively( + child2, {'status': consts.TASK_STATUSES.error} + ) + 
self.assertEqual(100, parent.progress) + self.assertEqual(consts.TASK_STATUSES.error, parent.status) + self.assertEqual(consts.TASK_STATUSES.ready, child1.status) + child1.status = consts.TASK_STATUSES.running + objects.Task.update_recursively( + child2, {'status': consts.TASK_STATUSES.error} + ) + self.assertEqual(100, parent.progress) + self.assertEqual(consts.TASK_STATUSES.error, parent.status) + self.assertEqual(consts.TASK_STATUSES.error, child1.status) + + def test_update_cluster_status_on_updating_task_status(self): + with mock.patch.object(objects.Task, '_update_cluster_status') as m: + task = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.running, + cluster_id=self.cluster.id, + dry_run=True + ) + child1 = task.create_subtask( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.pending, + dry_run=True + ) + child2 = task.create_subtask( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.pending, + dry_run=True + ) + objects.Task.update_recursively( + task, {'status': consts.TASK_STATUSES.error} + ) + objects.Task.update_recursively( + task, {'status': consts.TASK_STATUSES.ready} + ) + + task.dry_run = False + child1.dry_run = False + child2.dry_run = False + task.status = consts.TASK_STATUSES.running + objects.Task.update_recursively( + child1, {'status': consts.TASK_STATUSES.ready} + ) + self.assertEqual(0, m.call_count) + + objects.Task.update_recursively( + child2, {'status': consts.TASK_STATUSES.ready} + ) + m.assert_called_with( + task.cluster, + consts.CLUSTER_STATUSES.operational, + consts.NODE_STATUSES.ready + ) + objects.Task.update_recursively( + child2, {'status': consts.TASK_STATUSES.error} + ) + + m.assert_called_with( + task.cluster, + consts.CLUSTER_STATUSES.error, + None + ) + + objects.Task.update_recursively( + task, {'status': consts.TASK_STATUSES.ready} + ) + m.assert_called_with( + task.cluster, + consts.CLUSTER_STATUSES.operational, + consts.NODE_STATUSES.ready + 
) + objects.Task.update_recursively( + task, {'status': consts.TASK_STATUSES.error} + ) + m.assert_called_with( + task.cluster, + consts.CLUSTER_STATUSES.error, + None + ) diff --git a/nailgun/nailgun/test/unit/test_receiver.py b/nailgun/nailgun/test/unit/test_receiver.py index 9f9b47612e..1f18378b9a 100644 --- a/nailgun/nailgun/test/unit/test_receiver.py +++ b/nailgun/nailgun/test/unit/test_receiver.py @@ -54,11 +54,11 @@ class TestNailgunReceiver(base.BaseTestCase): cluster_id=self.cluster.id) def test_success_action_with_plugins(self): - NailgunReceiver._success_action( + data = NailgunReceiver._success_action( self.task, 'ready', 100, self.env.nodes ) self.assertRegexpMatches( - self.task.message, + data['message'], "Deployment of environment '[^\s]+' is done." "\n\n" "Plugin name\d is deployed. description\d\n" @@ -267,3 +267,85 @@ class TestNailgunReceiver(base.BaseTestCase): node_id="123", task_uuid=sub_task.uuid ) + + def test_transaction_resp_update_node_attributes(self): + task = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.running, + cluster_id=self.cluster.id + ) + node = self.cluster.nodes[0] + node.status = consts.NODE_STATUSES.provisioned + node.progress = 1 + node.pending_addition = True + NailgunReceiver.transaction_resp( + task_uuid=task.uuid, + nodes=[{ + 'uid': node.uid, 'progress': 50, 'pending_addition': False + }] + ) + self.db.refresh(node) + self.assertEqual(50, node.progress) + self.assertFalse(node.pending_addition) + + @patch('nailgun.rpc.receiver.notifier.notify') + def test_transaction_resp_update_transaction_status(self, _): + task = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.running, + cluster_id=self.cluster.id + ) + NailgunReceiver.transaction_resp( + task_uuid=task.uuid, + status=consts.TASK_STATUSES.ready + ) + self.db.refresh(task) + self.assertEqual(consts.TASK_STATUSES.ready, task.status) + + @patch('nailgun.rpc.receiver.notifier.notify') 
+ def test_transaction_resp_does_not_update_nodes_if_dry_run(self, _): + task = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.running, + cluster_id=self.cluster.id, + dry_run=True + ) + self.cluster.status = consts.CLUSTER_STATUSES.operational + node = self.cluster.nodes[0] + node.status = consts.NODE_STATUSES.provisioned + NailgunReceiver.transaction_resp( + task_uuid=task.uuid, + status=consts.TASK_STATUSES.ready, + nodes=[{'uid': node.uid, 'status': consts.NODE_STATUSES.ready}] + ) + self.db.refresh(task) + self.db.refresh(node) + self.db.refresh(self.cluster) + self.assertEqual(consts.TASK_STATUSES.ready, task.status) + self.assertEqual(consts.NODE_STATUSES.provisioned, node.status) + self.assertEqual( + consts.CLUSTER_STATUSES.operational, self.cluster.status + ) + + def test_transaction_resp_does_not_fail_on_virtual_nodes(self): + task = self.env.create_task( + name=consts.TASK_NAMES.deployment, + status=consts.TASK_STATUSES.running, + cluster_id=self.cluster.id, + dry_run=True + ) + + NailgunReceiver.transaction_resp( + task_uuid=task.uuid, + status=consts.TASK_STATUSES.running, + nodes=[ + { + 'uid': consts.MASTER_NODE_UID, + 'status': consts.NODE_STATUSES.provisioned, + }, + { + # cluster node uid is null + 'uid': None, + 'status': consts.NODE_STATUSES.provisioned, + }, + ]) diff --git a/nailgun/nailgun/test/unit/test_transactions_manager.py b/nailgun/nailgun/test/unit/test_transactions_manager.py new file mode 100644 index 0000000000..b7152567cf --- /dev/null +++ b/nailgun/nailgun/test/unit/test_transactions_manager.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- + +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from nailgun import consts +from nailgun.transactions import manager + +from nailgun.test.base import BaseUnitTest + + +class TestMakeAstuteMessage(BaseUnitTest): + + @mock.patch('nailgun.transactions.manager.objects') + @mock.patch('nailgun.transactions.manager.lcm') + def test_make_astute_message(self, lcm_mock, obj_mock): + resolver = mock.MagicMock() + context = mock.MagicMock() + tx = mock.MagicMock(dry_run=False) + tasks = mock.MagicMock() + tasks_directory = mock.MagicMock() + tasks_graph = mock.MagicMock() + tasks_metadata = mock.MagicMock() + + lcm_mock.TransactionSerializer.serialize.return_value = ( + tasks_directory, tasks_graph, tasks_metadata + ) + + result = manager.make_astute_message(tx, context, tasks, resolver) + self.assertEqual( + { + 'api_version': manager.settings.VERSION['api'], + 'method': 'task_deploy', + 'respond_to': 'transaction_resp', + 'args': { + 'task_uuid': tx.uuid, + 'tasks_directory': tasks_directory, + 'tasks_graph': tasks_graph, + 'tasks_metadata': tasks_metadata, + 'dry_run': False, + } + }, + result + ) + lcm_mock.TransactionSerializer.serialize.assert_called_once_with( + context, tasks, resolver + ) + obj_mock.DeploymentHistoryCollection.create.assert_called_once_with( + tx, tasks_graph + ) + + +class TestRemoveObsoleteTasks(BaseUnitTest): + + @mock.patch('nailgun.transactions.manager.db') + @mock.patch('nailgun.transactions.manager.objects') + def test_remove_obsolete_tasks(self, objects_mock, db_mock): + tasks = [ + mock.MagicMock(status=consts.TASK_STATUSES.ready), + 
mock.MagicMock(status=consts.TASK_STATUSES.error), + mock.MagicMock(status=consts.TASK_STATUSES.running), + ] + objects_mock.TaskCollection.order_by.return_value = tasks + + cluster = mock.MagicMock() + manager._remove_obsolete_tasks(cluster) + + db_mock().flush.assert_called_once_with() + objects_mock.TaskCollection.get_cluster_tasks.assert_called_once_with( + cluster.id + ) + objects_mock.TaskCollection.order_by.assert_called_once_with( + objects_mock.TaskCollection.get_cluster_tasks.return_value, 'id' + ) + objects_mock.Task.delete.assert_has_calls([ + mock.call(tasks[0]), mock.call(tasks[1]) + ]) + + +class TestNodeForRedeploy(BaseUnitTest): + + def test_is_node_for_redeploy(self): + self.assertFalse(manager._is_node_for_redeploy(None)) + + self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock( + pending_addition=True, status=consts.NODE_STATUSES.discover + ))) + self.assertFalse(manager._is_node_for_redeploy(mock.MagicMock( + pending_addition=False, status=consts.NODE_STATUSES.ready + ))) + self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock( + pending_addition=True, status=consts.NODE_STATUSES.ready + ))) + + +class TestGetTasksToRun(BaseUnitTest): + + @mock.patch('nailgun.transactions.manager.objects') + def test_get_tasks_if_no_legacy(self, objects_mock): + cluster_obj = objects_mock.Cluster + tasks = [ + {'id': 'tasks1', 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet}, + {'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group}, + {'id': 'tasks3', 'type': consts.ORCHESTRATOR_TASK_TYPES.shell}, + {'id': 'tasks4', 'type': consts.ORCHESTRATOR_TASK_TYPES.skipped} + ] + cluster_obj.get_deployment_tasks.return_value = tasks + cluster_obj.is_propagate_task_deploy_enabled.return_value = False + + cluster = mock.MagicMock() + result = manager._get_tasks_to_run(cluster, 'test', None, None) + self.assertEqual(tasks, result) + cluster_obj.get_deployment_tasks.assert_called_once_with( + cluster, 'test' + ) + 
cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with( + cluster + ) + + filtered_result = manager._get_tasks_to_run( + cluster, 'test', None, ['task2'] + ) + tasks[2]['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped + self.assertEqual(tasks, filtered_result) + + @mock.patch('nailgun.transactions.manager.objects') + @mock.patch('nailgun.transactions.manager.legacy_tasks_adapter') + def test_get_tasks_with_legacy(self, adapter_mock, objects_mock): + cluster_obj = objects_mock.Cluster + tasks = [ + {'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group}, + ] + cluster_obj.get_deployment_tasks.return_value = tasks + cluster_obj.is_propagate_task_deploy_enabled.return_value = True + adapter_mock.adapt_legacy_tasks.return_value = tasks + + cluster = mock.MagicMock() + resolver = mock.MagicMock() + result = manager._get_tasks_to_run(cluster, 'test', resolver, None) + self.assertEqual(tasks, result) + + cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with( + cluster + ) + adapter_mock.adapt_legacy_tasks.assert_called_once_with( + tasks, None, resolver + ) + result2 = manager._get_tasks_to_run( + cluster, consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, resolver, None, + ) + self.assertEqual(tasks, result2) + + cluster_obj.get_legacy_plugin_tasks.assert_called_once_with(cluster) + adapter_mock.adapt_legacy_tasks.assert_called_with( + tasks, cluster_obj.get_legacy_plugin_tasks.return_value, resolver + ) diff --git a/nailgun/nailgun/transactions/__init__.py b/nailgun/nailgun/transactions/__init__.py new file mode 100644 index 0000000000..71e58f8f89 --- /dev/null +++ b/nailgun/nailgun/transactions/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nailgun.transactions.manager import TransactionsManager + +__all__ = ["TransactionsManager"] diff --git a/nailgun/nailgun/transactions/manager.py b/nailgun/nailgun/transactions/manager.py new file mode 100644 index 0000000000..075823dd69 --- /dev/null +++ b/nailgun/nailgun/transactions/manager.py @@ -0,0 +1,487 @@ +# -*- coding: utf-8 -*- + +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
import itertools

import six

from nailgun import consts
from nailgun.db import db
from nailgun import errors
from nailgun import lcm
from nailgun.logger import logger
from nailgun import objects
from nailgun.orchestrator import deployment_serializers
from nailgun import rpc
from nailgun.settings import settings
from nailgun.task import helpers
from nailgun.task import legacy_tasks_adapter
from nailgun.utils import dict_update
from nailgun.utils import mule
from nailgun.utils import role_resolver


def make_astute_message(transaction, context, tasks, node_resolver):
    """Build the 'task_deploy' RPC message for the given transaction.

    Besides building the message, this function creates deployment
    history records for the serialized graph.

    :param transaction: a transaction the message is built for
    :param context: a transaction context passed to the serializer
    :param tasks: a list of deployment tasks to serialize
    :param node_resolver: a resolver passed to the serializer
    :returns: a dict with the message to be sent to the executor
    """
    directory, graph, metadata = lcm.TransactionSerializer.serialize(
        context, tasks, node_resolver
    )
    objects.DeploymentHistoryCollection.create(transaction, graph)
    return {
        'api_version': settings.VERSION['api'],
        'method': 'task_deploy',
        'respond_to': 'transaction_resp',
        'args': {
            'task_uuid': transaction.uuid,
            'tasks_directory': directory,
            'tasks_graph': graph,
            'tasks_metadata': metadata,
            'dry_run': transaction.dry_run,
        }
    }


class try_transaction(object):
    """Wraps transaction in some sort of pre-/post- actions.

    So far it includes the following actions:

    * mark transaction as failed if exception has been raised;
    * create an action log record on start and update it on failure;

    :param transaction: a transaction instance to be wrapped
    :param suppress: do not propagate exception if True
    """

    def __init__(self, transaction, suppress=False):
        self._transaction = transaction
        self._suppress = suppress

    def __enter__(self):
        logger.debug("Transaction %s starts assembling.", self._transaction.id)
        self._logitem = helpers.TaskHelper.create_action_log(self._transaction)
        return self._transaction

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            logger.error(
                "Transaction %s failed.",
                self._transaction.id, exc_info=(exc_type, exc_val, exc_tb)
            )
            objects.Task.update(self._transaction, {
                'status': consts.TASK_STATUSES.error,
                'progress': 100,
                'message': six.text_type(exc_val),
            })
            helpers.TaskHelper.update_action_log(
                self._transaction, self._logitem
            )
        else:
            logger.debug(
                "Transaction %s finish assembling.", self._transaction.id
            )
        # A true value tells the interpreter to swallow the exception.
        return self._suppress


class TransactionsManager(object):
    """Creates transactions for a cluster and drives their execution."""

    # We're moving towards everything-is-a-graph approach where there's
    # no place for transaction names. From now on we're going to use
    # transaction's attributes (e.g. graph_type, dry_run) to find out
    # what this transaction is about. Still, we need to specify transaction
    # name until we move everything to graphs.
    task_name = consts.TASK_NAMES.deployment

    def __init__(self, cluster_id):
        self.cluster_id = cluster_id

    def execute(self, graphs, dry_run=False, force=False):
        """Start a new transaction with the given parameters.

        Under the hood starting a new transaction means serializing a lot
        of stuff and assembling an Astute message. So at the end of the
        method we either send an Astute message with execution flow or mark
        the transaction as failed.

        :param graphs: a list of dicts describing graphs to run; each one
                       must have a 'type' key and may have 'nodes' and
                       'tasks' keys, which are read later from the
                       sub-transaction cache
        :param dry_run: run a new transaction in dry run mode
        :param force: re-evaluate tasks' conditions as if it's a first run
        :returns: the created top-level transaction
        :raises: errors.DeploymentAlreadyStarted if the cluster already has
                 a pending or running transaction
        """
        logger.debug(
            'Start new transaction: cluster=%d graphs=%s dry_run=%d force=%d',
            self.cluster_id, graphs, dry_run, force
        )

        # So far we don't support parallel execution of transactions within
        # one cluster. So we need to fail quickly if there's a transaction
        # in progress.
        cluster = self._acquire_cluster()

        # Unfortunately, for historical reasons UI polls 'deployment' tasks
        # for cluster and expects there's only one. That one is considered
        # as latest and is used for tracking progress and showing error
        # message. So we have come up with the following workaround:
        #
        # * each new transaction we mark previous ones as deleted
        # * /tasks endpoint doesn't return "deleted" transactions in response
        # * /transactions endpoint does return "deleted" transactions
        #
        # FIXME: We must provide a way to get latest transaction with its
        #        sub-transactions via API. Once it's done, and UI uses it -
        #        we can safely remove this workaround.
        _remove_obsolete_tasks(cluster)

        transaction = objects.Transaction.create({
            'name': self.task_name,
            'cluster_id': self.cluster_id,
            'status': consts.TASK_STATUSES.pending,
            'dry_run': dry_run,
        })

        for graph in graphs:
            # 'dry_run' flag is a part of transaction, so we can restore its
            # value anywhere. That doesn't apply to 'force' flag, because it
            # affects only context calculation. However we need somehow to
            # pass it down in order to build context once first graph
            # is executed (much later, when we call continue_ in RPC
            # receiver).
            cache = graph.copy()
            cache['force'] = force

            transaction.create_subtask(
                self.task_name,
                status=consts.TASK_STATUSES.pending,
                dry_run=dry_run,
                graph_type=graph['type'],
                # We need to save input parameters in cache, so RPC receiver
                # can use them to do further serialization.
                #
                # FIXME: Consider to use a separate set of columns.
                cache=cache,
            )

        # We need to commit transaction because asynchronous call below might
        # be executed in separate process or thread.
        db().commit()

        self.continue_(transaction)
        return transaction

    def continue_(self, transaction):
        """Pick next pending task and send it to execution.

        Transaction may consist of a number of sub-transactions. We should
        execute them one-by-one. This method allows to pick first pending
        transaction and send it to execution.

        :param transaction: a top-level transaction to continue
        """
        with try_transaction(transaction, suppress=True):
            # uWSGI mule is a separate process, and that means it won't share
            # our DB session. Hence, we can't pass fetched DB instances to the
            # function we want to be executed in mule, so let's proceed with
            # unique identifiers.
            mule.call_task_manager_async(
                self.__class__,
                '_continue_async',
                self.cluster_id,
                transaction.id,
            )

    def process(self, transaction, report):
        """Process feedback from executor (Astute).

        :param transaction: a transaction to handle (sibling, not top level)
        :param report: a report to process; the keys read here are 'nodes',
                       'error', 'status' and 'progress'
        """
        nodes = report.get('nodes', [])
        error = report.get('error')
        status = report.get('status')
        progress = report.get('progress')

        # Report may contain two virtual nodes: master and cluster ('None').
        # Since we don't have them in database we should ensure we ain't
        # going to update them.
        nodes_params = {
            str(node['uid']): node for node in nodes
            if node['uid'] not in (consts.MASTER_NODE_UID, None)
        }
        nodes_instances = objects.NodeCollection.lock_for_update(
            objects.NodeCollection.filter_by_list(
                None, 'id', nodes_params.keys(), order_by=('id', )
            )
        ).all()

        _update_nodes(nodes_instances, nodes_params, transaction.dry_run)
        _update_history(transaction, nodes)

        if status:
            # FIXME: resolve circular dependencies by moving assemble task
            #        updates from receiver to objects layer.
            from nailgun.rpc.receiver import NailgunReceiver
            objects.Task.update_recursively(
                transaction,
                NailgunReceiver._assemble_task_update(
                    transaction, status, progress, error, nodes_instances
                )
            )

        # if transaction is completed successfully, we've got to initiate
        # the next one in the chain
        if transaction.parent and status == consts.TASK_STATUSES.ready:
            self.continue_(transaction.parent)

    def _continue_async(self, transaction_id):
        """Continue the transaction with the given id and commit changes.

        Scheduled from continue_() via mule.call_task_manager_async; receives
        an id rather than an instance because the DB session is not shared.

        :param transaction_id: an id of the top-level transaction
        """
        transaction = objects.Transaction.get_by_uid(transaction_id)

        with try_transaction(transaction, suppress=True):
            self._continue_sync(transaction)

        # Since the whole function is executed in separate process, we must
        # commit all changes in order not to lose them.
        db().commit()

    def _continue_sync(self, transaction):
        """Serialize the first pending sub-transaction and send it to Astute.

        :param transaction: a top-level transaction to continue
        :returns: False if there is no pending sub-transaction; otherwise
                  nothing is returned explicitly
        """
        sub_transaction = next((
            sub_transaction
            for sub_transaction in transaction.subtasks
            if sub_transaction.status == consts.TASK_STATUSES.pending), None)

        if sub_transaction is None:
            return False

        cluster = sub_transaction.cluster
        nodes = _get_nodes_to_run(cluster, sub_transaction.cache.get('nodes'))
        resolver = role_resolver.RoleResolver(nodes)
        tasks = _get_tasks_to_run(
            cluster,
            sub_transaction.graph_type,
            resolver,
            sub_transaction.cache.get('tasks'))

        context = lcm.TransactionContext(
            _get_expected_state(cluster, nodes),
            _get_current_state(
                cluster, nodes, tasks, sub_transaction.cache.get('force')
            ))

        # Attach desired state to the sub transaction, so when we continue
        # our top-level transaction, the new state will be calculated on
        # top of this.
        _dump_expected_state(sub_transaction, context.new, tasks)

        with try_transaction(sub_transaction):
            message = make_astute_message(
                sub_transaction, context, tasks, resolver)

            # Once rpc.cast() is called, the message is sent to Astute. By
            # that moment all transaction instances must exist in database,
            # otherwise we may get wrong result because RPC receiver won't
            # find an entry to update.
            db().commit()
            rpc.cast('naily', [message])

    def _acquire_cluster(self):
        """Fetch the cluster with a row lock and check that it is idle.

        :returns: the cluster instance
        :raises: errors.DeploymentAlreadyStarted if the cluster has a
                 pending or running task named `task_name`
        """
        cluster = objects.Cluster.get_by_uid(
            self.cluster_id, fail_if_not_found=True, lock_for_update=True
        )
        cluster_tasks = objects.TaskCollection.get_by_cluster_id(
            cluster_id=cluster.id
        )
        cluster_tasks = objects.TaskCollection.filter_by(
            cluster_tasks, name=self.task_name
        )
        cluster_tasks = objects.TaskCollection.filter_by_list(
            cluster_tasks,
            'status',
            [consts.TASK_STATUSES.pending, consts.TASK_STATUSES.running]
        )

        # TODO(bgaifullin) need new lock approach for cluster
        if objects.TaskCollection.count(cluster_tasks):
            raise errors.DeploymentAlreadyStarted()
        return cluster


def _remove_obsolete_tasks(cluster):
    """Delete all finished (ready or error) tasks of the cluster.

    :param cluster: a cluster instance whose tasks are cleaned up
    """
    cluster_tasks = objects.TaskCollection.get_cluster_tasks(cluster.id)
    cluster_tasks = objects.TaskCollection.order_by(cluster_tasks, 'id')

    for task in cluster_tasks:
        if task.status in (consts.TASK_STATUSES.ready,
                           consts.TASK_STATUSES.error):
            objects.Task.delete(task)

    db().flush()


def _get_nodes_to_run(cluster, ids=None):
    """Select and lock online nodes of the cluster, ordered by id.

    :param cluster: a cluster instance
    :param ids: optional list of node ids to restrict the selection to
    :returns: a list of node instances
    """
    # Trying to run tasks on offline nodes will lead to error, since most
    # probably MCollective is unreachable. In order to avoid that, we need
    # to select only online nodes.
    nodes = objects.NodeCollection.filter_by(
        None, cluster_id=cluster.id, online=True)

    if ids:
        nodes = objects.NodeCollection.filter_by_list(nodes, 'id', ids)

    return objects.NodeCollection.lock_for_update(
        objects.NodeCollection.order_by(nodes, 'id')
    ).all()


def _get_tasks_to_run(cluster, graph_type, node_resolver, names=None):
    """Get deployment tasks of the given graph type for the cluster.

    :param cluster: a cluster instance
    :param graph_type: a type of deployment graph to get tasks from
    :param node_resolver: a resolver passed to the legacy tasks adapter
    :param names: optional list of task ids to run; every other task that
                  is not an internal one is marked as skipped
    :returns: a list of tasks
    """
    tasks = objects.Cluster.get_deployment_tasks(cluster, graph_type)
    if objects.Cluster.is_propagate_task_deploy_enabled(cluster):
        # TODO(bgaifullin) move this code into Cluster.get_deployment_tasks
        # after dependency from role_resolver will be removed
        if graph_type == consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE:
            plugin_tasks = objects.Cluster.get_legacy_plugin_tasks(cluster)
        else:
            plugin_tasks = None

        tasks = list(legacy_tasks_adapter.adapt_legacy_tasks(
            tasks, plugin_tasks, node_resolver
        ))

    if names:
        # filter task by names, mark all other task as skipped
        task_ids = set(names)
        for idx, task in enumerate(tasks):
            if (task['id'] not in task_ids and
                    task['type'] not in consts.INTERNAL_TASKS):

                # Replace the entry with a modified copy, so the original
                # task dict is left untouched.
                task = task.copy()
                task['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
                tasks[idx] = task

    return tasks


def _is_node_for_redeploy(node):
    """Check whether the node is pending addition or is in discover status.

    :param node: a node instance or None
    :returns: False for None, otherwise a truthy value if the node is
              marked for addition or has 'discover' status
    """
    if node is None:
        return False
    return (
        node.pending_addition or
        node.status == consts.NODE_STATUSES.discover
    )


def _get_current_state(cluster, nodes, tasks, force=False):
    """Build the current state from previous successful transactions.

    :param cluster: a cluster instance
    :param nodes: a list of node instances
    :param tasks: a list of tasks; internal ones are ignored
    :param force: if True, an empty state is returned
    :returns: a dict of the form {task_name: {node_id: deployment_info}}
    """
    # In case of force=True, the current state is {} which means: behave like
    # an initial deployment.
    if force:
        return {}

    nodes = {n.uid: n for n in nodes}
    nodes[consts.MASTER_NODE_UID] = None
    tasks_names = [
        t['id'] for t in tasks if t['type'] not in consts.INTERNAL_TASKS
    ]

    txs = objects.TransactionCollection.get_successful_transactions_per_task(
        cluster.id, tasks_names, nodes
    )
    state = {}
    for tx, data in itertools.groupby(txs, lambda x: x[0]):
        node_ids = []
        # The per-node dicts stored here are shared by reference with
        # 'state', so filling them below populates 'state' as well.
        deferred_state = {}
        for _, node_id, task_name in data:
            t_state = state.setdefault(task_name, {})
            if _is_node_for_redeploy(nodes.get(node_id)):
                # Node is to be redeployed: its state stays empty.
                t_state[node_id] = {}
            else:
                t_state[node_id] = deferred_state.setdefault(node_id, {})
                node_ids.append(node_id)

        dict_update(
            deferred_state,
            objects.Transaction.get_deployment_info(tx, node_uids=node_ids),
            level=2
        )
    return state


def _get_expected_state(cluster, nodes):
    """Serialize the expected state of the given nodes.

    :param cluster: a cluster instance
    :param nodes: a list of node instances
    :returns: a dict of serialized data keyed by node uid, plus an empty
              entry under the None key for the cluster itself
    """
    info = deployment_serializers.serialize_for_lcm(cluster, nodes)
    info = {n['uid']: n for n in info}
    # Added cluster state
    info[None] = {}
    return info


def _dump_expected_state(transaction, state, tasks):
    """Attach state, tasks and cluster settings to the transaction.

    :param transaction: a transaction to attach data to
    :param state: a deployment info to attach
    :param tasks: a list of tasks to attach as a snapshot
    """
    cluster = transaction.cluster

    objects.Transaction.attach_deployment_info(transaction, state)
    objects.Transaction.attach_tasks_snapshot(transaction, tasks)
    objects.Transaction.attach_cluster_settings(
        transaction,
        {
            'editable': objects.Cluster.get_editable_attributes(cluster, True)
        })
    objects.Transaction.attach_network_settings(
        transaction, objects.Cluster.get_network_attributes(cluster))

    db().flush()


def _update_nodes(nodes_instances, nodes_params, dry_run=False):
    """Apply reported parameters to node instances.

    Entries of processed nodes are popped from `nodes_params`, so the
    passed dict is modified; whatever remains is logged as not found.

    :param nodes_instances: a list of node instances to update
    :param nodes_params: a dict of reported parameters keyed by node uid
    :param dry_run: if True, only 'progress' is allowed to be updated
    """
    allow_update = {
        'name',
        'status',
        'hostname',
        'kernel_params',
        'pending_addition',
        'pending_deletion',
        'error_type',
        'error_msg',
        'online',
        'progress',
    }

    # dry-run transactions must not update nodes except progress column
    if dry_run:
        allow_update = {'progress'}

    for node in nodes_instances:
        node_params = nodes_params.pop(node.uid)

        for param in allow_update.intersection(node_params):
            setattr(node, param, node_params[param])

        # TODO(ikalnitsky): Ensure Astute sends progress=100 in case of error.
        if node.status == consts.NODE_STATUSES.error:
            node.progress = 100

    # Obtain the session the same way as the rest of the module does.
    db().flush()

    if nodes_params:
        logger.warning(
            "The following nodes are not found: %s",
            ",".join(sorted(nodes_params.keys()))
        )


def _update_history(transaction, nodes):
    """Update deployment history with task statuses from the report.

    Only entries that have both 'deployment_graph_task_name' and
    'task_status' keys are processed.

    :param transaction: a transaction the history belongs to
    :param nodes: a list of dicts from the executor's report
    """
    for node in nodes:
        if {'deployment_graph_task_name', 'task_status'}.issubset(node.keys()):
            objects.DeploymentHistory.update_if_exist(
                transaction.id,
                node['uid'],
                node['deployment_graph_task_name'],
                node['task_status'],
                node.get('custom'),
            )