The task attributes substitution is called for all tasks

The yaql_exp can be used for calculate dependencies of all tasks
including skipped tasks, so the task attributes traverse should
be called for all task too.
Also added check that dependency is not empty object, because
it is possible when it has been dynamically generated via yaql.

Partial-Bug: 1541309
Change-Id: Ibcb786d2a7917d7583433c0b96f6324be4de759b
This commit is contained in:
Bulat Gaifullin 2016-05-25 18:03:48 +03:00
parent c3a868dd32
commit 5630e40e61
4 changed files with 74 additions and 32 deletions

View File

@ -134,23 +134,46 @@ class DeploymentTaskSerializer(object):
))
@abc.abstractmethod
def finalize(self, task, node_id):
"""Finish task serialization.
:param task: the serialized task
:param node_id: the target node_id
:return: the result
"""
def serialize(self, node_id):
"""Serialize task in expected by orchestrator format.
This interface should return generator, because in some cases one
external task - should serialize several tasks internally.
:param node_id: the target node_id
"""
@classmethod
def get_required_fields(cls, task, fields=None):
"""Gets only specified fields from task.
logger.debug(
"serialize task %s for node %s",
self.task_template['id'], node_id
)
task = utils.traverse(
self.task_template,
utils.text_format_safe,
self.context.get_formatter_context(node_id),
{
'yaql_exp': self.context.get_yaql_interpreter(
node_id, self.task_template['id'])
}
)
return self.normalize(self.finalize(task, node_id))
def normalize(self, task):
"""Removes unnecessary fields.
:param task: the serialized task
:param fields: the list of fields for including
:return: the task instance
"""
return {k: task.get(k) for k in (fields or cls.fields)}
fields = self.fields
for k in list(task):
if k not in fields:
del task[k]
return task
class NoopTaskSerializer(DeploymentTaskSerializer):
@ -158,10 +181,8 @@ class NoopTaskSerializer(DeploymentTaskSerializer):
self.task_template = task_template
self.context = context
def serialize(self, node_id):
task = self.get_required_fields(
self.task_template, self.fields - {'parameters'}
)
def finalize(self, task, node_id):
task.pop('parameters', None)
task['type'] = consts.ORCHESTRATOR_TASK_TYPES.skipped
task['fail_on_error'] = False
return task
@ -177,29 +198,16 @@ class DefaultTaskSerializer(NoopTaskSerializer):
return interpreter(condition)
return condition
def serialize(self, node_id):
logger.debug(
"serialize task %s for node %s",
self.task_template['id'], node_id
)
task = utils.traverse(
self.task_template,
utils.text_format_safe,
self.context.get_formatter_context(node_id),
{
'yaql_exp': self.context.get_yaql_interpreter(
node_id, self.task_template['id'])
}
)
def finalize(self, task, node_id):
if not self.should_execute(task, node_id):
logger.debug(
"Task %s is skipped by condition.", self.task_template['id']
"Task %s is skipped by condition.", task['id']
)
return super(DefaultTaskSerializer, self).serialize(node_id)
return super(DefaultTaskSerializer, self).finalize(task, node_id)
task.setdefault('parameters', {}).setdefault('cwd', '/')
task.setdefault('fail_on_error', True)
return self.get_required_fields(task)
return task
def handle_unsupported(_, task_template):

View File

@ -212,7 +212,7 @@ class TransactionSerializer(object):
if not dependencies:
return
for dep in dependencies:
for dep in six.moves.filter(None, dependencies):
roles = dep.get('role', consts.TASK_ROLES.all)
if roles == consts.TASK_ROLES.self:

View File

@ -523,6 +523,8 @@ class ClusterTransaction(DeploymentTask):
expected_state = cls._save_deployment_info(
transaction, deployment_info
)
# Added cluster state
expected_state[None] = cls.get_cluster_state(expected_state)
context = lcm.TransactionContext(expected_state, current_state)
logger.debug("tasks serialization is started.")

View File

@ -134,10 +134,9 @@ class TestTaskSerializerContext(BaseUnitTest):
self.assertTrue(interpreter('old($.attribute).isUndef()'))
class TestDefaultTaskSerializer(BaseUnitTest):
class TestTaskSerializer(BaseUnitTest):
@classmethod
def setUpClass(cls):
cls.serializer_class = task_serializer.DefaultTaskSerializer
cls.context = task_serializer.Context(TransactionContext({
'1': {
'cluster': {'id': 1},
@ -161,6 +160,13 @@ class TestDefaultTaskSerializer(BaseUnitTest):
}
}))
class TestDefaultTaskSerializer(TestTaskSerializer):
@classmethod
def setUpClass(cls):
super(TestDefaultTaskSerializer, cls).setUpClass()
cls.serializer_class = task_serializer.DefaultTaskSerializer
def check_condition(self, condition, expected):
task_template = {
'id': 'test',
@ -277,6 +283,32 @@ class TestDefaultTaskSerializer(BaseUnitTest):
)
class TestNoopTaskSerialzer(TestTaskSerializer):
@classmethod
def setUpClass(cls):
super(TestNoopTaskSerialzer, cls).setUpClass()
cls.serializer_class = task_serializer.NoopTaskSerializer
def test_serialize(self):
task_template = {
'id': 'test',
'type': 'skipped',
'parameters': {},
'fail_on_error': True,
'requires': {'yaql_exp': '["deploy_start"]'},
'required_for': ['deploy_end'],
}
serializer = self.serializer_class(self.context, task_template)
serialized = serializer.serialize('1')
self.assertEqual(
{
'id': 'test', 'type': 'skipped', 'fail_on_error': False,
'requires': ['deploy_start'], 'required_for': ['deploy_end']
},
serialized
)
class TestTasksSerializersFactory(BaseUnitTest):
factory_class = task_serializer.TasksSerializersFactory