Run build_and_run_instance in a separate greenthread

If we're doing a lot of build operations, we are using a large portion
of the limited RPC worker pool for long periods of time. Since we may wait
on external services (like Neutron or Glance) during those times, we could
fully deplete that pool.

This patch makes us spawn a new greenthread for that task and return the
RPC worker to the pool. Due to some awkwardness with the stack of decorators,
this breaks the inner function out to an object method, which is probably
good anyway, given its size. This also moves the wrap_instance_event
decorator to the inner function so that the start and stop events properly
demarcate the actual task and not just the (now very quick) RPC call.

Change-Id: Ife712c43c5a61424bc68b2f5ab47cefdb46ac168
Closes-Bug: #1372049
(cherry picked from commit 1d8eddb261)
This commit is contained in:
Dan Smith 2014-10-23 10:10:48 -07:00 committed by Ihar Hrachyshka
parent e98738d55a
commit f9be9467a3
2 changed files with 133 additions and 96 deletions

View File

@ -1959,7 +1959,6 @@ class ComputeManager(manager.Manager):
# callers all pass objects already
@wrap_exception()
@reverts_task_state
@wrap_instance_event
@wrap_instance_fault
def build_and_run_instance(self, context, instance, image, request_spec,
filter_properties, admin_password=None,
@ -1976,100 +1975,111 @@ class ComputeManager(manager.Manager):
for t in requested_networks])
@utils.synchronized(instance.uuid)
def do_build_and_run_instance(context, instance, image, request_spec,
filter_properties, admin_password, injected_files,
requested_networks, security_groups, block_device_mapping,
node=None, limits=None):
def _locked_do_build_and_run_instance(*args, **kwargs):
self._do_build_and_run_instance(*args, **kwargs)
try:
LOG.audit(_('Starting instance...'), context=context,
# NOTE(danms): We spawn here to return the RPC worker thread back to
# the pool. Since what follows could take a really long time, we don't
# want to tie up RPC workers.
utils.spawn_n(_locked_do_build_and_run_instance,
context, instance, image, request_spec,
filter_properties, admin_password, injected_files,
requested_networks, security_groups,
block_device_mapping, node, limits)
@wrap_exception()
@reverts_task_state
@wrap_instance_event
@wrap_instance_fault
def _do_build_and_run_instance(self, context, instance, image,
request_spec, filter_properties, admin_password, injected_files,
requested_networks, security_groups, block_device_mapping,
node=None, limits=None):
try:
LOG.audit(_('Starting instance...'), context=context,
instance=instance)
instance.vm_state = vm_states.BUILDING
instance.task_state = None
instance.save(expected_task_state=
(task_states.SCHEDULING, None))
except exception.InstanceNotFound:
msg = 'Instance disappeared before build.'
LOG.debug(msg, instance=instance)
return
except exception.UnexpectedTaskStateError as e:
LOG.debug(e.format_message(), instance=instance)
return
# b64 decode the files to inject:
decoded_files = self._decode_files(injected_files)
if limits is None:
limits = {}
if node is None:
node = self.driver.get_available_nodes(refresh=True)[0]
LOG.debug('No node specified, defaulting to %s', node,
instance=instance)
instance.vm_state = vm_states.BUILDING
instance.task_state = None
instance.save(expected_task_state=
(task_states.SCHEDULING, None))
except exception.InstanceNotFound:
msg = 'Instance disappeared before build.'
LOG.debug(msg, instance=instance)
return
except exception.UnexpectedTaskStateError as e:
LOG.debug(e.format_message(), instance=instance)
return
# b64 decode the files to inject:
decoded_files = self._decode_files(injected_files)
if limits is None:
limits = {}
if node is None:
node = self.driver.get_available_nodes(refresh=True)[0]
LOG.debug('No node specified, defaulting to %s', node,
instance=instance)
try:
self._build_and_run_instance(context, instance, image,
decoded_files, admin_password, requested_networks,
security_groups, block_device_mapping, node, limits,
filter_properties)
except exception.RescheduledException as e:
LOG.debug(e.format_message(), instance=instance)
retry = filter_properties.get('retry', None)
if not retry:
# no retry information, do not reschedule.
LOG.debug("Retry info not present, will not reschedule",
instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
compute_utils.add_instance_fault_from_exc(context,
instance, e, sys.exc_info())
self._set_instance_error_state(context, instance)
return
retry['exc'] = traceback.format_exception(*sys.exc_info())
# NOTE(comstud): Deallocate networks if the driver wants
# us to do so.
if self.driver.deallocate_networks_on_reschedule(instance):
self._cleanup_allocated_networks(context, instance,
requested_networks)
instance.task_state = task_states.SCHEDULING
instance.save()
self.compute_task_api.build_instances(context, [instance],
image, filter_properties, admin_password,
injected_files, requested_networks, security_groups,
block_device_mapping)
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
msg = 'Instance disappeared during build.'
LOG.debug(msg, instance=instance)
try:
self._build_and_run_instance(context, instance, image,
decoded_files, admin_password, requested_networks,
security_groups, block_device_mapping, node, limits,
filter_properties)
except exception.RescheduledException as e:
LOG.debug(e.format_message(), instance=instance)
retry = filter_properties.get('retry', None)
if not retry:
# no retry information, do not reschedule.
LOG.debug("Retry info not present, will not reschedule",
instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
except exception.BuildAbortException as e:
LOG.exception(e.format_message(), instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
requested_networks)
compute_utils.add_instance_fault_from_exc(context,
instance, e, sys.exc_info())
self._set_instance_error_state(context, instance)
except Exception as e:
# Should not reach here.
msg = _LE('Unexpected build failure, not rescheduling build.')
LOG.exception(msg, instance=instance)
return
retry['exc'] = traceback.format_exception(*sys.exc_info())
# NOTE(comstud): Deallocate networks if the driver wants
# us to do so.
if self.driver.deallocate_networks_on_reschedule(instance):
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
self._set_instance_error_state(context, instance)
do_build_and_run_instance(context, instance, image, request_spec,
filter_properties, admin_password, injected_files,
requested_networks, security_groups, block_device_mapping,
node, limits)
instance.task_state = task_states.SCHEDULING
instance.save()
self.compute_task_api.build_instances(context, [instance],
image, filter_properties, admin_password,
injected_files, requested_networks, security_groups,
block_device_mapping)
except (exception.InstanceNotFound,
exception.UnexpectedDeletingTaskStateError):
msg = 'Instance disappeared during build.'
LOG.debug(msg, instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
except exception.BuildAbortException as e:
LOG.exception(e.format_message(), instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
self._set_instance_error_state(context, instance)
except Exception as e:
# Should not reach here.
msg = _LE('Unexpected build failure, not rescheduling build.')
LOG.exception(msg, instance=instance)
self._cleanup_allocated_networks(context, instance,
requested_networks)
self._cleanup_volumes(context, instance.uuid,
block_device_mapping, raise_exc=False)
compute_utils.add_instance_fault_from_exc(context, instance,
e, sys.exc_info())
self._set_instance_error_state(context, instance)
def _build_and_run_instance(self, context, instance, image, injected_files,
admin_password, requested_networks, security_groups,

View File

@ -2046,7 +2046,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
exc_val=mox.IgnoreArg(), exc_tb=mox.IgnoreArg(),
want_result=False)
def test_build_and_run_instance_called_with_proper_args(self):
@mock.patch('nova.utils.spawn_n')
def test_build_and_run_instance_called_with_proper_args(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self._do_build_instance_update()
self.compute._build_and_run_instance(self.context, self.instance,
@ -2073,9 +2075,11 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
@mock.patch('nova.objects.InstanceActionEvent.event_start')
@mock.patch('nova.objects.Instance.save')
@mock.patch('nova.compute.manager.ComputeManager._build_and_run_instance')
@mock.patch('nova.utils.spawn_n')
def test_build_and_run_instance_with_icehouse_requested_network(
self, mock_build_and_run, mock_save, mock_event_start,
self, mock_spawn, mock_build_and_run, mock_save, mock_event_start,
mock_event_finish):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
mock_save.return_value = self.instance
self.compute.build_and_run_instance(self.context, self.instance,
self.image, request_spec={},
@ -2092,7 +2096,18 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self.assertEqual('10.0.0.1', str(requested_network.address))
self.assertEqual('fake_port_id', requested_network.port_id)
def test_build_abort_exception(self):
@mock.patch('nova.utils.spawn_n')
def test_build_abort_exception(self, mock_spawn):
def fake_spawn(f, *args, **kwargs):
# NOTE(danms): Simulate the detached nature of spawn so that
# we confirm that the inner task has the fault logic
try:
return f(*args, **kwargs)
except Exception:
pass
mock_spawn.side_effect = fake_spawn
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self.mox.StubOutWithMock(self.compute, '_cleanup_allocated_networks')
self.mox.StubOutWithMock(self.compute, '_cleanup_volumes')
@ -2128,7 +2143,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
block_device_mapping=self.block_device_mapping, node=self.node,
limits=self.limits)
def test_rescheduled_exception(self):
@mock.patch('nova.utils.spawn_n')
def test_rescheduled_exception(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self.mox.StubOutWithMock(self.compute, '_set_instance_error_state')
self.mox.StubOutWithMock(self.compute.compute_task_api,
@ -2191,7 +2208,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self.block_device_mapping, self.node,
self.limits, self.filter_properties)
def test_rescheduled_exception_without_retry(self):
@mock.patch('nova.utils.spawn_n')
def test_rescheduled_exception_without_retry(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self.mox.StubOutWithMock(compute_utils, 'add_instance_fault_from_exc')
self.mox.StubOutWithMock(self.compute, '_set_instance_error_state')
@ -2224,7 +2243,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
block_device_mapping=self.block_device_mapping, node=self.node,
limits=self.limits)
def test_rescheduled_exception_do_not_deallocate_network(self):
@mock.patch('nova.utils.spawn_n')
def test_rescheduled_exception_do_not_deallocate_network(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self.mox.StubOutWithMock(self.compute.driver,
'deallocate_networks_on_reschedule')
@ -2258,7 +2279,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
block_device_mapping=self.block_device_mapping, node=self.node,
limits=self.limits)
def test_rescheduled_exception_deallocate_network(self):
@mock.patch('nova.utils.spawn_n')
def test_rescheduled_exception_deallocate_network(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.mox.StubOutWithMock(self.compute, '_build_and_run_instance')
self.mox.StubOutWithMock(self.compute.driver,
'deallocate_networks_on_reschedule')
@ -2322,7 +2345,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self._instance_action_events()
self.mox.ReplayAll()
self.compute.build_and_run_instance(self.context, self.instance,
with mock.patch('nova.utils.spawn_n') as mock_spawn:
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
self.compute.build_and_run_instance(self.context, self.instance,
self.image, request_spec={},
filter_properties=self.filter_properties,
injected_files=self.injected_files,
@ -2505,7 +2530,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self.assertEqual(network_info, inst.info_cache.network_info)
inst.save.assert_called_with(expected_task_state=task_states.SPAWNING)
def test_reschedule_on_resources_unavailable(self):
@mock.patch('nova.utils.spawn_n')
def test_reschedule_on_resources_unavailable(self, mock_spawn):
mock_spawn.side_effect = lambda f, *a, **k: f(*a, **k)
reason = 'resource unavailable'
exc = exception.ComputeResourcesUnavailable(reason=reason)