Add progress details for recovery workflows

This patch uses taskflow's persistence feature to store recovery
workflow details in the database.

Added a new microversion to return progress_details of the
notification in `GET /notifications/<notification_uuid>` API.

APIImpact:
GET /notifications/<notification_uuid> API includes details
of recovery workflow

Change-Id: I93c1b7d88823e02d9a02855cabb8b22c9e40a7d5
Implements: bp progress-details-recovery-workflows
This commit is contained in:
shilpa.devharakar 2019-01-31 08:20:14 +00:00 committed by shilpa
parent 0cbe4a3f7c
commit 7321ee32fd
32 changed files with 1435 additions and 281 deletions

View File

@ -219,6 +219,7 @@ Response
- created_at: created
- status: notification_status
- updated_at: updated
- recovery_workflow_details: recovery_workflow_details
- id: notification_id
**Example Show Notification Details**

View File

@ -295,6 +295,14 @@ on_maintenance:
in: body
required: false
type: boolean
recovery_workflow_details:
description: |
Recovery workflow details of the notification. This is a list of dictionaries.
``New in version 1.1``
in: body
required: true
type: array
reserved:
description: |
A boolean indicates whether this host is reserved or not, if it is

View File

@ -1,19 +1,48 @@
{
"notification": {
"notification_uuid": "9e66b95d-45da-4695-bfb6-ace68b35d955",
"status": "new",
"source_host_uuid": "083a8474-22c0-407f-b89b-c569134c3bfd",
"notification_uuid": "07a331b8-df15-4582-b121-73ed3541a408",
"status": "finished",
"source_host_uuid": "b5bc49be-ea6f-472d-9240-968f75d7a16a",
"deleted": false,
"created_at": "2017-04-24T06:37:37.396994",
"updated_at": null,
"id": 4,
"generated_time": "2017-04-24T08:34:46.000000",
"deleted_at": null,
"type": "COMPUTE_HOST",
"created_at": "2019-02-28T07:19:49.000000",
"updated_at": "2019-02-28T07:19:59.000000",
"payload": {
"host_status": "UNKNOWN",
"event": "STOPPED",
"cluster_status": "OFFLINE"
}
"instance_uuid": "b9837317-a5b8-44f4-93b4-45500c562bb8",
"vir_domain_event": "STOPPED_FAILED",
"event": "LIFECYCLE"
},
"recovery_workflow_details": [
{
"progress": 1.0,
"state": "SUCCESS",
"name": "StopInstanceTask",
"progress_details": [
{"timestamp": "2019-03-07 13:54:28.842031", "message": "Stopping instance: df528f02-2415-4a40-bad8-453ad6a519f7", "progress": "0.0"},
{"timestamp": "2019-03-07 13:54:34.442617", "message": "Stopped instance: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "1.0"}
]
},
{
"progress": 1.0,
"state": "SUCCESS",
"name": "StartInstanceTask",
"progress_details": [
{"timestamp": "2019-03-07 13:54:34.531755", "message": "Starting instance: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "0.0"},
{"timestamp": "2019-03-07 13:54:35.930430", "message": "Instance started: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "1.0"}
]
},
{
"progress": 1.0,
"state": "SUCCESS",
"name": "ConfirmInstanceActiveTask",
"progress_details": [
{"timestamp": "2019-03-07 13:54:36.019208", "message": "Confirming instance 'df528f02-2415-4a40-bad8-453ad6a519f7' vm_state is ACTIVE", "progress": "0.0"},
{"timestamp": "2019-03-07 13:54:38.569373", "message": "Confirmed instance 'df528f02-2415-4a40-bad8-453ad6a519f7' vm_state is ACTIVE", "progress": "1.0"}
]
}
],
"generated_time": "2017-06-13T15:34:55.000000",
"deleted_at": null,
"type": "VM",
"id": 13
}
}

View File

@ -108,6 +108,7 @@ sqlparse==0.2.4
statsd==3.2.2
stestr==1.0.0
stevedore==1.20.0
SQLAlchemy-Utils==0.33.10
taskflow==2.16.0
Tempita==0.5.2
tenacity==4.9.0

View File

@ -39,6 +39,7 @@ from masakari.i18n import _
REST_API_VERSION_HISTORY = """REST API Version History:
* 1.0 - Initial version.
* 1.1 - Add support for getting notification progress details
"""
# The minimum and maximum versions of the API supported
@ -47,7 +48,7 @@ REST_API_VERSION_HISTORY = """REST API Version History:
# Note: This only applies for the v1 API once microversions
# support is fully merged.
_MIN_API_VERSION = "1.0"
_MAX_API_VERSION = "1.0"
_MAX_API_VERSION = "1.1"
DEFAULT_API_VERSION = _MIN_API_VERSION

View File

@ -17,6 +17,7 @@ from oslo_utils import timeutils
from six.moves import http_client as http
from webob import exc
from masakari.api import api_version_request
from masakari.api.openstack import common
from masakari.api.openstack import extensions
from masakari.api.openstack.ha.schemas import notifications as schema
@ -123,7 +124,12 @@ class NotificationsController(wsgi.Controller):
context.can(notifications_policies.NOTIFICATIONS % 'detail')
try:
notification = self.api.get_notification(context, id)
if api_version_request.is_supported(req, min_version='1.1'):
notification = (
self.api.get_notification_recovery_workflow_details(
context, id))
else:
notification = self.api.get_notification(context, id)
except exception.NotificationNotFound as err:
raise exc.HTTPNotFound(explanation=err.format_message())
return {'notification': notification}

View File

@ -33,6 +33,11 @@ customized_recovery_flow_group = cfg.OptGroup(
help="Configuration options for customizing various failure recovery"
"workflow tasks.")
taskflow_group = cfg.OptGroup(
'taskflow',
title='Taskflow driver options',
help="Configuration options for taskflow driver")
host_failure_opts = [
cfg.BoolOpt('evacuate_all_instances',
@ -77,6 +82,13 @@ When set to False, it will only execute instance failure recovery actions
for an instance which contain metadata key 'HA_Enabled=True'."""),
]
taskflow_options = [
cfg.StrOpt('connection',
help="""
The SQLAlchemy connection string to use to connect to the taskflow database.
"""),
]
taskflow_driver_recovery_flows = [
cfg.Opt('host_auto_failure_recovery_tasks',
type=types.Dict(
@ -188,16 +200,19 @@ def register_opts(conf):
conf.register_group(instance_recovery_group)
conf.register_group(host_recovery_group)
conf.register_group(customized_recovery_flow_group)
conf.register_group(taskflow_group)
conf.register_opts(instance_failure_options, group=instance_recovery_group)
conf.register_opts(host_failure_opts, group=host_recovery_group)
conf.register_opts(taskflow_driver_recovery_flows,
group=customized_recovery_flow_group)
conf.register_opts(taskflow_options, group=taskflow_group)
def list_opts():
return {
instance_recovery_group.name: instance_failure_options,
host_recovery_group.name: host_failure_opts,
taskflow_group.name: taskflow_options
}

View File

@ -0,0 +1,32 @@
# Copyright 2019 NTT Data.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import masakari.conf
from masakari.engine import driver
CONF = masakari.conf.CONF
NOTIFICATION_DRIVER = CONF.notification_driver
PERSISTENCE_BACKEND = CONF.taskflow.connection
def upgrade(migrate_engine):
    """Create taskflow persistence tables used for recovery progress.

    Loads the configured masakari notification driver and, when a
    taskflow persistence connection is configured, asks the driver to
    create/upgrade the persistence schema that stores progress details.
    """
    masakari_driver = driver.load_masakari_driver(NOTIFICATION_DRIVER)
    if not PERSISTENCE_BACKEND:
        # No taskflow connection configured: nothing to migrate.
        return
    masakari_driver.upgrade_backend(PERSISTENCE_BACKEND)

View File

@ -52,6 +52,16 @@ class NotificationDriver(object):
notification_uuid):
pass
@abc.abstractmethod
def get_notification_recovery_workflow_details(self, context,
recovery_method,
notification_uuid):
pass
@abc.abstractmethod
def upgrade_backend(self, backend):
pass
def load_masakari_driver(masakari_driver=None):
"""Load a masakari driver module.

View File

@ -12,30 +12,30 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import os
from oslo_log import log as logging
from oslo_utils import timeutils
from stevedore import named
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
import taskflow.engines
from taskflow import exceptions
from taskflow import formatters
from taskflow.listeners import base
from taskflow.listeners import logging as logging_listener
from taskflow.persistence import backends
from taskflow.persistence import models
from taskflow import task
import masakari.conf
from masakari import exception
CONF = masakari.conf.CONF
PERSISTENCE_BACKEND = CONF.taskflow.connection
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons=None):
"""Makes a pretty name for a task class."""
base_name = ".".join([cls.__module__, cls.__name__])
extra = ''
if addons:
extra = ';%s' % (", ".join([str(a) for a in addons]))
return base_name + extra
class MasakariTask(task.Task):
"""The root task class for all masakari tasks.
@ -43,12 +43,23 @@ class MasakariTask(task.Task):
implement the given task as the task name.
"""
def __init__(self, addons=None, **kwargs):
super(MasakariTask, self).__init__(self.make_name(addons), **kwargs)
def __init__(self, context, novaclient, **kwargs):
super(MasakariTask, self).__init__(self.__class__.__name__, **kwargs)
self.context = context
self.novaclient = novaclient
self.progress = []
@classmethod
def make_name(cls, addons=None):
return _make_task_name(cls, addons)
def update_details(self, progress_data, progress=0.0):
    """Append a timestamped progress entry and notify listeners.

    Each entry records the current UTC time, the numeric *progress*
    value and the human-readable *progress_data* message; the full
    accumulated list is sent out via the task notifier.
    """
    entry = {
        'timestamp': str(timeutils.utcnow()),
        'progress': progress,
        'message': progress_data
    }
    self.progress.append(entry)
    payload = {'progress': progress,
               "progress_details": self.progress}
    self._notifier.notify('update_progress', payload)
class SpecialFormatter(formatters.FailureFormatter):
@ -102,3 +113,22 @@ def get_recovery_flow(task_list, **kwargs):
name_order=True, invoke_on_load=True, invoke_kwds=kwargs)
for extension in extensions.extensions:
yield extension.obj
def load_taskflow_into_engine(action, nested_flow,
                              process_what):
    """Load *nested_flow* into a taskflow engine, persisted when configured.

    :param action: flow name used when a new logbook must be created
    :param nested_flow: the composed taskflow flow to execute
    :param process_what: store dict for the flow; must contain
        'notification_uuid', which keys the persistence logbook
    :returns: a loaded taskflow engine (with a persistence backend and
        logbook attached only if CONF.taskflow.connection is set)
    """
    book = None
    backend = None
    if PERSISTENCE_BACKEND:
        backend = backends.fetch(PERSISTENCE_BACKEND)
        # Reuse the logbook already recorded for this notification, so
        # repeated runs append to the same persisted flow history.
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(process_what['notification_uuid'])
            except exceptions.NotFound:
                pass
    if book is None:
        # First run (or no persistence): create a fresh logbook keyed by
        # the notification UUID.
        book = models.LogBook(action,
                              process_what['notification_uuid'])
    return taskflow.engines.load(nested_flow, store=process_what,
                                 backend=backend, book=book)

View File

@ -18,11 +18,16 @@ Driver TaskFlowDriver:
Execute notification workflows using taskflow.
"""
from collections import OrderedDict
import contextlib
from oslo_log import log as logging
from oslo_utils import excutils
from taskflow import exceptions
from taskflow.persistence import backends
from masakari.compute import nova
import masakari.conf
from masakari.engine import driver
from masakari.engine.drivers.taskflow import base
from masakari.engine.drivers.taskflow import host_failure
@ -30,9 +35,13 @@ from masakari.engine.drivers.taskflow import instance_failure
from masakari.engine.drivers.taskflow import process_failure
from masakari import exception
from masakari.i18n import _
from masakari import objects
from masakari.objects import fields
CONF = masakari.conf.CONF
TASKFLOW_CONF = CONF.taskflow_driver_recovery_flows
PERSISTENCE_BACKEND = CONF.taskflow.connection
LOG = logging.getLogger(__name__)
@ -40,8 +49,9 @@ class TaskFlowDriver(driver.NotificationDriver):
def __init__(self):
super(TaskFlowDriver, self).__init__()
def _execute_auto_workflow(self, novaclient, process_what):
flow_engine = host_failure.get_auto_flow(novaclient, process_what)
def _execute_auto_workflow(self, context, novaclient, process_what):
flow_engine = host_failure.get_auto_flow(context, novaclient,
process_what)
# Attaching this listener will capture all of the notifications
# that taskflow sends out and redirect them to a more useful
@ -49,7 +59,7 @@ class TaskFlowDriver(driver.NotificationDriver):
with base.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def _execute_rh_workflow(self, novaclient, process_what,
def _execute_rh_workflow(self, context, novaclient, process_what,
reserved_host_list):
if not reserved_host_list:
msg = _('No reserved_hosts available for evacuation.')
@ -57,7 +67,8 @@ class TaskFlowDriver(driver.NotificationDriver):
raise exception.ReservedHostsUnavailable(message=msg)
process_what['reserved_host_list'] = reserved_host_list
flow_engine = host_failure.get_rh_flow(novaclient, process_what)
flow_engine = host_failure.get_rh_flow(context, novaclient,
process_what)
with base.DynamicLogListener(flow_engine, logger=LOG):
try:
@ -65,10 +76,10 @@ class TaskFlowDriver(driver.NotificationDriver):
except exception.LockAlreadyAcquired as ex:
raise exception.HostRecoveryFailureException(ex.message)
def _execute_auto_priority_workflow(self, novaclient, process_what,
reserved_host_list):
def _execute_auto_priority_workflow(self, context, novaclient,
process_what, reserved_host_list):
try:
self._execute_auto_workflow(novaclient, process_what)
self._execute_auto_workflow(context, novaclient, process_what)
except Exception as ex:
with excutils.save_and_reraise_exception(reraise=False) as ctxt:
if isinstance(ex, exception.SkipHostRecoveryException):
@ -87,13 +98,13 @@ class TaskFlowDriver(driver.NotificationDriver):
'reserved_host':
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST
})
self._execute_rh_workflow(novaclient, process_what,
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
def _execute_rh_priority_workflow(self, novaclient, process_what,
def _execute_rh_priority_workflow(self, context, novaclient, process_what,
reserved_host_list):
try:
self._execute_rh_workflow(novaclient, process_what,
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
except Exception as ex:
with excutils.save_and_reraise_exception(reraise=False) as ctxt:
@ -113,30 +124,32 @@ class TaskFlowDriver(driver.NotificationDriver):
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST,
'auto': fields.FailoverSegmentRecoveryMethod.AUTO
})
self._execute_auto_workflow(novaclient, process_what)
self._execute_auto_workflow(context, novaclient, process_what)
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid, reserved_host_list=None):
novaclient = nova.API()
# get flow for host failure
process_what = {
'context': context,
'host_name': host_name
'host_name': host_name,
'notification_uuid': notification_uuid
}
try:
if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO:
self._execute_auto_workflow(novaclient, process_what)
self._execute_auto_workflow(context, novaclient, process_what)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
self._execute_rh_workflow(novaclient, process_what,
self._execute_rh_workflow(context, novaclient, process_what,
reserved_host_list)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY):
self._execute_auto_priority_workflow(novaclient, process_what,
reserved_host_list)
self._execute_auto_priority_workflow(context, novaclient,
process_what,
reserved_host_list)
else:
self._execute_rh_priority_workflow(novaclient, process_what,
self._execute_rh_priority_workflow(context, novaclient,
process_what,
reserved_host_list)
except Exception as exc:
with excutils.save_and_reraise_exception(reraise=False) as ctxt:
@ -154,13 +167,13 @@ class TaskFlowDriver(driver.NotificationDriver):
novaclient = nova.API()
# get flow for instance failure
process_what = {
'context': context,
'instance_uuid': instance_uuid
'instance_uuid': instance_uuid,
'notification_uuid': notification_uuid
}
try:
flow_engine = instance_failure.get_instance_recovery_flow(
novaclient, process_what)
context, novaclient, process_what)
except Exception:
msg = (_('Failed to create instance failure flow.'),
notification_uuid)
@ -178,9 +191,9 @@ class TaskFlowDriver(driver.NotificationDriver):
novaclient = nova.API()
# get flow for process failure
process_what = {
'context': context,
'process_name': process_name,
'host_name': host_name
'host_name': host_name,
'notification_uuid': notification_uuid
}
# TODO(abhishekk) We need to create a map for process_name and
@ -194,7 +207,7 @@ class TaskFlowDriver(driver.NotificationDriver):
raise exception.SkipProcessRecoveryException()
try:
flow_engine = recovery_flow(novaclient, process_what)
flow_engine = recovery_flow(context, novaclient, process_what)
except Exception:
msg = (_('Failed to create process failure flow.'),
notification_uuid)
@ -206,3 +219,90 @@ class TaskFlowDriver(driver.NotificationDriver):
# masakari's debugging (or error reporting) usage.
with base.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def upgrade_backend(self, persistence_backend):
    """Create or upgrade the taskflow persistence schema.

    :param persistence_backend: SQLAlchemy connection string for the
        taskflow persistence database (CONF.taskflow.connection)
    :raises taskflow.exceptions.NotFound: if no backend matches the
        given connection configuration

    NOTE: this method must NOT be decorated with
    ``@contextlib.contextmanager`` — it contains no ``yield``, so the
    decorator would turn a plain ``upgrade_backend(...)`` call into a
    generator-context factory whose body never executes, and the
    persistence tables would silently never be created.
    """
    backend = backends.fetch(persistence_backend)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.upgrade()
def _get_taskflow_sequence(self, context, recovery_method, notification):
    """Return the ordered list of recovery task names for *notification*.

    Picks the configured pre/main/post task set matching the
    notification type (and, for COMPUTE_HOST, the recovery method),
    loads each task plugin, and collects the plugin names in execution
    order.
    """
    # Get the taskflow sequence based on the recovery method.
    novaclient = nova.API()
    task_list = []
    # Get linear task flow based on notification type
    if notification.type == fields.NotificationType.VM:
        tasks = TASKFLOW_CONF.instance_failure_recovery_tasks
    elif notification.type == fields.NotificationType.PROCESS:
        tasks = TASKFLOW_CONF.process_failure_recovery_tasks
    elif notification.type == fields.NotificationType.COMPUTE_HOST:
        if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO:
            tasks = TASKFLOW_CONF.host_auto_failure_recovery_tasks
        elif recovery_method == (
                fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
            tasks = TASKFLOW_CONF.host_rh_failure_recovery_tasks
    # NOTE(review): 'tasks' is left unbound if no branch above matches
    # (e.g. COMPUTE_HOST with an AUTO_PRIORITY/RH_PRIORITY recovery
    # method) — confirm callers can never reach here with such a
    # combination, otherwise this raises UnboundLocalError.
    for plugin in base.get_recovery_flow(tasks['pre'],
                                         context=context,
                                         novaclient=novaclient):
        task_list.append(plugin.name)
    for plugin in base.get_recovery_flow(tasks['main'],
                                         context=context,
                                         novaclient=novaclient):
        task_list.append(plugin.name)
    for plugin in base.get_recovery_flow(tasks['post'],
                                         context=context,
                                         novaclient=novaclient):
        task_list.append(plugin.name)
    return task_list
def get_notification_recovery_workflow_details(self, context,
                                               recovery_method,
                                               notification):
    """Retrieve progress details in notification.

    Reads the persisted taskflow logbook for the notification and
    returns a list of NotificationProgressDetails objects, ordered by
    the configured task sequence.
    """
    # Note<ShilpaSD>: Taskflow doesn't support to return task details in
    # the same sequence in which all tasks are executed. Reported this
    # issue in LP #1815738. To resolve this issue load the tasks based on
    # the recovery method and later sort it based on this task list so
    # progress_details can be returned in the expected order.
    task_list = self._get_taskflow_sequence(context, recovery_method,
                                            notification)
    backend = backends.fetch(PERSISTENCE_BACKEND)
    with contextlib.closing(backend.get_connection()) as conn:
        progress_details = []
        flow_details = conn.get_flows_for_book(
            notification.notification_uuid)
        for flow in flow_details:
            # Re-order the persisted atoms to match the configured task
            # sequence; OrderedDict preserves that insertion order.
            od = OrderedDict()
            atom_details = list(conn.get_atoms_for_flow(flow.uuid))
            for task in task_list:
                for atom in atom_details:
                    if task == atom.name:
                        od[atom.name] = atom
            for key, value in od.items():
                # Add progress_details only if tasks are executed and meta
                # is available in which progress_details are stored.
                if value.meta:
                    progress_details_obj = (
                        objects.NotificationProgressDetails.create(
                            value.name,
                            value.meta['progress'],
                            value.meta['progress_details']['details']
                            ['progress_details'],
                            value.state))
                    progress_details.append(progress_details_obj)
        return progress_details

View File

@ -22,57 +22,55 @@ from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import excutils
from oslo_utils import strutils
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow import retry
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _
from masakari import utils
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = 'instance:evacuate'
# Instance power_state
SHUTDOWN = 4
TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class DisableComputeServiceTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["host_name"]
super(DisableComputeServiceTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, host_name):
self.novaclient.enable_disable_service(context, host_name)
super(DisableComputeServiceTask, self).__init__(context,
novaclient,
requires=requires
)
def execute(self, host_name):
msg = "Disabling compute service on host: '%s'" % host_name
self.update_details(msg)
self.novaclient.enable_disable_service(self.context, host_name)
# Sleep until nova-compute service is marked as disabled.
msg = ("Sleeping %(wait)s sec before starting recovery "
log_msg = ("Sleeping %(wait)s sec before starting recovery "
"thread until nova recognizes the node down.")
LOG.info(msg, {'wait': CONF.wait_period_after_service_update})
LOG.info(log_msg, {'wait': CONF.wait_period_after_service_update})
eventlet.sleep(CONF.wait_period_after_service_update)
msg = "Disabled compute service on host: '%s'" % host_name
self.update_details(msg, 1.0)
class PrepareHAEnabledInstancesTask(base.MasakariTask):
"""Get all HA_Enabled instances."""
default_provides = set(["instance_list"])
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["host_name"]
super(PrepareHAEnabledInstancesTask, self).__init__(addons=[ACTION],
super(PrepareHAEnabledInstancesTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, host_name):
def execute(self, host_name):
def _filter_instances(instance_list):
ha_enabled_instances = []
non_ha_enabled_instances = []
@ -84,8 +82,10 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
getattr(instance, "OS-EXT-STS:vm_state") == "error"):
if is_instance_ha_enabled:
msg = ("Ignoring recovery of HA_Enabled instance "
"'%(instance_id)s' as it is in 'error' state.")
LOG.info(msg, {'instance_id': instance.id})
"'%(instance_id)s' as it is in 'error' state."
) % {'instance_id': instance.id}
LOG.info(msg)
self.update_details(msg, 0.4)
continue
if is_instance_ha_enabled:
@ -93,20 +93,48 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
else:
non_ha_enabled_instances.append(instance)
msg = "Total HA Enabled instances count: '%d'" % len(
ha_enabled_instances)
self.update_details(msg, 0.6)
if CONF.host_failure.evacuate_all_instances:
msg = ("Total Non-HA Enabled instances count: '%d'" % len(
non_ha_enabled_instances))
self.update_details(msg, 0.7)
ha_enabled_instances.extend(non_ha_enabled_instances)
msg = ("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '%d'") % (
len(ha_enabled_instances))
self.update_details(msg, 0.8)
return ha_enabled_instances
instance_list = self.novaclient.get_servers(context, host_name)
msg = "Preparing instances for evacuation"
self.update_details(msg)
instance_list = self.novaclient.get_servers(self.context, host_name)
msg = ("Total instances running on failed host '%(host_name)s' is "
"%(instance_list)d") % {'host_name': host_name,
'instance_list': len(instance_list)}
self.update_details(msg, 0.3)
instance_list = _filter_instances(instance_list)
if not instance_list:
msg = _('No instances to evacuate on host: %s.') % host_name
msg = ("Skipped host '%s' recovery as no instances needs to be "
"evacuated" % host_name)
self.update_details(msg, 1.0)
LOG.info(msg)
raise exception.SkipHostRecoveryException(message=msg)
# List of instance UUID
instance_list = [instance.id for instance in instance_list]
msg = "Instances to be evacuated are: '%s'" % ','.join(instance_list)
self.update_details(msg, 1.0)
return {
"instance_list": instance_list,
}
@ -114,11 +142,11 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
class EvacuateInstancesTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["host_name", "instance_list"]
super(EvacuateInstancesTask, self).__init__(addons=[ACTION],
super(EvacuateInstancesTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def _get_state_and_host_of_instance(self, context, instance):
new_instance = self.novaclient.get_server(context, instance.id)
@ -131,7 +159,7 @@ class EvacuateInstancesTask(base.MasakariTask):
def _stop_after_evacuation(self, context, instance):
def _wait_for_stop_confirmation():
old_vm_state, new_vm_state, _ = (
old_vm_state, new_vm_state, instance_host = (
self._get_state_and_host_of_instance(context, instance))
if new_vm_state == 'stopped':
@ -149,9 +177,10 @@ class EvacuateInstancesTask(base.MasakariTask):
periodic_call_stopped.wait)
except etimeout.Timeout:
with excutils.save_and_reraise_exception():
msg = ("Instance '%s' is successfully evacuated but "
"failed to stop.")
LOG.warning(msg, instance.id)
msg = ("Instance '%(uuid)s' is successfully evacuated but "
"failed to stop.") % {'uuid': instance.id}
LOG.warning(msg)
self.update_details(msg, 1.0)
finally:
periodic_call_stopped.stop()
@ -171,7 +200,6 @@ class EvacuateInstancesTask(base.MasakariTask):
def _wait_for_evacuation_confirmation():
old_vm_state, new_vm_state, instance_host = (
self._get_state_and_host_of_instance(context, instance))
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
@ -220,7 +248,7 @@ class EvacuateInstancesTask(base.MasakariTask):
if vm_state != 'active':
if stop_instance:
self._stop_after_evacuation(context, instance)
self._stop_after_evacuation(self.context, instance)
# If the instance was in 'error' state before failure
# it should be set to 'error' after recovery.
if vm_state == 'error':
@ -242,11 +270,18 @@ class EvacuateInstancesTask(base.MasakariTask):
# Unlock the server after evacuation and confirmation
self.novaclient.unlock_server(context, instance.id)
def execute(self, context, host_name, instance_list, reserved_host=None):
def execute(self, host_name, instance_list, reserved_host=None):
msg = ("Start evacuation of instances from failed host '%(host_name)s'"
", instance uuids are : '%(instance_list)s'") % {
'host_name': host_name, 'instance_list': ','.join(instance_list)}
self.update_details(msg)
def _do_evacuate(context, host_name, instance_list,
reserved_host=None):
failed_evacuation_instances = []
if reserved_host:
msg = "Enabling reserved host: '%s'" % reserved_host.name
self.update_details(msg, 0.1)
if CONF.host_failure.add_reserved_host_to_aggregate:
# Assign reserved_host to an aggregate to which the failed
# compute host belongs to.
@ -254,15 +289,27 @@ class EvacuateInstancesTask(base.MasakariTask):
for aggregate in aggregates:
if host_name in aggregate.hosts:
try:
msg = ("Add host %(reserved_host)s to "
"aggregate %(aggregate)s") % {
'reserved_host': reserved_host.name,
'aggregate': aggregate.name}
self.update_details(msg, 0.2)
self.novaclient.add_host_to_aggregate(
context, reserved_host.name, aggregate)
msg = ("Added host %(reserved_host)s to "
"aggregate %(aggregate)s") % {
'reserved_host': reserved_host.name,
'aggregate': aggregate.name}
self.update_details(msg, 0.3)
except exception.Conflict:
msg = ("Host '%(reserved_host)s' already has "
"been added to aggregate "
"'%(aggregate)s'.")
LOG.info(msg,
{'reserved_host': reserved_host.name,
'aggregate': aggregate.name})
"'%(aggregate)s'.") % {
'reserved_host': reserved_host.name,
'aggregate': aggregate.name}
self.update_details(msg, 1.0)
LOG.info(msg)
# A failed compute host can be associated with
# multiple aggregates but operators will not
@ -280,38 +327,52 @@ class EvacuateInstancesTask(base.MasakariTask):
thread_pool = greenpool.GreenPool(
CONF.host_failure_recovery_threads)
for instance in instance_list:
for instance_id in instance_list:
msg = "Evacuation of instance started : '%s'" % instance_id
self.update_details(msg, 0.5)
instance = self.novaclient.get_server(self.context,
instance_id)
thread_pool.spawn_n(self._evacuate_and_confirm, context,
instance, host_name,
failed_evacuation_instances, reserved_host)
if not (instance_id in failed_evacuation_instances):
msg = ("Instance '%s' evacuated successfully" %
instance_id)
self.update_details(msg, 0.7)
thread_pool.waitall()
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.HostRecoveryFailureException(message=msg)
msg = ("Failed to evacuate instances "
"'%(failed_evacuation_instances)s' from host "
"'%(host_name)s'") % {
'failed_evacuation_instances':
','.join(failed_evacuation_instances),
'host_name': host_name}
self.update_details(msg, 1.0)
raise exception.HostRecoveryFailureException(
message=msg)
lock_name = reserved_host.name if reserved_host else None
@utils.synchronized(lock_name)
def do_evacuate_with_reserved_host(context, host_name, instance_list,
reserved_host):
_do_evacuate(context, host_name, instance_list,
_do_evacuate(self.context, host_name, instance_list,
reserved_host=reserved_host)
if lock_name:
do_evacuate_with_reserved_host(context, host_name, instance_list,
reserved_host)
do_evacuate_with_reserved_host(self.context, host_name,
instance_list, reserved_host)
else:
# No need to acquire lock on reserved_host when recovery_method is
# 'auto' as the selection of compute host will be decided by nova.
_do_evacuate(context, host_name, instance_list)
_do_evacuate(self.context, host_name, instance_list)
def get_auto_flow(novaclient, process_what):
def get_auto_flow(context, novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
@ -328,17 +389,17 @@ def get_auto_flow(novaclient, process_what):
task_dict = TASKFLOW_CONF.host_auto_failure_recovery_tasks
auto_evacuate_flow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'],
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
auto_evacuate_flow_pre.add(plugin)
auto_evacuate_flow_main = linear_flow.Flow('main_tasks')
for plugin in base.get_recovery_flow(task_dict['main'],
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
auto_evacuate_flow_main.add(plugin)
auto_evacuate_flow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'],
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
auto_evacuate_flow_post.add(plugin)
@ -346,10 +407,11 @@ def get_auto_flow(novaclient, process_what):
nested_flow.add(auto_evacuate_flow_main)
nested_flow.add(auto_evacuate_flow_post)
return taskflow.engines.load(nested_flow, store=process_what)
return base.load_taskflow_into_engine(ACTION, nested_flow,
process_what)
def get_rh_flow(novaclient, process_what):
def get_rh_flow(context, novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
@ -365,7 +427,7 @@ def get_rh_flow(novaclient, process_what):
task_dict = TASKFLOW_CONF.host_rh_failure_recovery_tasks
rh_evacuate_flow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'],
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
rh_evacuate_flow_pre.add(plugin)
@ -373,12 +435,12 @@ def get_rh_flow(novaclient, process_what):
"retry_%s" % flow_name, retry=retry.ParameterizedForEach(
rebind=['reserved_host_list'], provides='reserved_host'))
for plugin in base.get_recovery_flow(task_dict['main'],
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
rh_evacuate_flow_main.add(plugin)
rh_evacuate_flow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'],
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
rh_evacuate_flow_post.add(plugin)
@ -386,4 +448,5 @@ def get_rh_flow(novaclient, process_what):
nested_flow.add(rh_evacuate_flow_main)
nested_flow.add(rh_evacuate_flow_post)
return taskflow.engines.load(nested_flow, store=process_what)
return base.load_taskflow_into_engine(ACTION, nested_flow,
process_what)

View File

@ -19,34 +19,29 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import strutils
import taskflow.engines
from taskflow.patterns import linear_flow
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = "instance:recovery"
TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class StopInstanceTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
super(StopInstanceTask, self).__init__(addons=[ACTION],
super(StopInstanceTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
def execute(self, instance_uuid):
"""Stop the instance for recovery."""
instance = self.novaclient.get_server(context, instance_uuid)
instance = self.novaclient.get_server(self.context, instance_uuid)
# If an instance is not HA_Enabled and "process_all_instances" config
# option is also disabled, then there is no need to take any recovery
@ -54,27 +49,35 @@ class StopInstanceTask(base.MasakariTask):
if not CONF.instance_failure.process_all_instances and not (
strutils.bool_from_string(
instance.metadata.get('HA_Enabled', False))):
LOG.info("Skipping recovery for instance: %s as it is "
"not Ha_Enabled.", instance_uuid)
msg = ("Skipping recovery for instance: %(instance_uuid)s as it is"
" not Ha_Enabled") % {'instance_uuid': instance_uuid}
LOG.info(msg)
self.update_details(msg, 1.0)
raise exception.SkipInstanceRecoveryException()
vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
if vm_state in ['paused', 'rescued']:
msg = _("Recovery of instance '%(instance_uuid)s' is ignored as"
" it is in '%(vm_state)s' state.") % {
'instance_uuid': instance_uuid, 'vm_state': vm_state}
msg = ("Recovery of instance '%(instance_uuid)s' is ignored as it "
"is in '%(vm_state)s' state.") % {
'instance_uuid': instance_uuid, 'vm_state': vm_state
}
LOG.warning(msg)
self.update_details(msg, 1.0)
raise exception.IgnoreInstanceRecoveryException(msg)
if vm_state != 'stopped':
if vm_state == 'resized':
self.novaclient.reset_instance_state(
context, instance.id, 'active')
self.context, instance.id, 'active')
self.novaclient.stop_server(context, instance.id)
msg = "Stopping instance: %s" % instance_uuid
self.update_details(msg)
self.novaclient.stop_server(self.context, instance.id)
def _wait_for_power_off():
new_instance = self.novaclient.get_server(context, instance_uuid)
new_instance = self.novaclient.get_server(self.context,
instance_uuid)
vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
if vm_state == 'stopped':
raise loopingcall.LoopingCallDone()
@ -87,48 +90,60 @@ class StopInstanceTask(base.MasakariTask):
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_power_off,
periodic_call.wait)
msg = "Stopped instance: '%s'" % instance_uuid
self.update_details(msg, 1.0)
except etimeout.Timeout:
msg = _("Failed to stop instance %(instance)s") % {
msg = "Failed to stop instance %(instance)s" % {
'instance': instance.id
}
raise exception.InstanceRecoveryFailureException(message=msg)
self.update_details(msg, 1.0)
raise exception.InstanceRecoveryFailureException(
message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
class StartInstanceTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
super(StartInstanceTask, self).__init__(addons=[ACTION],
super(StartInstanceTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
def execute(self, instance_uuid):
"""Start the instance."""
instance = self.novaclient.get_server(context, instance_uuid)
msg = "Starting instance: '%s'" % instance_uuid
self.update_details(msg)
instance = self.novaclient.get_server(self.context, instance_uuid)
vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
if vm_state == 'stopped':
self.novaclient.start_server(context, instance.id)
self.novaclient.start_server(self.context, instance.id)
msg = "Instance started: '%s'" % instance_uuid
self.update_details(msg, 1.0)
else:
msg = _("Invalid state for Instance %(instance)s. Expected state: "
"'STOPPED', Actual state: '%(actual_state)s'") % {
msg = ("Invalid state for Instance %(instance)s. Expected state: "
"'STOPPED', Actual state: '%(actual_state)s'") % {
'instance': instance_uuid,
'actual_state': vm_state
}
raise exception.InstanceRecoveryFailureException(message=msg)
self.update_details(msg, 1.0)
raise exception.InstanceRecoveryFailureException(
message=msg)
class ConfirmInstanceActiveTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["instance_uuid"]
super(ConfirmInstanceActiveTask, self).__init__(addons=[ACTION],
super(ConfirmInstanceActiveTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
def execute(self, instance_uuid):
def _wait_for_active():
new_instance = self.novaclient.get_server(context, instance_uuid)
new_instance = self.novaclient.get_server(self.context,
instance_uuid)
vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
if vm_state == 'active':
raise loopingcall.LoopingCallDone()
@ -136,21 +151,29 @@ class ConfirmInstanceActiveTask(base.MasakariTask):
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_active)
try:
msg = "Confirming instance '%s' vm_state is ACTIVE" % instance_uuid
self.update_details(msg)
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_power_on,
periodic_call.wait)
msg = "Confirmed instance '%s' vm_state is ACTIVE" % instance_uuid
self.update_details(msg, 1.0)
except etimeout.Timeout:
msg = _("Failed to start instance %(instance)s") % {
msg = "Failed to start instance %(instance)s" % {
'instance': instance_uuid
}
raise exception.InstanceRecoveryFailureException(message=msg)
self.update_details(msg, 1.0)
raise exception.InstanceRecoveryFailureException(
message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
def get_instance_recovery_flow(novaclient, process_what):
def get_instance_recovery_flow(context, novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
@ -166,17 +189,17 @@ def get_instance_recovery_flow(novaclient, process_what):
task_dict = TASKFLOW_CONF.instance_failure_recovery_tasks
instance_recovery_workflow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'],
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
instance_recovery_workflow_pre.add(plugin)
instance_recovery_workflow_main = linear_flow.Flow('main_tasks')
for plugin in base.get_recovery_flow(task_dict['main'],
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
instance_recovery_workflow_main.add(plugin)
instance_recovery_workflow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'],
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
instance_recovery_workflow_post.add(plugin)
@ -184,4 +207,5 @@ def get_instance_recovery_flow(novaclient, process_what):
nested_flow.add(instance_recovery_workflow_main)
nested_flow.add(instance_recovery_workflow_post)
return taskflow.engines.load(nested_flow, store=process_what)
return base.load_taskflow_into_engine(ACTION, nested_flow,
process_what)

View File

@ -21,7 +21,8 @@ LOG = logging.getLogger(__name__)
class Noop(task.Task):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
self.context = context
self.novaclient = novaclient
super(Noop, self).__init__()

View File

@ -18,75 +18,86 @@ from eventlet import timeout as etimeout
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
import taskflow.engines
from taskflow.patterns import linear_flow
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = "process:recovery"
TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows
class DisableComputeNodeTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["process_name", "host_name"]
super(DisableComputeNodeTask, self).__init__(addons=[ACTION],
super(DisableComputeNodeTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, process_name, host_name):
if not self.novaclient.is_service_down(context, host_name,
def execute(self, process_name, host_name):
msg = "Disabling compute service on host: '%s'" % host_name
self.update_details(msg)
if not self.novaclient.is_service_down(self.context, host_name,
process_name):
# disable compute node on given host
self.novaclient.enable_disable_service(context, host_name)
self.novaclient.enable_disable_service(self.context, host_name)
msg = "Disabled compute service on host: '%s'" % host_name
self.update_details(msg, 1.0)
else:
LOG.info("Skipping recovery for process: %s as it is "
"already disabled.",
process_name)
msg = ("Skipping recovery for process %(process_name)s as it is "
"already disabled") % {'process_name': process_name}
LOG.info(msg)
self.update_details(msg, 1.0)
class ConfirmComputeNodeDisabledTask(base.MasakariTask):
def __init__(self, novaclient):
def __init__(self, context, novaclient):
requires = ["process_name", "host_name"]
super(ConfirmComputeNodeDisabledTask, self).__init__(addons=[ACTION],
super(ConfirmComputeNodeDisabledTask, self).__init__(context,
novaclient,
requires=requires)
self.novaclient = novaclient
def execute(self, context, process_name, host_name):
def execute(self, process_name, host_name):
def _wait_for_disable():
service_disabled = self.novaclient.is_service_down(
context, host_name, process_name)
self.context, host_name, process_name)
if service_disabled:
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_disable)
try:
msg = "Confirming compute service is disabled on host: '%s'" % (
host_name)
self.update_details(msg)
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(
CONF.wait_period_after_service_update,
periodic_call.wait)
msg = "Confirmed compute service is disabled on host: '%s'" % (
host_name)
self.update_details(msg, 1.0)
except etimeout.Timeout:
msg = _("Failed to disable service %(process_name)s") % {
msg = "Failed to disable service %(process_name)s" % {
'process_name': process_name
}
raise exception.ProcessRecoveryFailureException(message=msg)
self.update_details(msg, 1.0)
raise exception.ProcessRecoveryFailureException(
message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
def get_compute_process_recovery_flow(novaclient, process_what):
def get_compute_process_recovery_flow(context, novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
@ -101,17 +112,17 @@ def get_compute_process_recovery_flow(novaclient, process_what):
task_dict = TASKFLOW_CONF.process_failure_recovery_tasks
process_recovery_workflow_pre = linear_flow.Flow('pre_tasks')
for plugin in base.get_recovery_flow(task_dict['pre'],
for plugin in base.get_recovery_flow(task_dict['pre'], context=context,
novaclient=novaclient):
process_recovery_workflow_pre.add(plugin)
process_recovery_workflow_main = linear_flow.Flow('main_tasks')
for plugin in base.get_recovery_flow(task_dict['main'],
for plugin in base.get_recovery_flow(task_dict['main'], context=context,
novaclient=novaclient):
process_recovery_workflow_main.add(plugin)
process_recovery_workflow_post = linear_flow.Flow('post_tasks')
for plugin in base.get_recovery_flow(task_dict['post'],
for plugin in base.get_recovery_flow(task_dict['post'], context=context,
novaclient=novaclient):
process_recovery_workflow_post.add(plugin)
@ -119,4 +130,5 @@ def get_compute_process_recovery_flow(novaclient, process_what):
nested_flow.add(process_recovery_workflow_main)
nested_flow.add(process_recovery_workflow_post)
return taskflow.engines.load(nested_flow, store=process_what)
return base.load_taskflow_into_engine(ACTION, nested_flow,
process_what)

View File

@ -31,8 +31,10 @@ from oslo_utils import timeutils
import masakari.conf
from masakari.engine import driver
from masakari.engine import instance_events as virt_events
from masakari.engine import rpcapi
from masakari.engine import utils as engine_utils
from masakari import exception
from masakari.i18n import _
from masakari import manager
from masakari import objects
from masakari.objects import fields
@ -45,8 +47,8 @@ LOG = logging.getLogger(__name__)
class MasakariManager(manager.Manager):
"""Manages the running notifications"""
target = messaging.Target(version='1.0')
RPC_API_VERSION = rpcapi.EngineAPI.RPC_API_VERSION
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, masakari_driver=None, *args, **kwargs):
"""Load configuration options"""
@ -332,3 +334,23 @@ class MasakariManager(manager.Manager):
"status: %(status)s.",
{'notification_uuid': notification.notification_uuid,
'status': notification_status})
def get_notification_recovery_workflow_details(self, context,
notification):
"""Retrieve recovery workflow details of the notification"""
try:
host_obj = objects.Host.get_by_uuid(
context, notification.source_host_uuid)
recovery_method = host_obj.failover_segment.recovery_method
progress_details = (self.driver.
get_notification_recovery_workflow_details(
context, recovery_method, notification))
notification['recovery_workflow_details'] = progress_details
except Exception:
msg = (_('Failed to fetch notification recovery workflow details '
'for %s'), notification.notification_uuid)
LOG.exception(msg)
raise exception.MasakariException(msg)
return notification

View File

@ -30,9 +30,11 @@ class EngineAPI(rpc.RPCAPI):
.. code-block:: none
1.0 - Initial version.
1.1 - Added get_notification_recovery_workflow_details method to
retrieve progress details from notification driver.
"""
RPC_API_VERSION = '1.0'
RPC_API_VERSION = '1.1'
TOPIC = CONF.masakari_topic
BINARY = 'masakari-engine'
@ -47,3 +49,11 @@ class EngineAPI(rpc.RPCAPI):
version = '1.0'
cctxt = self.client.prepare(version=version)
cctxt.cast(context, 'process_notification', notification=notification)
def get_notification_recovery_workflow_details(self, context,
notification):
version = '1.1'
cctxt = self.client.prepare(version=version)
return cctxt.call(context,
'get_notification_recovery_workflow_details',
notification=notification)

View File

@ -374,3 +374,15 @@ class NotificationAPI(object):
raise exception.NotificationNotFound(id=notification_uuid)
return notification
def get_notification_recovery_workflow_details(self, context,
notification_uuid):
"""Get recovery workflow details details of the notification"""
notification = self.get_notification(context, notification_uuid)
LOG.debug("Fetching recovery workflow details of a notification %s ",
notification_uuid)
notification = (self.engine_rpcapi.
get_notification_recovery_workflow_details(
context, notification))
return notification

View File

@ -28,6 +28,8 @@ ObjectField = fields.ObjectField
BaseEnumField = fields.BaseEnumField
ListOfObjectsField = fields.ListOfObjectsField
ListOfStringsField = fields.ListOfStringsField
FloatField = fields.FloatField
ListOfDictOfNullableStringsField = fields.ListOfDictOfNullableStringsField
Field = fields.Field

View File

@ -27,11 +27,17 @@ from masakari.objects import fields
LOG = logging.getLogger(__name__)
NOTIFICATION_OPTIONAL_FIELDS = ['recovery_workflow_details']
@base.MasakariObjectRegistry.register
class Notification(base.MasakariPersistentObject, base.MasakariObject,
base.MasakariObjectDictCompat):
VERSION = '1.0'
# Version 1.0: Initial version
# Version 1.1: Added recovery_workflow_details field.
# Note: This field shouldn't be persisted.
VERSION = '1.1'
fields = {
'id': fields.IntegerField(),
@ -41,12 +47,19 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject,
'type': fields.NotificationTypeField(),
'payload': fields.DictOfStringsField(),
'status': fields.NotificationStatusField(),
# NOTE(ShilpaSD): This field shouldn't be stored in db.
# The recovery workflow details read from the 'notification_driver'
# will be set to this field.
'recovery_workflow_details': fields.ListOfObjectsField(
'NotificationProgressDetails', default=[])
}
@staticmethod
def _from_db_object(context, notification, db_notification):
for key in notification.fields:
if key in NOTIFICATION_OPTIONAL_FIELDS:
continue
if key != 'payload':
setattr(notification, key, db_notification.get(key))
else:
@ -73,6 +86,9 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject,
raise exception.ObjectActionError(action='create',
reason='already created')
updates = self.masakari_obj_get_changes()
# NOTE(ShilpaSD): This field doesn't exist in the Notification
# db model so don't save it.
updates.pop('recovery_workflow_details', None)
if 'notification_uuid' not in updates:
updates['notification_uuid'] = uuidutils.generate_uuid()
@ -99,6 +115,9 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject,
updates = self.masakari_obj_get_changes()
updates.pop('id', None)
# NOTE(ShilpaSD): This field doesn't exist in the Notification
# db model so don't save it.
updates.pop('recovery_workflow_details', None)
db_notification = db.notification_update(self._context,
self.notification_uuid,
@ -152,3 +171,23 @@ def notification_sample(sample):
cls.sample = sample
return cls
return wrap
@base.MasakariObjectRegistry.register
class NotificationProgressDetails(base.MasakariObject,
base.MasakariObjectDictCompat):
VERSION = '1.0'
fields = {
'name': fields.StringField(),
'progress': fields.FloatField(),
'progress_details': fields.ListOfDictOfNullableStringsField(
default=[]),
'state': fields.StringField()
}
@classmethod
def create(cls, name, progress, progress_details, state,):
return cls(name=name, progress=progress,
progress_details=progress_details, state=state)

View File

@ -15,6 +15,8 @@
"""Tests for the notifications api."""
import copy
import ddt
import mock
from oslo_serialization import jsonutils
@ -35,12 +37,17 @@ from masakari.tests.unit.objects import test_objects
from masakari.tests import uuidsentinel
NOW = timeutils.utcnow().replace(microsecond=0)
OPTIONAL = ['recovery_workflow_details']
def _make_notification_obj(notification_dict):
return notification_obj.Notification(**notification_dict)
def _make_notification_progress_details_obj(progress_details):
return notification_obj.NotificationProgressDetails(**progress_details)
def _make_notifications_list(notifications_list):
return notification_obj.Notification(objects=[
_make_notification_obj(a) for a in notifications_list])
@ -61,6 +68,22 @@ NOTIFICATION_DATA = {"type": "VM", "id": 1,
NOTIFICATION = _make_notification_obj(NOTIFICATION_DATA)
RECOVERY_DETAILS = {"progress": 1.0,
"state": "SUCCESS",
"name": "StopInstanceTask",
"progress_details": [
{"timestamp": "2019-03-07 13:54:28",
"message": "Stopping instance",
"progress": "0.0"},
]}
NOTI_DATA_WITH_DETAILS = copy.deepcopy(NOTIFICATION_DATA)
NOTIFICATION_WITH_PROGRESS_DETAILS = _make_notification_obj(
NOTI_DATA_WITH_DETAILS)
RECOVERY_OBJ = _make_notification_progress_details_obj(RECOVERY_DETAILS)
NOTIFICATION_WITH_PROGRESS_DETAILS.recovery_workflow_details = [RECOVERY_OBJ]
NOTIFICATION_LIST = [
{"type": "VM", "id": 1, "payload": {'event': 'STOPPED',
'host_status': 'NORMAL',
@ -165,7 +188,8 @@ class NotificationTestCase(test.TestCase):
"type": "VM",
"generated_time": "2016-09-13T09:11:21.656788"}})
result = result['notification']
test_objects.compare_obj(self, result, NOTIFICATION_DATA)
test_objects.compare_obj(self, result, NOTIFICATION_DATA,
allow_missing=OPTIONAL)
@mock.patch.object(ha_api.NotificationAPI, 'create_notification')
def test_create_process_notification(self, mock_create):
@ -180,7 +204,8 @@ class NotificationTestCase(test.TestCase):
"type": "PROCESS",
"generated_time": "2016-09-13T09:11:21.656788"}})
result = result['notification']
test_objects.compare_obj(self, result, NOTIFICATION_DATA)
test_objects.compare_obj(self, result, NOTIFICATION_DATA,
allow_missing=OPTIONAL)
@mock.patch('masakari.rpc.get_client')
@mock.patch.object(ha_api.NotificationAPI, 'create_notification')
@ -446,3 +471,30 @@ class NotificationCasePolicyNotAuthorized(test.NoDBTestCase):
self.controller.index,
self.req)
self._check_rule(exc, rule_name)
class NotificationV1_1_TestCase(NotificationTestCase):
"""Test Case for notifications api for 1.1 API"""
api_version = '1.1'
@mock.patch.object(engine_rpcapi, 'EngineAPI')
def setUp(self, mock_rpc):
super(NotificationV1_1_TestCase, self).setUp()
self.controller = notifications.NotificationsController()
self.req = fakes.HTTPRequest.blank('/v1/notifications',
use_admin_context=True,
version=self.api_version)
self.context = self.req.environ['masakari.context']
@mock.patch.object(ha_api.NotificationAPI,
'get_notification_recovery_workflow_details')
def test_show(self, mock_get_notification_recovery_workflow_details):
(mock_get_notification_recovery_workflow_details
.return_value) = NOTIFICATION_WITH_PROGRESS_DETAILS
result = self.controller.show(self.req, uuidsentinel.fake_notification)
result = result['notification']
self.assertItemsEqual([RECOVERY_OBJ],
result.recovery_workflow_details)
self._assert_notification_data(NOTIFICATION_WITH_PROGRESS_DETAILS,
_make_notification_obj(result))

View File

@ -24,12 +24,16 @@ import sqlalchemy
from sqlalchemy.engine import reflection
import sqlalchemy.exc
import masakari.conf
from masakari.db.sqlalchemy import migrate_repo
from masakari.db.sqlalchemy import migration as sa_migration
from masakari.db.sqlalchemy import models
from masakari.tests import fixtures as masakari_fixtures
CONF = masakari.conf.CONF
class MasakariMigrationsCheckers(test_migrations.WalkVersionsMixin):
"""Test sqlalchemy-migrate migrations."""
@ -176,9 +180,46 @@ class MasakariMigrationsCheckers(test_migrations.WalkVersionsMixin):
for table in [failover_segments, hosts]:
self.assertTrue(table.c.created_at.nullable)
def _check_006(self, engine, data):
self.assertColumnExists(engine, 'logbooks', 'created_at')
self.assertColumnExists(engine, 'logbooks', 'updated_at')
self.assertColumnExists(engine, 'logbooks', 'meta')
self.assertColumnExists(engine, 'logbooks', 'name')
self.assertColumnExists(engine, 'logbooks', 'uuid')
self.assertColumnExists(engine, 'flowdetails', 'created_at')
self.assertColumnExists(engine, 'flowdetails', 'updated_at')
self.assertColumnExists(engine, 'flowdetails', 'parent_uuid')
self.assertColumnExists(engine, 'flowdetails', 'meta')
self.assertColumnExists(engine, 'flowdetails', 'name')
self.assertColumnExists(engine, 'flowdetails', 'state')
self.assertColumnExists(engine, 'flowdetails', 'uuid')
self.assertColumnExists(engine, 'atomdetails', 'created_at')
self.assertColumnExists(engine, 'atomdetails', 'updated_at')
self.assertColumnExists(engine, 'atomdetails', 'parent_uuid')
self.assertColumnExists(engine, 'atomdetails', 'meta')
self.assertColumnExists(engine, 'atomdetails', 'name')
self.assertColumnExists(engine, 'atomdetails', 'results')
self.assertColumnExists(engine, 'atomdetails', 'version')
self.assertColumnExists(engine, 'atomdetails', 'state')
self.assertColumnExists(engine, 'atomdetails', 'uuid')
self.assertColumnExists(engine, 'atomdetails', 'failure')
self.assertColumnExists(engine, 'atomdetails', 'atom_type')
self.assertColumnExists(engine, 'atomdetails', 'intention')
self.assertColumnExists(engine, 'atomdetails', 'revert_results')
self.assertColumnExists(engine, 'atomdetails', 'revert_failure')
class TestMasakariMigrationsSQLite(MasakariMigrationsCheckers,
test_base.DbTestCase):
def _check_006(self, engine, data):
# NOTE(ShilpaSD): DB script '006_add_persistence_tables.py' adds db
# tables required for taskflow which doesn't support Sqlite using
# alembic migration.
pass
pass

View File

@ -55,15 +55,15 @@ class HostFailureTestCase(test.TestCase):
def _verify_instance_evacuated(self, old_instance_list):
for server in old_instance_list:
instance = self.novaclient.get_server(self.ctxt, server.id)
instance = self.novaclient.get_server(self.ctxt, server)
if getattr(server, 'OS-EXT-STS:vm_state') in \
['active', 'stopped', 'error']:
self.assertEqual(getattr(server, 'OS-EXT-STS:vm_state'),
getattr(instance, 'OS-EXT-STS:vm_state'))
if getattr(instance, 'OS-EXT-STS:vm_state') in \
['active', 'stopped', 'error']:
self.assertIn(getattr(instance, 'OS-EXT-STS:vm_state'),
['active', 'stopped', 'error'])
else:
if getattr(server, 'OS-EXT-STS:vm_state') == 'resized' and \
getattr(server, 'OS-EXT-STS:power_state') != 4:
if getattr(instance, 'OS-EXT-STS:vm_state') == 'resized' and \
getattr(instance, 'OS-EXT-STS:power_state') != 4:
self.assertEqual('active',
getattr(instance, 'OS-EXT-STS:vm_state'))
else:
@ -71,7 +71,7 @@ class HostFailureTestCase(test.TestCase):
getattr(instance, 'OS-EXT-STS:vm_state'))
if CONF.host_failure.ignore_instances_in_error_state and getattr(
server, 'OS-EXT-STS:vm_state') == 'error':
instance, 'OS-EXT-STS:vm_state') == 'error':
self.assertEqual(
self.instance_host, getattr(
instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname'))
@ -81,49 +81,58 @@ class HostFailureTestCase(test.TestCase):
instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname'))
def _test_disable_compute_service(self, mock_enable_disable):
task = host_failure.DisableComputeServiceTask(self.novaclient)
task.execute(self.ctxt, self.instance_host)
task = host_failure.DisableComputeServiceTask(self.ctxt,
self.novaclient)
task.execute(self.instance_host)
mock_enable_disable.assert_called_once_with(
self.ctxt, self.instance_host)
def _test_instance_list(self, instances_evacuation_count):
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
instance_list = task.execute(self.ctxt, self.instance_host)
for instance in instance_list['instance_list']:
task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt,
self.novaclient)
instances = task.execute(self.instance_host)
instance_uuid_list = []
for instance_id in instances['instance_list']:
instance = self.novaclient.get_server(self.ctxt, instance_id)
if CONF.host_failure.ignore_instances_in_error_state:
self.assertNotEqual("error",
getattr(instance, "OS-EXT-STS:vm_state"))
if not CONF.host_failure.evacuate_all_instances:
self.assertTrue(instance.metadata.get('HA_Enabled', False))
self.assertEqual(instances_evacuation_count,
len(instance_list['instance_list']))
instance_uuid_list.append(instance.id)
return instance_list
self.assertEqual(instances_evacuation_count,
len(instances['instance_list']))
return {
"instance_list": instance_uuid_list,
}
def _evacuate_instances(self, instance_list, mock_enable_disable,
reserved_host=None):
task = host_failure.EvacuateInstancesTask(self.novaclient)
task = host_failure.EvacuateInstancesTask(self.ctxt, self.novaclient)
old_instance_list = copy.deepcopy(instance_list['instance_list'])
if reserved_host:
task.execute(self.ctxt, self.instance_host,
task.execute(self.instance_host,
instance_list['instance_list'],
reserved_host=reserved_host)
self.assertTrue(mock_enable_disable.called)
else:
task.execute(
self.ctxt, self.instance_host, instance_list['instance_list'])
self.instance_host, instance_list['instance_list'])
# make sure instance is active and has different host
self._verify_instance_evacuated(old_instance_list)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_for_auto_recovery(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
@ -143,9 +152,31 @@ class HostFailureTestCase(test.TestCase):
# execute EvacuateInstancesTask
self._evacuate_instances(instance_list, mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 2"
"", 0.3),
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Total Non-HA Enabled instances count: '1'", 0.7),
mock.call("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '2'", 0.8),
mock.call("Instances to be evacuated are: '1,2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_for_reserved_host_recovery(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
@ -177,12 +208,39 @@ class HostFailureTestCase(test.TestCase):
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 2"
"", 0.3),
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Total Non-HA Enabled instances count: '1'", 0.7),
mock.call("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '2'", 0.8),
mock.call("Instances to be evacuated are: '1,2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1),
mock.call('Add host fake-reserved-host to aggregate fake_agg',
0.2),
mock.call('Added host fake-reserved-host to aggregate fake_agg',
0.3),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
])
@mock.patch.object(nova.API, 'add_host_to_aggregate')
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG')
def test_host_failure_flow_ignores_conflict_error(
self, mock_log, _mock_novaclient, mock_add_host, mock_unlock,
mock_lock, mock_enable_disable):
self, mock_log, _mock_notify, _mock_novaclient, mock_add_host,
mock_unlock, mock_lock, mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
mock_add_host.side_effect = exception.Conflict
self.override_config("add_reserved_host_to_aggregate",
@ -196,10 +254,10 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.aggregates.create(id="1", name='fake_agg',
hosts=[self.instance_host,
reserved_host.name])
expected_msg_format = "Host '%(reserved_host)s' already has been " \
"added to aggregate '%(aggregate)s'."
expected_msg_params = {'aggregate': 'fake_agg',
'reserved_host': u'fake-reserved-host'}
expected_msg_format = ("Host '%(reserved_host)s' already has been "
"added to aggregate '%(aggregate)s'.") % {
'reserved_host': 'fake-reserved-host', 'aggregate': 'fake_agg'
}
# execute DisableComputeServiceTask
self._test_disable_compute_service(mock_enable_disable)
@ -215,15 +273,36 @@ class HostFailureTestCase(test.TestCase):
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
mock_log.info.assert_any_call(
expected_msg_format, expected_msg_params)
mock_log.info.assert_any_call(expected_msg_format)
@ddt.data('active', 'rescued', 'paused', 'shelved', 'suspended',
'error', 'stopped', 'resized')
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 1"
"", 0.3),
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Instances to be evacuated are: '1'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1'"),
mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1),
mock.call('Add host fake-reserved-host to aggregate fake_agg',
0.2),
mock.call("Host 'fake-reserved-host' already has been added to "
"aggregate 'fake_agg'.", 1.0),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7)
])
@ddt.data('rescued', 'paused', 'shelved', 'suspended',
'error', 'resized', 'active', 'resized', 'stopped')
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_all_instances(
self, vm_state, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
self, vm_state, _mock_notify, _mock_novaclient, mock_unlock,
mock_lock, mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -236,16 +315,33 @@ class HostFailureTestCase(test.TestCase):
vm_state=vm_state,
power_state=power_state,
ha_enabled=True)
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": self.fake_client.servers.list()
"instance_list": instance_uuid_list,
}
# execute EvacuateInstancesTask
self._evacuate_instances(instance_list, mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_ignore_error_instances(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
self.override_config("ignore_instances_in_error_state",
True, "host_failure")
@ -267,9 +363,29 @@ class HostFailureTestCase(test.TestCase):
# execute EvacuateInstancesTask
self._evacuate_instances(instance_list, mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 2"
"", 0.3),
mock.call("Ignoring recovery of HA_Enabled instance '1' as it is "
"in 'error' state.", 0.4),
mock.call("Total HA Enabled instances count: '1'", 0.6),
mock.call("Total Non-HA Enabled instances count: '0'", 0.7),
mock.call("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '1'", 0.8),
mock.call("Instances to be evacuated are: '2'", 1.0),
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '2'"),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_ignore_error_instances_raise_skip_host_recovery(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
self.override_config("ignore_instances_in_error_state",
True, "host_failure")
@ -283,13 +399,28 @@ class HostFailureTestCase(test.TestCase):
ha_enabled=True)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt,
self.novaclient)
self.assertRaises(exception.SkipHostRecoveryException, task.execute,
self.ctxt, self.instance_host)
self.instance_host)
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 1"
"", 0.3),
mock.call("Ignoring recovery of HA_Enabled instance '1' as it is "
"in 'error' state.", 0.4),
mock.call("Total HA Enabled instances count: '0'", 0.6),
mock.call("Skipped host 'fake-host' recovery as no instances needs"
" to be evacuated", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_all_instances_active_resized_instance(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
@ -300,16 +431,33 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.create(id="2", host=self.instance_host,
vm_state='resized',
ha_enabled=True)
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": self.fake_client.servers.list()
"instance_list": instance_uuid_list,
}
# execute EvacuateInstancesTask
self._evacuate_instances(instance_list, mock_enable_disable)
self._evacuate_instances(instance_list,
mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_no_ha_enabled_instances(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
@ -321,22 +469,45 @@ class HostFailureTestCase(test.TestCase):
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt,
self.novaclient)
self.assertRaises(exception.SkipHostRecoveryException, task.execute,
self.ctxt, self.instance_host)
self.instance_host)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 2"
"", 0.3),
mock.call("Total HA Enabled instances count: '0'", 0.6),
mock.call("Skipped host 'fake-host' recovery as no instances needs"
" to be evacuated", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_evacuation_failed(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
# overriding 'wait_period_after_power_off' to 2 seconds to reduce the
# wait period, default is 180 seconds.
self.override_config("wait_period_after_power_off", 2)
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
server = self.fake_client.servers.create(id="1", vm_state='active',
host=self.instance_host,
ha_enabled=True)
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": self.fake_client.servers.list()
"instance_list": instance_uuid_list,
}
def fake_get_server(context, host):
@ -351,9 +522,23 @@ class HostFailureTestCase(test.TestCase):
exception.HostRecoveryFailureException,
self._evacuate_instances, instance_list, mock_enable_disable)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Instance '1' is successfully evacuated but failed to "
"stop.", 1.0),
mock.call("Failed to evacuate instances '1' from host "
"'fake-host'", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_no_instances_on_host(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
@ -363,13 +548,31 @@ class HostFailureTestCase(test.TestCase):
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt,
self.novaclient)
self.assertRaises(exception.SkipHostRecoveryException, task.execute,
self.ctxt, self.instance_host)
self.instance_host)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call('Preparing instances for evacuation'),
mock.call("Total instances running on failed host 'fake-host' is 0"
"", 0.3),
mock.call("Total HA Enabled instances count: '0'", 0.6),
mock.call("Total Non-HA Enabled instances count: '0'", 0.7),
mock.call("All instances (HA Enabled/Non-HA Enabled) should be "
"considered for evacuation. Total count is: '0'", 0.8),
mock.call("Skipped host 'fake-host' recovery as no instances needs"
" to be evacuated", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_host_failure_flow_for_task_state_not_none(
self, _mock_novaclient, mock_unlock, mock_lock,
self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
@ -389,8 +592,12 @@ class HostFailureTestCase(test.TestCase):
task_state='fake_task_state',
power_state=None,
ha_enabled=True)
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": self.fake_client.servers.list()
"instance_list": instance_uuid_list,
}
# execute EvacuateInstancesTask
@ -405,3 +612,15 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.reset_state_calls)
self.assertEqual(stop_calls,
self.fake_client.servers.stop_calls)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Start evacuation of instances from failed host "
"'fake-host', instance uuids are : '1,2,3'"),
mock.call("Evacuation of instance started : '1'", 0.5),
mock.call("Instance '1' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '2'", 0.5),
mock.call("Instance '2' evacuated successfully", 0.7),
mock.call("Evacuation of instance started : '3'", 0.5),
mock.call("Instance '3' evacuated successfully", 0.7)
])

View File

@ -44,23 +44,26 @@ class InstanceFailureTestCase(test.TestCase):
False, "instance_failure")
def _test_stop_instance(self):
task = instance_failure.StopInstanceTask(self.novaclient)
task.execute(self.ctxt, self.instance_id)
task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient)
task.execute(self.instance_id)
# verify instance is stopped
instance = self.novaclient.get_server(self.ctxt, self.instance_id)
self.assertEqual('stopped',
getattr(instance, 'OS-EXT-STS:vm_state'))
def _test_confirm_instance_is_active(self):
task = instance_failure.ConfirmInstanceActiveTask(self.novaclient)
task.execute(self.ctxt, self.instance_id)
task = instance_failure.ConfirmInstanceActiveTask(self.ctxt,
self.novaclient)
task.execute(self.instance_id)
# verify instance is in active state
instance = self.novaclient.get_server(self.ctxt, self.instance_id)
self.assertEqual('active',
getattr(instance, 'OS-EXT-STS:vm_state'))
@mock.patch('masakari.compute.nova.novaclient')
def test_instance_failure_flow(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow(self, _mock_notify, _mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -72,14 +75,29 @@ class InstanceFailureTestCase(test.TestCase):
self._test_stop_instance()
# test StartInstanceTask
task = instance_failure.StartInstanceTask(self.novaclient)
task.execute(self.ctxt, self.instance_id)
task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient)
task.execute(self.instance_id)
# test ConfirmInstanceActiveTask
self._test_confirm_instance_is_active()
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Stopping instance: ' + self.instance_id),
mock.call("Stopped instance: '" + self.instance_id + "'", 1.0),
mock.call("Starting instance: '" + self.instance_id + "'"),
mock.call("Instance started: '" + self.instance_id + "'", 1.0),
mock.call("Confirming instance '" + self.instance_id +
"' vm_state is ACTIVE"),
mock.call("Confirmed instance '" + self.instance_id +
"' vm_state is ACTIVE", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
def test_instance_failure_flow_resized_instance(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_resized_instance(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -91,14 +109,29 @@ class InstanceFailureTestCase(test.TestCase):
self._test_stop_instance()
# test StartInstanceTask
task = instance_failure.StartInstanceTask(self.novaclient)
task.execute(self.ctxt, self.instance_id)
task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient)
task.execute(self.instance_id)
# test ConfirmInstanceActiveTask
self._test_confirm_instance_is_active()
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Stopping instance: ' + self.instance_id),
mock.call("Stopped instance: '" + self.instance_id + "'", 1.0),
mock.call("Starting instance: '" + self.instance_id + "'"),
mock.call("Instance started: '" + self.instance_id + "'", 1.0),
mock.call("Confirming instance '" + self.instance_id +
"' vm_state is ACTIVE"),
mock.call("Confirmed instance '" + self.instance_id +
"' vm_state is ACTIVE", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
def test_instance_failure_flow_stop_failed(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_stop_failed(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -112,29 +145,70 @@ class InstanceFailureTestCase(test.TestCase):
return server
# test StopInstanceTask
task = instance_failure.StopInstanceTask(self.novaclient)
task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient)
with mock.patch.object(self.novaclient, 'stop_server',
fake_stop_server):
self.assertRaises(
exception.InstanceRecoveryFailureException, task.execute,
self.ctxt, self.instance_id)
self.instance_id)
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Stopping instance: ' + self.instance_id),
mock.call('Failed to stop instance ' + self.instance_id, 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
def test_instance_failure_flow_not_ha_enabled(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_not_ha_enabled(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(self.instance_id, host="fake-host")
# test StopInstanceTask
task = instance_failure.StopInstanceTask(self.novaclient)
task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient)
self.assertRaises(
exception.SkipInstanceRecoveryException, task.execute,
self.ctxt, self.instance_id)
self.instance_id)
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Skipping recovery for instance: ' + self.instance_id +
' as it is not Ha_Enabled', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_vm_in_paused_state(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(self.instance_id,
host="fake-host", ha_enabled=True,
vm_state="paused")
# test StopInstanceTask
task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient)
self.assertRaises(
exception.IgnoreInstanceRecoveryException, task.execute,
self.instance_id)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Recovery of instance '" + self.instance_id +
"' is ignored as it is in 'paused' state.", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_not_ha_enabled_but_conf_option_is_set(
self, _mock_novaclient):
self, _mock_notify, _mock_novaclient):
# Setting this config option to True indicates masakari has to recover
# the instance irrespective of whether it is HA_Enabled or not.
self.override_config("process_all_instances",
@ -149,14 +223,29 @@ class InstanceFailureTestCase(test.TestCase):
self._test_stop_instance()
# test StartInstanceTask
task = instance_failure.StartInstanceTask(self.novaclient)
task.execute(self.ctxt, self.instance_id)
task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient)
task.execute(self.instance_id)
# test ConfirmInstanceActiveTask
self._test_confirm_instance_is_active()
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Stopping instance: ' + self.instance_id),
mock.call("Stopped instance: '" + self.instance_id + "'", 1.0),
mock.call("Starting instance: '" + self.instance_id + "'"),
mock.call("Instance started: '" + self.instance_id + "'", 1.0),
mock.call("Confirming instance '" + self.instance_id +
"' vm_state is ACTIVE"),
mock.call("Confirmed instance '" + self.instance_id +
"' vm_state is ACTIVE", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
def test_instance_failure_flow_start_failed(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_instance_failure_flow_start_failed(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -173,13 +262,25 @@ class InstanceFailureTestCase(test.TestCase):
return server
# test StartInstanceTask
task = instance_failure.StartInstanceTask(self.novaclient)
task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient)
with mock.patch.object(self.novaclient, 'start_server',
fake_start_server):
task.execute(self.ctxt, self.instance_id)
task.execute(self.instance_id)
# test ConfirmInstanceActiveTask
task = instance_failure.ConfirmInstanceActiveTask(self.novaclient)
task = instance_failure.ConfirmInstanceActiveTask(self.ctxt,
self.novaclient)
self.assertRaises(
exception.InstanceRecoveryFailureException, task.execute,
self.ctxt, self.instance_id)
self.instance_id)
# verify progress details
_mock_notify.assert_has_calls([
mock.call('Stopping instance: ' + self.instance_id),
mock.call("Stopped instance: '" + self.instance_id + "'", 1.0),
mock.call("Starting instance: '" + self.instance_id + "'"),
mock.call("Instance started: '" + self.instance_id + "'", 1.0),
mock.call("Confirming instance '" + self.instance_id +
"' vm_state is ACTIVE"),
mock.call('Failed to start instance 1', 1.0)
])

View File

@ -41,7 +41,10 @@ class ProcessFailureTestCase(test.TestCase):
self.override_config('wait_period_after_service_update', 2)
@mock.patch('masakari.compute.nova.novaclient')
def test_compute_process_failure_flow(self, _mock_novaclient):
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_compute_process_failure_flow(self, _mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -50,20 +53,35 @@ class ProcessFailureTestCase(test.TestCase):
status="enabled")
# test DisableComputeNodeTask
task = process_failure.DisableComputeNodeTask(self.novaclient)
task.execute(self.ctxt, self.process_name, self.service_host)
task = process_failure.DisableComputeNodeTask(self.ctxt,
self.novaclient)
task.execute(self.process_name, self.service_host)
# test ConfirmComputeNodeDisabledTask
task = process_failure.ConfirmComputeNodeDisabledTask(self.novaclient)
task.execute(self.ctxt, self.process_name, self.service_host)
task = process_failure.ConfirmComputeNodeDisabledTask(self.ctxt,
self.novaclient)
task.execute(self.process_name, self.service_host)
# verify service is disabled
self.assertTrue(self.novaclient.is_service_down(self.ctxt,
self.service_host,
self.process_name))
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call("Confirming compute service is disabled on host: "
"'fake-host'"),
mock.call("Confirmed compute service is disabled on host: "
"'fake-host'", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_compute_process_failure_flow_disabled_process(self,
_mock_notify,
_mock_novaclient):
_mock_novaclient.return_value = self.fake_client
@ -73,18 +91,28 @@ class ProcessFailureTestCase(test.TestCase):
status="disabled")
# test DisableComputeNodeTask
task = process_failure.DisableComputeNodeTask(self.novaclient)
task = process_failure.DisableComputeNodeTask(self.ctxt,
self.novaclient)
with mock.patch.object(
self.novaclient,
'enable_disable_service') as mock_enable_disabled:
task.execute(self.ctxt, self.process_name, self.service_host)
task.execute(self.process_name, self.service_host)
# ensure that enable_disable_service method is not called
self.assertEqual(0, mock_enable_disabled.call_count)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call('Skipping recovery for process nova-compute as it is '
'already disabled', 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
def test_compute_process_failure_flow_compute_service_disabled_failed(
self, _mock_novaclient):
self, _mock_notify, _mock_novaclient):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -97,14 +125,24 @@ class ProcessFailureTestCase(test.TestCase):
return False
# test DisableComputeNodeTask
task = process_failure.DisableComputeNodeTask(self.novaclient)
task.execute(self.ctxt, self.process_name, self.service_host)
task = process_failure.DisableComputeNodeTask(self.ctxt,
self.novaclient)
task.execute(self.process_name, self.service_host)
with mock.patch.object(self.novaclient, 'is_service_down',
fake_is_service_down):
# test ConfirmComputeNodeDisabledTask
task = process_failure.ConfirmComputeNodeDisabledTask(
self.novaclient)
self.ctxt, self.novaclient)
self.assertRaises(exception.ProcessRecoveryFailureException,
task.execute, self.ctxt, self.process_name,
task.execute, self.process_name,
self.service_host)
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0),
mock.call("Confirming compute service is disabled on host: "
"'fake-host'"),
mock.call('Failed to disable service nova-compute', 1.0)
])

View File

@ -16,6 +16,7 @@ import mock
from oslo_utils import importutils
from oslo_utils import timeutils
from masakari.compute import nova
import masakari.conf
from masakari import context
from masakari.engine import utils as engine_utils
@ -760,6 +761,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
def test_host_failure_custom_flow_for_auto_recovery(
self, _mock_log, _mock_task1, _mock_task2, _mock_task3,
_mock_novaclient, _mock_notification_get):
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.override_config(
"host_auto_failure_recovery_tasks",
{'pre': ['disable_compute_service_task', 'no_op'],
@ -777,6 +780,72 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
# is executed.
_mock_log.info.assert_called_with(expected_msg_format)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch.object(nova.API, "enable_disable_service")
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'PrepareHAEnabledInstancesTask.execute')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'EvacuateInstancesTask.execute')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG')
def test_host_failure_flow_for_auto_recovery(self, _mock_log,
_mock_notify,
_mock_novaclient,
_mock_enable_disable,
_mock_task2, _mock_task3,
_mock_notification_get):
self.novaclient = nova.API()
self.fake_client = fakes.FakeNovaClient()
self.override_config("wait_period_after_evacuation", 2)
self.override_config("wait_period_after_service_update", 2)
self.override_config("evacuate_all_instances",
True, "host_failure")
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(id="1", host="fake-host",
ha_enabled=True)
self.fake_client.servers.create(id="2", host="fake-host")
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": ','.join(instance_uuid_list),
}
_mock_task2.return_value = instance_list
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.engine.driver.execute_host_failure(
self.context, "fake-host",
fields.FailoverSegmentRecoveryMethod.AUTO,
uuidsentinel.fake_notification)
# make sure instance is active and has different host
for server in instance_uuid_list:
instance = self.novaclient.get_server(self.context, server)
if CONF.host_failure.ignore_instances_in_error_state and getattr(
instance, 'OS-EXT-STS:vm_state') == 'error':
self.assertEqual(
"fake-host", getattr(
instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname'))
else:
self.assertNotEqual(
"fake-host", getattr(
instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname'))
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'DisableComputeServiceTask.execute')
@ -785,9 +854,13 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'EvacuateInstancesTask.execute')
@mock.patch('masakari.engine.drivers.taskflow.no_op.LOG')
def test_host_failure_custom_flow_for_rh_recovery(
self, _mock_log, _mock_task1, _mock_task2, _mock_task3,
def test_host_failure_custom_flow_for_rh_recovery(self, _mock_log,
_mock_task1,
_mock_task2,
_mock_task3,
_mock_novaclient, _mock_notification_get):
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.override_config(
"host_rh_failure_recovery_tasks",
{'pre': ['disable_compute_service_task'],
@ -806,6 +879,65 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
# is executed.
_mock_log.info.assert_called_with(expected_msg_format)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch.object(nova.API, "enable_disable_service")
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'PrepareHAEnabledInstancesTask.execute')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.'
'EvacuateInstancesTask.execute')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
'update_details')
@mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG')
def test_host_failure_flow_for_rh_recovery(self, _mock_log, _mock_notify,
_mock_novaclient,
_mock_enable_disable,
_mock_task2, _mock_task3,
_mock_notification_get):
self.novaclient = nova.API()
self.fake_client = fakes.FakeNovaClient()
self.override_config("wait_period_after_evacuation", 2)
self.override_config("wait_period_after_service_update", 2)
self.override_config("evacuate_all_instances",
True, "host_failure")
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(id="1", host="fake-host",
ha_enabled=True)
self.fake_client.servers.create(id="2", host="fake-host")
instance_uuid_list = []
for instance in self.fake_client.servers.list():
instance_uuid_list.append(instance.id)
instance_list = {
"instance_list": ','.join(instance_uuid_list),
}
_mock_task2.return_value = instance_list
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.engine.driver.execute_host_failure(
self.context, "fake-host",
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST,
uuidsentinel.fake_notification,
reserved_host_list=['host-1', 'host-2'])
# make sure instance is active and has different host
for server in instance_uuid_list:
instance = self.novaclient.get_server(self.context, server)
self.assertNotEqual(
"fake-host", getattr(
instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname'))
# verify progress details
_mock_notify.assert_has_calls([
mock.call("Disabling compute service on host: 'fake-host'"),
mock.call("Disabled compute service on host: 'fake-host'", 1.0)
])
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.instance_failure.'
'StopInstanceTask.execute')
@ -817,6 +949,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
def test_instance_failure_custom_flow_recovery(
self, _mock_log, _mock_task1, _mock_task2, _mock_task3,
_mock_novaclient, _mock_notification_get):
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.override_config(
"instance_failure_recovery_tasks",
{'pre': ['stop_instance_task', 'no_op'],
@ -833,6 +967,49 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
# is executed.
_mock_log.info.assert_called_with(expected_msg_format)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
            'update_details')
@mock.patch('masakari.engine.drivers.taskflow.instance_failure.LOG')
def test_instance_failure_flow_recovery(self, _mock_log, _mock_notify,
                                        _mock_novaclient,
                                        _mock_notification_get):
    """Run the instance-failure recovery workflow end to end.

    Executes the taskflow-based instance failure flow against a fake
    nova client and verifies that the instance ends up ACTIVE and that
    progress details are notified in workflow order.
    """
    instance_id = uuidsentinel.fake_ins
    self.novaclient = nova.API()
    self.fake_client = fakes.FakeNovaClient()
    _mock_novaclient.return_value = self.fake_client
    # Shorten the power off/on waits so the test runs quickly.
    self.override_config('wait_period_after_power_off', 2)
    self.override_config('wait_period_after_power_on', 2)
    # Test data: one HA-enabled instance on the failing host.
    self.fake_client.servers.create(instance_id,
                                    host="fake-host",
                                    ha_enabled=True)
    # Use the in-memory taskflow persistence backend for this test.
    masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'

    self.engine.driver.execute_instance_failure(
        self.context, instance_id,
        uuidsentinel.fake_notification)

    # The recovered instance must be active again.
    instance = self.novaclient.get_server(self.context, instance_id)
    self.assertEqual('active',
                     getattr(instance, 'OS-EXT-STS:vm_state'))
    # Progress details must be reported in the order the tasks ran.
    expected_calls = [
        mock.call('Stopping instance: ' + instance_id),
        mock.call("Stopped instance: '" + instance_id + "'", 1.0),
        mock.call("Starting instance: '" + instance_id + "'"),
        mock.call("Instance started: '" + instance_id + "'", 1.0),
        mock.call("Confirming instance '" + instance_id +
                  "' vm_state is ACTIVE"),
        mock.call("Confirmed instance '" + instance_id +
                  "' vm_state is ACTIVE", 1.0),
    ]
    _mock_notify.assert_has_calls(expected_calls)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.process_failure.'
'DisableComputeNodeTask.execute')
@ -842,6 +1019,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
def test_process_failure_custom_flow_recovery(
self, _mock_log, _mock_task1, _mock_task2, _mock_novaclient,
_mock_notification_get):
# For testing purpose setting BACKEND as memory
masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'
self.override_config(
"process_failure_recovery_tasks",
{'pre': ['disable_compute_node_task', 'no_op'],
@ -853,8 +1032,69 @@ class EngineManagerUnitTestCase(test.NoDBTestCase):
self.engine.driver.execute_process_failure(
self.context, 'nova-compute', 'fake_host',
uuidsentinel.fake_notification)
uuidsentinel.fake_notification, )
_mock_log.info.assert_any_call(expected_msg_format)
# Ensure custom_task added to the 'process_failure_recovery_tasks'
# is executed.
_mock_log.info.assert_called_with(expected_msg_format)
@mock.patch('masakari.compute.nova.novaclient')
@mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.'
            'update_details')
@mock.patch('masakari.engine.drivers.taskflow.process_failure.LOG')
def test_process_failure_flow_recovery(self, _mock_log, _mock_notify,
                                       _mock_novaclient,
                                       _mock_notification_get):
    """Run the process-failure recovery workflow end to end.

    Executes the taskflow-based process failure flow against a fake
    nova client and verifies that the compute service gets disabled and
    that progress details are notified in workflow order.
    """
    self.novaclient = nova.API()
    self.fake_client = fakes.FakeNovaClient()
    _mock_novaclient.return_value = self.fake_client
    # Test data: an enabled nova-compute service on the failing host.
    self.fake_client.services.create("1", host="fake-host",
                                     binary="nova-compute",
                                     status="enabled")
    # Use the in-memory taskflow persistence backend for this test.
    masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://'

    self.engine.driver.execute_process_failure(
        self.context, "nova-compute", "fake-host",
        uuidsentinel.fake_notification)

    # The nova-compute service must have been disabled on the host.
    self.assertTrue(self.novaclient.is_service_down(self.context,
                                                    "fake-host",
                                                    "nova-compute"))
    # Progress details must be reported in the order the tasks ran.
    expected_calls = [
        mock.call("Disabling compute service on host: 'fake-host'"),
        mock.call("Disabled compute service on host: 'fake-host'", 1.0),
        mock.call("Confirming compute service is disabled on host: "
                  "'fake-host'"),
        mock.call("Confirmed compute service is disabled on host: "
                  "'fake-host'", 1.0),
    ]
    _mock_notify.assert_has_calls(expected_calls)
@mock.patch.object(notification_obj.Notification, "save")
@mock.patch('masakari.engine.drivers.taskflow.driver.TaskFlowDriver.'
            'get_notification_recovery_workflow_details')
def test_get_notification_recovery_workflow_details(self,
                                                    mock_progress_details,
                                                    mock_save,
                                                    mock_notification_get):
    """Verify the taskflow driver is asked for workflow progress details."""
    payload = {
        'event': 'fake_event',
        'instance_uuid': uuidsentinel.fake_ins,
        'vir_domain_event': 'fake_vir_domain_event',
    }
    notification = fakes.create_fake_notification(
        type="VM", id=1, payload=payload,
        source_host_uuid=uuidsentinel.fake_host,
        generated_time=NOW, status="new",
        notification_uuid=uuidsentinel.fake_notification)
    mock_notification_get.return_value = notification

    self.engine.driver.get_notification_recovery_workflow_details(
        self.context, notification)

    # The driver must be queried exactly once with the same context
    # and notification that were passed in.
    mock_progress_details.assert_called_once_with(
        self.context, notification)

View File

@ -86,3 +86,11 @@ class EngineRpcAPITestCase(test.TestCase):
rpc_method='cast',
notification=self.fake_notification_obj,
version='1.0')
@mock.patch("masakari.rpc.get_client")
def test_get_notification_recovery_workflow_details(self,
mock_get_client):
self._test_engine_api('get_notification_recovery_workflow_details',
rpc_method='call',
notification=self.fake_notification_obj,
version='1.1')

View File

@ -28,6 +28,7 @@ from masakari.tests.unit.objects import test_objects
from masakari.tests import uuidsentinel
NOW = timeutils.utcnow().replace(microsecond=0)
OPTIONAL = ['recovery_workflow_details']
def _fake_db_notification(**kwargs):
@ -86,7 +87,8 @@ class TestNotificationObject(test_objects._LocalTest):
if db_exception:
self.assertIsNone(obj)
self.compare_obj(obj, fake_object_notification)
self.compare_obj(obj, fake_object_notification,
allow_missing=OPTIONAL)
def test_get_by_id(self):
self._test_query('notification_get_by_id', 'get_by_id', 123)
@ -116,7 +118,8 @@ class TestNotificationObject(test_objects._LocalTest):
notification_obj = self._notification_create_attributes()
notification_obj.create()
self.compare_obj(notification_obj, fake_object_notification)
self.compare_obj(notification_obj, fake_object_notification,
allow_missing=OPTIONAL)
mock_db_create.assert_called_once_with(self.context, {
'source_host_uuid': uuidsentinel.fake_host,
'notification_uuid': uuidsentinel.fake_notification,
@ -169,7 +172,8 @@ class TestNotificationObject(test_objects._LocalTest):
notification_obj.create()
self.compare_obj(notification_obj, fake_object_notification)
self.compare_obj(notification_obj, fake_object_notification,
allow_missing=OPTIONAL)
mock_db_create.assert_called_once_with(self.context, {
'source_host_uuid': uuidsentinel.fake_host,
'notification_uuid': uuidsentinel.fake_notification,
@ -247,7 +251,8 @@ class TestNotificationObject(test_objects._LocalTest):
notification_obj.id = 123
notification_obj.save()
self.compare_obj(notification_obj, fake_object_notification)
self.compare_obj(notification_obj, fake_object_notification,
allow_missing=OPTIONAL)
(mock_notification_update.
assert_called_once_with(self.context, uuidsentinel.fake_notification,
{'source_host_uuid': uuidsentinel.fake_host,

View File

@ -657,7 +657,8 @@ object_data = {
'FailoverSegmentList': '1.0-dfc5c6f5704d24dcaa37b0bbb03cbe60',
'Host': '1.1-3fc4d548fa220c76906426095e5971fc',
'HostList': '1.0-25ebe1b17fbd9f114fae8b6a10d198c0',
'Notification': '1.0-eedfa3c203c100897021bd23f0ddf68c',
'Notification': '1.1-91e3a051078e35300e325a3e2ae5fde5',
'NotificationProgressDetails': '1.0-fc611ac932b719fbc154dbe34bb8edee',
'NotificationList': '1.0-25ebe1b17fbd9f114fae8b6a10d198c0',
'EventType': '1.0-d1d2010a7391fa109f0868d964152607',
'ExceptionNotification': '1.0-1187e93f564c5cca692db76a66cda2a6',

View File

@ -0,0 +1,29 @@
---
features:
- |
Added support to record the recovery workflow details of the notification
which will be returned in a new microversion 1.1 in
`GET /notifications/{notification_id}` API.
For example, the GET /notifications/<notification_uuid> response will
contain a `recovery_workflow_details` parameter, as shown in the
`notification_details`_ sample below.
Added a new config section in Masakari conf file for configuring the back
end to be used by taskflow driver::
[taskflow]
# The back end for storing recovery_workflow details of the notification.
# (string value)
connection = mysql+pymysql://root:admin@127.0.0.1/<db name>?charset=utf8
# Here db_name can be a new database, or you can also specify the existing
# masakari database.
Operator should run `masakari-manage db sync` command to add new db tables
required for storing recovery_workflow_details.
Note: When you run `masakari-manage db sync`, make sure you have
`notification_driver=taskflow_driver` set in masakari.conf.
.. _`notification_details`: https://developer.openstack.org/api-ref/instance-ha/?expanded=show-notification-details-detail#show-notification-details

View File

@ -25,4 +25,5 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
python-novaclient>=9.1.0 # Apache-2.0
six>=1.10.0 # MIT
stevedore>=1.20.0 # Apache-2.0
SQLAlchemy-Utils>=0.33.10 # Apache-2.0
taskflow>=2.16.0 # Apache-2.0

View File

@ -16,6 +16,7 @@ os-api-ref>=1.4.0 # Apache-2.0
oslosphinx>=4.7.0 # Apache-2.0
oslotest>=3.2.0 # Apache-2.0
stestr>=1.0.0 # Apache-2.0
SQLAlchemy-Utils>=0.33.10 # Apache-2.0
requests-mock>=1.2.0 # Apache-2.0
testresources>=2.0.0 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD