Added adaptation of legacy tasks for task based deployment

The new option 'propagate_task_deploy' was added to cluster.
This option allows using the legacy task adaptation algorithm
to make tasks from granular deployment LCM-ready on the fly.
The same approach is also used for adapting legacy plugin tasks.

Change-Id: Ib212bd906acc0e6915e3c14e4741b306bdedaa98
Closes-Bug: 1572276
(cherry picked from commit 5fb792f187)
This commit is contained in:
Bulat Gaifullin 2016-04-18 18:05:11 +03:00
parent c63c84ab81
commit 54e1f64b7e
13 changed files with 574 additions and 7 deletions

View File

@ -1054,17 +1054,23 @@
type: "checkbox"
auth_key:
value: ""
label: "Public Key"
# label: "Public Key"
# description: "Public key(s) to include in authorized_keys on deployed nodes"
group: "security"
description: "Public key(s) to include in authorized_keys on deployed nodes"
weight: 70
type: "hidden"
task_deploy:
value: true
label: "Enable task based deploy"
description: "The new deployment engine based on cross-node dependencies for deployment tasks."
# label: "Enable task based deploy"
# description: "The new deployment engine based on cross-node dependencies for deployment tasks."
weight: 11
type: "hidden"
propagate_task_deploy:
value: false
# label: "Propagate task based deployment."
# description: "Enables adaptation of granular tasks for task deployment."
weight: 12
type: "hidden"
public_network_assignment:
metadata:

View File

@ -80,7 +80,9 @@ class TransactionSerializer(object):
version = StrictVersion(task.get('version', '0.0.0'))
if version < cls.min_supported_task_version:
message = (
"Task '{0}' does not support cross-dependencies."
"Task '{0}' does not support cross-dependencies.\n"
"You can enable option 'propagate_task_deploy'"
"for cluster to use task adaptation mechanism."
.format(task['id'])
)
logger.warning(message)

View File

@ -1074,6 +1074,11 @@ class Cluster(NailgunObject):
cluster_deployment_tasks
])
@classmethod
def get_legacy_plugin_tasks(cls, instance):
    """Return the legacy deployment tasks declared in plugins' tasks.yaml.

    :param instance: nailgun.db.sqlalchemy.models.Cluster instance
    :return: list of legacy task dicts collected from the enabled plugins
    """
    legacy_tasks = PluginManager.get_legacy_tasks_for_cluster(instance)
    return legacy_tasks
@classmethod
def get_refreshable_tasks(cls, instance, filter_by_configs=None):
"""Return list of refreshable tasks
@ -1450,9 +1455,19 @@ class Cluster(NailgunObject):
:param instance: cluster for checking
:type instance: nailgun.db.sqlalchemy.models.Cluster instance
"""
attrs = cls.get_editable_attributes(instance, False)
attrs = cls.get_editable_attributes(instance)
return attrs['common'].get('task_deploy', {}).get('value')
@classmethod
def is_propagate_task_deploy_enabled(cls, instance):
    """Check whether legacy-task propagation is enabled for the cluster.

    :param instance: nailgun.db.sqlalchemy.models.Cluster instance
    :return: the value of the 'propagate_task_deploy' editable attribute
             (None when the option is absent)
    """
    common_attrs = cls.get_editable_attributes(instance)['common']
    option = common_attrs.get('propagate_task_deploy', {})
    return option.get('value')
# FIXME(aroma): remove updating of 'deployed_before'
# when stop action is reworked. 'deployed_before'
# flag identifies whether stop action is allowed for the

View File

@ -160,6 +160,13 @@ class PluginAdapterBase(object):
else:
self._tasks = self._load_tasks()
slave_path = self.slaves_scripts_path
for task in self._tasks:
task['roles'] = task['role']
parameters = task.get('parameters')
if parameters is not None:
parameters.setdefault('cwd', slave_path)
return self._tasks
@property

View File

@ -370,3 +370,15 @@ class PluginManager(object):
if cluster_components & plugin_components:
ClusterPlugins.set_attributes(
cluster.id, plugin.id, enabled=True)
@classmethod
def get_legacy_tasks_for_cluster(cls, cluster):
    """Collect the tasks from tasks.yaml of every enabled plugin.

    :param cluster: the cluster object
    :return: list with all tasks from the plugins' tasks.yaml files
    """
    return [
        task
        for plugin in cls.get_enabled_plugins(cluster)
        for task in plugin.tasks
    ]

View File

