Merge "Fixed updating node status in transaction response"

This commit is contained in:
Jenkins 2016-08-22 16:48:34 +00:00 committed by Gerrit Code Review
commit 590b10285b
11 changed files with 313 additions and 20 deletions

View File

@ -26,6 +26,7 @@ import sqlalchemy as sa
from oslo_serialization import jsonutils
from nailgun.db.sqlalchemy.models import fields
from nailgun.utils.migration import drop_enum
from nailgun.utils.migration import upgrade_enum
@ -42,9 +43,11 @@ def upgrade():
upgrade_task_model()
upgrade_deployment_graphs_attributes()
upgrade_orchestrator_task_types()
upgrade_node_error_type()
def downgrade():
downgrade_node_error_type()
downgrade_orchestrator_task_types()
downgrade_deployment_graphs_attributes()
downgrade_task_model()
@ -413,3 +416,26 @@ def downgrade_orchestrator_task_types():
orchestrator_task_types_new,
orchestrator_task_types_old
)
# Legacy values of the 'node_error_type' PostgreSQL ENUM.  Kept only so
# downgrade_node_error_type() can recreate the ENUM when rolling back the
# migration that relaxed nodes.error_type to a plain string.
node_error_types_old = (
    'deploy',
    'provision',
    'deletion',
    'discover',
    'stop_deployment'
)
def upgrade_node_error_type():
    """Relax nodes.error_type from a PostgreSQL ENUM to a free-form string.

    The column is widened to VARCHAR(100) so arbitrary error types can be
    stored, then the now-unused 'node_error_type' ENUM is dropped.
    """
    op.alter_column('nodes', 'error_type', type_=sa.String(100))
    drop_enum('node_error_type')
def downgrade_node_error_type():
    """Restore nodes.error_type to the original 'node_error_type' ENUM.

    Recreates the ENUM from node_error_types_old and converts the column
    back through an explicit text cast.  NOTE(review): rows holding a value
    outside the legacy set will presumably make the cast fail — downgrade
    is only safe when no custom error types were written.
    """
    enum_type = sa.Enum(*node_error_types_old, name='node_error_type')
    enum_type.create(op.get_bind(), checkfirst=False)
    op.execute(
        u'ALTER TABLE nodes ALTER COLUMN error_type TYPE node_error_type'
        u' USING error_type::text::node_error_type'
    )

View File

@ -89,7 +89,7 @@ class Node(Base):
pending_addition = Column(Boolean, default=False)
pending_deletion = Column(Boolean, default=False)
changes = relationship("ClusterChanges", backref="node")
error_type = Column(Enum(*consts.NODE_ERRORS, name='node_error_type'))
error_type = Column(String(100))
error_msg = Column(Text)
timestamp = Column(DateTime, nullable=False)
online = Column(Boolean, default=True)

View File

@ -70,6 +70,30 @@ class Node(NailgunObject):
#: Serializer for Node
serializer = NodeSerializer
@classmethod
def get_status(cls, instance):
    """Return the node status as derived from its current state.

    Transitional statuses are reported unchanged; a node mid-progress is
    mapped onto the transitional status implied by its pending flags; a
    recorded error_type otherwise forces the error status.
    """
    transitional = (
        consts.NODE_STATUSES.deploying,
        consts.NODE_STATUSES.provisioning,
        consts.NODE_STATUSES.removing,
    )
    # transitional statuses are returned as-is
    if instance.status in transitional:
        return instance.status

    # partial progress means the node is in some transitional state;
    # derive which one from the pending flags instead of inventing a
    # brand-new status value
    if 0 < instance.progress < 100:
        if instance.pending_deletion:
            return consts.NODE_STATUSES.removing
        if instance.pending_addition:
            return consts.NODE_STATUSES.provisioning
        return consts.NODE_STATUSES.deploying

    # a set error_type means the node is effectively in the error state
    return (consts.NODE_STATUSES.error
            if instance.error_type else instance.status)
@classmethod
def delete(cls, instance):
fire_callback_on_node_delete(instance)

