Merge "Fix race condition between evacuation and its confirmation"

This commit is contained in:
Jenkins 2017-06-23 10:21:37 +00:00 committed by Gerrit Code Review
commit 166abca802
7 changed files with 204 additions and 151 deletions

View File

@ -39,7 +39,7 @@ CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
LOG = logging.getLogger(__name__)
NOVA_API_VERSION = "2.1"
NOVA_API_VERSION = "2.9"
nova_extensions = [ext for ext in
nova_client.discover_extensions(NOVA_API_VERSION)
@ -221,3 +221,19 @@ class API(object):
"aggregate '%(aggregate_name)s'.")
LOG.info(msg, {'host_name': host, 'aggregate_name': aggregate.name})
return nova.aggregates.add_host(aggregate.id, host)
@translate_nova_exception
def lock_server(self, context, uuid):
    """Lock the server identified by ``uuid``.

    A nova client is built from ``context``, the request is logged at
    INFO level and the result of ``servers.lock`` is returned unchanged.
    """
    client = novaclient(context)
    LOG.info('Call lock server command for instance %(uuid)s',
             {'uuid': uuid})
    return client.servers.lock(uuid)
@translate_nova_exception
def unlock_server(self, context, uuid):
    """Unlock the server identified by ``uuid``.

    A nova client is built from ``context``, the request is logged at
    INFO level and the result of ``servers.unlock`` is returned unchanged.
    """
    client = novaclient(context)
    LOG.info('Call unlock server command for instance %(uuid)s',
             {'uuid': uuid})
    return client.servers.unlock(uuid)

View File

@ -90,6 +90,12 @@ notification_opts = [
"generated_time, then it is considered that notification "
"is ignored by the messaging queue and will be processed "
"by 'process_unfinished_notifications' periodic task."),
cfg.IntOpt('host_failure_recovery_threads',
default=3,
min=1,
help="Number of threads to be used for evacuating and "
"confirming instances during execution of host_failure "
"workflow."),
]

View File

@ -14,6 +14,7 @@
# under the License.
import eventlet
from eventlet import greenpool
from eventlet import timeout as etimeout
from oslo_log import log as logging
@ -87,7 +88,6 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask):
class EvacuateInstancesTask(base.MasakariTask):
default_provides = set(["instance_list"])
def __init__(self, novaclient):
requires = ["host_name", "instance_list"]
@ -95,9 +95,77 @@ class EvacuateInstancesTask(base.MasakariTask):
requires=requires)
self.novaclient = novaclient
# Evacuate one instance away from ``host_name`` and poll until the move is
# confirmed, locking the instance for the duration unless it was already
# locked.
#
# Failures are reported through the caller-supplied list: the instance id is
# appended to ``failed_evacuation_instances`` when the confirmation wait times
# out or when resetting/evacuating the instance raises.
#
# NOTE(review): indentation is lost in this view, so it is not visible whether
# the polling/evacuation steps below are nested under the vm_state check -
# confirm against the real file.
def _evacuate_and_confirm(self, context, instance, host_name,
failed_evacuation_instances, reserved_host=None):
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
if vm_state in ['active', 'error', 'resized', 'stopped']:
# Before locking the instance, check whether it is already locked
# by the user; if it is, do not lock it again.
# NOTE(review): this lookup and the lock call below appear before the
# ``try`` statement, so an exception raised here would not be recorded in
# failed_evacuation_instances - confirm whether that is intended.
instance_already_locked = self.novaclient.get_server(
context, instance.id).locked
if not instance_already_locked:
# Lock the instance so that the user cannot perform any actions
# on it until evacuation and confirmation are complete.
self.novaclient.lock_server(context, instance.id)
# Polling callback: re-fetches the server and raises LoopingCallDone once
# it reports a hypervisor host other than ``host_name`` and its vm_state
# either went from 'error' to 'active' or is unchanged.
def _wait_for_evacuation():
new_instance = self.novaclient.get_server(context, instance.id)
instance_host = getattr(new_instance,
"OS-EXT-SRV-ATTR:hypervisor_hostname")
# old_vm_state comes from the ``instance`` object passed in by the caller,
# not from a fresh lookup.
old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
new_vm_state = getattr(new_instance, "OS-EXT-STS:vm_state")
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
try:
# Nova evacuates an instance only when vm_state is in active,
# stopped or error state. If an instance is in resized
# vm_state, masakari resets the instance state to *error* so
# that the instance can be evacuated.
if vm_state == 'resized':
self.novaclient.reset_instance_state(context, instance.id)
# Evacuate the instance; the target is the reserved host's name when a
# reserved host is given, otherwise None.
self.novaclient.evacuate_instance(
context, instance.id,
target=reserved_host.name if reserved_host else None)
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation)
try:
# Poll every CONF.verify_interval and bound the wait with
# CONF.wait_period_after_evacuation.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(
CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or
# Timeout.
periodic_call.stop()
except Exception:
# Exception is raised while resetting instance state or
# evacuating the instance itself.
failed_evacuation_instances.append(instance.id)
finally:
# Same condition as the lock above: skip the unlock when the instance was
# already locked beforehand.
if not instance_already_locked:
# Unlock the server after evacuation and confirmation
self.novaclient.unlock_server(context, instance.id)
def execute(self, context, host_name, instance_list, reserved_host=None):
def _do_evacuate(context, host_name, instance_list,
reserved_host=None):
failed_evacuation_instances = []
if reserved_host:
if CONF.host_failure.add_reserved_host_to_aggregate:
# Assign reserved_host to an aggregate to which the failed
@ -128,21 +196,21 @@ class EvacuateInstancesTask(base.MasakariTask):
reserved_host.reserved = False
reserved_host.save()
thread_pool = greenpool.GreenPool(
CONF.host_failure_recovery_threads)
for instance in instance_list:
vm_state = getattr(instance, "OS-EXT-STS:vm_state")
if vm_state in ['active', 'error', 'resized', 'stopped']:
# Evacuate API only evacuates an instance in
# active, stop or error state. If an instance is in
# resized status, masakari resets the instance
# state to *error* to evacuate it.
if vm_state == 'resized':
self.novaclient.reset_instance_state(
context, instance.id)
thread_pool.spawn_n(self._evacuate_and_confirm, context,
instance, host_name,
failed_evacuation_instances, reserved_host)
thread_pool.waitall()
# evacuate the instance
self.novaclient.evacuate_instance(
context, instance.id,
target=reserved_host.name if reserved_host else None)
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.HostRecoveryFailureException(message=msg)
lock_name = reserved_host.name if reserved_host else None
@ -160,57 +228,6 @@ class EvacuateInstancesTask(base.MasakariTask):
# 'auto' as the selection of compute host will be decided by nova.
_do_evacuate(context, host_name, instance_list)
return {
"instance_list": instance_list,
}
# Task that waits for every instance in ``instance_list`` to be reported on
# a hypervisor host other than ``host_name``.
# NOTE(review): in this diff view the class appears to be the code removed by
# the commit; indentation is lost, so nesting is inferred from syntax only.
class ConfirmEvacuationTask(base.MasakariTask):
def __init__(self, novaclient):
requires = ["instance_list", "host_name"]
super(ConfirmEvacuationTask, self).__init__(addons=[ACTION],
requires=requires)
self.novaclient = novaclient
# Polls each instance in turn; ids of instances whose wait times out are
# collected and reported via HostRecoveryFailureException.
def execute(self, context, instance_list, host_name):
failed_evacuation_instances = []
for instance in instance_list:
# Polling callback: raises LoopingCallDone once the re-fetched server
# reports a different hypervisor host and its vm_state either went from
# 'error' to 'active' or is unchanged.
def _wait_for_evacuation():
new_instance = self.novaclient.get_server(context, instance.id)
instance_host = getattr(new_instance,
"OS-EXT-SRV-ATTR:hypervisor_hostname")
old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
new_vm_state = getattr(new_instance,
"OS-EXT-STS:vm_state")
if instance_host != host_name:
if ((old_vm_state == 'error' and
new_vm_state == 'active') or
old_vm_state == new_vm_state):
raise loopingcall.LoopingCallDone()
periodic_call = loopingcall.FixedIntervalLoopingCall(
_wait_for_evacuation)
try:
# Poll every CONF.verify_interval and bound the wait with
# CONF.wait_period_after_evacuation.
periodic_call.start(interval=CONF.verify_interval)
etimeout.with_timeout(CONF.wait_period_after_evacuation,
periodic_call.wait)
except etimeout.Timeout:
# Instance is not evacuated in the expected time_limit.
failed_evacuation_instances.append(instance.id)
finally:
# stop the periodic call, in case of exceptions or Timeout.
periodic_call.stop()
# NOTE(review): presumably evaluated once after the loop has finished -
# the nesting is not visible here; confirm.
if failed_evacuation_instances:
msg = _("Failed to evacuate instances %(instances)s from "
"host %(host_name)s.") % {
'instances': failed_evacuation_instances,
'host_name': host_name
}
raise exception.HostRecoveryFailureException(message=msg)
def get_auto_flow(novaclient, process_what):
"""Constructs and returns the engine entrypoint flow.
@ -228,8 +245,7 @@ def get_auto_flow(novaclient, process_what):
auto_evacuate_flow.add(DisableComputeServiceTask(novaclient),
PrepareHAEnabledInstancesTask(novaclient),
EvacuateInstancesTask(novaclient),
ConfirmEvacuationTask(novaclient))
EvacuateInstancesTask(novaclient))
return taskflow.engines.load(auto_evacuate_flow, store=process_what)
@ -252,8 +268,7 @@ def get_rh_flow(novaclient, process_what):
rebind=['reserved_host_list'], provides='reserved_host'))
rh_flow.add(PrepareHAEnabledInstancesTask(novaclient),
EvacuateInstancesTask(novaclient),
ConfirmEvacuationTask(novaclient))
EvacuateInstancesTask(novaclient))
nested_flow.add(DisableComputeServiceTask(novaclient), rh_flow)

View File

@ -261,3 +261,23 @@ class NovaApiTestCase(test.TestCase):
mock_novaclient.assert_called_once_with(self.ctx)
mock_aggregates.add_host.assert_called_once_with(
mock_aggregate.id, 'fake_host')
@mock.patch('masakari.compute.nova.novaclient')
def test_lock_server(self, mock_novaclient):
    """lock_server builds one client and forwards the uuid to servers.lock."""
    server_id = uuidsentinel.fake_server
    servers_manager = mock.MagicMock()
    fake_client = mock.MagicMock(servers=servers_manager)
    mock_novaclient.return_value = fake_client

    self.api.lock_server(self.ctx, server_id)

    mock_novaclient.assert_called_once_with(self.ctx)
    servers_manager.lock.assert_called_once_with(uuidsentinel.fake_server)
@mock.patch('masakari.compute.nova.novaclient')
def test_unlock_server(self, mock_novaclient):
    """unlock_server builds one client and forwards the uuid to servers.unlock."""
    server_id = uuidsentinel.fake_server
    servers_manager = mock.MagicMock()
    fake_client = mock.MagicMock(servers=servers_manager)
    mock_novaclient.return_value = fake_client

    self.api.unlock_server(self.ctx, server_id)

    mock_novaclient.assert_called_once_with(self.ctx)
    servers_manager.unlock.assert_called_once_with(uuidsentinel.fake_server)

View File

@ -32,6 +32,9 @@ from masakari.tests.unit import fakes
CONF = conf.CONF
@mock.patch.object(nova.API, "enable_disable_service")
@mock.patch.object(nova.API, "lock_server")
@mock.patch.object(nova.API, "unlock_server")
class HostFailureTestCase(test.TestCase):
def setUp(self):
@ -58,14 +61,11 @@ class HostFailureTestCase(test.TestCase):
getattr(instance,
'OS-EXT-SRV-ATTR:hypervisor_hostname'))
def _test_disable_compute_service(self):
def _test_disable_compute_service(self, mock_enable_disable):
task = host_failure.DisableComputeServiceTask(self.novaclient)
with mock.patch.object(
self.novaclient,
"enable_disable_service") as mock_enable_disable_service:
task.execute(self.ctxt, self.instance_host)
task.execute(self.ctxt, self.instance_host)
mock_enable_disable_service.assert_called_once_with(
mock_enable_disable.assert_called_once_with(
self.ctxt, self.instance_host)
def _test_instance_list(self):
@ -83,33 +83,26 @@ class HostFailureTestCase(test.TestCase):
return instance_list
def _evacuate_instances(self, instance_list, reserved_host=None):
def _evacuate_instances(self, instance_list, mock_enable_disable,
reserved_host=None):
task = host_failure.EvacuateInstancesTask(self.novaclient)
if reserved_host:
with mock.patch.object(
self.novaclient,
"enable_disable_service") as mock_enable_disable_service:
instance_list = task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'],
reserved_host=reserved_host)
task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'],
reserved_host=reserved_host)
mock_enable_disable_service.assert_called_once_with(
self.ctxt, reserved_host.name, enable=True)
self.assertTrue(mock_enable_disable.called)
else:
instance_list = task.execute(
task.execute(
self.ctxt, self.instance_host, instance_list['instance_list'])
return instance_list
def _test_confirm_evacuate_task(self, instance_list):
task = host_failure.ConfirmEvacuationTask(self.novaclient)
task.execute(self.ctxt, instance_list['instance_list'],
self.instance_host)
# make sure instance is active and has different host
self._verify_instance_evacuated()
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_for_auto_recovery(self, _mock_novaclient):
def test_host_failure_flow_for_auto_recovery(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
@ -120,20 +113,18 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.create(id="2", host=self.instance_host)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_for_reserved_host_recovery(
self, _mock_novaclient):
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
@ -150,50 +141,44 @@ class HostFailureTestCase(test.TestCase):
hosts=[self.instance_host])
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
with mock.patch.object(host_obj.Host, "save") as mock_save:
instance_list = self._evacuate_instances(
instance_list, reserved_host=reserved_host)
self._evacuate_instances(
instance_list, mock_enable_disable,
reserved_host=reserved_host)
self.assertEqual(1, mock_save.call_count)
self.assertIn(reserved_host.name,
self.fake_client.aggregates.get('1').hosts)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
@mock.patch('masakari.compute.nova.novaclient')
def test_evacuate_instances_task(self, _mock_novaclient):
def test_evacuate_instances_task(self, _mock_novaclient, mock_unlock,
mock_lock, mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create test data
self.fake_client.servers.create(id="1", host=self.instance_host,
ha_enabled=True)
vm_state="error", ha_enabled=True)
self.fake_client.servers.create(id="2", host=self.instance_host,
ha_enabled=True)
vm_state="error", ha_enabled=True)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
instance_list = self._test_instance_list()
# execute EvacuateInstancesTask
task = host_failure.EvacuateInstancesTask(self.novaclient)
# mock evacuate method of FakeNovaClient to confirm that evacuate
# method is called.
with mock.patch.object(fakes.FakeNovaClient.ServerManager,
"evacuate") as mock_evacuate:
task.execute(self.ctxt, self.instance_host,
instance_list['instance_list'])
self.assertEqual(2, mock_evacuate.call_count)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_no_ha_enabled_instances(self, _mock_novaclient):
def test_host_failure_flow_no_ha_enabled_instances(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create test data
@ -201,7 +186,7 @@ class HostFailureTestCase(test.TestCase):
self.fake_client.servers.create(id="2", host=self.instance_host)
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)
@ -209,21 +194,19 @@ class HostFailureTestCase(test.TestCase):
self.ctxt, self.instance_host)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_evacuation_failed(self, _mock_novaclient):
def test_host_failure_flow_evacuation_failed(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
server = self.fake_client.servers.create(id="1",
server = self.fake_client.servers.create(id="1", vm_state='active',
host=self.instance_host,
ha_enabled=True)
instance_list = {
"instance_list": self.fake_client.servers.list()
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
def fake_get_server(context, host):
# assume that while evacuating instance goes into error state
fake_server = copy.deepcopy(server)
@ -231,15 +214,15 @@ class HostFailureTestCase(test.TestCase):
return fake_server
with mock.patch.object(self.novaclient, "get_server", fake_get_server):
# execute ConfirmEvacuationTask
task = host_failure.ConfirmEvacuationTask(self.novaclient)
# execute EvacuateInstancesTask
self.assertRaises(
exception.HostRecoveryFailureException, task.execute,
self.ctxt, instance_list['instance_list'],
self.instance_host)
exception.HostRecoveryFailureException,
self._evacuate_instances, instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_resized_instance(self, _mock_novaclient):
def test_host_failure_flow_resized_instance(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -254,14 +237,12 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_shutdown_instance(self, _mock_novaclient):
def test_host_failure_flow_shutdown_instance(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -276,14 +257,12 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_instance_in_error(self, _mock_novaclient):
def test_host_failure_flow_instance_in_error(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
# create ha_enabled test data
@ -298,20 +277,18 @@ class HostFailureTestCase(test.TestCase):
}
# execute EvacuateInstancesTask
instance_list = self._evacuate_instances(
instance_list)
# execute ConfirmEvacuationTask
self._test_confirm_evacuate_task(instance_list)
self._evacuate_instances(instance_list, mock_enable_disable)
@mock.patch('masakari.compute.nova.novaclient')
def test_host_failure_flow_no_instances_on_host(self, _mock_novaclient):
def test_host_failure_flow_no_instances_on_host(
self, _mock_novaclient, mock_unlock, mock_lock,
mock_enable_disable):
_mock_novaclient.return_value = self.fake_client
self.override_config("evacuate_all_instances",
True, "host_failure")
# execute DisableComputeServiceTask
self._test_disable_compute_service()
self._test_disable_compute_service(mock_enable_disable)
# execute PrepareHAEnabledInstancesTask
task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient)

View File

@ -24,13 +24,14 @@ NOW = timeutils.utcnow().replace(microsecond=0)
class FakeNovaClient(object):
class Server(object):
def __init__(self, id=None, uuid=None, host=None, vm_state=None,
ha_enabled=None):
ha_enabled=None, locked=False):
self.id = id
self.uuid = uuid or uuidutils.generate_uuid()
self.host = host
setattr(self, 'OS-EXT-SRV-ATTR:hypervisor_hostname', host)
setattr(self, 'OS-EXT-STS:vm_state', vm_state)
self.metadata = {"HA_Enabled": ha_enabled}
self.locked = locked
class ServerManager(object):
def __init__(self):

View File

@ -0,0 +1,18 @@
---
fixes:
- |
Fixes `bug 1693728`_, a race condition in which, after an instance has been
evacuated to another host, a user might perform actions on that instance,
causing ConfirmEvacuationTask to see the wrong instance vm_state and the
notification to fail.
To fix this issue, the following config option is added under the ``DEFAULT``
section of the 'masakari.conf' file::
[DEFAULT]
host_failure_recovery_threads = 3
This config option determines the number of threads used for
evacuating the instances.
.. _`bug 1693728`: https://bugs.launchpad.net/masakari/+bug/1693728