Added fault_tolerance_group to deployment metadata

This property contains list of groups, that is built from
tasks with type 'group' and each task may contain property
fault_tolerance, that shall be moved from openstack.yaml
to deployment tasks.
For plugins this attribute is filled from roles_metadata
for all tasks with type group (for backward compatibility).

DocImpact
Partial-Bug: 1435610
Change-Id: I1969b953eca667c09248a6b67ffee37bfd20f474
This commit is contained in:
Bulat Gaifullin 2016-05-30 19:51:18 +03:00
parent 40041a1132
commit ebe80dc4ef
7 changed files with 182 additions and 15 deletions

View File

@ -23,6 +23,7 @@
- id: primary-controller
type: group
fault_tolerance: 0
roles: [primary-controller]
required_for: [deploy_end]
requires: [deploy_start]
@ -31,6 +32,7 @@
type: one_by_one
- id: controller
type: group
fault_tolerance: 0
roles: [controller]
requires: [primary-controller]
required_for: [deploy_end]
@ -56,6 +58,7 @@
type: parallel
- id: compute
type: group
fault_tolerance: "2%"
roles: [compute]
requires: [controller]
required_for: [deploy_end]

View File

@ -135,6 +135,10 @@ class TransactionSerializer(object):
self.context = context
self.tasks_graph = {}
self.tasks_dictionary = {}
# the list of groups, that contains information about
# ids of nodes in this group and how many nodes in this group can fail
# and deployment will not be interrupted
self.fault_tolerance_groups = []
self.concurrency_policy = get_concurrency_policy()
@classmethod
@ -154,7 +158,12 @@ class TransactionSerializer(object):
tasks_graph[node_id] = list(
six.itervalues(tasks_graph[node_id])
)
return serializer.tasks_dictionary, tasks_graph
return (
serializer.tasks_dictionary,
tasks_graph,
{'fault_tolerance_groups': serializer.fault_tolerance_groups}
)
@classmethod
def ensure_task_based_deploy_allowed(cls, task):
@ -219,6 +228,9 @@ class TransactionSerializer(object):
node_ids = self.role_resolver.resolve(
task.get('roles', task.get('groups'))
)
if not node_ids:
continue
for sub_task_id in task.get('tasks', ()):
try:
sub_task = tasks_mapping[sub_task_id]
@ -231,6 +243,14 @@ class TransactionSerializer(object):
for node_id in node_ids:
yield node_id, sub_task
self.fault_tolerance_groups.append({
'name': task['id'],
'node_ids': list(node_ids),
'fault_tolerance': self.calculate_fault_tolerance(
task.get('fault_tolerance'), len(node_ids)
)
})
def resolve_nodes(self, task):
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.stage:
# all synchronisation tasks will run on sync node
@ -345,3 +365,37 @@ class TransactionSerializer(object):
return False
return task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped
@classmethod
def calculate_fault_tolerance(cls, percentage_or_value, total):
"""Calculates actual fault tolerance value.
:param percentage_or_value: the fault tolerance as percent of nodes
that can fail or actual number of nodes
:param total: the total number of nodes in group
:return: the actual number of nodes that can fail
"""
if percentage_or_value is None:
# unattainable number
return total + 1
try:
if (isinstance(percentage_or_value, six.string_types) and
percentage_or_value[-1] == '%'):
result = (int(percentage_or_value[:-1]) * total) // 100
else:
result = int(percentage_or_value)
if result >= 0:
return result
else:
# the negative number means the number of nodes
# those have to deploy successfully
return max(0, total + result)
except ValueError as e:
logger.error(
"Failed to handle fault_tolerance: '%s': %s. it is ignored",
percentage_or_value, e
)
# unattainable number
return total + 1

View File

@ -128,11 +128,22 @@ class PluginAdapterBase(object):
graph_type = consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE
deployment_tasks = []
graph_instance = DeploymentGraph.get_for_model(self.plugin, graph_type)
roles_metadata = self.plugin.roles_metadata
if graph_instance:
for task in DeploymentGraph.get_tasks(graph_instance):
if task.get('parameters'):
task['parameters'].setdefault(
'cwd', self.slaves_scripts_path)
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.group:
try:
task.setdefault(
'fault_tolerance',
roles_metadata[task['id']]['fault_tolerance']
)
except KeyError:
pass
deployment_tasks.append(task)
return deployment_tasks

View File

