From 7dd6a4a19311136c02d89cd2afd97236b0f4cc27 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Thu, 29 Jan 2015 14:33:32 +0000 Subject: [PATCH] libvirt: proper monitoring of live migration progress The current live migration code simply invokes migrateToURI and waits for it to finish, or raise an exception. It considers all exceptions to mean the live migration aborted and the VM is still running on the source host. This is totally bogus, as there are a number of reasons why an error could be raised from the migrateToURI call. There are at least 5 different scenarios for what the VM might be doing on source + dest host upon error. The migration might even still be going on, even if after the error has occurred. A more reliable way to deal with this is to actively query libvirt for the domain job status. This gives an indication of whether the job is completed, failed or cancelled. Even with that though, there is a need for a few heuristics to distinguish some of the possible error scenarios. This change to do active monitoring of the live migration process also opens the door for being able to tune live migration on the fly to adjust max downtime or bandwidth to improve chances of getting convergence, or to automatically abort it after too much time has elapsed instead of letting it carry on until the end of the universe. This change merely records memory transfer progress and leaves tuning improvements to a later date. Closes-bug: #1414065 Change-Id: I6fcbfa31a79c7808c861bb3a84b56bd096882004 --- nova/tests/unit/virt/libvirt/fakelibvirt.py | 16 +- nova/tests/unit/virt/libvirt/test_driver.py | 288 ++++++++++++++------ nova/tests/unit/virt/libvirt/test_host.py | 194 +++++++++++++ nova/virt/libvirt/driver.py | 246 +++++++++++++++-- nova/virt/libvirt/host.py | 115 ++++++++ 5 files changed, 759 insertions(+), 100 deletions(-) diff --git a/nova/tests/unit/virt/libvirt/fakelibvirt.py b/nova/tests/unit/virt/libvirt/fakelibvirt.py index 1bf071b4c4d9..20b94a422989 100644 --- a/nova/tests/unit/virt/libvirt/fakelibvirt.py +++ b/nova/tests/unit/virt/libvirt/fakelibvirt.py @@ -173,6 +173,14 @@ VIR_SECRET_USAGE_TYPE_CEPH = 2 VIR_SECRET_USAGE_TYPE_ISCSI = 3 +VIR_DOMAIN_JOB_NONE = 0 +VIR_DOMAIN_JOB_BOUNDED = 1 +VIR_DOMAIN_JOB_UNBOUNDED = 2 +VIR_DOMAIN_JOB_COMPLETED = 3 +VIR_DOMAIN_JOB_FAILED = 4 +VIR_DOMAIN_JOB_CANCELLED = 5 + + def _parse_disk_info(element): disk_info = {} disk_info['type'] = element.get('type', 'file') @@ -663,6 +671,12 @@ class Domain(object): def blockJobInfo(self, disk, flags): return {} + def jobInfo(self): + return [] + + def jobStats(self, flags=0): + return {} + class DomainSnapshot(object): def __init__(self, name, domain): @@ -1151,7 +1165,7 @@ class Connection(object): pass -def openAuth(uri, auth, flags): +def openAuth(uri, auth, flags=0): if type(auth) != list: raise Exception("Expected a list for 'auth' parameter") diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index db81450df8a8..8b9c6a37a53c 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -6563,15 +6563,6 @@ class LibvirtConnTestCase(test.NoDBTestCase): None, _bandwidth).AndRaise(libvirt.libvirtError("ERR")) - def fake_lookup(instance_name): - if instance_name == instance_ref['name']: - return vdmock - - self.create_fake_libvirt_mock(lookupByName=fake_lookup) - self.mox.StubOutWithMock(self.compute, "_rollback_live_migration") - self.compute._rollback_live_migration(self.context, instance_ref, - 
'dest', False) - # start test migrate_data = {'pre_live_migration_result': {'graphics_listen_addrs': @@ -6579,10 +6570,9 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.mox.ReplayAll() drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.assertRaises(libvirt.libvirtError, - drvr._live_migration, - self.context, instance_ref, 'dest', False, - self.compute._rollback_live_migration, - migrate_data=migrate_data) + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) @mock.patch.object(libvirt, 'VIR_DOMAIN_XML_MIGRATABLE', 8675, create=True) def test_live_migration_update_volume_xml(self): @@ -6626,10 +6616,9 @@ class LibvirtConnTestCase(test.NoDBTestCase): instance_id='foo') mget_domain.return_value = test_mock test_mock.XMLDesc.return_value = target_xml - self.assertFalse(drvr._live_migration( - self.context, instance_ref, 'dest', test_mock, - self.compute._rollback_live_migration, - migrate_data=migrate_data)) + self.assertFalse(drvr._live_migration_operation( + self.context, instance_ref, 'dest', False, + migrate_data, test_mock)) mupdate.assert_called_once_with(target_xml, volume, None) def test_update_volume_xml(self): @@ -6783,15 +6772,6 @@ class LibvirtConnTestCase(test.NoDBTestCase): None, _bandwidth).AndRaise(libvirt.libvirtError("ERR")) - def fake_lookup(instance_name): - if instance_name == instance_ref['name']: - return vdmock - - self.create_fake_libvirt_mock(lookupByName=fake_lookup) - self.mox.StubOutWithMock(self.compute, "_rollback_live_migration") - self.compute._rollback_live_migration(self.context, instance_ref, - 'dest', False) - # start test migrate_data = {'pre_live_migration_result': {'graphics_listen_addrs': @@ -6799,10 +6779,9 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.mox.ReplayAll() drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.assertRaises(libvirt.libvirtError, - drvr._live_migration, - self.context, instance_ref, 'dest', False, - self.compute._rollback_live_migration, - migrate_data=migrate_data) + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) def test_live_migration_uses_migrateToURI_without_dest_listen_addrs(self): self.compute = importutils.import_object(CONF.compute_manager) @@ -6821,24 +6800,14 @@ class LibvirtConnTestCase(test.NoDBTestCase): None, _bandwidth).AndRaise(libvirt.libvirtError("ERR")) - def fake_lookup(instance_name): - if instance_name == instance_ref['name']: - return vdmock - - self.create_fake_libvirt_mock(lookupByName=fake_lookup) - self.mox.StubOutWithMock(self.compute, "_rollback_live_migration") - self.compute._rollback_live_migration(self.context, instance_ref, - 'dest', False) - # start test migrate_data = {} self.mox.ReplayAll() drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.assertRaises(libvirt.libvirtError, - drvr._live_migration, - self.context, instance_ref, 'dest', False, - self.compute._rollback_live_migration, - migrate_data=migrate_data) + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) @mock.patch.object(libvirt, 'VIR_DOMAIN_XML_MIGRATABLE', None, create=True) def test_live_migration_fails_without_migratable_flag_or_0_addr(self): @@ -6854,15 +6823,6 @@ class LibvirtConnTestCase(test.NoDBTestCase): vdmock = self.mox.CreateMock(libvirt.virDomain) self.mox.StubOutWithMock(vdmock, "migrateToURI") - def fake_lookup(instance_name): - if instance_name == instance_ref['name']: - return vdmock - - 
self.create_fake_libvirt_mock(lookupByName=fake_lookup) - self.mox.StubOutWithMock(self.compute, "_rollback_live_migration") - self.compute._rollback_live_migration(self.context, instance_ref, - 'dest', False) - # start test migrate_data = {'pre_live_migration_result': {'graphics_listen_addrs': @@ -6870,10 +6830,9 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.mox.ReplayAll() drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.assertRaises(exception.MigrationError, - drvr._live_migration, - self.context, instance_ref, 'dest', False, - self.compute._rollback_live_migration, - migrate_data=migrate_data) + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) def test_live_migration_raises_exception(self): # Confirms recover method is called when exceptions are raised. @@ -6906,15 +6865,6 @@ class LibvirtConnTestCase(test.NoDBTestCase): _bandwidth).AndRaise( libvirt.libvirtError('ERR')) - def fake_lookup(instance_name): - if instance_name == instance_ref['name']: - return vdmock - - self.create_fake_libvirt_mock(lookupByName=fake_lookup) - self.mox.StubOutWithMock(self.compute, "_rollback_live_migration") - self.compute._rollback_live_migration(self.context, instance_ref, - 'dest', False) - # start test migrate_data = {'pre_live_migration_result': {'graphics_listen_addrs': @@ -6922,10 +6872,9 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.mox.ReplayAll() drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.assertRaises(libvirt.libvirtError, - drvr._live_migration, - self.context, instance_ref, 'dest', False, - self.compute._rollback_live_migration, - migrate_data=migrate_data) + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) self.assertEqual(vm_states.ACTIVE, instance_ref.vm_state) self.assertEqual(power_state.RUNNING, instance_ref.power_state) @@ -6959,15 +6908,6 @@ class LibvirtConnTestCase(test.NoDBTestCase): mox.IgnoreArg(), None, _bandwidth).AndRaise(test.TestingException('oops')) - def fake_lookup(instance_name): - if instance_name == instance_ref.name: - return vdmock - - self.create_fake_libvirt_mock(lookupByName=fake_lookup) - - def fake_recover_method(context, instance, dest, block_migration): - pass - graphics_listen_addrs = {'vnc': '0.0.0.0', 'spice': '127.0.0.1'} migrate_data = {'pre_live_migration_result': {'graphics_listen_addrs': graphics_listen_addrs}} @@ -6979,10 +6919,10 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.mox.ReplayAll() # start test - self.assertRaises(test.TestingException, drvr._live_migration, - self.context, instance_ref, 'dest', post_method=None, - recover_method=fake_recover_method, - migrate_data=migrate_data) + self.assertRaises(test.TestingException, + drvr._live_migration_operation, + self.context, instance_ref, 'dest', + False, migrate_data, vdmock) @mock.patch('shutil.rmtree') @mock.patch('os.path.exists', return_value=True) @@ -7028,6 +6968,190 @@ class LibvirtConnTestCase(test.NoDBTestCase): self.assertFalse(mock_exist.called) self.assertFalse(mock_shutil.called) + @mock.patch.object(time, "sleep", + side_effect=lambda x: eventlet.sleep(0)) + @mock.patch.object(host.DomainJobInfo, "for_domain") + @mock.patch.object(objects.Instance, "save") + @mock.patch.object(fakelibvirt.Connection, "_mark_running") + def _test_live_migration_monitoring(self, + job_info_records, + expect_success, + mock_running, + mock_save, + mock_job_info, + mock_sleep): + drvr = 
libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + instance = objects.Instance(**self.test_instance) + dom = fakelibvirt.Domain(drvr._get_connection(), "", True) + finish_event = eventlet.event.Event() + + def fake_job_info(hostself): + while True: + self.assertTrue(len(job_info_records) > 0) + rec = job_info_records.pop() + if type(rec) == str: + if rec == "thread-finish": + finish_event.send() + elif rec == "domain-stop": + dom.destroy() + else: + return rec + return rec + + mock_job_info.side_effect = fake_job_info + + def fake_post_method(self, *args, **kwargs): + fake_post_method.called = True + + def fake_recover_method(self, *args, **kwargs): + fake_recover_method.called = True + + fake_post_method.called = False + fake_recover_method.called = False + + drvr._live_migration_monitor(self.context, instance, + "somehostname", + fake_post_method, + fake_recover_method, + False, + {}, + dom, + finish_event) + + self.assertEqual(fake_post_method.called, expect_success) + self.assertEqual(fake_recover_method.called, not expect_success) + + def test_live_migration_monitor_success(self): + # A normal sequence where see all the normal job states + domain_info_records = [ + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + "thread-finish", + "domain-stop", + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_COMPLETED), + ] + + self._test_live_migration_monitoring(domain_info_records, True) + + def test_live_migration_monitor_success_race(self): + # A normalish sequence but we're too slow to see the + # completed job state + domain_info_records = [ + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + "thread-finish", + "domain-stop", + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + ] + + self._test_live_migration_monitoring(domain_info_records, True) + + def test_live_migration_monitor_failed(self): + # A failed sequence where we see all the expected events + domain_info_records = [ + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + "thread-finish", + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_FAILED), + ] + + self._test_live_migration_monitoring(domain_info_records, False) + + def test_live_migration_monitor_failed_race(self): + # A failed sequence where we are too slow to see the + # failed event + domain_info_records = [ + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + "thread-finish", + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + ] + + self._test_live_migration_monitoring(domain_info_records, False) + + def test_live_migration_monitor_cancelled(self): + # A cancelled sequence where we see all the events + domain_info_records = [ + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_NONE), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + 
host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED), + "thread-finish", + "domain-stop", + host.DomainJobInfo( + type=libvirt.VIR_DOMAIN_JOB_CANCELLED), + ] + + self._test_live_migration_monitoring(domain_info_records, False) + + @mock.patch.object(greenthread, "spawn") + @mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration_monitor") + @mock.patch.object(host.Host, "get_domain") + @mock.patch.object(fakelibvirt.Connection, "_mark_running") + def test_live_migration_main(self, mock_running, mock_dom, + mock_monitor, mock_thread): + drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + instance = objects.Instance(**self.test_instance) + dom = fakelibvirt.Domain(drvr._get_connection(), "", True) + migrate_data = {} + + mock_dom.return_value = dom + + def fake_post(): + pass + + def fake_recover(): + pass + + drvr._live_migration(self.context, instance, "fakehost", + fake_post, fake_recover, False, + migrate_data) + + class AnyEventletEvent(object): + def __eq__(self, other): + return type(other) == eventlet.event.Event + + mock_thread.assert_called_once_with( + drvr._live_migration_operation, + self.context, instance, "fakehost", False, + migrate_data, dom) + mock_monitor.assert_called_once_with( + self.context, instance, "fakehost", + fake_post, fake_recover, False, + migrate_data, dom, AnyEventletEvent()) + def _do_test_create_images_and_backing(self, disk_type): drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) self.mox.StubOutWithMock(drvr, '_fetch_instance_kernel_ramdisk') diff --git a/nova/tests/unit/virt/libvirt/test_host.py b/nova/tests/unit/virt/libvirt/test_host.py index 8354d2d5b7ca..0af95ae7dd31 100644 --- a/nova/tests/unit/virt/libvirt/test_host.py +++ b/nova/tests/unit/virt/libvirt/test_host.py @@ -710,3 +710,197 @@ class HostTestCase(test.NoDBTestCase): mock_find_secret.return_value = None self.host.delete_secret("rbd", "rbdvol") + + +class DomainJobInfoTestCase(test.NoDBTestCase): + + def setUp(self): + super(DomainJobInfoTestCase, self).setUp() + + self.useFixture(fakelibvirt.FakeLibvirtFixture()) + + self.conn = fakelibvirt.openAuth("qemu:///system", + [[], lambda: True]) + xml = ("" + " instance-0000000a" + "") + self.dom = self.conn.createXML(xml, 0) + host.DomainJobInfo._have_job_stats = True + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_stats(self, mock_stats, mock_info): + mock_stats.return_value = { + "type": libvirt.VIR_DOMAIN_JOB_UNBOUNDED, + "memory_total": 75, + "memory_processed": 50, + "memory_remaining": 33, + "some_new_libvirt_stat_we_dont_know_about": 83 + } + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type) + self.assertEqual(75, info.memory_total) + self.assertEqual(50, info.memory_processed) + self.assertEqual(33, info.memory_remaining) + self.assertEqual(0, info.disk_total) + self.assertEqual(0, info.disk_processed) + self.assertEqual(0, info.disk_remaining) + + mock_stats.assert_called_once_with() + self.assertFalse(mock_info.called) + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_info_no_support(self, mock_stats, mock_info): + mock_stats.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "virDomainGetJobStats not implemented", + 
libvirt.VIR_ERR_NO_SUPPORT) + + mock_info.return_value = [ + libvirt.VIR_DOMAIN_JOB_UNBOUNDED, + 100, 99, 10, 11, 12, 75, 50, 33, 1, 2, 3] + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type) + self.assertEqual(100, info.time_elapsed) + self.assertEqual(99, info.time_remaining) + self.assertEqual(10, info.data_total) + self.assertEqual(11, info.data_processed) + self.assertEqual(12, info.data_remaining) + self.assertEqual(75, info.memory_total) + self.assertEqual(50, info.memory_processed) + self.assertEqual(33, info.memory_remaining) + self.assertEqual(1, info.disk_total) + self.assertEqual(2, info.disk_processed) + self.assertEqual(3, info.disk_remaining) + + mock_stats.assert_called_once_with() + mock_info.assert_called_once_with() + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_info_attr_error(self, mock_stats, mock_info): + mock_stats.side_effect = AttributeError("No such API") + + mock_info.return_value = [ + libvirt.VIR_DOMAIN_JOB_UNBOUNDED, + 100, 99, 10, 11, 12, 75, 50, 33, 1, 2, 3] + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type) + self.assertEqual(100, info.time_elapsed) + self.assertEqual(99, info.time_remaining) + self.assertEqual(10, info.data_total) + self.assertEqual(11, info.data_processed) + self.assertEqual(12, info.data_remaining) + self.assertEqual(75, info.memory_total) + self.assertEqual(50, info.memory_processed) + self.assertEqual(33, info.memory_remaining) + self.assertEqual(1, info.disk_total) + self.assertEqual(2, info.disk_processed) + self.assertEqual(3, info.disk_remaining) + + mock_stats.assert_called_once_with() + mock_info.assert_called_once_with() + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_stats_no_domain(self, mock_stats, mock_info): + mock_stats.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "No such domain with UUID blah", + libvirt.VIR_ERR_NO_DOMAIN) + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type) + self.assertEqual(0, info.time_elapsed) + self.assertEqual(0, info.time_remaining) + self.assertEqual(0, info.memory_total) + self.assertEqual(0, info.memory_processed) + self.assertEqual(0, info.memory_remaining) + + mock_stats.assert_called_once_with() + self.assertFalse(mock_info.called) + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_info_no_domain(self, mock_stats, mock_info): + mock_stats.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "virDomainGetJobStats not implemented", + libvirt.VIR_ERR_NO_SUPPORT) + + mock_info.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "No such domain with UUID blah", + libvirt.VIR_ERR_NO_DOMAIN) + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type) + self.assertEqual(0, info.time_elapsed) + self.assertEqual(0, info.time_remaining) + self.assertEqual(0, info.memory_total) + self.assertEqual(0, info.memory_processed) + self.assertEqual(0, 
info.memory_remaining) + + mock_stats.assert_called_once_with() + mock_info.assert_called_once_with() + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_stats_operation_invalid(self, mock_stats, mock_info): + mock_stats.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "Domain is not running", + libvirt.VIR_ERR_OPERATION_INVALID) + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type) + self.assertEqual(0, info.time_elapsed) + self.assertEqual(0, info.time_remaining) + self.assertEqual(0, info.memory_total) + self.assertEqual(0, info.memory_processed) + self.assertEqual(0, info.memory_remaining) + + mock_stats.assert_called_once_with() + self.assertFalse(mock_info.called) + + @mock.patch.object(fakelibvirt.virDomain, "jobInfo") + @mock.patch.object(fakelibvirt.virDomain, "jobStats") + def test_job_info_operation_invalid(self, mock_stats, mock_info): + mock_stats.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "virDomainGetJobStats not implemented", + libvirt.VIR_ERR_NO_SUPPORT) + + mock_info.side_effect = fakelibvirt.make_libvirtError( + libvirt.libvirtError, + "Domain is not running", + libvirt.VIR_ERR_OPERATION_INVALID) + + info = host.DomainJobInfo.for_domain(self.dom) + + self.assertIsInstance(info, host.DomainJobInfo) + self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type) + self.assertEqual(0, info.time_elapsed) + self.assertEqual(0, info.time_remaining) + self.assertEqual(0, info.memory_total) + self.assertEqual(0, info.memory_processed) + self.assertEqual(0, info.memory_remaining) + + mock_stats.assert_called_once_with() + mock_info.assert_called_once_with() diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 19c8a1e06e64..958c9c92faaa 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -5364,27 +5364,23 @@ class LibvirtDriver(driver.ComputeDriver): ' continue to listen on the current' ' addresses.')) - def _live_migration(self, context, instance, dest, post_method, - recover_method, block_migration=False, - migrate_data=None): - """Do live migration. + def _live_migration_operation(self, context, instance, dest, + block_migration, migrate_data, dom): + """Invoke the live migration operation :param context: security context :param instance: nova.db.sqlalchemy.models.Instance object instance object that is migrated. :param dest: destination host - :param post_method: - post operation method. - expected nova.compute.manager._post_live_migration. - :param recover_method: - recovery method when any exception occurs. - expected nova.compute.manager._rollback_live_migration. :param block_migration: if true, do block migration. :param migrate_data: implementation specific params + :param dom: the libvirt domain object + + This method is intended to be run in a background thread and will + block that thread until the migration is finished or failed. """ - # Do live migration. 
try: if block_migration: flaglist = CONF.libvirt.block_migration_flag.split(',') @@ -5393,8 +5389,6 @@ class LibvirtDriver(driver.ComputeDriver): flagvals = [getattr(libvirt, x.strip()) for x in flaglist] logical_sum = reduce(lambda x, y: x | y, flagvals) - dom = self._host.get_domain(instance) - pre_live_migrate_data = (migrate_data or {}).get( 'pre_live_migration_result', {}) listen_addrs = pre_live_migrate_data.get('graphics_listen_addrs') @@ -5449,15 +5443,233 @@ class LibvirtDriver(driver.ComputeDriver): CONF.libvirt.live_migration_bandwidth) else: raise - except Exception as e: with excutils.save_and_reraise_exception(): LOG.error(_LE("Live Migration failure: %s"), e, instance=instance) - recover_method(context, instance, dest, block_migration) - post_method(context, instance, dest, block_migration, - migrate_data) + # If 'migrateToURI' fails we don't know what state the + # VM instances on each host are in. Possibilities include + # + # 1. src==running, dst==none + # + # Migration failed & rolled back, or never started + # + # 2. src==running, dst==paused + # + # Migration started but is still ongoing + # + # 3. src==paused, dst==paused + # + # Migration data transfer completed, but switchover + # is still ongoing, or failed + # + # 4. src==paused, dst==running + # + # Migration data transfer completed, switchover + # happened but cleanup on source failed + # + # 5. src==none, dst==running + # + # Migration fully succeeded. + # + # Libvirt will aim to complete any migration operation + # or roll it back. So even if the migrateToURI call has + # returned an error, if the migration was not finished + # libvirt should clean up. + # + # So we take the error raise here with a pinch of salt + # and rely on the domain job info status to figure out + # what really happened to the VM, which is a much more + # reliable indicator. + # + # In particular we need to try very hard to ensure that + # Nova does not "forget" about the guest. ie leaving it + # running on a different host to the one recorded in + # the database, as that would be a serious resource leak + + LOG.debug("Migration operation thread has finished", + instance=instance) + + def _live_migration_monitor(self, context, instance, dest, post_method, + recover_method, block_migration, + migrate_data, dom, finish_event): + n = 0 + while True: + info = host.DomainJobInfo.for_domain(dom) + + if info.type == libvirt.VIR_DOMAIN_JOB_NONE: + # Annoyingly this could indicate many possible + # states, so we must fix the mess: + # + # 1. Migration has not yet begun + # 2. Migration has stopped due to failure + # 3. Migration has stopped due to completion + # + # We can detect option 1 by seeing if thread is still + # running. 
We can distinguish 2 vs 3 by seeing if the + # VM still exists & running on the current host + # + if not finish_event.ready(): + LOG.debug("Operation thread is still running", + instance=instance) + # Leave type untouched + else: + try: + if dom.isActive(): + LOG.debug("VM running on src, migration failed", + instance=instance) + info.type = libvirt.VIR_DOMAIN_JOB_FAILED + else: + LOG.debug("VM is shutoff, migration finished", + instance=instance) + info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED + except libvirt.libvirtError as ex: + LOG.debug("Error checking domain status %(ex)s", + ex, instance=instance) + if ex.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN: + LOG.debug("VM is missing, migration finished", + instance=instance) + info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED + else: + LOG.info(_LI("Error %(ex)s, migration failed"), + instance=instance) + info.type = libvirt.VIR_DOMAIN_JOB_FAILED + + if info.type != libvirt.VIR_DOMAIN_JOB_NONE: + LOG.debug("Fixed incorrect job type to be %d", + info.type, instance=instance) + + if info.type == libvirt.VIR_DOMAIN_JOB_NONE: + # Migration is not yet started + LOG.debug("Migration not running yet", + instance=instance) + elif info.type == libvirt.VIR_DOMAIN_JOB_UNBOUNDED: + # We loop every 500ms, so don't log on every + # iteration to avoid spamming logs for long + # running migrations. Just once every 5 secs + # is sufficient for developers to debug problems. + # We log once every 30 seconds at info to help + # admins see slow running migration operations + # when debug logs are off. + if (n % 10) == 0: + # Ignoring memory_processed, as due to repeated + # dirtying of data, this can be way larger than + # memory_total. Best to just look at what's + # remaining to copy and ignore what's done already + # + # TODO(berrange) perhaps we could include disk + # transfer stats in the progress too, but it + # might make memory info more obscure as large + # disk sizes might dwarf memory size + progress = 0 + if info.memory_total != 0: + progress = round(info.memory_remaining * + 100 / info.memory_total) + instance.progress = 100 - progress + instance.save() + + lg = LOG.debug + if (n % 60) == 0: + lg = LOG.info + + lg(_LI("Migration running for %(secs)d secs, " + "memory %(progress)d%% remaining; " + "(bytes processed=%(processed)d, " + "remaining=%(remaining)d, " + "total=%(total)d)"), + {"secs": n / 2, "progress": progress, + "processed": info.memory_processed, + "remaining": info.memory_remaining, + "total": info.memory_total}, instance=instance) + + # Migration is still running + # + # This is where we'd wire up calls to change live + # migration status. 
eg change max downtime, cancel + # the operation, change max bandwidth + n = n + 1 + elif info.type == libvirt.VIR_DOMAIN_JOB_COMPLETED: + # Migration is all done + LOG.info(_LI("Migration operation has completed"), + instance=instance) + post_method(context, instance, dest, block_migration, + migrate_data) + break + elif info.type == libvirt.VIR_DOMAIN_JOB_FAILED: + # Migration did not succeed + LOG.error(_LE("Migration operation has aborted"), + instance=instance) + recover_method(context, instance, dest, block_migration) + break + elif info.type == libvirt.VIR_DOMAIN_JOB_CANCELLED: + # Migration was stopped by admin + LOG.warn(_LW("Migration operation was cancelled"), + instance=instance) + recover_method(context, instance, dest, block_migration) + break + else: + LOG.warn(_LW("Unexpected migration job type: %d"), + info.type, instance=instance) + + time.sleep(0.5) + + def _live_migration(self, context, instance, dest, post_method, + recover_method, block_migration, + migrate_data): + """Do live migration. + + :param context: security context + :param instance: + nova.db.sqlalchemy.models.Instance object + instance object that is migrated. + :param dest: destination host + :param post_method: + post operation method. + expected nova.compute.manager._post_live_migration. + :param recover_method: + recovery method when any exception occurs. + expected nova.compute.manager._rollback_live_migration. + :param block_migration: if true, do block migration. + :param migrate_data: implementation specific params + + This fires off a new thread to run the blocking migration + operation, and then this thread monitors the progress of + migration and controls its operation + """ + + dom = self._host.get_domain(instance) + + opthread = greenthread.spawn(self._live_migration_operation, + context, instance, dest, + block_migration, + migrate_data, dom) + + finish_event = eventlet.event.Event() + + def thread_finished(thread, event): + LOG.debug("Migration operation thread notification", + instance=instance) + event.send() + opthread.link(thread_finished, finish_event) + + # Let eventlet schedule the new thread right away + time.sleep(0) + + try: + LOG.debug("Starting monitoring of live migration", + instance=instance) + self._live_migration_monitor(context, instance, dest, + post_method, recover_method, + block_migration, migrate_data, + dom, finish_event) + except Exception as ex: + LOG.warn(_LW("Error monitoring migration: %(ex)s"), + {"ex": ex}, instance=instance) + raise + finally: + LOG.debug("Live migration monitoring is all done", + instance=instance) def _fetch_instance_kernel_ramdisk(self, context, instance): """Download kernel and ramdisk for instance in instance directory.""" diff --git a/nova/virt/libvirt/host.py b/nova/virt/libvirt/host.py index 95e266afe993..fcf200c74ddf 100644 --- a/nova/virt/libvirt/host.py +++ b/nova/virt/libvirt/host.py @@ -64,6 +64,121 @@ CONF.import_opt('host', 'nova.netconf') CONF.import_opt('my_ip', 'nova.netconf') +class DomainJobInfo(object): + """Information about libvirt background jobs + + This class encapsulates information about libvirt + background jobs. It provides a mapping from either + the old virDomainGetJobInfo API which returned a + fixed list of fields, or the modern virDomainGetJobStats + which returns an extendable dict of fields. 
+ """ + + _have_job_stats = True + + def __init__(self, **kwargs): + + self.type = kwargs.get("type", libvirt.VIR_DOMAIN_JOB_NONE) + self.time_elapsed = kwargs.get("time_elapsed", 0) + self.time_remaining = kwargs.get("time_remaining", 0) + self.downtime = kwargs.get("downtime", 0) + self.setup_time = kwargs.get("setup_time", 0) + self.data_total = kwargs.get("data_total", 0) + self.data_processed = kwargs.get("data_processed", 0) + self.data_remaining = kwargs.get("data_remaining", 0) + self.memory_total = kwargs.get("memory_total", 0) + self.memory_processed = kwargs.get("memory_processed", 0) + self.memory_remaining = kwargs.get("memory_remaining", 0) + self.memory_constant = kwargs.get("memory_constant", 0) + self.memory_normal = kwargs.get("memory_normal", 0) + self.memory_normal_bytes = kwargs.get("memory_normal_bytes", 0) + self.memory_bps = kwargs.get("memory_bps", 0) + self.disk_total = kwargs.get("disk_total", 0) + self.disk_processed = kwargs.get("disk_processed", 0) + self.disk_remaining = kwargs.get("disk_remaining", 0) + self.disk_bps = kwargs.get("disk_bps", 0) + self.comp_cache = kwargs.get("compression_cache", 0) + self.comp_bytes = kwargs.get("compression_bytes", 0) + self.comp_pages = kwargs.get("compression_pages", 0) + self.comp_cache_misses = kwargs.get("compression_cache_misses", 0) + self.comp_overflow = kwargs.get("compression_overflow", 0) + + @classmethod + def _get_job_stats_compat(cls, dom): + # Make the old virDomainGetJobInfo method look similar to the + # modern virDomainGetJobStats method + try: + info = dom.jobInfo() + except libvirt.libvirtError as ex: + # When migration of a transient guest completes, the guest + # goes away so we'll see NO_DOMAIN error code + # + # When migration of a persistent guest completes, the guest + # merely shuts off, but libvirt unhelpfully raises an + # OPERATION_INVALID error code + # + # Lets pretend both of these mean success + if ex.get_error_code() in (libvirt.VIR_ERR_NO_DOMAIN, + libvirt.VIR_ERR_OPERATION_INVALID): + LOG.debug("Domain has shutdown/gone away: %s", ex) + return cls(type=libvirt.VIR_DOMAIN_JOB_COMPLETED) + else: + LOG.debug("Failed to get job info: %s", ex) + raise + + return cls( + type=info[0], + time_elapsed=info[1], + time_remaining=info[2], + data_total=info[3], + data_processed=info[4], + data_remaining=info[5], + memory_total=info[6], + memory_processed=info[7], + memory_remaining=info[8], + disk_total=info[9], + disk_processed=info[10], + disk_remaining=info[11]) + + @classmethod + def for_domain(cls, dom): + '''Get job info for the domain + + Query the libvirt job info for the domain (ie progress + of migration, or snapshot operation) + + Returns: a DomainJobInfo instance + ''' + + if cls._have_job_stats: + try: + stats = dom.jobStats() + return cls(**stats) + except libvirt.libvirtError as ex: + if ex.get_error_code() == libvirt.VIR_ERR_NO_SUPPORT: + # Remote libvirt doesn't support new API + LOG.debug("Missing remote virDomainGetJobStats: %s", ex) + cls._have_job_stats = False + return cls._get_job_stats_compat(dom) + elif ex.get_error_code() in ( + libvirt.VIR_ERR_NO_DOMAIN, + libvirt.VIR_ERR_OPERATION_INVALID): + # Transient guest finished migration, so it has gone + # away completely + LOG.debug("Domain has shutdown/gone away: %s", ex) + return cls(type=libvirt.VIR_DOMAIN_JOB_COMPLETED) + else: + LOG.debug("Failed to get job stats: %s", ex) + raise + except AttributeError as ex: + # Local python binding doesn't support new API + LOG.debug("Missing local virDomainGetJobStats: %s", ex) + 
cls._have_job_stats = False + return cls._get_job_stats_compat(dom) + else: + return cls._get_job_stats_compat(dom) + + class Host(object): def __init__(self, uri, read_only=False,
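
The commit message notes that active monitoring "opens the door" to tuning max downtime or bandwidth on the fly, or aborting an over-long migration, and the VIR_DOMAIN_JOB_UNBOUNDED branch of _live_migration_monitor marks the spot where such calls would be wired up. As an illustrative sketch only (not part of this patch), the following shows what a per-iteration tuning helper could look like. The helper name _maybe_tune_migration and all thresholds/values are hypothetical; migrateSetMaxDowntime, migrateSetMaxSpeed and abortJob are existing virDomain methods in the libvirt Python bindings.

    # Sketch only: hypothetical per-iteration tuning hook for
    # _live_migration_monitor. Helper name and thresholds are made up;
    # the virDomain calls are real libvirt APIs.
    import libvirt


    def _maybe_tune_migration(dom, info, elapsed_secs):
        """Adjust or abort a running migration based on its job info.

        :param dom: libvirt.virDomain currently being migrated
        :param info: DomainJobInfo snapshot for this iteration
        :param elapsed_secs: seconds since the migration operation started
        """
        if info.type != libvirt.VIR_DOMAIN_JOB_UNBOUNDED:
            # Only a still-running (unbounded) job can be tuned
            return

        # Example: allow a longer guest pause as the migration drags on,
        # improving the chance of convergence for busy guests.
        if elapsed_secs > 300:
            dom.migrateSetMaxDowntime(500, 0)   # milliseconds

        # Example: raise the bandwidth cap after the first minute.
        if elapsed_secs > 60:
            dom.migrateSetMaxSpeed(1000, 0)     # MiB/s, example value

        # Example: give up after an hour rather than letting the migration
        # "carry on until the end of the universe"; abortJob() makes the
        # job report VIR_DOMAIN_JOB_CANCELLED, which the monitor already
        # handles by calling recover_method.
        if elapsed_secs > 3600:
            dom.abortJob()

If something like this were adopted later, the natural call site would be the VIR_DOMAIN_JOB_UNBOUNDED branch of _live_migration_monitor, next to the existing progress logging, since that branch already runs once per 500ms polling iteration while the job is in flight.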