Update reserved_host workflow for recovery workflow details

After the merge of [1], host recovery with the reserved_host recovery
method fails because the workflow attempts to persist the Host object.

This patch addresses the issue by passing a list of reserved host names,
instead of Host objects, to the recovery flow.

[1]: https://review.openstack.org/#/c/640755/

Change-Id: Ifd008b89ff639e2e3bd8229830b5a20dced1c31b
Closes-Bug: #1819578
shilpa 2019-03-12 12:02:55 +05:30
parent ef378b1c04
commit 5e037db22e
10 changed files with 352 additions and 240 deletions
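
A condensed sketch of the new call path, assembled from the diffs below (module paths and names as in the Masakari files touched by this commit): the engine builds a list of reserved host names, passes it to the driver together with an update_host_method callback, and the driver forwards both through **kwargs into the reserved_host flow, so the taskflow tasks no longer persist Host objects themselves.

    # masakari/engine/manager.py -- engine side (sketch)
    def update_host_method(context, host_name, reserved=False):
        # Look up and persist the Host in the engine, where objects.Host
        # is available, instead of inside the taskflow task.
        reserved_host = objects.Host.get_by_name(context, host_name)
        reserved_host.reserved = reserved
        reserved_host.save()

    # Hand the driver plain host *names* plus the callback.
    reserved_host_list = [host.name for host in reserved_host_object_list]
    self.driver.execute_host_failure(
        context, host_name, recovery_method,
        notification.notification_uuid,
        update_host_method=update_host_method,
        reserved_host_list=reserved_host_list)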


@ -39,7 +39,7 @@ LOG = logging.getLogger(__name__)
class NotificationDriver(object):
@abc.abstractmethod
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid, reserved_host_list=None):
notification_uuid, **kwargs):
pass
@abc.abstractmethod


@ -44,7 +44,13 @@ class MasakariTask(task.Task):
"""
def __init__(self, context, novaclient, **kwargs):
super(MasakariTask, self).__init__(self.__class__.__name__, **kwargs)
requires = kwargs.get('requires')
rebind = kwargs.get('rebind')
provides = kwargs.get('provides')
super(MasakariTask, self).__init__(self.__class__.__name__,
requires=requires,
rebind=rebind,
provides=provides)
self.context = context
self.novaclient = novaclient
self.progress = []


@ -60,14 +60,15 @@ class TaskFlowDriver(driver.NotificationDriver):
flow_engine.run()
def _execute_rh_workflow(self, context, novaclient, process_what,
reserved_host_list):
if not reserved_host_list:
**kwargs):
if not kwargs['reserved_host_list']:
msg = _('No reserved_hosts available for evacuation.')
raise exception.ReservedHostsUnavailable(message=msg)
process_what['reserved_host_list'] = reserved_host_list
process_what['reserved_host_list'] = kwargs.pop('reserved_host_list')
flow_engine = host_failure.get_rh_flow(context, novaclient,
process_what)
process_what,
**kwargs)
with base.DynamicLogListener(flow_engine, logger=LOG):
try:
@ -76,7 +77,7 @@ class TaskFlowDriver(driver.NotificationDriver):
raise exception.HostRecoveryFailureException(ex.message)
def _execute_auto_priority_workflow(self, context, novaclient,
process_what, reserved_host_list):
process_what, **kwargs):
try:
self._execute_auto_workflow(context, novaclient, process_what)
except Exception as ex:
@ -97,14 +98,15 @@ class TaskFlowDriver(driver.NotificationDriver):
'reserved_host':
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST
})
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
self._execute_rh_workflow(context,
novaclient, process_what,
**kwargs)
def _execute_rh_priority_workflow(self, context, novaclient, process_what,
reserved_host_list):
**kwargs):
try:
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
**kwargs)
except Exception as ex:
with excutils.save_and_reraise_exception(reraise=False) as ctxt:
if isinstance(ex, exception.SkipHostRecoveryException):
@ -126,7 +128,7 @@ class TaskFlowDriver(driver.NotificationDriver):
self._execute_auto_workflow(context, novaclient, process_what)
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid, reserved_host_list=None):
notification_uuid, **kwargs):
novaclient = nova.API()
# get flow for host failure
process_what = {
@ -140,16 +142,15 @@ class TaskFlowDriver(driver.NotificationDriver):
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
**kwargs)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY):
self._execute_auto_priority_workflow(context, novaclient,
process_what,
reserved_host_list)
self._execute_auto_priority_workflow(
context, novaclient,
process_what, **kwargs)
else:
self._execute_rh_priority_workflow(context, novaclient,
process_what,
reserved_host_list)
process_what, **kwargs)
except Exception as exc:
with excutils.save_and_reraise_exception(reraise=False) as ctxt:
if isinstance(exc, (exception.SkipHostRecoveryException,
@ -240,25 +241,28 @@ class TaskFlowDriver(driver.NotificationDriver):
elif notification.type == fields.NotificationType.PROCESS:
tasks = TASKFLOW_CONF.process_failure_recovery_tasks
elif notification.type == fields.NotificationType.COMPUTE_HOST:
if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO:
if recovery_method in [
fields.FailoverSegmentRecoveryMethod.AUTO,
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY]:
tasks = TASKFLOW_CONF.host_auto_failure_recovery_tasks
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
elif recovery_method in [
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST,
fields.FailoverSegmentRecoveryMethod.RH_PRIORITY]:
tasks = TASKFLOW_CONF.host_rh_failure_recovery_tasks
for plugin in base.get_recovery_flow(tasks['pre'],
context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
tasks['pre'], context=context, novaclient=novaclient,
update_host_method=None):
task_list.append(plugin.name)
for plugin in base.get_recovery_flow(tasks['main'],
context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
tasks['main'], context=context, novaclient=novaclient,
update_host_method=None):
task_list.append(plugin.name)
for plugin in base.get_recovery_flow(tasks['post'],
context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
tasks['post'], context=context, novaclient=novaclient,
update_host_method=None):
task_list.append(plugin.name)
return task_list
@ -268,42 +272,63 @@ class TaskFlowDriver(driver.NotificationDriver):
notification):
"""Retrieve progress details in notification"""
# Note<ShilpaSD>: Taskflow doesn't support to return task details in
# the same sequence in which all tasks are executed. Reported this
# issue in LP #1815738. To resolve this issue load the tasks based on
# the recovery method and later sort it based on this task list so
# progress_details can be returned in the expected order.
task_list = self._get_taskflow_sequence(context, recovery_method,
notification)
backend = backends.fetch(PERSISTENCE_BACKEND)
with contextlib.closing(backend.get_connection()) as conn:
progress_details = []
flow_details = conn.get_flows_for_book(
notification.notification_uuid)
if flow_details:
for flow in flow_details:
od = OrderedDict()
atom_details = list(conn.get_atoms_for_flow(flow.uuid))
for flow in flow_details:
od = OrderedDict()
atom_details = list(conn.get_atoms_for_flow(flow.uuid))
for task in task_list:
for atom in atom_details:
if task == atom.name:
od[atom.name] = atom
# TODO(ShilpaSD): In case recovery_method is auto_priority/
# rh_priority, there is no way to figure out whether the
# recovery was done successfully using AUTO or RH flow.
# Taskflow stores 'retry_instance_evacuate_engine_retry' task
# in case of RH flow so if
# 'retry_instance_evacuate_engine_retry' is stored in the
# given flow details then the sorting of task details should
# happen based on the RH flow.
# This logic won't be required after LP #1815738 is fixed.
if recovery_method in ['AUTO_PRIORITY', 'RH_PRIORITY']:
persisted_task_list = [atom.name for atom in
atom_details]
if ('retry_instance_evacuate_engine_retry' in
persisted_task_list):
recovery_method = (
fields.FailoverSegmentRecoveryMethod.
RESERVED_HOST)
else:
recovery_method = (
fields.FailoverSegmentRecoveryMethod.AUTO)
for key, value in od.items():
# Add progress_details only if tasks are executed and
# meta is available in which progress_details are
# stored.
if value.meta and value.meta.get('progress_details'):
progress_details_obj = (
objects.NotificationProgressDetails.create(
value.name,
value.meta['progress'],
value.meta['progress_details']['details']
['progress_details'],
value.state))
# TODO(ShilpaSD): Taskflow doesn't support to return task
# details in the same sequence in which all tasks are
# executed. Reported this issue in LP #1815738. To resolve
# this issue load the tasks based on the recovery method and
# later sort it based on this task list so progress_details
# can be returned in the expected order.
task_list = self._get_taskflow_sequence(context,
recovery_method,
notification)
progress_details.append(progress_details_obj)
for task in task_list:
for atom in atom_details:
if task == atom.name:
od[atom.name] = atom
for key, value in od.items():
# Add progress_details only if tasks are executed and meta
# is available in which progress_details are stored.
if value.meta:
progress_details_obj = (
objects.NotificationProgressDetails.create(
value.name,
value.meta['progress'],
value.meta['progress_details']['details']
['progress_details'],
value.state))
progress_details.append(progress_details_obj)
return progress_details


@ -40,12 +40,10 @@ TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class DisableComputeServiceTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["host_name"]
super(DisableComputeServiceTask, self).__init__(context,
novaclient,
requires=requires
)
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["host_name"]
super(DisableComputeServiceTask, self).__init__(context, novaclient,
**kwargs)
def execute(self, host_name):
msg = "Disabling compute service on host: '%s'" % host_name
@ -64,11 +62,11 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
"""Get all HA_Enabled instances."""
default_provides = set(["instance_list"])
def __init__(self, context, novaclient):
requires = ["host_name"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["host_name"]
super(PrepareHAEnabledInstancesTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, host_name):
def _filter_instances(instance_list):
@ -115,6 +113,7 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
self.update_details(msg)
instance_list = self.novaclient.get_servers(self.context, host_name)
msg = ("Total instances running on failed host '%(host_name)s' is "
"%(instance_list)d") % {'host_name': host_name,
'instance_list': len(instance_list)}
@ -142,11 +141,11 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
class EvacuateInstancesTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["host_name", "instance_list"]
super(EvacuateInstancesTask, self).__init__(context,
novaclient,
requires=requires)
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["host_name", "instance_list"]
self.update_host_method = kwargs['update_host_method']
super(EvacuateInstancesTask, self).__init__(context, novaclient,
**kwargs)
def _get_state_and_host_of_instance(self, context, instance):
new_instance = self.novaclient.get_server(context, instance.id)
@ -177,15 +176,16 @@ class EvacuateInstancesTask(base.MasakariTask):
periodic_call_stopped.wait)
except etimeout.Timeout:
with excutils.save_and_reraise_exception():
periodic_call_stopped.stop()
msg = ("Instance '%(uuid)s' is successfully evacuated but "
"failed to stop.") % {'uuid': instance.id}
LOG.warning(msg)
self.update_details(msg, 1.0)
finally:
else:
periodic_call_stopped.stop()
def _evacuate_and_confirm(self, context, instance, host_name,
failed_evacuation_instances, reserved_host=None):
failed_evacuation_instances,
reserved_host=None):
# Before locking the instance check whether it is already locked
# by user, if yes don't lock the instance
instance_already_locked = self.novaclient.get_server(
@ -206,6 +206,24 @@ class EvacuateInstancesTask(base.MasakariTask):
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
def _wait_for_evacuation():
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation_confirmation)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(
CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
else:
# stop the periodic call, in case of exceptions or
# Timeout.
periodic_call.stop()
try:
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
task_state = getattr(instance, "OS-EXT-STS:task_state")
@ -232,35 +250,22 @@ class EvacuateInstancesTask(base.MasakariTask):
stop_instance = False
# evacuate the instance
self.novaclient.evacuate_instance(
context, instance.id,
target=reserved_host.name if reserved_host else None)
self.novaclient.evacuate_instance(context, instance.id,
target=reserved_host)
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation_confirmation)
_wait_for_evacuation()
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(
CONF.wait_period_after_evacuation,
periodic_call.wait)
if vm_state != 'active':
if stop_instance:
self._stop_after_evacuation(self.context, instance)
# If the instance was in 'error' state before failure
# it should be set to 'error' after recovery.
if vm_state == 'error':
self.novaclient.reset_instance_state(
context, instance.id)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or
# Timeout.
periodic_call.stop()
if vm_state != 'active':
if stop_instance:
self._stop_after_evacuation(self.context, instance)
# If the instance was in 'error' state before failure
# it should be set to 'error' after recovery.
if vm_state == 'error':
self.novaclient.reset_instance_state(
context, instance.id)
except etimeout.Timeout:
# Instance did not stop within the expected time_limit.
failed_evacuation_instances.append(instance.id)
except Exception:
# Exception is raised while resetting instance state or
# evacuating the instance itself.
@ -272,15 +277,16 @@ class EvacuateInstancesTask(base.MasakariTask):
def execute(self, host_name, instance_list, reserved_host=None):
msg = ("Start evacuation of instances from failed host '%(host_name)s'"
", instance uuids are : '%(instance_list)s'") % {
", instance uuids are: '%(instance_list)s'") % {
'host_name': host_name, 'instance_list': ','.join(instance_list)}
self.update_details(msg)
def _do_evacuate(context, host_name, instance_list,
reserved_host=None):
failed_evacuation_instances = []
if reserved_host:
msg = "Enabling reserved host: '%s'" % reserved_host.name
msg = "Enabling reserved host: '%s'" % reserved_host
self.update_details(msg, 0.1)
if CONF.host_failure.add_reserved_host_to_aggregate:
# Assign reserved_host to an aggregate to which the failed
@ -291,22 +297,22 @@ class EvacuateInstancesTask(base.MasakariTask):
try:
msg = ("Add host %(reserved_host)s to "
"aggregate %(aggregate)s") % {
'reserved_host': reserved_host.name,
'reserved_host': reserved_host,
'aggregate': aggregate.name}
self.update_details(msg, 0.2)
self.novaclient.add_host_to_aggregate(
context, reserved_host.name, aggregate)
context, reserved_host, aggregate)
msg = ("Added host %(reserved_host)s to "
"aggregate %(aggregate)s") % {
'reserved_host': reserved_host.name,
'reserved_host': reserved_host,
'aggregate': aggregate.name}
self.update_details(msg, 0.3)
except exception.Conflict:
msg = ("Host '%(reserved_host)s' already has "
"been added to aggregate "
"'%(aggregate)s'.") % {
'reserved_host': reserved_host.name,
'reserved_host': reserved_host,
'aggregate': aggregate.name}
self.update_details(msg, 1.0)
LOG.info(msg)
@ -319,31 +325,37 @@ class EvacuateInstancesTask(base.MasakariTask):
break
self.novaclient.enable_disable_service(
context, reserved_host.name, enable=True)
context, reserved_host, enable=True)
# Set reserved property of reserved_host to False
reserved_host.reserved = False
reserved_host.save()
self.update_host_method(self.context, reserved_host)
thread_pool = greenpool.GreenPool(
CONF.host_failure_recovery_threads)
for instance_id in instance_list:
msg = "Evacuation of instance started : '%s'" % instance_id
msg = "Evacuation of instance started: '%s'" % instance_id
self.update_details(msg, 0.5)
instance = self.novaclient.get_server(self.context,
instance_id)
thread_pool.spawn_n(self._evacuate_and_confirm, context,
instance, host_name,
failed_evacuation_instances, reserved_host)
if not (instance_id in failed_evacuation_instances):
msg = ("Instance '%s' evacuated successfully" %
instance_id)
self.update_details(msg, 0.7)
failed_evacuation_instances,
reserved_host)
thread_pool.waitall()
evacuated_instances = list(set(instance_list).difference(set(
failed_evacuation_instances)))
if evacuated_instances:
evacuated_instances.sort()
msg = ("Successfully evacuate instances '%(instance_list)s' "
"from host '%(host_name)s'") % {
'instance_list': ','.join(evacuated_instances),
'host_name': host_name}
self.update_details(msg, 0.7)
if failed_evacuation_instances:
msg = ("Failed to evacuate instances "
"'%(failed_evacuation_instances)s' from host "
@ -351,11 +363,14 @@ class EvacuateInstancesTask(base.MasakariTask):
'failed_evacuation_instances':
','.join(failed_evacuation_instances),
'host_name': host_name}
self.update_details(msg, 1.0)
self.update_details(msg, 0.7)
raise exception.HostRecoveryFailureException(
message=msg)
lock_name = reserved_host.name if reserved_host else None
msg = "Evacuation process completed!"
self.update_details(msg, 1.0)
lock_name = reserved_host if reserved_host else None
@utils.synchronized(lock_name)
def do_evacuate_with_reserved_host(context, host_name, instance_list,
@ -390,17 +405,20 @@ def get_auto_flow(context, novaclient, process_what):
auto_evacuate_flow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
novaclient=novaclient,
update_host_method=None):
auto_evacuate_flow_pre.add(plugin)
auto_evacuate_flow_main = linear_flow.Flow('main_tasks')
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
novaclient=novaclient,
update_host_method=None):
auto_evacuate_flow_main.add(plugin)
auto_evacuate_flow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
novaclient=novaclient,
update_host_method=None):
auto_evacuate_flow_post.add(plugin)
nested_flow.add(auto_evacuate_flow_pre)
@ -411,7 +429,7 @@ def get_auto_flow(context, novaclient, process_what):
process_what)
def get_rh_flow(context, novaclient, process_what):
def get_rh_flow(context, novaclient, process_what, **kwargs):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
@ -427,21 +445,24 @@ def get_rh_flow(context, novaclient, process_what):
task_dict = TASKFLOW_CONF.host_rh_failure_recovery_tasks
rh_evacuate_flow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
task_dict['pre'],
context=context, novaclient=novaclient, **kwargs):
rh_evacuate_flow_pre.add(plugin)
rh_evacuate_flow_main = linear_flow.Flow(
"retry_%s" % flow_name, retry=retry.ParameterizedForEach(
rebind=['reserved_host_list'], provides='reserved_host'))
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
task_dict['main'],
context=context, novaclient=novaclient, **kwargs):
rh_evacuate_flow_main.add(plugin)
rh_evacuate_flow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
for plugin in base.get_recovery_flow(
task_dict['post'],
context=context, novaclient=novaclient, **kwargs):
rh_evacuate_flow_post.add(plugin)
nested_flow.add(rh_evacuate_flow_pre)
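
For comparison, a minimal before/after sketch of the reserved-host handling in EvacuateInstancesTask, condensed from the hunks above; reserved_host is now a plain host name string rather than a Host object:

    # before: the task persisted the Host object directly
    # reserved_host.reserved = False
    # reserved_host.save()

    # after: novaclient helpers receive the host name, and the engine-side
    # callback performs the persistence
    self.novaclient.evacuate_instance(context, instance.id,
                                      target=reserved_host)
    self.novaclient.enable_disable_service(context, reserved_host,
                                           enable=True)
    self.update_host_method(self.context, reserved_host)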


@ -33,11 +33,11 @@ TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class StopInstanceTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["instance_uuid"]
super(StopInstanceTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, instance_uuid):
"""Stop the instance for recovery."""
@ -105,11 +105,11 @@ class StopInstanceTask(base.MasakariTask):
class StartInstanceTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["instance_uuid"]
super(StartInstanceTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, instance_uuid):
"""Start the instance."""
@ -134,11 +134,11 @@ class StartInstanceTask(base.MasakariTask):
class ConfirmInstanceActiveTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["instance_uuid"]
super(ConfirmInstanceActiveTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, instance_uuid):
def _wait_for_active():


@ -21,7 +21,7 @@ LOG = logging.getLogger(__name__)
class Noop(task.Task):
def __init__(self, context, novaclient):
def __init__(self, context, novaclient, **kwargs):
self.context = context
self.novaclient = novaclient
super(Noop, self).__init__()


@ -32,11 +32,11 @@ TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class DisableComputeNodeTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["process_name", "host_name"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["process_name", "host_name"]
super(DisableComputeNodeTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, process_name, host_name):
msg = "Disabling compute service on host: '%s'" % host_name
@ -56,11 +56,11 @@ class DisableComputeNodeTask(base.MasakariTask):
class ConfirmComputeNodeDisabledTask(base.MasakariTask):
def __init__(self, context, novaclient):
requires = ["process_name", "host_name"]
def __init__(self, context, novaclient, **kwargs):
kwargs['requires'] = ["process_name", "host_name"]
super(ConfirmComputeNodeDisabledTask, self).__init__(context,
novaclient,
requires=requires)
**kwargs)
def execute(self, process_name, host_name):
def _wait_for_disable():


@ -45,6 +45,12 @@ CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
def update_host_method(context, host_name, reserved=False):
reserved_host = objects.Host.get_by_name(context, host_name)
reserved_host.reserved = reserved
reserved_host.save()
class MasakariManager(manager.Manager):
"""Manages the running notifications"""
RPC_API_VERSION = rpcapi.EngineAPI.RPC_API_VERSION
@ -195,19 +201,24 @@ class MasakariManager(manager.Manager):
host_obj.save()
reserved_host_list = None
if not recovery_method == (
fields.FailoverSegmentRecoveryMethod.AUTO):
reserved_host_list = objects.HostList.get_all(
reserved_host_object_list = objects.HostList.get_all(
context, filters={
'failover_segment_id': host_obj.failover_segment_id,
'reserved': True,
'on_maintenance': False
})
# Create list of host name from reserved_host_object_list
reserved_host_list = [host.name for host in
reserved_host_object_list]
try:
self.driver.execute_host_failure(
context, host_name, recovery_method,
notification.notification_uuid,
update_host_method=update_host_method,
reserved_host_list=reserved_host_list)
except exception.SkipHostRecoveryException as e:
notification_status = fields.NotificationStatus.FINISHED
@ -355,9 +366,9 @@ class MasakariManager(manager.Manager):
context, notification.source_host_uuid)
recovery_method = host_obj.failover_segment.recovery_method
progress_details = (self.driver.
get_notification_recovery_workflow_details(
context, recovery_method, notification))
progress_details = (
self.driver.get_notification_recovery_workflow_details(
context, recovery_method, notification))
notification['recovery_workflow_details'] = progress_details
except Exception:
msg = (_('Failed to fetch notification recovery workflow details '


@ -25,8 +25,8 @@ from masakari.compute import nova
from masakari import conf
from masakari import context
from masakari.engine.drivers.taskflow import host_failure
from masakari.engine import manager
from masakari import exception
from masakari.objects import host as host_obj
from masakari import test
from masakari.tests.unit import fakes
@ -112,7 +112,9 @@ class HostFailureTestCase(test.TestCase):
def _evacuate_instances(self, instance_list, mock_enable_disable,
reserved_host=None):
task = host_failure.EvacuateInstancesTask(self.ctxt, self.novaclient)
task = host_failure.EvacuateInstancesTask(
self.ctxt, self.novaclient,
update_host_method=manager.update_host_method)
old_instance_list = copy.deepcopy(instance_list['instance_list'])
if reserved_host:
@ -165,11 +167,12 @@ class HostFailureTestCase(test.TestCase):
"considered for evacuation. Total count is: '2'", 0.8),
mock.call("Instances to be evacuated are: '1,2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
"'fake-host', instance uuids are: '1,2'"),
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Successfully evacuate instances '1,2' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@ -200,10 +203,10 @@ class HostFailureTestCase(test.TestCase):
instance_list = self._test_instance_list(2)
# execute EvacuateInstancesTask
with mock.patch.object(host_obj.Host, "save") as mock_save:
with mock.patch.object(manager, "update_host_method") as mock_save:
self._evacuate_instances(
instance_list, mock_enable_disable,
reserved_host=reserved_host)
reserved_host=reserved_host.name)
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
@ -221,16 +224,17 @@ class HostFailureTestCase(test.TestCase):
"considered for evacuation. Total count is: '2'", 0.8),
mock.call("Instances to be evacuated are: '1,2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
"'fake-host', instance uuids are: '1,2'"),
mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1),
mock.call('Add host fake-reserved-host to aggregate fake_agg',
0.2),
mock.call('Added host fake-reserved-host to aggregate fake_agg',
0.3),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Successfully evacuate instances '1,2' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@mock.patch.object(nova.API, 'add_host_to_aggregate')
@ -266,10 +270,10 @@ class HostFailureTestCase(test.TestCase):
instance_list = self._test_instance_list(1)
# execute EvacuateInstancesTask
with mock.patch.object(host_obj.Host, "save") as mock_save:
with mock.patch.object(manager, "update_host_method") as mock_save:
self._evacuate_instances(
instance_list, mock_enable_disable,
reserved_host=reserved_host)
reserved_host=reserved_host.name)
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
@ -285,14 +289,16 @@ class HostFailureTestCase(test.TestCase):
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Instances to be evacuated are: '1'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1'"),
"'fake-host', instance uuids are: '1'"),
mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1),
mock.call('Add host fake-reserved-host to aggregate fake_agg',
0.2),
mock.call("Host 'fake-reserved-host' already has been added to "
"aggregate 'fake_agg'.", 1.0),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7)
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Successfully evacuate instances '1' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@ddt.data('rescued', 'paused', 'shelved', 'suspended',
@ -330,11 +336,12 @@ class HostFailureTestCase(test.TestCase):
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
"'fake-host', instance uuids are: '1,2'"),
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Successfully evacuate instances '1,2' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@ -376,9 +383,11 @@ class HostFailureTestCase(test.TestCase):
"considered for evacuation. Total count is: '1'", 0.8),
mock.call("Instances to be evacuated are: '2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '2'"),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
"'fake-host', instance uuids are: '2'"),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Successfully evacuate instances '2' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@ -416,43 +425,6 @@ class HostFailureTestCase(test.TestCase):
" to be evacuated", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_all_instances_active_resized_instance(
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
self.fake_client.servers.create(id="1", host=self.instance_host,
vm_state='resized',
ha_enabled=True)
self.fake_client.servers.create(id="2", host=self.instance_host,
vm_state='resized',
ha_enabled=True)
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": instance_uuid_list,
}
# execute EvacuateInstancesTask
self._evacuate_instances(instance_list,
mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
@ -525,13 +497,10 @@ class HostFailureTestCase(test.TestCase):
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Instance '1' is successfully evacuated but failed to "
"stop.", 1.0),
mock.call("Failed to evacuate instances '1' from host "
"'fake-host'", 1.0)
"'fake-host', instance uuids are: '1'"),
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Failed to evacuate instances '1' from host 'fake-host'"
"", 0.7)
])
@mock.patch('masakari.compute.nova.novaclient')
@ -616,11 +585,73 @@ class HostFailureTestCase(test.TestCase):
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2,3'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '3'", 0.5),
mock.call("Instance '3' evacuated successfully", 0.7)
"'fake-host', instance uuids are: '1,2,3'"),
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Evacuation of instance started: '3'", 0.5),
mock.call("Successfully evacuate instances '1,2,3' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_for_RH_recovery(
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
self.override_config("add_reserved_host_to_aggregate",
True, "host_failure")
# create test data
self.fake_client.servers.create(id="1", host=self.instance_host,
ha_enabled=True)
self.fake_client.servers.create(id="2", host=self.instance_host)
reserved_host = fakes.create_fake_host(name="fake-reserved-host",
reserved=True)
self.fake_client.aggregates.create(id="1", name='fake_agg',
hosts=[self.instance_host])
# execute DisableComputeServiceTask
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list(2)
# execute EvacuateInstancesTask
with mock.patch.object(manager, "update_host_method") as mock_save:
self._evacuate_instances(
instance_list, mock_enable_disable,
reserved_host=reserved_host.name)
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 2"
"", 0.3),
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Total Non-HA Enabled instances count: '1'", 0.7),
mock.call("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '2'", 0.8),
mock.call("Instances to be evacuated are: '1,2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are: '1,2'"),
mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1),
mock.call('Add host fake-reserved-host to aggregate fake_agg',
0.2),
mock.call('Added host fake-reserved-host to aggregate fake_agg',
0.3),
mock.call("Evacuation of instance started: '1'", 0.5),
mock.call("Evacuation of instance started: '2'", 0.5),
mock.call("Successfully evacuate instances '1,2' from host "
"'fake-host'", 0.7),
mock.call('Evacuation process completed!', 1.0)
])


@ -19,6 +19,7 @@ from oslo_utils import timeutils
from masakari.compute import nova
import masakari.conf
from masakari import context
from masakari.engine import manager
from masakari.engine import utils as engine_utils
from masakari import exception
from masakari.objects import fields
@ -200,6 +201,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.engine.process_notification(self.context,
notification=notification)
self.assertEqual("finished", notification.status)
mock_host_save.assert_called_once()
mock_process_failure.assert_called_once_with(
self.context, notification.payload.get('process_name'),
fake_host.name,
@ -235,6 +237,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.engine.process_notification(self.context,
notification=notification)
self.assertEqual("finished", notification.status)
mock_host_save.assert_called_once()
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
phase_start = fields.EventNotificationPhase.START
phase_end = fields.EventNotificationPhase.END
@ -266,6 +269,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.engine.process_notification(self.context,
notification=notification)
self.assertEqual("error", notification.status)
mock_host_save.assert_called_once()
e = exception.ProcessRecoveryFailureException('Failed to execute '
'process recovery workflow.')
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
@ -281,14 +285,13 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
mock_notify_about_notification_update.assert_has_calls(notify_calls)
@mock.patch.object(host_obj.Host, "get_by_uuid")
@mock.patch.object(host_obj.Host, "save")
@mock.patch.object(notification_obj.Notification, "save")
@mock.patch.object(engine_utils, 'notify_about_notification_update')
@mock.patch("masakari.engine.drivers.taskflow."
"TaskFlowDriver.execute_process_failure")
def test_process_notification_type_process_event_started(
self, mock_process_failure, mock_notify_about_notification_update,
mock_notification_save, mock_host_save, mock_host_obj,
mock_notification_save, mock_host_obj,
mock_notification_get):
notification = self._get_process_type_notification()
mock_notification_get.return_value = notification
@ -360,11 +363,13 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'on_maintenance': True,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("finished", notification.status)
mock_host_failure.assert_called_once_with(
self.context,
fake_host.name, fake_host.failover_segment.recovery_method,
notification.notification_uuid, reserved_host_list=None)
notification.notification_uuid, reserved_host_list=None,
update_host_method=manager.update_host_method)
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
phase_start = fields.EventNotificationPhase.START
phase_end = fields.EventNotificationPhase.END
@ -388,8 +393,6 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
mock_host_update, mock_host_save, mock_host_obj,
mock_notification_get):
mock_format.return_value = mock.ANY
reserved_host_list = []
mock_get_all.return_value = reserved_host_list
fake_host = fakes.create_fake_host()
fake_host.failover_segment = fakes.create_fake_failover_segment(
@ -406,6 +409,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'on_maintenance': True,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("error", notification.status)
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
phase_start = fields.EventNotificationPhase.START
@ -437,10 +441,13 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
fake_host = fakes.create_fake_host()
fake_host.failover_segment = fakes.create_fake_failover_segment(
recovery_method='reserved_host')
reserved_host_list = [fake_host]
mock_get_all.return_value = reserved_host_list
reserved_host_object_list = [fake_host]
mock_get_all.return_value = reserved_host_object_list
mock_host_obj.return_value = fake_host
reserved_host_list = [host.name for host in
reserved_host_object_list]
notification = self._get_compute_host_type_notification()
mock_notification_get.return_value = notification
mock_host_failure.side_effect = self._fake_notification_workflow()
@ -452,12 +459,14 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'on_maintenance': True,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("finished", notification.status)
mock_host_failure.assert_called_once_with(
self.context,
fake_host.name, fake_host.failover_segment.recovery_method,
notification.notification_uuid,
reserved_host_list=reserved_host_list)
reserved_host_list=reserved_host_list,
update_host_method=manager.update_host_method)
mock_get_all.assert_called_once_with(self.context, filters={
'failover_segment_id': fake_host.failover_segment.uuid,
'reserved': True, 'on_maintenance': False})
@ -487,14 +496,17 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
fake_host = fakes.create_fake_host(reserved=True)
fake_host.failover_segment = fakes.create_fake_failover_segment(
recovery_method='reserved_host')
reserved_host_list = [fake_host]
mock_get_all.return_value = reserved_host_list
reserved_host_object_list = [fake_host]
mock_get_all.return_value = reserved_host_object_list
mock_host_obj.return_value = fake_host
notification = self._get_compute_host_type_notification()
mock_notification_get.return_value = notification
mock_host_failure.side_effect = self._fake_notification_workflow()
reserved_host_list = [host.name for host in
reserved_host_object_list]
self.engine.process_notification(self.context,
notification=notification)
@ -503,12 +515,14 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'reserved': False,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("finished", notification.status)
mock_host_failure.assert_called_once_with(
self.context,
fake_host.name, fake_host.failover_segment.recovery_method,
notification.notification_uuid,
reserved_host_list=reserved_host_list)
reserved_host_list=reserved_host_list,
update_host_method=manager.update_host_method)
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
phase_start = fields.EventNotificationPhase.START
phase_end = fields.EventNotificationPhase.END
@ -549,6 +563,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'on_maintenance': True,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("error", notification.status)
e = exception.HostRecoveryFailureException('Failed to execute host '
'recovery.')
@ -595,6 +610,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
'on_maintenance': True,
}
mock_host_update.assert_called_once_with(update_data_by_host_failure)
mock_host_save.assert_called_once()
self.assertEqual("finished", notification.status)
action = fields.EventNotificationAction.NOTIFICATION_PROCESS
phase_start = fields.EventNotificationPhase.START
@ -874,6 +890,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.context, 'fake_host',
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST,
uuidsentinel.fake_notification,
update_host_method=manager.update_host_method,
reserved_host_list=['host-1', 'host-2'])
# Ensure custom_task added to the 'host_rh_failure_recovery_tasks'
# is executed.
@ -923,6 +940,7 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.context, "fake-host",
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST,
uuidsentinel.fake_notification,
update_host_method=manager.update_host_method,
reserved_host_list=['host-1', 'host-2'])
# make sure instance is active and has different host