Merge "Fixed misleading provision task message"

This commit is contained in:
Jenkins 2016-02-19 16:11:21 +00:00 committed by Gerrit Code Review
commit 28b0c96a51
6 changed files with 155 additions and 87 deletions

View File

@ -1380,6 +1380,18 @@ class Cluster(NailgunObject):
db.flush()
@classmethod
def get_nodes_count_unmet_status(cls, instance, status):
"""Gets the number of nodes, that does not have specified status.
:param instance: nailgun.db.sqlalchemy.models.Cluster instance
:param status: the expected status
:return: the number of nodes that does not have specified status
"""
q = db().query(models.Node).filter_by(cluster_id=instance.id)
return q.filter(models.Node.status != status).count()
class ClusterCollection(NailgunCollection):
"""Cluster collection."""

View File

@ -21,7 +21,6 @@ import itertools
import logging
import os
import six
import traceback
from oslo_serialization import jsonutils
from sqlalchemy import or_
@ -87,11 +86,13 @@ class NailgunReceiver(object):
node['id'] if 'id' in node else node['uid']
for node in all_nodes
]
locked_nodes = objects.NodeCollection.filter_by_list(
None,
'id',
all_nodes_ids,
order_by='id'
locked_nodes = objects.NodeCollection.order_by(
objects.NodeCollection.filter_by_list(
None,
'id',
all_nodes_ids,
),
'id'
)
objects.NodeCollection.lock_for_update(locked_nodes).all()
@ -100,7 +101,7 @@ class NailgunReceiver(object):
nodes_to_delete_ids = [get_node_id(n) for n in nodes]
if(len(inaccessible_nodes) > 0):
if len(inaccessible_nodes) > 0:
inaccessible_node_ids = [
get_node_id(n) for n in inaccessible_nodes]
@ -256,38 +257,26 @@ class NailgunReceiver(object):
status = task.status
# for deployment we need just to pop
master = next((
n for n in nodes if n['uid'] == consts.MASTER_NODE_UID), {})
# we should remove master node from the nodes since it requires
# special handling and won't work with old code
if master:
nodes.remove(master)
# if there no node except master - then just skip updating
# nodes status, for the task itself astute will send
# message with descriptive error
if nodes:
nodes_by_id = {str(n['uid']): n for n in nodes}
master = nodes_by_id.pop(consts.MASTER_NODE_UID, {})
if nodes_by_id:
# lock nodes for updating so they can't be deleted
q_nodes = objects.NodeCollection.filter_by_id_list(
None,
[n['uid'] for n in nodes],
nodes_by_id,
)
q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
objects.NodeCollection.lock_for_update(q_nodes).all()
db_nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
else:
db_nodes = []
# First of all, let's update nodes in database
for node in nodes:
node_db = objects.Node.get_by_uid(node['uid'])
if not node_db:
logger.warning(
u"No node found with uid '{0}' - nothing changed".format(
node['uid']
)
)
continue
for node_db in db_nodes:
node = nodes_by_id.pop(node_db.uid)
update_fields = (
'error_msg',
'error_type',
@ -297,13 +286,8 @@ class NailgunReceiver(object):
)
for param in update_fields:
if param in node:
logger.debug(
u"Updating node {0} - set {1} to {2}".format(
node['uid'],
param,
node[param]
)
)
logger.debug("Updating node %s - set %s to %s",
node['uid'], param, node[param])
setattr(node_db, param, node[param])
if param == 'progress' and node.get('status') == 'error' \
@ -328,6 +312,10 @@ class NailgunReceiver(object):
task_uuid=task_uuid
)
db().flush()
if nodes_by_id:
logger.warning("The following nodes is not found: %s",
",".join(sorted(nodes_by_id)))
if nodes and not progress:
progress = TaskHelper.recalculate_deployment_task_progress(task)
@ -335,7 +323,7 @@ class NailgunReceiver(object):
if master.get('status') == consts.TASK_STATUSES.error:
status = consts.TASK_STATUSES.error
cls._update_task_status(task, status, progress, message)
cls._update_task_status(task, status, progress, message, db_nodes)
cls._update_action_log_entry(status, task.name, task_uuid, nodes)
@classmethod
@ -356,35 +344,23 @@ class NailgunReceiver(object):
lock_for_update=True
)
# if task was failed on master node then we should
# mark all cluster's nodes in error state
master = next((
n for n in nodes if n['uid'] == consts.MASTER_NODE_UID), {})
# we should remove master node from the nodes since it requires
# special handling and won't work with old code
if master:
nodes.remove(master)
# lock nodes for updating
nodes_by_id = {str(n['uid']): n for n in nodes}
master = nodes_by_id.pop(consts.MASTER_NODE_UID, {})
if master.get('status') == consts.TASK_STATUSES.error:
status = consts.TASK_STATUSES.error
progress = 100
# lock nodes for updating
q_nodes = objects.NodeCollection.filter_by_id_list(
None,
[n['uid'] for n in nodes])
None, nodes_by_id
)
q_nodes = objects.NodeCollection.order_by(q_nodes, 'id')
objects.NodeCollection.lock_for_update(q_nodes).all()
for node in nodes:
uid = node.get('uid')
node_db = objects.Node.get_by_uid(node['uid'])
if not node_db:
logger.warn('Node with uid "{0}" not found'.format(uid))
continue
db_nodes = objects.NodeCollection.lock_for_update(q_nodes).all()
for node_db in db_nodes:
node = nodes_by_id[node_db.uid]
if node.get('status') == consts.TASK_STATUSES.error:
node_db.status = consts.TASK_STATUSES.error
node_db.progress = 100
@ -395,10 +371,14 @@ class NailgunReceiver(object):
node_db.progress = node.get('progress')
db().flush()
if nodes_by_id:
logger.warning("The following nodes is not found: %s",
",".join(sorted(six.moves.map(str, nodes_by_id))))
if nodes and not progress:
progress = TaskHelper.recalculate_provisioning_task_progress(task)
cls._update_task_status(task, status, progress, message)
cls._update_task_status(task, status, progress, message, db_nodes)
cls._update_action_log_entry(status, task.name, task_uuid, nodes)
@classmethod
@ -467,19 +447,20 @@ class NailgunReceiver(object):
)
@classmethod
def _update_task_status(cls, task, status, progress, message):
def _update_task_status(cls, task, status, progress, message, nodes):
"""Do update task status actions.
:param task: objects.Task object
:param status: consts.TASK_STATUSES value
:param progress: progress number value
:param message: message text
:param nodes: the modified nodes list
"""
# Let's check the whole task status
if status == consts.TASK_STATUSES.error:
cls._error_action(task, status, progress, message)
elif status == consts.TASK_STATUSES.ready:
cls._success_action(task, status, progress)
cls._success_action(task, status, progress, nodes)
else:
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@ -568,30 +549,28 @@ class NailgunReceiver(object):
objects.Task.update(task, data)
@classmethod
def _success_action(cls, task, status, progress):
def _success_action(cls, task, status, progress, nodes):
# check if all nodes are ready
if any(map(lambda n: n.status == 'error',
task.cluster.nodes)):
if any(n.status == consts.NODE_STATUSES.error for n in nodes):
cls._error_action(task, 'error', 100)
return
task_name = task.name.title()
try:
message = (
u"{0} of environment '{1}' is done. "
).format(
task_name,
task.cluster.name,
)
except Exception as exc:
logger.error(": ".join([
str(exc),
traceback.format_exc()
]))
message = u"{0} of environment '{1}' is done".format(
task_name,
task.cluster.name
if nodes:
# check that all nodes in same state
remaining = objects.Cluster.get_nodes_count_unmet_status(
nodes[0].cluster, nodes[0].status
)
if remaining > 0:
message = u"{0} of {1} environment node(s) is done.".format(
task_name, len(nodes)
)
else:
message = u"{0} of environment '{1}' is done.".format(
task_name, task.cluster.name
)
else:
message = u"{0} is done. No changes.".format(task_name)
zabbix_url = objects.Cluster.get_network_manager(
task.cluster

View File

@ -432,9 +432,9 @@ class FakeDeletionThread(FakeThread):
try:
resp_method(**kwargs)
db().commit()
except Exception as e:
except Exception:
db().rollback()
raise e
raise
recover_nodes = self.params.get("recover_nodes", True)
recover_offline_nodes = self.params.get("recover_offline_nodes", True)
@ -482,9 +482,9 @@ class FakeStopDeploymentThread(FakeThread):
try:
resp_method(**kwargs)
db().commit()
except Exception as e:
except Exception:
db().rollback()
raise e
raise
if not recover_nodes:
db().commit()
@ -530,9 +530,9 @@ class FakeResetEnvironmentThread(FakeThread):
try:
resp_method(**kwargs)
db().commit()
except Exception as e:
except Exception:
db().rollback()
raise e
raise
if not recover_nodes:
db().commit()
@ -579,9 +579,9 @@ class FakeVerificationThread(FakeThread):
nodes=self.data['args']['nodes'],
)
db().commit()
except Exception as e:
except Exception:
db().rollback()
raise e
raise
class FakeMulticastVerifications(FakeAmpqThread):

View File

@ -1632,6 +1632,65 @@ class TestConsumer(BaseReciverTestCase):
self.assertEqual(nodes[1].status, consts.NODE_STATUSES.ready)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
def _check_success_message(self, callback, task_name, node_status):
self.env.create(
cluster_kwargs={},
nodes_kwargs=[
{'api': False, 'roles': ['controller'],
'status': consts.NODE_STATUSES.discover},
{'api': False, 'roles': ['compute'],
'status': consts.NODE_STATUSES.discover},
])
cluster = self.env.clusters[-1]
nodes = self.env.nodes
task_title = task_name.title()
task = Task(
uuid=str(uuid.uuid4()),
name=task_name,
cluster_id=cluster.id
)
task.cache = {'nodes': [nodes[0].uid, nodes[1].uid]}
self.db.add(task)
self.db.flush()
params = {
'task_uuid': task.uuid,
'status': consts.TASK_STATUSES.ready,
'progress': 100,
'nodes': [{'uid': nodes[0].uid, 'status':node_status}]
}
callback(**params)
self.assertEqual(
"{0} of 1 environment node(s) is done.".format(task_title),
task.message
)
params['nodes'] = []
callback(**params)
self.assertEqual(
"{0} is done. No changes.".format(task_title),
task.message
)
params['nodes'] = [{'uid': nodes[1].uid, 'status':node_status}]
callback(**params)
self.assertEqual(
"{0} of environment '{1}' is done."
.format(task_title, cluster.name),
task.message
)
def test_success_deploy_messsage(self):
self._check_success_message(
self.receiver.deploy_resp,
consts.TASK_NAMES.deployment,
consts.NODE_STATUSES.ready
)
def test_success_provision_messsage(self):
self._check_success_message(
self.receiver.deploy_resp,
consts.TASK_NAMES.provision,
consts.NODE_STATUSES.provisioned
)
class TestResetEnvironment(BaseReciverTestCase):

View File

@ -1476,6 +1476,22 @@ class TestClusterObject(BaseTestCase):
self.db().refresh(config)
self.assertFalse(config.is_active)
def test_get_nodes_count_unmet_status(self):
# by default all nodes in discover state
remaining = objects.Cluster.get_nodes_count_unmet_status(
self.cluster, consts.NODE_STATUSES.discover
)
self.assertEqual(0, remaining)
remaining = objects.Cluster.get_nodes_count_unmet_status(
self.cluster, consts.NODE_STATUSES.ready
)
self.assertEqual(len(self.env.nodes), remaining)
self.env.nodes[0].status = consts.NODE_STATUSES.ready
remaining = objects.Cluster.get_nodes_count_unmet_status(
self.cluster, consts.NODE_STATUSES.ready
)
self.assertEqual(len(self.env.nodes) - 1, remaining)
class TestClusterObjectVirtRoles(BaseTestCase):

View File

@ -55,10 +55,12 @@ class TestNailgunReceiver(base.BaseTestCase):
cluster_id=self.cluster.id)
def test_success_action_with_plugins(self):
NailgunReceiver._success_action(self.task, 'ready', 100)
NailgunReceiver._success_action(
self.task, 'ready', 100, self.env.nodes
)
self.assertRegexpMatches(
self.task.message,
"Deployment of environment '[^\s]+' is done. "
"Deployment of environment '[^\s]+' is done."
"\n\n"
"Plugin name\d is deployed. description\d\n"
"Plugin name\d is deployed. description\d")