View File

@ -49,4 +49,5 @@ class NodeSerializer(BasicSerializer):
from nailgun.objects import Node
data_dict = super(NodeSerializer, cls).serialize(instance, fields)
data_dict['fqdn'] = Node.get_node_fqdn(instance)
data_dict['status'] = Node.get_status(instance)
return data_dict

View File

@ -179,6 +179,15 @@ class BaseDeploymentTaskManager(TaskManager):
transaction_name = consts.TASK_NAMES.dry_run_deployment
return transaction_name
@staticmethod
def reset_error_message(nodes, dry_run):
if dry_run:
return
for node in nodes:
node.error_msg = None
node.error_type = None
class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
@ -380,8 +389,11 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
task_deletion, task_provision, task_deployment = None, None, None
dry_run = kwargs.get('dry_run', False)
if nodes_to_delete:
task_deletion = self.delete_nodes(supertask, nodes_to_delete)
self.reset_error_message(nodes_to_delete, dry_run)
if nodes_to_provision:
logger.debug("There are nodes to provision: %s",
@ -415,14 +427,13 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
if task_provision.status == consts.TASK_STATUSES.error:
return
self.reset_error_message(nodes_to_provision, dry_run)
task_provision.cache = provision_message
db().commit()
task_messages.append(provision_message)
deployment_message = None
dry_run = kwargs.get('dry_run', False)
if (nodes_to_deploy or affected_nodes or
objects.Release.is_lcm_supported(self.cluster.release)):
if nodes_to_deploy:
@ -469,6 +480,7 @@ class ApplyChangesTaskManager(BaseDeploymentTaskManager, DeploymentCheckMixin):
return
task_deployment.cache = deployment_message
self.reset_error_message(nodes_to_deploy, dry_run)
db().commit()
if deployment_message:
@ -687,6 +699,8 @@ class ProvisioningTaskManager(TaskManager):
node.pending_addition = False
node.status = consts.NODE_STATUSES.provisioning
node.progress = 0
node.error_msg = None
node.error_type = None
db().commit()
@ -758,6 +772,7 @@ class DeploymentTaskManager(BaseDeploymentTaskManager):
nodes_ids_to_deployment,
order_by='id'
)
self.reset_error_message(nodes_to_deployment, dry_run)
deployment_message = self._call_silently(
task_deployment,

View File

@ -441,7 +441,8 @@ class ClusterTransaction(DeploymentTask):
if node is None:
return False
return node.status in cls.node_statuses_for_redeploy
node_state = objects.Node.get_status(node)
return node_state in cls.node_statuses_for_redeploy
@classmethod
def get_current_state(cls, cluster, nodes, tasks):

View File

@ -165,3 +165,19 @@ class TestOrchestratorTaskTypesDowngrade(base.BaseAlembicMigrationTest):
self.assertFalse(
expected_values.intersection((x[0] for x in result))
)
class TestNodeErrorTypeMigration(base.BaseAlembicMigrationTest):
    """Verifies the downgraded schema: error_type is an ENUM again."""

    def test_error_type_is_enum(self):
        # after downgrade the column must be backed by the
        # 'node_error_type' PostgreSQL ENUM ...
        nodes_table = self.meta.tables['nodes']
        self.assertEqual(
            'node_error_type', nodes_table.c.error_type.type.name
        )
        # ... and the ENUM must contain exactly the legacy value set
        result = db.execute(sa.text(
            'select unnest(enum_range(NULL::node_error_type))'
        )).fetchall()
        self.assertEqual(
            {'deploy', 'provision', 'deletion', 'discover', 'stop_deployment'},
            {x[0] for x in result},
        )

View File

@ -612,3 +612,22 @@ class TestOrchestratorTaskTypesMigration(base.BaseAlembicMigrationTest):
'select unnest(enum_range(NULL::deployment_graph_tasks_type))'
)).fetchall()
self.assertTrue(expected_values.issubset((x[0] for x in result)))
class TestNodeErrorTypeMigration(base.BaseAlembicMigrationTest):
    """Verifies the upgraded schema: error_type accepts arbitrary strings."""

    def test_error_type_accepts_any_string_value(self):
        nodes_table = self.meta.tables['nodes']
        # id of an existing node: scalar() yields the first column of the
        # first row of the unrestricted select
        node_id = db.execute(sa.select([nodes_table])).scalar()
        # NOTE(review): no WHERE clause — this sets error_type on every
        # node row; harmless here since only one row is inspected below
        db.execute(
            nodes_table.update(),
            [{
                'error_type': 'custom_error_type'
            }]
        )
        # a value outside the old ENUM set must now be stored verbatim
        result = db.execute(
            sa.select([
                nodes_table.c.error_type,
            ]).where(nodes_table.c.id == node_id)
        ).first()
        self.assertEqual('custom_error_type', result[0])

View File

@ -2317,3 +2317,57 @@ class TestOpenstackConfigCollection(BaseTestCase):
consts.CLUSTER_STATUSES.error,
None
)
class TestNodeStatus(BaseTestCase):
    """Unit tests for objects.Node.get_status state calculation."""

    def setUp(self):
        super(TestNodeStatus, self).setUp()
        self.node = self.env.create_node()

    def test_in_progress_has_high_priority(self):
        """Transitional statuses win over a recorded error_type."""
        node = self.node
        node.error_type = consts.NODE_ERRORS.deploy
        node.status = consts.NODE_STATUSES.removing
        self.assertEqual(
            consts.NODE_STATUSES.removing, objects.Node.get_status(node)
        )
        node.status = consts.NODE_STATUSES.provisioning
        self.assertEqual(
            consts.NODE_STATUSES.provisioning, objects.Node.get_status(node)
        )
        node.status = consts.NODE_STATUSES.deploying
        self.assertEqual(
            consts.NODE_STATUSES.deploying, objects.Node.get_status(node)
        )
        # a non-transitional status with progress in (0, 100) and no
        # pending flags is reported as deploying
        node.status = consts.NODE_STATUSES.provisioned
        node.progress = 1
        self.assertEqual(
            consts.NODE_STATUSES.deploying, objects.Node.get_status(node)
        )

    # fixed typo in the method name: was test_in_pgogress_status
    def test_in_progress_status(self):
        """Pending flags select the transitional status while in progress."""
        node = self.node
        node.progress = 1
        node.status = consts.NODE_STATUSES.ready
        node.pending_addition = True
        self.assertEqual(
            consts.NODE_STATUSES.provisioning, objects.Node.get_status(node)
        )
        node.pending_addition = False
        node.pending_deletion = True
        self.assertEqual(
            consts.NODE_STATUSES.removing, objects.Node.get_status(node)
        )
        node.pending_deletion = False
        self.assertEqual(
            consts.NODE_STATUSES.deploying, objects.Node.get_status(node)
        )

    def test_error_state(self):
        """A set error_type turns a stable status into error."""
        node = self.node
        node.status = consts.NODE_STATUSES.ready
        node.error_type = consts.NODE_ERRORS.deploy
        self.assertEqual(
            consts.NODE_STATUSES.error, objects.Node.get_status(node)
        )

View File