@ -558,7 +558,7 @@ class ClusterTransaction(DeploymentTask):
role_resolver,
)
directory, graph = lcm.TransactionSerializer.serialize(
directory, graph, metadata = lcm.TransactionSerializer.serialize(
context,
tasks,
role_resolver,
@ -567,6 +567,7 @@ class ClusterTransaction(DeploymentTask):
return {
"tasks_directory": directory,
"tasks_graph": graph,
"tasks_metadata": metadata,
"dry_run": dry_run,
}

View File

@ -70,8 +70,8 @@ class TestTaskDeploy80(BaseIntegrationTest):
self.db.flush()
@mock_rpc(pass_mock=True)
def get_deploy_message(self, rpc_cast):
task = self.env.launch_deployment(self.cluster.id)
def get_deploy_message(self, rpc_cast, **kwargs):
task = self.env.launch_deployment(self.cluster.id, **kwargs)
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
args, kwargs = rpc_cast.call_args
return args[1][1]
@ -90,12 +90,10 @@ class TestTaskDeploy80(BaseIntegrationTest):
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
@mock.patch.object(objects.Release, "is_lcm_supported", return_value=True)
def test_task_deploy_dry_run(self, _, lcm_mock):
message = self.get_deploy_message()
message = self.get_deploy_message(dry_run=True)
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
["task_uuid", "tasks_directory", "tasks_graph", "dry_run"],
message["args"]
)
self.assertIn('dry_run', message['args'])
self.assertTrue(message["args"]['dry_run'])
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
def test_fallback_to_granular_deploy(self, ensure_allowed):

View File

@ -63,7 +63,7 @@ class TestTransactionSerializer(BaseUnitTest):
'type': 'puppet', 'version': '2.0.0',
'parameters': {},
'cross_depended_by': [{'name': 'task3'}]
},
}
]
cls.nodes = [
@ -271,12 +271,23 @@ class TestTransactionSerializer(BaseUnitTest):
'cross_depends': [{'name': 'task2', 'role': 'self'}],
})
tasks.append({
'type': 'group', 'roles': 'custom',
'id': 'custom', 'type': 'group', 'roles': 'custom',
'fault_tolerance': '100%',
'tasks': ['task4', 'task2']
})
tasks.append({
'id': 'controller', 'type': 'group', 'roles': 'controller',
'fault_tolerance': '0%',
'tasks': ['task4', 'task2']
})
tasks.append({
'id': 'compute', 'type': 'group', 'roles': 'compute',
'tasks': ['task4', 'task2']
})
serialized = lcm.TransactionSerializer.serialize(
self.context, tasks, self.role_resolver
)[1]
)
tasks_per_node = serialized[1]
self.datadiff(
[
{
@ -291,11 +302,36 @@ class TestTransactionSerializer(BaseUnitTest):
},
],
serialized['4'],
tasks_per_node['4'],
ignore_keys=['parameters', 'fail_on_error'],
compare_sorted=True
)
tasks_metadata = serialized[2]
self.datadiff(
{
'fault_tolerance_groups': [
{
'name': 'custom',
'node_ids': ['4'],
'fault_tolerance': 1
},
{
'name': 'controller',
'node_ids': ['1'],
'fault_tolerance': 0
},
{
'name': 'compute',
'node_ids': ['2'],
'fault_tolerance': 2
}
]
},
tasks_metadata,
compare_sorted=True
)
def test_expand_dependencies(self):
serializer = lcm.TransactionSerializer(
self.context, self.role_resolver
@ -404,3 +440,33 @@ class TestTransactionSerializer(BaseUnitTest):
)
def test_multi_processing_serialization(self):
self.test_serialize_integration()
def test_get_fault_tolerance(self):
self.assertEqual(
11,
lcm.TransactionSerializer.calculate_fault_tolerance(None, 10)
)
self.assertEqual(
10,
lcm.TransactionSerializer.calculate_fault_tolerance('10', 10)
)
self.assertEqual(
10,
lcm.TransactionSerializer.calculate_fault_tolerance(10, 10)
)
self.assertEqual(
1,
lcm.TransactionSerializer.calculate_fault_tolerance('10%', 10)
)
self.assertEqual(
11,
lcm.TransactionSerializer.calculate_fault_tolerance('a%', 10)
)
self.assertEqual(
9,
lcm.TransactionSerializer.calculate_fault_tolerance('-10%', 10)
)
self.assertEqual(
9,
lcm.TransactionSerializer.calculate_fault_tolerance('-1', 10)
)

View File

@ -51,14 +51,16 @@ class TestPluginBase(base.BaseTestCase):
'role_y': {
'name': 'Role Y',
'description': 'Role Y is ...',
'restrictions': []
'restrictions': [],
'fault_tolerance': '5%'
},
'role_z': {
'name': 'Role Z',
'description': 'Role Z is ...',
'restrictions': [
'settings:some.stuff.value == false'
]
],
'fault_tolerance': '10%'
}
}
)
@ -175,6 +177,38 @@ class TestPluginBase(base.BaseTestCase):
self.assertEqual(depl_task['parameters'].get('cwd'),
self.plugin_adapter.slaves_scripts_path)
@mock.patch('nailgun.plugins.adapters.DeploymentGraph')
def test_fault_tolerance_set_for_task_groups(self, deployment_graph_mock):
deployment_graph_mock.get_for_model.return_value = True
deployment_graph_mock.get_tasks.return_value = [
{
'id': 'role_x',
'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'roles': ['role_x'],
'fault_tolerance': '0'
},
{
'id': 'role_y',
'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'roles': ['role_y'],
},
{
'id': 'role_z',
'type': consts.ORCHESTRATOR_TASK_TYPES.group,
'roles': ['role_z'],
'fault_tolerance': '50%'
},
]
depl_task = self.plugin_adapter.get_deployment_tasks()
fault_tolerance_groups = {
task['id']: task.get('fault_tolerance')
for task in depl_task
}
self.assertEqual(
{'role_x': '0', 'role_y': '5%', 'role_z': '50%'},
fault_tolerance_groups
)
def test_get_deployment_tasks_params_not_changed(self):
expected = 'path/to/some/dir'
dg = DeploymentGraph.get_for_model(self.plugin_adapter.plugin)