diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 5fedd0035769..7bc7af1c3f57 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -128,6 +128,14 @@ compute_opts = [ cfg.IntOpt('max_concurrent_builds', default=10, help='Maximum number of instance builds to run concurrently'), + cfg.IntOpt('max_concurrent_live_migrations', + default=1, + help='Maximum number of live migrations to run concurrently. ' + 'This limit is enforced to avoid outbound live migrations ' + 'overwhelming the host/network and causing failures. It ' + 'is not recommended that you change this unless you are ' + 'very sure that doing so is safe and stable in your ' + 'environment.'), cfg.IntOpt('block_device_allocate_retries', default=60, help='Number of times to retry block device' @@ -686,6 +694,11 @@ class ComputeManager(manager.Manager): CONF.max_concurrent_builds) else: self._build_semaphore = compute_utils.UnlimitedSemaphore() + if max(CONF.max_concurrent_live_migrations, 0) != 0: + self._live_migration_semaphore = eventlet.semaphore.Semaphore( + CONF.max_concurrent_live_migrations) + else: + self._live_migration_semaphore = compute_utils.UnlimitedSemaphore() super(ComputeManager, self).__init__(service_name="compute", *args, **kwargs) @@ -4925,22 +4938,8 @@ class ComputeManager(manager.Manager): return pre_live_migration_data - @wrap_exception() - @wrap_instance_event - @wrap_instance_fault - def live_migration(self, context, dest, instance, block_migration, - migration, migrate_data): - """Executing live migration. - - :param context: security context - :param instance: a nova.objects.instance.Instance object - :param dest: destination host - :param block_migration: if true, prepare for block migration - :param migration: an nova.objects.Migration object - :param migrate_data: implementation specific params - - """ - + def _do_live_migration(self, context, dest, instance, block_migration, + migration, migrate_data): # NOTE(danms): Remove these guards in v5.0 of the RPC API if migration: # NOTE(danms): We should enhance the RT to account for migrations @@ -4995,6 +4994,39 @@ class ComputeManager(manager.Manager): migration.status = 'failed' migration.save() + @wrap_exception() + @wrap_instance_event + @wrap_instance_fault + def live_migration(self, context, dest, instance, block_migration, + migration, migrate_data): + """Executing live migration. + + :param context: security context + :param dest: destination host + :param instance: a nova.objects.instance.Instance object + :param block_migration: if true, prepare for block migration + :param migration: an nova.objects.Migration object + :param migrate_data: implementation specific params + + """ + + # NOTE(danms): Remove these guards in v5.0 of the RPC API + if migration: + migration.status = 'queued' + migration.save() + + def dispatch_live_migration(*args, **kwargs): + with self._live_migration_semaphore: + self._do_live_migration(*args, **kwargs) + + # NOTE(danms): We spawn here to return the RPC worker thread back to + # the pool. Since what follows could take a really long time, we don't + # want to tie up RPC workers. + utils.spawn_n(dispatch_live_migration, + context, dest, instance, + block_migration, migration, + migrate_data) + def _live_migration_cleanup_flags(self, block_migration, migrate_data): """Determine whether disks or instance path need to be cleaned up after live migration (at source on success, at destination on rollback) diff --git a/nova/tests/unit/compute/test_compute_mgr.py b/nova/tests/unit/compute/test_compute_mgr.py index 9cef5f6c75ef..73c65f61b9ed 100644 --- a/nova/tests/unit/compute/test_compute_mgr.py +++ b/nova/tests/unit/compute/test_compute_mgr.py @@ -3866,3 +3866,56 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase): self.flags(enabled=True, group=console) self.assertTrue(self.compute._consoles_enabled()) self.flags(enabled=False, group=console) + + @mock.patch('nova.utils.spawn_n') + @mock.patch('nova.compute.manager.ComputeManager.' + '_do_live_migration') + def _test_max_concurrent_live(self, mock_lm, mock_spawn): + mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k) + + @mock.patch('nova.objects.Migration.save') + def _do_it(mock_mig_save): + instance = objects.Instance(uuid=str(uuid.uuid4())) + migration = objects.Migration() + self.compute.live_migration(self.context, + mock.sentinel.dest, + instance, + mock.sentinel.block_migration, + migration, + mock.sentinel.migrate_data) + self.assertEqual('queued', migration.status) + migration.save.assert_called_once_with() + + with mock.patch.object(self.compute, + '_live_migration_semaphore') as mock_sem: + for i in (1, 2, 3): + _do_it() + self.assertEqual(3, mock_sem.__enter__.call_count) + + def test_max_concurrent_live_limited(self): + self.flags(max_concurrent_live_migrations=2) + self._test_max_concurrent_live() + + def test_max_concurrent_live_unlimited(self): + self.flags(max_concurrent_live_migrations=0) + self._test_max_concurrent_live() + + def test_max_concurrent_live_semaphore_limited(self): + self.flags(max_concurrent_live_migrations=123) + self.assertEqual( + 123, + manager.ComputeManager()._live_migration_semaphore.balance) + + def test_max_concurrent_live_semaphore_unlimited(self): + self.flags(max_concurrent_live_migrations=0) + compute = manager.ComputeManager() + self.assertEqual(0, compute._live_migration_semaphore.balance) + self.assertIsInstance(compute._live_migration_semaphore, + compute_utils.UnlimitedSemaphore) + + def test_max_concurrent_live_semaphore_negative(self): + self.flags(max_concurrent_live_migrations=-2) + compute = manager.ComputeManager() + self.assertEqual(0, compute._live_migration_semaphore.balance) + self.assertIsInstance(compute._live_migration_semaphore, + compute_utils.UnlimitedSemaphore) diff --git a/nova/tests/unit/virt/libvirt/fakelibvirt.py b/nova/tests/unit/virt/libvirt/fakelibvirt.py index 1a3d636033d2..001ae6f65c5b 100644 --- a/nova/tests/unit/virt/libvirt/fakelibvirt.py +++ b/nova/tests/unit/virt/libvirt/fakelibvirt.py @@ -760,7 +760,9 @@ class Domain(object): return 0 def jobInfo(self): - return [] + # NOTE(danms): This is an array of 12 integers, so just report + # something to avoid an IndexError if we look at this + return [0] * 12 def jobStats(self, flags=0): return {} diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index b7ca5e44f552..e26cc0f4c9bb 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -11533,18 +11533,18 @@ class LibvirtConnTestCase(test.NoDBTestCase): dstfile, "qcow2") mock_define.assert_called_once_with(xmldoc) - @mock.patch.object(utils, "spawn") - def test_live_migration_hostname_valid(self, mock_spawn): + @mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration") + def test_live_migration_hostname_valid(self, mock_lm): drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) drvr.live_migration(self.context, self.test_instance, "host1.example.com", lambda x: x, lambda x: x) - self.assertEqual(1, mock_spawn.call_count) + self.assertEqual(1, mock_lm.call_count) - @mock.patch.object(utils, "spawn") + @mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration") @mock.patch.object(fake_libvirt_utils, "is_valid_hostname") - def test_live_migration_hostname_invalid(self, mock_hostname, mock_spawn): + def test_live_migration_hostname_invalid(self, mock_hostname, mock_lm): drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) mock_hostname.return_value = False self.assertRaises(exception.InvalidHostname, diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 25639bd144ec..abe93114e244 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -5436,9 +5436,9 @@ class LibvirtDriver(driver.ComputeDriver): if not libvirt_utils.is_valid_hostname(dest): raise exception.InvalidHostname(hostname=dest) - utils.spawn(self._live_migration, context, instance, dest, - post_method, recover_method, block_migration, - migrate_data) + self._live_migration(context, instance, dest, + post_method, recover_method, block_migration, + migrate_data) def _update_xml(self, xml_str, volume, listen_addrs): xml_doc = etree.fromstring(xml_str)