@ -52,6 +52,8 @@ class InstallationInfo(object):
'resume_guests_state_on_host_boot', None),
WhiteListRule(('common', 'task_deploy', 'value'),
'task_deploy', None),
WhiteListRule(('common', 'propagate_task_deploy', 'value'),
'propagate_task_deploy', None),
WhiteListRule(('corosync', 'verified', 'value'),
'corosync_verified', None),

View File

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from distutils.version import StrictVersion
import itertools
from nailgun import consts
from nailgun.logger import logger
from nailgun.orchestrator.orchestrator_graph import GraphSolver
TASK_START_TEMPLATE = '{0}_start'
TASK_END_TEMPLATE = '{0}_end'
def _get_role(task):
return task.get('roles', task.get('groups'))
def _get_task_stage(task):
return task['stage'].split('/')[0]
def _get_task_stage_and_priority(task):
stage_list = task['stage'].split('/')
stage = stage_list[0]
priority = stage_list[-1] if len(stage_list) > 1 else 0
try:
priority = float(priority)
except ValueError:
logger.warn(
'Task %s has non numeric priority "%s", set to 0',
task, priority)
priority = 0
return stage, priority
def _join_groups(groups):
    """Translate requires/required_for links between groups into
    the cross_depends lists used by the task based engine.

    For every linked pair of groups the dependent one receives a
    cross-dependency on the '<other-group>_end' synthetic task.
    Links pointing outside the known groups are ignored.

    :param groups: mapping of group id -> group task dict
    """
    for group in groups.values():
        for predecessor_id in group.get('requires', ()):
            predecessor = groups.get(predecessor_id)
            if predecessor is None:
                continue
            group['cross_depends'].append({
                'name': TASK_END_TEMPLATE.format(predecessor_id),
                'role': _get_role(predecessor),
            })
        for successor_id in group.get('required_for', ()):
            if successor_id in groups:
                groups[successor_id]['cross_depends'].append({
                    'name': TASK_END_TEMPLATE.format(group['id']),
                    'role': _get_role(group),
                })
def _get_group_start(group):
    """Build the synthetic 'skipped' task that opens a group's bubble."""
    group_id = group['id']
    start_task = {
        'id': TASK_START_TEMPLATE.format(group_id),
        'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
        'version': consts.TASK_CROSS_DEPENDENCY,
        'roles': _get_role(group),
        'cross_depends': group['cross_depends'],
    }
    # the start task always precedes the matching end task on each node
    start_task['cross_depended_by'] = [{
        'name': TASK_END_TEMPLATE.format(group_id), 'role': 'self'
    }]
    return start_task
def _get_group_end(group):
    """Build the synthetic 'skipped' task that closes a group's bubble."""
    end_task = {
        'id': TASK_END_TEMPLATE.format(group['id']),
        'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
        'version': consts.TASK_CROSS_DEPENDENCY,
    }
    end_task['roles'] = _get_role(group)
    return end_task
def _join_task_to_group(task, groups):
    """Attach a legacy task to the bubbles of its groups.

    :param task: the legacy task dict (mutated in place)
    :param groups: iterable of group names the task belongs to
    :return: the same task, upgraded to the cross-dependency format
    """
    task['version'] = consts.TASK_CROSS_DEPENDENCY
    # only the dependency on the group start is added here; the
    # dependency on the group end was already added via 'required_for'
    task['cross_depends'] = [
        {'name': TASK_START_TEMPLATE.format(group_name), 'role': 'self'}
        for group_name in groups
    ]
    return task
