diff --git a/masakari/compute/nova.py b/masakari/compute/nova.py index da9a6455..56a12c67 100644 --- a/masakari/compute/nova.py +++ b/masakari/compute/nova.py @@ -39,7 +39,7 @@ CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token') LOG = logging.getLogger(__name__) -NOVA_API_VERSION = "2.1" +NOVA_API_VERSION = "2.9" nova_extensions = [ext for ext in nova_client.discover_extensions(NOVA_API_VERSION) @@ -221,3 +221,19 @@ class API(object): "aggregate '%(aggregate_name)s'.") LOG.info(msg, {'host_name': host, 'aggregate_name': aggregate.name}) return nova.aggregates.add_host(aggregate.id, host) + + @translate_nova_exception + def lock_server(self, context, uuid): + """Lock a server.""" + nova = novaclient(context) + msg = ('Call lock server command for instance %(uuid)s') + LOG.info(msg, {'uuid': uuid}) + return nova.servers.lock(uuid) + + @translate_nova_exception + def unlock_server(self, context, uuid): + """Unlock a server.""" + nova = novaclient(context) + msg = ('Call unlock server command for instance %(uuid)s') + LOG.info(msg, {'uuid': uuid}) + return nova.servers.unlock(uuid) diff --git a/masakari/conf/engine.py b/masakari/conf/engine.py index 4e4c65d9..225360a6 100644 --- a/masakari/conf/engine.py +++ b/masakari/conf/engine.py @@ -90,6 +90,12 @@ notification_opts = [ "generated_time, then it is considered that notification " "is ignored by the messaging queue and will be processed " "by 'process_unfinished_notifications' periodic task."), + cfg.IntOpt('host_failure_recovery_threads', + default=3, + min=1, + help="Number of threads to be used for evacuating and " + "confirming instances during execution of host_failure " + "workflow."), ] diff --git a/masakari/engine/drivers/taskflow/host_failure.py b/masakari/engine/drivers/taskflow/host_failure.py index 541eac0b..3b1e003a 100644 --- a/masakari/engine/drivers/taskflow/host_failure.py +++ b/masakari/engine/drivers/taskflow/host_failure.py @@ -14,6 +14,7 @@ # under the License. import eventlet +from eventlet import greenpool from eventlet import timeout as etimeout from oslo_log import log as logging @@ -87,7 +88,6 @@ class PrepareHAEnabledInstancesTask(base.MasakariTask): class EvacuateInstancesTask(base.MasakariTask): - default_provides = set(["instance_list"]) def __init__(self, novaclient): requires = ["host_name", "instance_list"] @@ -95,9 +95,77 @@ class EvacuateInstancesTask(base.MasakariTask): requires=requires) self.novaclient = novaclient + def _evacuate_and_confirm(self, context, instance, host_name, + failed_evacuation_instances, reserved_host=None): + vm_state = getattr(instance, "OS-EXT-STS:vm_state") + if vm_state in ['active', 'error', 'resized', 'stopped']: + + # Before locking the instance check whether it is already locked + # by user, if yes don't lock the instance + instance_already_locked = self.novaclient.get_server( + context, instance.id).locked + + if not instance_already_locked: + # lock the instance so that until evacuation and confirmation + # is not complete, user won't be able to perform any actions + # on the instance. + self.novaclient.lock_server(context, instance.id) + + def _wait_for_evacuation(): + new_instance = self.novaclient.get_server(context, instance.id) + instance_host = getattr(new_instance, + "OS-EXT-SRV-ATTR:hypervisor_hostname") + old_vm_state = getattr(instance, "OS-EXT-STS:vm_state") + new_vm_state = getattr(new_instance, "OS-EXT-STS:vm_state") + + if instance_host != host_name: + if ((old_vm_state == 'error' and + new_vm_state == 'active') or + old_vm_state == new_vm_state): + raise loopingcall.LoopingCallDone() + + try: + # Nova evacuates an instance only when vm_state is in active, + # stopped or error state. If an instance is in resized + # vm_state, masakari resets the instance state to *error* so + # that the instance can be evacuated. + if vm_state == 'resized': + self.novaclient.reset_instance_state(context, instance.id) + + # evacuate the instance + self.novaclient.evacuate_instance( + context, instance.id, + target=reserved_host.name if reserved_host else None) + + periodic_call = loopingcall.FixedIntervalLoopingCall( + _wait_for_evacuation) + + try: + # add a timeout to the periodic call. + periodic_call.start(interval=CONF.verify_interval) + etimeout.with_timeout( + CONF.wait_period_after_evacuation, + periodic_call.wait) + except etimeout.Timeout: + # Instance is not evacuated in the expected time_limit. + failed_evacuation_instances.append(instance.id) + finally: + # stop the periodic call, in case of exceptions or + # Timeout. + periodic_call.stop() + except Exception: + # Exception is raised while resetting instance state or + # evacuating the instance itself. + failed_evacuation_instances.append(instance.id) + finally: + if not instance_already_locked: + # Unlock the server after evacuation and confirmation + self.novaclient.unlock_server(context, instance.id) + def execute(self, context, host_name, instance_list, reserved_host=None): def _do_evacuate(context, host_name, instance_list, reserved_host=None): + failed_evacuation_instances = [] if reserved_host: if CONF.host_failure.add_reserved_host_to_aggregate: # Assign reserved_host to an aggregate to which the failed @@ -128,21 +196,21 @@ class EvacuateInstancesTask(base.MasakariTask): reserved_host.reserved = False reserved_host.save() + thread_pool = greenpool.GreenPool( + CONF.host_failure_recovery_threads) for instance in instance_list: - vm_state = getattr(instance, "OS-EXT-STS:vm_state") - if vm_state in ['active', 'error', 'resized', 'stopped']: - # Evacuate API only evacuates an instance in - # active, stop or error state. If an instance is in - # resized status, masakari resets the instance - # state to *error* to evacuate it. - if vm_state == 'resized': - self.novaclient.reset_instance_state( - context, instance.id) + thread_pool.spawn_n(self._evacuate_and_confirm, context, + instance, host_name, + failed_evacuation_instances, reserved_host) + thread_pool.waitall() - # evacuate the instance - self.novaclient.evacuate_instance( - context, instance.id, - target=reserved_host.name if reserved_host else None) + if failed_evacuation_instances: + msg = _("Failed to evacuate instances %(instances)s from " + "host %(host_name)s.") % { + 'instances': failed_evacuation_instances, + 'host_name': host_name + } + raise exception.HostRecoveryFailureException(message=msg) lock_name = reserved_host.name if reserved_host else None @@ -160,57 +228,6 @@ class EvacuateInstancesTask(base.MasakariTask): # 'auto' as the selection of compute host will be decided by nova. _do_evacuate(context, host_name, instance_list) - return { - "instance_list": instance_list, - } - - -class ConfirmEvacuationTask(base.MasakariTask): - def __init__(self, novaclient): - requires = ["instance_list", "host_name"] - super(ConfirmEvacuationTask, self).__init__(addons=[ACTION], - requires=requires) - self.novaclient = novaclient - - def execute(self, context, instance_list, host_name): - failed_evacuation_instances = [] - for instance in instance_list: - def _wait_for_evacuation(): - new_instance = self.novaclient.get_server(context, instance.id) - instance_host = getattr(new_instance, - "OS-EXT-SRV-ATTR:hypervisor_hostname") - old_vm_state = getattr(instance, "OS-EXT-STS:vm_state") - new_vm_state = getattr(new_instance, - "OS-EXT-STS:vm_state") - - if instance_host != host_name: - if ((old_vm_state == 'error' and - new_vm_state == 'active') or - old_vm_state == new_vm_state): - raise loopingcall.LoopingCallDone() - - periodic_call = loopingcall.FixedIntervalLoopingCall( - _wait_for_evacuation) - try: - # add a timeout to the periodic call. - periodic_call.start(interval=CONF.verify_interval) - etimeout.with_timeout(CONF.wait_period_after_evacuation, - periodic_call.wait) - except etimeout.Timeout: - # Instance is not evacuated in the expected time_limit. - failed_evacuation_instances.append(instance.id) - finally: - # stop the periodic call, in case of exceptions or Timeout. - periodic_call.stop() - - if failed_evacuation_instances: - msg = _("Failed to evacuate instances %(instances)s from " - "host %(host_name)s.") % { - 'instances': failed_evacuation_instances, - 'host_name': host_name - } - raise exception.HostRecoveryFailureException(message=msg) - def get_auto_flow(novaclient, process_what): """Constructs and returns the engine entrypoint flow. @@ -228,8 +245,7 @@ def get_auto_flow(novaclient, process_what): auto_evacuate_flow.add(DisableComputeServiceTask(novaclient), PrepareHAEnabledInstancesTask(novaclient), - EvacuateInstancesTask(novaclient), - ConfirmEvacuationTask(novaclient)) + EvacuateInstancesTask(novaclient)) return taskflow.engines.load(auto_evacuate_flow, store=process_what) @@ -252,8 +268,7 @@ def get_rh_flow(novaclient, process_what): rebind=['reserved_host_list'], provides='reserved_host')) rh_flow.add(PrepareHAEnabledInstancesTask(novaclient), - EvacuateInstancesTask(novaclient), - ConfirmEvacuationTask(novaclient)) + EvacuateInstancesTask(novaclient)) nested_flow.add(DisableComputeServiceTask(novaclient), rh_flow) diff --git a/masakari/tests/unit/compute/test_nova.py b/masakari/tests/unit/compute/test_nova.py index 171a7869..0c9f95bc 100644 --- a/masakari/tests/unit/compute/test_nova.py +++ b/masakari/tests/unit/compute/test_nova.py @@ -261,3 +261,23 @@ class NovaApiTestCase(test.TestCase): mock_novaclient.assert_called_once_with(self.ctx) mock_aggregates.add_host.assert_called_once_with( mock_aggregate.id, 'fake_host') + + @mock.patch('masakari.compute.nova.novaclient') + def test_lock_server(self, mock_novaclient): + uuid = uuidsentinel.fake_server + mock_servers = mock.MagicMock() + mock_novaclient.return_value = mock.MagicMock(servers=mock_servers) + self.api.lock_server(self.ctx, uuid) + + mock_novaclient.assert_called_once_with(self.ctx) + mock_servers.lock.assert_called_once_with(uuidsentinel.fake_server) + + @mock.patch('masakari.compute.nova.novaclient') + def test_unlock_server(self, mock_novaclient): + uuid = uuidsentinel.fake_server + mock_servers = mock.MagicMock() + mock_novaclient.return_value = mock.MagicMock(servers=mock_servers) + self.api.unlock_server(self.ctx, uuid) + + mock_novaclient.assert_called_once_with(self.ctx) + mock_servers.unlock.assert_called_once_with(uuidsentinel.fake_server) diff --git a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py index 210e6c4f..d17e2434 100644 --- a/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py +++ b/masakari/tests/unit/engine/drivers/taskflow/test_host_failure_flow.py @@ -32,6 +32,9 @@ from masakari.tests.unit import fakes CONF = conf.CONF +@mock.patch.object(nova.API, "enable_disable_service") +@mock.patch.object(nova.API, "lock_server") +@mock.patch.object(nova.API, "unlock_server") class HostFailureTestCase(test.TestCase): def setUp(self): @@ -58,14 +61,11 @@ class HostFailureTestCase(test.TestCase): getattr(instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) - def _test_disable_compute_service(self): + def _test_disable_compute_service(self, mock_enable_disable): task = host_failure.DisableComputeServiceTask(self.novaclient) - with mock.patch.object( - self.novaclient, - "enable_disable_service") as mock_enable_disable_service: - task.execute(self.ctxt, self.instance_host) + task.execute(self.ctxt, self.instance_host) - mock_enable_disable_service.assert_called_once_with( + mock_enable_disable.assert_called_once_with( self.ctxt, self.instance_host) def _test_instance_list(self): @@ -83,33 +83,26 @@ class HostFailureTestCase(test.TestCase): return instance_list - def _evacuate_instances(self, instance_list, reserved_host=None): + def _evacuate_instances(self, instance_list, mock_enable_disable, + reserved_host=None): task = host_failure.EvacuateInstancesTask(self.novaclient) if reserved_host: - with mock.patch.object( - self.novaclient, - "enable_disable_service") as mock_enable_disable_service: - instance_list = task.execute(self.ctxt, self.instance_host, - instance_list['instance_list'], - reserved_host=reserved_host) + task.execute(self.ctxt, self.instance_host, + instance_list['instance_list'], + reserved_host=reserved_host) - mock_enable_disable_service.assert_called_once_with( - self.ctxt, reserved_host.name, enable=True) + self.assertTrue(mock_enable_disable.called) else: - instance_list = task.execute( + task.execute( self.ctxt, self.instance_host, instance_list['instance_list']) - return instance_list - - def _test_confirm_evacuate_task(self, instance_list): - task = host_failure.ConfirmEvacuationTask(self.novaclient) - task.execute(self.ctxt, instance_list['instance_list'], - self.instance_host) # make sure instance is active and has different host self._verify_instance_evacuated() @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_for_auto_recovery(self, _mock_novaclient): + def test_host_failure_flow_for_auto_recovery( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", True, "host_failure") @@ -120,20 +113,18 @@ class HostFailureTestCase(test.TestCase): self.fake_client.servers.create(id="2", host=self.instance_host) # execute DisableComputeServiceTask - self._test_disable_compute_service() + self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask instance_list = self._test_instance_list() # execute EvacuateInstancesTask - instance_list = self._evacuate_instances(instance_list) - - # execute ConfirmEvacuationTask - self._test_confirm_evacuate_task(instance_list) + self._evacuate_instances(instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') def test_host_failure_flow_for_reserved_host_recovery( - self, _mock_novaclient): + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", True, "host_failure") @@ -150,50 +141,44 @@ class HostFailureTestCase(test.TestCase): hosts=[self.instance_host]) # execute DisableComputeServiceTask - self._test_disable_compute_service() + self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask instance_list = self._test_instance_list() # execute EvacuateInstancesTask with mock.patch.object(host_obj.Host, "save") as mock_save: - instance_list = self._evacuate_instances( - instance_list, reserved_host=reserved_host) + self._evacuate_instances( + instance_list, mock_enable_disable, + reserved_host=reserved_host) self.assertEqual(1, mock_save.call_count) self.assertIn(reserved_host.name, self.fake_client.aggregates.get('1').hosts) - # execute ConfirmEvacuationTask - self._test_confirm_evacuate_task(instance_list) - @mock.patch('masakari.compute.nova.novaclient') - def test_evacuate_instances_task(self, _mock_novaclient): + def test_evacuate_instances_task(self, _mock_novaclient, mock_unlock, + mock_lock, mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create test data self.fake_client.servers.create(id="1", host=self.instance_host, - ha_enabled=True) + vm_state="error", ha_enabled=True) self.fake_client.servers.create(id="2", host=self.instance_host, - ha_enabled=True) + vm_state="error", ha_enabled=True) # execute DisableComputeServiceTask - self._test_disable_compute_service() + self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask instance_list = self._test_instance_list() # execute EvacuateInstancesTask - task = host_failure.EvacuateInstancesTask(self.novaclient) - # mock evacuate method of FakeNovaClient to confirm that evacuate - # method is called. - with mock.patch.object(fakes.FakeNovaClient.ServerManager, - "evacuate") as mock_evacuate: - task.execute(self.ctxt, self.instance_host, - instance_list['instance_list']) - self.assertEqual(2, mock_evacuate.call_count) + self._evacuate_instances(instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_no_ha_enabled_instances(self, _mock_novaclient): + def test_host_failure_flow_no_ha_enabled_instances( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create test data @@ -201,7 +186,7 @@ class HostFailureTestCase(test.TestCase): self.fake_client.servers.create(id="2", host=self.instance_host) # execute DisableComputeServiceTask - self._test_disable_compute_service() + self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) @@ -209,21 +194,19 @@ class HostFailureTestCase(test.TestCase): self.ctxt, self.instance_host) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_evacuation_failed(self, _mock_novaclient): + def test_host_failure_flow_evacuation_failed( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create ha_enabled test data - server = self.fake_client.servers.create(id="1", + server = self.fake_client.servers.create(id="1", vm_state='active', host=self.instance_host, ha_enabled=True) instance_list = { "instance_list": self.fake_client.servers.list() } - # execute EvacuateInstancesTask - instance_list = self._evacuate_instances( - instance_list) - def fake_get_server(context, host): # assume that while evacuating instance goes into error state fake_server = copy.deepcopy(server) @@ -231,15 +214,15 @@ class HostFailureTestCase(test.TestCase): return fake_server with mock.patch.object(self.novaclient, "get_server", fake_get_server): - # execute ConfirmEvacuationTask - task = host_failure.ConfirmEvacuationTask(self.novaclient) + # execute EvacuateInstancesTask self.assertRaises( - exception.HostRecoveryFailureException, task.execute, - self.ctxt, instance_list['instance_list'], - self.instance_host) + exception.HostRecoveryFailureException, + self._evacuate_instances, instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_resized_instance(self, _mock_novaclient): + def test_host_failure_flow_resized_instance( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create ha_enabled test data @@ -254,14 +237,12 @@ class HostFailureTestCase(test.TestCase): } # execute EvacuateInstancesTask - instance_list = self._evacuate_instances( - instance_list) - - # execute ConfirmEvacuationTask - self._test_confirm_evacuate_task(instance_list) + self._evacuate_instances(instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_shutdown_instance(self, _mock_novaclient): + def test_host_failure_flow_shutdown_instance( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create ha_enabled test data @@ -276,14 +257,12 @@ class HostFailureTestCase(test.TestCase): } # execute EvacuateInstancesTask - instance_list = self._evacuate_instances( - instance_list) - - # execute ConfirmEvacuationTask - self._test_confirm_evacuate_task(instance_list) + self._evacuate_instances(instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_instance_in_error(self, _mock_novaclient): + def test_host_failure_flow_instance_in_error( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client # create ha_enabled test data @@ -298,20 +277,18 @@ class HostFailureTestCase(test.TestCase): } # execute EvacuateInstancesTask - instance_list = self._evacuate_instances( - instance_list) - - # execute ConfirmEvacuationTask - self._test_confirm_evacuate_task(instance_list) + self._evacuate_instances(instance_list, mock_enable_disable) @mock.patch('masakari.compute.nova.novaclient') - def test_host_failure_flow_no_instances_on_host(self, _mock_novaclient): + def test_host_failure_flow_no_instances_on_host( + self, _mock_novaclient, mock_unlock, mock_lock, + mock_enable_disable): _mock_novaclient.return_value = self.fake_client self.override_config("evacuate_all_instances", True, "host_failure") # execute DisableComputeServiceTask - self._test_disable_compute_service() + self._test_disable_compute_service(mock_enable_disable) # execute PrepareHAEnabledInstancesTask task = host_failure.PrepareHAEnabledInstancesTask(self.novaclient) diff --git a/masakari/tests/unit/fakes.py b/masakari/tests/unit/fakes.py index 62a1e707..9badced9 100644 --- a/masakari/tests/unit/fakes.py +++ b/masakari/tests/unit/fakes.py @@ -24,13 +24,14 @@ NOW = timeutils.utcnow().replace(microsecond=0) class FakeNovaClient(object): class Server(object): def __init__(self, id=None, uuid=None, host=None, vm_state=None, - ha_enabled=None): + ha_enabled=None, locked=False): self.id = id self.uuid = uuid or uuidutils.generate_uuid() self.host = host setattr(self, 'OS-EXT-SRV-ATTR:hypervisor_hostname', host) setattr(self, 'OS-EXT-STS:vm_state', vm_state) self.metadata = {"HA_Enabled": ha_enabled} + self.locked = locked class ServerManager(object): def __init__(self): diff --git a/releasenotes/notes/evacuation_in_threads-cc9c79b10acfb5f6.yaml b/releasenotes/notes/evacuation_in_threads-cc9c79b10acfb5f6.yaml new file mode 100644 index 00000000..23cc9098 --- /dev/null +++ b/releasenotes/notes/evacuation_in_threads-cc9c79b10acfb5f6.yaml @@ -0,0 +1,18 @@ +--- +fixes: + - | + Fixes `bug 1693728`_ which will fix the race condition where after + evacuation of an instance to other host user might perform some actions on + that instance which gives wrong instance vm_state to ConfirmEvacuationTask + that results into notification failure. + + To fix this issue, following config option is added under ``DEFAULT`` + section in 'masakari.conf' file:: + + [DEFAULT] + host_failure_recovery_threads = 3 + + This config option decides the number of threads going to be used for + evacuating the instances. + + .. _`bug 1693728`: https://bugs.launchpad.net/masakari/+bug/1693728