diff --git a/masakari/compute/nova.py b/masakari/compute/nova.py index e2982c6a..bd34e7c4 100644 --- a/masakari/compute/nova.py +++ b/masakari/compute/nova.py @@ -75,6 +75,9 @@ def translate_nova_exception(method): nova_exception.NotFound) as exc: err_msg = encodeutils.exception_to_unicode(exc) _reraise(exception.NotFound(reason=err_msg)) + except nova_exception.Conflict as exc: + err_msg = encodeutils.exception_to_unicode(exc) + _reraise(exception.Conflict(reason=err_msg)) return res return wrapper @@ -207,6 +210,14 @@ class API(object): LOG.info(_LI('Enable nova-compute on %s'), host_name) nova.services.enable(host_name, 'nova-compute') + @translate_nova_exception + def is_service_down(self, context, host_name, binary): + """Check whether service is up or down on given host.""" + nova = novaclient(context, admin_endpoint=True, + privileged_user=True) + service = nova.services.list(host=host_name, binary=binary)[0] + return service.status == 'disabled' + @translate_nova_exception def evacuate_instance(self, context, uuid, target=None, on_shared_storage=True): @@ -237,3 +248,21 @@ class API(object): msg = (_LI('Call get server command for instance %(uuid)s')) LOG.info(msg, {'uuid': uuid}) return nova.servers.get(uuid) + + @translate_nova_exception + def stop_server(self, context, uuid): + """Stop a server.""" + nova = novaclient(context, admin_endpoint=True, + privileged_user=True) + msg = (_LI('Call stop server command for instance %(uuid)s')) + LOG.info(msg, {'uuid': uuid}) + return nova.servers.stop(uuid) + + @translate_nova_exception + def start_server(self, context, uuid): + """Start a server.""" + nova = novaclient(context, admin_endpoint=True, + privileged_user=True) + msg = (_LI('Call start server command for instance %(uuid)s')) + LOG.info(msg, {'uuid': uuid}) + return nova.servers.start(uuid) diff --git a/masakari/conf/engine.py b/masakari/conf/engine.py index 1e7c6057..02884742 100644 --- a/masakari/conf/engine.py +++ b/masakari/conf/engine.py @@ -71,6 +71,12 @@ notification_opts = [ cfg.IntOpt('verify_interval', default=1, help='The monitoring interval for looping'), + cfg.IntOpt('wait_period_after_power_off', + default=60, + help='Number of seconds to wait for instance to shut down'), + cfg.IntOpt('wait_period_after_power_on', + default=60, + help='Number of seconds to wait for instance to start'), ] diff --git a/masakari/engine/drivers/taskflow/driver.py b/masakari/engine/drivers/taskflow/driver.py index 2cd29df9..b7c20c3b 100644 --- a/masakari/engine/drivers/taskflow/driver.py +++ b/masakari/engine/drivers/taskflow/driver.py @@ -25,8 +25,10 @@ from masakari.compute import nova from masakari.engine import driver from masakari.engine.drivers.taskflow import base from masakari.engine.drivers.taskflow import host_failure +from masakari.engine.drivers.taskflow import instance_failure +from masakari.engine.drivers.taskflow import process_failure from masakari import exception -from masakari.i18n import _ +from masakari.i18n import _, _LW from masakari.objects import fields @@ -79,8 +81,58 @@ class TaskFlowDriver(driver.NotificationDriver): def execute_instance_failure(self, context, instance_uuid, notification_uuid): - raise NotImplementedError() + novaclient = nova.API() + # get flow for instance failure + process_what = { + 'context': context, + 'instance_uuid': instance_uuid + } + + try: + flow_engine = instance_failure.get_instance_recovery_flow( + novaclient, process_what) + except Exception: + msg = (_('Failed to create instance failure flow.'), + notification_uuid) + LOG.exception(msg) + raise exception.MasakariException(msg) + + # Attaching this listener will capture all of the notifications that + # taskflow sends out and redirect them to a more useful log for + # masakari's debugging (or error reporting) usage. + with base.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() def execute_process_failure(self, context, process_name, host_name, notification_uuid): - raise NotImplementedError() + novaclient = nova.API() + # get flow for process failure + process_what = { + 'context': context, + 'process_name': process_name, + 'host_name': host_name + } + + # TODO(abhishekk) We need to create a map for process_name and + # respective python-client so that we can pass appropriate client + # as a input to the process. + if process_name == "nova-compute": + recovery_flow = process_failure.get_compute_process_recovery_flow + else: + LOG.warning(_LW("Skipping recovery for process: %s."), + process_name) + raise exception.SkipProcessRecoveryException() + + try: + flow_engine = recovery_flow(novaclient, process_what) + except Exception: + msg = (_('Failed to create process failure flow.'), + notification_uuid) + LOG.exception(msg) + raise exception.MasakariException(msg) + + # Attaching this listener will capture all of the notifications that + # taskflow sends out and redirect them to a more useful log for + # masakari's debugging (or error reporting) usage. + with base.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() diff --git a/masakari/engine/drivers/taskflow/instance_failure.py b/masakari/engine/drivers/taskflow/instance_failure.py new file mode 100644 index 00000000..be69156f --- /dev/null +++ b/masakari/engine/drivers/taskflow/instance_failure.py @@ -0,0 +1,158 @@ +# Copyright 2016 NTT DATA +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet import timeout as etimeout + +from oslo_log import log as logging +from oslo_service import loopingcall +from oslo_utils import strutils +import taskflow.engines +from taskflow.patterns import linear_flow + +import masakari.conf +from masakari.engine.drivers.taskflow import base +from masakari import exception +from masakari.i18n import _, _LI + + +CONF = masakari.conf.CONF + +LOG = logging.getLogger(__name__) + +ACTION = "instance:recovery" + + +class StopInstanceTask(base.MasakariTask): + def __init__(self, novaclient): + requires = ["instance_uuid"] + super(StopInstanceTask, self).__init__(addons=[ACTION], + requires=requires) + self.novaclient = novaclient + + def execute(self, context, instance_uuid): + # Get vm infomation. + instance = self.novaclient.get_server(context, instance_uuid) + + vm_state = getattr(instance, 'OS-EXT-STS:vm_state') + if vm_state != 'stopped': + if vm_state == 'resized': + self.novaclient.reset_instance_state( + context, instance.id, 'active') + + self.novaclient.stop_server(context, instance.id) + + def _wait_for_power_off(): + new_instance = self.novaclient.get_server(context, instance_uuid) + vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state') + if vm_state == 'stopped': + raise loopingcall.LoopingCallDone() + + periodic_call = loopingcall.FixedIntervalLoopingCall( + _wait_for_power_off) + + try: + # add a timeout to the periodic call. + periodic_call.start(interval=CONF.verify_interval) + etimeout.with_timeout(CONF.wait_period_after_power_off, + periodic_call.wait) + except etimeout.Timeout: + msg = _("Failed to stop instance %(instance)s") % { + 'instance': instance.id + } + raise exception.InstanceRecoveryFailureException(message=msg) + finally: + # stop the periodic call, in case of exceptions or Timeout. + periodic_call.stop() + + # If instance is not HA-Enabled then exit from the flow + if not strutils.bool_from_string(instance.metadata.get( + 'HA_Enabled', False), strict=True): + LOG.info(_LI("Skipping recovery for instance: %s as it is " + "not Ha_Enabled."), instance_uuid) + raise exception.SkipInstanceRecoveryException() + + +class StartInstanceTask(base.MasakariTask): + def __init__(self, novaclient): + requires = ["instance_uuid"] + super(StartInstanceTask, self).__init__(addons=[ACTION], + requires=requires) + self.novaclient = novaclient + + def execute(self, context, instance_uuid): + # Get vm infomation. + instance = self.novaclient.get_server(context, instance_uuid) + vm_state = getattr(instance, 'OS-EXT-STS:vm_state') + if vm_state == 'stopped': + self.novaclient.start_server(context, instance.id) + else: + msg = _("Invalid state for Instance %(instance)s. Expected state: " + "'STOPPED', Actual state: '%(actual_state)s'") % { + 'instance': instance_uuid, + 'actual_state': vm_state + } + raise exception.InstanceRecoveryFailureException(message=msg) + + +class ConfirmInstanceActiveTask(base.MasakariTask): + def __init__(self, novaclient): + requires = ["instance_uuid"] + super(ConfirmInstanceActiveTask, self).__init__(addons=[ACTION], + requires=requires) + self.novaclient = novaclient + + def execute(self, context, instance_uuid): + def _wait_for_active(): + new_instance = self.novaclient.get_server(context, instance_uuid) + vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state') + if vm_state == 'active': + raise loopingcall.LoopingCallDone() + + periodic_call = loopingcall.FixedIntervalLoopingCall( + _wait_for_active) + try: + # add a timeout to the periodic call. + periodic_call.start(interval=CONF.verify_interval) + etimeout.with_timeout(CONF.wait_period_after_power_on, + periodic_call.wait) + except etimeout.Timeout: + msg = _("Failed to start instance %(instance)s") % { + 'instance': instance_uuid + } + raise exception.InstanceRecoveryFailureException(message=msg) + finally: + # stop the periodic call, in case of exceptions or Timeout. + periodic_call.stop() + + +def get_instance_recovery_flow(novaclient, process_what): + """Constructs and returns the engine entrypoint flow. + + This flow will do the following: + + 1. Stop the instance + 2. Start the instance. + 3. Confirm instance is in active state. + """ + + flow_name = ACTION.replace(":", "_") + "_engine" + instance_recovery_workflow = linear_flow.Flow(flow_name) + + instance_recovery_workflow.add(StopInstanceTask(novaclient), + StartInstanceTask(novaclient), + ConfirmInstanceActiveTask(novaclient)) + + return taskflow.engines.load(instance_recovery_workflow, + store=process_what) diff --git a/masakari/engine/drivers/taskflow/process_failure.py b/masakari/engine/drivers/taskflow/process_failure.py new file mode 100644 index 00000000..ab73e3f7 --- /dev/null +++ b/masakari/engine/drivers/taskflow/process_failure.py @@ -0,0 +1,102 @@ +# Copyright 2016 NTT DATA +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet import timeout as etimeout + +from oslo_log import log as logging +from oslo_service import loopingcall +import taskflow.engines +from taskflow.patterns import linear_flow + +import masakari.conf +from masakari.engine.drivers.taskflow import base +from masakari import exception +from masakari.i18n import _, _LI + + +CONF = masakari.conf.CONF + +LOG = logging.getLogger(__name__) + +ACTION = "process:recovery" + + +class DisableComputeNodeTask(base.MasakariTask): + def __init__(self, novaclient): + requires = ["process_name", "host_name"] + super(DisableComputeNodeTask, self).__init__(addons=[ACTION], + requires=requires) + self.novaclient = novaclient + + def execute(self, context, process_name, host_name): + if not self.novaclient.is_service_down(context, host_name, + process_name): + # disable compute node on given host + self.novaclient.enable_disable_service(context, host_name) + else: + LOG.info(_LI("Skipping recovery for process: %s as it is " + "already disabled."), + process_name) + + +class ConfirmComputeNodeDisabledTask(base.MasakariTask): + def __init__(self, novaclient): + requires = ["process_name", "host_name"] + super(ConfirmComputeNodeDisabledTask, self).__init__(addons=[ACTION], + requires=requires) + self.novaclient = novaclient + + def execute(self, context, process_name, host_name): + def _wait_for_disable(): + service_disabled = self.novaclient.is_service_down( + context, host_name, process_name) + if service_disabled: + raise loopingcall.LoopingCallDone() + + periodic_call = loopingcall.FixedIntervalLoopingCall( + _wait_for_disable) + try: + # add a timeout to the periodic call. + periodic_call.start(interval=CONF.verify_interval) + etimeout.with_timeout(CONF.wait_period_after_service_disabled, + periodic_call.wait) + except etimeout.Timeout: + msg = _("Failed to disable service %(process_name)s") % { + 'process_name': process_name + } + raise exception.ProcessRecoveryFailureException(message=msg) + finally: + # stop the periodic call, in case of exceptions or Timeout. + periodic_call.stop() + + +def get_compute_process_recovery_flow(novaclient, process_what): + """Constructs and returns the engine entrypoint flow. + + This flow will do the following: + + 1. Disable nova-compute process + 2. Confirm nova-compute process is disabled + """ + + flow_name = ACTION.replace(":", "_") + "_engine" + compute_process_recovery_workflow = linear_flow.Flow(flow_name) + + compute_process_recovery_workflow.add( + DisableComputeNodeTask(novaclient), + ConfirmComputeNodeDisabledTask(novaclient)) + + return taskflow.engines.load(compute_process_recovery_workflow, + store=process_what) diff --git a/masakari/engine/manager.py b/masakari/engine/manager.py index b1f7ec16..1886c8fe 100644 --- a/masakari/engine/manager.py +++ b/masakari/engine/manager.py @@ -28,7 +28,7 @@ import oslo_messaging as messaging import masakari.conf from masakari.engine import driver from masakari import exception -from masakari.i18n import _, _LI, _LW +from masakari.i18n import _LI, _LW from masakari import manager from masakari import objects from masakari.objects import fields @@ -52,12 +52,115 @@ class MasakariManager(manager.Manager): self.driver = driver.load_masakari_driver(masakari_driver) + def _handle_notification_type_process(self, context, notification): + notification_status = fields.NotificationStatus.FINISHED + notification_event = notification.payload.get('event') + process_name = notification.payload.get('process_name') + + if notification_event.upper() == 'STARTED': + LOG.info(_LI("Notification type '%(type)s' received for host " + "'%(host_uuid)s': '%(process_name)s' has been " + "%(event)s."), { + 'type': notification.type, + 'host_uuid': notification.source_host_uuid, + 'process_name': process_name, + 'event': notification_event + }) + elif notification_event.upper() == 'STOPPED': + host_obj = objects.Host.get_by_uuid( + context, notification.source_host_uuid) + host_name = host_obj.name + + # Mark host on_maintenance mode as True + update_data = { + 'on_maintenance': True, + } + host_obj.update(update_data) + host_obj.save() + + try: + self.driver.execute_process_failure( + context, process_name, host_name, + notification.notification_uuid) + except exception.SkipProcessRecoveryException: + notification_status = fields.NotificationStatus.FINISHED + except (exception.MasakariException, + exception.ProcessRecoveryFailureException): + notification_status = fields.NotificationStatus.ERROR + else: + LOG.warning(_LW("Invalid event: %(event)s received for " + "notification type: %(notification_type)s"), { + 'event': notification_event, + 'notification_type': notification.type + }) + notification_status = fields.NotificationStatus.IGNORED + + return notification_status + + def _handle_notification_type_instance(self, context, notification): + notification_status = fields.NotificationStatus.FINISHED + try: + self.driver.execute_instance_failure( + context, notification.payload.get('instance_uuid'), + notification.notification_uuid) + except exception.SkipInstanceRecoveryException: + notification_status = fields.NotificationStatus.FINISHED + except (exception.MasakariException, + exception.InstanceRecoveryFailureException): + notification_status = fields.NotificationStatus.ERROR + + return notification_status + + def _handle_notification_type_host(self, context, notification): + notification_status = fields.NotificationStatus.FINISHED + notification_event = notification.payload.get('event') + + if notification_event.upper() == 'STARTED': + LOG.info(_LI("Notification type '%(type)s' received for host " + "'%(host_uuid)s' has been %(event)s."), { + 'type': notification.type, + 'host_uuid': notification.source_host_uuid, + 'event': notification_event + }) + elif notification_event.upper() == 'STOPPED': + host_obj = objects.Host.get_by_uuid( + context, notification.source_host_uuid) + host_name = host_obj.name + recovery_method = host_obj.failover_segment.recovery_method + + # Mark host on_maintenance mode as True + update_data = { + 'on_maintenance': True, + } + host_obj.update(update_data) + host_obj.save() + + try: + self.driver.execute_host_failure( + context, host_name, recovery_method, + notification.notification_uuid) + except (exception.MasakariException, + exception.AutoRecoveryFailureException): + notification_status = fields.NotificationStatus.ERROR + else: + LOG.warning(_LW("Invalid event: %(event)s received for " + "notification type: %(type)s"), { + 'event': notification_event, + 'type': notification.type + }) + notification_status = fields.NotificationStatus.IGNORED + + return notification_status + def process_notification(self, context, notification=None): """Processes the notification""" @utils.synchronized(notification.source_host_uuid) def do_process_notification(notification): - LOG.info(_LI('Processing notification %s'), - notification.notification_uuid) + LOG.info(_LI('Processing notification %(notification_uuid)s of ' + 'type: %(type)s'), { + 'notification_uuid': notification.notification_uuid, + 'type': notification.type + }) update_data = { 'status': fields.NotificationStatus.RUNNING, @@ -65,52 +168,15 @@ class MasakariManager(manager.Manager): notification.update(update_data) notification.save() - notification_status = fields.NotificationStatus.FINISHED if notification.type == fields.NotificationType.PROCESS: - # TODO(Dinesh_Bhor) Execute workflow for process-failure - # notification. - raise NotImplementedError(_("Flow not implemented for " - "notification type"), - notification.type) + notification_status = self._handle_notification_type_process( + context, notification) elif notification.type == fields.NotificationType.VM: - # TODO(Dinesh_Bhor) Execute workflow for instnace-failure - # notification. - raise NotImplementedError(_("Flow not implemented for " - "notification type"), - notification.type) + notification_status = self._handle_notification_type_instance( + context, notification) elif notification.type == fields.NotificationType.COMPUTE_HOST: - notification_event = notification.payload.get('event') - if notification_event.upper() == 'STARTED': - LOG.info(_LI("Notification event: '%(event)s' received " - "for host: '%(host_uuid)s'."), { - 'event': notification_event, - 'host_uuid': notification.source_host_uuid - }) - notification_status = fields.NotificationStatus.FINISHED - elif notification_event.upper() == 'STOPPED': - host_obj = objects.Host.get_by_uuid( - context, notification.source_host_uuid) - host_name = host_obj.name - recovery_method = host_obj.failover_segment.recovery_method - # Mark host on_maintenance mode as True - update_data = { - 'on_maintenance': True, - } - host_obj.update(update_data) - host_obj.save() - try: - self.driver.execute_host_failure( - context, host_name, - recovery_method, notification.notification_uuid) - except (exception.MasakariException, - exception.AutoRecoveryFailureException): - notification_status = fields.NotificationStatus.ERROR - else: - LOG.warning(_LW("Invalid event: %(event)s received for " - "notification: %(notification_uuid)s"), { - 'event': notification_event, - 'notification_uuid': notification.notification_uuid}) - notification_status = fields.NotificationStatus.IGNORED + notification_status = self._handle_notification_type_host( + context, notification) LOG.info(_LI("Notification %(notification_uuid)s exits with " "%(status)s."), { diff --git a/masakari/exception.py b/masakari/exception.py index d8be1e5c..5e6c973e 100644 --- a/masakari/exception.py +++ b/masakari/exception.py @@ -168,6 +168,11 @@ class APITimeout(APIException): msg_fmt = _("Timeout while requesting %(service)s API.") +class Conflict(MasakariException): + msg_fmt = _("Conflict") + code = 409 + + class Invalid(MasakariException): msg_fmt = _("Bad Request - Invalid Parameters") code = 400 @@ -303,3 +308,19 @@ class HostOnMaintenanceError(Invalid): class AutoRecoveryFailureException(MasakariException): msg_fmt = _('Failed to execute auto recovery method.') + + +class InstanceRecoveryFailureException(MasakariException): + msg_fmt = _('Failed to execute instance recovery workflow.') + + +class SkipInstanceRecoveryException(MasakariException): + msg_fmt = _('Skiping execution of instance recovery workflow.') + + +class SkipProcessRecoveryException(MasakariException): + msg_fmt = _('Skiping execution of process recovery workflow.') + + +class ProcessRecoveryFailureException(MasakariException): + msg_fmt = _('Failed to execute process recovery workflow.')