Partial refactoring of task helpers

What is done:
- most methods from the TaskHelper class moved to objects.Task (node-related
  helpers moved to objects.Node and objects.NodeCollection);
- code that depended on TaskHelper's methods updated;
- unit and integration tests updated.

What is missing:
- the TaskHelper class is still in use, since it holds several commonly used
  helper functions (to be moved to a utils module in the future); the new
  call pattern is sketched below.
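
The change in call pattern, schematically (a before/after sketch, not a
literal excerpt from the diff):

    # before: status updates went through the helper, addressed by uuid
    TaskHelper.update_task_status(task_uuid, status, progress, message)

    # after: callers fetch the Task object and update it directly
    task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
    objects.Task.update(task, {'status': status,
                               'progress': progress,
                               'message': message})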

Change-Id: Iff2f2a4d277590db25c6f801f20ae75433937411
Implements: blueprint nailgun-task-helpers-refactoring
Artem Roma 2014-05-29 20:28:05 +03:00
parent 3de47c11a0
commit 0015cd71bc
21 changed files with 598 additions and 429 deletions

View File

@ -12,9 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import string
from random import choice
import string
def password(arg=None):

View File

@ -314,9 +314,9 @@ def setup():
def save_only(iface):
import common.network as network
from common import pwgen
import netifaces
from common import pwgen
#Calculate and set Static/DHCP pool fields
#Max IPs = net size - 2 (master node + bcast)
try:

View File

@ -18,6 +18,7 @@
Handlers dealing with network configurations
"""
import six
import traceback
import web
@ -43,7 +44,6 @@ from nailgun.errors import errors
from nailgun.logger import logger
from nailgun.objects import Task
from nailgun.openstack.common import jsonutils
from nailgun.task.helpers import TaskHelper
from nailgun.task.manager import CheckNetworksTaskManager
from nailgun.task.manager import VerifyNetworksTaskManager
@ -120,7 +120,12 @@ class NovaNetworkConfigurationHandler(ProviderHandler):
cluster
).update(cluster, data)
except Exception as exc:
TaskHelper.set_error(task.uuid, exc)
# set task status to error and update its corresponding data
data = {'status': 'error',
'progress': 100,
'message': six.text_type(exc)}
objects.Task.update(task, data)
logger.error(traceback.format_exc())
#TODO(enchantner): research this behaviour
@ -183,7 +188,12 @@ class NeutronNetworkConfigurationHandler(ProviderHandler):
cluster
).update(cluster, data)
except Exception as exc:
TaskHelper.set_error(task.uuid, exc)
# set task status to error and update its corresponding data
data = {'status': 'error',
'progress': 100,
'message': six.text_type(exc)}
objects.Task.update(task, data)
logger.error(traceback.format_exc())
#TODO(enchantner): research this behaviour
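
The removed TaskHelper.set_error(task.uuid, exc) collapses into the inline
dict-and-update blocks above; if it were factored back into a helper, a
minimal sketch would look like this (hypothetical name, not part of the
commit):

    # hypothetical helper equivalent to the inline error blocks above
    def set_task_error(task, exc):
        objects.Task.update(task, {'status': 'error',
                                   'progress': 100,
                                   'message': six.text_type(exc)})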

View File

@ -39,6 +39,8 @@ from nailgun.objects import NailgunCollection
from nailgun.objects import NailgunObject
from nailgun.objects import Notification
from nailgun.settings import settings
class Node(NailgunObject):
"""Node object
@ -598,13 +600,6 @@ class Node(NailgunObject):
db().flush()
db().refresh(instance)
@classmethod
def can_be_updated(cls, instance):
return (instance.status in (consts.NODE_STATUSES.ready,
consts.NODE_STATUSES.provisioned)) or \
(instance.status == consts.NODE_STATUSES.error
and instance.error_type == consts.NODE_ERRORS.deploy)
@classmethod
def move_roles_to_pending_roles(cls, instance):
"""Move roles to pending_roles
@ -613,6 +608,16 @@ class Node(NailgunObject):
instance.roles = []
db().flush()
@classmethod
def make_slave_name(cls, instance):
return u"node-{node_id}".format(node_id=instance.id)
@classmethod
def make_slave_fqdn(cls, instance):
return u"{instance_name}.{dns_domain}" \
.format(instance_name=cls.make_slave_name(instance),
dns_domain=settings.DNS_DOMAIN)
class NodeCollection(NailgunCollection):
"""Node collection
@ -637,3 +642,38 @@ class NodeCollection(NailgunCollection):
subqueryload_all('ip_addrs.network_data')
)
return cls.eager_base(iterable, options)
@classmethod
def update_slave_nodes_fqdn(cls, instances):
for n in instances:
n.fqdn = cls.single.make_slave_fqdn(n)
db().flush()
@classmethod
def prepare_for_deployment(cls, instances):
"""Prepare environment for deployment,
assign management, public, storage ips
"""
cls.update_slave_nodes_fqdn(instances)
nodes_ids = [n.id for n in instances]
# TODO(enchantner): check network manager instance for each node
netmanager = Cluster.get_network_manager()
if nodes_ids:
netmanager.assign_ips(nodes_ids, 'management')
netmanager.assign_ips(nodes_ids, 'public')
netmanager.assign_ips(nodes_ids, 'storage')
for node in instances:
netmanager.assign_admin_ips(node.id)
@classmethod
def prepare_for_provisioning(cls, instances):
"""Prepare environment for provisioning,
update fqdns, assign admin IPs
"""
cls.update_slave_nodes_fqdn(instances)
for n in instances:
cls.single.get_network_manager(n).assign_admin_ips(n.id)
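
A minimal usage sketch of the helpers relocated to objects.Node and
objects.NodeCollection (the node and cluster objects are assumed to exist;
the example values are illustrative):

    # hypothetical usage of the moved helpers
    name = objects.Node.make_slave_name(node)   # e.g. u"node-42"
    fqdn = objects.Node.make_slave_fqdn(node)   # u"node-42." + settings.DNS_DOMAIN
    objects.NodeCollection.update_slave_nodes_fqdn(cluster.nodes)
    objects.NodeCollection.prepare_for_provisioning(cluster.nodes)
    objects.NodeCollection.prepare_for_deployment(cluster.nodes)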

View File

@ -23,9 +23,15 @@ from nailgun.db.sqlalchemy import models
from nailgun import consts
from nailgun.errors import errors
from nailgun.logger import logger
from nailgun.objects import Cluster
from nailgun.objects import NailgunCollection
from nailgun.objects import NailgunObject
from nailgun.task.helpers import TaskHelper
class Task(NailgunObject):
@ -84,6 +90,180 @@ class Task(NailgunObject):
)
return res
@classmethod
def update_verify_networks(cls, instance, status,
progress, msg, result):
#TODO(dshulyak) move network tests into ostf
previous_status = instance.status
statuses = [sub.status for sub in instance.subtasks]
messages = [sub.message for sub in instance.subtasks]
messages.append(msg)
statuses.append(status)
if any(st == 'error' for st in statuses):
instance.status = 'error'
else:
instance.status = status or instance.status
instance.progress = progress or instance.progress
instance.result = result or instance.result
# join messages if not None or ""
instance.message = '\n'.join([m for m in messages if m])
if previous_status != instance.status and instance.cluster_id:
logger.debug("Updating cluster status: "
"cluster_id: %s status: %s",
instance.cluster_id, status)
cls._update_cluster_data(instance)
@classmethod
def _update_parent_instance(cls, instance):
subtasks = instance.subtasks
if len(subtasks):
data = dict()
if all(map(lambda s: s.status == 'ready', subtasks)):
data['status'] = 'ready'
data['progress'] = 100
data['message'] = u'\n'.join(map(
lambda s: s.message, filter(
lambda s: s.message is not None, subtasks)))
cls.update(instance, data)
elif any(map(lambda s: s.status in ('error',), subtasks)):
for subtask in subtasks:
if not subtask.status in ('error', 'ready'):
subtask.status = 'error'
subtask.progress = 100
subtask.message = 'Task aborted'
data['status'] = 'error'
data['progress'] = 100
data['message'] = u'\n'.join(list(set(map(
lambda s: (s.message or ""), filter(
lambda s: (
s.status == 'error' and not
# TODO: make this check less ugly
s.message == 'Task aborted'
), subtasks)))))
cls.update(instance, data)
else:
subtasks_with_progress = filter(
lambda s: s.progress is not None,
subtasks
)
if subtasks_with_progress:
instance.progress = \
TaskHelper.calculate_parent_task_progress(
subtasks_with_progress
)
else:
instance.progress = 0
@classmethod
def __update_nodes_to_error(cls, q_nodes_to_error, error_type):
if q_nodes_to_error.count():
logger.debug(
u'Updating nodes to error with error_type "{0}": {1}'
.format(error_type, [n.full_name for n in q_nodes_to_error]))
for n in q_nodes_to_error:
n.status = 'error'
n.progress = 0
n.error_type = error_type
@classmethod
def __update_cluster_status(cls, cluster, status):
logger.debug(
"Updating cluster (%s) status: from %s to %s",
cluster.full_name, cluster.status, status)
Cluster.update(cluster, data={'status': status})
@classmethod
def _update_cluster_data(cls, instance):
cluster = instance.cluster
if instance.name == 'deploy':
if instance.status == 'ready':
# If for some reason the orchestrator
# didn't send ready status for a node
# we should set it explicitly
for n in cluster.nodes:
if n.status == 'deploying':
n.status = 'ready'
n.progress = 100
cls.__update_cluster_status(cluster, 'operational')
Cluster.clear_pending_changes(cluster)
elif instance.status == 'error' and \
not TaskHelper.before_deployment_error(instance):
# We don't want to set cluster status to
# error because we don't want to lock
# settings if cluster wasn't deployed
cls.__update_cluster_status(cluster, 'error')
elif instance.name == 'deployment' and instance.status == 'error':
cls.__update_cluster_status(cluster, 'error')
q_nodes_to_error = \
TaskHelper.get_nodes_to_deployment_error(cluster)
cls.__update_nodes_to_error(q_nodes_to_error,
error_type='deploy')
elif instance.name == 'provision' and instance.status == 'error':
cls.__update_cluster_status(cluster, 'error')
q_nodes_to_error = \
TaskHelper.get_nodes_to_provisioning_error(cluster)
cls.__update_nodes_to_error(q_nodes_to_error,
error_type='provision')
elif instance.name == 'stop_deployment':
if instance.status == 'error':
cls.__update_cluster_status(cluster, 'error')
else:
cls.__update_cluster_status(cluster, 'stopped')
elif instance.name == consts.TASK_NAMES.update:
if instance.status == consts.TASK_STATUSES.error:
cls.__update_cluster_status(
cluster,
consts.CLUSTER_STATUSES.update_error
)
elif instance.status == consts.TASK_STATUSES.ready:
cls.__update_cluster_status(
cluster,
consts.CLUSTER_STATUSES.operational
)
cluster.release_id = cluster.pending_release_id
cluster.pending_release_id = None
@classmethod
def update(cls, instance, data):
logger.debug("Updating task: %s", instance.uuid)
super(Task, cls).update(instance, data)
# this commit is needed because of a strange bug
# with db management in fake threads for testing
db().commit()
if instance.cluster_id:
logger.debug("Updating cluster status: %s "
"cluster_id: %s status: %s",
instance.uuid, instance.cluster_id,
data.get('status'))
cls._update_cluster_data(instance)
if instance.parent:
logger.debug("Updating parent task: %s.", instance.parent.uuid)
cls._update_parent_instance(instance.parent)
class TaskCollection(NailgunCollection):
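
Task.update now drives the whole cascade itself: it writes the given fields,
commits, refreshes the owning cluster's status, then rolls results up to the
parent task. Schematically, for a failing subtask (names follow the diff; the
comments describe the code above):

    # hypothetical: failing one subtask of a 'deploy' supertask
    objects.Task.update(subtask, {'status': 'error',
                                  'progress': 100,
                                  'message': 'deployment failed'})
    # -> _update_cluster_data(subtask): cluster status becomes 'error'
    # -> _update_parent_instance(parent): parent becomes 'error' at 100%,
    #    and unfinished sibling subtasks are aborted with 'Task aborted'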

View File

@ -34,7 +34,6 @@ from nailgun.db.sqlalchemy.models import Node
from nailgun.errors import errors
from nailgun.logger import logger
from nailgun.settings import settings
from nailgun.task.helpers import TaskHelper
from nailgun.utils import dict_merge
from nailgun.volumes import manager as volume_manager
@ -150,7 +149,7 @@ class DeploymentMultinodeSerializer(object):
node_list.append({
'uid': node.uid,
'fqdn': node.fqdn,
'name': TaskHelper.make_slave_name(node.id),
'name': objects.Node.make_slave_name(node),
'role': role})
return node_list
@ -887,7 +886,7 @@ class NeutronNetworkDeploymentSerializer(NetworkDeploymentSerializer):
def serialize(cluster, nodes):
"""Serialization depends on deployment mode
"""
TaskHelper.prepare_for_deployment(cluster.nodes)
objects.NodeCollection.prepare_for_deployment(cluster.nodes)
if cluster.mode == 'multinode':
serializer = DeploymentMultinodeSerializer

View File

@ -21,7 +21,6 @@ import netaddr
from nailgun.logger import logger
from nailgun.settings import settings
from nailgun.task.helpers import TaskHelper
class ProvisioningSerializer(object):
@ -62,9 +61,9 @@ class ProvisioningSerializer(object):
serialized_node = {
'uid': node.uid,
'power_address': node.ip,
'name': TaskHelper.make_slave_name(node.id),
'name': objects.Node.make_slave_name(node),
# right now it is duplicated to avoid possible issues
'slave_name': TaskHelper.make_slave_name(node.id),
'slave_name': objects.Node.make_slave_name(node),
'hostname': node.fqdn,
'power_pass': cls.get_ssh_key_path(node),
@ -179,6 +178,6 @@ class ProvisioningSerializer(object):
def serialize(cluster, nodes):
"""Serialize cluster for provisioning."""
TaskHelper.prepare_for_provisioning(nodes)
objects.NodeCollection.prepare_for_provisioning(nodes)
return ProvisioningSerializer.serialize(cluster, nodes)

View File

@ -102,7 +102,9 @@ class NailgunReceiver(object):
if not error_msg:
error_msg = ". ".join([success_msg, err_msg])
TaskHelper.update_task_status(task_uuid, status, progress, error_msg)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
data = {'status': status, 'progress': progress, 'message': error_msg}
objects.Task.update(task, data)
@classmethod
def remove_cluster_resp(cls, **kwargs):
@ -114,7 +116,7 @@ class NailgunReceiver(object):
cls.remove_nodes_resp(**kwargs)
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
cluster = task.cluster
if task.status in ('ready',):
@ -249,7 +251,8 @@ class NailgunReceiver(object):
elif status in ('ready',):
cls._success_action(task, status, progress)
else:
TaskHelper.update_task_status(task.uuid, status, progress, message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def provision_resp(cls, **kwargs):
@ -263,8 +266,6 @@ class NailgunReceiver(object):
progress = kwargs.get('progress')
nodes = kwargs.get('nodes', [])
task = TaskHelper.get_task_by_uuid(task_uuid)
for node in nodes:
uid = node.get('uid')
node_db = db().query(Node).get(uid)
@ -284,11 +285,12 @@ class NailgunReceiver(object):
db().commit()
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
if nodes and not progress:
progress = TaskHelper.recalculate_provisioning_task_progress(task)
TaskHelper.update_task_status(task.uuid, status, progress, message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def _generate_error_message(cls, task, error_types, names_only=False):
@ -336,7 +338,8 @@ class NailgunReceiver(object):
message,
task.cluster_id
)
TaskHelper.update_task_status(task.uuid, status, progress, message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def _success_action(cls, task, status, progress):
@ -428,7 +431,9 @@ class NailgunReceiver(object):
message,
task.cluster_id
)
TaskHelper.update_task_status(task.uuid, status, progress, message)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def stop_deployment_resp(cls, **kwargs):
@ -443,9 +448,8 @@ class NailgunReceiver(object):
status = kwargs.get('status')
progress = kwargs.get('progress')
task = TaskHelper.get_task_by_uuid(task_uuid)
# Locking stop task
objects.Task.get_by_uuid(
task = objects.Task.get_by_uuid(
task_uuid,
fail_if_not_found=True,
lock_for_update=True
@ -532,12 +536,8 @@ class NailgunReceiver(object):
task.cluster_id
)
TaskHelper.update_task_status(
task_uuid,
status,
progress,
message
)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def reset_environment_resp(cls, **kwargs):
@ -552,7 +552,7 @@ class NailgunReceiver(object):
status = kwargs.get('status')
progress = kwargs.get('progress')
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid)
if status == "ready":
@ -615,12 +615,8 @@ class NailgunReceiver(object):
task.cluster_id
)
TaskHelper.update_task_status(
task.uuid,
status,
progress,
message
)
data = {'status': status, 'progress': progress, 'message': message}
objects.Task.update(task, data)
@classmethod
def _notify_inaccessible(cls, cluster_id, nodes_uids, action):
@ -657,7 +653,7 @@ class NailgunReceiver(object):
progress = kwargs.get('progress')
# We simply check that each node received all vlans for cluster
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
result = []
# We expect that 'nodes' contains all nodes which we test.
@ -765,11 +761,12 @@ class NailgunReceiver(object):
status = 'error'
logger.error(error_msg)
if status not in ('ready', 'error'):
TaskHelper.update_task_status(task_uuid, status, progress,
error_msg, result)
data = {'status': status, 'progress': progress,
'message': error_msg, 'result': result}
objects.Task.update(task, data)
else:
TaskHelper.update_verify_networks(task_uuid, status, progress,
error_msg, result)
objects.Task.update_verify_networks(task, status, progress,
error_msg, result)
@classmethod
def check_dhcp_resp(cls, **kwargs):
@ -817,8 +814,10 @@ class NailgunReceiver(object):
status = status if not messages else "error"
error_msg = '\n'.join(messages) if messages else error_msg
logger.debug('Check dhcp message %s', error_msg)
TaskHelper.update_verify_networks(task_uuid, status, progress,
error_msg, result)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
objects.Task.update_verify_networks(task, status, progress,
error_msg, result)
# Red Hat related callbacks
@ -833,7 +832,7 @@ class NailgunReceiver(object):
status = kwargs.get('status')
progress = kwargs.get('progress')
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
release_info = task.cache['args']['release_info']
release_id = release_info['release_id']
@ -859,13 +858,9 @@ class NailgunReceiver(object):
}
}
TaskHelper.update_task_status(
task_uuid,
status,
progress,
error_msg,
result
)
data = {'status': status, 'progress': progress, 'message': error_msg,
'result': result}
objects.Task.update(task, data)
@classmethod
def redhat_check_licenses_resp(cls, **kwargs):
@ -879,7 +874,7 @@ class NailgunReceiver(object):
progress = kwargs.get('progress')
notify = kwargs.get('msg')
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
release_info = task.cache['args']['release_info']
release_id = release_info['release_id']
@ -907,13 +902,9 @@ class NailgunReceiver(object):
}
}
TaskHelper.update_task_status(
task_uuid,
status,
progress,
error_msg,
result
)
data = {'status': status, 'progress': progress, 'message': error_msg,
'result': result}
objects.Task.update(task, data)
@classmethod
def download_release_resp(cls, **kwargs):
@ -926,7 +917,7 @@ class NailgunReceiver(object):
status = kwargs.get('status')
progress = kwargs.get('progress')
task = TaskHelper.get_task_by_uuid(task_uuid)
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
release_info = task.cache['args']['release_info']
release_id = release_info['release_id']
@ -953,13 +944,9 @@ class NailgunReceiver(object):
}
}
TaskHelper.update_task_status(
task_uuid,
status,
progress,
error_msg,
result
)
data = {'status': status, 'progress': progress, 'message': error_msg,
'result': result}
objects.Task.update(task, data)
@classmethod
def _update_release_state(cls, release_id, state):
@ -1002,13 +989,19 @@ class NailgunReceiver(object):
progress = kwargs.get('progress')
error = kwargs.get('error')
msg = kwargs.get('msg')
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
if status == 'error':
notifier.notify('error', error)
TaskHelper.update_task_status(task_uuid, status, 100, error)
data = {'status': status, 'progress': 100, 'message': error}
objects.Task.update(task, data)
elif status == 'ready':
dumpfile = os.path.basename(msg)
notifier.notify('done', 'Snapshot is ready. '
'Visit Support page to download')
TaskHelper.update_task_status(
task_uuid, status, progress,
'/dump/{0}'.format(dumpfile))
data = {'status': status, 'progress': progress,
'message': '/dump/{0}'.format(dumpfile)}
objects.Task.update(task, data)

View File

@ -494,7 +494,8 @@ class FakeVerificationThread(FakeThread):
receiver = NailgunReceiver
kwargs = {
'task_uuid': self.task_uuid,
'progress': 0
'progress': 0,
'status': 'running'
}
tick_count = int(settings.FAKE_TASKS_TICK_COUNT)
@ -527,6 +528,8 @@ class FakeVerificationThread(FakeThread):
kwargs['status'] = 'ready'
ready = True
resp_method(**kwargs)
db().commit()
if time.time() - timer > timeout:
raise Exception("Timeout exceed")
self.sleep(tick_interval)

View File

@ -20,9 +20,6 @@ import shutil
from sqlalchemy import or_
from nailgun import consts
from nailgun import objects
from nailgun.db import db
from nailgun.db.sqlalchemy.models import IPAddr
from nailgun.db.sqlalchemy.models import Node
@ -34,25 +31,9 @@ from nailgun.settings import settings
class TaskHelper(object):
# TODO(aroma): move it to utils module
@classmethod
def make_slave_name(cls, nid):
return u"node-%s" % str(nid)
@classmethod
def make_slave_fqdn(cls, nid):
return u"%s.%s" % (cls.make_slave_name(nid), settings.DNS_DOMAIN)
@classmethod
def update_slave_nodes_fqdn(cls, nodes):
for n in nodes:
fqdn = cls.make_slave_fqdn(n.id)
if n.fqdn != fqdn:
n.fqdn = fqdn
logger.debug("Updating node fqdn: %s %s", n.id, n.fqdn)
db().commit()
@classmethod
def prepare_syslog_dir(cls, node, prefix=None):
def prepare_syslog_dir(cls, node, admin_net_id, prefix=None):
logger.debug("Preparing syslog directories for node: %s", node.fqdn)
if not prefix:
prefix = settings.SYSLOG_DIR
@ -62,10 +43,6 @@ class TaskHelper(object):
bak = os.path.join(prefix, "%s.bak" % str(node.fqdn))
new = os.path.join(prefix, str(node.fqdn))
admin_net_id = objects.Node.get_network_manager(
node
).get_admin_network_group_id()
links = map(
lambda i: os.path.join(prefix, i.ip_addr),
db().query(IPAddr.ip_addr).
@ -119,123 +96,9 @@ class TaskHelper(object):
os.system("/usr/bin/pkill -HUP rsyslog")
# TODO(aroma): move this function to utils module
@classmethod
def update_task_status(cls, uuid, status, progress,
msg="", result=None):
logger.debug("Updating task: %s", uuid)
task = db().query(Task).filter_by(uuid=uuid).first()
if not task:
logger.error("Can't set status='%s', message='%s':no task \
with UUID %s found!", status, msg, uuid)
return
data = {'status': status, 'progress': progress,
'message': msg, 'result': result}
for key, value in data.iteritems():
if value is not None:
setattr(task, key, value)
logger.info(
u"Task {0} ({1}) {2} is set to {3}".format(
task.uuid, task.name, key, value))
db().commit()
if task.cluster_id:
logger.debug("Updating cluster status: %s "
"cluster_id: %s status: %s",
uuid, task.cluster_id, status)
cls.update_cluster_status(uuid)
if task.parent:
logger.debug("Updating parent task: %s.", task.parent.uuid)
cls.update_parent_task(task.parent.uuid)
@classmethod
def update_verify_networks(cls, uuid, status,
progress, msg, result):
#TODO(dshulyak) move network tests into ostf
task = db().query(Task).filter_by(uuid=uuid).first()
if not task:
logger.error("Can't set status='%s', message='%s': No task \
with UUID %s found!", status, msg, uuid)
return
previous_status = task.status
statuses = [sub.status for sub in task.subtasks]
messages = [sub.message for sub in task.subtasks]
messages.append(msg)
statuses.append(status)
if any(st == 'error' for st in statuses):
task.status = 'error'
else:
task.status = status or task.status
task.progress = progress or task.progress
task.result = result or task.result
# join messages if not None or ""
task.message = '\n'.join([m for m in messages if m])
db().commit()
if previous_status != task.status and task.cluster_id:
logger.debug("Updating cluster status: "
"cluster_id: %s status: %s",
task.cluster_id, status)
cls.update_cluster_status(uuid)
@classmethod
def get_task_by_uuid(cls, uuid):
task = db().query(Task).filter_by(uuid=uuid).first()
if not task:
raise errors.CannotFindTask(
'Cannot find task with uuid {0}'.format(uuid)
)
return task
@classmethod
def update_parent_task(cls, uuid):
task = db().query(Task).filter_by(uuid=uuid).first()
subtasks = task.subtasks
if len(subtasks):
if all(map(lambda s: s.status == 'ready', subtasks)):
task.status = 'ready'
task.progress = 100
task.message = u'\n'.join(map(
lambda s: s.message, filter(
lambda s: s.message is not None, subtasks)))
db().commit()
cls.update_cluster_status(uuid)
elif any(map(lambda s: s.status in ('error',), subtasks)):
for subtask in subtasks:
if not subtask.status in ('error', 'ready'):
subtask.status = 'error'
subtask.progress = 100
subtask.message = 'Task aborted'
task.status = 'error'
task.progress = 100
task.message = u'\n'.join(list(set(map(
lambda s: (s.message or ""), filter(
lambda s: (
s.status == 'error' and not
# TODO: make this check less ugly
s.message == 'Task aborted'
), subtasks)))))
db().commit()
cls.update_cluster_status(uuid)
else:
subtasks_with_progress = filter(
lambda s: s.progress is not None,
subtasks
)
if subtasks_with_progress:
task.progress = cls._calculate_parent_task_progress(
subtasks_with_progress
)
else:
task.progress = 0
db().commit()
@classmethod
def _calculate_parent_task_progress(cls, subtasks_list):
def calculate_parent_task_progress(cls, subtasks_list):
return int(
round(
sum(
@ -248,52 +111,9 @@ class TaskHelper(object):
), 0)
)
# TODO(aroma): move it to utils module
@classmethod
def update_cluster_status(cls, uuid):
task = db().query(Task).filter_by(uuid=uuid).first()
cluster = task.cluster
if task.name == 'deploy':
if task.status == 'ready':
# If for some reason the orchestrator
# didn't send ready status for a node
# we should set it explicitly
for n in cluster.nodes:
if n.status == 'deploying':
n.status = 'ready'
n.progress = 100
cls.__set_cluster_status(cluster, 'operational')
objects.Cluster.clear_pending_changes(cluster)
elif task.status == 'error' and \
not cls.__before_deployment_error(task):
# We don't want to set cluster status to
# error because we don't want to lock
# settings if cluster wasn't deployed
cls.__set_cluster_status(cluster, 'error')
elif task.name == 'deployment' and task.status == 'error':
cls.__update_cluster_to_deployment_error(cluster)
elif task.name == 'provision' and task.status == 'error':
cls.__update_cluster_to_provisioning_error(cluster)
elif task.name == 'stop_deployment':
if task.status == 'error':
cls.__set_cluster_status(cluster, 'error')
else:
cls.__set_cluster_status(cluster, 'stopped')
elif task.name == consts.TASK_NAMES.update:
if task.status == consts.TASK_STATUSES.error:
cls.__set_cluster_status(cluster,
consts.CLUSTER_STATUSES.update_error)
elif task.status == consts.TASK_STATUSES.ready:
cls.__set_cluster_status(cluster,
consts.CLUSTER_STATUSES.operational)
cluster.release_id = cluster.pending_release_id
cluster.pending_release_id = None
db().commit()
@classmethod
def __before_deployment_error(cls, task):
def before_deployment_error(cls, task):
"""Returns True in case of check_before_deployment
or check_networks error and if cluster wasn't
deployed yet
@ -306,42 +126,23 @@ class TaskHelper(object):
return not task.cluster.is_locked and error_checking_tasks_count
# TODO(aroma): move this method to utils module
@classmethod
def __update_cluster_to_provisioning_error(cls, cluster):
cls.__set_cluster_status(cluster, 'error')
nodes_to_error = db().query(Node).\
def get_nodes_to_provisioning_error(cls, cluster):
q_nodes_to_error = db().query(Node).\
filter(Node.cluster == cluster).\
filter(Node.status.in_(['provisioning']))
cls.__set_nodes_status_to_error(nodes_to_error, 'provision')
return q_nodes_to_error
# TODO(aroma): move this method to utils module
@classmethod
def __update_cluster_to_deployment_error(cls, cluster):
cls.__set_cluster_status(cluster, 'error')
nodes_to_error = db().query(Node).\
def get_nodes_to_deployment_error(cls, cluster):
q_nodes_to_error = db().query(Node).\
filter(Node.cluster == cluster).\
filter(Node.status.in_(['provisioned', 'deploying']))
cls.__set_nodes_status_to_error(nodes_to_error, 'deploy')
@classmethod
def __set_cluster_status(cls, cluster, new_state):
logger.debug(
"Updating cluster (%s) status: from %s to %s",
cluster.full_name, cluster.status, new_state)
cluster.status = new_state
@classmethod
def __set_nodes_status_to_error(cls, nodes_to_error, error_type):
if nodes_to_error.count():
logger.debug(
u'Updating nodes to error with error_type "{0}": {1}'.format(
error_type, [n.full_name for n in nodes_to_error]))
for node in nodes_to_error:
node.status = 'error'
node.progress = 0
node.error_type = error_type
return q_nodes_to_error
@classmethod
def recalculate_deployment_task_progress(cls, task):
@ -386,6 +187,8 @@ class TaskHelper(object):
cluster.nodes
)
# TODO(aroma): considering moving this code to
# nailgun Cluster object's methods
@classmethod
def nodes_to_deploy(cls, cluster):
nodes_to_deploy = sorted(filter(
@ -419,10 +222,17 @@ class TaskHelper(object):
cluster.nodes
), key=lambda n: n.id)
@classmethod
def _node_can_be_updated(cls, node):
return (node.status in (consts.NODE_STATUSES.ready,
consts.NODE_STATUSES.provisioned)) or \
(node.status == consts.NODE_STATUSES.error
and node.error_type == consts.NODE_ERRORS.deploy)
@classmethod
def nodes_to_upgrade(cls, cluster):
nodes_to_upgrade = filter(
lambda n: objects.Node.can_be_updated(n),
lambda n: cls._node_can_be_updated(n),
cluster.nodes
)
@ -489,33 +299,3 @@ class TaskHelper(object):
db().commit()
full_err_msg = u"\n".join(err_messages)
raise errors.NetworkCheckError(full_err_msg)
@classmethod
def prepare_for_provisioning(cls, nodes):
"""Prepare environment for provisioning,
update fqdns, assign admin IPs
"""
cls.update_slave_nodes_fqdn(nodes)
for node in nodes:
objects.Node.get_network_manager(
node
).assign_admin_ips(node.id)
@classmethod
def prepare_for_deployment(cls, nodes):
"""Prepare environment for deployment,
assign management, public, storage ips
"""
cls.update_slave_nodes_fqdn(nodes)
nodes_ids = [n.id for n in nodes]
# TODO(enchantner): check network manager instance for each node
netmanager = objects.Cluster.get_network_manager()
if nodes_ids:
netmanager.assign_ips(nodes_ids, 'management')
netmanager.assign_ips(nodes_ids, 'public')
netmanager.assign_ips(nodes_ids, 'storage')
for node in nodes:
netmanager.assign_admin_ips(node.id)
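
What remains in TaskHelper is the common-use logic flagged with TODO(aroma)
notes, e.g. the now-public calculate_parent_task_progress. Its body is elided
by the hunk boundary above; as a rough sketch of the weighted aggregation it
performs (the weight attribute on subtasks is an assumption, not shown in
this diff):

    # illustrative only -- assumes each subtask has .progress and .weight
    def weighted_progress(subtasks):
        total_weight = sum(s.weight for s in subtasks)
        if not total_weight:
            return 0
        return int(round(
            sum(s.progress * s.weight for s in subtasks) / total_weight))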

View File

@ -27,6 +27,7 @@ from nailgun.db.sqlalchemy.models import RedHatAccount
from nailgun.db.sqlalchemy.models import Task
from nailgun.errors import errors
from nailgun.logger import logger
from nailgun import objects
from nailgun.openstack.common import jsonutils
import nailgun.rpc as rpc
from nailgun.task import task as tasks
@ -52,12 +53,12 @@ class TaskManager(object):
hasattr(exc, "log_traceback") and exc.log_traceback
]):
logger.error(traceback.format_exc())
TaskHelper.update_task_status(
task.uuid,
status="error",
progress=100,
msg=err
)
# update task entity with given data
data = {'status': 'error',
'progress': 100,
'message': err}
objects.Task.update(task, data)
def check_running_task(self, task_name):
current_tasks = db().query(Task).filter_by(
@ -149,12 +150,11 @@ class ApplyChangesTaskManager(TaskManager):
]
)
except Exception as exc:
TaskHelper.update_task_status(
supertask.uuid,
status='error',
progress=100,
msg=str(exc)
)
# update task entity with given data
data = {'status': 'error',
'progress': 100,
'message': str(exc)}
objects.Task.update(supertask, data)
return supertask
task_messages.extend(redhat_messages)
# /in case of Red Hat
@ -170,7 +170,7 @@ class ApplyChangesTaskManager(TaskManager):
self._call_silently(task_deletion, tasks.DeletionTask)
if nodes_to_provision:
TaskHelper.update_slave_nodes_fqdn(nodes_to_provision)
objects.NodeCollection.update_slave_nodes_fqdn(nodes_to_provision)
logger.debug("There are nodes to provision: %s",
" ".join([n.fqdn for n in nodes_to_provision]))
@ -197,7 +197,7 @@ class ApplyChangesTaskManager(TaskManager):
task_messages.append(provision_message)
if nodes_to_deploy:
TaskHelper.update_slave_nodes_fqdn(nodes_to_deploy)
objects.NodeCollection.update_slave_nodes_fqdn(nodes_to_deploy)
logger.debug("There are nodes to deploy: %s",
" ".join([n.fqdn for n in nodes_to_deploy]))
task_deployment = supertask.create_subtask("deployment")
@ -241,12 +241,11 @@ class ApplyChangesTaskManager(TaskManager):
def _redhat_messages(self, supertask, nodes_info):
account = db().query(RedHatAccount).first()
if not account:
TaskHelper.update_task_status(
supertask.uuid,
status="error",
progress=100,
msg="RHEL account is not found"
)
# update task entity with given data
data = {'status': 'error',
'progress': 100,
'message': 'RHEL account is not found'}
objects.Task.update(supertask, data)
return supertask
rhel_data = {
@ -358,7 +357,8 @@ class ProvisioningTaskManager(TaskManager):
https://blueprints.launchpad.net/fuel/+spec
/nailgun-separate-provisioning-for-redhat
"""
TaskHelper.update_slave_nodes_fqdn(nodes_to_provision)
objects.NodeCollection.update_slave_nodes_fqdn(nodes_to_provision)
logger.debug('Nodes to provision: {0}'.format(
' '.join([n.fqdn for n in nodes_to_provision])))
@ -391,7 +391,9 @@ class ProvisioningTaskManager(TaskManager):
class DeploymentTaskManager(TaskManager):
def execute(self, nodes_to_deployment):
TaskHelper.update_slave_nodes_fqdn(nodes_to_deployment)
objects.NodeCollection.update_slave_nodes_fqdn(nodes_to_deployment)
logger.debug('Nodes to deploy: {0}'.format(
' '.join([n.fqdn for n in nodes_to_deployment])))
task_deployment = Task(name='deployment', cluster=self.cluster)
@ -413,6 +415,7 @@ class DeploymentTaskManager(TaskManager):
node.progress = 0
db().commit()
rpc.cast('naily', deployment_message)
return task_deployment
@ -538,7 +541,7 @@ class UpdateEnvironmentTaskManager(TaskManager):
)
nodes_to_change = TaskHelper.nodes_to_upgrade(self.cluster)
TaskHelper.update_slave_nodes_fqdn(nodes_to_change)
objects.NodeCollection.update_slave_nodes_fqdn(nodes_to_change)
logger.debug('Nodes to update: {0}'.format(
' '.join([n.fqdn for n in nodes_to_change])))
task_update = Task(name='update', cluster=self.cluster)
@ -591,11 +594,9 @@ class CheckNetworksTaskManager(TaskManager):
)
db().refresh(task)
if task.status == 'running':
TaskHelper.update_task_status(
task.uuid,
status="ready",
progress=100
)
# update task status with given data
data = {'status': 'ready', 'progress': 100}
objects.Task.update(task, data)
return task
@ -801,12 +802,10 @@ class RedHatSetupTaskManager(TaskManager):
)
db().refresh(task)
if task.status == 'error':
TaskHelper.update_task_status(
supertask.uuid,
status="error",
progress=100,
msg=task.message
)
# update task entity with given data
data = {'status': 'error', 'progress': 100,
'message': task.message}
objects.Task.update(supertask, data)
return supertask
task.cache = msg
db().add(task)

View File

@ -199,7 +199,11 @@ class ProvisionTask(object):
if settings.FAKE_TASKS or settings.FAKE_TASKS_AMQP:
continue
TaskHelper.prepare_syslog_dir(node)
admin_net_id = objects.Node.get_network_manager(
node
).get_admin_network_group_id()
TaskHelper.prepare_syslog_dir(node, admin_net_id)
return make_astute_message(
'provision',
@ -241,7 +245,7 @@ class DeletionTask(object):
'id': node.id,
'uid': node.id,
'roles': node.roles,
'slave_name': TaskHelper.make_slave_name(node.id)
'slave_name': objects.Node.make_slave_name(node)
})
if USE_FAKE:
@ -269,7 +273,7 @@ class DeletionTask(object):
for node in nodes_to_delete_constant:
node_db = db().query(Node).get(node['id'])
slave_name = TaskHelper.make_slave_name(node['id'])
slave_name = objects.Node.make_slave_name(node_db)
logger.debug("Removing node from database and pending it "
"to clean its MBR: %s", slave_name)
if node_db.status == 'discover':
@ -322,7 +326,7 @@ class StopDeploymentTask(object):
{
'uid': n.uid,
'roles': n.roles,
'slave_name': TaskHelper.make_slave_name(n.id),
'slave_name': objects.Node.make_slave_name(n),
'admin_ip': objects.Node.get_network_manager(
n
).get_admin_ip_for_node(n)
@ -369,7 +373,7 @@ class ResetEnvironmentTask(object):
{
'uid': n.uid,
'roles': n.roles,
'slave_name': TaskHelper.make_slave_name(n.id)
'slave_name': objects.Node.make_slave_name(n)
} for n in nodes_to_reset
],
"engine": {

View File

@ -27,7 +27,6 @@ from nailgun.db.sqlalchemy.models import NetworkGroup
from nailgun.network.manager import NetworkManager
from nailgun.openstack.common import jsonutils
from nailgun.settings import settings
from nailgun.task.helpers import TaskHelper
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import fake_tasks
from nailgun.test.base import reverse
@ -248,7 +247,7 @@ class TestHandlers(BaseIntegrationTest):
'udevrules': udev_interfaces_mapping},
'power_address': n.ip,
'power_pass': settings.PATH_TO_BOOTSTRAP_SSH_KEY,
'name': TaskHelper.make_slave_name(n.id),
'name': objects.Node.make_slave_name(n),
'hostname': n.fqdn,
'name_servers': '\"%s\"' % settings.DNS_SERVERS,
'name_servers_search': '\"%s\"' % settings.DNS_SEARCH,
@ -647,7 +646,7 @@ class TestHandlers(BaseIntegrationTest):
'udevrules': udev_interfaces_mapping},
'power_address': n.ip,
'power_pass': settings.PATH_TO_BOOTSTRAP_SSH_KEY,
'name': TaskHelper.make_slave_name(n.id),
'name': objects.Node.make_slave_name(n),
'hostname': n.fqdn,
'name_servers': '\"%s\"' % settings.DNS_SERVERS,
'name_servers_search': '\"%s\"' % settings.DNS_SEARCH,
@ -781,7 +780,7 @@ class TestHandlers(BaseIntegrationTest):
self.assertEqual(len(n_rpc_provision), 1)
self.assertEqual(
n_rpc_provision[0]['name'],
TaskHelper.make_slave_name(self.env.nodes[0].id)
objects.Node.make_slave_name(self.env.nodes[0])
)
# deploy method call [1][0][1][1]

View File

@ -157,6 +157,7 @@ class TestNodeHandlers(BaseIntegrationTest):
for net in nets['networks']:
if net['name'] == 'management':
net['vlan_start'] = None
resp = self.env.nova_networks_put(cluster['id'], nets)
self.assertEqual(resp.status_code, 202)
task = jsonutils.loads(resp.body)

View File

@ -29,8 +29,10 @@ from nailgun.orchestrator.deployment_serializers import\
DeploymentHASerializer
from nailgun.orchestrator.deployment_serializers import\
DeploymentMultinodeSerializer
from nailgun import objects
from nailgun.settings import settings
from nailgun.task.helpers import TaskHelper
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import reverse
from nailgun.volumes import manager
@ -60,7 +62,7 @@ class OrchestratorSerializerTestBase(BaseIntegrationTest):
return DeploymentHASerializer
def serialize(self, cluster):
TaskHelper.prepare_for_deployment(cluster.nodes)
objects.NodeCollection.prepare_for_deployment(cluster.nodes)
return self.serializer.serialize(cluster, cluster.nodes)
@ -86,7 +88,8 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
nodes_kwargs=node_args)
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
self.db.flush()
return cluster_db
def assert_roles_flattened(self, nodes):
@ -112,9 +115,12 @@ class TestNovaOrchestratorSerializer(OrchestratorSerializerTestBase):
def test_serialize_node(self):
node = self.env.create_node(
api=True, cluster_id=self.cluster.id, pending_addition=True)
TaskHelper.prepare_for_deployment(self.cluster.nodes)
objects.NodeCollection.prepare_for_deployment(self.cluster.nodes)
self.db.flush()
node_db = self.db.query(Node).get(node['id'])
serialized_data = self.serializer.serialize_node(node_db, 'controller')
self.assertEqual(serialized_data['role'], 'controller')
@ -289,7 +295,7 @@ class TestNovaOrchestratorHASerializer(OrchestratorSerializerTestBase):
{'roles': ['cinder'], 'pending_addition': True}])
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
return cluster_db
@property
@ -395,7 +401,7 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
'pending_addition': True}])
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
return cluster_db
def assert_roles_flattened(self, nodes):
@ -420,7 +426,7 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
def test_serialize_node(self):
node = self.env.create_node(
api=True, cluster_id=self.cluster.id, pending_addition=True)
TaskHelper.prepare_for_deployment(self.cluster.nodes)
objects.NodeCollection.prepare_for_deployment(self.cluster.nodes)
node_db = self.db.query(Node).get(node['id'])
serialized_data = self.serializer.serialize_node(node_db, 'controller')
@ -533,7 +539,7 @@ class TestNeutronOrchestratorSerializer(OrchestratorSerializerTestBase):
)
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
return cluster_db
def _make_data_copy(self, data_to_copy):
@ -737,7 +743,7 @@ class TestNeutronOrchestratorHASerializer(OrchestratorSerializerTestBase):
)
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
return cluster_db
@property
@ -927,7 +933,7 @@ class TestMongoNodesSerialization(OrchestratorSerializerTestBase):
]
)
cluster = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster.nodes)
objects.NodeCollection.prepare_for_deployment(cluster.nodes)
return cluster
def test_mongo_roles_equals_in_defferent_modes(self):
@ -985,7 +991,7 @@ class TestRepoAndPuppetDataSerialization(OrchestratorSerializerTestBase):
)["id"]
cluster = self.db.query(Cluster).get(cluster_id)
TaskHelper.prepare_for_deployment(cluster.nodes)
objects.NodeCollection.prepare_for_deployment(cluster.nodes)
facts = self.serializer.serialize(cluster, cluster.nodes)
self.assertEqual(1, len(facts))

View File

@ -68,6 +68,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets},
{'uid': node2.id, 'networks': nets}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "ready")
self.assertEqual(task.message, '')
@ -103,6 +104,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets_resp},
{'uid': node2.id, 'networks': nets_resp}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
error_nodes = []
@ -147,6 +149,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
self.db.delete(node2)
self.db.commit()
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
resp = self.app.get(
reverse('TaskHandler', kwargs={'obj_id': task.id}),
headers=self.default_headers
@ -193,6 +196,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [],
'error': error_msg}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
self.assertEqual(task.message, error_msg)
@ -229,6 +233,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
{'uid': node2.id, 'networks': nets_sent},
{'uid': node1.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "ready")
self.assertEqual(task.message, '')
@ -355,6 +360,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets_sent},
{'uid': node2.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
self.assertRegexpMatches(task.message, node3.name)
@ -402,6 +408,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
{'uid': node2.id, 'networks': []},
{'uid': node3.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
self.assertEqual(task.message, '')
@ -452,6 +459,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': []},
{'uid': node2.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
self.assertEqual(task.message, '')
@ -494,6 +502,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets_sent},
{'uid': node2.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "ready")
@ -531,6 +540,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets_resp},
{'uid': node2.id, 'networks': nets_resp}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "error")
error_nodes = [{'uid': node1.id, 'interface': 'eth0',
@ -574,6 +584,7 @@ class TestVerifyNetworks(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'networks': nets_sent},
{'uid': node2.id, 'networks': nets_sent}]}
self.receiver.verify_networks_resp(**kwargs)
self.db.flush()
self.db.refresh(task)
self.assertEqual(task.status, "ready")
@ -619,6 +630,7 @@ class TestDhcpCheckTask(BaseIntegrationTest):
}
self.receiver.check_dhcp_resp(**kwargs)
self.db.flush()
self.db.refresh(self.task)
self.assertEqual(self.task.status, "ready")
self.assertEqual(self.task.result, {})
@ -641,6 +653,7 @@ class TestDhcpCheckTask(BaseIntegrationTest):
'iface': 'eth0'}]}]
}
self.receiver.check_dhcp_resp(**kwargs)
self.db.flush()
self.db.refresh(self.task)
self.assertEqual(self.task.status, "error")
@ -650,6 +663,7 @@ class TestDhcpCheckTask(BaseIntegrationTest):
'status': 'ready'
}
self.receiver.check_dhcp_resp(**kwargs)
self.db.flush()
self.db.refresh(self.task)
self.assertEqual(self.task.status, "ready")
self.assertEqual(self.task.result, {})
@ -660,6 +674,7 @@ class TestDhcpCheckTask(BaseIntegrationTest):
'status': 'error'
}
self.receiver.check_dhcp_resp(**kwargs)
self.db.flush()
self.db.refresh(self.task)
self.assertEqual(self.task.status, 'error')
self.assertEqual(self.task.result, {})
@ -696,6 +711,7 @@ class TestClusterUpdate(BaseIntegrationTest):
'nodes': [{'uid': node1.id, 'status': 'ready'},
{'uid': node2.id, 'status': 'ready'}]}
self.receiver.deploy_resp(**kwargs)
self.db.flush()
self.db.refresh(node1)
self.db.refresh(node2)
self.db.refresh(self.task)
@ -795,6 +811,7 @@ class TestConsumer(BaseIntegrationTest):
self.db.commit()
kwargs = {'task_uuid': task.uuid,
'status': 'running',
'nodes': [
{'uid': node.id,
'status': 'provisioning',
@ -841,12 +858,15 @@ class TestConsumer(BaseIntegrationTest):
subtask_progress = random.randint(1, 20)
deletion_kwargs = {'task_uuid': task_deletion.uuid,
'progress': subtask_progress}
'progress': subtask_progress,
'status': 'running'}
provision_kwargs = {'task_uuid': task_provision.uuid,
'progress': subtask_progress}
'progress': subtask_progress,
'status': 'running'}
def progress_difference():
self.receiver.provision_resp(**provision_kwargs)
self.db.flush()
self.db.refresh(task_provision)
self.assertEqual(task_provision.progress, subtask_progress)
@ -855,6 +875,7 @@ class TestConsumer(BaseIntegrationTest):
progress_before_delete_subtask = supertask.progress
self.receiver.remove_nodes_resp(**deletion_kwargs)
self.db.flush()
self.db.refresh(task_deletion)
self.assertEqual(task_deletion.progress, subtask_progress)
@ -904,19 +925,23 @@ class TestConsumer(BaseIntegrationTest):
subtask_progress = random.randint(1, 20)
deletion_kwargs = {'task_uuid': task_deletion.uuid,
'progress': subtask_progress}
'progress': subtask_progress,
'status': 'running'}
provision_kwargs = {'task_uuid': task_provision.uuid,
'progress': subtask_progress}
'progress': subtask_progress,
'status': 'running'}
self.receiver.provision_resp(**provision_kwargs)
self.db.flush()
self.receiver.remove_nodes_resp(**deletion_kwargs)
self.db.flush()
self.db.refresh(task_deletion)
self.db.refresh(task_provision)
self.db.refresh(supertask)
calculated_progress = helpers.\
TaskHelper._calculate_parent_task_progress(
TaskHelper.calculate_parent_task_progress(
[task_deletion, task_provision]
)

View File

@ -30,7 +30,6 @@ from nailgun.db.sqlalchemy.models import Notification
from nailgun.db.sqlalchemy.models import Task
from nailgun.errors import errors
from nailgun.openstack.common import jsonutils
from nailgun.task.helpers import TaskHelper
from nailgun.task.manager import ApplyChangesTaskManager
from nailgun.test.base import BaseIntegrationTest
from nailgun.test.base import fake_tasks
@ -120,7 +119,7 @@ class TestTaskManagers(BaseIntegrationTest):
{"pending_addition": True, 'roles': ['compute']}])
cluster_db = self.env.clusters[0]
# Generate ips, fqdns
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
# First node with status ready
# should not be redeployed
self.env.nodes[0].status = 'ready'
@ -416,6 +415,7 @@ class TestTaskManagers(BaseIntegrationTest):
)
supertask = self.env.launch_deployment()
self.db.flush()
self.env.wait_ready(supertask, timeout=5)
self.assertEqual(self.env.db.query(Node).count(), 1)

View File

@ -20,9 +20,12 @@ from itertools import ifilter
from nailgun.test.base import BaseIntegrationTest
from nailgun.errors import errors
from nailgun import consts
from nailgun.db import NoCacheQuery
from nailgun.db.sqlalchemy.models import Task
from nailgun import objects
@ -332,3 +335,152 @@ class TestNodeObject(BaseIntegrationTest):
self.env.create_nodes(nodes_count)
nodes_db = objects.NodeCollection.eager_nodes_handlers(None)
self.assertEqual(nodes_db.count(), nodes_count)
class TestTaskObject(BaseIntegrationTest):
def setUp(self):
super(TestTaskObject, self).setUp()
self.env.create(
nodes_kwargs=[
{'roles': ['controller']},
{'roles': ['compute']},
{'roles': ['cinder']}])
def _node_should_be_error_with_type(self, node, error_type):
self.assertEquals(node.status, 'error')
self.assertEquals(node.error_type, error_type)
self.assertEquals(node.progress, 0)
def _nodes_should_not_be_error(self, nodes):
for node in nodes:
self.assertEquals(node.status, 'discover')
@property
def cluster(self):
return self.env.clusters[0]
def test_update_nodes_to_error_if_deployment_task_failed(self):
self.cluster.nodes[0].status = 'deploying'
self.cluster.nodes[0].progress = 12
task = Task(name='deployment', cluster=self.cluster, status='error')
self.db.add(task)
self.db.flush()
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEquals(self.cluster.status, 'error')
self._node_should_be_error_with_type(self.cluster.nodes[0], 'deploy')
self._nodes_should_not_be_error(self.cluster.nodes[1:])
def test_update_cluster_to_error_if_deploy_task_failed(self):
task = Task(name='deploy', cluster=self.cluster, status='error')
self.db.add(task)
self.db.flush()
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEquals(self.cluster.status, 'error')
def test_update_nodes_to_error_if_provision_task_failed(self):
self.cluster.nodes[0].status = 'provisioning'
self.cluster.nodes[0].progress = 12
task = Task(name='provision', cluster=self.cluster, status='error')
self.db.add(task)
self.db.flush()
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEquals(self.cluster.status, 'error')
self._node_should_be_error_with_type(self.cluster.nodes[0],
'provision')
self._nodes_should_not_be_error(self.cluster.nodes[1:])
def test_update_cluster_to_operational(self):
task = Task(name='deploy', cluster=self.cluster, status='ready')
self.db.add(task)
self.db.flush()
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEquals(self.cluster.status, 'operational')
def test_update_if_parent_task_is_ready_all_nodes_should_be_ready(self):
for node in self.cluster.nodes:
node.status = 'ready'
node.progress = 100
self.cluster.nodes[0].status = 'deploying'
self.cluster.nodes[0].progress = 24
task = Task(name='deploy', cluster=self.cluster, status='ready')
self.db.add(task)
self.db.flush()
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEquals(self.cluster.status, 'operational')
for node in self.cluster.nodes:
self.assertEquals(node.status, 'ready')
self.assertEquals(node.progress, 100)
def test_update_cluster_status_if_task_was_already_in_error_status(self):
for node in self.cluster.nodes:
node.status = 'provisioning'
node.progress = 12
task = Task(name='provision', cluster=self.cluster, status='error')
self.db.add(task)
self.db.flush()
data = {'status': 'error', 'progress': 100}
objects.Task.update(task, data)
self.db.flush()
self.assertEquals(self.cluster.status, 'error')
self.assertEquals(task.status, 'error')
for node in self.cluster.nodes:
self.assertEquals(node.status, 'error')
self.assertEquals(node.progress, 0)
def test_do_not_set_cluster_to_error_if_validation_failed(self):
for task_name in ['check_before_deployment', 'check_networks']:
supertask = Task(
name='deploy',
cluster=self.cluster,
status='error')
check_task = Task(
name=task_name,
cluster=self.cluster,
status='error')
supertask.subtasks.append(check_task)
self.db.add(check_task)
self.db.flush()
objects.Task._update_cluster_data(supertask)
self.db.flush()
self.assertEquals(self.cluster.status, 'new')
def test_get_task_by_uuid_returns_task(self):
task = Task(name='deploy')
self.db.add(task)
self.db.flush()
task_by_uuid = objects.Task.get_by_uuid(task.uuid)
self.assertEquals(task.uuid, task_by_uuid.uuid)
def test_get_task_by_uuid_raises_error(self):
self.assertRaises(errors.ObjectNotFound,
objects.Task.get_by_uuid,
uuid='not_found_uuid',
fail_if_not_found=True)

View File

@ -1,35 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.errors import errors
from nailgun.test.base import BaseTestCase
from nailgun.db.sqlalchemy.models import Task
from nailgun.task.helpers import TaskHelper
class TestUtils(BaseTestCase):
def test_get_task_by_uuid_returns_task(self):
task = Task(name='deploy')
self.db.add(task)
self.db.commit()
task_by_uuid = TaskHelper.get_task_by_uuid(task.uuid)
self.assertEqual(task.uuid, task_by_uuid.uuid)
def test_get_task_by_uuid_raises_error(self):
self.assertRaises(errors.CannotFindTask,
TaskHelper.get_task_by_uuid,
'not_found_uuid')

View File

@ -17,7 +17,7 @@ from mock import patch
from nailgun.db.sqlalchemy.models import Task
from nailgun.errors import errors
from nailgun.task.helpers import TaskHelper
from nailgun import objects
from nailgun.task.task import CheckBeforeDeploymentTask
from nailgun.test.base import BaseTestCase
from nailgun.volumes.manager import VolumeManager
@ -53,7 +53,8 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_cluster_status(task.uuid)
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEqual(self.cluster.status, 'error')
self.node_should_be_error_with_type(self.cluster.nodes[0], 'deploy')
@ -64,7 +65,8 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_cluster_status(task.uuid)
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEqual(self.cluster.status, 'error')
@ -75,7 +77,8 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_cluster_status(task.uuid)
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEqual(self.cluster.status, 'error')
self.node_should_be_error_with_type(self.cluster.nodes[0], 'provision')
@ -86,7 +89,8 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_cluster_status(task.uuid)
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEqual(self.cluster.status, 'operational')
@ -102,7 +106,8 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_cluster_status(task.uuid)
objects.Task._update_cluster_data(task)
self.db.flush()
self.assertEqual(self.cluster.status, 'operational')
@ -119,7 +124,9 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(task)
self.db.commit()
TaskHelper.update_task_status(task.uuid, 'error', 100)
data = {'status': 'error', 'progress': 100}
objects.Task.update(task, data)
self.db.flush()
self.assertEqual(self.cluster.status, 'error')
self.assertEqual(task.status, 'error')
@ -144,7 +151,9 @@ class TestHelperUpdateClusterStatus(BaseTestCase):
self.db.add(check_task)
self.db.commit()
TaskHelper.update_cluster_status(supertask.uuid)
objects.Task._update_cluster_data(supertask)
self.db.flush()
self.assertEqual(self.cluster.status, 'new')

View File

@ -17,6 +17,9 @@
from nailgun.db.sqlalchemy.models import Cluster
from nailgun.db.sqlalchemy.models import Task
from nailgun import objects
from nailgun.orchestrator.deployment_serializers \
import DeploymentHASerializer
from nailgun.task.helpers import TaskHelper
@ -30,7 +33,8 @@ class TestTaskHelpers(BaseTestCase):
nodes_kwargs=nodes)
cluster_db = self.db.query(Cluster).get(cluster['id'])
TaskHelper.prepare_for_deployment(cluster_db.nodes)
objects.NodeCollection.prepare_for_deployment(cluster_db.nodes)
self.db.flush()
return cluster_db
@property
@ -94,6 +98,7 @@ class TestTaskHelpers(BaseTestCase):
computes = self.filter_by_role(nodes, 'compute')
self.assertEqual(len(computes), 1)
# TODO(aroma): move it to utils testing code
def test_recalculate_deployment_task_progress(self):
cluster = self.create_env([
{'roles': ['controller'],
@ -116,6 +121,7 @@ class TestTaskHelpers(BaseTestCase):
progress = TaskHelper.recalculate_deployment_task_progress(task)
self.assertEqual(progress, 25)
# TODO(aroma): move it to utils testing code
def test_recalculate_provisioning_task_progress(self):
cluster = self.create_env([
{'roles': ['controller'],