def adapt_legacy_tasks(deployment_tasks, legacy_plugin_tasks, role_resolver):
    """Adapt the legacy tasks to execute with Task Based Engine.

    This is a generator.  Modern tasks are yielded with '<group>_end'
    entries appended to a copy's 'required_for'; each group is replaced
    by a synthetic '<group>_start'/'<group>_end' pair of skipped tasks;
    legacy (pre cross-dependency) tasks are placed inside the bubble of
    each group they run on; finally the pre/post plugin tasks from
    tasks.yaml are chained to the corresponding stage sync-points.

    NOTE(review): ``legacy_plugin_tasks`` is sorted in place as a side
    effect; callers (and the existing unit tests) observe the sorted
    list after the call — keep the in-place sort.

    :param deployment_tasks: the list of deployment tasks
    :param legacy_plugin_tasks: the pre/post tasks from tasks.yaml
    :param role_resolver: the RoleResolver instance
    """
    min_task_version = StrictVersion(consts.TASK_CROSS_DEPENDENCY)
    groups = {}
    sync_points = GraphSolver()
    legacy_tasks = []
    for task in deployment_tasks:
        task_type = task.get('type')
        task_version = StrictVersion(task.get('version', '0.0.0'))
        if task_type == consts.ORCHESTRATOR_TASK_TYPES.group:
            # remember the group; cross_depends is filled by _join_groups
            groups[task['id']] = dict(task, cross_depends=[])
        elif task_type == consts.ORCHESTRATOR_TASK_TYPES.stage:
            # stages form the sync-point graph used for plugin tasks below
            sync_points.add_task(task)
        else:
            # work on a shallow copy so the caller's dict is not modified
            task = task.copy()
            required_for = copy.copy(task.get('required_for', []))
            # each task has to finish before the '_end' task of every
            # group (role) it runs on
            required_for.extend(
                TASK_END_TEMPLATE.format(x)
                for x in role_resolver.get_all_roles(_get_role(task))
            )
            task['required_for'] = required_for
            if task_version < min_task_version:
                legacy_tasks.append(task)
                continue
            yield task

    if not (legacy_tasks or legacy_plugin_tasks):
        # nothing to adapt; group and stage tasks are consumed, not yielded
        return

    _join_groups(groups)
    # make bubbles from each group
    for group in groups.values():
        yield _get_group_start(group)
        yield _get_group_end(group)

    # put legacy tasks into bubble
    for task in legacy_tasks:
        logger.warning("Added cross_depends for legacy task: %s", task['id'])
        yield _join_task_to_group(
            task, role_resolver.get_all_roles(_get_role(task))
        )

    if not legacy_plugin_tasks:
        return

    # process tasks from stages
    # in-place sort by (stage, priority): groupby below requires input
    # already grouped by its key
    legacy_plugin_tasks.sort(key=_get_task_stage_and_priority)
    tasks_per_stage = itertools.groupby(
        legacy_plugin_tasks, key=_get_task_stage
    )
    for stage, tasks in tasks_per_stage:
        sync_point_name = TASK_END_TEMPLATE.format(stage)
        # the first task of a stage runs right after the stage sync-point
        cross_depends = [{'name': sync_point_name, 'role': None}]
        successors = sync_points.successors(sync_point_name)
        if successors:
            logger.debug(
                'The next stage is found for %s: %s',
                sync_point_name, successors[0]
            )
            # all tasks of the stage must finish before the next stage
            cross_depended_by = [{'name': successors[0], 'role': None}]
        else:
            logger.debug(
                'The next stage is not found for %s.', sync_point_name
            )
            cross_depended_by = []
        for idx, task in enumerate(tasks):
            new_task = {
                'id': '{0}_{1}'.format(stage, idx),
                'type': task['type'],
                'roles': _get_role(task),
                'version': consts.TASK_CROSS_DEPENDENCY,
                'cross_depends': cross_depends,
                'cross_depended_by': cross_depended_by,
                'parameters': task.get('parameters', {}),
                'condition': task.get('condition', True)
            }
            # chain tasks within a stage: each depends on the previous one
            cross_depends = [
                {'name': new_task['id'], 'role': new_task['roles']}
            ]
            yield new_task

View File

@ -28,7 +28,6 @@ from sqlalchemy import not_
from sqlalchemy.orm import ColumnProperty
from sqlalchemy.orm import object_mapper
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy.models import CapacityLog
@ -52,6 +51,7 @@ import nailgun.rpc as rpc
from nailgun.settings import settings
from nailgun.task.fake import FAKE_THREADS
from nailgun.task.helpers import TaskHelper
from nailgun.task.legacy_tasks_adapter import adapt_legacy_tasks
from nailgun.utils import logs as logs_utils
from nailgun.utils.restrictions import VmwareAttributesRestriction
from nailgun.utils.role_resolver import RoleResolver
@ -413,6 +413,16 @@ class ClusterTransaction(DeploymentTask):
tasks = cls.mark_skipped(tasks, selected_task_ids)
role_resolver = RoleResolver(nodes)
cluster = transaction.cluster
if objects.Cluster.is_propagate_task_deploy_enabled(cluster):
logger.info("The legacy tasks adaptation is used.")
tasks = adapt_legacy_tasks(
tasks,
objects.Cluster.get_legacy_plugin_tasks(cluster),
role_resolver,
)
directory, graph = lcm.TransactionSerializer.serialize(
context,
tasks,

View File

@ -206,6 +206,40 @@ class TestTaskManagers(BaseIntegrationTest):
[x['id'] for x in tasks_graph[cluster.nodes[1].uid]]
)
@fake_tasks()
@mock.patch('nailgun.lcm.transaction_serializer.settings',
            LCM_CHECK_TASK_VERSION=True)
# NOTE(review): mock.patch targets must be fully-qualified importable
# module paths; a bare 'objects.Cluster...' target cannot be imported.
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
@mock.patch('nailgun.objects.Cluster.is_propagate_task_deploy_enabled')
def test_adaptation_legacy_tasks(self, propagate_mock, tasks_mock, _):
    """Deployment with a legacy task fails unless propagation is on."""
    tasks_mock.return_value = [
        {
            'id': 'task', 'parameters': {}, 'type': 'puppet',
            'roles': ['controller'], 'version': '1.0.0',
        },
        {
            'id': 'controller', 'type': 'group', 'roles': ['controller']
        }
    ]
    self.env.create(
        nodes_kwargs=[
            {"pending_addition": True, "pending_roles": ['controller']},
            {"pending_addition": True, "pending_roles": ['controller']},
        ],
        release_kwargs={
            'operating_system': consts.RELEASE_OS.ubuntu,
            'version': 'liberty-9.0',
        }
    )
    cluster = self.env.clusters[-1]
    # without propagation the pre-2.0.0 task must break serialization
    propagate_mock.return_value = False
    supertask = self.env.launch_deployment(cluster.id)
    self.assertEqual(TASK_STATUSES.error, supertask.status)
    self.assertIn("Task 'task'", supertask.message)
    # with propagation the adapter makes the legacy task LCM-compatible
    propagate_mock.return_value = True
    supertask = self.env.launch_deployment(cluster.id)
    self.assertEqual(TASK_STATUSES.ready, supertask.status)
@fake_tasks(fake_rpc=False, mock_rpc=True)
def test_write_action_logs(self, _):
self.env.create(

View File

@ -0,0 +1,233 @@
# -*- coding: utf-8 -*-
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nailgun import consts
from nailgun.task.legacy_tasks_adapter import adapt_legacy_tasks
from nailgun.test.base import BaseUnitTest
class TestLegacyTasksAdapter(BaseUnitTest):
    """Unit tests for nailgun.task.legacy_tasks_adapter.adapt_legacy_tasks."""

    @classmethod
    def setUpClass(cls):
        super(TestLegacyTasksAdapter, cls).setUpClass()
        # role resolver stub: any pattern resolves to the set of its items
        cls.role_resolver = mock.MagicMock()
        cls.role_resolver.get_all_roles.side_effect = lambda x: set(x)

    def test_returns_same_task_if_no_legacy(self):
        # modern (>= 2.0.0) tasks pass through unchanged except for the
        # '<group>_end' entries added to 'required_for' of a copy
        tasks = [
            {'id': 'test1', 'version': '2.0.0', 'roles': ['group1'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
             'required_for': []},
            {'id': 'group1', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
            {'id': 'stage1', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
        ]
        new_tasks = list(adapt_legacy_tasks(tasks, None, self.role_resolver))
        self.datadiff(tasks, new_tasks, ignore_keys='required_for')
        # the original task dict must not be mutated
        self.assertEqual([], tasks[0]['required_for'])
        self.assertEqual(
            ['group1_end'], new_tasks[0]['required_for']
        )

    def test_legacy_deployment_task_adaptation(self):
        # groups become '<group>_start'/'<group>_end' bubbles; the
        # version-less 'task2' gets attached inside group2's bubble
        tasks = [
            {'id': 'task1', 'version': '2.0.0', 'roles': 'group1',
             'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
            {'id': 'task2', 'roles': ['group2'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
            {'id': 'group1', 'roles': ['group1'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.group,
             'requires': ['stage1'], 'required_for': ['group2']},
            {'id': 'group3', 'roles': ['group3'], 'requires': ['group1'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.group, },
            {'id': 'group2', 'type': consts.ORCHESTRATOR_TASK_TYPES.group,
             'roles': ['group2'], 'required_for': ['stage2']},
            {'id': 'stage1', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage2', 'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
        ]
        self.role_resolver.get_all_roles.side_effect = lambda x: set(x)
        new_tasks = list(adapt_legacy_tasks(tasks, [], self.role_resolver))
        self.assertEqual(
            {
                'id': 'group1_start',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group1'],
                'cross_depends': [],
                'cross_depended_by': [{'name': 'group1_end', 'role': 'self'}]
            },
            next(x for x in new_tasks if x['id'] == 'group1_start')
        )
        self.assertEqual(
            {
                'id': 'group1_end',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group1']
            },
            next(x for x in new_tasks if x['id'] == 'group1_end')
        )
        # group2 requires group1 via the 'required_for' back-link
        self.assertEqual(
            {
                'id': 'group2_start',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group2'],
                'cross_depends': [{'name': 'group1_end', 'role': ['group1']}],
                'cross_depended_by': [{'name': 'group2_end', 'role': 'self'}]
            },
            next(x for x in new_tasks if x['id'] == 'group2_start')
        )
        self.assertEqual(
            {
                'id': 'group2_end',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group2']
            },
            next(x for x in new_tasks if x['id'] == 'group2_end')
        )
        # group3 requires group1 via its own 'requires' link
        self.assertEqual(
            {
                'id': 'group3_start',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group3'],
                'cross_depends': [{'name': 'group1_end', 'role': ['group1']}],
                'cross_depended_by': [{'name': 'group3_end', 'role': 'self'}]
            },
            next(x for x in new_tasks if x['id'] == 'group3_start')
        )
        self.assertEqual(
            {
                'id': 'group3_end',
                'type': consts.ORCHESTRATOR_TASK_TYPES.skipped,
                'version': consts.TASK_CROSS_DEPENDENCY,
                'roles': ['group3']
            },
            next(x for x in new_tasks if x['id'] == 'group3_end')
        )
        # the legacy task is upgraded and pinned between group2_start
        # and group2_end
        self.assertEqual(
            {
                'id': 'task2',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'version': '2.0.0',
                'roles': ['group2'],
                'required_for': ['group2_end'],
                'cross_depends': [{'name': 'group2_start', 'role': 'self'}],
            },
            next(x for x in new_tasks if x['id'] == 'task2')
        )

    def test_legacy_plugin_tasks_adaptation(self):
        # tasks.yaml tasks are sorted by (stage, priority), chained to the
        # stage sync-points and to each other within a stage
        tasks = [
            {'id': 'task1', 'version': '2.0.0', 'roles': 'group1',
             'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
            {'id': 'group1', 'roles': ['group1'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.group,
             'requires': ['stage1'], 'required_for': ['stage2']},
            {'id': 'stage1_start',
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage1_end', 'requires': 'stage1_start',
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage2_start', 'requires': ['stage1_end'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage2_end', 'requires': ['stage2_start'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage3_start', 'requires': ['stage2_end'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage},
            {'id': 'stage3_end', 'requires': ['stage3_start'],
             'type': consts.ORCHESTRATOR_TASK_TYPES.stage}
        ]
        legacy_plugin_tasks = [
            {
                'roles': '*',
                'stage': 'stage1',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'parameters': {'number': 0}
            },
            {
                'roles': '*',
                'stage': 'stage1/100',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'parameters': {'number': 2}
            },
            {
                'roles': '*',
                'stage': 'stage1/10',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'parameters': {'number': 1}
            },
            {
                'roles': '*',
                'stage': 'stage3/100',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'parameters': {'number': 1}
            },
            {
                'roles': 'group1',
                'stage': 'stage3',
                'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
                'parameters': {'number': 0}
            }
        ]
        new_tasks = list(adapt_legacy_tasks(
            tasks, legacy_plugin_tasks, self.role_resolver
        ))
        # NOTE: the asserts below index legacy_plugin_tasks AFTER the
        # call and therefore rely on adapt_legacy_tasks sorting the
        # list in place by (stage, priority)
        stage1_tasks = new_tasks[-5:-2]
        depends = [{'role': None, 'name': 'stage1_end'}]
        depended_by = [{'role': None, 'name': 'stage2_start'}]
        for idx, task in enumerate(stage1_tasks):
            self.assertEqual(
                {
                    'id': 'stage1_{0}'.format(idx),
                    'type': legacy_plugin_tasks[idx]['type'],
                    'roles': legacy_plugin_tasks[idx]['roles'],
                    'version': consts.TASK_CROSS_DEPENDENCY,
                    'cross_depends': depends,
                    'cross_depended_by': depended_by,
                    'condition': True,
                    'parameters': {'number': idx}
                },
                task
            )
            # each subsequent task depends on the previous one
            depends = [{'role': task['roles'], 'name': task['id']}]
        stage3_tasks = new_tasks[-2:]
        depends = [{'role': None, 'name': 'stage3_end'}]
        # stage3 has no successor stage, so nothing depends on its tasks
        depended_by = []
        for idx, task in enumerate(stage3_tasks):
            self.assertEqual(
                {
                    'id': 'stage3_{0}'.format(idx),
                    'type': legacy_plugin_tasks[3 + idx]['type'],
                    'roles': legacy_plugin_tasks[3 + idx]['roles'],
                    'version': consts.TASK_CROSS_DEPENDENCY,
                    'cross_depends': depends,
                    'cross_depended_by': depended_by,
                    'condition': True,
                    'parameters': {'number': idx}
                },
                task
            )
            depends = [{'role': task['roles'], 'name': task['id']}]

View File

@ -182,6 +182,26 @@ class TestPluginBase(base.BaseTestCase):
def _find_path(self, config_name):
return '{0}.yaml'.format(config_name)
def test_plugin_adapter_get_tasks(self):
    """get_tasks() copies 'role' into 'roles' and defaults 'cwd'."""
    self.plugin.tasks = [
        {
            'role': '*',
            'stage': 'stage3/100',
            'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
            'parameters': {}
        },
        {
            'role': 'controller',
            'stage': 'stage3/100',
            'type': consts.ORCHESTRATOR_TASK_TYPES.shell,
        }
    ]
    for adapted in self.plugin_adapter.get_tasks():
        # 'roles' must mirror the legacy 'role' field
        self.assertEqual(adapted['role'], adapted['roles'])
        if 'parameters' in adapted:
            # 'cwd' defaults to the plugin's slave scripts path
            self.assertIn('cwd', adapted['parameters'])
class TestPluginV1(TestPluginBase):

View File

@ -86,6 +86,22 @@ class TestPatternBasedRoleResolver(BaseUnitTest):
self.assertEqual(1, len(any_node))
self.assertTrue(any_node.issubset(all_nodes))
def test_get_all_roles(self):
    """get_all_roles supports None, 'all', /regex/ and list patterns."""
    resolver = role_resolver.RoleResolver(self.nodes)
    every_role = {r for node_roles in self.roles_of_nodes
                  for r in node_roles}
    # no pattern and the special 'all' pattern return every known role
    self.assertEqual(every_role, resolver.get_all_roles())
    self.assertEqual(
        every_role, resolver.get_all_roles(consts.TASK_ROLES.all)
    )
    # a /regex/ pattern matches by role name
    self.assertEqual(
        {'controller', 'primary-controller'},
        resolver.get_all_roles("/.*controller/")
    )
    # a list of names keeps only the roles that actually exist
    self.assertEqual(
        {'compute', 'cinder'},
        resolver.get_all_roles(["compute", "cinder", "cinder2"])
    )
class TestNullResolver(BaseUnitTest):
def test_resolve(self):

View File

@ -43,6 +43,14 @@ class BaseRoleResolver(object):
:return: the unique set of nodes
"""
@abc.abstractmethod
def get_all_roles(self, pattern=None):
    """Get all known roles, filtered by pattern when one is given.

    :param pattern: optional pattern (or collection of patterns) to
                    match role names against; None means all roles
    :return: the set of role names that match the pattern
    """
class NullResolver(BaseRoleResolver):
"""The implementation of RoleResolver
@ -55,6 +63,9 @@ class NullResolver(BaseRoleResolver):
def resolve(self, roles, policy=None):
return self.nodes_ids
def get_all_roles(self, pattern=None):
    """Return the roles known to this resolver.

    The null resolver knows no roles, so the result is always empty.
    Returns a set (not a list) for consistency with
    RoleResolver.get_all_roles and the BaseRoleResolver contract.

    :param pattern: ignored
    :return: an empty set
    """
    return set()
class RoleResolver(BaseRoleResolver):
"""The general role resolver.
@ -119,3 +130,17 @@ class RoleResolver(BaseRoleResolver):
roles, policy, result
)
return result
def get_all_roles(self, pattern=None):
    """Return the set of known role names matching *pattern*.

    :param pattern: None or consts.TASK_ROLES.all for every role,
                    a single pattern string, or a collection of patterns
    :return: the set of matching role names
    """
    known_roles = self.__mapping
    if pattern is None or pattern == consts.TASK_ROLES.all:
        return set(known_roles)
    if isinstance(pattern, six.string_types):
        pattern = [pattern]
    matched = set()
    if isinstance(pattern, (list, tuple, set)):
        for raw_pattern in pattern:
            policy = NameMatchingPolicy.create(raw_pattern)
            matched.update(
                name for name in known_roles if policy.match(name)
            )
    return matched