From 7321ee32fdc47c408a13120b1d76f11859190b2e Mon Sep 17 00:00:00 2001 From: "shilpa.devharakar" Date: Thu, 31 Jan 2019 08:20:14 +0000 Subject: [PATCH] Add progress details for recovery workflows This patch uses taskflow's persistence feature to store recovery workflows details into database. Added a new microversion to return progress_details of the notification in `GET /notifications/` API. APIImpact: GET /notifications/ API includes details of recovery workflow Change-Id: I93c1b7d88823e02d9a02855cabb8b22c9e40a7d5 Implements: bp progress-details-recovery-workflows --- api-ref/source/notifications.inc | 1 + api-ref/source/parameters.yaml | 8 + .../notifications/notification-get-resp.json | 55 ++- lower-constraints.txt | 1 + masakari/api/api_version_request.py | 3 +- masakari/api/openstack/ha/notifications.py | 8 +- masakari/conf/engine_driver.py | 15 + .../versions/006_add_persistence_tables.py | 32 ++ masakari/engine/driver.py | 10 + masakari/engine/drivers/taskflow/base.py | 58 +++- masakari/engine/drivers/taskflow/driver.py | 148 ++++++-- .../engine/drivers/taskflow/host_failure.py | 179 ++++++---- .../drivers/taskflow/instance_failure.py | 106 +++--- masakari/engine/drivers/taskflow/no_op.py | 3 +- .../drivers/taskflow/process_failure.py | 64 ++-- masakari/engine/manager.py | 26 +- masakari/engine/rpcapi.py | 12 +- masakari/ha/api.py | 12 + masakari/objects/fields.py | 2 + masakari/objects/notification.py | 41 ++- .../api/openstack/ha/test_notifications.py | 56 ++- masakari/tests/unit/db/test_migrations.py | 41 +++ .../taskflow/test_host_failure_flow.py | 323 +++++++++++++++--- .../taskflow/test_instance_failure_flow.py | 149 ++++++-- .../taskflow/test_process_failure_flow.py | 62 +++- masakari/tests/unit/engine/test_engine_mgr.py | 246 ++++++++++++- masakari/tests/unit/engine/test_rpcapi.py | 8 + .../tests/unit/objects/test_notifications.py | 13 +- masakari/tests/unit/objects/test_objects.py | 3 +- ...s-recovery-workflows-5b14b7b3f87374f4.yaml | 29 ++ requirements.txt | 1 + test-requirements.txt | 1 + 32 files changed, 1435 insertions(+), 281 deletions(-) create mode 100644 masakari/db/sqlalchemy/migrate_repo/versions/006_add_persistence_tables.py create mode 100644 releasenotes/notes/progress-details-recovery-workflows-5b14b7b3f87374f4.yaml diff --git a/api-ref/source/notifications.inc b/api-ref/source/notifications.inc index 46efd959..cbdfc136 100644 --- a/api-ref/source/notifications.inc +++ b/api-ref/source/notifications.inc @@ -219,6 +219,7 @@ Response - created_at: created - status: notification_status - updated_at: updated + - recovery_workflow_details: recovery_workflow_details - id: notification_id **Example Show Notification Details** diff --git a/api-ref/source/parameters.yaml b/api-ref/source/parameters.yaml index f558076c..f06f4cb9 100644 --- a/api-ref/source/parameters.yaml +++ b/api-ref/source/parameters.yaml @@ -295,6 +295,14 @@ on_maintenance: in: body required: false type: boolean +recovery_workflow_details: + description: | + Recovery workflow details of the notification. This is a list of dictionary. + + ``New in version 1.1`` + in: body + required: true + type: array reserved: description: | A boolean indicates whether this host is reserved or not, if it is diff --git a/doc/api_samples/notifications/notification-get-resp.json b/doc/api_samples/notifications/notification-get-resp.json index f8eb18f8..ef21fe6a 100644 --- a/doc/api_samples/notifications/notification-get-resp.json +++ b/doc/api_samples/notifications/notification-get-resp.json @@ -1,19 +1,48 @@ { "notification": { - "notification_uuid": "9e66b95d-45da-4695-bfb6-ace68b35d955", - "status": "new", - "source_host_uuid": "083a8474-22c0-407f-b89b-c569134c3bfd", + "notification_uuid": "07a331b8-df15-4582-b121-73ed3541a408", + "status": "finished", + "source_host_uuid": "b5bc49be-ea6f-472d-9240-968f75d7a16a", "deleted": false, - "created_at": "2017-04-24T06:37:37.396994", - "updated_at": null, - "id": 4, - "generated_time": "2017-04-24T08:34:46.000000", - "deleted_at": null, - "type": "COMPUTE_HOST", + "created_at": "2019-02-28T07:19:49.000000", + "updated_at": "2019-02-28T07:19:59.000000", "payload": { - "host_status": "UNKNOWN", - "event": "STOPPED", - "cluster_status": "OFFLINE" - } + "instance_uuid": "b9837317-a5b8-44f4-93b4-45500c562bb8", + "vir_domain_event": "STOPPED_FAILED", + "event": "LIFECYCLE" + }, + "recovery_workflow_details": [ + { + "progress": 1.0, + "state": "SUCCESS", + "name": "StopInstanceTask", + "progress_details": [ + {"timestamp": "2019-03-07 13:54:28.842031", "message": "Stopping instance: df528f02-2415-4a40-bad8-453ad6a519f7", "progress": "0.0"}, + {"timestamp": "2019-03-07 13:54:34.442617", "message": "Stopped instance: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "1.0"} + ] + }, + { + "progress": 1.0, + "state": "SUCCESS", + "name": "StartInstanceTask", + "progress_details": [ + {"timestamp": "2019-03-07 13:54:34.531755", "message": "Starting instance: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "0.0"}, + {"timestamp": "2019-03-07 13:54:35.930430", "message": "Instance started: 'df528f02-2415-4a40-bad8-453ad6a519f7'", "progress": "1.0"} + ] + }, + { + "progress": 1.0, + "state": "SUCCESS", + "name": "ConfirmInstanceActiveTask", + "progress_details": [ + {"timestamp": "2019-03-07 13:54:36.019208", "message": "Confirming instance 'df528f02-2415-4a40-bad8-453ad6a519f7' vm_state is ACTIVE", "progress": "0.0"}, + {"timestamp": "2019-03-07 13:54:38.569373", "message": "Confirmed instance 'df528f02-2415-4a40-bad8-453ad6a519f7' vm_state is ACTIVE", "progress": "1.0"} + ] + } + ], + "generated_time": "2017-06-13T15:34:55.000000", + "deleted_at": null, + "type": "VM", + "id": 13 } } diff --git a/lower-constraints.txt b/lower-constraints.txt index 74044f7c..6a0f91b9 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -108,6 +108,7 @@ sqlparse==0.2.4 statsd==3.2.2 stestr==1.0.0 stevedore==1.20.0 +SQLAlchemy-Utils==0.33.10 taskflow==2.16.0 Tempita==0.5.2 tenacity==4.9.0 diff --git a/masakari/api/api_version_request.py b/masakari/api/api_version_request.py index 72b80d5c..8ae0e300 100644 --- a/masakari/api/api_version_request.py +++ b/masakari/api/api_version_request.py @@ -39,6 +39,7 @@ from masakari.i18n import _ REST_API_VERSION_HISTORY = """REST API Version History: * 1.0 - Initial version. + * 1.1 - Add support for getting notification progress details """ # The minimum and maximum versions of the API supported @@ -47,7 +48,7 @@ REST_API_VERSION_HISTORY = """REST API Version History: # Note: This only applies for the v1 API once microversions # support is fully merged. _MIN_API_VERSION = "1.0" -_MAX_API_VERSION = "1.0" +_MAX_API_VERSION = "1.1" DEFAULT_API_VERSION = _MIN_API_VERSION diff --git a/masakari/api/openstack/ha/notifications.py b/masakari/api/openstack/ha/notifications.py index eaea0764..efffaca8 100644 --- a/masakari/api/openstack/ha/notifications.py +++ b/masakari/api/openstack/ha/notifications.py @@ -17,6 +17,7 @@ from oslo_utils import timeutils from six.moves import http_client as http from webob import exc +from masakari.api import api_version_request from masakari.api.openstack import common from masakari.api.openstack import extensions from masakari.api.openstack.ha.schemas import notifications as schema @@ -123,7 +124,12 @@ class NotificationsController(wsgi.Controller): context.can(notifications_policies.NOTIFICATIONS % 'detail') try: - notification = self.api.get_notification(context, id) + if api_version_request.is_supported(req, min_version='1.1'): + notification = ( + self.api.get_notification_recovery_workflow_details( + context, id)) + else: + notification = self.api.get_notification(context, id) except exception.NotificationNotFound as err: raise exc.HTTPNotFound(explanation=err.format_message()) return {'notification': notification} diff --git a/masakari/conf/engine_driver.py b/masakari/conf/engine_driver.py index 7fa06a03..12b9f3b4 100644 --- a/masakari/conf/engine_driver.py +++ b/masakari/conf/engine_driver.py @@ -33,6 +33,11 @@ customized_recovery_flow_group = cfg.OptGroup( help="Configuration options for customizing various failure recovery" "workflow tasks.") +taskflow_group = cfg.OptGroup( + 'taskflow', + title='Taskflow driver options', + help="Configuration options for taskflow driver") + host_failure_opts = [ cfg.BoolOpt('evacuate_all_instances', @@ -77,6 +82,13 @@ When set to False, it will only execute instance failure recovery actions for an instance which contain metadata key 'HA_Enabled=True'."""), ] +taskflow_options = [ + cfg.StrOpt('connection', + help=""" +The SQLAlchemy connection string to use to connect to the taskflow database. +"""), +] + taskflow_driver_recovery_flows = [ cfg.Opt('host_auto_failure_recovery_tasks', type=types.Dict( @@ -188,16 +200,19 @@ def register_opts(conf): conf.register_group(instance_recovery_group) conf.register_group(host_recovery_group) conf.register_group(customized_recovery_flow_group) + conf.register_group(taskflow_group) conf.register_opts(instance_failure_options, group=instance_recovery_group) conf.register_opts(host_failure_opts, group=host_recovery_group) conf.register_opts(taskflow_driver_recovery_flows, group=customized_recovery_flow_group) + conf.register_opts(taskflow_options, group=taskflow_group) def list_opts(): return { instance_recovery_group.name: instance_failure_options, host_recovery_group.name: host_failure_opts, + taskflow_group.name: taskflow_options } diff --git a/masakari/db/sqlalchemy/migrate_repo/versions/006_add_persistence_tables.py b/masakari/db/sqlalchemy/migrate_repo/versions/006_add_persistence_tables.py new file mode 100644 index 00000000..53b20018 --- /dev/null +++ b/masakari/db/sqlalchemy/migrate_repo/versions/006_add_persistence_tables.py @@ -0,0 +1,32 @@ +# Copyright 2019 NTT Data. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import masakari.conf +from masakari.engine import driver + +CONF = masakari.conf.CONF +NOTIFICATION_DRIVER = CONF.notification_driver +PERSISTENCE_BACKEND = CONF.taskflow.connection + + +def upgrade(migrate_engine): + """Upgrade the engine with persistence tables. """ + + # Get the taskflow driver configured, default is 'taskflow_driver', + # to load persistence tables to store progress details. + taskflow_driver = driver.load_masakari_driver(NOTIFICATION_DRIVER) + + if PERSISTENCE_BACKEND: + taskflow_driver.upgrade_backend(PERSISTENCE_BACKEND) diff --git a/masakari/engine/driver.py b/masakari/engine/driver.py index 5b116fc2..6e3252aa 100644 --- a/masakari/engine/driver.py +++ b/masakari/engine/driver.py @@ -52,6 +52,16 @@ class NotificationDriver(object): notification_uuid): pass + @abc.abstractmethod + def get_notification_recovery_workflow_details(self, context, + recovery_method, + notification_uuid): + pass + + @abc.abstractmethod + def upgrade_backend(self, backend): + pass + def load_masakari_driver(masakari_driver=None): """Load a masakari driver module. diff --git a/masakari/engine/drivers/taskflow/base.py b/masakari/engine/drivers/taskflow/base.py index f8d1dc1c..77448c21 100644 --- a/masakari/engine/drivers/taskflow/base.py +++ b/masakari/engine/drivers/taskflow/base.py @@ -12,30 +12,30 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import os from oslo_log import log as logging +from oslo_utils import timeutils from stevedore import named # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow +import taskflow.engines +from taskflow import exceptions from taskflow import formatters from taskflow.listeners import base from taskflow.listeners import logging as logging_listener +from taskflow.persistence import backends +from taskflow.persistence import models from taskflow import task +import masakari.conf from masakari import exception +CONF = masakari.conf.CONF +PERSISTENCE_BACKEND = CONF.taskflow.connection LOG = logging.getLogger(__name__) -def _make_task_name(cls, addons=None): - """Makes a pretty name for a task class.""" - base_name = ".".join([cls.__module__, cls.__name__]) - extra = '' - if addons: - extra = ';%s' % (", ".join([str(a) for a in addons])) - return base_name + extra - - class MasakariTask(task.Task): """The root task class for all masakari tasks. @@ -43,12 +43,23 @@ class MasakariTask(task.Task): implement the given task as the task name. """ - def __init__(self, addons=None, **kwargs): - super(MasakariTask, self).__init__(self.make_name(addons), **kwargs) + def __init__(self, context, novaclient, **kwargs): + super(MasakariTask, self).__init__(self.__class__.__name__, **kwargs) + self.context = context + self.novaclient = novaclient + self.progress = [] - @classmethod - def make_name(cls, addons=None): - return _make_task_name(cls, addons) + def update_details(self, progress_data, progress=0.0): + progress_details = { + 'timestamp': str(timeutils.utcnow()), + 'progress': progress, + 'message': progress_data + } + + self.progress.append(progress_details) + self._notifier.notify('update_progress', {'progress': progress, + "progress_details": + self.progress}) class SpecialFormatter(formatters.FailureFormatter): @@ -102,3 +113,22 @@ def get_recovery_flow(task_list, **kwargs): name_order=True, invoke_on_load=True, invoke_kwds=kwargs) for extension in extensions.extensions: yield extension.obj + + +def load_taskflow_into_engine(action, nested_flow, + process_what): + book = None + backend = None + if PERSISTENCE_BACKEND: + backend = backends.fetch(PERSISTENCE_BACKEND) + with contextlib.closing(backend.get_connection()) as conn: + try: + book = conn.get_logbook(process_what['notification_uuid']) + except exceptions.NotFound: + pass + if book is None: + book = models.LogBook(action, + process_what['notification_uuid']) + + return taskflow.engines.load(nested_flow, store=process_what, + backend=backend, book=book) diff --git a/masakari/engine/drivers/taskflow/driver.py b/masakari/engine/drivers/taskflow/driver.py index e78db165..6c020a3c 100644 --- a/masakari/engine/drivers/taskflow/driver.py +++ b/masakari/engine/drivers/taskflow/driver.py @@ -18,11 +18,16 @@ Driver TaskFlowDriver: Execute notification workflows using taskflow. """ +from collections import OrderedDict +import contextlib from oslo_log import log as logging from oslo_utils import excutils +from taskflow import exceptions +from taskflow.persistence import backends from masakari.compute import nova +import masakari.conf from masakari.engine import driver from masakari.engine.drivers.taskflow import base from masakari.engine.drivers.taskflow import host_failure @@ -30,9 +35,13 @@ from masakari.engine.drivers.taskflow import instance_failure from masakari.engine.drivers.taskflow import process_failure from masakari import exception from masakari.i18n import _ +from masakari import objects from masakari.objects import fields +CONF = masakari.conf.CONF +TASKFLOW_CONF = CONF.taskflow_driver_recovery_flows +PERSISTENCE_BACKEND = CONF.taskflow.connection LOG = logging.getLogger(__name__) @@ -40,8 +49,9 @@ class TaskFlowDriver(driver.NotificationDriver): def __init__(self): super(TaskFlowDriver, self).__init__() - def _execute_auto_workflow(self, novaclient, process_what): - flow_engine = host_failure.get_auto_flow(novaclient, process_what) + def _execute_auto_workflow(self, context, novaclient, process_what): + flow_engine = host_failure.get_auto_flow(context, novaclient, + process_what) # Attaching this listener will capture all of the notifications # that taskflow sends out and redirect them to a more useful @@ -49,7 +59,7 @@ class TaskFlowDriver(driver.NotificationDriver): with base.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() - def _execute_rh_workflow(self, novaclient, process_what, + def _execute_rh_workflow(self, context, novaclient, process_what, reserved_host_list): if not reserved_host_list: msg = _('No reserved_hosts available for evacuation.') @@ -57,7 +67,8 @@ class TaskFlowDriver(driver.NotificationDriver): raise exception.ReservedHostsUnavailable(message=msg) process_what['reserved_host_list'] = reserved_host_list - flow_engine = host_failure.get_rh_flow(novaclient, process_what) + flow_engine = host_failure.get_rh_flow(context, novaclient, + process_what) with base.DynamicLogListener(flow_engine, logger=LOG): try: @@ -65,10 +76,10 @@ class TaskFlowDriver(driver.NotificationDriver): except exception.LockAlreadyAcquired as ex: raise exception.HostRecoveryFailureException(ex.message) - def _execute_auto_priority_workflow(self, novaclient, process_what, - reserved_host_list): + def _execute_auto_priority_workflow(self, context, novaclient, + process_what, reserved_host_list): try: - self._execute_auto_workflow(novaclient, process_what) + self._execute_auto_workflow(context, novaclient, process_what) except Exception as ex: with excutils.save_and_reraise_exception(reraise=False) as ctxt: if isinstance(ex, exception.SkipHostRecoveryException): @@ -87,13 +98,13 @@ class TaskFlowDriver(driver.NotificationDriver): 'reserved_host': fields.FailoverSegmentRecoveryMethod.RESERVED_HOST }) - self._execute_rh_workflow(novaclient, process_what, + self._execute_rh_workflow(context, novaclient, process_what, reserved_host_list) - def _execute_rh_priority_workflow(self, novaclient, process_what, + def _execute_rh_priority_workflow(self, context, novaclient, process_what, reserved_host_list): try: - self._execute_rh_workflow(novaclient, process_what, + self._execute_rh_workflow(context, novaclient, process_what, reserved_host_list) except Exception as ex: with excutils.save_and_reraise_exception(reraise=False) as ctxt: @@ -113,30 +124,32 @@ class TaskFlowDriver(driver.NotificationDriver): fields.FailoverSegmentRecoveryMethod.RESERVED_HOST, 'auto': fields.FailoverSegmentRecoveryMethod.AUTO }) - self._execute_auto_workflow(novaclient, process_what) + self._execute_auto_workflow(context, novaclient, process_what) def execute_host_failure(self, context, host_name, recovery_method, notification_uuid, reserved_host_list=None): novaclient = nova.API() # get flow for host failure process_what = { - 'context': context, - 'host_name': host_name + 'host_name': host_name, + 'notification_uuid': notification_uuid } try: if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO: - self._execute_auto_workflow(novaclient, process_what) + self._execute_auto_workflow(context, novaclient, process_what) elif recovery_method == ( fields.FailoverSegmentRecoveryMethod.RESERVED_HOST): - self._execute_rh_workflow(novaclient, process_what, + self._execute_rh_workflow(context, novaclient, process_what, reserved_host_list) elif recovery_method == ( fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY): - self._execute_auto_priority_workflow(novaclient, process_what, - reserved_host_list) + self._execute_auto_priority_workflow(context, novaclient, + process_what, + reserved_host_list) else: - self._execute_rh_priority_workflow(novaclient, process_what, + self._execute_rh_priority_workflow(context, novaclient, + process_what, reserved_host_list) except Exception as exc: with excutils.save_and_reraise_exception(reraise=False) as ctxt: @@ -154,13 +167,13 @@ class TaskFlowDriver(driver.NotificationDriver): novaclient = nova.API() # get flow for instance failure process_what = { - 'context': context, - 'instance_uuid': instance_uuid + 'instance_uuid': instance_uuid, + 'notification_uuid': notification_uuid } try: flow_engine = instance_failure.get_instance_recovery_flow( - novaclient, process_what) + context, novaclient, process_what) except Exception: msg = (_('Failed to create instance failure flow.'), notification_uuid) @@ -178,9 +191,9 @@ class TaskFlowDriver(driver.NotificationDriver): novaclient = nova.API() # get flow for process failure process_what = { - 'context': context, 'process_name': process_name, - 'host_name': host_name + 'host_name': host_name, + 'notification_uuid': notification_uuid } # TODO(abhishekk) We need to create a map for process_name and @@ -194,7 +207,7 @@ class TaskFlowDriver(driver.NotificationDriver): raise exception.SkipProcessRecoveryException() try: - flow_engine = recovery_flow(novaclient, process_what) + flow_engine = recovery_flow(context, novaclient, process_what) except Exception: msg = (_('Failed to create process failure flow.'), notification_uuid) @@ -206,3 +219,90 @@ class TaskFlowDriver(driver.NotificationDriver): # masakari's debugging (or error reporting) usage. with base.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() + + @contextlib.contextmanager + def upgrade_backend(self, persistence_backend): + try: + backend = backends.fetch(persistence_backend) + with contextlib.closing(backend.get_connection()) as conn: + conn.upgrade() + except exceptions.NotFound as e: + raise e + + def _get_taskflow_sequence(self, context, recovery_method, notification): + # Get the taskflow sequence based on the recovery method. + + novaclient = nova.API() + task_list = [] + + # Get linear task flow based on notification type + if notification.type == fields.NotificationType.VM: + tasks = TASKFLOW_CONF.instance_failure_recovery_tasks + elif notification.type == fields.NotificationType.PROCESS: + tasks = TASKFLOW_CONF.process_failure_recovery_tasks + elif notification.type == fields.NotificationType.COMPUTE_HOST: + if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO: + tasks = TASKFLOW_CONF.host_auto_failure_recovery_tasks + elif recovery_method == ( + fields.FailoverSegmentRecoveryMethod.RESERVED_HOST): + tasks = TASKFLOW_CONF.host_rh_failure_recovery_tasks + + for plugin in base.get_recovery_flow(tasks['pre'], + context=context, + novaclient=novaclient): + task_list.append(plugin.name) + + for plugin in base.get_recovery_flow(tasks['main'], + context=context, + novaclient=novaclient): + task_list.append(plugin.name) + + for plugin in base.get_recovery_flow(tasks['post'], + context=context, + novaclient=novaclient): + task_list.append(plugin.name) + + return task_list + + def get_notification_recovery_workflow_details(self, context, + recovery_method, + notification): + """Retrieve progress details in notification""" + + # Note: Taskflow doesn't support to return task details in + # the same sequence in which all tasks are executed. Reported this + # issue in LP #1815738. To resolve this issue load the tasks based on + # the recovery method and later sort it based on this task list so + # progress_details can be returned in the expected order. + task_list = self._get_taskflow_sequence(context, recovery_method, + notification) + + backend = backends.fetch(PERSISTENCE_BACKEND) + with contextlib.closing(backend.get_connection()) as conn: + progress_details = [] + flow_details = conn.get_flows_for_book( + notification.notification_uuid) + for flow in flow_details: + od = OrderedDict() + atom_details = list(conn.get_atoms_for_flow(flow.uuid)) + + for task in task_list: + for atom in atom_details: + if task == atom.name: + od[atom.name] = atom + + for key, value in od.items(): + # Add progress_details only if tasks are executed and meta + # is available in which progress_details are stored. + if value.meta: + progress_details_obj = ( + objects.NotificationProgressDetails.create( + value.name, + value.meta['progress'], + value.meta['progress_details']['details'] + ['progress_details'], + value.state)) + + progress_details.append(progress_details_obj) + + return progress_details diff --git a/masakari/engine/drivers/taskflow/host_failure.py b/masakari/engine/drivers/taskflow/host_failure.py index 4e3a5b46..018f5b9e 100644 --- a/masakari/engine/drivers/taskflow/host_failure.py +++ b/masakari/engine/drivers/taskflow/host_failure.py @@ -22,57 +22,55 @@ from oslo_log import log as logging from oslo_service import loopingcall from oslo_utils import excutils from oslo_utils import strutils -import taskflow.engines from taskflow.patterns import linear_flow from taskflow import retry import masakari.conf from masakari.engine.drivers.taskflow import base from masakari import exception -from masakari.i18n import _ from masakari import utils CONF = masakari.conf.CONF - LOG = logging.getLogger(__name__) - ACTION = 'instance:evacuate' - # Instance power_state SHUTDOWN = 4 - TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows class DisableComputeServiceTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["host_name"] - super(DisableComputeServiceTask, self).__init__(addons=[ACTION], - requires=requires) - self.novaclient = novaclient - - def execute(self, context, host_name): - self.novaclient.enable_disable_service(context, host_name) + super(DisableComputeServiceTask, self).__init__(context, + novaclient, + requires=requires + ) + def execute(self, host_name): + msg = "Disabling compute service on host: '%s'" % host_name + self.update_details(msg) + self.novaclient.enable_disable_service(self.context, host_name) # Sleep until nova-compute service is marked as disabled. - msg = ("Sleeping %(wait)s sec before starting recovery " + log_msg = ("Sleeping %(wait)s sec before starting recovery " "thread until nova recognizes the node down.") - LOG.info(msg, {'wait': CONF.wait_period_after_service_update}) + LOG.info(log_msg, {'wait': CONF.wait_period_after_service_update}) eventlet.sleep(CONF.wait_period_after_service_update) + msg = "Disabled compute service on host: '%s'" % host_name + self.update_details(msg, 1.0) class PrepareHAEnabledInstancesTask(base.MasakariTask): """Get all HA_Enabled instances.""" default_provides = set(["instance_list"]) - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["host_name"] - super(PrepareHAEnabledInstancesTask, self).__init__(addons=[ACTION], + super(PrepareHAEnabledInstancesTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, host_name): + def execute(self, host_name): def _filter_instances(instance_list): ha_enabled_instances = [] non_ha_enabled_instances = [] @@ -84,8 +82,10 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask): getattr(instance, "OS-EXT-STS:vm_state") == "error"): if is_instance_ha_enabled: msg = ("Ignoring recovery of HA_Enabled instance " - "'%(instance_id)s' as it is in 'error' state.") - LOG.info(msg, {'instance_id': instance.id}) + "'%(instance_id)s' as it is in 'error' state." + ) % {'instance_id': instance.id} + LOG.info(msg) + self.update_details(msg, 0.4) continue if is_instance_ha_enabled: @@ -93,20 +93,48 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask): else: non_ha_enabled_instances.append(instance) + msg = "Total HA Enabled instances count: '%d'" % len( + ha_enabled_instances) + self.update_details(msg, 0.6) + if CONF.host_failure.evacuate_all_instances: + msg = ("Total Non-HA Enabled instances count: '%d'" % len( + non_ha_enabled_instances)) + self.update_details(msg, 0.7) + ha_enabled_instances.extend(non_ha_enabled_instances) + msg = ("All instances (HA Enabled/Non-HA Enabled) should be " + "considered for evacuation. Total count is: '%d'") % ( + len(ha_enabled_instances)) + self.update_details(msg, 0.8) + return ha_enabled_instances - instance_list = self.novaclient.get_servers(context, host_name) + msg = "Preparing instances for evacuation" + self.update_details(msg) + + instance_list = self.novaclient.get_servers(self.context, host_name) + msg = ("Total instances running on failed host '%(host_name)s' is " + "%(instance_list)d") % {'host_name': host_name, + 'instance_list': len(instance_list)} + self.update_details(msg, 0.3) instance_list = _filter_instances(instance_list) if not instance_list: - msg = _('No instances to evacuate on host: %s.') % host_name + msg = ("Skipped host '%s' recovery as no instances needs to be " + "evacuated" % host_name) + self.update_details(msg, 1.0) LOG.info(msg) raise exception.SkipHostRecoveryException(message=msg) + # List of instance UUID + instance_list = [instance.id for instance in instance_list] + + msg = "Instances to be evacuated are: '%s'" % ','.join(instance_list) + self.update_details(msg, 1.0) + return { "instance_list": instance_list, } @@ -114,11 +142,11 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask): class EvacuateInstancesTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["host_name", "instance_list"] - super(EvacuateInstancesTask, self).__init__(addons=[ACTION], + super(EvacuateInstancesTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient def _get_state_and_host_of_instance(self, context, instance): new_instance = self.novaclient.get_server(context, instance.id) @@ -131,7 +159,7 @@ class EvacuateInstancesTask(base.MasakariTask): def _stop_after_evacuation(self, context, instance): def _wait_for_stop_confirmation(): - old_vm_state, new_vm_state, _ = ( + old_vm_state, new_vm_state, instance_host = ( self._get_state_and_host_of_instance(context, instance)) if new_vm_state == 'stopped': @@ -149,9 +177,10 @@ class EvacuateInstancesTask(base.MasakariTask): periodic_call_stopped.wait) except etimeout.Timeout: with excutils.save_and_reraise_exception(): - msg = ("Instance '%s' is successfully evacuated but " - "failed to stop.") - LOG.warning(msg, instance.id) + msg = ("Instance '%(uuid)s' is successfully evacuated but " + "failed to stop.") % {'uuid': instance.id} + LOG.warning(msg) + self.update_details(msg, 1.0) finally: periodic_call_stopped.stop() @@ -171,7 +200,6 @@ class EvacuateInstancesTask(base.MasakariTask): def _wait_for_evacuation_confirmation(): old_vm_state, new_vm_state, instance_host = ( self._get_state_and_host_of_instance(context, instance)) - if instance_host != host_name: if ((old_vm_state == 'error' and new_vm_state == 'active') or @@ -220,7 +248,7 @@ class EvacuateInstancesTask(base.MasakariTask): if vm_state != 'active': if stop_instance: - self._stop_after_evacuation(context, instance) + self._stop_after_evacuation(self.context, instance) # If the instance was in 'error' state before failure # it should be set to 'error' after recovery. if vm_state == 'error': @@ -242,11 +270,18 @@ class EvacuateInstancesTask(base.MasakariTask): # Unlock the server after evacuation and confirmation self.novaclient.unlock_server(context, instance.id) - def execute(self, context, host_name, instance_list, reserved_host=None): + def execute(self, host_name, instance_list, reserved_host=None): + msg = ("Start evacuation of instances from failed host '%(host_name)s'" + ", instance uuids are : '%(instance_list)s'") % { + 'host_name': host_name, 'instance_list': ','.join(instance_list)} + self.update_details(msg) + def _do_evacuate(context, host_name, instance_list, reserved_host=None): failed_evacuation_instances = [] if reserved_host: + msg = "Enabling reserved host: '%s'" % reserved_host.name + self.update_details(msg, 0.1) if CONF.host_failure.add_reserved_host_to_aggregate: # Assign reserved_host to an aggregate to which the failed # compute host belongs to. @@ -254,15 +289,27 @@ class EvacuateInstancesTask(base.MasakariTask): for aggregate in aggregates: if host_name in aggregate.hosts: try: + msg = ("Add host %(reserved_host)s to " + "aggregate %(aggregate)s") % { + 'reserved_host': reserved_host.name, + 'aggregate': aggregate.name} + self.update_details(msg, 0.2) + self.novaclient.add_host_to_aggregate( context, reserved_host.name, aggregate) + msg = ("Added host %(reserved_host)s to " + "aggregate %(aggregate)s") % { + 'reserved_host': reserved_host.name, + 'aggregate': aggregate.name} + self.update_details(msg, 0.3) except exception.Conflict: msg = ("Host '%(reserved_host)s' already has " "been added to aggregate " - "'%(aggregate)s'.") - LOG.info(msg, - {'reserved_host': reserved_host.name, - 'aggregate': aggregate.name}) + "'%(aggregate)s'.") % { + 'reserved_host': reserved_host.name, + 'aggregate': aggregate.name} + self.update_details(msg, 1.0) + LOG.info(msg) # A failed compute host can be associated with # multiple aggregates but operators will not @@ -280,38 +327,52 @@ class EvacuateInstancesTask(base.MasakariTask): thread_pool = greenpool.GreenPool( CONF.host_failure_recovery_threads) - for instance in instance_list: + + for instance_id in instance_list: + msg = "Evacuation of instance started : '%s'" % instance_id + self.update_details(msg, 0.5) + instance = self.novaclient.get_server(self.context, + instance_id) thread_pool.spawn_n(self._evacuate_and_confirm, context, instance, host_name, failed_evacuation_instances, reserved_host) + + if not (instance_id in failed_evacuation_instances): + msg = ("Instance '%s' evacuated successfully" % + instance_id) + self.update_details(msg, 0.7) + thread_pool.waitall() if failed_evacuation_instances: - msg = _("Failed to evacuate instances %(instances)s from " - "host %(host_name)s.") % { - 'instances': failed_evacuation_instances, - 'host_name': host_name - } - raise exception.HostRecoveryFailureException(message=msg) + msg = ("Failed to evacuate instances " + "'%(failed_evacuation_instances)s' from host " + "'%(host_name)s'") % { + 'failed_evacuation_instances': + ','.join(failed_evacuation_instances), + 'host_name': host_name} + self.update_details(msg, 1.0) + raise exception.HostRecoveryFailureException( + message=msg) lock_name = reserved_host.name if reserved_host else None @utils.synchronized(lock_name) def do_evacuate_with_reserved_host(context, host_name, instance_list, reserved_host): - _do_evacuate(context, host_name, instance_list, + _do_evacuate(self.context, host_name, instance_list, reserved_host=reserved_host) if lock_name: - do_evacuate_with_reserved_host(context, host_name, instance_list, - reserved_host) + do_evacuate_with_reserved_host(self.context, host_name, + instance_list, reserved_host) else: # No need to acquire lock on reserved_host when recovery_method is # 'auto' as the selection of compute host will be decided by nova. - _do_evacuate(context, host_name, instance_list) + _do_evacuate(self.context, host_name, instance_list) -def get_auto_flow(novaclient, process_what): +def get_auto_flow(context, novaclient, process_what): """Constructs and returns the engine entrypoint flow. This flow will do the following: @@ -328,17 +389,17 @@ def get_auto_flow(novaclient, process_what): task_dict = TASKFLOW_CONF.host_auto_failure_recovery_tasks auto_evacuate_flow_pre = linear_flow.Flow('pre_tasks') - for plugin in base.get_recovery_flow(task_dict['pre'], + for plugin in base.get_recovery_flow(task_dict['pre'], context=context, novaclient=novaclient): auto_evacuate_flow_pre.add(plugin) auto_evacuate_flow_main = linear_flow.Flow('main_tasks') - for plugin in base.get_recovery_flow(task_dict['main'], + for plugin in base.get_recovery_flow(task_dict['main'], context=context, novaclient=novaclient): auto_evacuate_flow_main.add(plugin) auto_evacuate_flow_post = linear_flow.Flow('post_tasks') - for plugin in base.get_recovery_flow(task_dict['post'], + for plugin in base.get_recovery_flow(task_dict['post'], context=context, novaclient=novaclient): auto_evacuate_flow_post.add(plugin) @@ -346,10 +407,11 @@ def get_auto_flow(novaclient, process_what): nested_flow.add(auto_evacuate_flow_main) nested_flow.add(auto_evacuate_flow_post) - return taskflow.engines.load(nested_flow, store=process_what) + return base.load_taskflow_into_engine(ACTION, nested_flow, + process_what) -def get_rh_flow(novaclient, process_what): +def get_rh_flow(context, novaclient, process_what): """Constructs and returns the engine entrypoint flow. This flow will do the following: @@ -365,7 +427,7 @@ def get_rh_flow(novaclient, process_what): task_dict = TASKFLOW_CONF.host_rh_failure_recovery_tasks rh_evacuate_flow_pre = linear_flow.Flow('pre_tasks') - for plugin in base.get_recovery_flow(task_dict['pre'], + for plugin in base.get_recovery_flow(task_dict['pre'], context=context, novaclient=novaclient): rh_evacuate_flow_pre.add(plugin) @@ -373,12 +435,12 @@ def get_rh_flow(novaclient, process_what): "retry_%s" % flow_name, retry=retry.ParameterizedForEach( rebind=['reserved_host_list'], provides='reserved_host')) - for plugin in base.get_recovery_flow(task_dict['main'], + for plugin in base.get_recovery_flow(task_dict['main'], context=context, novaclient=novaclient): rh_evacuate_flow_main.add(plugin) rh_evacuate_flow_post = linear_flow.Flow('post_tasks') - for plugin in base.get_recovery_flow(task_dict['post'], + for plugin in base.get_recovery_flow(task_dict['post'], context=context, novaclient=novaclient): rh_evacuate_flow_post.add(plugin) @@ -386,4 +448,5 @@ def get_rh_flow(novaclient, process_what): nested_flow.add(rh_evacuate_flow_main) nested_flow.add(rh_evacuate_flow_post) - return taskflow.engines.load(nested_flow, store=process_what) + return base.load_taskflow_into_engine(ACTION, nested_flow, + process_what) diff --git a/masakari/engine/drivers/taskflow/instance_failure.py b/masakari/engine/drivers/taskflow/instance_failure.py index ba531397..d13ee4b5 100644 --- a/masakari/engine/drivers/taskflow/instance_failure.py +++ b/masakari/engine/drivers/taskflow/instance_failure.py @@ -19,34 +19,29 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_service import loopingcall from oslo_utils import strutils -import taskflow.engines from taskflow.patterns import linear_flow import masakari.conf from masakari.engine.drivers.taskflow import base from masakari import exception -from masakari.i18n import _ CONF = masakari.conf.CONF - LOG = logging.getLogger(__name__) - ACTION = "instance:recovery" - TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows class StopInstanceTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["instance_uuid"] - super(StopInstanceTask, self).__init__(addons=[ACTION], + super(StopInstanceTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, instance_uuid): + def execute(self, instance_uuid): """Stop the instance for recovery.""" - instance = self.novaclient.get_server(context, instance_uuid) + instance = self.novaclient.get_server(self.context, instance_uuid) # If an instance is not HA_Enabled and "process_all_instances" config # option is also disabled, then there is no need to take any recovery @@ -54,27 +49,35 @@ class StopInstanceTask(base.MasakariTask): if not CONF.instance_failure.process_all_instances and not ( strutils.bool_from_string( instance.metadata.get('HA_Enabled', False))): - LOG.info("Skipping recovery for instance: %s as it is " - "not Ha_Enabled.", instance_uuid) + msg = ("Skipping recovery for instance: %(instance_uuid)s as it is" + " not Ha_Enabled") % {'instance_uuid': instance_uuid} + LOG.info(msg) + self.update_details(msg, 1.0) raise exception.SkipInstanceRecoveryException() vm_state = getattr(instance, 'OS-EXT-STS:vm_state') if vm_state in ['paused', 'rescued']: - msg = _("Recovery of instance '%(instance_uuid)s' is ignored as" - " it is in '%(vm_state)s' state.") % { - 'instance_uuid': instance_uuid, 'vm_state': vm_state} + msg = ("Recovery of instance '%(instance_uuid)s' is ignored as it " + "is in '%(vm_state)s' state.") % { + 'instance_uuid': instance_uuid, 'vm_state': vm_state + } LOG.warning(msg) + self.update_details(msg, 1.0) raise exception.IgnoreInstanceRecoveryException(msg) if vm_state != 'stopped': if vm_state == 'resized': self.novaclient.reset_instance_state( - context, instance.id, 'active') + self.context, instance.id, 'active') - self.novaclient.stop_server(context, instance.id) + msg = "Stopping instance: %s" % instance_uuid + self.update_details(msg) + + self.novaclient.stop_server(self.context, instance.id) def _wait_for_power_off(): - new_instance = self.novaclient.get_server(context, instance_uuid) + new_instance = self.novaclient.get_server(self.context, + instance_uuid) vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state') if vm_state == 'stopped': raise loopingcall.LoopingCallDone() @@ -87,48 +90,60 @@ class StopInstanceTask(base.MasakariTask): periodic_call.start(interval=CONF.verify_interval) etimeout.with_timeout(CONF.wait_period_after_power_off, periodic_call.wait) + msg = "Stopped instance: '%s'" % instance_uuid + self.update_details(msg, 1.0) except etimeout.Timeout: - msg = _("Failed to stop instance %(instance)s") % { + msg = "Failed to stop instance %(instance)s" % { 'instance': instance.id } - raise exception.InstanceRecoveryFailureException(message=msg) + self.update_details(msg, 1.0) + raise exception.InstanceRecoveryFailureException( + message=msg) finally: # stop the periodic call, in case of exceptions or Timeout. periodic_call.stop() class StartInstanceTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["instance_uuid"] - super(StartInstanceTask, self).__init__(addons=[ACTION], + super(StartInstanceTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, instance_uuid): + def execute(self, instance_uuid): """Start the instance.""" - instance = self.novaclient.get_server(context, instance_uuid) + msg = "Starting instance: '%s'" % instance_uuid + self.update_details(msg) + + instance = self.novaclient.get_server(self.context, instance_uuid) vm_state = getattr(instance, 'OS-EXT-STS:vm_state') if vm_state == 'stopped': - self.novaclient.start_server(context, instance.id) + self.novaclient.start_server(self.context, instance.id) + msg = "Instance started: '%s'" % instance_uuid + self.update_details(msg, 1.0) else: - msg = _("Invalid state for Instance %(instance)s. Expected state: " - "'STOPPED', Actual state: '%(actual_state)s'") % { + msg = ("Invalid state for Instance %(instance)s. Expected state: " + "'STOPPED', Actual state: '%(actual_state)s'") % { 'instance': instance_uuid, 'actual_state': vm_state } - raise exception.InstanceRecoveryFailureException(message=msg) + self.update_details(msg, 1.0) + raise exception.InstanceRecoveryFailureException( + message=msg) class ConfirmInstanceActiveTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["instance_uuid"] - super(ConfirmInstanceActiveTask, self).__init__(addons=[ACTION], + super(ConfirmInstanceActiveTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, instance_uuid): + def execute(self, instance_uuid): def _wait_for_active(): - new_instance = self.novaclient.get_server(context, instance_uuid) + new_instance = self.novaclient.get_server(self.context, + instance_uuid) vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state') if vm_state == 'active': raise loopingcall.LoopingCallDone() @@ -136,21 +151,29 @@ class ConfirmInstanceActiveTask(base.MasakariTask): periodic_call = loopingcall.FixedIntervalLoopingCall( _wait_for_active) try: + msg = "Confirming instance '%s' vm_state is ACTIVE" % instance_uuid + self.update_details(msg) + # add a timeout to the periodic call. periodic_call.start(interval=CONF.verify_interval) etimeout.with_timeout(CONF.wait_period_after_power_on, periodic_call.wait) + + msg = "Confirmed instance '%s' vm_state is ACTIVE" % instance_uuid + self.update_details(msg, 1.0) except etimeout.Timeout: - msg = _("Failed to start instance %(instance)s") % { + msg = "Failed to start instance %(instance)s" % { 'instance': instance_uuid } - raise exception.InstanceRecoveryFailureException(message=msg) + self.update_details(msg, 1.0) + raise exception.InstanceRecoveryFailureException( + message=msg) finally: # stop the periodic call, in case of exceptions or Timeout. periodic_call.stop() -def get_instance_recovery_flow(novaclient, process_what): +def get_instance_recovery_flow(context, novaclient, process_what): """Constructs and returns the engine entrypoint flow. This flow will do the following: @@ -166,17 +189,17 @@ def get_instance_recovery_flow(novaclient, process_what): task_dict = TASKFLOW_CONF.instance_failure_recovery_tasks instance_recovery_workflow_pre = linear_flow.Flow('pre_tasks') - for plugin in base.get_recovery_flow(task_dict['pre'], + for plugin in base.get_recovery_flow(task_dict['pre'], context=context, novaclient=novaclient): instance_recovery_workflow_pre.add(plugin) instance_recovery_workflow_main = linear_flow.Flow('main_tasks') - for plugin in base.get_recovery_flow(task_dict['main'], + for plugin in base.get_recovery_flow(task_dict['main'], context=context, novaclient=novaclient): instance_recovery_workflow_main.add(plugin) instance_recovery_workflow_post = linear_flow.Flow('post_tasks') - for plugin in base.get_recovery_flow(task_dict['post'], + for plugin in base.get_recovery_flow(task_dict['post'], context=context, novaclient=novaclient): instance_recovery_workflow_post.add(plugin) @@ -184,4 +207,5 @@ def get_instance_recovery_flow(novaclient, process_what): nested_flow.add(instance_recovery_workflow_main) nested_flow.add(instance_recovery_workflow_post) - return taskflow.engines.load(nested_flow, store=process_what) + return base.load_taskflow_into_engine(ACTION, nested_flow, + process_what) diff --git a/masakari/engine/drivers/taskflow/no_op.py b/masakari/engine/drivers/taskflow/no_op.py index 8197ab31..3f42d528 100644 --- a/masakari/engine/drivers/taskflow/no_op.py +++ b/masakari/engine/drivers/taskflow/no_op.py @@ -21,7 +21,8 @@ LOG = logging.getLogger(__name__) class Noop(task.Task): - def __init__(self, novaclient): + def __init__(self, context, novaclient): + self.context = context self.novaclient = novaclient super(Noop, self).__init__() diff --git a/masakari/engine/drivers/taskflow/process_failure.py b/masakari/engine/drivers/taskflow/process_failure.py index 8451caef..5b2cc510 100644 --- a/masakari/engine/drivers/taskflow/process_failure.py +++ b/masakari/engine/drivers/taskflow/process_failure.py @@ -18,75 +18,86 @@ from eventlet import timeout as etimeout from oslo_config import cfg from oslo_log import log as logging from oslo_service import loopingcall -import taskflow.engines from taskflow.patterns import linear_flow import masakari.conf from masakari.engine.drivers.taskflow import base from masakari import exception -from masakari.i18n import _ CONF = masakari.conf.CONF - LOG = logging.getLogger(__name__) - ACTION = "process:recovery" - TASKFLOW_CONF = cfg.CONF.taskflow_driver_recovery_flows class DisableComputeNodeTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["process_name", "host_name"] - super(DisableComputeNodeTask, self).__init__(addons=[ACTION], + super(DisableComputeNodeTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, process_name, host_name): - if not self.novaclient.is_service_down(context, host_name, + def execute(self, process_name, host_name): + msg = "Disabling compute service on host: '%s'" % host_name + self.update_details(msg) + + if not self.novaclient.is_service_down(self.context, host_name, process_name): # disable compute node on given host - self.novaclient.enable_disable_service(context, host_name) + self.novaclient.enable_disable_service(self.context, host_name) + msg = "Disabled compute service on host: '%s'" % host_name + self.update_details(msg, 1.0) else: - LOG.info("Skipping recovery for process: %s as it is " - "already disabled.", - process_name) + msg = ("Skipping recovery for process %(process_name)s as it is " + "already disabled") % {'process_name': process_name} + LOG.info(msg) + self.update_details(msg, 1.0) class ConfirmComputeNodeDisabledTask(base.MasakariTask): - def __init__(self, novaclient): + def __init__(self, context, novaclient): requires = ["process_name", "host_name"] - super(ConfirmComputeNodeDisabledTask, self).__init__(addons=[ACTION], + super(ConfirmComputeNodeDisabledTask, self).__init__(context, + novaclient, requires=requires) - self.novaclient = novaclient - def execute(self, context, process_name, host_name): + def execute(self, process_name, host_name): def _wait_for_disable(): service_disabled = self.novaclient.is_service_down( - context, host_name, process_name) + self.context, host_name, process_name) if service_disabled: raise loopingcall.LoopingCallDone() periodic_call = loopingcall.FixedIntervalLoopingCall( _wait_for_disable) try: + msg = "Confirming compute service is disabled on host: '%s'" % ( + host_name) + self.update_details(msg) + # add a timeout to the periodic call. periodic_call.start(interval=CONF.verify_interval) etimeout.with_timeout( CONF.wait_period_after_service_update, periodic_call.wait) + + msg = "Confirmed compute service is disabled on host: '%s'" % ( + host_name) + self.update_details(msg, 1.0) except etimeout.Timeout: - msg = _("Failed to disable service %(process_name)s") % { + msg = "Failed to disable service %(process_name)s" % { 'process_name': process_name } - raise exception.ProcessRecoveryFailureException(message=msg) + self.update_details(msg, 1.0) + raise exception.ProcessRecoveryFailureException( + message=msg) finally: # stop the periodic call, in case of exceptions or Timeout. periodic_call.stop() -def get_compute_process_recovery_flow(novaclient, process_what): +def get_compute_process_recovery_flow(context, novaclient, process_what): """Constructs and returns the engine entrypoint flow. This flow will do the following: @@ -101,17 +112,17 @@ def get_compute_process_recovery_flow(novaclient, process_what): task_dict = TASKFLOW_CONF.process_failure_recovery_tasks process_recovery_workflow_pre = linear_flow.Flow('pre_tasks') - for plugin in base.get_recovery_flow(task_dict['pre'], + for plugin in base.get_recovery_flow(task_dict['pre'], context=context, novaclient=novaclient): process_recovery_workflow_pre.add(plugin) process_recovery_workflow_main = linear_flow.Flow('main_tasks') - for plugin in base.get_recovery_flow(task_dict['main'], + for plugin in base.get_recovery_flow(task_dict['main'], context=context, novaclient=novaclient): process_recovery_workflow_main.add(plugin) process_recovery_workflow_post = linear_flow.Flow('post_tasks') - for plugin in base.get_recovery_flow(task_dict['post'], + for plugin in base.get_recovery_flow(task_dict['post'], context=context, novaclient=novaclient): process_recovery_workflow_post.add(plugin) @@ -119,4 +130,5 @@ def get_compute_process_recovery_flow(novaclient, process_what): nested_flow.add(process_recovery_workflow_main) nested_flow.add(process_recovery_workflow_post) - return taskflow.engines.load(nested_flow, store=process_what) + return base.load_taskflow_into_engine(ACTION, nested_flow, + process_what) diff --git a/masakari/engine/manager.py b/masakari/engine/manager.py index 6e79a93d..7f5cf298 100644 --- a/masakari/engine/manager.py +++ b/masakari/engine/manager.py @@ -31,8 +31,10 @@ from oslo_utils import timeutils import masakari.conf from masakari.engine import driver from masakari.engine import instance_events as virt_events +from masakari.engine import rpcapi from masakari.engine import utils as engine_utils from masakari import exception +from masakari.i18n import _ from masakari import manager from masakari import objects from masakari.objects import fields @@ -45,8 +47,8 @@ LOG = logging.getLogger(__name__) class MasakariManager(manager.Manager): """Manages the running notifications""" - - target = messaging.Target(version='1.0') + RPC_API_VERSION = rpcapi.EngineAPI.RPC_API_VERSION + target = messaging.Target(version=RPC_API_VERSION) def __init__(self, masakari_driver=None, *args, **kwargs): """Load configuration options""" @@ -332,3 +334,23 @@ class MasakariManager(manager.Manager): "status: %(status)s.", {'notification_uuid': notification.notification_uuid, 'status': notification_status}) + + def get_notification_recovery_workflow_details(self, context, + notification): + """Retrieve recovery workflow details of the notification""" + try: + host_obj = objects.Host.get_by_uuid( + context, notification.source_host_uuid) + recovery_method = host_obj.failover_segment.recovery_method + + progress_details = (self.driver. + get_notification_recovery_workflow_details( + context, recovery_method, notification)) + notification['recovery_workflow_details'] = progress_details + except Exception: + msg = (_('Failed to fetch notification recovery workflow details ' + 'for %s'), notification.notification_uuid) + LOG.exception(msg) + raise exception.MasakariException(msg) + + return notification diff --git a/masakari/engine/rpcapi.py b/masakari/engine/rpcapi.py index ff5d7025..65616b04 100644 --- a/masakari/engine/rpcapi.py +++ b/masakari/engine/rpcapi.py @@ -30,9 +30,11 @@ class EngineAPI(rpc.RPCAPI): .. code-block:: none 1.0 - Initial version. + 1.1 - Added get_notification_recovery_workflow_details method to + retrieve progress details from notification driver. """ - RPC_API_VERSION = '1.0' + RPC_API_VERSION = '1.1' TOPIC = CONF.masakari_topic BINARY = 'masakari-engine' @@ -47,3 +49,11 @@ class EngineAPI(rpc.RPCAPI): version = '1.0' cctxt = self.client.prepare(version=version) cctxt.cast(context, 'process_notification', notification=notification) + + def get_notification_recovery_workflow_details(self, context, + notification): + version = '1.1' + cctxt = self.client.prepare(version=version) + return cctxt.call(context, + 'get_notification_recovery_workflow_details', + notification=notification) diff --git a/masakari/ha/api.py b/masakari/ha/api.py index 38c6832b..a783c632 100644 --- a/masakari/ha/api.py +++ b/masakari/ha/api.py @@ -374,3 +374,15 @@ class NotificationAPI(object): raise exception.NotificationNotFound(id=notification_uuid) return notification + + def get_notification_recovery_workflow_details(self, context, + notification_uuid): + """Get recovery workflow details details of the notification""" + notification = self.get_notification(context, notification_uuid) + + LOG.debug("Fetching recovery workflow details of a notification %s ", + notification_uuid) + notification = (self.engine_rpcapi. + get_notification_recovery_workflow_details( + context, notification)) + return notification diff --git a/masakari/objects/fields.py b/masakari/objects/fields.py index c4fe5d1b..610f37c7 100644 --- a/masakari/objects/fields.py +++ b/masakari/objects/fields.py @@ -28,6 +28,8 @@ ObjectField = fields.ObjectField BaseEnumField = fields.BaseEnumField ListOfObjectsField = fields.ListOfObjectsField ListOfStringsField = fields.ListOfStringsField +FloatField = fields.FloatField +ListOfDictOfNullableStringsField = fields.ListOfDictOfNullableStringsField Field = fields.Field diff --git a/masakari/objects/notification.py b/masakari/objects/notification.py index 01b10c2f..e1039867 100644 --- a/masakari/objects/notification.py +++ b/masakari/objects/notification.py @@ -27,11 +27,17 @@ from masakari.objects import fields LOG = logging.getLogger(__name__) +NOTIFICATION_OPTIONAL_FIELDS = ['recovery_workflow_details'] + + @base.MasakariObjectRegistry.register class Notification(base.MasakariPersistentObject, base.MasakariObject, base.MasakariObjectDictCompat): - VERSION = '1.0' + # Version 1.0: Initial version + # Version 1.1: Added recovery_workflow_details field. + # Note: This field shouldn't be persisted. + VERSION = '1.1' fields = { 'id': fields.IntegerField(), @@ -41,12 +47,19 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject, 'type': fields.NotificationTypeField(), 'payload': fields.DictOfStringsField(), 'status': fields.NotificationStatusField(), + # NOTE(ShilpaSD): This field shouldn't be stored in db. + # The recovery workflow details read from the 'notification_driver' + # will be set to this field. + 'recovery_workflow_details': fields.ListOfObjectsField( + 'NotificationProgressDetails', default=[]) } @staticmethod def _from_db_object(context, notification, db_notification): for key in notification.fields: + if key in NOTIFICATION_OPTIONAL_FIELDS: + continue if key != 'payload': setattr(notification, key, db_notification.get(key)) else: @@ -73,6 +86,9 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject, raise exception.ObjectActionError(action='create', reason='already created') updates = self.masakari_obj_get_changes() + # NOTE(ShilpaSD): This field doesn't exist in the Notification + # db model so don't save it. + updates.pop('recovery_workflow_details', None) if 'notification_uuid' not in updates: updates['notification_uuid'] = uuidutils.generate_uuid() @@ -99,6 +115,9 @@ class Notification(base.MasakariPersistentObject, base.MasakariObject, updates = self.masakari_obj_get_changes() updates.pop('id', None) + # NOTE(ShilpaSD): This field doesn't exist in the Notification + # db model so don't save it. + updates.pop('recovery_workflow_details', None) db_notification = db.notification_update(self._context, self.notification_uuid, @@ -152,3 +171,23 @@ def notification_sample(sample): cls.sample = sample return cls return wrap + + +@base.MasakariObjectRegistry.register +class NotificationProgressDetails(base.MasakariObject, + base.MasakariObjectDictCompat): + + VERSION = '1.0' + + fields = { + 'name': fields.StringField(), + 'progress': fields.FloatField(), + 'progress_details': fields.ListOfDictOfNullableStringsField( + default=[]), + 'state': fields.StringField() + } + + @classmethod + def create(cls, name, progress, progress_details, state,): + return cls(name=name, progress=progress, + progress_details=progress_details, state=state) diff --git a/masakari/tests/unit/api/openstack/ha/test_notifications.py b/masakari/tests/unit/api/openstack/ha/test_notifications.py index 45060f05..49101f2d 100644 --- a/masakari/tests/unit/api/openstack/ha/test_notifications.py +++ b/masakari/tests/unit/api/openstack/ha/test_notifications.py @@ -15,6 +15,8 @@ """Tests for the notifications api.""" +import copy + import ddt import mock from oslo_serialization import jsonutils @@ -35,12 +37,17 @@ from masakari.tests.unit.objects import test_objects from masakari.tests import uuidsentinel NOW = timeutils.utcnow().replace(microsecond=0) +OPTIONAL = ['recovery_workflow_details'] def _make_notification_obj(notification_dict): return notification_obj.Notification(**notification_dict) +def _make_notification_progress_details_obj(progress_details): + return notification_obj.NotificationProgressDetails(**progress_details) + + def _make_notifications_list(notifications_list): return notification_obj.Notification(objects=[ _make_notification_obj(a) for a in notifications_list]) @@ -61,6 +68,22 @@ NOTIFICATION_DATA = {"type": "VM", "id": 1, NOTIFICATION = _make_notification_obj(NOTIFICATION_DATA) +RECOVERY_DETAILS = {"progress": 1.0, + "state": "SUCCESS", + "name": "StopInstanceTask", + "progress_details": [ + {"timestamp": "2019-03-07 13:54:28", + "message": "Stopping instance", + "progress": "0.0"}, + ]} + +NOTI_DATA_WITH_DETAILS = copy.deepcopy(NOTIFICATION_DATA) + +NOTIFICATION_WITH_PROGRESS_DETAILS = _make_notification_obj( + NOTI_DATA_WITH_DETAILS) +RECOVERY_OBJ = _make_notification_progress_details_obj(RECOVERY_DETAILS) +NOTIFICATION_WITH_PROGRESS_DETAILS.recovery_workflow_details = [RECOVERY_OBJ] + NOTIFICATION_LIST = [ {"type": "VM", "id": 1, "payload": {'event': 'STOPPED', 'host_status': 'NORMAL', @@ -165,7 +188,8 @@ class NotificationTestCase(test.TestCase): "type": "VM", "generated_time": "2016-09-13T09:11:21.656788"}}) result = result['notification'] - test_objects.compare_obj(self, result, NOTIFICATION_DATA) + test_objects.compare_obj(self, result, NOTIFICATION_DATA, + allow_missing=OPTIONAL) @mock.patch.object(ha_api.NotificationAPI, 'create_notification') def test_create_process_notification(self, mock_create): @@ -180,7 +204,8 @@ class NotificationTestCase(test.TestCase): "type": "PROCESS", "generated_time": "2016-09-13T09:11:21.656788"}}) result = result['notification'] - test_objects.compare_obj(self, result, NOTIFICATION_DATA) + test_objects.compare_obj(self, result, NOTIFICATION_DATA, + allow_missing=OPTIONAL) @mock.patch('masakari.rpc.get_client') @mock.patch.object(ha_api.NotificationAPI, 'create_notification') @@ -446,3 +471,30 @@ class NotificationCasePolicyNotAuthorized(test.NoDBTestCase): self.controller.index, self.req) self._check_rule(exc, rule_name) + + +class NotificationV1_1_TestCase(NotificationTestCase): + """Test Case for notifications api for 1.1 API""" + api_version = '1.1' + + @mock.patch.object(engine_rpcapi, 'EngineAPI') + def setUp(self, mock_rpc): + super(NotificationV1_1_TestCase, self).setUp() + self.controller = notifications.NotificationsController() + self.req = fakes.HTTPRequest.blank('/v1/notifications', + use_admin_context=True, + version=self.api_version) + self.context = self.req.environ['masakari.context'] + + @mock.patch.object(ha_api.NotificationAPI, + 'get_notification_recovery_workflow_details') + def test_show(self, mock_get_notification_recovery_workflow_details): + (mock_get_notification_recovery_workflow_details + .return_value) = NOTIFICATION_WITH_PROGRESS_DETAILS + + result = self.controller.show(self.req, uuidsentinel.fake_notification) + result = result['notification'] + self.assertItemsEqual([RECOVERY_OBJ], + result.recovery_workflow_details) + self._assert_notification_data(NOTIFICATION_WITH_PROGRESS_DETAILS, + _make_notification_obj(result)) diff --git a/masakari/tests/unit/db/test_migrations.py b/masakari/tests/unit/db/test_migrations.py index 77aa33b9..282a7afe 100644 --- a/masakari/tests/unit/db/test_migrations.py +++ b/masakari/tests/unit/db/test_migrations.py @@ -24,12 +24,16 @@ import sqlalchemy from sqlalchemy.engine import reflection import sqlalchemy.exc +import masakari.conf from masakari.db.sqlalchemy import migrate_repo from masakari.db.sqlalchemy import migration as sa_migration from masakari.db.sqlalchemy import models from masakari.tests import fixtures as masakari_fixtures +CONF = masakari.conf.CONF + + class MasakariMigrationsCheckers(test_migrations.WalkVersionsMixin): """Test sqlalchemy-migrate migrations.""" @@ -176,9 +180,46 @@ class MasakariMigrationsCheckers(test_migrations.WalkVersionsMixin): for table in [failover_segments, hosts]: self.assertTrue(table.c.created_at.nullable) + def _check_006(self, engine, data): + self.assertColumnExists(engine, 'logbooks', 'created_at') + self.assertColumnExists(engine, 'logbooks', 'updated_at') + self.assertColumnExists(engine, 'logbooks', 'meta') + self.assertColumnExists(engine, 'logbooks', 'name') + self.assertColumnExists(engine, 'logbooks', 'uuid') + + self.assertColumnExists(engine, 'flowdetails', 'created_at') + self.assertColumnExists(engine, 'flowdetails', 'updated_at') + self.assertColumnExists(engine, 'flowdetails', 'parent_uuid') + self.assertColumnExists(engine, 'flowdetails', 'meta') + self.assertColumnExists(engine, 'flowdetails', 'name') + self.assertColumnExists(engine, 'flowdetails', 'state') + self.assertColumnExists(engine, 'flowdetails', 'uuid') + + self.assertColumnExists(engine, 'atomdetails', 'created_at') + self.assertColumnExists(engine, 'atomdetails', 'updated_at') + self.assertColumnExists(engine, 'atomdetails', 'parent_uuid') + self.assertColumnExists(engine, 'atomdetails', 'meta') + self.assertColumnExists(engine, 'atomdetails', 'name') + self.assertColumnExists(engine, 'atomdetails', 'results') + self.assertColumnExists(engine, 'atomdetails', 'version') + self.assertColumnExists(engine, 'atomdetails', 'state') + self.assertColumnExists(engine, 'atomdetails', 'uuid') + self.assertColumnExists(engine, 'atomdetails', 'failure') + self.assertColumnExists(engine, 'atomdetails', 'atom_type') + self.assertColumnExists(engine, 'atomdetails', 'intention') + self.assertColumnExists(engine, 'atomdetails', 'revert_results') + self.assertColumnExists(engine, 'atomdetails', 'revert_failure') + class TestMasakariMigrationsSQLite(MasakariMigrationsCheckers, test_base.DbTestCase): + + def _check_006(self, engine, data): + # NOTE(ShilpaSD): DB script '006_add_persistence_tables.py' adds db + # tables required for taskflow which doesn't support Sqlite using + # alembic migration. + pass + pass diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py index 91b4c8af..7d973c93 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py @@ -55,15 +55,15 @@ class HostFailureTestCase(test.TestCase): def _verify_instance_evacuated(self, old_instance_list): for server in old_instance_list: - instance = self.novaclient.get_server(self.ctxt, server.id) + instance = self.novaclient.get_server(self.ctxt, server) - if getattr(server, 'OS-EXT-STS:vm_state') in \ - ['active', 'stopped', 'error']: - self.assertEqual(getattr(server, 'OS-EXT-STS:vm_state'), - getattr(instance, 'OS-EXT-STS:vm_state')) + if getattr(instance, 'OS-EXT-STS:vm_state') in \ + ['active', 'stopped', 'error']: + self.assertIn(getattr(instance, 'OS-EXT-STS:vm_state'), + ['active', 'stopped', 'error']) else: - if getattr(server, 'OS-EXT-STS:vm_state') == 'resized' and \ - getattr(server, 'OS-EXT-STS:power_state') != 4: + if getattr(instance, 'OS-EXT-STS:vm_state') == 'resized' and \ + getattr(instance, 'OS-EXT-STS:power_state') != 4: self.assertEqual('active', getattr(instance, 'OS-EXT-STS:vm_state')) else: @@ -71,7 +71,7 @@ class HostFailureTestCase(test.TestCase): getattr(instance, 'OS-EXT-STS:vm_state')) if CONF.host_failure.ignore_instances_in_error_state and getattr( - server, 'OS-EXT-STS:vm_state') == 'error': + instance, 'OS-EXT-STS:vm_state') == 'error': self.assertEqual( self.instance_host, getattr( instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) @@ -81,49 +81,58 @@ class HostFailureTestCase(test.TestCase): instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) def _test_disable_compute_service(self, mock_enable_disable): - task = host_failure.DisableComputeServiceTask(self.novaclient) - task.execute(self.ctxt, self.instance_host) + task = host_failure.DisableComputeServiceTask(self.ctxt, + self.novaclient) + task.execute(self.instance_host) mock_enable_disable.assert_called_once_with( self.ctxt, self.instance_host) def _test_instance_list(self, instances_evacuation_count): - task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) - instance_list = task.execute(self.ctxt, self.instance_host) - - for instance in instance_list['instance_list']: + task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt, + self.novaclient) + instances = task.execute(self.instance_host) + instance_uuid_list = [] + for instance_id in instances['instance_list']: + instance = self.novaclient.get_server(self.ctxt, instance_id) if CONF.host_failure.ignore_instances_in_error_state: self.assertNotEqual("error", getattr(instance, "OS-EXT-STS:vm_state")) if not CONF.host_failure.evacuate_all_instances: self.assertTrue(instance.metadata.get('HA_Enabled', False)) - self.assertEqual(instances_evacuation_count, - len(instance_list['instance_list'])) + instance_uuid_list.append(instance.id) - return instance_list + self.assertEqual(instances_evacuation_count, + len(instances['instance_list'])) + + return { + "instance_list": instance_uuid_list, + } def _evacuate_instances(self, instance_list, mock_enable_disable, reserved_host=None): - task = host_failure.EvacuateInstancesTask(self.novaclient) + task = host_failure.EvacuateInstancesTask(self.ctxt, self.novaclient) old_instance_list = copy.deepcopy(instance_list['instance_list']) if reserved_host: - task.execute(self.ctxt, self.instance_host, + task.execute(self.instance_host, instance_list['instance_list'], reserved_host=reserved_host) self.assertTrue(mock_enable_disable.called) else: task.execute( - self.ctxt, self.instance_host, instance_list['instance_list']) + self.instance_host, instance_list['instance_list']) # make sure instance is active and has different host self._verify_instance_evacuated(old_instance_list) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_for_auto_recovery( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", @@ -143,9 +152,31 @@ class HostFailureTestCase(test.TestCase): # execute EvacuateInstancesTask self._evacuate_instances(instance_list, mock_enable_disable) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 2" + "", 0.3), + mock.call("Total HA Enabled instances count: '1'", 0.6), + mock.call("Total Non-HA Enabled instances count: '1'", 0.7), + mock.call("All instances (HA Enabled/Non-HA Enabled) should be " + "considered for evacuation. Total count is: '2'", 0.8), + mock.call("Instances to be evacuated are: '1,2'", 1.0), + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1,2'"), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7), + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_for_reserved_host_recovery( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", @@ -177,12 +208,39 @@ class HostFailureTestCase(test.TestCase): self.assertIn(reserved_host.name, self.fake_client.aggregates.get('1').hosts) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 2" + "", 0.3), + mock.call("Total HA Enabled instances count: '1'", 0.6), + mock.call("Total Non-HA Enabled instances count: '1'", 0.7), + mock.call("All instances (HA Enabled/Non-HA Enabled) should be " + "considered for evacuation. Total count is: '2'", 0.8), + mock.call("Instances to be evacuated are: '1,2'", 1.0), + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1,2'"), + mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1), + mock.call('Add host fake-reserved-host to aggregate fake_agg', + 0.2), + mock.call('Added host fake-reserved-host to aggregate fake_agg', + 0.3), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7) + ]) + @mock.patch.object(nova.API, 'add_host_to_aggregate') @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') @mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG') def test_host_failure_flow_ignores_conflict_error( - self, mock_log, _mock_novaclient, mock_add_host, mock_unlock, - mock_lock, mock_enable_disable): + self, mock_log, _mock_notify, _mock_novaclient, mock_add_host, + mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client mock_add_host.side_effect = exception.Conflict self.override_config("add_reserved_host_to_aggregate", @@ -196,10 +254,10 @@ class HostFailureTestCase(test.TestCase): self.fake_client.aggregates.create(id="1", name='fake_agg', hosts=[self.instance_host, reserved_host.name]) - expected_msg_format = "Host '%(reserved_host)s' already has been " \ - "added to aggregate '%(aggregate)s'." - expected_msg_params = {'aggregate': 'fake_agg', - 'reserved_host': u'fake-reserved-host'} + expected_msg_format = ("Host '%(reserved_host)s' already has been " + "added to aggregate '%(aggregate)s'.") % { + 'reserved_host': 'fake-reserved-host', 'aggregate': 'fake_agg' + } # execute DisableComputeServiceTask self._test_disable_compute_service(mock_enable_disable) @@ -215,15 +273,36 @@ class HostFailureTestCase(test.TestCase): self.assertEqual(1, mock_save.call_count) self.assertIn(reserved_host.name, self.fake_client.aggregates.get('1').hosts) - mock_log.info.assert_any_call( - expected_msg_format, expected_msg_params) + mock_log.info.assert_any_call(expected_msg_format) - @ddt.data('active', 'rescued', 'paused', 'shelved', 'suspended', - 'error', 'stopped', 'resized') + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 1" + "", 0.3), + mock.call("Total HA Enabled instances count: '1'", 0.6), + mock.call("Instances to be evacuated are: '1'", 1.0), + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1'"), + mock.call("Enabling reserved host: 'fake-reserved-host'", 0.1), + mock.call('Add host fake-reserved-host to aggregate fake_agg', + 0.2), + mock.call("Host 'fake-reserved-host' already has been added to " + "aggregate 'fake_agg'.", 1.0), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7) + ]) + + @ddt.data('rescued', 'paused', 'shelved', 'suspended', + 'error', 'resized', 'active', 'resized', 'stopped') @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_all_instances( - self, vm_state, _mock_novaclient, mock_unlock, mock_lock, - mock_enable_disable): + self, vm_state, _mock_notify, _mock_novaclient, mock_unlock, + mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create ha_enabled test data @@ -236,16 +315,33 @@ class HostFailureTestCase(test.TestCase): vm_state=vm_state, power_state=power_state, ha_enabled=True) + + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + instance_list = { - "instance_list": self.fake_client.servers.list() + "instance_list": instance_uuid_list, } # execute EvacuateInstancesTask self._evacuate_instances(instance_list, mock_enable_disable) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1,2'"), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7) + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_ignore_error_instances( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): self.override_config("ignore_instances_in_error_state", True, "host_failure") @@ -267,9 +363,29 @@ class HostFailureTestCase(test.TestCase): # execute EvacuateInstancesTask self._evacuate_instances(instance_list, mock_enable_disable) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 2" + "", 0.3), + mock.call("Ignoring recovery of HA_Enabled instance '1' as it is " + "in 'error' state.", 0.4), + mock.call("Total HA Enabled instances count: '1'", 0.6), + mock.call("Total Non-HA Enabled instances count: '0'", 0.7), + mock.call("All instances (HA Enabled/Non-HA Enabled) should be " + "considered for evacuation. Total count is: '1'", 0.8), + mock.call("Instances to be evacuated are: '2'", 1.0), + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '2'"), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7) + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_ignore_error_instances_raise_skip_host_recovery( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): self.override_config("ignore_instances_in_error_state", True, "host_failure") @@ -283,13 +399,28 @@ class HostFailureTestCase(test.TestCase): ha_enabled=True) # execute PrepareHAEnabledInstancesTask - task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) + task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt, + self.novaclient) self.assertRaises(exception.SkipHostRecoveryException, task.execute, - self.ctxt, self.instance_host) + self.instance_host) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 1" + "", 0.3), + mock.call("Ignoring recovery of HA_Enabled instance '1' as it is " + "in 'error' state.", 0.4), + mock.call("Total HA Enabled instances count: '0'", 0.6), + mock.call("Skipped host 'fake-host' recovery as no instances needs" + " to be evacuated", 1.0) + ]) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_all_instances_active_resized_instance( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client @@ -300,16 +431,33 @@ class HostFailureTestCase(test.TestCase): self.fake_client.servers.create(id="2", host=self.instance_host, vm_state='resized', ha_enabled=True) + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + instance_list = { - "instance_list": self.fake_client.servers.list() + "instance_list": instance_uuid_list, } # execute EvacuateInstancesTask - self._evacuate_instances(instance_list, mock_enable_disable) + self._evacuate_instances(instance_list, + mock_enable_disable) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1,2'"), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7), + ]) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_no_ha_enabled_instances( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client @@ -321,22 +469,45 @@ class HostFailureTestCase(test.TestCase): self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask - task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) + task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt, + self.novaclient) self.assertRaises(exception.SkipHostRecoveryException, task.execute, - self.ctxt, self.instance_host) + self.instance_host) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 2" + "", 0.3), + mock.call("Total HA Enabled instances count: '0'", 0.6), + mock.call("Skipped host 'fake-host' recovery as no instances needs" + " to be evacuated", 1.0) + ]) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_evacuation_failed( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): + # overriding 'wait_period_after_power_off' to 2 seconds to reduce the + # wait period, default is 180 seconds. + self.override_config("wait_period_after_power_off", 2) _mock_novaclient.return_value = self.fake_client # create ha_enabled test data server = self.fake_client.servers.create(id="1", vm_state='active', host=self.instance_host, ha_enabled=True) + + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + instance_list = { - "instance_list": self.fake_client.servers.list() + "instance_list": instance_uuid_list, } def fake_get_server(context, host): @@ -351,9 +522,23 @@ class HostFailureTestCase(test.TestCase): exception.HostRecoveryFailureException, self._evacuate_instances, instance_list, mock_enable_disable) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1'"), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Instance '1' is successfully evacuated but failed to " + "stop.", 1.0), + mock.call("Failed to evacuate instances '1' from host " + "'fake-host'", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_no_instances_on_host( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", @@ -363,13 +548,31 @@ class HostFailureTestCase(test.TestCase): self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask - task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) + task = host_failure.PrepareHAEnabledInstancesTask(self.ctxt, + self.novaclient) self.assertRaises(exception.SkipHostRecoveryException, task.execute, - self.ctxt, self.instance_host) + self.instance_host) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call('Preparing instances for evacuation'), + mock.call("Total instances running on failed host 'fake-host' is 0" + "", 0.3), + mock.call("Total HA Enabled instances count: '0'", 0.6), + mock.call("Total Non-HA Enabled instances count: '0'", 0.7), + mock.call("All instances (HA Enabled/Non-HA Enabled) should be " + "considered for evacuation. Total count is: '0'", 0.8), + mock.call("Skipped host 'fake-host' recovery as no instances needs" + " to be evacuated", 1.0) + ]) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_host_failure_flow_for_task_state_not_none( - self, _mock_novaclient, mock_unlock, mock_lock, + self, _mock_notify, _mock_novaclient, mock_unlock, mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client @@ -389,8 +592,12 @@ class HostFailureTestCase(test.TestCase): task_state='fake_task_state', power_state=None, ha_enabled=True) + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + instance_list = { - "instance_list": self.fake_client.servers.list() + "instance_list": instance_uuid_list, } # execute EvacuateInstancesTask @@ -405,3 +612,15 @@ class HostFailureTestCase(test.TestCase): self.fake_client.servers.reset_state_calls) self.assertEqual(stop_calls, self.fake_client.servers.stop_calls) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Start evacuation of instances from failed host " + "'fake-host', instance uuids are : '1,2,3'"), + mock.call("Evacuation of instance started : '1'", 0.5), + mock.call("Instance '1' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '2'", 0.5), + mock.call("Instance '2' evacuated successfully", 0.7), + mock.call("Evacuation of instance started : '3'", 0.5), + mock.call("Instance '3' evacuated successfully", 0.7) + ]) diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_instance_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_instance_failure_flow.py index 2c851e5f..e64994e6 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_instance_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_instance_failure_flow.py @@ -44,23 +44,26 @@ class InstanceFailureTestCase(test.TestCase): False, "instance_failure") def _test_stop_instance(self): - task = instance_failure.StopInstanceTask(self.novaclient) - task.execute(self.ctxt, self.instance_id) + task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient) + task.execute(self.instance_id) # verify instance is stopped instance = self.novaclient.get_server(self.ctxt, self.instance_id) self.assertEqual('stopped', getattr(instance, 'OS-EXT-STS:vm_state')) def _test_confirm_instance_is_active(self): - task = instance_failure.ConfirmInstanceActiveTask(self.novaclient) - task.execute(self.ctxt, self.instance_id) + task = instance_failure.ConfirmInstanceActiveTask(self.ctxt, + self.novaclient) + task.execute(self.instance_id) # verify instance is in active state instance = self.novaclient.get_server(self.ctxt, self.instance_id) self.assertEqual('active', getattr(instance, 'OS-EXT-STS:vm_state')) @mock.patch('masakari.compute.nova.novaclient') - def test_instance_failure_flow(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow(self, _mock_notify, _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -72,14 +75,29 @@ class InstanceFailureTestCase(test.TestCase): self._test_stop_instance() # test StartInstanceTask - task = instance_failure.StartInstanceTask(self.novaclient) - task.execute(self.ctxt, self.instance_id) + task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient) + task.execute(self.instance_id) # test ConfirmInstanceActiveTask self._test_confirm_instance_is_active() + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + self.instance_id), + mock.call("Stopped instance: '" + self.instance_id + "'", 1.0), + mock.call("Starting instance: '" + self.instance_id + "'"), + mock.call("Instance started: '" + self.instance_id + "'", 1.0), + mock.call("Confirming instance '" + self.instance_id + + "' vm_state is ACTIVE"), + mock.call("Confirmed instance '" + self.instance_id + + "' vm_state is ACTIVE", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') - def test_instance_failure_flow_resized_instance(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow_resized_instance(self, _mock_notify, + _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -91,14 +109,29 @@ class InstanceFailureTestCase(test.TestCase): self._test_stop_instance() # test StartInstanceTask - task = instance_failure.StartInstanceTask(self.novaclient) - task.execute(self.ctxt, self.instance_id) + task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient) + task.execute(self.instance_id) # test ConfirmInstanceActiveTask self._test_confirm_instance_is_active() + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + self.instance_id), + mock.call("Stopped instance: '" + self.instance_id + "'", 1.0), + mock.call("Starting instance: '" + self.instance_id + "'"), + mock.call("Instance started: '" + self.instance_id + "'", 1.0), + mock.call("Confirming instance '" + self.instance_id + + "' vm_state is ACTIVE"), + mock.call("Confirmed instance '" + self.instance_id + + "' vm_state is ACTIVE", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') - def test_instance_failure_flow_stop_failed(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow_stop_failed(self, _mock_notify, + _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -112,29 +145,70 @@ class InstanceFailureTestCase(test.TestCase): return server # test StopInstanceTask - task = instance_failure.StopInstanceTask(self.novaclient) + task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient) with mock.patch.object(self.novaclient, 'stop_server', fake_stop_server): self.assertRaises( exception.InstanceRecoveryFailureException, task.execute, - self.ctxt, self.instance_id) + self.instance_id) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + self.instance_id), + mock.call('Failed to stop instance ' + self.instance_id, 1.0) + ]) @mock.patch('masakari.compute.nova.novaclient') - def test_instance_failure_flow_not_ha_enabled(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow_not_ha_enabled(self, _mock_notify, + _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data self.fake_client.servers.create(self.instance_id, host="fake-host") # test StopInstanceTask - task = instance_failure.StopInstanceTask(self.novaclient) + task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient) self.assertRaises( exception.SkipInstanceRecoveryException, task.execute, - self.ctxt, self.instance_id) + self.instance_id) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Skipping recovery for instance: ' + self.instance_id + + ' as it is not Ha_Enabled', 1.0) + ]) @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow_vm_in_paused_state(self, _mock_notify, + _mock_novaclient): + _mock_novaclient.return_value = self.fake_client + + # create test data + self.fake_client.servers.create(self.instance_id, + host="fake-host", ha_enabled=True, + vm_state="paused") + + # test StopInstanceTask + task = instance_failure.StopInstanceTask(self.ctxt, self.novaclient) + self.assertRaises( + exception.IgnoreInstanceRecoveryException, task.execute, + self.instance_id) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Recovery of instance '" + self.instance_id + + "' is ignored as it is in 'paused' state.", 1.0) + ]) + + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_instance_failure_flow_not_ha_enabled_but_conf_option_is_set( - self, _mock_novaclient): + self, _mock_notify, _mock_novaclient): # Setting this config option to True indicates masakari has to recover # the instance irrespective of whether it is HA_Enabled or not. self.override_config("process_all_instances", @@ -149,14 +223,29 @@ class InstanceFailureTestCase(test.TestCase): self._test_stop_instance() # test StartInstanceTask - task = instance_failure.StartInstanceTask(self.novaclient) - task.execute(self.ctxt, self.instance_id) + task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient) + task.execute(self.instance_id) # test ConfirmInstanceActiveTask self._test_confirm_instance_is_active() + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + self.instance_id), + mock.call("Stopped instance: '" + self.instance_id + "'", 1.0), + mock.call("Starting instance: '" + self.instance_id + "'"), + mock.call("Instance started: '" + self.instance_id + "'", 1.0), + mock.call("Confirming instance '" + self.instance_id + + "' vm_state is ACTIVE"), + mock.call("Confirmed instance '" + self.instance_id + + "' vm_state is ACTIVE", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') - def test_instance_failure_flow_start_failed(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_instance_failure_flow_start_failed(self, _mock_notify, + _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -173,13 +262,25 @@ class InstanceFailureTestCase(test.TestCase): return server # test StartInstanceTask - task = instance_failure.StartInstanceTask(self.novaclient) + task = instance_failure.StartInstanceTask(self.ctxt, self.novaclient) with mock.patch.object(self.novaclient, 'start_server', fake_start_server): - task.execute(self.ctxt, self.instance_id) + task.execute(self.instance_id) # test ConfirmInstanceActiveTask - task = instance_failure.ConfirmInstanceActiveTask(self.novaclient) + task = instance_failure.ConfirmInstanceActiveTask(self.ctxt, + self.novaclient) self.assertRaises( exception.InstanceRecoveryFailureException, task.execute, - self.ctxt, self.instance_id) + self.instance_id) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + self.instance_id), + mock.call("Stopped instance: '" + self.instance_id + "'", 1.0), + mock.call("Starting instance: '" + self.instance_id + "'"), + mock.call("Instance started: '" + self.instance_id + "'", 1.0), + mock.call("Confirming instance '" + self.instance_id + + "' vm_state is ACTIVE"), + mock.call('Failed to start instance 1', 1.0) + ]) diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py index 6432f772..7c8774b4 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_process_failure_flow.py @@ -41,7 +41,10 @@ class ProcessFailureTestCase(test.TestCase): self.override_config('wait_period_after_service_update', 2) @mock.patch('masakari.compute.nova.novaclient') - def test_compute_process_failure_flow(self, _mock_novaclient): + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + def test_compute_process_failure_flow(self, _mock_notify, + _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -50,20 +53,35 @@ class ProcessFailureTestCase(test.TestCase): status="enabled") # test DisableComputeNodeTask - task = process_failure.DisableComputeNodeTask(self.novaclient) - task.execute(self.ctxt, self.process_name, self.service_host) + task = process_failure.DisableComputeNodeTask(self.ctxt, + self.novaclient) + task.execute(self.process_name, self.service_host) # test ConfirmComputeNodeDisabledTask - task = process_failure.ConfirmComputeNodeDisabledTask(self.novaclient) - task.execute(self.ctxt, self.process_name, self.service_host) + task = process_failure.ConfirmComputeNodeDisabledTask(self.ctxt, + self.novaclient) + task.execute(self.process_name, self.service_host) # verify service is disabled self.assertTrue(self.novaclient.is_service_down(self.ctxt, self.service_host, self.process_name)) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call("Confirming compute service is disabled on host: " + "'fake-host'"), + mock.call("Confirmed compute service is disabled on host: " + "'fake-host'", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_compute_process_failure_flow_disabled_process(self, + _mock_notify, _mock_novaclient): _mock_novaclient.return_value = self.fake_client @@ -73,18 +91,28 @@ class ProcessFailureTestCase(test.TestCase): status="disabled") # test DisableComputeNodeTask - task = process_failure.DisableComputeNodeTask(self.novaclient) + task = process_failure.DisableComputeNodeTask(self.ctxt, + self.novaclient) with mock.patch.object( self.novaclient, 'enable_disable_service') as mock_enable_disabled: - task.execute(self.ctxt, self.process_name, self.service_host) + task.execute(self.process_name, self.service_host) # ensure that enable_disable_service method is not called self.assertEqual(0, mock_enable_disabled.call_count) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call('Skipping recovery for process nova-compute as it is ' + 'already disabled', 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') def test_compute_process_failure_flow_compute_service_disabled_failed( - self, _mock_novaclient): + self, _mock_notify, _mock_novaclient): _mock_novaclient.return_value = self.fake_client # create test data @@ -97,14 +125,24 @@ class ProcessFailureTestCase(test.TestCase): return False # test DisableComputeNodeTask - task = process_failure.DisableComputeNodeTask(self.novaclient) - task.execute(self.ctxt, self.process_name, self.service_host) + task = process_failure.DisableComputeNodeTask(self.ctxt, + self.novaclient) + task.execute(self.process_name, self.service_host) with mock.patch.object(self.novaclient, 'is_service_down', fake_is_service_down): # test ConfirmComputeNodeDisabledTask task = process_failure.ConfirmComputeNodeDisabledTask( - self.novaclient) + self.ctxt, self.novaclient) self.assertRaises(exception.ProcessRecoveryFailureException, - task.execute, self.ctxt, self.process_name, + task.execute, self.process_name, self.service_host) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call("Confirming compute service is disabled on host: " + "'fake-host'"), + mock.call('Failed to disable service nova-compute', 1.0) + ]) diff --git a/masakari/tests/unit/engine/test_engine_mgr.py b/masakari/tests/unit/engine/test_engine_mgr.py index fc9ff0fe..32086cad 100644 --- a/masakari/tests/unit/engine/test_engine_mgr.py +++ b/masakari/tests/unit/engine/test_engine_mgr.py @@ -16,6 +16,7 @@ import mock from oslo_utils import importutils from oslo_utils import timeutils +from masakari.compute import nova import masakari.conf from masakari import context from masakari.engine import utils as engine_utils @@ -760,6 +761,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): def test_host_failure_custom_flow_for_auto_recovery( self, _mock_log, _mock_task1, _mock_task2, _mock_task3, _mock_novaclient, _mock_notification_get): + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' self.override_config( "host_auto_failure_recovery_tasks", {'pre': ['disable_compute_service_task', 'no_op'], @@ -777,6 +780,72 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): # is executed. _mock_log.info.assert_called_with(expected_msg_format) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch.object(nova.API, "enable_disable_service") + @mock.patch('masakari.engine.drivers.taskflow.host_failure.' + 'PrepareHAEnabledInstancesTask.execute') + @mock.patch('masakari.engine.drivers.taskflow.host_failure.' + 'EvacuateInstancesTask.execute') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + @mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG') + def test_host_failure_flow_for_auto_recovery(self, _mock_log, + _mock_notify, + _mock_novaclient, + _mock_enable_disable, + _mock_task2, _mock_task3, + _mock_notification_get): + self.novaclient = nova.API() + self.fake_client = fakes.FakeNovaClient() + self.override_config("wait_period_after_evacuation", 2) + self.override_config("wait_period_after_service_update", 2) + self.override_config("evacuate_all_instances", + True, "host_failure") + + _mock_novaclient.return_value = self.fake_client + + # create test data + self.fake_client.servers.create(id="1", host="fake-host", + ha_enabled=True) + self.fake_client.servers.create(id="2", host="fake-host") + + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + + instance_list = { + "instance_list": ','.join(instance_uuid_list), + } + _mock_task2.return_value = instance_list + + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' + + self.engine.driver.execute_host_failure( + self.context, "fake-host", + fields.FailoverSegmentRecoveryMethod.AUTO, + uuidsentinel.fake_notification) + + # make sure instance is active and has different host + for server in instance_uuid_list: + instance = self.novaclient.get_server(self.context, server) + + if CONF.host_failure.ignore_instances_in_error_state and getattr( + instance, 'OS-EXT-STS:vm_state') == 'error': + self.assertEqual( + "fake-host", getattr( + instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + else: + self.assertNotEqual( + "fake-host", getattr( + instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') @mock.patch('masakari.engine.drivers.taskflow.host_failure.' 'DisableComputeServiceTask.execute') @@ -785,9 +854,13 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): @mock.patch('masakari.engine.drivers.taskflow.host_failure.' 'EvacuateInstancesTask.execute') @mock.patch('masakari.engine.drivers.taskflow.no_op.LOG') - def test_host_failure_custom_flow_for_rh_recovery( - self, _mock_log, _mock_task1, _mock_task2, _mock_task3, + def test_host_failure_custom_flow_for_rh_recovery(self, _mock_log, + _mock_task1, + _mock_task2, + _mock_task3, _mock_novaclient, _mock_notification_get): + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' self.override_config( "host_rh_failure_recovery_tasks", {'pre': ['disable_compute_service_task'], @@ -806,6 +879,65 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): # is executed. _mock_log.info.assert_called_with(expected_msg_format) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch.object(nova.API, "enable_disable_service") + @mock.patch('masakari.engine.drivers.taskflow.host_failure.' + 'PrepareHAEnabledInstancesTask.execute') + @mock.patch('masakari.engine.drivers.taskflow.host_failure.' + 'EvacuateInstancesTask.execute') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + @mock.patch('masakari.engine.drivers.taskflow.host_failure.LOG') + def test_host_failure_flow_for_rh_recovery(self, _mock_log, _mock_notify, + _mock_novaclient, + _mock_enable_disable, + _mock_task2, _mock_task3, + _mock_notification_get): + self.novaclient = nova.API() + self.fake_client = fakes.FakeNovaClient() + self.override_config("wait_period_after_evacuation", 2) + self.override_config("wait_period_after_service_update", 2) + self.override_config("evacuate_all_instances", + True, "host_failure") + + _mock_novaclient.return_value = self.fake_client + + # create test data + self.fake_client.servers.create(id="1", host="fake-host", + ha_enabled=True) + self.fake_client.servers.create(id="2", host="fake-host") + + instance_uuid_list = [] + for instance in self.fake_client.servers.list(): + instance_uuid_list.append(instance.id) + + instance_list = { + "instance_list": ','.join(instance_uuid_list), + } + _mock_task2.return_value = instance_list + + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' + + self.engine.driver.execute_host_failure( + self.context, "fake-host", + fields.FailoverSegmentRecoveryMethod.RESERVED_HOST, + uuidsentinel.fake_notification, + reserved_host_list=['host-1', 'host-2']) + + # make sure instance is active and has different host + for server in instance_uuid_list: + instance = self.novaclient.get_server(self.context, server) + self.assertNotEqual( + "fake-host", getattr( + instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') @mock.patch('masakari.engine.drivers.taskflow.instance_failure.' 'StopInstanceTask.execute') @@ -817,6 +949,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): def test_instance_failure_custom_flow_recovery( self, _mock_log, _mock_task1, _mock_task2, _mock_task3, _mock_novaclient, _mock_notification_get): + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' self.override_config( "instance_failure_recovery_tasks", {'pre': ['stop_instance_task', 'no_op'], @@ -833,6 +967,49 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): # is executed. _mock_log.info.assert_called_with(expected_msg_format) + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + @mock.patch('masakari.engine.drivers.taskflow.instance_failure.LOG') + def test_instance_failure_flow_recovery(self, _mock_log, _mock_notify, + _mock_novaclient, + _mock_notification_get): + self.novaclient = nova.API() + self.fake_client = fakes.FakeNovaClient() + self.override_config('wait_period_after_power_off', 2) + self.override_config('wait_period_after_power_on', 2) + instance_id = uuidsentinel.fake_ins + + _mock_novaclient.return_value = self.fake_client + + # create test data + self.fake_client.servers.create(instance_id, + host="fake-host", + ha_enabled=True) + + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' + + self.engine.driver.execute_instance_failure( + self.context, instance_id, + uuidsentinel.fake_notification) + + # verify instance is in active state + instance = self.novaclient.get_server(self.context, instance_id) + self.assertEqual('active', + getattr(instance, 'OS-EXT-STS:vm_state')) + + _mock_notify.assert_has_calls([ + mock.call('Stopping instance: ' + instance_id), + mock.call("Stopped instance: '" + instance_id + "'", 1.0), + mock.call("Starting instance: '" + instance_id + "'"), + mock.call("Instance started: '" + instance_id + "'", 1.0), + mock.call("Confirming instance '" + instance_id + + "' vm_state is ACTIVE"), + mock.call("Confirmed instance '" + instance_id + + "' vm_state is ACTIVE", 1.0) + ]) + @mock.patch('masakari.compute.nova.novaclient') @mock.patch('masakari.engine.drivers.taskflow.process_failure.' 'DisableComputeNodeTask.execute') @@ -842,6 +1019,8 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): def test_process_failure_custom_flow_recovery( self, _mock_log, _mock_task1, _mock_task2, _mock_novaclient, _mock_notification_get): + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' self.override_config( "process_failure_recovery_tasks", {'pre': ['disable_compute_node_task', 'no_op'], @@ -853,8 +1032,69 @@ class EngineManagerUnitTestCase(test.NoDBTestCase): self.engine.driver.execute_process_failure( self.context, 'nova-compute', 'fake_host', - uuidsentinel.fake_notification) + uuidsentinel.fake_notification, ) + _mock_log.info.assert_any_call(expected_msg_format) # Ensure custom_task added to the 'process_failure_recovery_tasks' # is executed. _mock_log.info.assert_called_with(expected_msg_format) + + @mock.patch('masakari.compute.nova.novaclient') + @mock.patch('masakari.engine.drivers.taskflow.base.MasakariTask.' + 'update_details') + @mock.patch('masakari.engine.drivers.taskflow.process_failure.LOG') + def test_process_failure_flow_recovery(self, _mock_log, _mock_notify, + _mock_novaclient, + _mock_notification_get): + self.novaclient = nova.API() + self.fake_client = fakes.FakeNovaClient() + _mock_novaclient.return_value = self.fake_client + + # create test data + self.fake_client.services.create("1", host="fake-host", + binary="nova-compute", + status="enabled") + + # For testing purpose setting BACKEND as memory + masakari.engine.drivers.taskflow.base.PERSISTENCE_BACKEND = 'memory://' + + self.engine.driver.execute_process_failure( + self.context, "nova-compute", "fake-host", + uuidsentinel.fake_notification) + + # verify service is disabled + self.assertTrue(self.novaclient.is_service_down(self.context, + "fake-host", + "nova-compute")) + # verify progress details + _mock_notify.assert_has_calls([ + mock.call("Disabling compute service on host: 'fake-host'"), + mock.call("Disabled compute service on host: 'fake-host'", 1.0), + mock.call("Confirming compute service is disabled on host: " + "'fake-host'"), + mock.call("Confirmed compute service is disabled on host: " + "'fake-host'", 1.0) + ]) + + @mock.patch.object(notification_obj.Notification, "save") + @mock.patch('masakari.engine.drivers.taskflow.driver.TaskFlowDriver.' + 'get_notification_recovery_workflow_details') + def test_get_notification_recovery_workflow_details(self, + mock_progress_details, + mock_save, + mock_notification_get): + notification = fakes.create_fake_notification( + type="VM", id=1, payload={ + 'event': 'fake_event', 'instance_uuid': uuidsentinel.fake_ins, + 'vir_domain_event': 'fake_vir_domain_event' + }, + source_host_uuid=uuidsentinel.fake_host, + generated_time=NOW, status="new", + notification_uuid=uuidsentinel.fake_notification,) + + mock_notification_get.return_value = notification + self.engine.driver.get_notification_recovery_workflow_details( + self.context, notification) + + mock_progress_details.assert_called_once_with( + self.context, notification) diff --git a/masakari/tests/unit/engine/test_rpcapi.py b/masakari/tests/unit/engine/test_rpcapi.py index 17ea0389..e2520ce0 100644 --- a/masakari/tests/unit/engine/test_rpcapi.py +++ b/masakari/tests/unit/engine/test_rpcapi.py @@ -86,3 +86,11 @@ class EngineRpcAPITestCase(test.TestCase): rpc_method='cast', notification=self.fake_notification_obj, version='1.0') + + @mock.patch("masakari.rpc.get_client") + def test_get_notification_recovery_workflow_details(self, + mock_get_client): + self._test_engine_api('get_notification_recovery_workflow_details', + rpc_method='call', + notification=self.fake_notification_obj, + version='1.1') diff --git a/masakari/tests/unit/objects/test_notifications.py b/masakari/tests/unit/objects/test_notifications.py index fd084799..bb7375ca 100644 --- a/masakari/tests/unit/objects/test_notifications.py +++ b/masakari/tests/unit/objects/test_notifications.py @@ -28,6 +28,7 @@ from masakari.tests.unit.objects import test_objects from masakari.tests import uuidsentinel NOW = timeutils.utcnow().replace(microsecond=0) +OPTIONAL = ['recovery_workflow_details'] def _fake_db_notification(**kwargs): @@ -86,7 +87,8 @@ class TestNotificationObject(test_objects._LocalTest): if db_exception: self.assertIsNone(obj) - self.compare_obj(obj, fake_object_notification) + self.compare_obj(obj, fake_object_notification, + allow_missing=OPTIONAL) def test_get_by_id(self): self._test_query('notification_get_by_id', 'get_by_id', 123) @@ -116,7 +118,8 @@ class TestNotificationObject(test_objects._LocalTest): notification_obj = self._notification_create_attributes() notification_obj.create() - self.compare_obj(notification_obj, fake_object_notification) + self.compare_obj(notification_obj, fake_object_notification, + allow_missing=OPTIONAL) mock_db_create.assert_called_once_with(self.context, { 'source_host_uuid': uuidsentinel.fake_host, 'notification_uuid': uuidsentinel.fake_notification, @@ -169,7 +172,8 @@ class TestNotificationObject(test_objects._LocalTest): notification_obj.create() - self.compare_obj(notification_obj, fake_object_notification) + self.compare_obj(notification_obj, fake_object_notification, + allow_missing=OPTIONAL) mock_db_create.assert_called_once_with(self.context, { 'source_host_uuid': uuidsentinel.fake_host, 'notification_uuid': uuidsentinel.fake_notification, @@ -247,7 +251,8 @@ class TestNotificationObject(test_objects._LocalTest): notification_obj.id = 123 notification_obj.save() - self.compare_obj(notification_obj, fake_object_notification) + self.compare_obj(notification_obj, fake_object_notification, + allow_missing=OPTIONAL) (mock_notification_update. assert_called_once_with(self.context, uuidsentinel.fake_notification, {'source_host_uuid': uuidsentinel.fake_host, diff --git a/masakari/tests/unit/objects/test_objects.py b/masakari/tests/unit/objects/test_objects.py index 3184a709..7cb0c023 100644 --- a/masakari/tests/unit/objects/test_objects.py +++ b/masakari/tests/unit/objects/test_objects.py @@ -657,7 +657,8 @@ object_data = { 'FailoverSegmentList': '1.0-dfc5c6f5704d24dcaa37b0bbb03cbe60', 'Host': '1.1-3fc4d548fa220c76906426095e5971fc', 'HostList': '1.0-25ebe1b17fbd9f114fae8b6a10d198c0', - 'Notification': '1.0-eedfa3c203c100897021bd23f0ddf68c', + 'Notification': '1.1-91e3a051078e35300e325a3e2ae5fde5', + 'NotificationProgressDetails': '1.0-fc611ac932b719fbc154dbe34bb8edee', 'NotificationList': '1.0-25ebe1b17fbd9f114fae8b6a10d198c0', 'EventType': '1.0-d1d2010a7391fa109f0868d964152607', 'ExceptionNotification': '1.0-1187e93f564c5cca692db76a66cda2a6', diff --git a/releasenotes/notes/progress-details-recovery-workflows-5b14b7b3f87374f4.yaml b/releasenotes/notes/progress-details-recovery-workflows-5b14b7b3f87374f4.yaml new file mode 100644 index 00000000..66f50b32 --- /dev/null +++ b/releasenotes/notes/progress-details-recovery-workflows-5b14b7b3f87374f4.yaml @@ -0,0 +1,29 @@ +--- +features: + - | + Added support to record the recovery workflow details of the notification + which will be returned in a new microversion 1.1 in + `GET /notifications/{notification_id}` API. + + For example, GET /notifications/ response will contain + `recovery_workflow_details` parameter as shown here `notification_details`_ + + Added a new config section in Masakari conf file for configuring the back + end to be used by taskflow driver:: + + [taskflow] + # The back end for storing recovery_workflow details of the notification. + # (string value) + + connection = mysql+pymysql://root:admin@127.0.0.1/?charset=utf8 + + # Where db_name, can be a new database or you can also specify masakari + # database. + + Operator should run `masakari-manage db sync` command to add new db tables + required for storing recovery_workflow_details. + + Note: When you run `masakari-manage db sync`, make sure you have + `notification_driver=taskflow_driver` set in masakari.conf. + + .. _`notification_details`: https://developer.openstack.org/api-ref/instance-ha/?expanded=show-notification-details-detail#show-notification-details diff --git a/requirements.txt b/requirements.txt index ea431947..aff54b3d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,4 +25,5 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 python-novaclient>=9.1.0 # Apache-2.0 six>=1.10.0 # MIT stevedore>=1.20.0 # Apache-2.0 +SQLAlchemy-Utils>=0.33.10 # Apache-2.0 taskflow>=2.16.0 # Apache-2.0 diff --git a/test-requirements.txt b/test-requirements.txt index 98d9c3f0..15450923 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,6 +16,7 @@ os-api-ref>=1.4.0 # Apache-2.0 oslosphinx>=4.7.0 # Apache-2.0 oslotest>=3.2.0 # Apache-2.0 stestr>=1.0.0 # Apache-2.0 +SQLAlchemy-Utils>=0.33.10 # Apache-2.0 requests-mock>=1.2.0 # Apache-2.0 testresources>=2.0.0 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD