Reworked ApplyChanges for LCM

The following legacy task managers were reworked to use ClusterTransaction when LCM is supported:
 - OpenstackConfigTaskManager
 - SpawnVMsTaskManager

Change-Id: I4a6f5f37161e4290050ec4926cf029cd7af566e4
Closes-Bug: 1565885
Closes-Bug: 1561994
Closes-Bug: 1565760
Bulat Gaifullin 2016-04-04 18:02:48 +03:00
parent 4e7f0588e1
commit fe73beacc6
10 changed files with 388 additions and 179 deletions
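
Note: the pattern that recurs through this diff is a release-gated dispatch. Managers stop hard-coding tasks.DeploymentTask and instead ask a get_deployment_task() hook, which returns ClusterTransaction when the cluster's release supports LCM. A minimal sketch of the idea with simplified stand-ins (the is_lcm attribute is hypothetical; the real check is objects.Release.is_lcm_supported(cluster.release)):

class DeploymentTask(object):
    """Stand-in for the classic per-node deployment builder."""


class ClusterTransaction(DeploymentTask):
    """Stand-in for the LCM-aware, whole-cluster builder."""


class ApplyChangesTaskManager(object):
    def __init__(self, cluster):
        self.cluster = cluster

    def get_deployment_task(self):
        # hypothetical flag; the real code calls
        # objects.Release.is_lcm_supported(self.cluster.release)
        if getattr(self.cluster, 'is_lcm', False):
            return ClusterTransaction
        return DeploymentTask

The same hook shows up on ApplyChangesTaskManager, DeploymentTaskManager and OpenstackConfigTaskManager below.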

View File

@ -537,6 +537,10 @@ class DeferredTaskHandler(BaseHandler):
u"on environment '{env_id}': {error}"
task_manager = None
@classmethod
def get_options(cls):
return {}
@content
def PUT(self, cluster_id):
""":returns: JSONized Task object.
@ -558,10 +562,15 @@ class DeferredTaskHandler(BaseHandler):
logger.info(self.log_message.format(env_id=cluster_id))
try:
options = self.get_options()
except ValueError as e:
raise self.http(400, six.text_type(e))
try:
self.validator.validate(cluster)
task_manager = self.task_manager(cluster_id=cluster.id)
task = task_manager.execute()
task = task_manager.execute(**options)
except (
errors.AlreadyExists,
errors.StopAlreadyRunning
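
Note: get_options() is the new extension point here. The base handler returns an empty dict; subclasses parse query parameters and may raise ValueError, which PUT turns into an HTTP 400 before calling task_manager.execute(**options). A framework-free sketch of that contract (HTTPBadRequest is a hypothetical stand-in for self.http(400, ...)):

class HTTPBadRequest(Exception):
    """Hypothetical stand-in for the handler's self.http(400, ...)."""


class DeferredTaskHandler(object):
    @classmethod
    def get_options(cls):
        # subclasses read query parameters here and may raise ValueError
        return {}

    def put(self, task_manager):
        try:
            options = self.get_options()
        except ValueError as e:
            raise HTTPBadRequest(str(e))
        return task_manager.execute(**options)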

View File

@ -41,8 +41,8 @@ from nailgun.api.v1.validators.cluster import VmwareAttributesValidator
from nailgun.errors import errors
from nailgun.logger import logger
from nailgun import objects
from nailgun import utils
from nailgun.task.manager import ApplyChangesForceTaskManager
from nailgun.task.manager import ApplyChangesTaskManager
from nailgun.task.manager import ClusterDeletionManager
from nailgun.task.manager import ResetEnvironmentTaskManager
@ -119,15 +119,31 @@ class ClusterChangesHandler(DeferredTaskHandler):
task_manager = ApplyChangesTaskManager
validator = ClusterChangesValidator
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
return {
'graph_type': data.graph_type,
'force': False
}
class ClusterChangesForceRedeployHandler(DeferredTaskHandler):
log_message = u"Trying to force deployment of the environment '{env_id}'"
log_error = u"Error during execution of a forced deployment task " \
u"on environment '{env_id}': {error}"
task_manager = ApplyChangesForceTaskManager
task_manager = ApplyChangesTaskManager
validator = ClusterChangesValidator
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
return {
'graph_type': data.graph_type,
'force': True
}
class ClusterStopDeploymentHandler(DeferredTaskHandler):
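
Note: with ApplyChangesForceTaskManager removed (see the manager diff below), both handlers share ApplyChangesTaskManager and differ only in the force flag they inject. A hedged usage sketch in the webtest style of the test suite; app, headers and cluster_id are assumed fixtures, and reverse is the helper used elsewhere in this change:

resp = app.put(
    reverse('ClusterChangesHandler',
            kwargs={'cluster_id': cluster_id}),
    headers=headers)  # -> ApplyChangesTaskManager.execute(force=False)

resp = app.put(
    reverse('ClusterChangesForceRedeployHandler',
            kwargs={'cluster_id': cluster_id}),
    headers=headers)  # -> ApplyChangesTaskManager.execute(force=True)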
@ -202,7 +218,7 @@ class ClusterAttributesHandler(BaseHandler):
if not cluster.attributes:
raise self.http(500, "No attributes found!")
force = web.input(force=None).force not in (None, '', '0')
force = utils.parse_bool(web.input(force='0').force)
data = self.checked_data(cluster=cluster, force=force)
objects.Cluster.patch_attributes(cluster, data)
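
Note: both force-flag call sites now share one parser. A sketch of what utils.parse_bool presumably does (assumed semantics; the real helper lives in nailgun.utils and may accept more spellings). The behavioral change: previously any non-empty value other than '0' counted as true, while a strict parser rejects garbage with ValueError:

def parse_bool(value):
    # assumed semantics, consistent with the force='0' default above
    if value.lower() in ('0', 'false'):
        return False
    if value.lower() in ('1', 'true'):
        return True
    raise ValueError('Invalid boolean value: {0!r}'.format(value))


assert parse_bool('0') is False
assert parse_bool('1') is True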

View File

@ -22,8 +22,8 @@ from nailgun.api.v1.handlers.base import content
from nailgun.api.v1.validators.task import TaskValidator
from nailgun.errors import errors
from nailgun import objects
from nailgun import utils
"""
@ -50,7 +50,7 @@ class TaskHandler(SingleHandler):
obj_id
)
force = web.input(force=None).force not in (None, u'', u'0')
force = utils.parse_bool(web.input(force='0').force)
try:
self.validator.validate_delete(None, obj, force=force)

View File

@ -17,7 +17,6 @@
"""Deployment serializers for orchestrator"""
from copy import deepcopy
import itertools
import six
@ -94,23 +93,27 @@ class DeploymentMultinodeSerializer(object):
def serialize(self, cluster, nodes, ignore_customized=False):
"""Method generates facts which are passed to puppet."""
def is_customized(node):
return bool(node.replaced_deployment_info)
try:
self.initialize(cluster)
serialized_nodes = []
nodes = sorted(nodes, key=is_customized)
node_groups = itertools.groupby(nodes, is_customized)
for customized, node_group in node_groups:
if customized and not ignore_customized:
serialized_nodes.extend(
self.serialize_customized(cluster, node_group)
)
else:
serialized_nodes.extend(
self.serialize_generated(cluster, node_group)
)
origin_nodes = []
customized_nodes = []
if ignore_customized:
origin_nodes = nodes
else:
for node in nodes:
if node.replaced_deployment_info:
customized_nodes.append(node)
else:
origin_nodes.append(node)
serialized_nodes.extend(
self.serialize_generated(cluster, origin_nodes)
)
serialized_nodes.extend(
self.serialize_customized(cluster, customized_nodes)
)
# NOTE(dshulyak) tasks should not be preserved from replaced
# deployment info, there is different mechanism to control
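
Note: the old serialize() sorted nodes and leaned on itertools.groupby, which only groups adjacent items; the rewrite partitions nodes in one pass and always invokes both serializers, with an empty list when there is nothing to do. The partition step in isolation (sketch):

def partition(items, predicate):
    matched, rest = [], []
    for item in items:
        (matched if predicate(item) else rest).append(item)
    return matched, rest

# e.g. customized_nodes, origin_nodes = partition(
#     nodes, lambda n: bool(n.replaced_deployment_info))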

View File

@ -137,6 +137,28 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
deployment_type = consts.TASK_NAMES.deploy
def get_deployment_task(self):
if objects.Release.is_lcm_supported(self.cluster.release):
return tasks.ClusterTransaction
return tasks.DeploymentTask
def ensure_nodes_changed(
self, nodes_to_provision, nodes_to_deploy, nodes_to_delete
):
if objects.Release.is_lcm_supported(self.cluster.release):
return
if not any([nodes_to_provision, nodes_to_deploy, nodes_to_delete]):
db().rollback()
raise errors.WrongNodeStatus("No changes to deploy")
def get_nodes_to_deploy(self, force=False):
if objects.Release.is_lcm_supported(self.cluster.release):
return list(
objects.Cluster.get_nodes_not_for_deletion(self.cluster).all()
)
return TaskHelper.nodes_to_deploy(self.cluster, force)
def _remove_obsolete_tasks(self):
cluster_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=self.cluster.id)
@ -169,7 +191,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().flush()
def execute(self, nodes_to_provision_deploy=None, deployment_tasks=None,
force=False, graph_type=None):
force=False, graph_type=None, **kwargs):
logger.info(
u"Trying to start deployment at cluster '{0}'".format(
self.cluster.name or self.cluster.id
@ -188,9 +210,9 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
TaskHelper.nodes_to_deploy(self.cluster, force)
nodes_to_provision = TaskHelper.nodes_to_provision(self.cluster)
if not any([nodes_to_provision, nodes_to_deploy, nodes_to_delete]):
db().rollback()
raise errors.WrongNodeStatus("No changes to deploy")
self.ensure_nodes_changed(
nodes_to_provision, nodes_to_deploy, nodes_to_delete
)
db().flush()
TaskHelper.create_action_log(supertask)
@ -212,9 +234,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
return supertask
def _execute_async(self, supertask_id, deployment_tasks=None,
nodes_to_provision_deploy=None, force=False,
graph_type=None):
def _execute_async(self, supertask_id, **kwargs):
"""Function for execute task in the mule
:param supertask_id: id of parent task
@ -225,12 +245,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
supertask = objects.Task.get_by_uid(supertask_id)
try:
self._execute_async_content(
supertask,
deployment_tasks=deployment_tasks,
nodes_to_provision_deploy=nodes_to_provision_deploy,
force=force,
graph_type=graph_type)
self._execute_async_content(supertask, **kwargs)
except Exception as e:
logger.exception('Error occurred when running task')
data = {
@ -273,7 +288,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
n.needs_reprovision]),
nodes_to_deploy)
else:
nodes_to_deploy = TaskHelper.nodes_to_deploy(self.cluster, force)
nodes_to_deploy = self.get_nodes_to_deploy(force=force)
nodes_to_provision = TaskHelper.nodes_to_provision(self.cluster)
nodes_to_delete = TaskHelper.nodes_to_delete(self.cluster)
@ -346,7 +361,8 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
task_messages.append(provision_message)
deployment_message = None
if nodes_to_deploy or affected_nodes:
if (nodes_to_deploy or affected_nodes or
objects.Release.is_lcm_supported(self.cluster.release)):
if nodes_to_deploy:
logger.debug("There are nodes to deploy: %s",
" ".join((objects.Node.get_node_fqdn(n)
@ -365,13 +381,14 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().commit()
deployment_message = self._call_silently(
task_deployment,
tasks.DeploymentTask,
self.get_deployment_task(),
nodes_to_deploy,
affected_nodes=affected_nodes,
deployment_tasks=deployment_tasks,
method_name='message',
reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES,
graph_type=graph_type
graph_type=graph_type,
force=force
)
db().commit()
@ -396,7 +413,8 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
# nodes.yaml and /etc/hosts on all slaves. Since we need only
# those two tasks, let's create stripped version of
# deployment.
if nodes_to_delete and not nodes_to_deploy:
if (nodes_to_delete and not nodes_to_deploy and
not objects.Release.is_lcm_supported(self.cluster.release)):
logger.debug(
"No nodes to deploy, just update nodes.yaml everywhere.")
@ -525,13 +543,6 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
db().flush()
class ApplyChangesForceTaskManager(ApplyChangesTaskManager):
def execute(self, **kwargs):
kwargs['force'] = True
return super(ApplyChangesForceTaskManager, self).execute(**kwargs)
class SpawnVMsTaskManager(ApplyChangesTaskManager):
deployment_type = consts.TASK_NAMES.spawn_vms
@ -542,7 +553,7 @@ class SpawnVMsTaskManager(ApplyChangesTaskManager):
class ProvisioningTaskManager(TaskManager):
def execute(self, nodes_to_provision):
def execute(self, nodes_to_provision, **kwargs):
"""Run provisioning task on specified nodes."""
# locking nodes
nodes_ids = [node.id for node in nodes_to_provision]
@ -596,8 +607,13 @@ class ProvisioningTaskManager(TaskManager):
class DeploymentTaskManager(TaskManager):
def get_deployment_task(self):
if objects.Release.is_lcm_supported(self.cluster.release):
return tasks.ClusterTransaction
return tasks.DeploymentTask
def execute(self, nodes_to_deployment, deployment_tasks=None,
graph_type=None):
graph_type=None, force=False):
deployment_tasks = deployment_tasks or []
logger.debug('Nodes to deploy: {0}'.format(
@ -611,11 +627,12 @@ class DeploymentTaskManager(TaskManager):
deployment_message = self._call_silently(
task_deployment,
tasks.DeploymentTask,
self.get_deployment_task(),
nodes_to_deployment,
deployment_tasks=deployment_tasks,
method_name='message',
graph_type=graph_type)
graph_type=graph_type,
force=force)
db().refresh(task_deployment)
@ -643,7 +660,7 @@ class DeploymentTaskManager(TaskManager):
class StopDeploymentTaskManager(TaskManager):
def execute(self):
def execute(self, **kwargs):
stop_running = objects.TaskCollection.filter_by(
None,
cluster_id=self.cluster.id,
@ -727,7 +744,7 @@ class StopDeploymentTaskManager(TaskManager):
class ResetEnvironmentTaskManager(TaskManager):
def execute(self):
def execute(self, **kwargs):
# FIXME(aroma): remove updating of 'deployed_before'
# when stop action is reworked. 'deployed_before'
@ -797,7 +814,7 @@ class ResetEnvironmentTaskManager(TaskManager):
class CheckNetworksTaskManager(TaskManager):
def execute(self, data, check_all_parameters=False):
def execute(self, data, check_all_parameters=False, **kwargs):
# Make a copy of original 'data' due to being changed by
# 'tasks.CheckNetworksTask'
data_copy = copy.deepcopy(data)
@ -890,7 +907,7 @@ class VerifyNetworksTaskManager(TaskManager):
db().delete(ver_task)
db().flush()
def execute(self, nets, vlan_ids):
def execute(self, nets, vlan_ids, **kwargs):
self.remove_previous_task()
task = Task(
@ -997,7 +1014,7 @@ class VerifyNetworksTaskManager(TaskManager):
class ClusterDeletionManager(TaskManager):
def execute(self):
def execute(self, **kwargs):
current_tasks = objects.TaskCollection.get_cluster_tasks(
self.cluster.id, names=(consts.TASK_NAMES.cluster_deletion,)
)
@ -1071,7 +1088,7 @@ class ClusterDeletionManager(TaskManager):
class DumpTaskManager(TaskManager):
def execute(self, conf=None):
def execute(self, conf=None, **kwargs):
logger.info("Trying to start dump_environment task")
self.check_running_task(consts.TASK_NAMES.dump)
@ -1088,7 +1105,7 @@ class DumpTaskManager(TaskManager):
class GenerateCapacityLogTaskManager(TaskManager):
def execute(self):
def execute(self, **kwargs):
logger.info("Trying to start capacity_log task")
self.check_running_task(consts.TASK_NAMES.capacity_log)
@ -1125,7 +1142,7 @@ class NodeDeletionTaskManager(TaskManager, DeploymentCheckMixin):
[node.id for node in invalid_nodes], cluster_id)
)
def execute(self, nodes_to_delete, mclient_remove=True):
def execute(self, nodes_to_delete, mclient_remove=True, **kwargs):
cluster = None
if hasattr(self, 'cluster'):
cluster = self.cluster
@ -1221,7 +1238,7 @@ class BaseStatsUserTaskManager(TaskManager):
task_cls = None
def execute(self):
def execute(self, **kwargs):
logger.info("Trying to execute %s in the operational "
"environments", self.task_name)
created_tasks = []
@ -1280,7 +1297,7 @@ class RemoveStatsUserTaskManager(BaseStatsUserTaskManager):
class UpdateDnsmasqTaskManager(TaskManager):
def execute(self):
def execute(self, **kwargs):
logger.info("Starting update_dnsmasq task")
self.check_running_task(consts.TASK_NAMES.update_dnsmasq)
@ -1296,7 +1313,12 @@ class UpdateDnsmasqTaskManager(TaskManager):
class OpenstackConfigTaskManager(TaskManager):
def execute(self, filters):
def get_deployment_task(self):
if objects.Release.is_lcm_supported(self.cluster.release):
return tasks.ClusterTransaction
return tasks.UpdateOpenstackConfigTask
def execute(self, filters, force=False, **kwargs):
self.check_running_task(consts.TASK_NAMES.deployment)
task = Task(name=consts.TASK_NAMES.deployment,
@ -1308,8 +1330,13 @@ class OpenstackConfigTaskManager(TaskManager):
self.cluster, filters.get('node_ids'), filters.get('node_role'))
message = self._call_silently(
task, tasks.UpdateOpenstackConfigTask,
self.cluster, nodes_to_update, method_name='message')
task,
self.get_deployment_task(),
nodes_to_update,
method_name='message',
cluster=self.cluster,
force=force
)
# locking task
task = objects.Task.get_by_uid(
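
Note: all of these managers build messages through TaskManager._call_silently, which resolves method_name on the given task class and converts exceptions into a failed task instead of propagating. A paraphrased sketch of that contract as inferred from the call sites in this diff (the real implementation in nailgun/task/manager.py differs in detail):

def call_silently(task, task_cls, *args, **kwargs):
    """Paraphrased contract of TaskManager._call_silently (assumed)."""
    method = getattr(task_cls, kwargs.pop('method_name', 'execute'))
    try:
        return method(task, *args, **kwargs)
    except Exception as exc:
        # assumed error path: mark the task failed rather than raise
        task.status = 'error'
        task.message = str(exc)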

View File

@ -16,7 +16,6 @@
import collections
from copy import deepcopy
from distutils.version import StrictVersion
import os
import socket
@ -113,15 +112,13 @@ def fake_cast(queue, messages, **kwargs):
class BaseDeploymentTask(object):
@classmethod
def _get_deployment_methods(cls, cluster):
def get_deployment_methods(cls, cluster):
"""Get deployment method name based on cluster version
:param cluster: Cluster db object
:returns: list of available methods
"""
methods = []
if objects.Release.is_lcm_supported(cluster.release):
methods.append('lcm_transaction')
if objects.Cluster.is_task_deploy_enabled(cluster):
methods.append('task_deploy')
if objects.Release.is_granular_enabled(cluster.release):
@ -131,83 +128,40 @@ class BaseDeploymentTask(object):
return methods
@classmethod
def call_deployment_method(cls, transaction, *args, **kwargs):
def call_deployment_method(cls, transaction, **kwargs):
"""Calls the deployment method with fallback.
:param transaction: the transaction object
:param args: the positional arguments
:param kwargs: the keyword arguments
"""
for name in cls._get_deployment_methods(transaction.cluster):
try:
message_builder = getattr(cls, name)
except AttributeError:
logger.warning(
"%s is not allowed, fallback to next method.", name
)
continue
available_methods = iter(
cls.get_deployment_methods(transaction.cluster)
)
error_messages = []
for method in available_methods:
try:
method, args = message_builder(transaction, *args, **kwargs)
args = getattr(cls, method)(transaction, **kwargs)
# save tasks history
if 'tasks_graph' in args:
logger.info("start saving tasks history.")
logger.info("tasks history saving is started.")
objects.DeploymentHistoryCollection.create(
transaction, args['tasks_graph']
)
logger.info("finish saving tasks history.")
logger.info("tasks history saving is finished.")
return method, args
except errors.TaskBaseDeploymentNotAllowed:
except errors.TaskBaseDeploymentNotAllowed as e:
error_messages.append(six.text_type(e))
logger.warning(
"%s is not allowed, fallback to next method.", name
"%s is not allowed, fallback to next method.", method
)
raise errors.TaskBaseDeploymentNotAllowed(
"The task deploy is not allowed because of {0}"
.format(", ".join(error_messages))
)
@classmethod
def lcm_transaction(cls, transaction, tasks, *args, **kwargs):
# TODO(bgaifullin) remove task version check
# after related Library changes will be committed
# if there is at least one task that is supported LCM, use it.
lcm_readiness = StrictVersion(consts.TASK_LCM_READINESS)
lcm_ready = (
any(StrictVersion(t['version']) >= lcm_readiness for t in tasks)
)
if not lcm_ready:
raise errors.TaskBaseDeploymentNotAllowed()
# TODO(bgaifullin) always run for all nodes, because deploy-changes
# and run for selected nodes uses same logic,
# and need to differentiate this methods at first
nodes = objects.Cluster.get_nodes_not_for_deletion(transaction.cluster)
logger.info("start serialization of cluster.")
# we should update information for all nodes except deleted
# TODO(bgaifullin) pass role resolver to serializers
deployment_info = deployment_serializers.serialize_for_lcm(
transaction.cluster, nodes
)
logger.info("finish serialization of cluster.")
current_state = objects.Transaction.get_deployment_info(
objects.TransactionCollection.get_last_succeed_run(
transaction.cluster
)
)
expected_state = cls._save_deployment_info(
transaction, deployment_info
)
context = lcm.TransactionContext(expected_state, current_state)
logger.debug("start serialization of tasks.")
# TODO(bgaifullin) Primary roles applied in deployment_serializers
# need to move this code from deployment serializer
# also role resolver should be created after serialization completed
role_resolver = RoleResolver(nodes)
directory, graph = lcm.TransactionSerializer.serialize(
context, tasks, role_resolver
)
logger.info("finish serialization of tasks.")
return 'task_deploy', {
"tasks_directory": directory,
"tasks_graph": graph
}
@classmethod
def _save_deployment_info(cls, transaction, deployment_info):
@ -258,7 +212,7 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
reexecutable_filter=None, graph_type=None):
reexecutable_filter=None, graph_type=None, force=False):
"""Builds RPC message for deployment task.
:param task: the database task object instance
@ -292,8 +246,9 @@ class DeploymentTask(BaseDeploymentTask):
)
deployment_mode, message = cls.call_deployment_method(
task, deployment_tasks, nodes,
affected_nodes, task_ids, reexecutable_filter
task, tasks=deployment_tasks, nodes=nodes,
affected_nodes=affected_nodes, selected_task_ids=task_ids,
events=reexecutable_filter, force=force
)
# After serialization set pending_addition to False
@ -321,8 +276,8 @@ class DeploymentTask(BaseDeploymentTask):
return rpc_message
@classmethod
def granular_deploy(cls, transaction, tasks,
nodes, affected_nodes, selected_task_ids, events):
def granular_deploy(cls, transaction, tasks, nodes,
affected_nodes, selected_task_ids, events, **kwargs):
"""Builds parameters for granular deployment.
:param transaction: the transaction object
@ -332,7 +287,7 @@ class DeploymentTask(BaseDeploymentTask):
:param selected_task_ids: the list of tasks_ids to execute,
if None, all tasks will be executed
:param events: the list of events to find subscribed tasks
:return: RPC method name, the arguments for RPC message
:return: the arguments for RPC message
"""
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
graph.check()
@ -360,20 +315,17 @@ class DeploymentTask(BaseDeploymentTask):
graph, transaction.cluster, nodes,
role_resolver=role_resolver)
return 'granular_deploy', {
return {
'deployment_info': serialized_cluster,
'pre_deployment': pre_deployment,
'post_deployment': post_deployment
}
@classmethod
def deploy(cls, *args, **kwargs):
args = cls.granular_deploy(*args, **kwargs)[1]
return 'deploy', args
deploy = granular_deploy
@classmethod
def task_deploy(cls, transaction, tasks, nodes, affected_nodes,
selected_task_ids, events):
selected_task_ids, events, **kwargs):
"""Builds parameters for task based deployment.
:param transaction: the transaction object
@ -390,29 +342,71 @@ class DeploymentTask(BaseDeploymentTask):
for task in tasks:
task_processor.ensure_task_based_deploy_allowed(task)
logger.info("start cluster serialization.")
logger.info("cluster serialization is started.")
serialized_cluster = deployment_serializers.serialize(
None, transaction.cluster, nodes
)
cls._save_deployment_info(transaction, serialized_cluster)
logger.debug("finish cluster serialization.")
logger.info("cluster serialization is finished.")
tasks_events = events and \
task_based_deployment.TaskEvents('reexecute_on', events)
logger.info("start tasks serialization.")
logger.debug("tasks serialization is started.")
directory, graph = task_based_deployment.TasksSerializer.serialize(
transaction.cluster, nodes, tasks, affected_nodes,
selected_task_ids, tasks_events
)
logger.info("finish tasks serialization.")
logger.info("tasks serialization is finished.")
return 'task_deploy', {
return {
"deployment_info": serialized_cluster,
"tasks_directory": directory,
"tasks_graph": graph
}
class ClusterTransaction(DeploymentTask):
@classmethod
def get_deployment_methods(cls, cluster):
return ['task_deploy']
@classmethod
def task_deploy(cls, transaction, tasks, nodes, force=False, **kwargs):
logger.info("The cluster transaction is initiated.")
logger.info("cluster serialization is started.")
# we should update information for all nodes except deleted
# TODO(bgaifullin) pass role resolver to serializers
deployment_info = deployment_serializers.serialize_for_lcm(
transaction.cluster, nodes
)
logger.info("cluster serialization is finished.")
if force:
current_state = {}
else:
current_state = objects.Transaction.get_deployment_info(
objects.TransactionCollection.get_last_succeed_run(
transaction.cluster
)
)
expected_state = cls._save_deployment_info(
transaction, deployment_info
)
context = lcm.TransactionContext(expected_state, current_state)
logger.debug("tasks serialization is started.")
# TODO(bgaifullin) Primary roles applied in deployment_serializers
# need to move this code from deployment serializer
# also role resolver should be created after serialization completed
role_resolver = RoleResolver(nodes)
directory, graph = lcm.TransactionSerializer.serialize(
context, tasks, role_resolver
)
logger.info("tasks serialization is finished.")
return {
"tasks_directory": directory,
"tasks_graph": graph
}
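
Note: the force flag works purely through the current_state handed to the context. With force=True the last deployment info is ignored, so conditions like changed($.nodes) see every value as new. A sketch of the assumed state-diff idea (the real TransactionContext in nailgun.lcm is richer than this):

class TransactionContext(object):
    """Assumed shape: pairs the desired state with the last-seen one."""

    def __init__(self, new_state, old_state=None):
        self.new = new_state
        self.old = old_state or {}

    def changed(self, key):
        # a condition such as changed($.nodes) reduces to comparing
        # the two states; with old == {} everything looks changed
        return self.new.get(key) != self.old.get(key)


ctx = TransactionContext({'nodes': ['1', '2']}, old_state={})
assert ctx.changed('nodes')  # force=True: no baseline, so re-run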
class UpdateNodesInfoTask(object):
"""Task for updating nodes.yaml and /etc/hosts on all slaves
@ -2067,12 +2061,12 @@ class UpdateDnsmasqTask(object):
class UpdateOpenstackConfigTask(BaseDeploymentTask):
@staticmethod
def task_deploy(transaction, tasks, nodes, task_ids):
def task_deploy(transaction, nodes, tasks, task_ids):
# TODO(akostrikov) https://bugs.launchpad.net/fuel/+bug/1561485
directory, graph = task_based_deployment.TasksSerializer.serialize(
transaction.cluster, nodes, tasks, task_ids=task_ids
)
return 'task_deploy', make_astute_message(
return make_astute_message(
transaction, "task_deploy", "update_config_resp", {
"tasks_directory": directory,
"tasks_graph": graph
@ -2080,19 +2074,19 @@ class UpdateOpenstackConfigTask(BaseDeploymentTask):
)
@staticmethod
def granular_deploy(transaction, tasks, nodes, task_ids):
def granular_deploy(transaction, nodes, tasks, task_ids):
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
graph.only_tasks(task_ids)
deployment_tasks = graph.stage_tasks_serialize(
graph.graph.topology, nodes
)
return 'granular_deploy', make_astute_message(
return make_astute_message(
transaction, 'execute_tasks', 'update_config_resp', {
'tasks': deployment_tasks,
})
@classmethod
def message(cls, task, cluster, nodes):
def message(cls, task, nodes, cluster, **kwargs):
configs = objects.OpenstackConfigCollection.find_configs_for_nodes(
cluster, nodes)
updated_configs = set()
@ -2110,7 +2104,7 @@ class UpdateOpenstackConfigTask(BaseDeploymentTask):
task_ids = {t['id'] for t in refreshable_tasks}
deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
return cls.call_deployment_method(
task, deployment_tasks, nodes, task_ids
task, tasks=deployment_tasks, nodes=nodes, task_ids=task_ids
)[1]

View File

@ -953,13 +953,13 @@ class EnvironmentManager(object):
raise Exception(
'Cluster with ID "{0}" was not found.'.format(cluster_id))
def launch_provisioning_selected(self, nodes_uids=None, cluster_id=None):
def _launch_for_selected_nodes(self, handler, nodes_uids, cluster_id):
if self.clusters:
cluster = self._get_cluster_by_id(cluster_id)
if not nodes_uids:
nodes_uids = [n.uid for n in cluster.nodes]
action_url = reverse(
'ProvisionSelectedNodes',
handler,
kwargs={'cluster_id': cluster.id}
) + '?nodes={0}'.format(','.join(nodes_uids))
resp = self.app.put(
@ -981,12 +981,22 @@ class EnvironmentManager(object):
"Nothing to provision - try creating cluster"
)
def launch_deployment(self, cluster_id=None):
def launch_provisioning_selected(self, nodes_uids=None, cluster_id=None):
return self._launch_for_selected_nodes(
'ProvisionSelectedNodes', nodes_uids, cluster_id
)
def launch_deployment_selected(self, nodes_uids=None, cluster_id=None):
return self._launch_for_selected_nodes(
'DeploySelectedNodes', nodes_uids, cluster_id
)
def _launch_for_cluster(self, handler, cluster_id):
if self.clusters:
cluster_id = self._get_cluster_by_id(cluster_id).id
resp = self.app.put(
reverse(
'ClusterChangesHandler',
handler,
kwargs={'cluster_id': cluster_id}),
headers=self.default_headers)
@ -998,6 +1008,14 @@ class EnvironmentManager(object):
"Nothing to deploy - try creating cluster"
)
def launch_deployment(self, cluster_id=None):
return self._launch_for_cluster('ClusterChangesHandler', cluster_id)
def launch_redeployment(self, cluster_id=None):
return self._launch_for_cluster(
'ClusterChangesForceRedeployHandler', cluster_id
)
def stop_deployment(self, cluster_id=None):
if self.clusters:
cluster_id = self._get_cluster_by_id(cluster_id).id
@ -1286,6 +1304,11 @@ class EnvironmentManager(object):
expect_errors=False
)
def set_task_status_recursively(self, supertask, status):
supertask.status = status
for sub_task in supertask.subtasks:
self.set_task_status_recursively(sub_task, status)
class BaseUnitTest(TestCase):
def datadiff(self, data1, data2, path=None, ignore_keys=[],

View File

@ -164,7 +164,9 @@ class TestReplacedDeploymentInfoSerialization(OrchestratorSerializerTestBase):
node.replaced_deployment_info = [
{'role': 'controller', 'priority': 'XXX', 'tasks': [], 'uid': '1'}]
self.db.flush()
objects.Cluster.prepare_for_deployment(
self.cluster, self.cluster.nodes
)
serialized_data = self.serializer.serialize(self.cluster, [node])
# verify that task list is not empty
self.assertTrue(serialized_data[0]['tasks'])

View File

@ -127,7 +127,7 @@ class TestTaskManagers(BaseIntegrationTest):
@mock.patch('nailgun.task.task.rpc.cast')
def test_deployment_info_saves_in_transaction(self, _):
self.check_deployment_info_was_saved_in_transaction(
'liberty-9.0', True, True
'mitaka-9.0', True, True
)
self.check_deployment_info_was_saved_in_transaction(
'liberty-8.0', True, False
@ -696,7 +696,13 @@ class TestTaskManagers(BaseIntegrationTest):
@fake_tasks()
def test_no_node_no_cry(self):
cluster = self.env.create_cluster(api=True)
cluster = self.env.create_cluster(
api=True,
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'liberty-8.0'
},
)
cluster_id = cluster['id']
manager_ = manager.ApplyChangesTaskManager(cluster_id)
task = models.Task(name='provision', cluster_id=cluster_id,
@ -763,7 +769,11 @@ class TestTaskManagers(BaseIntegrationTest):
self.env.create(
nodes_kwargs=[
{"status": "ready"}
]
],
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'liberty-8.0'
},
)
cluster_db = self.env.clusters[0]
objects.Cluster.clear_pending_changes(cluster_db)
@ -780,11 +790,15 @@ class TestTaskManagers(BaseIntegrationTest):
cluster_kwargs={
'status': consts.CLUSTER_STATUSES.operational
},
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'liberty-8.0'
},
)
cluster_db = self.env.clusters[0]
objects.Cluster.clear_pending_changes(cluster_db)
manager_ = manager.ApplyChangesForceTaskManager(cluster_db.id)
supertask = manager_.execute()
manager_ = manager.ApplyChangesTaskManager(cluster_db.id)
supertask = manager_.execute(force=True)
self.assertEqual(supertask.name, TASK_NAMES.deploy)
self.assertEqual(supertask.status, TASK_STATUSES.pending)
@ -1110,6 +1124,132 @@ class TestTaskManagers(BaseIntegrationTest):
self.assertNotEqual(primary_node.id, new_primary.id)
@mock.patch('nailgun.task.task.rpc.cast')
def test_node_group_deletion_failed_while_previous_in_progress(
self, mocked_rpc
):
self.env.create(
cluster_kwargs={
'net_provider': consts.CLUSTER_NET_PROVIDERS.neutron,
'net_segment_type': consts.NEUTRON_SEGMENT_TYPES.gre},
nodes_kwargs=[
{'api': True,
'pending_addition': True}
]
)
ng1 = self.env.create_node_group(name='ng_1').json_body
ng2 = self.env.create_node_group(name='ng_2').json_body
self.assertEqual(mocked_rpc.call_count, 0)
self.env.delete_node_group(ng1['id'])
self.assertEqual(mocked_rpc.call_count, 1)
# delete the other node group
# the request should be rejected as the previous update_dnsmasq task
# is still in progress
resp = self.env.delete_node_group(ng2['id'], status_code=409)
self.assertEqual(resp.status_code, 409)
self.assertEqual(resp.json_body['message'],
errors.UpdateDnsmasqTaskIsRunning.message)
# no more calls were made
self.assertEqual(mocked_rpc.call_count, 1)
@mock.patch('nailgun.task.task.rpc.cast')
def test_deployment_starts_if_nodes_not_changed(self, rpc_mock):
self.env.create(
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0'
},
nodes_kwargs=[
{'status': NODE_STATUSES.ready, 'roles': ['controller']},
{'status': NODE_STATUSES.ready, 'roles': ['compute']},
]
)
cluster = self.env.clusters[-1]
supertask = self.env.launch_deployment(cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
# check that the nodes are present in tasks_graph
self.assertItemsEqual(
[n.uid for n in cluster.nodes] + [consts.MASTER_NODE_UID, None],
tasks_graph
)
@mock.patch('nailgun.task.task.rpc.cast')
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
def test_redeployment_ignore_conditions(self, tasks_mock, rpc_mock):
tasks_mock.return_value = [
{
"id": "test", "roles": ['master'], "version": "2.0.1",
"type": "puppet", "parameters": {},
"condition": {"yaql_exp": "changed($.nodes)"}
}
]
self.env.create(
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0'
}
)
cluster = self.env.clusters[-1]
# deploy cluster at first time
supertask = self.env.launch_deployment(cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
self.env.set_task_status_recursively(
supertask, consts.TASK_STATUSES.ready
)
self.db.flush()
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
self.assertEqual('puppet', tasks_graph['master'][0]['type'])
# launch cluster deployment again; because there are no changes,
# the task should be skipped
supertask = self.env.launch_deployment(cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
self.env.set_task_status_recursively(
supertask, consts.TASK_STATUSES.ready
)
self.db.flush()
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
self.assertEqual('skipped', tasks_graph['master'][0]['type'])
supertask.status = consts.TASK_STATUSES.ready
self.db.flush()
# force cluster re-deployment, the task should not be skipped
supertask = self.env.launch_redeployment(cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
self.assertEqual('puppet', tasks_graph['master'][0]['type'])
@mock.patch('nailgun.rpc.cast')
def test_deploy_part_of_pending_addition_nodes(self, rpc_mock):
self.env.create(
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0'
},
nodes_kwargs=[
{'status': NODE_STATUSES.provisioned, 'roles': ['controller']},
{'status': NODE_STATUSES.provisioned, 'roles': ['compute']},
]
)
cluster = self.env.clusters[-1]
nodes_uids = [n.uid for n in cluster.nodes]
node3 = self.env.create_node(
api=False, cluster_id=cluster.id,
roles=["compute"],
pending_addition=True
)
t = self.env.launch_deployment_selected(nodes_uids, cluster.id)
self.assertNotEqual(consts.TASK_STATUSES.error, t.status)
self.db.refresh(node3)
self.assertEqual(consts.NODE_STATUSES.discover, node3.status)
self.assertTrue(node3.pending_addition)
tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
self.assertItemsEqual(
[consts.MASTER_NODE_UID, None] + nodes_uids, tasks_graph
)
class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):
@ -1290,22 +1430,3 @@ class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):
update_task = self.db.query(models.Task).filter_by(
name=consts.TASK_NAMES.update_dnsmasq).first()
self.assertEqual(update_task.status, consts.TASK_STATUSES.running)
@mock.patch('nailgun.task.task.rpc.cast')
def test_node_group_deletion_failed_while_previous_in_progress(self,
mocked_rpc):
ng1 = self.env.create_node_group(name='ng_1').json_body
ng2 = self.env.create_node_group(name='ng_2').json_body
self.assertEqual(mocked_rpc.call_count, 0)
self.env.delete_node_group(ng1['id'])
self.assertEqual(mocked_rpc.call_count, 1)
# delete other node group
# request should be rejected as previous update_dnsmasq task is still
# in progress
resp = self.env.delete_node_group(ng2['id'], status_code=409)
self.assertEqual(resp.status_code, 409)
self.assertEqual(resp.json_body['message'],
errors.UpdateDnsmasqTaskIsRunning.message)
# no more calls were made
self.assertEqual(mocked_rpc.call_count, 1)

View File

@ -478,3 +478,17 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest):
return '{0}?{1}'.format(
reverse('OpenstackConfigCollectionHandler'),
urlparse.urlencode(kwargs))
@mock.patch('nailgun.task.task.rpc.cast', mock.MagicMock())
@mock.patch("nailgun.objects.Release.is_lcm_supported")
@mock.patch("nailgun.task.task.ClusterTransaction.message")
def test_openstack_config_call_apply_changes_if_lcm(
self, message_mock, lcm_supported_mock):
lcm_supported_mock.return_value = True
message_mock.return_value = {
'method': 'task_deploy', 'args': {
'tasks_graph': {'master': []}
}
}
self.execute_update_open_stack_config()
self.assertEqual(1, message_mock.call_count)