Add instance and process failure workflow

Implement taskflow for instance failure workflow which will
stop and start the failed instances.

For process failure notification it will set host on_maintenance
to True and disables compute service running on that host.

Change-Id: I45d772fa4502ca914141f650c85c192989b61439
This commit is contained in:
Abhishek Kekane 2016-11-01 19:16:31 +05:30
parent 686e9a7ced
commit 56fe73545e
7 changed files with 483 additions and 49 deletions

View File

@ -75,6 +75,9 @@ def translate_nova_exception(method):
nova_exception.NotFound) as exc:
err_msg = encodeutils.exception_to_unicode(exc)
_reraise(exception.NotFound(reason=err_msg))
except nova_exception.Conflict as exc:
err_msg = encodeutils.exception_to_unicode(exc)
_reraise(exception.Conflict(reason=err_msg))
return res
return wrapper
@ -207,6 +210,14 @@ class API(object):
LOG.info(_LI('Enable nova-compute on %s'), host_name)
nova.services.enable(host_name, 'nova-compute')
@translate_nova_exception
def is_service_down(self, context, host_name, binary):
"""Check whether service is up or down on given host."""
nova = novaclient(context, admin_endpoint=True,
privileged_user=True)
service = nova.services.list(host=host_name, binary=binary)[0]
return service.status == 'disabled'
@translate_nova_exception
def evacuate_instance(self, context, uuid, target=None,
on_shared_storage=True):
@ -237,3 +248,21 @@ class API(object):
msg = (_LI('Call get server command for instance %(uuid)s'))
LOG.info(msg, {'uuid': uuid})
return nova.servers.get(uuid)
@translate_nova_exception
def stop_server(self, context, uuid):
"""Stop a server."""
nova = novaclient(context, admin_endpoint=True,
privileged_user=True)
msg = (_LI('Call stop server command for instance %(uuid)s'))
LOG.info(msg, {'uuid': uuid})
return nova.servers.stop(uuid)
@translate_nova_exception
def start_server(self, context, uuid):
"""Start a server."""
nova = novaclient(context, admin_endpoint=True,
privileged_user=True)
msg = (_LI('Call start server command for instance %(uuid)s'))
LOG.info(msg, {'uuid': uuid})
return nova.servers.start(uuid)

View File

@ -71,6 +71,12 @@ notification_opts = [
cfg.IntOpt('verify_interval',
default=1,
help='The monitoring interval for looping'),
cfg.IntOpt('wait_period_after_power_off',
default=60,
help='Number of seconds to wait for instance to shut down'),
cfg.IntOpt('wait_period_after_power_on',
default=60,
help='Number of seconds to wait for instance to start'),
]

View File

@ -25,8 +25,10 @@ from masakari.compute import nova
from masakari.engine import driver
from masakari.engine.drivers.taskflow import base
from masakari.engine.drivers.taskflow import host_failure
from masakari.engine.drivers.taskflow import instance_failure
from masakari.engine.drivers.taskflow import process_failure
from masakari import exception
from masakari.i18n import _
from masakari.i18n import _, _LW
from masakari.objects import fields
@ -79,8 +81,58 @@ class TaskFlowDriver(driver.NotificationDriver):
def execute_instance_failure(self, context, instance_uuid,
notification_uuid):
raise NotImplementedError()
novaclient = nova.API()
# get flow for instance failure
process_what = {
'context': context,
'instance_uuid': instance_uuid
}
try:
flow_engine = instance_failure.get_instance_recovery_flow(
novaclient, process_what)
except Exception:
msg = (_('Failed to create instance failure flow.'),
notification_uuid)
LOG.exception(msg)
raise exception.MasakariException(msg)
# Attaching this listener will capture all of the notifications that
# taskflow sends out and redirect them to a more useful log for
# masakari's debugging (or error reporting) usage.
with base.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def execute_process_failure(self, context, process_name, host_name,
notification_uuid):
raise NotImplementedError()
novaclient = nova.API()
# get flow for process failure
process_what = {
'context': context,
'process_name': process_name,
'host_name': host_name
}
# TODO(abhishekk) We need to create a map for process_name and
# respective python-client so that we can pass appropriate client
# as a input to the process.
if process_name == "nova-compute":
recovery_flow = process_failure.get_compute_process_recovery_flow
else:
LOG.warning(_LW("Skipping recovery for process: %s."),
process_name)
raise exception.SkipProcessRecoveryException()
try:
flow_engine = recovery_flow(novaclient, process_what)
except Exception:
msg = (_('Failed to create process failure flow.'),
notification_uuid)
LOG.exception(msg)
raise exception.MasakariException(msg)
# Attaching this listener will capture all of the notifications that
# taskflow sends out and redirect them to a more useful log for
# masakari's debugging (or error reporting) usage.
with base.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()

