Merge "Fix getting previous state for deployment"

This commit is contained in:
Jenkins 2016-05-24 09:36:28 +00:00 committed by Gerrit Code Review
commit 0c6b43d57a
4 changed files with 130 additions and 47 deletions

View File

@ -88,31 +88,44 @@ class TransactionCollection(NailgunCollection):
).order_by('-id').limit(1).first()
@classmethod
def get_successful_transactions_per_task(cls, cluster_id, task_names=None):
def get_successful_transactions_per_task(cls, cluster_id,
task_names=None,
nodes_uids=None):
"""Get last successful transaction for every task name.
:param cluster_id: db id of cluster object
:param task_names: list with task names
:returns: [(Transaction, task_name), ...]
:param nodes_uids: db Node uids, which state you need
:returns: [(Transaction, node_id, task_name), ...]
"""
history = models.DeploymentHistory
model = cls.single.model
transactions = db().query(
model, history.deployment_graph_task_name).join(history).filter(
model,
history.node_id,
history.deployment_graph_task_name,
).join(history).filter(
model.cluster_id == cluster_id,
model.name == consts.TASK_NAMES.deployment,
history.status == consts.HISTORY_TASK_STATUSES.ready,
)
if nodes_uids is not None:
transactions = transactions.filter(
history.node_id.in_(nodes_uids),
)
if task_names is not None:
transactions = transactions.filter(
history.deployment_graph_task_name.in_(task_names),
)
transactions = transactions.order_by(
history.deployment_graph_task_name, history.task_id.desc(),
history.deployment_graph_task_name,
history.node_id,
history.task_id.desc(),
).distinct(
history.deployment_graph_task_name
history.deployment_graph_task_name, history.node_id
)
return transactions

View File

