Add host failure workflow

Implement taskflow for AUTO recovery method which will evacuate
instances from failed hosts.

Change-Id: I4ae6d4a12fe651f058d0341bedd3f8251a6cb40d
This commit is contained in:
dineshbhor 2016-10-17 20:45:44 +05:30
parent 484da30293
commit 686e9a7ced
5 changed files with 286 additions and 7 deletions

View File

@ -62,6 +62,15 @@ notification_opts = [
"the notification will be considered as duplicate and "
"it will be ignored."
),
cfg.IntOpt('wait_period_after_service_disabled',
default=180,
help='Wait until service is disabled'),
cfg.IntOpt('wait_period_after_evacuation',
default=90,
help='Wait until instance is evacuated'),
cfg.IntOpt('verify_interval',
default=1,
help='The monitoring interval for looping'),
]

View File

@ -19,8 +19,18 @@ Driver TaskFlowDriver:
Execute notification workflows using taskflow.
"""
from oslo_log import log as logging
from masakari.compute import nova
from masakari.engine import driver
from masakari.engine.drivers.taskflow import base
from masakari.engine.drivers.taskflow import host_failure
from masakari import exception
from masakari.i18n import _
from masakari.objects import fields
LOG = logging.getLogger(__name__)
class TaskFlowDriver(driver.NotificationDriver):
@ -29,7 +39,43 @@ class TaskFlowDriver(driver.NotificationDriver):
def execute_host_failure(self, context, host_name, recovery_method,
notification_uuid):
raise NotImplementedError()
novaclient = nova.API()
# get flow for host failure
process_what = {
'context': context,
'host_name': host_name
}
try:
if recovery_method == fields.FailoverSegmentRecoveryMethod.AUTO:
flow_engine = host_failure.get_auto_flow(novaclient,
process_what)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.RESERVED_HOST):
raise NotImplementedError(_("Flow not implemented for "
"recovery_method"),
recovery_method)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.AUTO_PRIORITY):
raise NotImplementedError(_("Flow not implemented for "
"recovery_method"),
recovery_method)
elif recovery_method == (
fields.FailoverSegmentRecoveryMethod.RH_PRIORITY):
raise NotImplementedError(_("Flow not implemented for "
"recovery_method"),
recovery_method)
except Exception:
msg = (_('Failed to create host failure flow.'),
notification_uuid)
LOG.exception(msg)
raise exception.MasakariException(msg)
# Attaching this listener will capture all of the notifications that
# taskflow sends out and redirect them to a more useful log for
# masakari's debugging (or error reporting) usage.
with base.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def execute_instance_failure(self, context, instance_uuid,
notification_uuid):

View File

@ -0,0 +1,172 @@
# Copyright 2016 NTT DATA
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from eventlet import timeout as etimeout
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import strutils
import taskflow.engines
from taskflow.patterns import linear_flow
import masakari.conf
from masakari.engine.drivers.taskflow import base
from masakari import exception
from masakari.i18n import _, _LI
CONF = masakari.conf.CONF
LOG = logging.getLogger(__name__)
ACTION = 'instance:evacuate'
class DisableComputeServiceTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["host_name"]
super(DisableComputeServiceTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, host_name):
self.novaclient.enable_disable_service(context, host_name)
# Sleep until nova-compute service is marked as disabled.
msg = _LI("Sleeping %(wait)s sec before starting recovery "
"thread until nova recognizes the node down.")
LOG.info(msg, {'wait': CONF.wait_period_after_service_disabled})
eventlet.sleep(CONF.wait_period_after_service_disabled)
class PrepareHAEnabledInstancesTask(base.MasakariTask):
"""Get all HA_Enabled instances."""
default_provides = set(["ha_enabled_instances"])
def __init__(self, novaclient):
requires = ["host_name"]
super(PrepareHAEnabledInstancesTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, host_name):
all_instances = self.novaclient.get_servers(context, host_name)
ha_enabled_instances = (
[instance for instance in all_instances
if strutils.bool_from_string(instance.metadata.get('HA_Enabled',
False),
strict=True)])
return {
"ha_enabled_instances": ha_enabled_instances,
}
class AutoEvacuationInstancesTask(base.MasakariTask):
default_provides = set(["ha_enabled_instances"])
def __init__(self, novaclient):
requires = ["ha_enabled_instances"]
super(AutoEvacuationInstancesTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, ha_enabled_instances):
for instance in ha_enabled_instances:
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
if vm_state in ['active', 'error', 'resized', 'stopped']:
# Evacuate API only evacuates an instance in
# active, stop or error state. If an instance is in
# resized status, masakari resets the instance
# state to *error* to evacuate it.
if vm_state == 'resized':
self.novaclient.reset_instance_state(
context, instance.id)
# evacuate the instances to new host
self.novaclient.evacuate_instance(context, instance.id)
return {
"ha_enabled_instances": ha_enabled_instances,
}
class ConfirmEvacuationTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["ha_enabled_instances", "host_name"]
super(ConfirmEvacuationTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
def execute(self, context, ha_enabled_instances, host_name):
failed_evacuation_instances = []
for instance in ha_enabled_instances:
def _wait_for_evacuation():
new_instance = self.novaclient.get_server(context, instance.id)
instance_host = getattr(new_instance,
"OS-EXT-SRV-ATTR:hypervisor_hostname")
old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
new_vm_state = getattr(new_instance,
"OS-EXT-STS:vm_state")
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation)
try:
# add a timeout to the periodic call.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.AutoRecoveryFailureException(message=msg)
def get_auto_flow(novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
This flow will do the following:
1. Disable compute service on source host
2. Get all HA_Enabled instances.
3. Evacuate all the HA_Enabled instances.
4. Confirm evacuation of instances.
"""
flow_name = ACTION.replace(":", "_") + "_engine"
auto_evacuate_flow = linear_flow.Flow(flow_name)
auto_evacuate_flow.add(DisableComputeServiceTask(novaclient),
PrepareHAEnabledInstancesTask(novaclient),
AutoEvacuationInstancesTask(novaclient),
ConfirmEvacuationTask(novaclient))
return taskflow.engines.load(auto_evacuate_flow, store=process_what)

View File

@ -27,8 +27,10 @@ import oslo_messaging as messaging
import masakari.conf
from masakari.engine import driver
from masakari.i18n import _LI
from masakari import exception
from masakari.i18n import _, _LI, _LW
from masakari import manager
from masakari import objects
from masakari.objects import fields
from masakari import utils
@ -63,17 +65,63 @@ class MasakariManager(manager.Manager):
notification.update(update_data)
notification.save()
notification_status = fields.NotificationStatus.FINISHED
if notification.type == fields.NotificationType.PROCESS:
# TODO(Dinesh_Bhor) Execute workflow for process-failure
# notification.
pass
raise NotImplementedError(_("Flow not implemented for "
"notification type"),
notification.type)
elif notification.type == fields.NotificationType.VM:
# TODO(Dinesh_Bhor) Execute workflow for instnace-failure
# notification.
pass
raise NotImplementedError(_("Flow not implemented for "
"notification type"),
notification.type)
elif notification.type == fields.NotificationType.COMPUTE_HOST:
# TODO(Dinesh_Bhor) Execute workflow for host-failure
# notification.
pass
notification_event = notification.payload.get('event')
if notification_event.upper() == 'STARTED':
LOG.info(_LI("Notification event: '%(event)s' received "
"for host: '%(host_uuid)s'."), {
'event': notification_event,
'host_uuid': notification.source_host_uuid
})
notification_status = fields.NotificationStatus.FINISHED
elif notification_event.upper() == 'STOPPED':
host_obj = objects.Host.get_by_uuid(
context, notification.source_host_uuid)
host_name = host_obj.name
recovery_method = host_obj.failover_segment.recovery_method
# Mark host on_maintenance mode as True
update_data = {
'on_maintenance': True,
}
host_obj.update(update_data)
host_obj.save()
try:
self.driver.execute_host_failure(
context, host_name,
recovery_method, notification.notification_uuid)
except (exception.MasakariException,
exception.AutoRecoveryFailureException):
notification_status = fields.NotificationStatus.ERROR
else:
LOG.warning(_LW("Invalid event: %(event)s received for "
"notification: %(notification_uuid)s"), {
'event': notification_event,
'notification_uuid': notification.notification_uuid})
notification_status = fields.NotificationStatus.IGNORED
LOG.info(_LI("Notification %(notification_uuid)s exits with "
"%(status)s."), {
'notification_uuid': notification.notification_uuid,
'status': notification_status
})
update_data = {
'status': notification_status
}
notification.update(update_data)
notification.save()
do_process_notification(notification)

View File

@ -299,3 +299,7 @@ class DuplicateNotification(Invalid):
class HostOnMaintenanceError(Invalid):
msg_fmt = _('Host %(host_name)s is already under maintenance.')
code = 409
class AutoRecoveryFailureException(MasakariException):
msg_fmt = _('Failed to execute auto recovery method.')