@ -96,14 +96,29 @@ class TestNodeForRedeploy(BaseUnitTest):
self.assertFalse(manager._is_node_for_redeploy(None))
self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=True, status=consts.NODE_STATUSES.discover
pending_addition=True, status=consts.NODE_STATUSES.discover,
progress=0, error_type=None
)))
self.assertFalse(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=False, status=consts.NODE_STATUSES.ready
pending_addition=False, status=consts.NODE_STATUSES.ready,
progress=0, error_type=None
)))
self.assertTrue(manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=True, status=consts.NODE_STATUSES.ready
pending_addition=True, status=consts.NODE_STATUSES.ready,
progress=0, error_type=None
)))
self.assertTrue(
manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=False, error_type=consts.NODE_ERRORS.deploy,
progress=0, status=consts.NODE_STATUSES.ready
))
)
self.assertTrue(
manager._is_node_for_redeploy(mock.MagicMock(
pending_addition=False, status=consts.NODE_STATUSES.stopped,
progress=0, error_type=None
))
)
class TestGetTasksToRun(BaseUnitTest):
@ -167,3 +182,90 @@ class TestGetTasksToRun(BaseUnitTest):
adapter_mock.adapt_legacy_tasks.assert_called_with(
tasks, cluster_obj.get_legacy_plugin_tasks.return_value, resolver
)
class TestUpdateNodes(BaseUnitTest):
    """Unit tests for transactions.manager._update_nodes."""

    @mock.patch('nailgun.transactions.manager.objects')
    def test_delete_node_from_cluster(self, obj_mock):
        # the special 'deleted' status removes the node from the cluster
        transaction = mock.MagicMock(dry_run=False)
        nodes = [mock.MagicMock(uid='1')]
        node_params = {'1': {'status': 'deleted'}}
        manager._update_nodes(transaction, nodes, node_params)
        obj_mock.Node.remove_from_cluster.assert_called_once_with(nodes[0])

    @mock.patch('nailgun.transactions.manager.objects')
    def test_delete_node_from_cluster_if_dry_run(self, obj_mock):
        # dry-run transactions must not remove nodes from the cluster
        transaction = mock.MagicMock(dry_run=True)
        nodes = [mock.MagicMock(uid='1')]
        node_params = {'1': {'status': 'deleted'}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(0, obj_mock.Node.remove_from_cluster.call_count)

    @mock.patch('nailgun.transactions.manager.notifier')
    def test_set_error_status(self, notifier_mock):
        # an explicit error_type in params is stored on the node and a
        # notification with the default message is emitted
        transaction = mock.MagicMock(dry_run=False)
        nodes = [mock.MagicMock(uid='1', error_type=None)]
        node_params = {
            '1': {
                'status': 'error', 'error_type': consts.NODE_ERRORS.provision
            }
        }
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(consts.NODE_ERRORS.provision, nodes[0].error_type)
        notifier_mock.notify.assert_called_once_with(
            consts.NOTIFICATION_TOPICS.error,
            "Node '{0}' failed: Unknown error".format(nodes[0].name),
            cluster_id=transaction.cluster_id,
            node_id=nodes[0].uid,
            task_uuid=transaction.uuid
        )

    @mock.patch('nailgun.transactions.manager.notifier')
    def test_set_default_error_type(self, notifier_mock):
        # with no error_type in params the deploy error type is assumed
        # and the provided error_msg is used in the notification
        transaction = mock.MagicMock(dry_run=False)
        nodes = [mock.MagicMock(uid='1', error_type=None)]
        node_params = {'1': {'status': 'error', 'error_msg': 'error'}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(consts.NODE_ERRORS.deploy, nodes[0].error_type)
        notifier_mock.notify.assert_called_once_with(
            consts.NOTIFICATION_TOPICS.error,
            "Node '{0}' failed: error".format(nodes[0].name),
            cluster_id=transaction.cluster_id,
            node_id=nodes[0].uid,
            task_uuid=transaction.uuid
        )

    @mock.patch('nailgun.transactions.manager.notifier')
    def test_handle_error_status_for_node_if_dry_run(self, notifier_mock):
        # dry-run transactions neither set error_type nor send notifications
        transaction = mock.MagicMock(dry_run=True)
        nodes = [mock.MagicMock(uid='1', error_type=None)]
        node_params = {'1': {'status': 'error'}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertIsNone(nodes[0].error_type)
        self.assertEqual(0, notifier_mock.notify.call_count)

    def test_update_node_progress(self):
        # progress is the one field updated even during a dry run
        transaction = mock.MagicMock(dry_run=False)
        nodes = [mock.MagicMock(uid='1', progress=0)]
        node_params = {'1': {'progress': 10}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(10, nodes[0].progress)

        transaction.dry_run = True
        node_params = {'1': {'progress': 20}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(20, nodes[0].progress)

    def test_update_node_status(self):
        transaction = mock.MagicMock(dry_run=False)
        nodes = [mock.MagicMock(uid='1', status=consts.NODE_STATUSES.discover)]
        node_params = {'1': {'status': consts.NODE_STATUSES.ready}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(consts.NODE_STATUSES.ready, nodes[0].status)

    def test_update_node_status_if_dry_run(self):
        # status changes are ignored for dry-run transactions
        transaction = mock.MagicMock(dry_run=True)
        nodes = [mock.MagicMock(uid='1', status=consts.NODE_STATUSES.discover)]
        node_params = {'1': {'status': consts.NODE_STATUSES.ready}}
        manager._update_nodes(transaction, nodes, node_params)
        self.assertEqual(consts.NODE_STATUSES.discover, nodes[0].status)

View File

@ -23,6 +23,7 @@ from nailgun.db import db
from nailgun import errors
from nailgun import lcm
from nailgun.logger import logger
from nailgun import notifier
from nailgun import objects
from nailgun.orchestrator import deployment_serializers
from nailgun import rpc
@ -224,7 +225,7 @@ class TransactionsManager(object):
)
).all()
_update_nodes(nodes_instances, nodes_params, transaction.dry_run)
_update_nodes(transaction, nodes_instances, nodes_params)
_update_history(transaction, nodes)
if status:
@ -264,6 +265,13 @@ class TransactionsManager(object):
cluster = sub_transaction.cluster
nodes = _get_nodes_to_run(cluster, sub_transaction.cache.get('nodes'))
for node in nodes:
node.roles = list(set(node.roles + node.pending_roles))
node.pending_roles = []
node.error_type = None
node.progress = 1
resolver = role_resolver.RoleResolver(nodes)
tasks = _get_tasks_to_run(
cluster,
@ -373,9 +381,15 @@ def _get_tasks_to_run(cluster, graph_type, node_resolver, names=None):
def _is_node_for_redeploy(node):
if node is None:
return False
return (
node.pending_addition or
node.status == consts.NODE_STATUSES.discover
if node.pending_addition:
return True
node_status = objects.Node.get_status(node)
return node_status in (
consts.NODE_STATUSES.discover,
consts.NODE_STATUSES.error,
consts.NODE_STATUSES.provisioned,
consts.NODE_STATUSES.stopped,
)
@ -438,7 +452,7 @@ def _dump_expected_state(transaction, state, tasks):
db().flush()
def _update_nodes(nodes_instances, nodes_params, dry_run=False):
def _update_nodes(transaction, nodes_instances, nodes_params):
allow_update = {
'name',
'status',
@ -446,26 +460,47 @@ def _update_nodes(nodes_instances, nodes_params, dry_run=False):
'kernel_params',
'pending_addition',
'pending_deletion',
'error_type',
'error_msg',
'online',
'progress',
}
# dry-run transactions must not update nodes except progress column
if dry_run:
if transaction.dry_run:
allow_update = {'progress'}
for node in nodes_instances:
node_params = nodes_params.pop(node.uid)
for param in allow_update.intersection(node_params):
setattr(node, param, node_params[param])
# TODO(ikalnitsky): Ensure Astute sends progress=100 in case of error.
if node.status == consts.NODE_STATUSES.error:
node.progress = 100
if param == 'status':
new_status = node_params['status']
if new_status == 'deleted':
# the deleted is special status which causes
# to delete node from cluster
objects.Node.remove_from_cluster(node)
elif new_status == 'error':
# do not update status of node, only set
# appropriate error type
node.error_type = node_params.get(
'error_type', consts.NODE_ERRORS.deploy
)
node.progress = 100
# Notification on particular node failure
notifier.notify(
consts.NOTIFICATION_TOPICS.error,
u"Node '{0}' failed: {1}".format(
node.name,
node_params.get('error_msg', "Unknown error")
),
cluster_id=transaction.cluster_id,
node_id=node.uid,
task_uuid=transaction.uuid
)
else:
node.status = new_status
else:
setattr(node, param, node_params[param])
db.flush()
if nodes_params: