Merge "Execute pipeline only for cluster enabled exts"
This commit is contained in:
commit
9eb64605d0
|
@ -24,7 +24,7 @@ import six
|
|||
class BasePipeline(object):
|
||||
|
||||
@classmethod
|
||||
def process_deployment(cls, deployment_data, **kwargs):
|
||||
def process_deployment(cls, deployment_data, cluster, nodes, **kwargs):
|
||||
"""Change the deployment_data.
|
||||
|
||||
:param deployment_data: serialized data
|
||||
|
@ -32,7 +32,7 @@ class BasePipeline(object):
|
|||
return deployment_data
|
||||
|
||||
@classmethod
|
||||
def process_provisioning(cls, provisioning_data, **kwargs):
|
||||
def process_provisioning(cls, provisioning_data, cluster, nodes, **kwargs):
|
||||
"""Change the provisioning_data.
|
||||
|
||||
:param provisioning_data: serialized data
|
||||
|
|
|
@ -108,17 +108,23 @@ def fire_callback_on_cluster_delete(cluster):
|
|||
extension.on_cluster_delete(cluster)
|
||||
|
||||
|
||||
def fire_callback_on_deployment_data_serialization(data, **kwargs):
|
||||
for pipeline in chain.from_iterable(
|
||||
ext.data_pipelines for ext in get_all_extensions()):
|
||||
data = pipeline.process_deployment(data, **kwargs)
|
||||
def _collect_data_pipelines_for_cluster(cluster):
|
||||
extensions = set(cluster.extensions)
|
||||
return chain.from_iterable(e.data_pipelines for e in get_all_extensions()
|
||||
if e.name in extensions)
|
||||
|
||||
|
||||
def fire_callback_on_deployment_data_serialization(data, cluster, nodes,
|
||||
**kwargs):
|
||||
for pipeline in _collect_data_pipelines_for_cluster(cluster):
|
||||
data = pipeline.process_deployment(data, cluster, nodes, **kwargs)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def fire_callback_on_provisioning_data_serialization(data, **kwargs):
|
||||
for pipeline in chain.from_iterable(
|
||||
ext.data_pipelines for ext in get_all_extensions()):
|
||||
data = pipeline.process_provisioning(data, **kwargs)
|
||||
def fire_callback_on_provisioning_data_serialization(data, cluster, nodes,
|
||||
**kwargs):
|
||||
for pipeline in _collect_data_pipelines_for_cluster(cluster):
|
||||
data = pipeline.process_provisioning(data, cluster, nodes, **kwargs)
|
||||
|
||||
return data
|
||||
|
|
|
@ -645,7 +645,7 @@ def _execute_pipeline(data, cluster, nodes, ignore_customized):
|
|||
"Executes pipelines depending on ignore_customized boolean."
|
||||
if ignore_customized:
|
||||
return fire_callback_on_deployment_data_serialization(
|
||||
data, cluster=cluster, nodes=nodes)
|
||||
data, cluster, nodes)
|
||||
|
||||
nodes_without_customized = {n.uid: n for n in nodes
|
||||
if not n.replaced_deployment_info}
|
||||
|
@ -659,8 +659,8 @@ def _execute_pipeline(data, cluster, nodes, ignore_customized):
|
|||
# NOTE(sbrzeczkowski): pipelines must be executed for nodes
|
||||
# which don't have replaced_deployment_info specified
|
||||
updated_data = fire_callback_on_deployment_data_serialization(
|
||||
nodes_data_for_pipeline, cluster=cluster,
|
||||
nodes=list(six.itervalues(nodes_without_customized)))
|
||||
nodes_data_for_pipeline, cluster,
|
||||
list(six.itervalues(nodes_without_customized)))
|
||||
|
||||
# customized nodes
|
||||
updated_data.extend(six.moves.filterfalse(keyfunc, data))
|
||||
|
|
|
@ -373,7 +373,7 @@ def _execute_pipeline(data, cluster, nodes, ignore_customized):
|
|||
"Executes pipelines depending on ignore_customized boolean."
|
||||
if ignore_customized:
|
||||
return fire_callback_on_provisioning_data_serialization(
|
||||
data, cluster=cluster, nodes=nodes)
|
||||
data, cluster, nodes)
|
||||
|
||||
nodes_without_customized = {n.uid: n for n in nodes
|
||||
if not n.replaced_provisioning_info}
|
||||
|
@ -389,8 +389,7 @@ def _execute_pipeline(data, cluster, nodes, ignore_customized):
|
|||
# NOTE(sbrzeczkowski): pipelines must be executed for nodes
|
||||
# which don't have replaced_provisioning_info specified
|
||||
updated_data = fire_callback_on_provisioning_data_serialization(
|
||||
data, cluster=cluster,
|
||||
nodes=list(six.itervalues(nodes_without_customized)))
|
||||
data, cluster, list(six.itervalues(nodes_without_customized)))
|
||||
|
||||
# customized nodes
|
||||
updated_data['nodes'].extend(six.moves.filterfalse(keyfunc, temp_nodes))
|
||||
|
|
|
@ -221,19 +221,6 @@ class TestPipeline(BaseExtensionCase):
|
|||
|
||||
return cluster
|
||||
|
||||
@mock.patch('nailgun.orchestrator.deployment_serializers.'
|
||||
'fire_callback_on_deployment_data_serialization')
|
||||
def test_deployment_serialization(self, mfire_callback):
|
||||
cluster = self._create_cluster_with_extensions()
|
||||
graph = deployment_graph.AstuteGraph(cluster)
|
||||
deployment_serializers.serialize(graph, cluster, cluster.nodes)
|
||||
|
||||
self.assertTrue(mfire_callback.called)
|
||||
self.assertEqual(mfire_callback.call_args[1], {
|
||||
'cluster': cluster,
|
||||
'nodes': cluster.nodes,
|
||||
})
|
||||
|
||||
@mock.patch.object(deployment_graph.AstuteGraph, 'deploy_task_serialize')
|
||||
def test_deployment_serialization_ignore_customized(self, _):
|
||||
cluster = self._create_cluster_with_extensions()
|
||||
|
@ -260,8 +247,7 @@ class TestPipeline(BaseExtensionCase):
|
|||
deployment_serializers.serialize(
|
||||
graph, cluster, cluster.nodes, ignore_customized=True)
|
||||
|
||||
mfire_callback.assert_called_once_with(data, cluster=cluster,
|
||||
nodes=cluster.nodes)
|
||||
mfire_callback.assert_called_once_with(data, cluster, cluster.nodes)
|
||||
|
||||
@mock.patch.object(deployment_graph.AstuteGraph, 'deploy_task_serialize')
|
||||
def test_deployment_serialization_ignore_customized_false(self, _):
|
||||
|
@ -298,12 +284,10 @@ class TestPipeline(BaseExtensionCase):
|
|||
deployment_serializers.serialize(
|
||||
graph, cluster, cluster.nodes, ignore_customized=False)
|
||||
|
||||
self.assertEqual(
|
||||
mfire_callback.call_args[0][0], expected_data)
|
||||
self.assertEqual(mfire_callback.call_args[0][0], expected_data)
|
||||
self.assertIs(mfire_callback.call_args[0][1], cluster)
|
||||
self.assertItemsEqual(
|
||||
mfire_callback.call_args[1]['nodes'], cluster.nodes[1:])
|
||||
self.assertIs(
|
||||
mfire_callback.call_args[1]['cluster'], cluster)
|
||||
mfire_callback.call_args[0][2], cluster.nodes[1:])
|
||||
|
||||
def test_provisioning_serialization_ignore_customized(self):
|
||||
cluster = self._create_cluster_with_extensions()
|
||||
|
@ -328,8 +312,7 @@ class TestPipeline(BaseExtensionCase):
|
|||
provisioning_serializers.serialize(
|
||||
cluster, cluster.nodes, ignore_customized=True)
|
||||
|
||||
mfire_callback.assert_called_once_with(data, cluster=cluster,
|
||||
nodes=cluster.nodes)
|
||||
mfire_callback.assert_called_once_with(data, cluster, cluster.nodes)
|
||||
|
||||
def test_provisioning_serialization_ignore_customized_false(self):
|
||||
cluster = self._create_cluster_with_extensions(
|
||||
|
@ -342,7 +325,7 @@ class TestPipeline(BaseExtensionCase):
|
|||
)
|
||||
|
||||
data = {"nodes": [{"uid": n.uid} for n in cluster.nodes]}
|
||||
expected_data = copy.deepcopy(data["nodes"][1:])
|
||||
expected_data = {"nodes": copy.deepcopy(data["nodes"][1:])}
|
||||
|
||||
mserializer = mock.MagicMock()
|
||||
mserializer.serialize.return_value = data
|
||||
|
@ -363,24 +346,10 @@ class TestPipeline(BaseExtensionCase):
|
|||
provisioning_serializers.serialize(
|
||||
cluster, cluster.nodes, ignore_customized=False)
|
||||
|
||||
self.assertEqual(
|
||||
mfire_callback.call_args[0][0], {'nodes': expected_data})
|
||||
self.assertEqual(mfire_callback.call_args[0][0], expected_data)
|
||||
self.assertIs(mfire_callback.call_args[0][1], cluster)
|
||||
self.assertItemsEqual(
|
||||
mfire_callback.call_args[1]['nodes'], cluster.nodes[1:])
|
||||
self.assertIs(
|
||||
mfire_callback.call_args[1]['cluster'], cluster)
|
||||
|
||||
@mock.patch('nailgun.orchestrator.provisioning_serializers.'
|
||||
'fire_callback_on_provisioning_data_serialization')
|
||||
def test_provisioning_serialization(self, mfire_callback):
|
||||
cluster = self._create_cluster_with_extensions()
|
||||
provisioning_serializers.serialize(cluster, cluster.nodes)
|
||||
|
||||
self.assertTrue(mfire_callback.called)
|
||||
self.assertEqual(mfire_callback.call_args[1], {
|
||||
'cluster': cluster,
|
||||
'nodes': cluster.nodes,
|
||||
})
|
||||
mfire_callback.call_args[0][2], cluster.nodes[1:])
|
||||
|
||||
def test_pipeline_change_data(self):
|
||||
self.env.create(
|
||||
|
@ -394,14 +363,14 @@ class TestPipeline(BaseExtensionCase):
|
|||
class PipelinePlus1(BasePipeline):
|
||||
|
||||
@classmethod
|
||||
def process_provisioning(cls, data, **kwargs):
|
||||
def process_provisioning(cls, data, cluster, nodes, **kwargs):
|
||||
data['key'] += 1
|
||||
return data
|
||||
|
||||
class PipelinePlus2(BasePipeline):
|
||||
|
||||
@classmethod
|
||||
def process_provisioning(cls, data, **kwargs):
|
||||
def process_provisioning(cls, data, cluster, nodes, **kwargs):
|
||||
data['key'] += 2
|
||||
return data
|
||||
|
||||
|
@ -413,6 +382,9 @@ class TestPipeline(BaseExtensionCase):
|
|||
|
||||
extension = Extension()
|
||||
|
||||
cluster.extensions = [extension.name]
|
||||
self.db.flush()
|
||||
|
||||
data = {'key': 0, 'nodes': []}
|
||||
|
||||
mserializer = mock.MagicMock()
|
||||
|
|
Loading…
Reference in New Issue