@ -16,6 +16,7 @@
import collections
from copy import deepcopy
from itertools import groupby
import os
import netaddr
@ -379,6 +380,13 @@ class ClusterTransaction(DeploymentTask):
consts.ORCHESTRATOR_TASK_TYPES.stage,
}
node_statuses_for_redeploy = {
consts.NODE_STATUSES.discover,
consts.NODE_STATUSES.error,
consts.NODE_STATUSES.provisioned,
consts.NODE_STATUSES.stopped,
}
@classmethod
def get_deployment_methods(cls, cluster):
return ['task_deploy']
@ -400,6 +408,57 @@ class ClusterTransaction(DeploymentTask):
yield task
@classmethod
def is_node_for_redeploy(cls, node):
"""Should node's previous state be cleared.
:param node: db Node object or None
:returns: Bool
"""
if node is None:
return False
return node.status in cls.node_statuses_for_redeploy
@classmethod
def get_current_state(cls, cluster, nodes, tasks):
"""Current state for deployment.
:param cluster: Cluster db object
:param nodes: iterable of Node db objects
:param tasks: list of tasks which state needed
:returns: current state {task_name: {node_uid: <astute.yaml>, ...},}
"""
nodes = {n.uid: n for n in nodes}
nodes[consts.MASTER_NODE_UID] = None
tasks_names = [t['id'] for t in tasks
if t['type'] not in cls.ignored_types]
transactions = list(
objects.TransactionCollection.get_successful_transactions_per_task(
cluster.id, tasks_names, nodes)
)
# sort by transaction.id
transactions.sort(key=lambda x: x[0].id)
state = {}
for transaction, data in groupby(transactions, lambda x: x[0]):
deployment_info = objects.Transaction.get_deployment_info(
transaction)
for _, node_uid, task_name in data:
task_state = state.setdefault(task_name, {})
task_state.setdefault(node_uid, {})
if cls.is_node_for_redeploy(nodes.get(node_uid)):
task_state[node_uid] = {}
else:
task_state[node_uid] = deployment_info.get(node_uid, {})
return state
@classmethod
def task_deploy(cls, transaction, tasks, nodes, force=False,
selected_task_ids=None, **kwargs):
@ -415,27 +474,11 @@ class ClusterTransaction(DeploymentTask):
if selected_task_ids:
tasks = list(cls.mark_skipped(tasks, selected_task_ids))
current_state = {}
if not force:
tasks_names = [t['id'] for t in tasks
if t['type'] not in cls.ignored_types]
transaction_collection = objects.TransactionCollection
transactions = (
transaction_collection.get_successful_transactions_per_task(
transaction.cluster.id, tasks_names)
)
current_state = {
task_id: objects.Transaction.get_deployment_info(tr)
for tr, task_id in transactions
}
# FIXME: https://bugs.launchpad.net/fuel/+bug/1582269
for n in nodes:
if n.status == consts.NODE_STATUSES.provisioned:
for task_deployment_info in current_state.values():
if task_deployment_info.get(n.uid):
task_deployment_info[n.uid] = {}
if force:
current_state = {}
else:
current_state = cls.get_current_state(
transaction.cluster, nodes, tasks)
expected_state = cls._save_deployment_info(
transaction, deployment_info

View File

@ -1410,11 +1410,11 @@ class TestTaskManagers(BaseIntegrationTest):
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
@mock.patch('nailgun.objects.TransactionCollection'
'.get_successful_transactions_per_task')
def check_correct_state_calculation(self, provision, state_mock,
tasks_mock, rpc_mock):
def check_correct_state_calculation(self, node_status, is_skip_expected,
state_mock, tasks_mock, rpc_mock):
cluster = self.env.create(
nodes_kwargs=[{'roles': ['controller'],
'status': consts.NODE_STATUSES.provisioned}],
'status': consts.NODE_STATUSES.ready}],
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0'
@ -1439,21 +1439,20 @@ class TestTaskManagers(BaseIntegrationTest):
self.set_history_ready()
if provision:
node.status = consts.NODE_STATUSES.provisioned
state_mock.return_value = [(supertask, 'test1')]
node.status = node_status
state_mock.return_value = [(supertask, node.uid, 'test1')]
task = self.env.launch_deployment_selected([node.uid], cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
# chek that test1 task skipped by condition and test2 was not
for task in tasks_graph[node.uid]:
if task['id'] == 'test1':
if provision:
self.assertNotEqual(
if is_skip_expected:
self.assertEqual(
task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
else:
self.assertEqual(
self.assertNotEqual(
task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
elif task['id'] == 'test2':
self.assertNotEqual(
@ -1462,10 +1461,20 @@ class TestTaskManagers(BaseIntegrationTest):
self.fail('Unexpected task in graph')
def test_correct_state_calculation(self):
self.check_correct_state_calculation(False)
self.check_correct_state_calculation(
consts.NODE_STATUSES.ready, True)
def test_state_calculation_after_provision(self):
self.check_correct_state_calculation(True)
self.check_correct_state_calculation(
consts.NODE_STATUSES.provisioned, False)
def test_state_calculation_after_stop(self):
self.check_correct_state_calculation(
consts.NODE_STATUSES.stopped, False)
def test_state_calculation_after_rediscover(self):
self.check_correct_state_calculation(
consts.NODE_STATUSES.discover, False)
class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):

View File

@ -995,12 +995,15 @@ class TestTransactionObject(BaseIntegrationTest):
get_succeed = (
objects.TransactionCollection.get_successful_transactions_per_task
)
uid1 = '1'
uid2 = '2'
tasks_graph = {
None: [
{'id': 'post_deployment_start'},
{'id': 'post_deployment_end'}
],
'1': [{'id': 'dns-client'}]
uid1: [{'id': 'dns-client'}]
}
def make_task_with_history(task_status, graph):
@ -1018,23 +1021,38 @@ class TestTransactionObject(BaseIntegrationTest):
# create some tasks in history
task1 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id, ['dns-client']).all()
self.assertEqual(transactions, [(task1, 'dns-client')])
self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
# remove 'dns-client' and add 'test' to graph
tasks_graph['1'] = [{'id': 'test'}]
# remove 'dns-client' and add 'test' to graph for two nodes
tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
task2 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id, ['test']).all()
self.assertEqual(transactions, [(task2, 'test')])
self.assertEqual(transactions, [(task2, uid1, 'test'),
(task2, uid2, 'test')])
# remove 'test' and add 'dns-client' to graph
tasks_graph['1'] = [{'id': 'dns-client'}]
# remove 'test' and add 'dns-client' to graph, leave node2 as previous
tasks_graph[uid1] = [{'id': 'dns-client'}]
task3 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id,
['dns-client', 'test']).all()
# now we should find both `test` and `dns-client` transactions
self.assertEqual(transactions,
[(task3, 'dns-client'), (task2, 'test')])
# on node 1 and onle `test` on node 2
self.assertEqual(
transactions,
[(task3, uid1, 'dns-client'),
(task2, uid1, 'test'),
(task3, uid2, 'test')]
)
# filter out node 2
transactions = get_succeed(self.cluster.id,
['dns-client', 'test'], [uid1]).all()
self.assertEqual(
transactions,
[(task3, uid1, 'dns-client'),
(task2, uid1, 'test')]
)
class TestActionLogObject(BaseIntegrationTest):