Allow a user to run dry-run deployment

Now following handlers:

     /clusters/:cluster_id/changes/
     /clusters/:cluster_id/changes/redeploy/
     /clusters/:cluster_id/deploy/
     /clusters/:cluster_id/deploy_tasks/

?dry_run=1 that is telling Astute not to run cluster executionat all.

Dry run assumes that it does not actually affect
cluster status regardless of their result.

Also, remove redundant update of nodes statuses to 'deploying'
within OpenStackConfigManager and DeploymentTaskManager as it should be
done by receiever.

Do not set cluster status to 'deployment' for these nodes in order to
retain its real status

Modify stop deployment tests to move failing stop deployment for already
deployed clusters to another test class. Since 9.0 we can run stop
deployment for new clusters.

Change-Id: I374fc86b63af64411d4a5ca45ff6c3680cb44897
Partial-bug: #1569839
This commit is contained in:
Vladimir Kuklin 2016-05-05 12:51:46 +03:00 committed by Bulat Gaifullin
parent 58355b34a5
commit e4fbfe80ed
21 changed files with 490 additions and 160 deletions

View File

@ -121,10 +121,12 @@ class ClusterChangesHandler(DeferredTaskHandler):
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
data = web.input(graph_type=None, dry_run="0")
return {
'graph_type': data.graph_type,
'force': False
'force': False,
'dry_run': utils.parse_bool(data.dry_run),
}
@ -138,10 +140,11 @@ class ClusterChangesForceRedeployHandler(DeferredTaskHandler):
@classmethod
def get_options(cls):
data = web.input(graph_type=None)
data = web.input(graph_type=None, dry_run="0")
return {
'graph_type': data.graph_type,
'force': True
'force': True,
'dry_run': utils.parse_bool(data.dry_run)
}

View File

@ -33,6 +33,7 @@ from nailgun.logger import logger
from nailgun import consts
from nailgun import errors
from nailgun import objects
from nailgun import utils
from nailgun.orchestrator import deployment_serializers
from nailgun.orchestrator import graph_visualization
@ -44,7 +45,6 @@ from nailgun.orchestrator import task_based_deployment
from nailgun.task.helpers import TaskHelper
from nailgun.task import manager
from nailgun.task import task
from nailgun import utils
class NodesFilterMixin(object):
@ -209,6 +209,13 @@ class DeploymentInfo(OrchestratorInfo):
return objects.Cluster.replace_deployment_info(cluster, data)
class DryRunMixin(object):
"""Provides dry_run parameters."""
def get_dry_run(self):
return utils.parse_bool(web.input(dry_run='0').dry_run)
class SelectedNodesBase(NodesFilterMixin, BaseHandler):
"""Base class for running task manager on selected nodes."""
@ -217,7 +224,8 @@ class SelectedNodesBase(NodesFilterMixin, BaseHandler):
nodes = self.get_nodes(cluster)
try:
task_manager = self.task_manager(cluster_id=cluster.id)
task_manager = self.task_manager(
cluster_id=cluster.id)
task = task_manager.execute(nodes, **kwargs)
except Exception as exc:
logger.warn(
@ -292,7 +300,7 @@ class BaseDeploySelectedNodes(SelectedNodesBase):
graph_type=graph_type)
class DeploySelectedNodes(BaseDeploySelectedNodes):
class DeploySelectedNodes(BaseDeploySelectedNodes, DryRunMixin):
"""Handler for deployment selected nodes."""
@content
@ -305,10 +313,14 @@ class DeploySelectedNodes(BaseDeploySelectedNodes):
* 404 (cluster or nodes not found in db)
"""
cluster = self.get_object_or_404(objects.Cluster, cluster_id)
return self.handle_task(cluster, graph_type=self.get_graph_type())
return self.handle_task(
cluster=cluster,
graph_type=self.get_graph_type(),
dry_run=self.get_dry_run()
)
class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes):
class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes, DryRunMixin):
validator = NodeDeploymentValidator
@ -332,7 +344,9 @@ class DeploySelectedNodesWithTasks(BaseDeploySelectedNodes):
cluster,
deployment_tasks=data,
graph_type=self.get_graph_type(),
force=force)
force=force,
dry_run=self.get_dry_run()
)
class TaskDeployGraph(BaseHandler):

View File

@ -37,7 +37,8 @@ class OpenstackConfigValidator(BasicValidator):
deploy_task_ids = [
six.text_type(task.id)
for task in objects.TaskCollection.get_by_name_and_cluster(
cluster, (consts.TASK_NAMES.deployment,))
cluster, (consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment))
.filter(models.Task.status.in_((consts.TASK_STATUSES.pending,
consts.TASK_STATUSES.running)))
.all()]

View File

@ -285,6 +285,7 @@ TASK_NAMES = Enum(
'multicast_verification',
'check_repo_availability',
'check_repo_availability_with_setup',
'dry_run_deployment',
# dump
'dump',

View File

@ -24,6 +24,8 @@ from alembic import op
import sqlalchemy as sa
from nailgun.db.sqlalchemy.models import fields
from nailgun.utils.migration import upgrade_enum
# revision identifiers, used by Alembic.
revision = '675105097a69'
@ -32,6 +34,7 @@ down_revision = '11a9adc6d36a'
def upgrade():
upgrade_deployment_history()
upgrade_transaction_names()
upgrade_clusters_replaced_info_wrong_default()
upgrade_tasks_snapshot()
@ -39,6 +42,7 @@ def upgrade():
def downgrade():
downgrade_tasks_snapshot()
downgrade_clusters_replaced_info_wrong_default()
downgrade_transaction_names()
downgrade_deployment_history()
@ -82,3 +86,68 @@ def upgrade_tasks_snapshot():
def downgrade_tasks_snapshot():
op.drop_column('tasks', 'tasks_snapshot')
transaction_names_old = (
'super',
# Cluster changes
# For deployment supertask, it contains
# two subtasks deployment and provision
'deploy',
'deployment',
'provision',
'stop_deployment',
'reset_environment',
'update',
'spawn_vms',
'node_deletion',
'cluster_deletion',
'remove_images',
'check_before_deployment',
# network
'check_networks',
'verify_networks',
'check_dhcp',
'verify_network_connectivity',
'multicast_verification',
'check_repo_availability',
'check_repo_availability_with_setup',
# dump
'dump',
'capacity_log',
# statistics
'create_stats_user',
'remove_stats_user',
# setup dhcp via dnsmasq for multi-node-groups
'update_dnsmasq'
)
transaction_names_new = transaction_names_old + ('dry_run_deployment',)
def upgrade_transaction_names():
upgrade_enum(
'tasks',
'name',
'task_name',
transaction_names_old,
transaction_names_new
)
def downgrade_transaction_names():
upgrade_enum(
'tasks',
'name',
'task_name',
transaction_names_new,
transaction_names_old
)

View File

@ -101,3 +101,7 @@ class TaskBaseDeploymentNotAllowed(DeploymentException):
class NoChanges(DeploymentException):
message = "There is no changes to apply"
class DryRunSupportedOnlyByLCM(DeploymentException):
message = "Dry run deployment mode is supported only by LCM serializer"

View File

@ -14,7 +14,7 @@
class TransactionContext(object):
def __init__(self, new_state, old_state=None):
def __init__(self, new_state, old_state=None, **kwargs):
"""Wrapper around current and previous state of a transaction
:param new_state: new state of cluster
@ -24,6 +24,7 @@ class TransactionContext(object):
"""
self.new = new_state
self.old = old_state or {}
self.options = kwargs
def get_new_data(self, node_id):
return self.new[node_id]

View File

@ -61,6 +61,9 @@ class Context(object):
self._yaql_engine = yaql_ext.create_engine()
self._yaql_expressions_cache = {}
def get_transaction_option(self, name, default=None):
return self._transaction.options.get(name, default)
def get_new_data(self, node_id):
return self._transaction.get_new_data(node_id)
@ -189,6 +192,7 @@ class NoopTaskSerializer(DeploymentTaskSerializer):
class DefaultTaskSerializer(NoopTaskSerializer):
def should_execute(self, task, node_id):
condition = task.get('condition', True)
if isinstance(condition, six.string_types):

View File

@ -230,7 +230,7 @@ class Task(NailgunObject):
Cluster.clear_pending_changes(cluster)
elif instance.status == consts.CLUSTER_STATUSES.error:
elif instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cluster, consts.CLUSTER_STATUSES.error, None
)
@ -337,11 +337,19 @@ class TaskCollection(NailgunCollection):
@classmethod
def get_cluster_tasks(cls, cluster_id, names=None):
"""Get unordered cluster tasks query.
:param cluster_id: cluster ID
:type cluster_id: int
:param names: tasks names
:type names: iterable[dict]
:returns: sqlalchemy query
:rtype: sqlalchemy.Query[models.Task]
"""
query = cls.get_by_cluster_id(cluster_id)
if isinstance(names, (list, tuple)):
query = cls.filter_by_list(query, 'name', names)
query = cls.order_by(query, 'id')
return query.all()
return query
@classmethod
def get_by_name_and_cluster(cls, cluster, names):

View File

@ -281,46 +281,51 @@ class NailgunReceiver(object):
else:
db_nodes = []
# First of all, let's update nodes in database
for node_db in db_nodes:
node = nodes_by_id.pop(node_db.uid)
update_fields = (
'error_msg',
'error_type',
'status',
'progress',
'online'
)
for param in update_fields:
if param in node:
logger.debug("Updating node %s - set %s to %s",
node['uid'], param, node[param])
setattr(node_db, param, node[param])
task = objects.Task.get_by_uuid(task_uuid)
# Dry run deployments should not actually lead to update of
# nodes' statuses
if task.name != consts.TASK_NAMES.dry_run_deployment:
if param == 'progress' and node.get('status') == 'error' \
or node.get('online') is False:
# If failure occurred with node
# it's progress should be 100
node_db.progress = 100
# Setting node error_msg for offline nodes
if node.get('online') is False \
and not node_db.error_msg:
node_db.error_msg = u"Node is offline"
# Notification on particular node failure
notifier.notify(
consts.NOTIFICATION_TOPICS.error,
u"Failed to {0} node '{1}': {2}".format(
consts.TASK_NAMES.deploy,
node_db.name,
node_db.error_msg or "Unknown error"
),
cluster_id=task.cluster_id,
node_id=node['uid'],
task_uuid=task_uuid
)
if nodes_by_id:
logger.warning("The following nodes is not found: %s",
",".join(sorted(nodes_by_id)))
# First of all, let's update nodes in database
for node_db in db_nodes:
node = nodes_by_id.pop(node_db.uid)
update_fields = (
'error_msg',
'error_type',
'status',
'progress',
'online'
)
for param in update_fields:
if param in node:
logger.debug("Updating node %s - set %s to %s",
node['uid'], param, node[param])
setattr(node_db, param, node[param])
if param == 'progress' and node.get('status') == \
'error' or node.get('online') is False:
# If failure occurred with node
# it's progress should be 100
node_db.progress = 100
# Setting node error_msg for offline nodes
if node.get('online') is False \
and not node_db.error_msg:
node_db.error_msg = u"Node is offline"
# Notification on particular node failure
notifier.notify(
consts.NOTIFICATION_TOPICS.error,
u"Failed to {0} node '{1}': {2}".format(
consts.TASK_NAMES.deploy,
node_db.name,
node_db.error_msg or "Unknown error"
),
cluster_id=task.cluster_id,
node_id=node['uid'],
task_uuid=task_uuid
)
if nodes_by_id:
logger.warning("The following nodes are not found: %s",
",".join(sorted(nodes_by_id)))
for node in nodes:
if node.get('deployment_graph_task_name') \

View File

@ -34,6 +34,7 @@ from nailgun.statistics.fuel_statistics.tasks_params_white_lists \
tasks_names_actions_groups_mapping = {
consts.TASK_NAMES.deploy: "cluster_changes",
consts.TASK_NAMES.deployment: "cluster_changes",
consts.TASK_NAMES.dry_run_deployment: "cluster_changes",
consts.TASK_NAMES.provision: "cluster_changes",
consts.TASK_NAMES.node_deletion: "cluster_changes",
consts.TASK_NAMES.update: "cluster_changes",

View File

@ -16,20 +16,21 @@
import copy
from distutils.version import StrictVersion
import six
import traceback
from oslo_serialization import jsonutils
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NeutronNetworkConfigurationSerializer
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NovaNetworkConfigurationSerializer
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy.models import Cluster
from nailgun.db.sqlalchemy.models import Task
from nailgun import errors
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NeutronNetworkConfigurationSerializer
from nailgun.extensions.network_manager.objects.serializers.\
network_configuration import NovaNetworkConfigurationSerializer
from nailgun.logger import logger
from nailgun import notifier
from nailgun import objects
@ -86,10 +87,13 @@ class TaskManager(object):
db().commit()
def check_running_task(self, task_name):
current_tasks = db().query(Task).filter_by(
name=task_name
)
def check_running_task(self, task_names):
if isinstance(task_names, six.string_types):
task_names = (task_names,)
cluster = getattr(self, 'cluster', None)
current_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=cluster.id if cluster else None,
names=task_names).all()
for task in current_tasks:
if task.status == "running":
raise errors.TaskAlreadyRunning()
@ -113,6 +117,7 @@ class DeploymentCheckMixin(object):
deployment_tasks = (
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment,
consts.TASK_NAMES.provision,
consts.TASK_NAMES.stop_deployment,
consts.TASK_NAMES.reset_environment,
@ -124,7 +129,7 @@ class DeploymentCheckMixin(object):
def check_no_running_deployment(cls, cluster):
tasks_q = objects.TaskCollection.get_by_name_and_cluster(
cluster, cls.deployment_tasks).filter_by(
status=consts.TASK_STATUSES.running)
status=consts.TASK_STATUSES.running)
tasks_exists = db.query(tasks_q.exists()).scalar()
if tasks_exists:
@ -139,6 +144,13 @@ class BaseDeploymentTaskManager(TaskManager):
return tasks.ClusterTransaction
return tasks.DeploymentTask
@staticmethod
def get_deployment_transaction_name(dry_run):
transaction_name = consts.TASK_NAMES.deployment
if dry_run:
transaction_name = consts.TASK_NAMES.dry_run_deployment
return transaction_name
class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
@ -163,6 +175,8 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
def _remove_obsolete_tasks(self):
cluster_tasks = objects.TaskCollection.get_cluster_tasks(
cluster_id=self.cluster.id)
cluster_tasks = objects.TaskCollection.order_by(
cluster_tasks, 'id').all()
current_tasks = objects.TaskCollection.filter_by(
cluster_tasks,
@ -230,7 +244,8 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
nodes_to_provision_deploy=nodes_ids_to_deploy,
deployment_tasks=deployment_tasks,
force=force,
graph_type=graph_type
graph_type=graph_type,
**kwargs
)
return supertask
@ -272,7 +287,7 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
def _execute_async_content(self, supertask, deployment_tasks=None,
nodes_to_provision_deploy=None, force=False,
graph_type=None):
graph_type=None, **kwargs):
"""Processes supertask async in mule
:param supertask: SqlAlchemy task object
@ -362,6 +377,9 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
task_messages.append(provision_message)
deployment_message = None
dry_run = kwargs.get('dry_run', False)
if (nodes_to_deploy or affected_nodes or
objects.Release.is_lcm_supported(self.cluster.release)):
if nodes_to_deploy:
@ -373,23 +391,27 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
" ".join((objects.Node.get_node_fqdn(n)
for n in affected_nodes)))
deployment_task_provider = self.get_deployment_task()
transaction_name = self.get_deployment_transaction_name(dry_run)
task_deployment = supertask.create_subtask(
name=consts.TASK_NAMES.deployment,
name=transaction_name,
status=consts.TASK_STATUSES.pending
)
# we should have task committed for processing in other threads
db().commit()
deployment_message = self._call_silently(
task_deployment,
self.get_deployment_task(),
deployment_task_provider,
nodes_to_deploy,
affected_nodes=affected_nodes,
deployment_tasks=deployment_tasks,
method_name='message',
reexecutable_filter=consts.TASKS_TO_RERUN_ON_DEPLOY_CHANGES,
graph_type=graph_type,
force=force
force=force,
**kwargs
)
db().commit()
@ -436,14 +458,13 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
node.status = consts.NODE_STATUSES.provisioning
db().commit()
objects.Cluster.get_by_uid(
self.cluster.id,
fail_if_not_found=True,
lock_for_update=True
)
self.cluster.status = consts.CLUSTER_STATUSES.deployment
db().add(self.cluster)
db().commit()
if not dry_run:
objects.Cluster.get_by_uid(
self.cluster.id,
fail_if_not_found=True
)
self.cluster.status = consts.CLUSTER_STATUSES.deployment
db().commit()
# We have to execute node deletion task only when provision,
# deployment and other tasks are in the database. Otherwise,
@ -608,14 +629,19 @@ class ProvisioningTaskManager(TaskManager):
class DeploymentTaskManager(BaseDeploymentTaskManager):
def execute(self, nodes_to_deployment, deployment_tasks=None,
graph_type=None, force=False, **kwargs):
graph_type=None, force=False, dry_run=False,
**kwargs):
deployment_tasks = deployment_tasks or []
logger.debug('Nodes to deploy: {0}'.format(
' '.join([objects.Node.get_node_fqdn(n)
for n in nodes_to_deployment])))
transaction_name = self.get_deployment_transaction_name(dry_run)
task_deployment = Task(
name=consts.TASK_NAMES.deployment, cluster=self.cluster,
name=transaction_name,
cluster=self.cluster,
status=consts.TASK_STATUSES.pending
)
db().add(task_deployment)
@ -627,7 +653,8 @@ class DeploymentTaskManager(BaseDeploymentTaskManager):
deployment_tasks=deployment_tasks,
method_name='message',
graph_type=graph_type,
force=force)
force=force,
dry_run=dry_run)
db().refresh(task_deployment)
@ -637,15 +664,9 @@ class DeploymentTaskManager(BaseDeploymentTaskManager):
fail_if_not_found=True,
lock_for_update=True
)
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_deployment)
task_deployment.cache = deployment_message
for node in nodes_to_deployment:
node.status = 'deploying'
node.progress = 0
db().commit()
rpc.cast('naily', deployment_message)
@ -767,6 +788,7 @@ class ResetEnvironmentTaskManager(TaskManager):
Task.name.in_([
consts.TASK_NAMES.deploy,
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment,
consts.TASK_NAMES.stop_deployment
])
)
@ -1013,6 +1035,8 @@ class ClusterDeletionManager(TaskManager):
current_tasks = objects.TaskCollection.get_cluster_tasks(
self.cluster.id, names=(consts.TASK_NAMES.cluster_deletion,)
)
current_tasks = objects.TaskCollection.order_by(
current_tasks, 'id').all()
# locking cluster
objects.Cluster.get_by_uid(
@ -1313,7 +1337,12 @@ class OpenstackConfigTaskManager(TaskManager):
return tasks.UpdateOpenstackConfigTask
def execute(self, filters, force=False, graph_type=None, **kwargs):
self.check_running_task(consts.TASK_NAMES.deployment)
self.check_running_task(
(
consts.TASK_NAMES.deployment,
consts.TASK_NAMES.dry_run_deployment
)
)
task = Task(name=consts.TASK_NAMES.deployment,
cluster=self.cluster,
@ -1332,7 +1361,6 @@ class OpenstackConfigTaskManager(TaskManager):
force=force
)
# locking task
task = objects.Task.get_by_uid(
task.id,
fail_if_not_found=True,
@ -1342,16 +1370,9 @@ class OpenstackConfigTaskManager(TaskManager):
if task.is_completed():
return task
# locking nodes
objects.NodeCollection.lock_nodes(nodes_to_update)
task.cache = copy.copy(message)
task.cache['nodes'] = [n.id for n in nodes_to_update]
for node in nodes_to_update:
node.status = consts.NODE_STATUSES.deploying
node.progress = 0
db().commit()
rpc.cast('naily', message)

View File

@ -143,7 +143,7 @@ class BaseDeploymentTask(object):
try:
args = getattr(cls, method)(transaction, **kwargs)
# save tasks history
if 'tasks_graph' in args:
if 'tasks_graph' in args and not args.get('dry_run', False):
logger.info("tasks history saving is started.")
objects.DeploymentHistoryCollection.create(
transaction, args['tasks_graph']
@ -210,7 +210,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def message(cls, task, nodes, affected_nodes=None, deployment_tasks=None,
reexecutable_filter=None, graph_type=None, force=False):
reexecutable_filter=None, graph_type=None,
force=False, dry_run=False, **kwargs):
"""Builds RPC message for deployment task.
:param task: the database task object instance
@ -219,9 +220,12 @@ class DeploymentTask(BaseDeploymentTask):
:param deployment_tasks: the list of tasks_ids to execute,
if None, all tasks will be executed
:param reexecutable_filter: the list of events to find subscribed tasks
:param force: force
:param dry_run: dry run
:param graph_type: deployment graph type
"""
logger.debug("DeploymentTask.message(task=%s)" % task.uuid)
task_ids = deployment_tasks or []
objects.NodeCollection.lock_nodes(nodes)
@ -260,7 +264,8 @@ class DeploymentTask(BaseDeploymentTask):
deployment_mode, message = cls.call_deployment_method(
task, tasks=deployment_tasks, nodes=nodes,
affected_nodes=affected_nodes, selected_task_ids=task_ids,
events=reexecutable_filter, force=force
events=reexecutable_filter, force=force,
dry_run=dry_run, **kwargs
)
# After serialization set pending_addition to False
@ -290,7 +295,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def granular_deploy(cls, transaction, tasks, nodes,
affected_nodes, selected_task_ids, events, **kwargs):
affected_nodes, selected_task_ids, events,
dry_run=False, **kwargs):
"""Builds parameters for granular deployment.
:param transaction: the transaction object
@ -300,8 +306,12 @@ class DeploymentTask(BaseDeploymentTask):
:param selected_task_ids: the list of tasks_ids to execute,
if None, all tasks will be executed
:param events: the list of events to find subscribed tasks
:param dry_run: dry run
:return: the arguments for RPC message
"""
if dry_run:
raise errors.DryRunSupportedOnlyByLCM()
graph = orchestrator_graph.AstuteGraph(transaction.cluster, tasks)
graph.check()
graph.only_tasks(selected_task_ids)
@ -338,7 +348,8 @@ class DeploymentTask(BaseDeploymentTask):
@classmethod
def task_deploy(cls, transaction, tasks, nodes, affected_nodes,
selected_task_ids, events, **kwargs):
selected_task_ids, events, dry_run=False,
**kwargs):
"""Builds parameters for task based deployment.
:param transaction: the transaction object
@ -348,9 +359,13 @@ class DeploymentTask(BaseDeploymentTask):
:param selected_task_ids: the list of tasks_ids to execute,
if None, all tasks will be executed
:param events: the list of events to find subscribed tasks
:param dry_run: dry run
:return: RPC method name, the arguments for RPC message
"""
if dry_run:
raise errors.DryRunSupportedOnlyByLCM()
task_processor = task_based_deployment.TaskProcessor
for task in tasks:
task_processor.ensure_task_based_deploy_allowed(task)
@ -501,11 +516,12 @@ class ClusterTransaction(DeploymentTask):
@classmethod
def task_deploy(cls, transaction, tasks, nodes, force=False,
selected_task_ids=None, **kwargs):
selected_task_ids=None, dry_run=False, **kwargs):
logger.info("The cluster transaction is initiated.")
logger.info("cluster serialization is started.")
# we should update information for all nodes except deleted
# TODO(bgaifullin) pass role resolver to serializers
deployment_info = deployment_serializers.serialize_for_lcm(
transaction.cluster, nodes
)
@ -550,7 +566,8 @@ class ClusterTransaction(DeploymentTask):
logger.info("tasks serialization is finished.")
return {
"tasks_directory": directory,
"tasks_graph": graph
"tasks_graph": graph,
"dry_run": dry_run,
}
@ -1802,9 +1819,11 @@ class CheckBeforeDeploymentTask(object):
@classmethod
def _check_dpdk_network_scheme(cls, network_scheme, node_group):
"""Check that endpoint with dpdk provider mapped only to neutron/private
"""DPDK endpoint provider check
Check that endpoint with dpdk provider mapped only to neutron/private
"""
for net_template in network_scheme.values():
roles = net_template['roles']

View File

@ -1000,13 +1000,22 @@ class EnvironmentManager(object):
task_ids or [],
)
def _launch_for_cluster(self, handler, cluster_id):
def _launch_for_cluster(self, handler, cluster_id, **kwargs):
if self.clusters:
cluster_id = self._get_cluster_by_id(cluster_id).id
if kwargs:
get_string = '?' + ('&'.join(
'{}={}'.format(k, v) for k, v in six.iteritems(kwargs)
))
else:
get_string = ''
resp = self.app.put(
reverse(
handler,
kwargs={'cluster_id': cluster_id}),
kwargs={'cluster_id': cluster_id}
) + get_string,
headers=self.default_headers)
return self.db.query(Task).filter_by(
@ -1017,12 +1026,14 @@ class EnvironmentManager(object):
"Nothing to deploy - try creating cluster"
)
def launch_deployment(self, cluster_id=None):
return self._launch_for_cluster('ClusterChangesHandler', cluster_id)
def launch_redeployment(self, cluster_id=None):
def launch_deployment(self, cluster_id=None, **kwargs):
return self._launch_for_cluster(
'ClusterChangesForceRedeployHandler', cluster_id
'ClusterChangesHandler', cluster_id, **kwargs
)
def launch_redeployment(self, cluster_id=None, **kwargs):
return self._launch_for_cluster(
'ClusterChangesForceRedeployHandler', cluster_id, **kwargs
)
def stop_deployment(self, cluster_id=None):

View File

@ -26,6 +26,7 @@ from nailgun import objects
from nailgun.db.sqlalchemy import models
from nailgun.db.sqlalchemy.models import NetworkGroup
from nailgun.extensions.network_manager.manager import NetworkManager
from nailgun.objects import Task
from nailgun.settings import settings
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import mock_rpc
@ -1870,6 +1871,42 @@ class TestHandlers(BaseIntegrationTest):
[node['uid'] for node in deployment_info]
)
@patch('nailgun.task.manager.rpc.cast')
def test_dry_run(self, mcast):
self.env.create(
release_kwargs={
'operating_system': consts.RELEASE_OS.ubuntu,
'version': 'mitaka-9.0',
},
nodes_kwargs=[
{
'roles': ['controller'],
'status': consts.NODE_STATUSES.provisioned
}
],
cluster_kwargs={
'status': consts.CLUSTER_STATUSES.operational
},
)
for handler in ('ClusterChangesHandler',
'ClusterChangesForceRedeployHandler'):
resp = self.app.put(
reverse(
handler,
kwargs={'cluster_id': self.env.clusters[0].id}
) + '?dry_run=1',
headers=self.default_headers,
expect_errors=True
)
self.assertEqual(resp.status_code, 202)
self.assertEqual(
mcast.call_args[0][1][0]['args']['dry_run'], True)
task_uuid = mcast.call_args[0][1][0]['args']['task_uuid']
task = Task.get_by_uuid(uuid=task_uuid, fail_if_not_found=True)
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
self.assertEqual('dry_run_deployment', task.name)
@patch('nailgun.rpc.cast')
def test_occurs_error_not_enough_memory_for_hugepages(self, *_):
meta = self.env.default_metadata()

View File

@ -16,8 +16,10 @@ from mock import patch
from nailgun import consts
from nailgun.db.sqlalchemy.models import DeploymentGraphTask
from nailgun import errors
from nailgun import objects
from nailgun.orchestrator.tasks_templates import make_generic_task
from nailgun.rpc.receiver import NailgunReceiver
from nailgun.task.manager import OpenstackConfigTaskManager
from nailgun.test import base
@ -155,6 +157,26 @@ class TestOpenstackConfigTaskManager80(base.BaseIntegrationTest):
all_node_ids = [self.nodes[0].id]
self.assertEqual(task.cache['nodes'], all_node_ids)
@patch('nailgun.rpc.cast')
def test_config_execute_fails_if_deployment_running(self, mocked_rpc):
task_manager = OpenstackConfigTaskManager(self.cluster.id)
task = task_manager.execute({'cluster_id': self.cluster.id})
self.assertEqual(task.status, consts.TASK_STATUSES.pending)
NailgunReceiver.deploy_resp(
task_uuid=task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
nodes=[{'uid': n.uid, 'status': consts.NODE_STATUSES.ready}
for n in self.env.nodes],
)
self.assertEqual(task.status, consts.TASK_STATUSES.running)
task2 = OpenstackConfigTaskManager(self.cluster.id)
self.assertRaises(errors.TaskAlreadyRunning,
task2.execute, {'cluster_id': self.cluster.id})
class TestOpenstackConfigTaskManager90(TestOpenstackConfigTaskManager80):
env_version = "liberty-8.0"

View File

@ -275,7 +275,27 @@ class TestSelectedNodesAction(BaseSelectedNodesTest):
self.check_deployment_call_made(self.node_uids, mcast)
@mock_rpc(pass_mock=True)
def test_start_deployment_on_selected_nodes_with_tasks(self, mcast):
@patch("objects.Release.is_lcm_supported")
@patch('nailgun.task.task.rpc.cast')
def test_start_dry_run_deployment_on_selected_nodes(self, _, mcast, __):
controller_nodes = [
n for n in self.cluster.nodes
if "controller" in n.roles
]
self.emulate_nodes_provisioning(controller_nodes)
deploy_action_url = reverse(
"DeploySelectedNodes",
kwargs={'cluster_id': self.cluster.id}) + \
make_query(nodes=[n.uid for n in controller_nodes], dry_run='1')
self.send_put(deploy_action_url)
self.assertTrue(mcast.call_args[0][1]['args']['dry_run'])
@mock_rpc(pass_mock=True)
@patch('nailgun.task.task.rpc.cast')
def test_start_deployment_on_selected_nodes_with_tasks(self, _, mcast):
controller_nodes = [
n for n in self.cluster.nodes
if "controller" in n.roles

View File

@ -835,6 +835,43 @@ class TestConsumer(BaseReciverTestCase):
# if there are error nodes
self.assertEqual(task.status, "running")
def test_node_deploy_resp_dry_run(self):
cluster = self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{"api": False},
{"api": False}]
)
node, node2 = self.env.nodes
node.status = consts.NODE_STATUSES.ready
node2.status = consts.NODE_STATUSES.ready
cluster.status = consts.CLUSTER_STATUSES.operational
task = Task(
uuid=str(uuid.uuid4()),
name="dry_run_deployment",
cluster_id=cluster.id
)
self.db.add(task)
self.db.commit()
kwargs = {'task_uuid': task.uuid,
'status': consts.TASK_STATUSES.ready,
'progress': 100,
'nodes': []
}
self.receiver.deploy_resp(**kwargs)
self.db.refresh(node)
self.db.refresh(node2)
self.db.refresh(task)
self.assertEqual(
(node.status, node2.status),
(consts.NODE_STATUSES.ready, consts.NODE_STATUSES.ready)
)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
self.assertEqual(cluster.status, consts.CLUSTER_STATUSES.operational)
def test_node_provision_resp(self):
cluster = self.env.create(
cluster_kwargs={},

View File

@ -15,6 +15,7 @@
# under the License.
import nailgun
from nailgun import consts
from nailgun.db.sqlalchemy.models.notification import Notification
from nailgun.db.sqlalchemy.models.task import Task
@ -25,10 +26,10 @@ from nailgun.test.base import mock_rpc
from nailgun.test.base import reverse
class TestStopDeployment(BaseIntegrationTest):
class TestStopDeploymentPre90(BaseIntegrationTest):
def setUp(self):
super(TestStopDeployment, self).setUp()
super(TestStopDeploymentPre90, self).setUp()
self.cluster = self.env.create(
nodes_kwargs=[
{"name": "First",
@ -36,57 +37,15 @@ class TestStopDeployment(BaseIntegrationTest):
{"name": "Second",
"roles": ["compute"],
"pending_addition": True}
]
],
release_kwargs={
'version': "liberty-8.0"
}
)
self.controller = self.env.nodes[0]
self.compute = self.env.nodes[1]
self.node_uids = [n.uid for n in self.cluster.nodes][:3]
@mock_rpc()
def test_stop_deployment(self):
supertask = self.env.launch_deployment()
self.assertEqual(supertask.status, consts.TASK_STATUSES.pending)
deploy_task = [t for t in supertask.subtasks
if t.name == consts.TASK_NAMES.deployment][0]
NailgunReceiver.deploy_resp(
task_uuid=deploy_task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
)
stop_task = self.env.stop_deployment()
NailgunReceiver.stop_deployment_resp(
task_uuid=stop_task.uuid,
status=consts.TASK_STATUSES.ready,
progress=100,
nodes=[{'uid': n.uid} for n in self.env.nodes],
)
self.assertEqual(stop_task.status, consts.TASK_STATUSES.ready)
self.assertTrue(self.db().query(Task).filter_by(
uuid=deploy_task.uuid
).first())
self.assertIsNone(objects.Task.get_by_uuid(deploy_task.uuid))
self.assertEqual(self.cluster.status, consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
self.assertFalse(self.cluster.is_locked)
for n in self.cluster.nodes:
self.assertEqual(n.roles, [])
self.assertNotEqual(n.pending_roles, [])
notification = self.db.query(Notification).filter_by(
cluster_id=stop_task.cluster_id
).order_by(
Notification.datetime.desc()
).first()
self.assertRegexpMatches(
notification.message,
'was successfully stopped')
# FIXME(aroma): remove when stop action will be reworked for ha
# cluster. To get more details, please, refer to [1]
# [1]: https://bugs.launchpad.net/fuel/+bug/1529691
@ -148,6 +107,73 @@ class TestStopDeployment(BaseIntegrationTest):
self.assertEqual(resp.json_body['message'],
'Stop action is forbidden for the cluster')
class TestStopDeployment(BaseIntegrationTest):
def setUp(self):
super(TestStopDeployment, self).setUp()
self.cluster = self.env.create(
nodes_kwargs=[
{"name": "First",
"pending_addition": True},
{"name": "Second",
"roles": ["compute"],
"pending_addition": True}
],
release_kwargs={
'version': "mitaka-9.0"
}
)
self.controller = self.env.nodes[0]
self.compute = self.env.nodes[1]
self.node_uids = [n.uid for n in self.cluster.nodes][:3]
@mock_rpc()
def test_stop_deployment(self):
supertask = self.env.launch_deployment()
self.assertEqual(supertask.status, consts.TASK_STATUSES.pending)
deploy_task = [t for t in supertask.subtasks
if t.name in (consts.TASK_NAMES.deployment)][0]
NailgunReceiver.deploy_resp(
task_uuid=deploy_task.uuid,
status=consts.TASK_STATUSES.running,
progress=50,
)
stop_task = self.env.stop_deployment()
NailgunReceiver.stop_deployment_resp(
task_uuid=stop_task.uuid,
status=consts.TASK_STATUSES.ready,
progress=100,
nodes=[{'uid': n.uid} for n in self.env.nodes],
)
self.assertEqual(stop_task.status, consts.TASK_STATUSES.ready)
self.assertTrue(self.db().query(Task).filter_by(
uuid=deploy_task.uuid
).first())
self.assertIsNone(objects.Task.get_by_uuid(deploy_task.uuid))
self.assertEqual(self.cluster.status,
consts.CLUSTER_STATUSES.stopped)
self.assertEqual(stop_task.progress, 100)
self.assertFalse(self.cluster.is_locked)
for n in self.cluster.nodes:
self.assertEqual(n.roles, [])
self.assertNotEqual(n.pending_roles, [])
notification = self.db.query(Notification).filter_by(
cluster_id=stop_task.cluster_id
).order_by(
Notification.datetime.desc()
).first()
self.assertRegexpMatches(
notification.message,
'was successfully stopped')
@mock_rpc()
def test_admin_ip_in_args(self):
deploy_task = self.env.launch_deployment()

View File

@ -77,7 +77,8 @@ class TestTaskDeploy80(BaseIntegrationTest):
return args[1][1]
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
def test_task_deploy_used_by_default(self, _):
@mock.patch.object(objects.Release, "is_lcm_supported", return_value=False)
def test_task_deploy_used_by_default(self, _, lcm_mock):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
@ -86,6 +87,16 @@ class TestTaskDeploy80(BaseIntegrationTest):
message["args"]
)
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
@mock.patch.object(objects.Release, "is_lcm_supported", return_value=True)
def test_task_deploy_dry_run(self, _, lcm_mock):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
["task_uuid", "tasks_directory", "tasks_graph", "dry_run"],
message["args"]
)
@mock.patch.object(TaskProcessor, "ensure_task_based_deploy_allowed")
def test_fallback_to_granular_deploy(self, ensure_allowed):
ensure_allowed.side_effect = errors.TaskBaseDeploymentNotAllowed

View File

@ -102,3 +102,18 @@ class TestTasksSnapshotField(base.BaseAlembicMigrationTest):
])
).first()
self.assertIsNotNone(result['tasks_snapshot'])
class TestTransactionsNames(base.BaseAlembicMigrationTest):
def test_fields_exist(self):
db.execute(
self.meta.tables['tasks'].insert(),
[
{
'uuid': 'fake_task_uuid_0',
'name': 'dry_run_deployment',
'status': 'pending'
}
]
)