Limit parallel live migrations in progress

This patch extends the previous one[1], which limited the number of parallel
builds, to also limit the total number of live migrations that nova-compute
will attempt to run in parallel. Since we can now block immediately on the
semaphore, this also implements the behavior we have in build, which spawns a
new thread for the process so that we don't starve our RPC workers while
waiting on the semaphore. In reality, live migrations take a long time, so
this is something we should have had already.
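
In sketch form, the dispatch pattern described above looks roughly like the
following (illustrative names only, not the exact manager code):

    import eventlet

    # 0 (or a negative value) means "unlimited"; otherwise cap concurrency.
    max_concurrent = 1
    live_migration_semaphore = eventlet.semaphore.Semaphore(max_concurrent)

    def handle_live_migration_rpc(do_live_migration, *args, **kwargs):
        def _dispatch():
            # This may block for a long time waiting for a free slot, but it
            # happens on a dedicated green thread, not an RPC worker.
            with live_migration_semaphore:
                do_live_migration(*args, **kwargs)

        # Hand the work off and return the RPC worker to the pool right away.
        eventlet.spawn_n(_dispatch)

The actual change below uses nova's utils.spawn_n wrapper and falls back to
compute_utils.UnlimitedSemaphore when the limit is zero or negative.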

Further, as soon as we receive the request to do the live migration, we mark the
migration object as status='queued' to indicate that it's waiting for its turn
on the compute node. Once we're given a slot to run, the normal status='preparing'
will be set. This will allow an operator to monitor the status of queued and
running migrations.
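
As an illustration only (not part of this patch), a monitoring script could
poll for migrations still waiting on the semaphore by filtering on the new
status; this sketch assumes MigrationList.get_by_filters accepts 'status' and
'source_compute' filters:

    from nova import context as nova_context
    from nova import objects

    objects.register_all()

    def queued_live_migrations(host):
        # Hypothetical helper: migrations still waiting for a slot on 'host'.
        ctxt = nova_context.get_admin_context()
        return objects.MigrationList.get_by_filters(
            ctxt, {'source_compute': host, 'status': 'queued'})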

This includes a change to the libvirt driver to avoid spawning another thread
for the live migration process. That makes it synchronous from the perspective
of the compute manager, in line with all the other drivers that support the
operation. Since the compute manager now spawns the thread, libvirt is
unaffected and the other drivers avoid potentially starving the RPC worker
pool as well.

[1] Commit 5a542e7706

DocImpact: Adds a new configuration variable to limit parallel live migrations.
           Zero means "unlimited" and nonzero means "this many in parallel".
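
For example, an operator who wants at most two outbound live migrations at a
time would set something like the following in nova.conf (the option is
registered alongside max_concurrent_builds, presumably in [DEFAULT]):

    [DEFAULT]
    # Run at most two outgoing live migrations at once on this compute node.
    max_concurrent_live_migrations = 2

    # Or, to restore unlimited behavior:
    # max_concurrent_live_migrations = 0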

Closes-Bug: #1478108
Change-Id: Ia8a796372746b7fc75485dc2e663f270dbd5893a
Author: Dan Smith
Date:   2015-08-11 12:30:14 -07:00
parent 65381d6a42
commit 2c0a306632
5 changed files with 112 additions and 25 deletions

@@ -128,6 +128,14 @@ compute_opts = [
cfg.IntOpt('max_concurrent_builds',
default=10,
help='Maximum number of instance builds to run concurrently'),
cfg.IntOpt('max_concurrent_live_migrations',
default=1,
help='Maximum number of live migrations to run concurrently. '
'This limit is enforced to avoid outbound live migrations '
'overwhelming the host/network and causing failures. It '
'is not recommended that you change this unless you are '
'very sure that doing so is safe and stable in your '
'environment.'),
cfg.IntOpt('block_device_allocate_retries',
default=60,
help='Number of times to retry block device'
@@ -686,6 +694,11 @@ class ComputeManager(manager.Manager):
CONF.max_concurrent_builds)
else:
self._build_semaphore = compute_utils.UnlimitedSemaphore()
if max(CONF.max_concurrent_live_migrations, 0) != 0:
self._live_migration_semaphore = eventlet.semaphore.Semaphore(
CONF.max_concurrent_live_migrations)
else:
self._live_migration_semaphore = compute_utils.UnlimitedSemaphore()
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
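
For reference, the unlimited fallback used above behaves like a no-op context
manager whose balance reports zero; a sketch of the idea (not necessarily the
exact compute_utils implementation):

    class UnlimitedSemaphore(object):
        """Semaphore stand-in used when the concurrency limit is disabled."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            return False

        @property
        def balance(self):
            # The tests below rely on this: "unlimited" reports a balance of 0.
            return 0
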
@@ -4925,22 +4938,8 @@ class ComputeManager(manager.Manager):
return pre_live_migration_data
@wrap_exception()
@wrap_instance_event
@wrap_instance_fault
def live_migration(self, context, dest, instance, block_migration,
migration, migrate_data):
"""Executing live migration.
:param context: security context
:param instance: a nova.objects.instance.Instance object
:param dest: destination host
:param block_migration: if true, prepare for block migration
:param migration: an nova.objects.Migration object
:param migrate_data: implementation specific params
"""
def _do_live_migration(self, context, dest, instance, block_migration,
migration, migrate_data):
# NOTE(danms): Remove these guards in v5.0 of the RPC API
if migration:
# NOTE(danms): We should enhance the RT to account for migrations
@@ -4995,6 +4994,39 @@ class ComputeManager(manager.Manager):
migration.status = 'failed'
migration.save()
@wrap_exception()
@wrap_instance_event
@wrap_instance_fault
def live_migration(self, context, dest, instance, block_migration,
migration, migrate_data):
"""Executing live migration.
:param context: security context
:param dest: destination host
:param instance: a nova.objects.instance.Instance object
:param block_migration: if true, prepare for block migration
:param migration: an nova.objects.Migration object
:param migrate_data: implementation specific params
"""
# NOTE(danms): Remove these guards in v5.0 of the RPC API
if migration:
migration.status = 'queued'
migration.save()
def dispatch_live_migration(*args, **kwargs):
with self._live_migration_semaphore:
self._do_live_migration(*args, **kwargs)
# NOTE(danms): We spawn here to return the RPC worker thread back to
# the pool. Since what follows could take a really long time, we don't
# want to tie up RPC workers.
utils.spawn_n(dispatch_live_migration,
context, dest, instance,
block_migration, migration,
migrate_data)
def _live_migration_cleanup_flags(self, block_migration, migrate_data):
"""Determine whether disks or instance path need to be cleaned up after
live migration (at source on success, at destination on rollback)

@@ -3866,3 +3866,56 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.flags(enabled=True, group=console)
self.assertTrue(self.compute._consoles_enabled())
self.flags(enabled=False, group=console)
@mock.patch('nova.utils.spawn_n')
@mock.patch('nova.compute.manager.ComputeManager.'
'_do_live_migration')
def _test_max_concurrent_live(self, mock_lm, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
@mock.patch('nova.objects.Migration.save')
def _do_it(mock_mig_save):
instance = objects.Instance(uuid=str(uuid.uuid4()))
migration = objects.Migration()
self.compute.live_migration(self.context,
mock.sentinel.dest,
instance,
mock.sentinel.block_migration,
migration,
mock.sentinel.migrate_data)
self.assertEqual('queued', migration.status)
migration.save.assert_called_once_with()
with mock.patch.object(self.compute,
'_live_migration_semaphore') as mock_sem:
for i in (1, 2, 3):
_do_it()
self.assertEqual(3, mock_sem.__enter__.call_count)
def test_max_concurrent_live_limited(self):
self.flags(max_concurrent_live_migrations=2)
self._test_max_concurrent_live()
def test_max_concurrent_live_unlimited(self):
self.flags(max_concurrent_live_migrations=0)
self._test_max_concurrent_live()
def test_max_concurrent_live_semaphore_limited(self):
self.flags(max_concurrent_live_migrations=123)
self.assertEqual(
123,
manager.ComputeManager()._live_migration_semaphore.balance)
def test_max_concurrent_live_semaphore_unlimited(self):
self.flags(max_concurrent_live_migrations=0)
compute = manager.ComputeManager()
self.assertEqual(0, compute._live_migration_semaphore.balance)
self.assertIsInstance(compute._live_migration_semaphore,
compute_utils.UnlimitedSemaphore)
def test_max_concurrent_live_semaphore_negative(self):
self.flags(max_concurrent_live_migrations=-2)
compute = manager.ComputeManager()
self.assertEqual(0, compute._live_migration_semaphore.balance)
self.assertIsInstance(compute._live_migration_semaphore,
compute_utils.UnlimitedSemaphore)

@@ -760,7 +760,9 @@ class Domain(object):
return 0
def jobInfo(self):
return []
# NOTE(danms): This is an array of 12 integers, so just report
# something to avoid an IndexError if we look at this
return [0] * 12
def jobStats(self, flags=0):
return {}

@@ -11533,18 +11533,18 @@ class LibvirtConnTestCase(test.NoDBTestCase):
dstfile, "qcow2")
mock_define.assert_called_once_with(xmldoc)
@mock.patch.object(utils, "spawn")
def test_live_migration_hostname_valid(self, mock_spawn):
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration")
def test_live_migration_hostname_valid(self, mock_lm):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
drvr.live_migration(self.context, self.test_instance,
"host1.example.com",
lambda x: x,
lambda x: x)
self.assertEqual(1, mock_spawn.call_count)
self.assertEqual(1, mock_lm.call_count)
@mock.patch.object(utils, "spawn")
@mock.patch.object(libvirt_driver.LibvirtDriver, "_live_migration")
@mock.patch.object(fake_libvirt_utils, "is_valid_hostname")
def test_live_migration_hostname_invalid(self, mock_hostname, mock_spawn):
def test_live_migration_hostname_invalid(self, mock_hostname, mock_lm):
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
mock_hostname.return_value = False
self.assertRaises(exception.InvalidHostname,

@@ -5436,9 +5436,9 @@ class LibvirtDriver(driver.ComputeDriver):
if not libvirt_utils.is_valid_hostname(dest):
raise exception.InvalidHostname(hostname=dest)
utils.spawn(self._live_migration, context, instance, dest,
post_method, recover_method, block_migration,
migrate_data)
self._live_migration(context, instance, dest,
post_method, recover_method, block_migration,
migrate_data)
def _update_xml(self, xml_str, volume, listen_addrs):
xml_doc = etree.fromstring(xml_str)