View File

@ -0,0 +1,158 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from eventlet import timeout as etimeout
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import strutils
import taskflow.engines
from taskflow.patterns import linear_flow
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _, _LI
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = "instance:recovery"
class StopInstanceTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["instance_uuid"]
super(StopInstanceTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
# Get vm infomation.
instance = self.novaclient.get_server(context, instance_uuid)
vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
if vm_state != 'stopped':
if vm_state == 'resized':
self.novaclient.reset_instance_state(
context, instance.id, 'active')
self.novaclient.stop_server(context, instance.id)
def _wait_for_power_off():
new_instance = self.novaclient.get_server(context, instance_uuid)
vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
if vm_state == 'stopped':
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_power_off)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_power_off,
periodic_call.wait)
except etimeout.Timeout:
msg = _("Failed to stop instance %(instance)s") % {
'instance': instance.id
}
raise exception.InstanceRecoveryFailureException(message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
# If instance is not HA-Enabled then exit from the flow
if not strutils.bool_from_string(instance.metadata.get(
'HA_Enabled', False), strict=True):
LOG.info(_LI("Skipping recovery for instance: %s as it is "
"not Ha_Enabled."), instance_uuid)
raise exception.SkipInstanceRecoveryException()
class StartInstanceTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["instance_uuid"]
super(StartInstanceTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
# Get vm infomation.
instance = self.novaclient.get_server(context, instance_uuid)
vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
if vm_state == 'stopped':
self.novaclient.start_server(context, instance.id)
else:
msg = _("Invalid state for Instance %(instance)s. Expected state: "
"'STOPPED', Actual state: '%(actual_state)s'") % {
'instance': instance_uuid,
'actual_state': vm_state
}
raise exception.InstanceRecoveryFailureException(message=msg)
class ConfirmInstanceActiveTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["instance_uuid"]
super(ConfirmInstanceActiveTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, instance_uuid):
def _wait_for_active():
new_instance = self.novaclient.get_server(context, instance_uuid)
vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
if vm_state == 'active':
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_active)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_power_on,
periodic_call.wait)
except etimeout.Timeout:
msg = _("Failed to start instance %(instance)s") % {
'instance': instance_uuid
}
raise exception.InstanceRecoveryFailureException(message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
def get_instance_recovery_flow(novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
1. Stop the instance
2. Start the instance.
3. Confirm instance is in active state.
"""
flow_name = ACTION.replace(":", "_") + "_engine"
instance_recovery_workflow = linear_flow.Flow(flow_name)
instance_recovery_workflow.add(StopInstanceTask(novaclient),
StartInstanceTask(novaclient),
ConfirmInstanceActiveTask(novaclient))
return taskflow.engines.load(instance_recovery_workflow,
store=process_what)

View File

@ -0,0 +1,102 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from eventlet import timeout as etimeout
from oslo_log import log as logging
from oslo_service import loopingcall
import taskflow.engines
from taskflow.patterns import linear_flow
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _, _LI
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = "process:recovery"
class DisableComputeNodeTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["process_name", "host_name"]
super(DisableComputeNodeTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, process_name, host_name):
if not self.novaclient.is_service_down(context, host_name,
process_name):
# disable compute node on given host
self.novaclient.enable_disable_service(context, host_name)
else:
LOG.info(_LI("Skipping recovery for process: %s as it is "
"already disabled."),
process_name)
class ConfirmComputeNodeDisabledTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["process_name", "host_name"]
super(ConfirmComputeNodeDisabledTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, process_name, host_name):
def _wait_for_disable():
service_disabled = self.novaclient.is_service_down(
context, host_name, process_name)
if service_disabled:
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_disable)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_service_disabled,
periodic_call.wait)
except etimeout.Timeout:
msg = _("Failed to disable service %(process_name)s") % {
'process_name': process_name
}
raise exception.ProcessRecoveryFailureException(message=msg)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
def get_compute_process_recovery_flow(novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
1. Disable nova-compute process
2. Confirm nova-compute process is disabled
"""
flow_name = ACTION.replace(":", "_") + "_engine"
compute_process_recovery_workflow = linear_flow.Flow(flow_name)
compute_process_recovery_workflow.add(
DisableComputeNodeTask(novaclient),
ConfirmComputeNodeDisabledTask(novaclient))
return taskflow.engines.load(compute_process_recovery_workflow,
store=process_what)

View File

@ -28,7 +28,7 @@ import oslo_messaging as messaging
import masakari.conf
from masakari.engine import driver
from masakari import exception
from masakari.i18n import _, _LI, _LW
from masakari.i18n import _LI, _LW
from masakari import manager
from masakari import objects
from masakari.objects import fields
@ -52,12 +52,115 @@ class MasakariManager(manager.Manager):
self.driver = driver.load_masakari_driver(masakari_driver)
def _handle_notification_type_process(self, context, notification):
notification_status = fields.NotificationStatus.FINISHED
notification_event = notification.payload.get('event')
process_name = notification.payload.get('process_name')
if notification_event.upper() == 'STARTED':
LOG.info(_LI("Notification type '%(type)s' received for host "
"'%(host_uuid)s': '%(process_name)s' has been "
"%(event)s."), {
'type': notification.type,
'host_uuid': notification.source_host_uuid,
'process_name': process_name,
'event': notification_event
})
elif notification_event.upper() == 'STOPPED':
host_obj = objects.Host.get_by_uuid(
context, notification.source_host_uuid)
host_name = host_obj.name
# Mark host on_maintenance mode as True
update_data = {
'on_maintenance': True,
}
host_obj.update(update_data)
host_obj.save()
try:
self.driver.execute_process_failure(
context, process_name, host_name,
notification.notification_uuid)
except exception.SkipProcessRecoveryException:
notification_status = fields.NotificationStatus.FINISHED
except (exception.MasakariException,
exception.ProcessRecoveryFailureException):
notification_status = fields.NotificationStatus.ERROR
else:
LOG.warning(_LW("Invalid event: %(event)s received for "
"notification type: %(notification_type)s"), {
'event': notification_event,
'notification_type': notification.type
})
notification_status = fields.NotificationStatus.IGNORED
return notification_status
def _handle_notification_type_instance(self, context, notification):
notification_status = fields.NotificationStatus.FINISHED
try:
self.driver.execute_instance_failure(
context, notification.payload.get('instance_uuid'),
notification.notification_uuid)
except exception.SkipInstanceRecoveryException:
notification_status = fields.NotificationStatus.FINISHED
except (exception.MasakariException,
exception.InstanceRecoveryFailureException):
notification_status = fields.NotificationStatus.ERROR
return notification_status
def _handle_notification_type_host(self, context, notification):
notification_status = fields.NotificationStatus.FINISHED
notification_event = notification.payload.get('event')
if notification_event.upper() == 'STARTED':
LOG.info(_LI("Notification type '%(type)s' received for host "
"'%(host_uuid)s' has been %(event)s."), {
'type': notification.type,
'host_uuid': notification.source_host_uuid,
'event': notification_event
})
elif notification_event.upper() == 'STOPPED':
host_obj = objects.Host.get_by_uuid(
context, notification.source_host_uuid)
host_name = host_obj.name
recovery_method = host_obj.failover_segment.recovery_method
# Mark host on_maintenance mode as True
update_data = {
'on_maintenance': True,
}
host_obj.update(update_data)
host_obj.save()
try:
self.driver.execute_host_failure(
context, host_name, recovery_method,
notification.notification_uuid)
except (exception.MasakariException,
exception.AutoRecoveryFailureException):
notification_status = fields.NotificationStatus.ERROR
else:
LOG.warning(_LW("Invalid event: %(event)s received for "
"notification type: %(type)s"), {
'event': notification_event,
'type': notification.type
})
notification_status = fields.NotificationStatus.IGNORED
return notification_status
def process_notification(self, context, notification=None):
"""Processes the notification"""
@utils.synchronized(notification.source_host_uuid)
def do_process_notification(notification):
LOG.info(_LI('Processing notification %s'),
notification.notification_uuid)
LOG.info(_LI('Processing notification %(notification_uuid)s of '
'type: %(type)s'), {
'notification_uuid': notification.notification_uuid,
'type': notification.type
})
update_data = {
'status': fields.NotificationStatus.RUNNING,
@ -65,52 +168,15 @@ class MasakariManager(manager.Manager):
notification.update(update_data)
notification.save()
notification_status = fields.NotificationStatus.FINISHED
if notification.type == fields.NotificationType.PROCESS:
# TODO(Dinesh_Bhor) Execute workflow for process-failure
# notification.
raise NotImplementedError(_("Flow not implemented for "
"notification type"),
notification.type)
notification_status = self._handle_notification_type_process(
context, notification)
elif notification.type == fields.NotificationType.VM:
# TODO(Dinesh_Bhor) Execute workflow for instnace-failure
# notification.
raise NotImplementedError(_("Flow not implemented for "
"notification type"),
notification.type)
notification_status = self._handle_notification_type_instance(
context, notification)
elif notification.type == fields.NotificationType.COMPUTE_HOST:
notification_event = notification.payload.get('event')
if notification_event.upper() == 'STARTED':
LOG.info(_LI("Notification event: '%(event)s' received "
"for host: '%(host_uuid)s'."), {
'event': notification_event,
'host_uuid': notification.source_host_uuid
})
notification_status = fields.NotificationStatus.FINISHED
elif notification_event.upper() == 'STOPPED':
host_obj = objects.Host.get_by_uuid(
context, notification.source_host_uuid)
host_name = host_obj.name
recovery_method = host_obj.failover_segment.recovery_method
# Mark host on_maintenance mode as True
update_data = {
'on_maintenance': True,
}
host_obj.update(update_data)
host_obj.save()
try:
self.driver.execute_host_failure(
context, host_name,
recovery_method, notification.notification_uuid)
except (exception.MasakariException,
exception.AutoRecoveryFailureException):
notification_status = fields.NotificationStatus.ERROR
else:
LOG.warning(_LW("Invalid event: %(event)s received for "
"notification: %(notification_uuid)s"), {
'event': notification_event,
'notification_uuid': notification.notification_uuid})
notification_status = fields.NotificationStatus.IGNORED
notification_status = self._handle_notification_type_host(
context, notification)
LOG.info(_LI("Notification %(notification_uuid)s exits with "
"%(status)s."), {

View File

@ -168,6 +168,11 @@ class APITimeout(APIException):
msg_fmt = _("Timeout while requesting %(service)s API.")
class Conflict(MasakariException):
msg_fmt = _("Conflict")
code = 409
class Invalid(MasakariException):
msg_fmt = _("Bad Request - Invalid Parameters")
code = 400
@ -303,3 +308,19 @@ class HostOnMaintenanceError(Invalid):
class AutoRecoveryFailureException(MasakariException):
msg_fmt = _('Failed to execute auto recovery method.')
class InstanceRecoveryFailureException(MasakariException):
msg_fmt = _('Failed to execute instance recovery workflow.')
class SkipInstanceRecoveryException(MasakariException):
msg_fmt = _('Skiping execution of instance recovery workflow.')
class SkipProcessRecoveryException(MasakariException):
msg_fmt = _('Skiping execution of process recovery workflow.')
class ProcessRecoveryFailureException(MasakariException):
msg_fmt = _('Failed to execute process recovery workflow.')