libvirt: proper monitoring of live migration progress

The current live migration code simply invokes migrateToURI
and waits for it to finish or raise an exception. It considers
all exceptions to mean the live migration aborted and the VM is
still running on the source host. This is totally bogus, as there
are a number of reasons why an error could be raised from the
migrateToURI call. There are at least 5 different scenarios for
what the VM might be doing on source + dest host upon error.
The migration might even still be going on after the error
has occurred.

A more reliable way to deal with this is to actively query
libvirt for the domain job status. This gives an indication
of whether the job is completed, failed or cancelled. Even
with that though, there is a need for a few heuristics to
distinguish some of the possible error scenarios.
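
As a rough sketch of that approach (illustrative only, not the code
in this patch; the poll_migration helper name and the 0.5 second
interval are assumptions), the monitoring boils down to polling the
domain job stats and switching on the job type:

import time

import libvirt


def poll_migration(dom):
    # Watch a domain's migration job: True on completion,
    # False on failure or cancellation by an admin.
    while True:
        stats = dom.jobStats()
        job_type = stats.get("type", libvirt.VIR_DOMAIN_JOB_NONE)

        if job_type == libvirt.VIR_DOMAIN_JOB_COMPLETED:
            return True
        elif job_type in (libvirt.VIR_DOMAIN_JOB_FAILED,
                          libvirt.VIR_DOMAIN_JOB_CANCELLED):
            return False
        # VIR_DOMAIN_JOB_UNBOUNDED means the migration is still
        # copying memory; keep polling. VIR_DOMAIN_JOB_NONE is
        # ambiguous (not started yet, or already finished) -- this
        # is where the heuristics mentioned above are needed.
        time.sleep(0.5)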

This change to do active monitoring of the live migration process
also opens the door for being able to tune live migration on the
fly to adjust max downtime or bandwidth to improve chances of
getting convergence, or to automatically abort it after too much
time has elapsed instead of letting it carry on until the end of
the universe. This change merely records memory transfer progress
and leaves tuning improvements to a later date.
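
The recorded progress figure is derived from the memory counters in
the job stats; a minimal standalone sketch of the same arithmetic
used by the monitor below (the function name is illustrative):

def migration_progress(memory_remaining, memory_total):
    # memory_processed is deliberately not used: repeated dirtying
    # of guest pages can push it above memory_total, so the bytes
    # still remaining are the more meaningful figure.
    if memory_total == 0:
        return 0
    remaining_pct = round(memory_remaining * 100 / memory_total)
    return 100 - remaining_pct


# e.g. 33 of 75 units left to copy => 56% complete
assert migration_progress(33, 75) == 56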

Closes-bug: #1414065
Change-Id: I6fcbfa31a79c7808c861bb3a84b56bd096882004
Daniel P. Berrange 2015-01-29 14:33:32 +00:00
parent 75d0aabc4e
commit 7dd6a4a193
5 changed files with 759 additions and 100 deletions


@@ -173,6 +173,14 @@ VIR_SECRET_USAGE_TYPE_CEPH = 2
VIR_SECRET_USAGE_TYPE_ISCSI = 3
VIR_DOMAIN_JOB_NONE = 0
VIR_DOMAIN_JOB_BOUNDED = 1
VIR_DOMAIN_JOB_UNBOUNDED = 2
VIR_DOMAIN_JOB_COMPLETED = 3
VIR_DOMAIN_JOB_FAILED = 4
VIR_DOMAIN_JOB_CANCELLED = 5
def _parse_disk_info(element):
disk_info = {}
disk_info['type'] = element.get('type', 'file')
@@ -663,6 +671,12 @@ class Domain(object):
def blockJobInfo(self, disk, flags):
return {}
def jobInfo(self):
return []
def jobStats(self, flags=0):
return {}
class DomainSnapshot(object):
def __init__(self, name, domain):
@@ -1151,7 +1165,7 @@ class Connection(object):
pass
def openAuth(uri, auth, flags):
def openAuth(uri, auth, flags=0):
if type(auth) != list:
raise Exception("Expected a list for 'auth' parameter")


@@ -6563,15 +6563,6 @@ class LibvirtConnTestCase(test.NoDBTestCase):
None,
_bandwidth).AndRaise(libvirt.libvirtError("ERR"))
def fake_lookup(instance_name):
if instance_name == instance_ref['name']:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(self.compute, "_rollback_live_migration")
self.compute._rollback_live_migration(self.context, instance_ref,
'dest', False)
# start test
migrate_data = {'pre_live_migration_result':
{'graphics_listen_addrs':
@@ -6579,10 +6570,9 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.mox.ReplayAll()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertRaises(libvirt.libvirtError,
drvr._live_migration,
self.context, instance_ref, 'dest', False,
self.compute._rollback_live_migration,
migrate_data=migrate_data)
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
@mock.patch.object(libvirt, 'VIR_DOMAIN_XML_MIGRATABLE', 8675, create=True)
def test_live_migration_update_volume_xml(self):
@@ -6626,10 +6616,9 @@ class LibvirtConnTestCase(test.NoDBTestCase):
instance_id='foo')
mget_domain.return_value = test_mock
test_mock.XMLDesc.return_value = target_xml
self.assertFalse(drvr._live_migration(
self.context, instance_ref, 'dest', test_mock,
self.compute._rollback_live_migration,
migrate_data=migrate_data))
self.assertFalse(drvr._live_migration_operation(
self.context, instance_ref, 'dest', False,
migrate_data, test_mock))
mupdate.assert_called_once_with(target_xml, volume, None)
def test_update_volume_xml(self):
@@ -6783,15 +6772,6 @@ class LibvirtConnTestCase(test.NoDBTestCase):
None,
_bandwidth).AndRaise(libvirt.libvirtError("ERR"))
def fake_lookup(instance_name):
if instance_name == instance_ref['name']:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(self.compute, "_rollback_live_migration")
self.compute._rollback_live_migration(self.context, instance_ref,
'dest', False)
# start test
migrate_data = {'pre_live_migration_result':
{'graphics_listen_addrs':
@@ -6799,10 +6779,9 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.mox.ReplayAll()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertRaises(libvirt.libvirtError,
drvr._live_migration,
self.context, instance_ref, 'dest', False,
self.compute._rollback_live_migration,
migrate_data=migrate_data)
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
def test_live_migration_uses_migrateToURI_without_dest_listen_addrs(self):
self.compute = importutils.import_object(CONF.compute_manager)
@@ -6821,24 +6800,14 @@ class LibvirtConnTestCase(test.NoDBTestCase):
None,
_bandwidth).AndRaise(libvirt.libvirtError("ERR"))
def fake_lookup(instance_name):
if instance_name == instance_ref['name']:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(self.compute, "_rollback_live_migration")
self.compute._rollback_live_migration(self.context, instance_ref,
'dest', False)
# start test
migrate_data = {}
self.mox.ReplayAll()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertRaises(libvirt.libvirtError,
drvr._live_migration,
self.context, instance_ref, 'dest', False,
self.compute._rollback_live_migration,
migrate_data=migrate_data)
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
@mock.patch.object(libvirt, 'VIR_DOMAIN_XML_MIGRATABLE', None, create=True)
def test_live_migration_fails_without_migratable_flag_or_0_addr(self):
@@ -6854,15 +6823,6 @@ class LibvirtConnTestCase(test.NoDBTestCase):
vdmock = self.mox.CreateMock(libvirt.virDomain)
self.mox.StubOutWithMock(vdmock, "migrateToURI")
def fake_lookup(instance_name):
if instance_name == instance_ref['name']:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(self.compute, "_rollback_live_migration")
self.compute._rollback_live_migration(self.context, instance_ref,
'dest', False)
# start test
migrate_data = {'pre_live_migration_result':
{'graphics_listen_addrs':
@@ -6870,10 +6830,9 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.mox.ReplayAll()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertRaises(exception.MigrationError,
drvr._live_migration,
self.context, instance_ref, 'dest', False,
self.compute._rollback_live_migration,
migrate_data=migrate_data)
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
def test_live_migration_raises_exception(self):
# Confirms recover method is called when exceptions are raised.
@@ -6906,15 +6865,6 @@ class LibvirtConnTestCase(test.NoDBTestCase):
_bandwidth).AndRaise(
libvirt.libvirtError('ERR'))
def fake_lookup(instance_name):
if instance_name == instance_ref['name']:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(self.compute, "_rollback_live_migration")
self.compute._rollback_live_migration(self.context, instance_ref,
'dest', False)
# start test
migrate_data = {'pre_live_migration_result':
{'graphics_listen_addrs':
@@ -6922,10 +6872,9 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.mox.ReplayAll()
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.assertRaises(libvirt.libvirtError,
drvr._live_migration,
self.context, instance_ref, 'dest', False,
self.compute._rollback_live_migration,
migrate_data=migrate_data)
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
self.assertEqual(vm_states.ACTIVE, instance_ref.vm_state)
self.assertEqual(power_state.RUNNING, instance_ref.power_state)
@@ -6959,15 +6908,6 @@ class LibvirtConnTestCase(test.NoDBTestCase):
mox.IgnoreArg(), None,
_bandwidth).AndRaise(test.TestingException('oops'))
def fake_lookup(instance_name):
if instance_name == instance_ref.name:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
def fake_recover_method(context, instance, dest, block_migration):
pass
graphics_listen_addrs = {'vnc': '0.0.0.0', 'spice': '127.0.0.1'}
migrate_data = {'pre_live_migration_result':
{'graphics_listen_addrs': graphics_listen_addrs}}
@@ -6979,10 +6919,10 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.mox.ReplayAll()
# start test
self.assertRaises(test.TestingException, drvr._live_migration,
self.context, instance_ref, 'dest', post_method=None,
recover_method=fake_recover_method,
migrate_data=migrate_data)
self.assertRaises(test.TestingException,
drvr._live_migration_operation,
self.context, instance_ref, 'dest',
False, migrate_data, vdmock)
@mock.patch('shutil.rmtree')
@mock.patch('os.path.exists', return_value=True)
@@ -7028,6 +6968,190 @@ class LibvirtConnTestCase(test.NoDBTestCase):
self.assertFalse(mock_exist.called)
self.assertFalse(mock_shutil.called)
@mock.patch.object(time, "sleep",
side_effect=lambda x: eventlet.sleep(0))
@mock.patch.object(host.DomainJobInfo, "for_domain")
@mock.patch.object(objects.Instance, "save")
@mock.patch.object(fakelibvirt.Connection, "_mark_running")
def _test_live_migration_monitoring(self,
job_info_records,
expect_success,
mock_running,
mock_save,
mock_job_info,
mock_sleep):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
instance = objects.Instance(**self.test_instance)
dom = fakelibvirt.Domain(drvr._get_connection(), "<domain/>", True)
finish_event = eventlet.event.Event()
def fake_job_info(hostself):
while True:
self.assertTrue(len(job_info_records) > 0)
rec = job_info_records.pop()
if type(rec) == str:
if rec == "thread-finish":
finish_event.send()
elif rec == "domain-stop":
dom.destroy()
else:
return rec
mock_job_info.side_effect = fake_job_info
def fake_post_method(self, *args, **kwargs):
fake_post_method.called = True
def fake_recover_method(self, *args, **kwargs):
fake_recover_method.called = True
fake_post_method.called = False
fake_recover_method.called = False
drvr._live_migration_monitor(self.context, instance,
"somehostname",
fake_post_method,
fake_recover_method,
False,
{},
dom,
finish_event)
self.assertEqual(fake_post_method.called, expect_success)
self.assertEqual(fake_recover_method.called, not expect_success)
def test_live_migration_monitor_success(self):
# A normal sequence where we see all the normal job states
domain_info_records = [
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
"thread-finish",
"domain-stop",
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_COMPLETED),
]
self._test_live_migration_monitoring(domain_info_records, True)
def test_live_migration_monitor_success_race(self):
# A normalish sequence but we're too slow to see the
# completed job state
domain_info_records = [
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
"thread-finish",
"domain-stop",
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
]
self._test_live_migration_monitoring(domain_info_records, True)
def test_live_migration_monitor_failed(self):
# A failed sequence where we see all the expected events
domain_info_records = [
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
"thread-finish",
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_FAILED),
]
self._test_live_migration_monitoring(domain_info_records, False)
def test_live_migration_monitor_failed_race(self):
# A failed sequence where we are too slow to see the
# failed event
domain_info_records = [
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
"thread-finish",
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
]
self._test_live_migration_monitoring(domain_info_records, False)
def test_live_migration_monitor_cancelled(self):
# A cancelled sequence where we see all the events
domain_info_records = [
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_NONE),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_UNBOUNDED),
"thread-finish",
"domain-stop",
host.DomainJobInfo(
type=libvirt.VIR_DOMAIN_JOB_CANCELLED),
]
self._test_live_migration_monitoring(domain_info_records, False)
@mock.patch.object(greenthread, "spawn")
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration_monitor")
@mock.patch.object(host.Host, "get_domain")
@mock.patch.object(fakelibvirt.Connection, "_mark_running")
def test_live_migration_main(self, mock_running, mock_dom,
mock_monitor, mock_thread):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
instance = objects.Instance(**self.test_instance)
dom = fakelibvirt.Domain(drvr._get_connection(), "<domain/>", True)
migrate_data = {}
mock_dom.return_value = dom
def fake_post():
pass
def fake_recover():
pass
drvr._live_migration(self.context, instance, "fakehost",
fake_post, fake_recover, False,
migrate_data)
class AnyEventletEvent(object):
def __eq__(self, other):
return type(other) == eventlet.event.Event
mock_thread.assert_called_once_with(
drvr._live_migration_operation,
self.context, instance, "fakehost", False,
migrate_data, dom)
mock_monitor.assert_called_once_with(
self.context, instance, "fakehost",
fake_post, fake_recover, False,
migrate_data, dom, AnyEventletEvent())
def _do_test_create_images_and_backing(self, disk_type):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
self.mox.StubOutWithMock(drvr, '_fetch_instance_kernel_ramdisk')


@@ -710,3 +710,197 @@ class HostTestCase(test.NoDBTestCase):
mock_find_secret.return_value = None
self.host.delete_secret("rbd", "rbdvol")
class DomainJobInfoTestCase(test.NoDBTestCase):
def setUp(self):
super(DomainJobInfoTestCase, self).setUp()
self.useFixture(fakelibvirt.FakeLibvirtFixture())
self.conn = fakelibvirt.openAuth("qemu:///system",
[[], lambda: True])
xml = ("<domain type='kvm'>"
" <name>instance-0000000a</name>"
"</domain>")
self.dom = self.conn.createXML(xml, 0)
host.DomainJobInfo._have_job_stats = True
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_stats(self, mock_stats, mock_info):
mock_stats.return_value = {
"type": libvirt.VIR_DOMAIN_JOB_UNBOUNDED,
"memory_total": 75,
"memory_processed": 50,
"memory_remaining": 33,
"some_new_libvirt_stat_we_dont_know_about": 83
}
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type)
self.assertEqual(75, info.memory_total)
self.assertEqual(50, info.memory_processed)
self.assertEqual(33, info.memory_remaining)
self.assertEqual(0, info.disk_total)
self.assertEqual(0, info.disk_processed)
self.assertEqual(0, info.disk_remaining)
mock_stats.assert_called_once_with()
self.assertFalse(mock_info.called)
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_info_no_support(self, mock_stats, mock_info):
mock_stats.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"virDomainGetJobStats not implemented",
libvirt.VIR_ERR_NO_SUPPORT)
mock_info.return_value = [
libvirt.VIR_DOMAIN_JOB_UNBOUNDED,
100, 99, 10, 11, 12, 75, 50, 33, 1, 2, 3]
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type)
self.assertEqual(100, info.time_elapsed)
self.assertEqual(99, info.time_remaining)
self.assertEqual(10, info.data_total)
self.assertEqual(11, info.data_processed)
self.assertEqual(12, info.data_remaining)
self.assertEqual(75, info.memory_total)
self.assertEqual(50, info.memory_processed)
self.assertEqual(33, info.memory_remaining)
self.assertEqual(1, info.disk_total)
self.assertEqual(2, info.disk_processed)
self.assertEqual(3, info.disk_remaining)
mock_stats.assert_called_once_with()
mock_info.assert_called_once_with()
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_info_attr_error(self, mock_stats, mock_info):
mock_stats.side_effect = AttributeError("No such API")
mock_info.return_value = [
libvirt.VIR_DOMAIN_JOB_UNBOUNDED,
100, 99, 10, 11, 12, 75, 50, 33, 1, 2, 3]
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_UNBOUNDED, info.type)
self.assertEqual(100, info.time_elapsed)
self.assertEqual(99, info.time_remaining)
self.assertEqual(10, info.data_total)
self.assertEqual(11, info.data_processed)
self.assertEqual(12, info.data_remaining)
self.assertEqual(75, info.memory_total)
self.assertEqual(50, info.memory_processed)
self.assertEqual(33, info.memory_remaining)
self.assertEqual(1, info.disk_total)
self.assertEqual(2, info.disk_processed)
self.assertEqual(3, info.disk_remaining)
mock_stats.assert_called_once_with()
mock_info.assert_called_once_with()
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_stats_no_domain(self, mock_stats, mock_info):
mock_stats.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"No such domain with UUID blah",
libvirt.VIR_ERR_NO_DOMAIN)
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type)
self.assertEqual(0, info.time_elapsed)
self.assertEqual(0, info.time_remaining)
self.assertEqual(0, info.memory_total)
self.assertEqual(0, info.memory_processed)
self.assertEqual(0, info.memory_remaining)
mock_stats.assert_called_once_with()
self.assertFalse(mock_info.called)
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_info_no_domain(self, mock_stats, mock_info):
mock_stats.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"virDomainGetJobStats not implemented",
libvirt.VIR_ERR_NO_SUPPORT)
mock_info.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"No such domain with UUID blah",
libvirt.VIR_ERR_NO_DOMAIN)
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type)
self.assertEqual(0, info.time_elapsed)
self.assertEqual(0, info.time_remaining)
self.assertEqual(0, info.memory_total)
self.assertEqual(0, info.memory_processed)
self.assertEqual(0, info.memory_remaining)
mock_stats.assert_called_once_with()
mock_info.assert_called_once_with()
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_stats_operation_invalid(self, mock_stats, mock_info):
mock_stats.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"Domain is not running",
libvirt.VIR_ERR_OPERATION_INVALID)
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type)
self.assertEqual(0, info.time_elapsed)
self.assertEqual(0, info.time_remaining)
self.assertEqual(0, info.memory_total)
self.assertEqual(0, info.memory_processed)
self.assertEqual(0, info.memory_remaining)
mock_stats.assert_called_once_with()
self.assertFalse(mock_info.called)
@mock.patch.object(fakelibvirt.virDomain, "jobInfo")
@mock.patch.object(fakelibvirt.virDomain, "jobStats")
def test_job_info_operation_invalid(self, mock_stats, mock_info):
mock_stats.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"virDomainGetJobStats not implemented",
libvirt.VIR_ERR_NO_SUPPORT)
mock_info.side_effect = fakelibvirt.make_libvirtError(
libvirt.libvirtError,
"Domain is not running",
libvirt.VIR_ERR_OPERATION_INVALID)
info = host.DomainJobInfo.for_domain(self.dom)
self.assertIsInstance(info, host.DomainJobInfo)
self.assertEqual(libvirt.VIR_DOMAIN_JOB_COMPLETED, info.type)
self.assertEqual(0, info.time_elapsed)
self.assertEqual(0, info.time_remaining)
self.assertEqual(0, info.memory_total)
self.assertEqual(0, info.memory_processed)
self.assertEqual(0, info.memory_remaining)
mock_stats.assert_called_once_with()
mock_info.assert_called_once_with()


@@ -5364,27 +5364,23 @@ class LibvirtDriver(driver.ComputeDriver):
' continue to listen on the current'
' addresses.'))
def _live_migration(self, context, instance, dest, post_method,
recover_method, block_migration=False,
migrate_data=None):
"""Do live migration.
def _live_migration_operation(self, context, instance, dest,
block_migration, migrate_data, dom):
"""Invoke the live migration operation
:param context: security context
:param instance:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:param dest: destination host
:param post_method:
post operation method.
expected nova.compute.manager._post_live_migration.
:param recover_method:
recovery method when any exception occurs.
expected nova.compute.manager._rollback_live_migration.
:param block_migration: if true, do block migration.
:param migrate_data: implementation specific params
:param dom: the libvirt domain object
This method is intended to be run in a background thread and will
block that thread until the migration is finished or failed.
"""
# Do live migration.
try:
if block_migration:
flaglist = CONF.libvirt.block_migration_flag.split(',')
@@ -5393,8 +5389,6 @@ class LibvirtDriver(driver.ComputeDriver):
flagvals = [getattr(libvirt, x.strip()) for x in flaglist]
logical_sum = reduce(lambda x, y: x | y, flagvals)
dom = self._host.get_domain(instance)
pre_live_migrate_data = (migrate_data or {}).get(
'pre_live_migration_result', {})
listen_addrs = pre_live_migrate_data.get('graphics_listen_addrs')
@@ -5449,15 +5443,233 @@ class LibvirtDriver(driver.ComputeDriver):
CONF.libvirt.live_migration_bandwidth)
else:
raise
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error(_LE("Live Migration failure: %s"), e,
instance=instance)
recover_method(context, instance, dest, block_migration)
post_method(context, instance, dest, block_migration,
migrate_data)
# If 'migrateToURI' fails we don't know what state the
# VM instances on each host are in. Possibilities include
#
# 1. src==running, dst==none
#
# Migration failed & rolled back, or never started
#
# 2. src==running, dst==paused
#
# Migration started but is still ongoing
#
# 3. src==paused, dst==paused
#
# Migration data transfer completed, but switchover
# is still ongoing, or failed
#
# 4. src==paused, dst==running
#
# Migration data transfer completed, switchover
# happened but cleanup on source failed
#
# 5. src==none, dst==running
#
# Migration fully succeeded.
#
# Libvirt will aim to complete any migration operation
# or roll it back. So even if the migrateToURI call has
# returned an error, if the migration was not finished
# libvirt should clean up.
#
# So we take the error raised here with a pinch of salt
# and rely on the domain job info status to figure out
# what really happened to the VM, which is a much more
# reliable indicator.
#
# In particular we need to try very hard to ensure that
# Nova does not "forget" about the guest, i.e. leaving it
# running on a different host to the one recorded in
# the database, as that would be a serious resource leak
LOG.debug("Migration operation thread has finished",
instance=instance)
def _live_migration_monitor(self, context, instance, dest, post_method,
recover_method, block_migration,
migrate_data, dom, finish_event):
n = 0
while True:
info = host.DomainJobInfo.for_domain(dom)
if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
# Annoyingly this could indicate many possible
# states, so we must fix the mess:
#
# 1. Migration has not yet begun
# 2. Migration has stopped due to failure
# 3. Migration has stopped due to completion
#
# We can detect option 1 by seeing if the thread is still
# running. We can distinguish 2 vs 3 by seeing if the
# VM still exists & is running on the current host
#
if not finish_event.ready():
LOG.debug("Operation thread is still running",
instance=instance)
# Leave type untouched
else:
try:
if dom.isActive():
LOG.debug("VM running on src, migration failed",
instance=instance)
info.type = libvirt.VIR_DOMAIN_JOB_FAILED
else:
LOG.debug("VM is shutoff, migration finished",
instance=instance)
info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED
except libvirt.libvirtError as ex:
LOG.debug("Error checking domain status %(ex)s",
ex, instance=instance)
if ex.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
LOG.debug("VM is missing, migration finished",
instance=instance)
info.type = libvirt.VIR_DOMAIN_JOB_COMPLETED
else:
LOG.info(_LI("Error %(ex)s, migration failed"),
instance=instance)
info.type = libvirt.VIR_DOMAIN_JOB_FAILED
if info.type != libvirt.VIR_DOMAIN_JOB_NONE:
LOG.debug("Fixed incorrect job type to be %d",
info.type, instance=instance)
if info.type == libvirt.VIR_DOMAIN_JOB_NONE:
# Migration is not yet started
LOG.debug("Migration not running yet",
instance=instance)
elif info.type == libvirt.VIR_DOMAIN_JOB_UNBOUNDED:
# We loop every 500ms, so don't log on every
# iteration to avoid spamming logs for long
# running migrations. Just once every 5 secs
# is sufficient for developers to debug problems.
# We log once every 30 seconds at info to help
# admins see slow running migration operations
# when debug logs are off.
if (n % 10) == 0:
# Ignoring memory_processed, as due to repeated
# dirtying of data, this can be way larger than
# memory_total. Best to just look at what's
# remaining to copy and ignore what's done already
#
# TODO(berrange) perhaps we could include disk
# transfer stats in the progress too, but it
# might make memory info more obscure as large
# disk sizes might dwarf memory size
progress = 0
if info.memory_total != 0:
progress = round(info.memory_remaining *
100 / info.memory_total)
instance.progress = 100 - progress
instance.save()
lg = LOG.debug
if (n % 60) == 0:
lg = LOG.info
lg(_LI("Migration running for %(secs)d secs, "
"memory %(progress)d%% remaining; "
"(bytes processed=%(processed)d, "
"remaining=%(remaining)d, "
"total=%(total)d)"),
{"secs": n / 2, "progress": progress,
"processed": info.memory_processed,
"remaining": info.memory_remaining,
"total": info.memory_total}, instance=instance)
# Migration is still running
#
# This is where we'd wire up calls to change live
# migration status. eg change max downtime, cancel
# the operation, change max bandwidth
n = n + 1
elif info.type == libvirt.VIR_DOMAIN_JOB_COMPLETED:
# Migration is all done
LOG.info(_LI("Migration operation has completed"),
instance=instance)
post_method(context, instance, dest, block_migration,
migrate_data)
break
elif info.type == libvirt.VIR_DOMAIN_JOB_FAILED:
# Migration did not succeed
LOG.error(_LE("Migration operation has aborted"),
instance=instance)
recover_method(context, instance, dest, block_migration)
break
elif info.type == libvirt.VIR_DOMAIN_JOB_CANCELLED:
# Migration was stopped by admin
LOG.warn(_LW("Migration operation was cancelled"),
instance=instance)
recover_method(context, instance, dest, block_migration)
break
else:
LOG.warn(_LW("Unexpected migration job type: %d"),
info.type, instance=instance)
time.sleep(0.5)
def _live_migration(self, context, instance, dest, post_method,
recover_method, block_migration,
migrate_data):
"""Do live migration.
:param context: security context
:param instance:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:param dest: destination host
:param post_method:
post operation method.
expected nova.compute.manager._post_live_migration.
:param recover_method:
recovery method when any exception occurs.
expected nova.compute.manager._rollback_live_migration.
:param block_migration: if true, do block migration.
:param migrate_data: implementation specific params
This fires off a new thread to run the blocking migration
operation, and then this thread monitors the progress of
migration and controls its operation
"""
dom = self._host.get_domain(instance)
opthread = greenthread.spawn(self._live_migration_operation,
context, instance, dest,
block_migration,
migrate_data, dom)
finish_event = eventlet.event.Event()
def thread_finished(thread, event):
LOG.debug("Migration operation thread notification",
instance=instance)
event.send()
opthread.link(thread_finished, finish_event)
# Let eventlet schedule the new thread right away
time.sleep(0)
try:
LOG.debug("Starting monitoring of live migration",
instance=instance)
self._live_migration_monitor(context, instance, dest,
post_method, recover_method,
block_migration, migrate_data,
dom, finish_event)
except Exception as ex:
LOG.warn(_LW("Error monitoring migration: %(ex)s"),
{"ex": ex}, instance=instance)
raise
finally:
LOG.debug("Live migration monitoring is all done",
instance=instance)
def _fetch_instance_kernel_ramdisk(self, context, instance):
"""Download kernel and ramdisk for instance in instance directory."""


@@ -64,6 +64,121 @@ CONF.import_opt('host', 'nova.netconf')
CONF.import_opt('my_ip', 'nova.netconf')
class DomainJobInfo(object):
"""Information about libvirt background jobs
This class encapsulates information about libvirt
background jobs. It provides a mapping from either
the old virDomainGetJobInfo API, which returns a
fixed list of fields, or the modern virDomainGetJobStats
API, which returns an extendable dict of fields.
"""
_have_job_stats = True
def __init__(self, **kwargs):
self.type = kwargs.get("type", libvirt.VIR_DOMAIN_JOB_NONE)
self.time_elapsed = kwargs.get("time_elapsed", 0)
self.time_remaining = kwargs.get("time_remaining", 0)
self.downtime = kwargs.get("downtime", 0)
self.setup_time = kwargs.get("setup_time", 0)
self.data_total = kwargs.get("data_total", 0)
self.data_processed = kwargs.get("data_processed", 0)
self.data_remaining = kwargs.get("data_remaining", 0)
self.memory_total = kwargs.get("memory_total", 0)
self.memory_processed = kwargs.get("memory_processed", 0)
self.memory_remaining = kwargs.get("memory_remaining", 0)
self.memory_constant = kwargs.get("memory_constant", 0)
self.memory_normal = kwargs.get("memory_normal", 0)
self.memory_normal_bytes = kwargs.get("memory_normal_bytes", 0)
self.memory_bps = kwargs.get("memory_bps", 0)
self.disk_total = kwargs.get("disk_total", 0)
self.disk_processed = kwargs.get("disk_processed", 0)
self.disk_remaining = kwargs.get("disk_remaining", 0)
self.disk_bps = kwargs.get("disk_bps", 0)
self.comp_cache = kwargs.get("compression_cache", 0)
self.comp_bytes = kwargs.get("compression_bytes", 0)
self.comp_pages = kwargs.get("compression_pages", 0)
self.comp_cache_misses = kwargs.get("compression_cache_misses", 0)
self.comp_overflow = kwargs.get("compression_overflow", 0)
@classmethod
def _get_job_stats_compat(cls, dom):
# Make the old virDomainGetJobInfo method look similar to the
# modern virDomainGetJobStats method
try:
info = dom.jobInfo()
except libvirt.libvirtError as ex:
# When migration of a transient guest completes, the guest
# goes away so we'll see NO_DOMAIN error code
#
# When migration of a persistent guest completes, the guest
# merely shuts off, but libvirt unhelpfully raises an
# OPERATION_INVALID error code
#
# Let's pretend both of these mean success
if ex.get_error_code() in (libvirt.VIR_ERR_NO_DOMAIN,
libvirt.VIR_ERR_OPERATION_INVALID):
LOG.debug("Domain has shutdown/gone away: %s", ex)
return cls(type=libvirt.VIR_DOMAIN_JOB_COMPLETED)
else:
LOG.debug("Failed to get job info: %s", ex)
raise
return cls(
type=info[0],
time_elapsed=info[1],
time_remaining=info[2],
data_total=info[3],
data_processed=info[4],
data_remaining=info[5],
memory_total=info[6],
memory_processed=info[7],
memory_remaining=info[8],
disk_total=info[9],
disk_processed=info[10],
disk_remaining=info[11])
@classmethod
def for_domain(cls, dom):
'''Get job info for the domain
Query the libvirt job info for the domain (i.e. progress
of migration, or snapshot operation)
Returns: a DomainJobInfo instance
'''
if cls._have_job_stats:
try:
stats = dom.jobStats()
return cls(**stats)
except libvirt.libvirtError as ex:
if ex.get_error_code() == libvirt.VIR_ERR_NO_SUPPORT:
# Remote libvirt doesn't support new API
LOG.debug("Missing remote virDomainGetJobStats: %s", ex)
cls._have_job_stats = False
return cls._get_job_stats_compat(dom)
elif ex.get_error_code() in (
libvirt.VIR_ERR_NO_DOMAIN,
libvirt.VIR_ERR_OPERATION_INVALID):
# Transient guest finished migration, so it has gone
# away completely
LOG.debug("Domain has shutdown/gone away: %s", ex)
return cls(type=libvirt.VIR_DOMAIN_JOB_COMPLETED)
else:
LOG.debug("Failed to get job stats: %s", ex)
raise
except AttributeError as ex:
# Local python binding doesn't support new API
LOG.debug("Missing local virDomainGetJobStats: %s", ex)
cls._have_job_stats = False
return cls._get_job_stats_compat(dom)
else:
return cls._get_job_stats_compat(dom)
class Host(object):
def __init__(self, uri, read_only=False,