Add handler for serialized tasks used in task-based deployment

The handler returns the output of the same serialized graph that is used
for deployment, in JSON format. It is available at the following URI:

GET clusters/<:cluster_id>/serialized_tasks?nodes=1,2,3&tasks=task1,task2

Expected return codes are the following:

200 - serialized tasks returned
400 - task-based deployment is not enabled for the cluster
400 - nodes do not belong to the given cluster
404 - cluster or nodes not found
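
For illustration only, the endpoint could be exercised as in the sketch
below; the base URL, cluster id, node ids, task names and auth token are
placeholders, not part of this change:

import requests

# Placeholder Nailgun base URL and token; adjust to the real environment.
BASE_URL = 'http://127.0.0.1:8000/api'
HEADERS = {'X-Auth-Token': 'admin'}

url = BASE_URL + '/clusters/1/serialized_tasks?nodes=1,2,3&tasks=netconfig,globals'
resp = requests.get(url, headers=HEADERS)

if resp.status_code == 200:
    payload = resp.json()
    print(sorted(payload))  # ['tasks_directory', 'tasks_graph']
else:
    # 400 - task-based deployment disabled, or nodes from another cluster
    # 404 - cluster or nodes not found
    print(resp.status_code, resp.text)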

implements blueprint serialized-graph

Change-Id: I170011778296acdb977f9d938c062f14c1c3d556
Author: Dmitry Shulyak, 2016-02-24 15:09:13 +02:00
Committer: Bulat Gaifullin
parent ea336ad52e
commit 663b7ebc8e
6 changed files with 173 additions and 13 deletions


@@ -29,6 +29,7 @@ from nailgun.api.v1.validators.node import NodesFilterValidator
from nailgun.logger import logger
from nailgun.errors import errors
from nailgun import objects
from nailgun.orchestrator import deployment_graph
@@ -37,6 +38,7 @@ from nailgun.orchestrator import graph_visualization
from nailgun.orchestrator import provisioning_serializers
from nailgun.orchestrator.stages import post_deployment_serialize
from nailgun.orchestrator.stages import pre_deployment_serialize
from nailgun.orchestrator import task_based_deployment
from nailgun.task.helpers import TaskHelper
from nailgun.task import manager
@@ -366,3 +368,37 @@ class TaskDeployGraph(BaseHandler):
            parents_for=parents_for,
            remove=remove)
        return dotgraph.to_string()


class SerializedTasksHandler(NodesFilterMixin, BaseHandler):

    def get_default_nodes(self, cluster):
        return TaskHelper.nodes_to_deploy(cluster)

    @content
    def GET(self, cluster_id):
        """:returns: serialized tasks in json format
        :http: * 200 (serialized tasks returned)
               * 400 (task based deployment is not allowed for cluster)
               * 400 (some nodes belong to different cluster)
               * 404 (cluster is not found)
               * 404 (nodes are not found)
        """
        cluster = self.get_object_or_404(objects.Cluster, cluster_id)
        nodes = self.get_nodes(cluster)
        self.checked_data(self.validator.validate_placement,
                          data=nodes, cluster=cluster)

        tasks = web.input(tasks=None).tasks
        task_ids = [t.strip() for t in tasks.split(',')] if tasks else None
        try:
            serialized_tasks = task_based_deployment.TasksSerializer.serialize(
                cluster,
                nodes,
                objects.Cluster.get_deployment_tasks(cluster),
                task_ids=task_ids
            )
            return {'tasks_directory': serialized_tasks[0],
                    'tasks_graph': serialized_tasks[1]}
        except errors.TaskBaseDeploymentNotAllowed as exc:
            raise self.http(400, msg=six.text_type(exc))
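
The response mirrors the two values built above: 'tasks_directory' maps task
names to their metadata (each entry carries a 'type' from
consts.ORCHESTRATOR_TASK_TYPES), while 'tasks_graph' maps node uids, plus the
literal key 'null' for the virtual sync-point tasks, to lists of task entries.
A rough sketch of the shape; the concrete task names, types and extra fields
below are illustrative and depend on the cluster's deployment graph:

example_response = {
    'tasks_directory': {
        'netconfig': {'type': 'puppet'},
        'globals': {'type': 'puppet'},
    },
    'tasks_graph': {
        # keyed by node uid; 'null' holds the virtual sync-point tasks
        '1': [{'id': 'netconfig', 'type': 'puppet'}],
        '2': [{'id': 'globals', 'type': 'puppet'}],
        'null': [{'id': 'deploy_start', 'type': 'skipped'}],
    },
}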


@@ -97,6 +97,7 @@ from nailgun.api.v1.handlers.orchestrator import DeploySelectedNodes
from nailgun.api.v1.handlers.orchestrator import DeploySelectedNodesWithTasks
from nailgun.api.v1.handlers.orchestrator import ProvisioningInfo
from nailgun.api.v1.handlers.orchestrator import ProvisionSelectedNodes
from nailgun.api.v1.handlers.orchestrator import SerializedTasksHandler
from nailgun.api.v1.handlers.orchestrator import TaskDeployGraph
from nailgun.api.v1.handlers.release import ReleaseCollectionHandler
@@ -195,6 +196,9 @@ urls = (
    DefaultPrePluginsHooksInfo,
    r'/clusters/(?P<cluster_id>\d+)/orchestrator/plugins_post_hooks/?$',
    DefaultPostPluginsHooksInfo,

    r'/clusters/(?P<cluster_id>\d+)/serialized_tasks/?$',
    SerializedTasksHandler,

    r'/clusters/(?P<cluster_id>\d+)/provision/?$',
    ProvisionSelectedNodes,


@@ -333,6 +333,23 @@ class NodesFilterValidator(BasicValidator):
        return node_ids

    @classmethod
    def validate_placement(cls, nodes, cluster):
        """Validates that given nodes placed in given cluster
        :param nodes: list of objects.Node instances
        :param cluster: objects.Cluster instance
        """
        wrongly_placed_uids = []
        for node in nodes:
            if node.cluster_id != cluster.id:
                wrongly_placed_uids.append(node.uid)

        if wrongly_placed_uids:
            raise errors.InvalidData(
                'Nodes {} do not belong to cluster {}'.format(
                    ', '.join(wrongly_placed_uids), cluster.id))


class ProvisionSelectedNodesValidator(NodesFilterValidator):
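
As a minimal usage sketch of the new validator, the namedtuples below stand in
for real objects.Node / objects.Cluster instances and only illustrate the error
it raises; they are not part of this change:

import collections

from nailgun.api.v1.validators.node import NodesFilterValidator
from nailgun.errors import errors

# Stand-ins for objects.Node / objects.Cluster, for illustration only.
Node = collections.namedtuple('Node', ['uid', 'cluster_id'])
Cluster = collections.namedtuple('Cluster', ['id'])

cluster = Cluster(id=1)
nodes = [Node(uid='5', cluster_id=1), Node(uid='7', cluster_id=2)]

try:
    NodesFilterValidator.validate_placement(nodes, cluster)
except errors.InvalidData as exc:
    print(exc)  # Nodes 7 do not belong to cluster 1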


@@ -468,6 +468,10 @@ class EnvironmentManager(object):
        self.db.flush()
        return created_ips

    def disable_task_deploy(self, cluster):
        cluster.attributes.editable['common']['task_deploy']['value'] = False
        self.db().flush()

    def delete_node_group(self, ng_id, status_code=200, api=True):
        if api:
            return self.app.delete(


@@ -16,9 +16,11 @@
from mock import patch
from oslo_serialization import jsonutils
import six
from nailgun import consts
from nailgun import objects
from nailgun.orchestrator.task_based_deployment import TaskProcessor
from nailgun.db.sqlalchemy.models import Cluster
from nailgun.test.base import BaseIntegrationTest
@@ -26,8 +28,17 @@ from nailgun.test.base import fake_tasks
from nailgun.utils import reverse
def make_orchestrator_uri(node_ids):
    return '?nodes={0}'.format(','.join(node_ids))


def make_query(**kwargs):
    """Each value in kwargs should be iterable
    :returns: ?k1=1,2,3&k2=1,2,3
    """
    query = ''
    for key, value in six.iteritems(kwargs):
        if query:
            query += '&'
        query += '{}={}'.format(key, ','.join(value))
    return '?' + query if query else query
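
# Illustrative usage (not part of the module): make_query joins each value
# with commas and glues the pairs with '&', e.g.
#   make_query(nodes=['1', '2'], tasks=['netconfig'])
#   -> '?nodes=1,2&tasks=netconfig'   (parameter order may vary)
#   make_query()
#   -> ''
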
class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
@@ -76,7 +87,7 @@ class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
        url = reverse(
            'DefaultProvisioningInfo',
            kwargs={'cluster_id': self.cluster.id}) + \
            make_orchestrator_uri(node_ids)
            make_query(nodes=node_ids)

        resp = self.app.get(url, headers=self.default_headers)
        self.assertEqual(resp.status_code, 200)
@@ -90,7 +101,7 @@ class TestDefaultOrchestratorInfoHandlers(BaseIntegrationTest):
        url = reverse(
            'DefaultDeploymentInfo',
            kwargs={'cluster_id': self.cluster.id}) + \
            make_orchestrator_uri(node_ids)
            make_query(nodes=node_ids)

        resp = self.app.get(url, headers=self.default_headers)
        self.assertEqual(resp.status_code, 200)
@@ -163,7 +174,7 @@ class BaseSelectedNodesTest(BaseIntegrationTest):
        return reverse(
            handler_name,
            kwargs={'cluster_id': self.cluster.id}) + \
            make_orchestrator_uri(node_uids)
            make_query(nodes=node_uids)

    def emulate_nodes_provisioning(self, nodes):
        for node in nodes:
@@ -405,3 +416,98 @@ class TestDeployMethodVersioning(BaseSelectedNodesTest):
    @patch('nailgun.task.task.rpc.cast')
    def test_granular_is_used_in_61(self, mcast):
        self.assert_deployment_method('2014.2-6.1', 'granular_deploy', mcast)


class TestSerializedTasksHandler(BaseIntegrationTest):

    def setUp(self):
        super(TestSerializedTasksHandler, self).setUp()
        self.env.create(
            nodes_kwargs=[
                {'roles': ['controller'], 'pending_addition': True},
                {'roles': ['compute'], 'pending_addition': True}])
        self.cluster = self.env.clusters[-1]
        self.nodes = self.cluster.nodes
        objects.Cluster.prepare_for_deployment(
            self.cluster, self.cluster.nodes)

    def get_serialized_tasks(self, cluster_id, **kwargs):
        uri = reverse(
            "SerializedTasksHandler",
            kwargs={'cluster_id': cluster_id}) + \
            make_query(**kwargs)
        return self.app.get(uri, expect_errors=True)

    def previous(self, obj):
        return str(obj.id - 1)

    @patch.object(TaskProcessor, 'ensure_task_based_deploy_allowed')
    def test_serialized_tasks_returned(self, _):
        nodes_uids = [n.uid for n in self.nodes]
        resp = self.get_serialized_tasks(
            self.cluster.id, nodes=nodes_uids)
        self.assertEqual(resp.status_code, 200)
        self.assertIn('tasks_graph', resp.json)
        self.assertIn('tasks_directory', resp.json)
        tasks_graph = resp.json['tasks_graph']
        self.assertItemsEqual(nodes_uids + ['null'], tasks_graph.keys())
        expected_tasks = ['netconfig', 'globals', 'deploy_legacy',
                          'upload_nodes_info', 'update_hosts']
        for n in nodes_uids:
            self.assertItemsEqual(
                [t['id'] for t in tasks_graph[n]], expected_tasks)
        virtual_tasks = ['post_deployment_start', 'post_deployment_end',
                         'deploy_start', 'deploy_end',
                         'pre_deployment_start', 'pre_deployment_end']
        self.assertItemsEqual(
            [t['id'] for t in tasks_graph['null']], virtual_tasks)
        # sanity check that it returns dictionary with tasks metadata
        for task_name, task in six.iteritems(resp.json['tasks_directory']):
            self.assertIn(task['type'], consts.ORCHESTRATOR_TASK_TYPES)

    @patch.object(TaskProcessor, 'ensure_task_based_deploy_allowed')
    def test_query_nodes_and_tasks(self, _):
        not_skipped = ['globals']
        resp = self.get_serialized_tasks(
            self.cluster.id,
            nodes=[self.nodes[0].uid],
            tasks=not_skipped)
        for t in resp.json['tasks_graph'][self.nodes[0].uid]:
            if t['id'] in not_skipped:
                self.assertNotEqual(
                    t['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
            else:
                self.assertEqual(
                    t['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)

    @patch.object(TaskProcessor, 'ensure_task_based_deploy_allowed')
    def test_pending_nodes_serialized(self, _):
        resp = self.get_serialized_tasks(self.cluster.id)
        self.assertEqual(resp.status_code, 200)
        expected = set((n.uid for n in self.nodes))
        expected.add('null')
        self.assertTrue(expected, resp.json['tasks_graph'].keys())

    def test_404_if_cluster_doesnt_exist(self):
        resp = self.get_serialized_tasks(self.previous(self.cluster))
        self.assertEqual(resp.status_code, 404)
        self.assertIn("Cluster not found", resp.body)

    def test_404_if_node_doesnt_exist(self):
        resp = self.get_serialized_tasks(self.cluster.id,
                                         nodes=[self.previous(self.nodes[0])])
        self.assertEqual(resp.status_code, 404)
        self.assertIn("NodeCollection not found", resp.body)

    def test_400_if_node_not_in_this_cluster(self):
        node = self.env.create_node()
        resp = self.get_serialized_tasks(self.cluster.id,
                                         nodes=[node['uid']])
        self.assertEqual(resp.status_code, 400)
        self.assertIn("do not belong to cluster", resp.body)

    def test_400_if_task_based_not_allowed(self):
        self.env.disable_task_deploy(self.cluster)
        resp = self.get_serialized_tasks(self.cluster.id)
        self.assertEqual(resp.status_code, 400)
        self.assertIn("The task-based deployment is not allowed", resp.body)


@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import copy
import mock

from nailgun import consts
@@ -44,12 +43,6 @@ class TestTaskDeploy(BaseIntegrationTest):
        )
        self.cluster = self.env.clusters[-1]

    def disable_task_deploy(self):
        cluster_attrs = copy.deepcopy(self.cluster.attributes.editable)
        cluster_attrs['common']['task_deploy']['value'] = False
        self.cluster.attributes.editable = cluster_attrs
        self.db().flush()

    def add_plugin_with_tasks(self, task_id):
        deployment_tasks = self.env.get_default_plugin_deployment_tasks(
            id=task_id, type="skipped",
@@ -99,7 +92,7 @@ class TestTaskDeploy(BaseIntegrationTest):
        ensure_allowed.assert_called_once_with(mock.ANY)

    def test_granular_deploy_if_not_enabled(self):
        self.disable_task_deploy()
        self.env.disable_task_deploy(self.cluster)
        message = self.get_deploy_message()
        self.assertEqual("granular_deploy", message["method"])
        self.assertItemsEqual(