Implement Transaction Manager

So far Nailgun has a bunch of managers for various tasks (transactions).
The way towards everything-is-a-graph means we won't have hardcoded
transactions and hence requires a new single manager to handle them all.

Transaction manager runs specified graphs on specified nodes within
single transaction (task), so anyone can poll it to check the progress.

Co-Authored-By: Igor Kalnitsky <igor@kalnitsky.org>

Change-Id: I98df7b98a07f64d97b561b85faa8472c3c551346
Blueprint: graph-concept-extension
This commit is contained in:
Bulat Gaifullin 2016-08-08 18:29:01 +03:00 committed by Igor Kalnitsky
parent beeeceadf4
commit 55d20c1119
No known key found for this signature in database
GPG Key ID: F05067E18910196E
11 changed files with 1403 additions and 29 deletions

View File

@ -66,7 +66,8 @@ class Task(Base):
subtasks = relationship(
"Task",
backref=backref('parent', remote_side=[id]),
cascade="all,delete"
cascade="all,delete",
order_by='Task.id'
)
notifications = relationship(
"Notification",

View File

@ -26,6 +26,7 @@ from nailgun.objects.oswl import OpenStackWorkloadStatsCollection
from nailgun.objects.deployment_graph import DeploymentGraph
from nailgun.objects.deployment_graph import DeploymentGraphCollection
from nailgun.objects.deployment_graph import DeploymentGraphTask
from nailgun.objects.release import Release
from nailgun.objects.release import ReleaseCollection

View File

@ -184,11 +184,10 @@ class Task(NailgunObject):
n.error_type = error_type
@classmethod
def __update_cluster_status(cls, cluster, status, expected_node_status):
def _update_cluster_status(cls, cluster, status, expected_node_status):
logger.debug(
"Updating cluster (%s) status: from %s to %s",
cluster.full_name, cluster.status, status)
if expected_node_status is not None:
remaining = Cluster.get_nodes_count_unmet_status(
cluster, expected_node_status
@ -222,7 +221,7 @@ class Task(NailgunObject):
n.status = consts.NODE_STATUSES.ready
n.progress = 100
cls.__update_cluster_status(
cls._update_cluster_status(
cluster,
consts.CLUSTER_STATUSES.operational,
consts.NODE_STATUSES.ready
@ -231,7 +230,7 @@ class Task(NailgunObject):
Cluster.clear_pending_changes(cluster)
elif instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
q_nodes_to_error = TaskHelper.get_nodes_to_deployment_error(
@ -245,7 +244,7 @@ class Task(NailgunObject):
Cluster.set_vms_created_state(cluster)
elif (instance.status == consts.TASK_STATUSES.error and
not TaskHelper.before_deployment_error(instance)):
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
elif instance.name == consts.TASK_NAMES.deploy and \
@ -255,17 +254,17 @@ class Task(NailgunObject):
# error because we don't want to lock
# settings if cluster wasn't deployed
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
elif instance.name == consts.TASK_NAMES.provision:
if instance.status == consts.TASK_STATUSES.ready:
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.partially_deployed, None
)
elif instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
q_nodes_to_error = \
@ -275,11 +274,11 @@ class Task(NailgunObject):
q_nodes_to_error, error_type=consts.NODE_ERRORS.provision)
elif instance.name == consts.TASK_NAMES.stop_deployment:
if instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
else:
cls.__update_cluster_status(
cls._update_cluster_status(
cluster, consts.CLUSTER_STATUSES.stopped, None
)
@ -290,6 +289,60 @@ class Task(NailgunObject):
result.pop('status', None)
return result
@classmethod
def update_recursively(cls, instance, data):
logger.debug("Updating task: %s", instance.uuid)
clean_data = cls._clean_data(data)
super(Task, cls).update(instance, data)
if instance.parent:
parent = instance.parent
siblings = parent.subtasks
status = clean_data.get('status')
if status == consts.TASK_STATUSES.ready:
clean_data['progress'] = 100
instance.progress = 100
ready_siblings_count = sum(
x.status == consts.TASK_STATUSES.ready for x in siblings
)
if ready_siblings_count == len(siblings):
parent.status = consts.TASK_STATUSES.ready
elif status == consts.TASK_STATUSES.error:
parent.status = consts.TASK_STATUSES.error
for s in siblings:
if s.status != consts.TASK_STATUSES.ready:
s.status = consts.TASK_STATUSES.error
s.progress = 100
s.message = "Task aborted"
clean_data['progress'] = 100
instance.progress = 100
TaskHelper.update_action_log(parent)
elif status == consts.TASK_STATUSES.running:
parent.status = consts.TASK_STATUSES.running
if 'progress' in clean_data:
total_progress = sum(x.progress for x in siblings)
parent.progress = total_progress // len(siblings)
task_status = parent.status
else:
task_status = instance.status
if not instance.dry_run:
if task_status == consts.TASK_STATUSES.ready:
cls._update_cluster_status(
instance.cluster,
consts.CLUSTER_STATUSES.operational,
consts.NODE_STATUSES.ready
)
elif task_status == consts.TASK_STATUSES.error:
cls._update_cluster_status(
instance.cluster,
consts.CLUSTER_STATUSES.error,
None
)
db().flush()
@classmethod
def update(cls, instance, data):
logger.debug("Updating task: %s", instance.uuid)

View File

@ -124,7 +124,7 @@ class TransactionCollection(NailgunCollection):
# TODO(bgaifullin) remove hardcoded name of task
return cls.filter_by(
None, cluster_id=cluster.id, name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.ready
status=consts.TASK_STATUSES.ready, dry_run=False,
).order_by('-id').limit(1).first()
@classmethod
@ -148,6 +148,7 @@ class TransactionCollection(NailgunCollection):
).join(history).filter(
model.cluster_id == cluster_id,
model.name == consts.TASK_NAMES.deployment,
model.dry_run.is_(False),
history.status == consts.HISTORY_TASK_STATUSES.ready,
)

View File

@ -30,6 +30,7 @@ from nailgun import errors as nailgun_errors
from nailgun import notifier
from nailgun import objects
from nailgun.settings import settings
from nailgun import transactions
from nailgun.consts import TASK_STATUSES
from nailgun.db import db
@ -229,6 +230,21 @@ class NailgunReceiver(object):
objects.Task.update(task, {'status': status})
@classmethod
def transaction_resp(cls, **kwargs):
logger.info(
"RPC method transaction_resp received: %s", jsonutils.dumps(kwargs)
)
transaction = objects.Task.get_by_uuid(
kwargs.pop('task_uuid', None),
fail_if_not_found=True,
lock_for_update=True,
)
manager = transactions.TransactionsManager(transaction.cluster.id)
manager.process(transaction, kwargs)
@classmethod
def deploy_resp(cls, **kwargs):
logger.info(
@ -463,6 +479,25 @@ class NailgunReceiver(object):
task_uuid=task_uuid
)
@classmethod
def _assemble_task_update(cls, task, status, progress, message, nodes):
"""Assemble arguments to update task.
:param task: objects.Task object
:param status: consts.TASK_STATUSES value
:param progress: progress number value
:param message: message text
:param nodes: the modified nodes list
"""
if status == consts.TASK_STATUSES.error:
data = cls._error_action(task, status, progress, message)
elif status == consts.TASK_STATUSES.ready:
data = cls._success_action(task, status, progress, nodes)
else:
data = {'status': status, 'progress': progress, 'message': message}
return data
@classmethod
def _update_task_status(cls, task, status, progress, message, nodes):
"""Do update task status actions.
@ -473,14 +508,10 @@ class NailgunReceiver(object):
:param message: message text
:param nodes: the modified nodes list
"""
# Let's check the whole task status
if status == consts.TASK_STATUSES.error:
cls._error_action(task, status, progress, message)
elif status == consts.TASK_STATUSES.ready:
cls._success_action(task, status, progress, nodes)
else:
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
objects.Task.update(
task,
cls._assemble_task_update(task, status, progress, message, nodes)
)
@classmethod
def _update_action_log_entry(cls, task_status, task_name, task_uuid,
@ -562,15 +593,14 @@ class NailgunReceiver(object):
notify_message = message if error_message is not None else None
cls._notify(task, consts.NOTIFICATION_TOPICS.error, notify_message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
return {'status': status, 'progress': progress, 'message': message}
@classmethod
def _success_action(cls, task, status, progress, nodes):
# check if all nodes are ready
# we shouldn't report success if there's at least one node in
# error state
if any(n.status == consts.NODE_STATUSES.error for n in nodes):
cls._error_action(task, 'error', 100)
return
return cls._error_action(task, 'error', 100)
task_name = task.name.title()
if nodes:
@ -596,8 +626,7 @@ class NailgunReceiver(object):
message = '{0}\n\n{1}'.format(message, plugins_msg)
cls._notify(task, consts.NOTIFICATION_TOPICS.done, message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
return {'status': status, 'progress': progress, 'message': message}
@classmethod
def _make_plugins_success_message(cls, plugins):

View File

@ -0,0 +1,412 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nailgun import consts
from nailgun import objects
from nailgun.rpc import receiver
from nailgun.transactions import manager
from nailgun.test import base
class TestTransactionManager(base.BaseIntegrationTest):
def setUp(self):
super(TestTransactionManager, self).setUp()
self.cluster = self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{"status": consts.NODE_STATUSES.discover},
{"status": consts.NODE_STATUSES.discover, "online": False},
],
release_kwargs={
'version': 'mitaka-9.0',
'operating_system': consts.RELEASE_OS.ubuntu
})
self.graph = objects.DeploymentGraph.create_for_model(
{
'tasks': [
{
'id': 'test_task',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'roles': ['/.*/']
},
],
'name': 'test_graph',
},
instance=self.cluster,
graph_type='test_graph')
self.manager = manager.TransactionsManager(self.cluster.id)
self.receiver = receiver.NailgunReceiver
def _sucess(self, transaction_uuid):
self.receiver.transaction_resp(
task_uuid=transaction_uuid,
nodes=[
{'uid': n.uid, 'status': consts.NODE_STATUSES.ready}
for n in self.cluster.nodes
],
progress=100,
status=consts.TASK_STATUSES.ready)
def _fail(self, transaction_uuid):
self.receiver.transaction_resp(
task_uuid=transaction_uuid,
nodes=[
{'uid': n.uid, 'status': consts.NODE_STATUSES.error}
for n in self.cluster.nodes
],
progress=100,
status=consts.TASK_STATUSES.error)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_graph(self, rpc_mock):
task = self.manager.execute(graphs=[{"type": "test_graph"}])
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_few_graphs(self, rpc_mock):
objects.DeploymentGraph.create_for_model(
{
'tasks': [
{
'id': 'super-mega-other-task',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'roles': ['/.*/']
},
],
'name': 'test_graph_2',
},
instance=self.cluster,
graph_type='test_graph_2')
task = self.manager.execute(graphs=[
{"type": "test_graph"},
{"type": "test_graph_2"},
])
self.assertItemsEqual(
["test_graph", "test_graph_2"],
[sub.graph_type for sub in task.subtasks])
# Only a message for the first graph should be sent, because
# the second graph should be sent by RPC receiver once first
# one is completed.
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
# Consider we've got success from Astute.
self._sucess(task.subtasks[0].uuid)
# It's time to send the second graph to execution.
rpc_mock.cast.assert_called_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[1].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'super-mega-other-task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
# Consider we've got success from Astute.
self._sucess(task.subtasks[1].uuid)
# Ensure the top leve transaction is ready.
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_few_graphs_first_fail(self, rpc_mock):
objects.DeploymentGraph.create_for_model(
{
'tasks': [
{
'id': 'super-mega-other-task',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'roles': ['/.*/']
},
],
'name': 'test_graph_2',
},
instance=self.cluster,
graph_type='test_graph_2')
task = self.manager.execute(graphs=[
{"type": "test_graph"},
{"type": "test_graph_2"},
])
self.assertItemsEqual(
["test_graph", "test_graph_2"],
[sub.graph_type for sub in task.subtasks])
# Only a message for the first graph should be sent, because
# the second graph should be sent by RPC receiver once first
# one is completed.
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
self._fail(task.subtasks[0].uuid)
self.assertEqual(rpc_mock.cast.call_count, 1)
self.assertEqual(task.status, consts.TASK_STATUSES.error)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_w_task(self, rpc_mock):
self.graph.tasks.append(objects.DeploymentGraphTask.create(
{
'id': 'test_task_2',
'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'roles': ['/.*/']
}))
task = self.manager.execute(graphs=[
{
"type": "test_graph",
"tasks": ["test_task"],
}])
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: mock.ANY,
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
tasks_graph = rpc_mock.cast.call_args[0][1][0]['args']['tasks_graph']
self.assertItemsEqual(tasks_graph[self.cluster.nodes[0].uid], [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
{
'id': 'test_task_2',
'type': 'skipped',
'fail_on_error': False,
}
])
self._sucess(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_w_non_existing_task(self, rpc_mock):
task = self.manager.execute(graphs=[
{
"type": "test_graph",
"tasks": ["non_exist"],
}])
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'test_task',
'type': 'skipped',
'fail_on_error': False,
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_dry_run(self, rpc_mock):
task = self.manager.execute(
graphs=[{"type": "test_graph"}], dry_run=True)
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
self.cluster.nodes[0].uid: [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': True,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}])
self._sucess(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_on_one_node(self, rpc_mock):
node = self.env.create_node(
cluster_id=self.cluster.id, pending_roles=["compute"])
task = self.manager.execute(graphs=[
{
"type": "test_graph",
"nodes": [node.id],
}])
rpc_mock.cast.assert_called_once_with(
'naily',
[{
'args': {
'tasks_metadata': {'fault_tolerance_groups': []},
'task_uuid': task.subtasks[0].uuid,
'tasks_graph': {
None: [],
node.uid: [
{
'id': 'test_task',
'type': 'puppet',
'fail_on_error': True,
'parameters': {'cwd': '/'}
},
]
},
'tasks_directory': {},
'dry_run': False,
},
'respond_to': 'transaction_resp',
'method': 'task_deploy',
'api_version': '1'
}]
)
self._sucess(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)

View File

@ -2197,3 +2197,123 @@ class TestOpenstackConfigCollection(BaseTestCase):
self.assertEqual(config.node_id, node_id)
self.assertEqual(configs[0].config_type,
consts.OPENSTACK_CONFIG_TYPES.node)
def test_task_update_recursively(self):
parent = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending,
cluster_id=self.cluster.id
)
child1 = parent.create_subtask(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending
)
child2 = parent.create_subtask(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending
)
# update progress for child1
objects.Task.update_recursively(
child1, {'status': consts.TASK_STATUSES.running, 'progress': 50}
)
self.assertEqual(50, child1.progress)
self.assertEqual(consts.TASK_STATUSES.running, child1.status)
self.assertEqual(25, parent.progress)
self.assertEqual(consts.TASK_STATUSES.running, parent.status)
# finish child 1
objects.Task.update_recursively(
child1, {'status': consts.TASK_STATUSES.ready}
)
self.assertEqual(100, child1.progress)
self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
self.assertEqual(50, parent.progress)
self.assertEqual(consts.TASK_STATUSES.running, parent.status)
# finish child 2
objects.Task.update_recursively(
child2, {'status': consts.TASK_STATUSES.ready}
)
self.assertEqual(100, parent.progress)
self.assertEqual(consts.TASK_STATUSES.ready, parent.status)
# fail child 2 when child1 is ready
objects.Task.update_recursively(
child2, {'status': consts.TASK_STATUSES.error}
)
self.assertEqual(100, parent.progress)
self.assertEqual(consts.TASK_STATUSES.error, parent.status)
self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
child1.status = consts.TASK_STATUSES.running
objects.Task.update_recursively(
child2, {'status': consts.TASK_STATUSES.error}
)
self.assertEqual(100, parent.progress)
self.assertEqual(consts.TASK_STATUSES.error, parent.status)
self.assertEqual(consts.TASK_STATUSES.error, child1.status)
def test_update_cluster_status_on_updating_task_status(self):
with mock.patch.object(objects.Task, '_update_cluster_status') as m:
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.running,
cluster_id=self.cluster.id,
dry_run=True
)
child1 = task.create_subtask(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending,
dry_run=True
)
child2 = task.create_subtask(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.pending,
dry_run=True
)
objects.Task.update_recursively(
task, {'status': consts.TASK_STATUSES.error}
)
objects.Task.update_recursively(
task, {'status': consts.TASK_STATUSES.ready}
)
task.dry_run = False
child1.dry_run = False
child2.dry_run = False
task.status = consts.TASK_STATUSES.running
objects.Task.update_recursively(
child1, {'status': consts.TASK_STATUSES.ready}
)
self.assertEqual(0, m.call_count)
objects.Task.update_recursively(
child2, {'status': consts.TASK_STATUSES.ready}
)
m.assert_called_with(
task.cluster,
consts.CLUSTER_STATUSES.operational,
consts.NODE_STATUSES.ready
)
objects.Task.update_recursively(
child2, {'status': consts.TASK_STATUSES.error}
)
m.assert_called_with(
task.cluster,
consts.CLUSTER_STATUSES.error,
None
)
objects.Task.update_recursively(
task, {'status': consts.TASK_STATUSES.ready}
)
m.assert_called_with(
task.cluster,
consts.CLUSTER_STATUSES.operational,
consts.NODE_STATUSES.ready
)
objects.Task.update_recursively(
task, {'status': consts.TASK_STATUSES.error}
)
m.assert_called_with(
task.cluster,
consts.CLUSTER_STATUSES.error,
None
)

View File

@ -54,11 +54,11 @@ class TestNailgunReceiver(base.BaseTestCase):
cluster_id=self.cluster.id)
def test_success_action_with_plugins(self):
NailgunReceiver._success_action(
data = NailgunReceiver._success_action(
self.task, 'ready', 100, self.env.nodes
)
self.assertRegexpMatches(
self.task.message,
data['message'],
"Deployment of environment '[^\s]+' is done."
"\n\n"
"Plugin name\d is deployed. description\d\n"
@ -267,3 +267,85 @@ class TestNailgunReceiver(base.BaseTestCase):
node_id="123",
task_uuid=sub_task.uuid
)
def test_transaction_resp_update_node_attributes(self):
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.running,
cluster_id=self.cluster.id
)
node = self.cluster.nodes[0]
node.status = consts.NODE_STATUSES.provisioned
node.progress = 1
node.pending_addition = True
NailgunReceiver.transaction_resp(
task_uuid=task.uuid,
nodes=[{
'uid': node.uid, 'progress': 50, 'pending_addition': False
}]
)
self.db.refresh(node)
self.assertEqual(50, node.progress)
self.assertFalse(node.pending_addition)
@patch('nailgun.rpc.receiver.notifier.notify')
def test_transaction_resp_update_transaction_status(self, _):
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.running,
cluster_id=self.cluster.id
)
NailgunReceiver.transaction_resp(
task_uuid=task.uuid,
status=consts.TASK_STATUSES.ready
)
self.db.refresh(task)
self.assertEqual(consts.TASK_STATUSES.ready, task.status)
@patch('nailgun.rpc.receiver.notifier.notify')
def test_transaction_resp_does_not_update_nodes_if_dry_run(self, _):
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.running,
cluster_id=self.cluster.id,
dry_run=True
)
self.cluster.status = consts.CLUSTER_STATUSES.operational
node = self.cluster.nodes[0]
node.status = consts.NODE_STATUSES.provisioned
NailgunReceiver.transaction_resp(
task_uuid=task.uuid,
status=consts.TASK_STATUSES.ready,
nodes=[{'uid': node.uid, 'status': consts.NODE_STATUSES.ready}]
)
self.db.refresh(task)
self.db.refresh(node)
self.db.refresh(self.cluster)
self.assertEqual(consts.TASK_STATUSES.ready, task.status)
self.assertEqual(consts.NODE_STATUSES.provisioned, node.status)
self.assertEqual(
consts.CLUSTER_STATUSES.operational, self.cluster.status
)
def test_transaction_resp_does_not_fail_on_virtual_nodes(self):
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.running,
cluster_id=self.cluster.id,
dry_run=True
)
NailgunReceiver.transaction_resp(
task_uuid=task.uuid,
status=consts.TASK_STATUSES.running,
nodes=[
{
'uid': consts.MASTER_NODE_UID,
'status': consts.NODE_STATUSES.provisioned,
},
{
# cluster node uid is null
'uid': None,
'status': consts.NODE_STATUSES.provisioned,
},
])

View File

@ -0,0 +1,169 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nailgun import consts
from nailgun.transactions import manager
from nailgun.test.base import BaseUnitTest
class TestMakeAstuteMessage(BaseUnitTest):
@mock.patch('nailgun.transactions.manager.objects')
@mock.patch('nailgun.transactions.manager.lcm')
def test_make_astute_message(self, lcm_mock, obj_mock):
resolver = mock.MagicMock()
context = mock.MagicMock()
tx = mock.MagicMock(dry_run=False)
tasks = mock.MagicMock()
tasks_directory = mock.MagicMock()
tasks_graph = mock.MagicMock()
tasks_metadata = mock.MagicMock()
lcm_mock.TransactionSerializer.serialize.return_value = (
tasks_directory, tasks_graph, tasks_metadata
)
result = manager.make_astute_message(tx, context, tasks, resolver)
self.assertEqual(
{
'api_version': manager.settings.VERSION['api'],
'method': 'task_deploy',
'respond_to': 'transaction_resp',
'args': {
'task_uuid': tx.uuid,
'tasks_directory': tasks_directory,
'tasks_graph': tasks_graph,
'tasks_metadata': tasks_metadata,
'dry_run': False,
}
},
result
)
lcm_mock.TransactionSerializer.serialize.assert_called_once_with(
context, tasks, resolver
)
obj_mock.DeploymentHistoryCollection.create.assert_called_once_with(
tx, tasks_graph
)
class TestRemoveObsoleteTasks(BaseUnitTest):
@mock.patch('nailgun.transactions.manager.db')
@mock.patch('nailgun.transactions.manager.objects')
def test_remove_obsolete_tasks(self, objects_mock, db_mock):
tasks = [
mock.MagicMock(status=consts.TASK_STATUSES.ready),
mock.MagicMock(status=consts.TASK_STATUSES.error),
mock.MagicMock(status=consts.TASK_STATUSES.running),
]
objects_mock.TaskCollection.order_by.return_value = tasks
cluster = mock.MagicMock()
manager._remove_obsolete_tasks(cluster)
db_mock().flush.assert_called_once_with()
objects_mock.TaskCollection.get_cluster_tasks.assert_called_once_with(
cluster.id
)
objects_mock.TaskCollection.order_by(
objects_mock.TaskCollection.get_cluster_tasks.return_value, 'id'
)
objects_mock.Task.delete.assert_has_calls([
mock.call(tasks[0]), mock.call(tasks[1])
])
class TestNodeForRedeploy(BaseUnitTest):
def test_is_node_for_redeploy(self):
self.assertFalse(manager._is_node_for_redeploy(None))
self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=True, status=consts.NODE_STATUSES.discover
)))
self.assertFalse(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=False, status=consts.NODE_STATUSES.ready
)))
self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=True, status=consts.NODE_STATUSES.ready
)))
class TestGetTasksToRun(BaseUnitTest):
@mock.patch('nailgun.transactions.manager.objects')
def test_get_tasks_if_no_legacy(self, objects_mock):
cluster_obj = objects_mock.Cluster
tasks = [
{'id': 'tasks1', 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
{'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
{'id': 'tasks3', 'type': consts.ORCHESTRATOR_TASK_TYPES.shell},
{'id': 'tasks4', 'type': consts.ORCHESTRATOR_TASK_TYPES.skipped}
]
cluster_obj.get_deployment_tasks.return_value = tasks
cluster_obj.is_propagate_task_deploy_enabled.return_value = False
cluster = mock.MagicMock()
result = manager._get_tasks_to_run(cluster, 'test', None, None)
self.assertEqual(tasks, result)
cluster_obj.get_deployment_tasks.assert_called_once_with(
cluster, 'test'
)
cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with(
cluster
)
filtered_result = manager._get_tasks_to_run(
cluster, 'test', None, ['task2']
)
tasks[2]['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
self.assertEqual(tasks, filtered_result)
@mock.patch('nailgun.transactions.manager.objects')
@mock.patch('nailgun.transactions.manager.legacy_tasks_adapter')
def test_get_tasks_with_legacy(self, adapter_mock, objects_mock):
cluster_obj = objects_mock.Cluster
tasks = [
{'id': 'tasks2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
]
cluster_obj.get_deployment_tasks.return_value = tasks
cluster_obj.is_propagate_task_deploy_enabled.return_value = True
adapter_mock.adapt_legacy_tasks.return_value = tasks
cluster = mock.MagicMock()
resolver = mock.MagicMock()
result = manager._get_tasks_to_run(cluster, 'test', resolver, None)
self.assertEqual(tasks, result)
cluster_obj.is_propagate_task_deploy_enabled.assert_called_once_with(
cluster
)
adapter_mock.adapt_legacy_tasks.assert_called_once_with(
tasks, None, resolver
)
result2 = manager._get_tasks_to_run(
cluster, consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, resolver, None,
)
self.assertEqual(tasks, result2)
cluster_obj.get_legacy_plugin_tasks.assert_called_once_with(cluster)
adapter_mock.adapt_legacy_tasks.assert_called_with(
tasks, cluster_obj.get_legacy_plugin_tasks.return_value, resolver
)

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.transactions.manager import TransactionsManager
__all__ = ["TransactionsManager"]

View File

@ -0,0 +1,487 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import six
from nailgun import consts
from nailgun.db import db
from nailgun import errors
from nailgun import lcm
from nailgun.logger import logger
from nailgun import objects
from nailgun.orchestrator import deployment_serializers
from nailgun import rpc
from nailgun.settings import settings
from nailgun.task import helpers
from nailgun.task import legacy_tasks_adapter
from nailgun.utils import dict_update
from nailgun.utils import mule
from nailgun.utils import role_resolver
def make_astute_message(transaction, context, tasks, node_resolver):
directory, graph, metadata = lcm.TransactionSerializer.serialize(
context, tasks, node_resolver
)
objects.DeploymentHistoryCollection.create(transaction, graph)
return {
'api_version': settings.VERSION['api'],
'method': 'task_deploy',
'respond_to': 'transaction_resp',
'args': {
'task_uuid': transaction.uuid,
'tasks_directory': directory,
'tasks_graph': graph,
'tasks_metadata': metadata,
'dry_run': transaction.dry_run,
}
}
class try_transaction(object):
"""Wraps transaction in some sort of pre-/post- actions.
So far it includes the following actions:
* mark transaction as failed if exception has been raised;
* create an action log record on start/finish;
:param transaction: a transaction instance to be wrapped
:param suppress: do not propagate exception if True
"""
def __init__(self, transaction, suppress=False):
self._transaction = transaction
self._suppress = suppress
def __enter__(self):
logger.debug("Transaction %s starts assembling.", self._transaction.id)
self._logitem = helpers.TaskHelper.create_action_log(self._transaction)
return self._transaction
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val:
logger.error(
"Transaction %s failed.",
self._transaction.id, exc_info=(exc_type, exc_val, exc_tb)
)
objects.Task.update(self._transaction, {
'status': consts.TASK_STATUSES.error,
'progress': 100,
'message': six.text_type(exc_val),
})
helpers.TaskHelper.update_action_log(
self._transaction, self._logitem
)
else:
logger.debug(
"Transaction %s finish assembling.", self._transaction.id
)
return self._suppress
class TransactionsManager(object):
# We're moving towards everything-is-a-graph approach where there's
# no place for transaction names. From now on we're going to use
# transaction's attributes (e.g. graph_type, dry_run) to find out
# what this transaction about. Still, we need to specify transaction
# name until we move everything to graphs.
task_name = consts.TASK_NAMES.deployment
def __init__(self, cluster_id):
self.cluster_id = cluster_id
def execute(self, graphs, dry_run=False, force=False):
"""Start a new transaction with a given parameters.
Under the hood starting a new transaction means serialize a lot of
stuff and assemble an Astute message. So at the end of method we
either send an Astute message with execution flow or mark transaction
as failed.
:param graphs: a list of graph type to be run on a given nodes
:param dry_run: run a new transaction in dry run mode
:param force: re-evaluate tasks's conditions as it's a first run
"""
logger.debug(
'Start new transaction: cluster=%d graphs=%s dry_run=%d force=%d',
self.cluster_id, graphs, dry_run, force
)
# So far we don't support parallel execution of transactions within
# one cluster. So we need to fail quickly in there's transaction
# in-progress.
cluster = self._acquire_cluster()
# Unfortunately, by historical reasons UI polls 'deployment' tasks
# for cluster and expects there's only one. That one is considered
# as latest and is used for tracking progress and showing error
# message. So we have came up with the following workaround:
#
# * each new transaction we mark previous ones as deleted
# * /tasks endpoint doesn't return "deleted" transactions in response
# * /transactions endpoint does return "deleted" transactions
#
# FIXME: We must provide a way to get latest transaction with its
# sub-transactions via API. Once it's done, and UI uses it -
# we can safely remove this workaround.
_remove_obsolete_tasks(cluster)
transaction = objects.Transaction.create({
'name': self.task_name,
'cluster_id': self.cluster_id,
'status': consts.TASK_STATUSES.pending,
'dry_run': dry_run,
})
for graph in graphs:
# 'dry_run' flag is a part of transaction, so we can restore its
# value anywhere. That doesn't apply to 'force' flag, because it
# affects only context calculation. However we need somehow to
# pass it down in order to build context once first graph
# is executed (much much latter, when we call continue_ in RPC
# receiver).
cache = graph.copy()
cache['force'] = force
transaction.create_subtask(
self.task_name,
status=consts.TASK_STATUSES.pending,
dry_run=dry_run,
graph_type=graph['type'],
# We need to save input parameters in cache, so RPC receiver
# can use them to do further serialization.
#
# FIXME: Consider to use a separate set of columns.
cache=cache,
)
# We need to commit transaction because asynchronous call below might
# be executed in separate process or thread.
db().commit()
self.continue_(transaction)
return transaction
def continue_(self, transaction):
"""Pick next pending task and send it to execution.
Transaction may consist of a number of sub-transactions. We should
execute them one-by-one. This method allows to pick first pending
transaction and send it to execution.
:param transaction: a top-level transaction to continue
"""
with try_transaction(transaction, suppress=True):
# uWSGI mule is a separate process, and that means it won't share
# our DB session. Hence, we can't pass fetched DB instances to the
# function we want to be executed in mule, so let's proceed with
# unique identifiers.
mule.call_task_manager_async(
self.__class__,
'_continue_async',
self.cluster_id,
transaction.id,
)
def process(self, transaction, report):
"""Process feedback from executor (Astute).
:param transaction: a transaction to handle (sibling, not top level)
:param report: a report to process
"""
nodes = report.get('nodes', [])
error = report.get('error')
status = report.get('status')
progress = report.get('progress')
# Report may contain two virtual nodes: master and cluster ('None').
# Since we don't have them in database we should ensure we ain't
# going to update them.
nodes_params = {
str(node['uid']): node for node in nodes
if node['uid'] not in (consts.MASTER_NODE_UID, None)
}
nodes_instances = objects.NodeCollection.lock_for_update(
objects.NodeCollection.filter_by_list(
None, 'id', nodes_params.keys(), order_by=('id', )
)
).all()
_update_nodes(nodes_instances, nodes_params, transaction.dry_run)
_update_history(transaction, nodes)
if status:
# FIXME: resolve circular dependencies by moving assemble task
# updates from receiver to objects layer.
from nailgun.rpc.receiver import NailgunReceiver
objects.Task.update_recursively(
transaction,
NailgunReceiver._assemble_task_update(
transaction, status, progress, error, nodes_instances
)
)
# if transaction is completed successfully, we've got to initiate
# the next one in the chain
if transaction.parent and status == consts.TASK_STATUSES.ready:
self.continue_(transaction.parent)
def _continue_async(self, transaction_id):
transaction = objects.Transaction.get_by_uid(transaction_id)
with try_transaction(transaction, suppress=True):
self._continue_sync(transaction)
# Since the whole function is executed in separate process, we must
# commit all changes in order to do not lost them.
db().commit()
def _continue_sync(self, transaction):
sub_transaction = next((
sub_transaction
for sub_transaction in transaction.subtasks
if sub_transaction.status == consts.TASK_STATUSES.pending), None)
if sub_transaction is None:
return False
cluster = sub_transaction.cluster
nodes = _get_nodes_to_run(cluster, sub_transaction.cache.get('nodes'))
resolver = role_resolver.RoleResolver(nodes)
tasks = _get_tasks_to_run(
cluster,
sub_transaction.graph_type,
resolver,
sub_transaction.cache.get('tasks'))
context = lcm.TransactionContext(
_get_expected_state(cluster, nodes),
_get_current_state(
cluster, nodes, tasks, sub_transaction.cache.get('force')
))
# Attach desired state to the sub transaction, so when we continue
# our top-level transaction, the new state will be calculated on
# top of this.
_dump_expected_state(sub_transaction, context.new, tasks)
with try_transaction(sub_transaction):
message = make_astute_message(
sub_transaction, context, tasks, resolver)
# Once rpc.cast() is called, the message is sent to Astute. By
# that moment all transaction instanced must exist in database,
# otherwise we may get wrong result due to RPC receiver won't
# found entry to update.
db().commit()
rpc.cast('naily', [message])
def _acquire_cluster(self):
cluster = objects.Cluster.get_by_uid(
self.cluster_id, fail_if_not_found=True, lock_for_update=True
)
cluster_tasks = objects.TaskCollection.get_by_cluster_id(
cluster_id=cluster.id
)
cluster_tasks = objects.TaskCollection.filter_by(
cluster_tasks, name=self.task_name
)
cluster_tasks = objects.TaskCollection.filter_by_list(
cluster_tasks,
'status',
[consts.TASK_STATUSES.pending, consts.TASK_STATUSES.running]
)
# TODO(bgaifullin) need new lock approach for cluster
if objects.TaskCollection.count(cluster_tasks):
raise errors.DeploymentAlreadyStarted()
return cluster
def _remove_obsolete_tasks(cluster):
cluster_tasks = objects.TaskCollection.get_cluster_tasks(cluster.id)
cluster_tasks = objects.TaskCollection.order_by(cluster_tasks, 'id')
for task in cluster_tasks:
if task.status in (consts.TASK_STATUSES.ready,
consts.TASK_STATUSES.error):
objects.Task.delete(task)
db().flush()
def _get_nodes_to_run(cluster, ids=None):
# Trying to run tasks on offline nodes will lead to error, since most
# probably MCollective is unreachable. In order to avoid that, we need
# to select only online nodes.
nodes = objects.NodeCollection.filter_by(
None, cluster_id=cluster.id, online=True)
if ids:
nodes = objects.NodeCollection.filter_by_list(nodes, 'id', ids)
return objects.NodeCollection.lock_for_update(
objects.NodeCollection.order_by(nodes, 'id')
).all()
def _get_tasks_to_run(cluster, graph_type, node_resolver, names=None):
tasks = objects.Cluster.get_deployment_tasks(cluster, graph_type)
if objects.Cluster.is_propagate_task_deploy_enabled(cluster):
# TODO(bgaifullin) move this code into Cluster.get_deployment_tasks
# after dependency from role_resolver will be removed
if graph_type == consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE:
plugin_tasks = objects.Cluster.get_legacy_plugin_tasks(cluster)
else:
plugin_tasks = None
tasks = list(legacy_tasks_adapter.adapt_legacy_tasks(
tasks, plugin_tasks, node_resolver
))
if names:
# filter task by names, mark all other task as skipped
task_ids = set(names)
for idx, task in enumerate(tasks):
if (task['id'] not in task_ids and
task['type'] not in consts.INTERNAL_TASKS):
task = task.copy()
task['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
tasks[idx] = task
return tasks
def _is_node_for_redeploy(node):
if node is None:
return False
return (
node.pending_addition or
node.status == consts.NODE_STATUSES.discover
)
def _get_current_state(cluster, nodes, tasks, force=False):
# In case of force=True, the current state is {} which means: behave like
# an intial deployment.
if force:
return {}
nodes = {n.uid: n for n in nodes}
nodes[consts.MASTER_NODE_UID] = None
tasks_names = [
t['id'] for t in tasks if t['type'] not in consts.INTERNAL_TASKS
]
txs = objects.TransactionCollection.get_successful_transactions_per_task(
cluster.id, tasks_names, nodes
)
state = {}
for tx, data in itertools.groupby(txs, lambda x: x[0]):
node_ids = []
deferred_state = {}
for _, node_id, task_name in data:
t_state = state.setdefault(task_name, {})
if _is_node_for_redeploy(nodes.get(node_id)):
t_state[node_id] = {}
else:
t_state[node_id] = deferred_state.setdefault(node_id, {})
node_ids.append(node_id)
dict_update(
deferred_state,
objects.Transaction.get_deployment_info(tx, node_uids=node_ids),
level=2
)
return state
def _get_expected_state(cluster, nodes):
info = deployment_serializers.serialize_for_lcm(cluster, nodes)
info = {n['uid']: n for n in info}
# Added cluster state
info[None] = {}
return info
def _dump_expected_state(transaction, state, tasks):
cluster = transaction.cluster
objects.Transaction.attach_deployment_info(transaction, state)
objects.Transaction.attach_tasks_snapshot(transaction, tasks)
objects.Transaction.attach_cluster_settings(
transaction,
{
'editable': objects.Cluster.get_editable_attributes(cluster, True)
})
objects.Transaction.attach_network_settings(
transaction, objects.Cluster.get_network_attributes(cluster))
db().flush()
def _update_nodes(nodes_instances, nodes_params, dry_run=False):
allow_update = {
'name',
'status',
'hostname',
'kernel_params',
'pending_addition',
'pending_deletion',
'error_type',
'error_msg',
'online',
'progress',
}
# dry-run transactions must not update nodes except progress column
if dry_run:
allow_update = {'progress'}
for node in nodes_instances:
node_params = nodes_params.pop(node.uid)
for param in allow_update.intersection(node_params):
setattr(node, param, node_params[param])
# TODO(ikalnitsky): Ensure Astute sends progress=100 in case of error.
if node.status == consts.NODE_STATUSES.error:
node.progress = 100
db.flush()
if nodes_params:
logger.warning(
"The following nodes are not found: %s",
",".join(sorted(nodes_params.keys()))
)
def _update_history(transaction, nodes):
for node in nodes:
if {'deployment_graph_task_name', 'task_status'}.issubset(node.keys()):
objects.DeploymentHistory.update_if_exist(
transaction.id,
node['uid'],
node['deployment_graph_task_name'],
node['task_status'],
node.get('custom'),
)