Merge "Reworked ApplyChanges for LCM"
This commit is contained in:
commit
4922a9820f
|
@ -537,6 +537,10 @@ class DeferredTaskHandler(BaseHandler):
|
|||
u"on environment '{env_id}': {error}"
|
||||
task_manager = None
|
||||
|
||||
@classmethod
|
||||
def get_options(cls):
|
||||
return {}
|
||||
|
||||
@content
|
||||
def PUT(self, cluster_id):
|
||||
""":returns: JSONized Task object.
|
||||
|
@ -558,10 +562,15 @@ class DeferredTaskHandler(BaseHandler):
|
|||
|
||||
logger.info(self.log_message.format(env_id=cluster_id))
|
||||
|
||||
try:
|
||||
options = self.get_options()
|
||||
except ValueError as e:
|
||||
raise self.http(400, six.text_type(e))
|
||||
|
||||
try:
|
||||
self.validator.validate(cluster)
|
||||
task_manager = self.task_manager(cluster_id=cluster.id)
|
||||
task = task_manager.execute()
|
||||
task = task_manager.execute(**options)
|
||||
except (
|
||||
errors.AlreadyExists,
|
||||
errors.StopAlreadyRunning
|
||||
|
|
|
@ -41,8 +41,8 @@ from nailgun.api.v1.validators.cluster import VmwareAttributesValidator
|
|||
from nailgun.errors import errors
|
||||
from nailgun.logger import logger
|
||||
from nailgun import objects
|
||||
from nailgun import utils
|
||||
|
||||
from nailgun.task.manager import ApplyChangesForceTaskManager
|
||||
from nailgun.task.manager import ApplyChangesTaskManager
|
||||
from nailgun.task.manager import ClusterDeletionManager
|
||||
from nailgun.task.manager import ResetEnvironmentTaskManager
|
||||
|
@ -119,15 +119,31 @@ class ClusterChangesHandler(DeferredTaskHandler):
|
|||
task_manager = ApplyChangesTaskManager
|
||||
validator = ClusterChangesValidator
|
||||
|
||||
@classmethod
|
||||
def get_options(cls):
|
||||
data = web.input(graph_type=None)
|
||||
return {
|
||||
'graph_type': data.graph_type,
|
||||
'force': False
|
||||
}
|
||||
|
||||
|
||||
class ClusterChangesForceRedeployHandler(DeferredTaskHandler):
|
||||
|
||||
log_message = u"Trying to force deployment of the environment '{env_id}'"
|
||||
log_error = u"Error during execution of a forced deployment task " \
|
||||
u"on environment '{env_id}': {error}"
|
||||
task_manager = ApplyChangesForceTaskManager
|
||||
task_manager = ApplyChangesTaskManager
|
||||
validator = ClusterChangesValidator
|
||||
|
||||
@classmethod
|
||||
def get_options(cls):
|
||||
data = web.input(graph_type=None)
|
||||
return {
|
||||
'graph_type': data.graph_type,
|
||||
'force': True
|
||||
}
|
||||
|
||||
|
||||
class ClusterStopDeploymentHandler(DeferredTaskHandler):
|
||||
|
||||
|
@ -202,7 +218,7 @@ class ClusterAttributesHandler(BaseHandler):
|
|||
if not cluster.attributes:
|
||||
raise self.http(500, "No attributes found!")
|
||||
|
||||
force = web.input(force=None).force not in (None, '', '0')
|
||||
force = utils.parse_bool(web.input(force='0').force)
|
||||
|
||||
data = self.checked_data(cluster=cluster, force=force)
|
||||
objects.Cluster.patch_attributes(cluster, data)
|
||||
|
|
|
@ -22,8 +22,8 @@ from nailgun.api.v1.handlers.base import content
|
|||
from nailgun.api.v1.validators.task import TaskValidator
|
||||
|
||||
from nailgun.errors import errors
|
||||
|
||||
from nailgun import objects
|
||||
from nailgun import utils
|
||||
|
||||
|
||||
"""
|
||||
|
@ -50,7 +50,7 @@ class TaskHandler(SingleHandler):
|
|||
obj_id
|
||||
)
|
||||
|
||||
force = web.input(force=None).force not in (None, u'', u'0')
|
||||
force = utils.parse_bool(web.input(force='0').force)
|
||||
|
||||
try:
|
||||
self.validator.validate_delete(None, obj, force=force)
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
"""Deployment serializers for orchestrator"""
|
||||
|
||||
from copy import deepcopy
|
||||
import itertools
|
||||
|
||||
import six
|
||||
|
||||
|
@ -94,23 +93,27 @@ class DeploymentMultinodeSerializer(object):
|
|||
|
||||
def serialize(self, cluster, nodes, ignore_customized=False):
|
||||
"""Method generates facts which are passed to puppet."""
|
||||
def is_customized(node):
|
||||
return bool(node.replaced_deployment_info)
|
||||
|
||||
try:
|
||||
self.initialize(cluster)
|
||||
serialized_nodes = []
|
||||
nodes = sorted(nodes, key=is_customized)
|
||||
node_groups = itertools.groupby(nodes, is_customized)
|
||||
for customized, node_group in node_groups:
|
||||
if customized and not ignore_customized:
|
||||
serialized_nodes.extend(
|
||||
self.serialize_customized(cluster, node_group)
|
||||
)
|
||||
else:
|
||||
serialized_nodes.extend(
|
||||
self.serialize_generated(cluster, node_group)
|
||||
)
|
||||
|
||||
origin_nodes = []
|
||||
customized_nodes = []
|
||||
if ignore_customized:
|
||||
origin_nodes = nodes
|
||||
else:
|
||||
for node in nodes:
|
||||
if node.replaced_deployment_info:
|
||||
customized_nodes.append(node)
|
||||
else:
|
||||
origin_nodes.append(node)
|
||||
|
||||
serialized_nodes.extend(
|
||||
self.serialize_generated(cluster, origin_nodes)
|
||||
)
|
||||
serialized_nodes.extend(
|
||||
self.serialize_customized(cluster, customized_nodes)
|
||||
)
|
||||
|
||||
# NOTE(dshulyak) tasks should not be preserved from replaced
|
||||
# deployment info, there is different mechanism to control
|
||||
|
|
|
@ -137,6 +137,28 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
|
||||
deployment_type = consts.TASK_NAMES.deploy
|
||||
|
||||
def get_deployment_task(self):
|
||||
if objects.Release.is_lcm_supported(self.cluster.release):
|
||||
return tasks.ClusterTransaction
|
||||
return tasks.DeploymentTask
|
||||
|
||||
def ensure_nodes_changed(
|
||||
self, nodes_to_provision, nodes_to_deploy, nodes_to_delete
|
||||
):
|
||||
if objects.Release.is_lcm_supported(self.cluster.release):
|
||||
return
|
||||
|
||||
if not any([nodes_to_provision, nodes_to_deploy, nodes_to_delete]):
|
||||
db().rollback()
|
||||
raise errors.WrongNodeStatus("No changes to deploy")
|
||||
|
||||
def get_nodes_to_deploy(self, force=False):
|
||||
if objects.Release.is_lcm_supported(self.cluster.release):
|
||||
return list(
|
||||
objects.Cluster.get_nodes_not_for_deletion(self.cluster).all()
|
||||
)
|
||||
return TaskHelper.nodes_to_deploy(self.cluster, force)
|
||||
|
||||
def _remove_obsolete_tasks(self):
|
||||
cluster_tasks = objects.TaskCollection.get_cluster_tasks(
|
||||
cluster_id=self.cluster.id)
|
||||
|
@ -169,7 +191,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
db().flush()
|
||||
|
||||
def execute(self, nodes_to_provision_deploy=None, deployment_tasks=None,
|
||||
force=False, graph_type=None):
|
||||
force=False, graph_type=None, **kwargs):
|
||||
logger.info(
|
||||
u"Trying to start deployment at cluster '{0}'".format(
|
||||
self.cluster.name or self.cluster.id
|
||||
|
@ -188,9 +210,9 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
TaskHelper.nodes_to_deploy(self.cluster, force)
|
||||
nodes_to_provision = TaskHelper.nodes_to_provision(self.cluster)
|
||||
|
||||
if not any([nodes_to_provision, nodes_to_deploy, nodes_to_delete]):
|
||||
db().rollback()
|
||||
raise errors.WrongNodeStatus("No changes to deploy")
|
||||
self.ensure_nodes_changed(
|
||||
nodes_to_provision, nodes_to_deploy, nodes_to_delete
|
||||
)
|
||||
|
||||
db().flush()
|
||||
TaskHelper.create_action_log(supertask)
|
||||
|
@ -212,9 +234,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
|
||||
return supertask
|
||||
|
||||
def _execute_async(self, supertask_id, deployment_tasks=None,
|
||||
nodes_to_provision_deploy=None, force=False,
|
||||
graph_type=None):
|
||||
def _execute_async(self, supertask_id, **kwargs):
|
||||
"""Function for execute task in the mule
|
||||
|
||||
:param supertask_id: id of parent task
|
||||
|
@ -225,12 +245,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
supertask = objects.Task.get_by_uid(supertask_id)
|
||||
|
||||
try:
|
||||
self._execute_async_content(
|
||||
supertask,
|
||||
deployment_tasks=deployment_tasks,
|
||||
nodes_to_provision_deploy=nodes_to_provision_deploy,
|
||||
force=force,
|
||||
graph_type=graph_type)
|
||||
self._execute_async_content(supertask, **kwargs)
|
||||
except Exception as e:
|
||||
logger.exception('Error occurred when running task')
|
||||
data = {
|
||||
|
@ -273,7 +288,7 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
n.needs_reprovision]),
|
||||
nodes_to_deploy)
|
||||
else:
|
||||
nodes_to_deploy = TaskHelper.nodes_to_deploy(self.cluster, force)
|
||||
nodes_to_deploy = self.get_nodes_to_deploy(force=force)
|
||||
nodes_to_provision = TaskHelper.nodes_to_provision(self.cluster)
|
||||
nodes_to_delete = TaskHelper.nodes_to_delete(self.cluster)
|
||||
|
||||
|
@ -346,7 +361,8 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
task_messages.append(provision_message)
|
||||
|
||||
deployment_message = None
|
||||
if nodes_to_deploy or affected_nodes:
|
||||
if (nodes_to_deploy or affected_nodes or
|
||||
objects.Release.is_lcm_supported(self.cluster.release)):
|
||||
if nodes_to_deploy:
|
||||
logger.debug("There are nodes to deploy: %s",
|
||||
" ".join((objects.Node.get_node_fqdn(n)
|
||||
|
@ -365,13 +381,14 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
db().commit()
|
||||
deployment_message = self._call_silently(
|
||||
task_deployment,
|
||||
tasks.DeploymentTask,
|
||||
self.get_deployment_task(),
|
||||
nodes_to_deploy,
|
||||
affected_nodes=affected_nodes,
|
||||
deployment_tasks=deployment_tasks,
|
||||
method_name='message',
|
||||
reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES,
|
||||
graph_type=graph_type
|
||||
graph_type=graph_type,
|
||||
force=force
|
||||
)
|
||||
|
||||
db().commit()
|
||||
|
@ -396,7 +413,8 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
# nodes.yaml and /etc/hosts on all slaves. Since we need only
|
||||
# those two tasks, let's create stripped version of
|
||||
# deployment.
|
||||
if nodes_to_delete and not nodes_to_deploy:
|
||||
if (nodes_to_delete and not nodes_to_deploy and
|
||||
not objects.Release.is_lcm_supported(self.cluster.release)):
|
||||
logger.debug(
|
||||
"No nodes to deploy, just update nodes.yaml everywhere.")
|
||||
|
||||
|
@ -525,13 +543,6 @@ class ApplyChangesTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
db().flush()
|
||||
|
||||
|
||||
class ApplyChangesForceTaskManager(ApplyChangesTaskManager):
|
||||
|
||||
def execute(self, **kwargs):
|
||||
kwargs['force'] = True
|
||||
return super(ApplyChangesForceTaskManager, self).execute(**kwargs)
|
||||
|
||||
|
||||
class SpawnVMsTaskManager(ApplyChangesTaskManager):
|
||||
|
||||
deployment_type = consts.TASK_NAMES.spawn_vms
|
||||
|
@ -542,7 +553,7 @@ class SpawnVMsTaskManager(ApplyChangesTaskManager):
|
|||
|
||||
class ProvisioningTaskManager(TaskManager):
|
||||
|
||||
def execute(self, nodes_to_provision):
|
||||
def execute(self, nodes_to_provision, **kwargs):
|
||||
"""Run provisioning task on specified nodes."""
|
||||
# locking nodes
|
||||
nodes_ids = [node.id for node in nodes_to_provision]
|
||||
|
@ -596,8 +607,13 @@ class ProvisioningTaskManager(TaskManager):
|
|||
|
||||
class DeploymentTaskManager(TaskManager):
|
||||
|
||||
def get_deployment_task(self):
|
||||
if objects.Release.is_lcm_supported(self.cluster.release):
|
||||
return tasks.ClusterTransaction
|
||||
return tasks.DeploymentTask
|
||||
|
||||
def execute(self, nodes_to_deployment, deployment_tasks=None,
|
||||
graph_type=None):
|
||||
graph_type=None, force=False):
|
||||
deployment_tasks = deployment_tasks or []
|
||||
|
||||
logger.debug('Nodes to deploy: {0}'.format(
|
||||
|
@ -611,11 +627,12 @@ class DeploymentTaskManager(TaskManager):
|
|||
|
||||
deployment_message = self._call_silently(
|
||||
task_deployment,
|
||||
tasks.DeploymentTask,
|
||||
self.get_deployment_task(),
|
||||
nodes_to_deployment,
|
||||
deployment_tasks=deployment_tasks,
|
||||
method_name='message',
|
||||
graph_type=graph_type)
|
||||
graph_type=graph_type,
|
||||
force=force)
|
||||
|
||||
db().refresh(task_deployment)
|
||||
|
||||
|
@ -643,7 +660,7 @@ class DeploymentTaskManager(TaskManager):
|
|||
|
||||
class StopDeploymentTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
stop_running = objects.TaskCollection.filter_by(
|
||||
None,
|
||||
cluster_id=self.cluster.id,
|
||||
|
@ -727,7 +744,7 @@ class StopDeploymentTaskManager(TaskManager):
|
|||
|
||||
class ResetEnvironmentTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
|
||||
# FIXME(aroma): remove updating of 'deployed_before'
|
||||
# when stop action is reworked. 'deployed_before'
|
||||
|
@ -797,7 +814,7 @@ class ResetEnvironmentTaskManager(TaskManager):
|
|||
|
||||
class CheckNetworksTaskManager(TaskManager):
|
||||
|
||||
def execute(self, data, check_all_parameters=False):
|
||||
def execute(self, data, check_all_parameters=False, **kwargs):
|
||||
# Make a copy of original 'data' due to being changed by
|
||||
# 'tasks.CheckNetworksTask'
|
||||
data_copy = copy.deepcopy(data)
|
||||
|
@ -890,7 +907,7 @@ class VerifyNetworksTaskManager(TaskManager):
|
|||
db().delete(ver_task)
|
||||
db().flush()
|
||||
|
||||
def execute(self, nets, vlan_ids):
|
||||
def execute(self, nets, vlan_ids, **kwargs):
|
||||
self.remove_previous_task()
|
||||
|
||||
task = Task(
|
||||
|
@ -997,7 +1014,7 @@ class VerifyNetworksTaskManager(TaskManager):
|
|||
|
||||
class ClusterDeletionManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
current_tasks = objects.TaskCollection.get_cluster_tasks(
|
||||
self.cluster.id, names=(consts.TASK_NAMES.cluster_deletion,)
|
||||
)
|
||||
|
@ -1071,7 +1088,7 @@ class ClusterDeletionManager(TaskManager):
|
|||
|
||||
class DumpTaskManager(TaskManager):
|
||||
|
||||
def execute(self, conf=None):
|
||||
def execute(self, conf=None, **kwargs):
|
||||
logger.info("Trying to start dump_environment task")
|
||||
self.check_running_task(consts.TASK_NAMES.dump)
|
||||
|
||||
|
@ -1088,7 +1105,7 @@ class DumpTaskManager(TaskManager):
|
|||
|
||||
class GenerateCapacityLogTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
logger.info("Trying to start capacity_log task")
|
||||
self.check_running_task(consts.TASK_NAMES.capacity_log)
|
||||
|
||||
|
@ -1125,7 +1142,7 @@ class NodeDeletionTaskManager(TaskManager, DeploymentCheckMixin):
|
|||
[node.id for node in invalid_nodes], cluster_id)
|
||||
)
|
||||
|
||||
def execute(self, nodes_to_delete, mclient_remove=True):
|
||||
def execute(self, nodes_to_delete, mclient_remove=True, **kwargs):
|
||||
cluster = None
|
||||
if hasattr(self, 'cluster'):
|
||||
cluster = self.cluster
|
||||
|
@ -1221,7 +1238,7 @@ class BaseStatsUserTaskManager(TaskManager):
|
|||
|
||||
task_cls = None
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
logger.info("Trying to execute %s in the operational "
|
||||
"environments", self.task_name)
|
||||
created_tasks = []
|
||||
|
@ -1280,7 +1297,7 @@ class RemoveStatsUserTaskManager(BaseStatsUserTaskManager):
|
|||
|
||||
class UpdateDnsmasqTaskManager(TaskManager):
|
||||
|
||||
def execute(self):
|
||||
def execute(self, **kwargs):
|
||||
logger.info("Starting update_dnsmasq task")
|
||||
self.check_running_task(consts.TASK_NAMES.update_dnsmasq)
|
||||
|
||||
|
@ -1296,7 +1313,12 @@ class UpdateDnsmasqTaskManager(TaskManager):
|
|||
|
||||
class OpenstackConfigTaskManager(TaskManager):
|
||||
|
||||
def execute(self, filters):
|
||||
def get_deployment_task(self):
|
||||
if objects.Release.is_lcm_supported(self.cluster.release):
|
||||
return tasks.ClusterTransaction
|
||||
return tasks.UpdateOpenstackConfigTask
|
||||
|
||||
def execute(self, filters, force=False, **kwargs):
|
||||
self.check_running_task(consts.TASK_NAMES.deployment)
|
||||
|
||||
task = Task(name=consts.TASK_NAMES.deployment,
|
||||
|
@ -1308,8 +1330,13 @@ class OpenstackConfigTaskManager(TaskManager):
|
|||
self.cluster, filters.get('node_ids'), filters.get('node_role'))
|
||||
|
||||
message = self._call_silently(
|
||||
task, tasks.UpdateOpenstackConfigTask,
|
||||
self.cluster, nodes_to_update, method_name='message')
|
||||
task,
|
||||
self.get_deployment_task(),
|
||||
nodes_to_update,
|
||||
method_name='message',
|
||||
cluster=self.cluster,
|
||||
force=force
|
||||
)
|
||||
|
||||
# locking task
|
||||
task = objects.Task.get_by_uid(
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
import collections
|
||||
from copy import deepcopy
|
||||
from distutils.version import StrictVersion
|
||||
import os
|
||||
import socket
|
||||
|
||||
|
@ -113,15 +112,13 @@ def fake_cast(queue, messages, **kwargs):
|
|||
|
||||
class BaseDeploymentTask(object):
|
||||
@classmethod
|
||||
def _get_deployment_methods(cls, cluster):
|
||||
def get_deployment_methods(cls, cluster):
|
||||
"""Get deployment method name based on cluster version
|
||||
|
||||
:param cluster: Cluster db object
|
||||
:returns: list of available methods
|
||||
"""
|
||||
methods = []
|
||||
if objects.Release.is_lcm_supported(cluster.release):
|
||||
methods.append('lcm_transaction')
|
||||
if objects.Cluster.is_task_deploy_enabled(cluster):
|
||||
methods.append('task_deploy')
|
||||
if objects.Release.is_granular_enabled(cluster.release):
|
||||
|
@ -131,83 +128,40 @@ class BaseDeploymentTask(object):
|
|||
return methods
|
||||
|
||||
@classmethod
|
||||
def call_deployment_method(cls, transaction, *args, **kwargs):
|
||||
def call_deployment_method(cls, transaction, **kwargs):
|
||||
"""Calls the deployment method with fallback.
|
||||
|
||||
:param transaction: the transaction object
|
||||
:param args: the positional arguments
|
||||
:param kwargs: the keyword arguments
|
||||
"""
|
||||
for name in cls._get_deployment_methods(transaction.cluster):
|
||||
try:
|
||||
message_builder = getattr(cls, name)
|
||||
except AttributeError:
|
||||
logger.warning(
|
||||
"%s is not allowed, fallback to next method.", name
|
||||
)
|
||||
continue
|
||||
|
||||
available_methods = iter(
|
||||
cls.get_deployment_methods(transaction.cluster)
|
||||
)
|
||||
|
||||
error_messages = []
|
||||
|
||||
for method in available_methods:
|
||||
try:
|
||||
method, args = message_builder(transaction, *args, **kwargs)
|
||||
args = getattr(cls, method)(transaction, **kwargs)
|
||||
# save tasks history
|
||||
if 'tasks_graph' in args:
|
||||
logger.info("start saving tasks history.")
|
||||
logger.info("tasks history saving is started.")
|
||||
objects.DeploymentHistoryCollection.create(
|
||||
transaction, args['tasks_graph']
|
||||
)
|
||||
logger.info("finish saving tasks history.")
|
||||
|
||||
logger.info("tasks history saving is finished.")
|
||||
return method, args
|
||||
except errors.TaskBaseDeploymentNotAllowed:
|
||||
except errors.TaskBaseDeploymentNotAllowed as e:
|
||||
error_messages.append(six.text_type(e))
|
||||
logger.warning(
|
||||
"%s is not allowed, fallback to next method.", name
|
||||
"%s is not allowed, fallback to next method.", method
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def lcm_transaction(cls, transaction, tasks, *args, **kwargs):
|
||||
# TODO(bgaifullin) remove task version check
|
||||
# after related Library changes will be committed
|
||||
# if there is at least one task that is supported LCM, use it.
|
||||
lcm_readiness = StrictVersion(consts.TASK_LCM_READINESS)
|
||||
lcm_ready = (
|
||||
any(StrictVersion(t['version']) >= lcm_readiness for t in tasks)
|
||||
raise errors.TaskBaseDeploymentNotAllowed(
|
||||
"The task deploy is not allowed because of {0}"
|
||||
.format(", ".join(error_messages))
|
||||
)
|
||||
if not lcm_ready:
|
||||
raise errors.TaskBaseDeploymentNotAllowed()
|
||||
|
||||
# TODO(bgaifullin) always run for all nodes, because deploy-changes
|
||||
# and run for selected nodes uses same logic,
|
||||
# and need to differentiate this methods at first
|
||||
nodes = objects.Cluster.get_nodes_not_for_deletion(transaction.cluster)
|
||||
logger.info("start serialization of cluster.")
|
||||
# we should update information for all nodes except deleted
|
||||
# TODO(bgaifullin) pass role resolver to serializers
|
||||
deployment_info = deployment_serializers.serialize_for_lcm(
|
||||
transaction.cluster, nodes
|
||||
)
|
||||
logger.info("finish serialization of cluster.")
|
||||
current_state = objects.Transaction.get_deployment_info(
|
||||
objects.TransactionCollection.get_last_succeed_run(
|
||||
transaction.cluster
|
||||
)
|
||||
)
|
||||
expected_state = cls._save_deployment_info(
|
||||
transaction, deployment_info
|
||||
)
|
||||
context = lcm.TransactionContext(expected_state, current_state)
|
||||
logger.debug("start serialization of tasks.")
|
||||
# TODO(bgaifullin) Primary roles applied in deployment_serializers
|
||||
# need to move this code from deployment serializer
|
||||
# also role resolver should be created after serialization completed
|
||||
role_resolver = RoleResolver(nodes)
|
||||
directory, graph = lcm.TransactionSerializer.serialize(
|
||||
context, tasks, role_resolver
|
||||
)
|
||||
logger.info("finish serialization of tasks.")
|
||||
return 'task_deploy', {
|
||||
"tasks_directory": directory,
|
||||
"tasks_graph": graph
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _save_deployment_info(cls, transaction, deployment_info):
|
||||
|
@ -258,7 +212,7 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
|
||||
@classmethod
|
||||
def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
|
||||
reexecutable_filter=None, graph_type=None):
|
||||
reexecutable_filter=None, graph_type=None, force=False):
|
||||
"""Builds RPC message for deployment task.
|
||||
|
||||
:param task: the database task object instance
|
||||
|
@ -292,8 +246,9 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
)
|
||||
|
||||
deployment_mode, message = cls.call_deployment_method(
|
||||
task, deployment_tasks, nodes,
|
||||
affected_nodes, task_ids, reexecutable_filter
|
||||
task, tasks=deployment_tasks, nodes=nodes,
|
||||
affected_nodes=affected_nodes, selected_task_ids=task_ids,
|
||||
events=reexecutable_filter, force=force
|
||||
)
|
||||
|
||||
# After serialization set pending_addition to False
|
||||
|
@ -321,8 +276,8 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
return rpc_message
|
||||
|
||||
@classmethod
|
||||
def granular_deploy(cls, transaction, tasks,
|
||||
nodes, affected_nodes, selected_task_ids, events):
|
||||
def granular_deploy(cls, transaction, tasks, nodes,
|
||||
affected_nodes, selected_task_ids, events, **kwargs):
|
||||
"""Builds parameters for granular deployment.
|
||||
|
||||
:param transaction: the transaction object
|
||||
|
@ -332,7 +287,7 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
:param selected_task_ids: the list of tasks_ids to execute,
|
||||
if None, all tasks will be executed
|
||||
:param events: the list of events to find subscribed tasks
|
||||
:return: RPC method name, the arguments for RPC message
|
||||
:return: the arguments for RPC message
|
||||
"""
|
||||
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
|
||||
graph.check()
|
||||
|
@ -360,20 +315,17 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
graph, transaction.cluster, nodes,
|
||||
role_resolver=role_resolver)
|
||||
|
||||
return 'granular_deploy', {
|
||||
return {
|
||||
'deployment_info': serialized_cluster,
|
||||
'pre_deployment': pre_deployment,
|
||||
'post_deployment': post_deployment
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def deploy(cls, *args, **kwargs):
|
||||
args = cls.granular_deploy(*args, **kwargs)[1]
|
||||
return 'deploy', args
|
||||
deploy = granular_deploy
|
||||
|
||||
@classmethod
|
||||
def task_deploy(cls, transaction, tasks, nodes, affected_nodes,
|
||||
selected_task_ids, events):
|
||||
selected_task_ids, events, **kwargs):
|
||||
"""Builds parameters for task based deployment.
|
||||
|
||||
:param transaction: the transaction object
|
||||
|
@ -390,29 +342,71 @@ class DeploymentTask(BaseDeploymentTask):
|
|||
for task in tasks:
|
||||
task_processor.ensure_task_based_deploy_allowed(task)
|
||||
|
||||
logger.info("start cluster serialization.")
|
||||
logger.info("cluster serialization is started.")
|
||||
serialized_cluster = deployment_serializers.serialize(
|
||||
None, transaction.cluster, nodes
|
||||
)
|
||||
cls._save_deployment_info(transaction, serialized_cluster)
|
||||
logger.debug("finish cluster serialization.")
|
||||
logger.info("cluster serialization is finished.")
|
||||
tasks_events = events and \
|
||||
task_based_deployment.TaskEvents('reexecute_on', events)
|
||||
|
||||
logger.info("start tasks serialization.")
|
||||
logger.debug("tasks serialization is started.")
|
||||
directory, graph = task_based_deployment.TasksSerializer.serialize(
|
||||
transaction.cluster, nodes, tasks, affected_nodes,
|
||||
selected_task_ids, tasks_events
|
||||
)
|
||||
logger.info("finish tasks serialization.")
|
||||
logger.info("tasks serialization is finished.")
|
||||
|
||||
return 'task_deploy', {
|
||||
return {
|
||||
"deployment_info": serialized_cluster,
|
||||
"tasks_directory": directory,
|
||||
"tasks_graph": graph
|
||||
}
|
||||
|
||||
|
||||
class ClusterTransaction(DeploymentTask):
|
||||
@classmethod
|
||||
def get_deployment_methods(cls, cluster):
|
||||
return ['task_deploy']
|
||||
|
||||
@classmethod
|
||||
def task_deploy(cls, transaction, tasks, nodes, force=False, **kwargs):
|
||||
logger.info("The cluster transaction is initiated.")
|
||||
logger.info("cluster serialization is started.")
|
||||
# we should update information for all nodes except deleted
|
||||
# TODO(bgaifullin) pass role resolver to serializers
|
||||
deployment_info = deployment_serializers.serialize_for_lcm(
|
||||
transaction.cluster, nodes
|
||||
)
|
||||
logger.info("cluster serialization is finished.")
|
||||
if force:
|
||||
current_state = {}
|
||||
else:
|
||||
current_state = objects.Transaction.get_deployment_info(
|
||||
objects.TransactionCollection.get_last_succeed_run(
|
||||
transaction.cluster
|
||||
)
|
||||
)
|
||||
expected_state = cls._save_deployment_info(
|
||||
transaction, deployment_info
|
||||
)
|
||||
context = lcm.TransactionContext(expected_state, current_state)
|
||||
logger.debug("tasks serialization is started.")
|
||||
# TODO(bgaifullin) Primary roles applied in deployment_serializers
|
||||
# need to move this code from deployment serializer
|
||||
# also role resolver should be created after serialization completed
|
||||
role_resolver = RoleResolver(nodes)
|
||||
directory, graph = lcm.TransactionSerializer.serialize(
|
||||
context, tasks, role_resolver
|
||||
)
|
||||
logger.info("tasks serialization is finished.")
|
||||
return {
|
||||
"tasks_directory": directory,
|
||||
"tasks_graph": graph
|
||||
}
|
||||
|
||||
|
||||
class UpdateNodesInfoTask(object):
|
||||
"""Task for updating nodes.yaml and /etc/hosts on all slaves
|
||||
|
||||
|
@ -2067,12 +2061,12 @@ class UpdateDnsmasqTask(object):
|
|||
class UpdateOpenstackConfigTask(BaseDeploymentTask):
|
||||
|
||||
@staticmethod
|
||||
def task_deploy(transaction, tasks, nodes, task_ids):
|
||||
def task_deploy(transaction, nodes, tasks, task_ids):
|
||||
# TODO(akostrikov) https://bugs.launchpad.net/fuel/+bug/1561485
|
||||
directory, graph = task_based_deployment.TasksSerializer.serialize(
|
||||
transaction.cluster, nodes, tasks, task_ids=task_ids
|
||||
)
|
||||
return 'task_deploy', make_astute_message(
|
||||
return make_astute_message(
|
||||
transaction, "task_deploy", "update_config_resp", {
|
||||
"tasks_directory": directory,
|
||||
"tasks_graph": graph
|
||||
|
@ -2080,19 +2074,19 @@ class UpdateOpenstackConfigTask(BaseDeploymentTask):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def granular_deploy(transaction, tasks, nodes, task_ids):
|
||||
def granular_deploy(transaction, nodes, tasks, task_ids):
|
||||
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
|
||||
graph.only_tasks(task_ids)
|
||||
deployment_tasks = graph.stage_tasks_serialize(
|
||||
graph.graph.topology, nodes
|
||||
)
|
||||
return 'granular_deploy', make_astute_message(
|
||||
return make_astute_message(
|
||||
transaction, 'execute_tasks', 'update_config_resp', {
|
||||
'tasks': deployment_tasks,
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def message(cls, task, cluster, nodes):
|
||||
def message(cls, task, nodes, cluster, **kwargs):
|
||||
configs = objects.OpenstackConfigCollection.find_configs_for_nodes(
|
||||
cluster, nodes)
|
||||
updated_configs = set()
|
||||
|
@ -2110,7 +2104,7 @@ class UpdateOpenstackConfigTask(BaseDeploymentTask):
|
|||
task_ids = {t['id'] for t in refreshable_tasks}
|
||||
deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
|
||||
return cls.call_deployment_method(
|
||||
task, deployment_tasks, nodes, task_ids
|
||||
task, tasks=deployment_tasks, nodes=nodes, task_ids=task_ids
|
||||
)[1]
|
||||
|
||||
|
||||
|
|
|
@ -950,13 +950,13 @@ class EnvironmentManager(object):
|
|||
raise Exception(
|
||||
'Cluster with ID "{0}" was not found.'.format(cluster_id))
|
||||
|
||||
def launch_provisioning_selected(self, nodes_uids=None, cluster_id=None):
|
||||
def _launch_for_selected_nodes(self, handler, nodes_uids, cluster_id):
|
||||
if self.clusters:
|
||||
cluster = self._get_cluster_by_id(cluster_id)
|
||||
if not nodes_uids:
|
||||
nodes_uids = [n.uid for n in cluster.nodes]
|
||||
action_url = reverse(
|
||||
'ProvisionSelectedNodes',
|
||||
handler,
|
||||
kwargs={'cluster_id': cluster.id}
|
||||
) + '?nodes={0}'.format(','.join(nodes_uids))
|
||||
resp = self.app.put(
|
||||
|
@ -978,12 +978,22 @@ class EnvironmentManager(object):
|
|||
"Nothing to provision - try creating cluster"
|
||||
)
|
||||
|
||||
def launch_deployment(self, cluster_id=None):
|
||||
def launch_provisioning_selected(self, nodes_uids=None, cluster_id=None):
|
||||
return self._launch_for_selected_nodes(
|
||||
'ProvisionSelectedNodes', nodes_uids, cluster_id
|
||||
)
|
||||
|
||||
def launch_deployment_selected(self, nodes_uids=None, cluster_id=None):
|
||||
return self._launch_for_selected_nodes(
|
||||
'DeploySelectedNodes', nodes_uids, cluster_id
|
||||
)
|
||||
|
||||
def _launch_for_cluster(self, handler, cluster_id):
|
||||
if self.clusters:
|
||||
cluster_id = self._get_cluster_by_id(cluster_id).id
|
||||
resp = self.app.put(
|
||||
reverse(
|
||||
'ClusterChangesHandler',
|
||||
handler,
|
||||
kwargs={'cluster_id': cluster_id}),
|
||||
headers=self.default_headers)
|
||||
|
||||
|
@ -995,6 +1005,14 @@ class EnvironmentManager(object):
|
|||
"Nothing to deploy - try creating cluster"
|
||||
)
|
||||
|
||||
def launch_deployment(self, cluster_id=None):
|
||||
return self._launch_for_cluster('ClusterChangesHandler', cluster_id)
|
||||
|
||||
def launch_redeployment(self, cluster_id=None):
|
||||
return self._launch_for_cluster(
|
||||
'ClusterChangesForceRedeployHandler', cluster_id
|
||||
)
|
||||
|
||||
def stop_deployment(self, cluster_id=None):
|
||||
if self.clusters:
|
||||
cluster_id = self._get_cluster_by_id(cluster_id).id
|
||||
|
@ -1283,6 +1301,11 @@ class EnvironmentManager(object):
|
|||
expect_errors=False
|
||||
)
|
||||
|
||||
def set_task_status_recursively(self, supertask, status):
|
||||
supertask.status = consts.TASK_STATUSES.ready
|
||||
for sub_task in supertask.subtasks:
|
||||
self.set_task_status_recursively(sub_task, status)
|
||||
|
||||
|
||||
class BaseUnitTest(TestCase):
|
||||
def datadiff(self, data1, data2, path=None, ignore_keys=[],
|
||||
|
|
|
@ -164,7 +164,9 @@ class TestReplacedDeploymentInfoSerialization(OrchestratorSerializerTestBase):
|
|||
node.replaced_deployment_info = [
|
||||
{'role': 'controller', 'priority': 'XXX', 'tasks': [], 'uid': '1'}]
|
||||
self.db.flush()
|
||||
|
||||
objects.Cluster.prepare_for_deployment(
|
||||
self.cluster, self.cluster.nodes
|
||||
)
|
||||
serialized_data = self.serializer.serialize(self.cluster, [node])
|
||||
# verify that task list is not empty
|
||||
self.assertTrue(serialized_data[0]['tasks'])
|
||||
|
|
|
@ -127,7 +127,7 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
@mock.patch('nailgun.task.task.rpc.cast')
|
||||
def test_deployment_info_saves_in_transaction(self, _):
|
||||
self.check_deployment_info_was_saved_in_transaction(
|
||||
'liberty-9.0', True, True
|
||||
'mitaka-9.0', True, True
|
||||
)
|
||||
self.check_deployment_info_was_saved_in_transaction(
|
||||
'liberty-8.0', True, False
|
||||
|
@ -696,7 +696,13 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
|
||||
@fake_tasks()
|
||||
def test_no_node_no_cry(self):
|
||||
cluster = self.env.create_cluster(api=True)
|
||||
cluster = self.env.create_cluster(
|
||||
api=True,
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'liberty-8.0'
|
||||
},
|
||||
)
|
||||
cluster_id = cluster['id']
|
||||
manager_ = manager.ApplyChangesTaskManager(cluster_id)
|
||||
task = models.Task(name='provision', cluster_id=cluster_id,
|
||||
|
@ -763,7 +769,11 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
self.env.create(
|
||||
nodes_kwargs=[
|
||||
{"status": "ready"}
|
||||
]
|
||||
],
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'liberty-8.0'
|
||||
},
|
||||
)
|
||||
cluster_db = self.env.clusters[0]
|
||||
objects.Cluster.clear_pending_changes(cluster_db)
|
||||
|
@ -780,11 +790,15 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
cluster_kwargs={
|
||||
'status': consts.CLUSTER_STATUSES.operational
|
||||
},
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'liberty-8.0'
|
||||
},
|
||||
)
|
||||
cluster_db = self.env.clusters[0]
|
||||
objects.Cluster.clear_pending_changes(cluster_db)
|
||||
manager_ = manager.ApplyChangesForceTaskManager(cluster_db.id)
|
||||
supertask = manager_.execute()
|
||||
manager_ = manager.ApplyChangesTaskManager(cluster_db.id)
|
||||
supertask = manager_.execute(force=True)
|
||||
self.assertEqual(supertask.name, TASK_NAMES.deploy)
|
||||
self.assertIn(supertask.status, TASK_STATUSES.pending)
|
||||
|
||||
|
@ -1110,6 +1124,132 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
|
||||
self.assertNotEqual(primary_node.id, new_primary.id)
|
||||
|
||||
@mock.patch('nailgun.task.task.rpc.cast')
|
||||
def test_node_group_deletion_failed_while_previous_in_progress(
|
||||
self, mocked_rpc
|
||||
):
|
||||
self.env.create(
|
||||
cluster_kwargs={
|
||||
'net_provider': consts.CLUSTER_NET_PROVIDERS.neutron,
|
||||
'net_segment_type': consts.NEUTRON_SEGMENT_TYPES.gre},
|
||||
nodes_kwargs=[
|
||||
{'api': True,
|
||||
'pending_addition': True}
|
||||
]
|
||||
)
|
||||
ng1 = self.env.create_node_group(name='ng_1').json_body
|
||||
ng2 = self.env.create_node_group(name='ng_2').json_body
|
||||
self.assertEqual(mocked_rpc.call_count, 0)
|
||||
|
||||
self.env.delete_node_group(ng1['id'])
|
||||
self.assertEqual(mocked_rpc.call_count, 1)
|
||||
# delete other node group
|
||||
# request should be rejected as previous update_dnsmasq task is still
|
||||
# in progress
|
||||
resp = self.env.delete_node_group(ng2['id'], status_code=409)
|
||||
self.assertEqual(resp.status_code, 409)
|
||||
self.assertEqual(resp.json_body['message'],
|
||||
errors.UpdateDnsmasqTaskIsRunning.message)
|
||||
# no more calls were made
|
||||
self.assertEqual(mocked_rpc.call_count, 1)
|
||||
|
||||
@mock.patch('nailgun.task.task.rpc.cast')
|
||||
def test_deployment_starts_if_nodes_not_changed(self, rpc_mock):
|
||||
self.env.create(
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'mitaka-9.0'
|
||||
},
|
||||
nodes_kwargs=[
|
||||
{'status': NODE_STATUSES.ready, 'roles': ['controller']},
|
||||
{'status': NODE_STATUSES.ready, 'roles': ['compute']},
|
||||
]
|
||||
)
|
||||
cluster = self.env.clusters[-1]
|
||||
supertask = self.env.launch_deployment(cluster.id)
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
|
||||
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
|
||||
# check that nodes presents in tasks_graph
|
||||
self.assertItemsEqual(
|
||||
[n.uid for n in cluster.nodes] + [consts.MASTER_NODE_UID, None],
|
||||
tasks_graph
|
||||
)
|
||||
|
||||
@mock.patch('nailgun.task.task.rpc.cast')
|
||||
@mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
|
||||
def test_redeployment_ignore_conditions(self, tasks_mock, rpc_mock):
|
||||
tasks_mock.return_value = [
|
||||
{
|
||||
"id": "test", "roles": ['master'], "version": "2.0.1",
|
||||
"type": "puppet", "parameters": {},
|
||||
"condition": {"yaql_exp": "changed($.nodes)"}
|
||||
}
|
||||
]
|
||||
self.env.create(
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'mitaka-9.0'
|
||||
}
|
||||
)
|
||||
cluster = self.env.clusters[-1]
|
||||
# deploy cluster at first time
|
||||
supertask = self.env.launch_deployment(cluster.id)
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
|
||||
self.env.set_task_status_recursively(
|
||||
supertask, consts.TASK_STATUSES.ready
|
||||
)
|
||||
self.db.flush()
|
||||
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
|
||||
self.assertEqual('puppet', tasks_graph['master'][0]['type'])
|
||||
|
||||
# launch cluster deployment again, because there is no changes
|
||||
# the task should be skipped
|
||||
supertask = self.env.launch_deployment(cluster.id)
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
|
||||
self.env.set_task_status_recursively(
|
||||
supertask, consts.TASK_STATUSES.ready
|
||||
)
|
||||
self.db.flush()
|
||||
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
|
||||
self.assertEqual('skipped', tasks_graph['master'][0]['type'])
|
||||
supertask.status = consts.TASK_STATUSES.ready
|
||||
self.db.flush()
|
||||
|
||||
# force cluster re-deployment, the task should not be skipped
|
||||
supertask = self.env.launch_redeployment(cluster.id)
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, supertask.status)
|
||||
tasks_graph = rpc_mock.call_args[0][1][0]['args']['tasks_graph']
|
||||
self.assertEqual('puppet', tasks_graph['master'][0]['type'])
|
||||
|
||||
@mock.patch('nailgun.rpc.cast')
|
||||
def test_deploy_part_of_pending_addition_nodes(self, rpc_mock):
|
||||
self.env.create(
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'mitaka-9.0'
|
||||
},
|
||||
nodes_kwargs=[
|
||||
{'status': NODE_STATUSES.provisioned, 'roles': ['controller']},
|
||||
{'status': NODE_STATUSES.provisioned, 'roles': ['compute']},
|
||||
]
|
||||
)
|
||||
cluster = self.env.clusters[-1]
|
||||
nodes_uids = [n.uid for n in cluster.nodes]
|
||||
node3 = self.env.create_node(
|
||||
api=False, cluster_id=cluster.id,
|
||||
roles=["compute"],
|
||||
pending_addition=True
|
||||
)
|
||||
t = self.env.launch_deployment_selected(nodes_uids, cluster.id)
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, t.status)
|
||||
self.db.refresh(node3)
|
||||
self.assertEqual(consts.NODE_STATUSES.discover, node3.status)
|
||||
self.assertTrue(node3.pending_addition)
|
||||
tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
|
||||
self.assertItemsEqual(
|
||||
[consts.MASTER_NODE_UID, None] + nodes_uids, tasks_graph
|
||||
)
|
||||
|
||||
|
||||
class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):
|
||||
|
||||
|
@ -1290,22 +1430,3 @@ class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):
|
|||
update_task = self.db.query(models.Task).filter_by(
|
||||
name=consts.TASK_NAMES.update_dnsmasq).first()
|
||||
self.assertEqual(update_task.status, consts.TASK_STATUSES.running)
|
||||
|
||||
@mock.patch('nailgun.task.task.rpc.cast')
|
||||
def test_node_group_deletion_failed_while_previous_in_progress(self,
|
||||
mocked_rpc):
|
||||
ng1 = self.env.create_node_group(name='ng_1').json_body
|
||||
ng2 = self.env.create_node_group(name='ng_2').json_body
|
||||
self.assertEqual(mocked_rpc.call_count, 0)
|
||||
|
||||
self.env.delete_node_group(ng1['id'])
|
||||
self.assertEqual(mocked_rpc.call_count, 1)
|
||||
# delete other node group
|
||||
# request should be rejected as previous update_dnsmasq task is still
|
||||
# in progress
|
||||
resp = self.env.delete_node_group(ng2['id'], status_code=409)
|
||||
self.assertEqual(resp.status_code, 409)
|
||||
self.assertEqual(resp.json_body['message'],
|
||||
errors.UpdateDnsmasqTaskIsRunning.message)
|
||||
# no more calls were made
|
||||
self.assertEqual(mocked_rpc.call_count, 1)
|
||||
|
|
|
@ -478,3 +478,17 @@ class TestOpenstackConfigHandlers(BaseIntegrationTest):
|
|||
return '{0}?{1}'.format(
|
||||
reverse('OpenstackConfigCollectionHandler'),
|
||||
urlparse.urlencode(kwargs))
|
||||
|
||||
@mock.patch('nailgun.task.task.rpc.cast', mock.MagicMock())
|
||||
@mock.patch("nailgun.objects.Release.is_lcm_supported")
|
||||
@mock.patch("nailgun.task.task.ClusterTransaction.message")
|
||||
def test_openstack_config_call_apply_changes_if_lcm(
|
||||
self, message_mock, lcm_supported_mock):
|
||||
lcm_supported_mock.return_value = True
|
||||
message_mock.return_value = {
|
||||
'method': 'task_deploy', 'args': {
|
||||
'tasks_graph': {'master': []}
|
||||
}
|
||||
}
|
||||
self.execute_update_open_stack_config()
|
||||
self.assertEqual(1, message_mock.call_count)
|
||||
|
|
Loading…
Reference in New Issue