Fix obtaining current state in ClusterTransaction

Now we get deployment state from DeploymentHistory model. For every task
we get last success transaction and its state.

Change-Id: I2288bc2bc34023c2ca705f1d3cc6ff48347bf549
Closes-bug: #1572226
This commit is contained in:
Nikita Zubkov 2016-04-25 19:32:57 +03:00
parent 93eb8fec2a
commit 364df8addd
11 changed files with 264 additions and 24 deletions

View File

@ -14,15 +14,15 @@
"""Fuel 10.0
Revision ID: 675105097a69
Revises: 11a9adc6d36a
Revision ID: c6edea552f1e
Revises: 675105097a69
Create Date: 2016-04-08 15:20:43.989472
"""
# revision identifiers, used by Alembic.
revision = '675105097a69'
down_revision = '11a9adc6d36a'
revision = 'c6edea552f1e'
down_revision = '675105097a69'
def upgrade():

View File

@ -0,0 +1,47 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fuel 9.0.1
Revision ID: 675105097a69
Revises: 11a9adc6d36a
Create Date: 2016-04-28 22:23:40.895589
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = '675105097a69'
down_revision = '11a9adc6d36a'
def upgrade():
upgrade_deployment_history()
def downgrade():
downgrade_deployment_history()
def upgrade_deployment_history():
op.create_index('deployment_history_task_name_status_idx',
'deployment_history',
['deployment_graph_task_name', 'status'])
def downgrade_deployment_history():
op.drop_index('deployment_history_task_name_status_idx',
'deployment_history')

View File

@ -29,6 +29,8 @@ class DeploymentHistory(Base):
__table_args__ = (
sa.Index('deployment_history_task_id_and_status',
'task_id', 'status'),
sa.Index('deployment_history_task_name_status_idx',
'deployment_graph_task_name', 'status'),
sa.UniqueConstraint(
'task_id',
'node_id',

View File

@ -15,11 +15,18 @@
class TransactionContext(object):
def __init__(self, new_state, old_state=None):
"""Wrapper around current and previous state of a transaction
:param new_state: new state of cluster
{node_id: <deployment info>, ...}
:param old_state: old state of cluster per task name or None
{task_id: {node_id: <deployment info>, ...}, ...}
"""
self.new = new_state
self.old = old_state or {}
def get_new_data(self, node_id):
return self.new[node_id]
def get_old_data(self, node_id):
return self.old.get(node_id)
def get_old_data(self, node_id, task_id):
return self.old.get(task_id, {}).get(node_id)

View File

@ -64,10 +64,10 @@ class Context(object):
def get_new_data(self, node_id):
return self._transaction.get_new_data(node_id)
def get_yaql_interpreter(self, node_id):
def get_yaql_interpreter(self, node_id, task_id):
context = self._yaql_context.create_child_context()
context['$%new'] = self._transaction.get_new_data(node_id)
context['$%old'] = self._transaction.get_old_data(node_id)
context['$%old'] = self._transaction.get_old_data(node_id, task_id)
cache = self._yaql_expressions_cache
def evaluate(expression):
@ -187,7 +187,8 @@ class DefaultTaskSerializer(NoopTaskSerializer):
utils.text_format_safe,
self.context.get_formatter_context(node_id),
{
'yaql_exp': self.context.get_yaql_interpreter(node_id)
'yaql_exp': self.context.get_yaql_interpreter(
node_id, self.task_template['id'])
}
)
if not self.should_execute(task, node_id):

View File

@ -86,3 +86,33 @@ class TransactionCollection(NailgunCollection):
None, cluster_id=cluster.id, name=consts.TASK_NAMES.deployment,
status=consts.TASK_STATUSES.ready
).order_by('-id').limit(1).first()
@classmethod
def get_successful_transactions_per_task(cls, cluster_id, task_names=None):
"""Get last successful transaction for every task name.
:param cluster_id: db id of cluster object
:param task_names: list with task names
:returns: [(Transaction, task_name), ...]
"""
history = models.DeploymentHistory
model = cls.single.model
transactions = db().query(
model, history.deployment_graph_task_name).join(history).filter(
model.cluster_id == cluster_id,
model.name == consts.TASK_NAMES.deployment,
history.status == consts.HISTORY_TASK_STATUSES.ready,
)
if task_names is not None:
transactions = transactions.filter(
history.deployment_graph_task_name.in_(task_names),
)
transactions = transactions.order_by(
history.deployment_graph_task_name, history.task_id.desc(),
).distinct(
history.deployment_graph_task_name
)
return transactions

View File

@ -394,25 +394,39 @@ class ClusterTransaction(DeploymentTask):
transaction.cluster, nodes
)
logger.info("cluster serialization is finished.")
if force:
current_state = {}
else:
current_state = objects.Transaction.get_deployment_info(
objects.TransactionCollection.get_last_succeed_run(
transaction.cluster
)
if selected_task_ids:
tasks = list(cls.mark_skipped(tasks, selected_task_ids))
current_state = {}
if not force:
ignored_types = {
consts.ORCHESTRATOR_TASK_TYPES.skipped,
consts.ORCHESTRATOR_TASK_TYPES.group,
consts.ORCHESTRATOR_TASK_TYPES.stage,
}
tasks_names = [t['id'] for t in tasks
if t['type'] not in ignored_types]
transaction_collection = objects.TransactionCollection
transactions = (
transaction_collection.get_successful_transactions_per_task(
transaction.cluster.id, tasks_names)
)
current_state = {
task_id: objects.Transaction.get_deployment_info(tr)
for tr, task_id in transactions
}
expected_state = cls._save_deployment_info(
transaction, deployment_info
)
context = lcm.TransactionContext(expected_state, current_state)
logger.debug("tasks serialization is started.")
# TODO(bgaifullin) Primary roles applied in deployment_serializers
# need to move this code from deployment serializer
# also role resolver should be created after serialization completed
if selected_task_ids:
tasks = cls.mark_skipped(tasks, selected_task_ids)
role_resolver = RoleResolver(nodes)
cluster = transaction.cluster

View File

@ -56,6 +56,10 @@ class TestTaskManagers(BaseIntegrationTest):
self.assertEqual(task_.cluster_id, None)
self.assertNotEqual(task_.deleted_at, None)
def set_history_ready(self):
objects.DeploymentHistoryCollection.all().update(
{'status': consts.HISTORY_TASK_STATUSES.ready})
@fake_tasks(override_state={"progress": 100, "status": "ready"})
def test_deployment_task_managers(self):
cluster = self.env.create(
@ -1232,6 +1236,7 @@ class TestTaskManagers(BaseIntegrationTest):
self.env.set_task_status_recursively(
supertask, consts.TASK_STATUSES.ready
)
self.set_history_ready()
self.db.flush()
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
self.assertEqual('puppet', tasks_graph['master'][0]['type'])
@ -1247,6 +1252,7 @@ class TestTaskManagers(BaseIntegrationTest):
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
self.assertEqual('skipped', tasks_graph['master'][0]['type'])
supertask.status = consts.TASK_STATUSES.ready
self.set_history_ready()
self.db.flush()
# force cluster re-deployment, the task should not be skipped
@ -1285,7 +1291,7 @@ class TestTaskManagers(BaseIntegrationTest):
)
@mock.patch('nailgun.task.task.rpc.cast')
@mock.patch('objects.Cluster.get_deployment_tasks')
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
def test_only_certain_tasks_run_in_deploy(self, tasks_mock, rpc_mock):
task = {
'id': 'test', 'parameters': {}, 'type': 'puppet',
@ -1300,7 +1306,7 @@ class TestTaskManagers(BaseIntegrationTest):
tasks_mock.return_value = tasks
self.env.create(
cluster = self.env.create(
nodes_kwargs=[
{'status': NODE_STATUSES.provisioned, 'roles': ['controller']},
{'status': NODE_STATUSES.provisioned, 'roles': ['compute']},
@ -1311,7 +1317,6 @@ class TestTaskManagers(BaseIntegrationTest):
'version': 'mitaka-9.0',
},
)
cluster = self.env.clusters[-1]
task_ids = ['test0', 'test3']
task = self.env.launch_deployment_selected_tasks(
[n.uid for n in cluster.nodes],
@ -1352,6 +1357,54 @@ class TestTaskManagers(BaseIntegrationTest):
tasks_graph
)
@mock.patch('nailgun.task.task.rpc.cast')
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
@mock.patch('nailgun.objects.TransactionCollection'
'.get_successful_transactions_per_task')
def test_correct_state_calculation(self, state_mock, tasks_mock, rpc_mock):
cluster = self.env.create(
nodes_kwargs=[{'roles': ['controller'],
'status': consts.NODE_STATUSES.provisioned}],
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0'
}
)
node = cluster.nodes[0]
task = {
'parameters': {}, 'type': 'puppet',
'roles': ['master'], 'version': '2.1.0',
'condition': {'yaql_exp': 'changed($)'},
}
tasks_mock.return_value = [
dict(task, id='test1'), dict(task, id='test2')
]
state_mock.return_value = []
# deploy cluster at first time and create history
supertask = self.env.launch_deployment_selected([node.uid], cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
self.set_history_ready()
state_mock.return_value = [(supertask, 'test1')]
task = self.env.launch_deployment_selected([node.uid], cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
# chek that test1 task skipped by condition and test2 was not
for task in tasks_graph['master']:
if task['id'] == 'test1':
self.assertEqual(
task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
elif task['id'] == 'test2':
self.assertNotEqual(
task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
else:
self.fail('Unexpected task in graph')
class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):

View File

@ -17,8 +17,8 @@ import alembic
from nailgun.db import dropdb
from nailgun.db.migration import ALEMBIC_CONFIG
_prepare_revision = '11a9adc6d36a'
_test_revision = '675105097a69'
_prepare_revision = '675105097a69'
_test_revision = 'c6edea552f1e'
def setup_module():

View File

@ -0,0 +1,40 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import alembic
from nailgun.db import dropdb
from nailgun.db.migration import ALEMBIC_CONFIG
from nailgun.test import base
_prepare_revision = '11a9adc6d36a'
_test_revision = '675105097a69'
def setup_module():
dropdb()
alembic.command.upgrade(ALEMBIC_CONFIG, _prepare_revision)
prepare()
alembic.command.upgrade(ALEMBIC_CONFIG, _test_revision)
def prepare():
pass
class TestDeploymentHistoryMigration(base.BaseAlembicMigrationTest):
def test_history_has_task_name_status_idx_index(self):
tbl = self.meta.tables['deployment_history']
self.assertIn('deployment_history_task_name_status_idx',
[i.name for i in tbl.indexes])

View File

@ -999,6 +999,52 @@ class TestTransactionObject(BaseIntegrationTest):
)
self.assertIsNone(objects.Transaction.get_network_settings(None))
def test_get_successful_transactions_per_task(self):
history_collection = objects.DeploymentHistoryCollection
get_succeed = (
objects.TransactionCollection.get_successful_transactions_per_task
)
tasks_graph = {
None: [
{'id': 'post_deployment_start'},
{'id': 'post_deployment_end'}
],
'1': [{'id': 'dns-client'}]
}
def make_task_with_history(task_status, graph):
task = self.env.create_task(
name=consts.TASK_NAMES.deployment,
status=task_status,
cluster_id=self.cluster.id)
history_collection.create(task, graph)
history_collection.all().update(
{'status': consts.HISTORY_TASK_STATUSES.ready})
return task
# create some tasks in history
task1 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id, ['dns-client']).all()
self.assertEqual(transactions, [(task1, 'dns-client')])
# remove 'dns-client' and add 'test' to graph
tasks_graph['1'] = [{'id': 'test'}]
task2 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id, ['test']).all()
self.assertEqual(transactions, [(task2, 'test')])
# remove 'test' and add 'dns-client' to graph
tasks_graph['1'] = [{'id': 'dns-client'}]
task3 = make_task_with_history('ready', tasks_graph)
transactions = get_succeed(self.cluster.id,
['dns-client', 'test']).all()
# now we should find both `test` and `dns-client` transactions
self.assertEqual(transactions,
[(task3, 'dns-client'), (task2, 'test')])
class TestActionLogObject(BaseIntegrationTest):